/* * searchservice process.rs * - route incoming nats requests to their specific functions * * Copyright (C) 2025 Real Microsoft, LLC * * This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3. * * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ use std::sync::Arc; use async_nats::{jetstream, Subject}; use log::{debug, error, warn}; use tokio::sync::Mutex; use futures::StreamExt; use asklyphe_common::db::tables::{INFO_TABLE, WEBSITE_TABLE}; use asklyphe_common::foundationdb::{Database, KeySelector, RangeOption}; use asklyphe_common::foundationdb::options::{StreamingMode, TransactionOption}; use asklyphe_common::foundationdb::tuple::{pack, Subspace}; use asklyphe_common::nats::searchservice; use asklyphe_common::nats::searchservice::{SearchSrvcQuery, SearchSrvcRequest, SearchSrvcResponse, SearchRequest, SiteCountResponse, SearchResponse}; use crate::{algorithm, hacks}; pub async fn process(query: SearchSrvcQuery, db: Database) -> SearchSrvcResponse { // a future is used so that the whole program doesn't die if an algorithm panics let response = tokio::spawn(async move { match query.request { SearchSrvcRequest::SearchRequest(req) => { search_request(req, &db).await } SearchSrvcRequest::SiteCountRequest => { count_websites(&db).await } }}).await; if let Ok(response) = response { response } else if let Err(e) = response { SearchSrvcResponse::OtherError(e.to_string()) } else { unreachable!() } } pub async fn search_request(req: SearchRequest, db: &Database) -> SearchSrvcResponse { let words_initial: Vec = req.query.split_whitespace().map(|s| s.to_string()).collect(); let mut words = vec![]; let mut no_separator_flag = false; for word in words_initial { if hacks::is_from_language_that_doesnt_use_word_separators(&word) { words.extend(word.chars().map(|c| c.to_string()).collect::>()); no_separator_flag = true; } else { words.push(word); } } match algorithm::search(&db, words, no_separator_flag).await { Some(results) => { SearchSrvcResponse::SearchResponse(results) } None => { SearchSrvcResponse::SearchResponse(SearchResponse { results: vec![], blocked: vec![], pagerank_time_seconds: 0.0, total_query_seconds: 0.0, max_relevance: 0.0, exact_phrase_found: false, }) } } } pub async fn count_websites(db: &Database) -> SearchSrvcResponse { let mut counter: u64 = 0; let subspace = Subspace::from(WEBSITE_TABLE); let mut last_key = subspace.range().0; let final_key = subspace.range().1; for _failsafe in 0..10000 { let trx = db.create_trx(); if let Err(e) = trx { error!("DATABASE ERROR page_links_exiting_count {e}"); } else { let trx = trx.unwrap(); // link -> from -> * let mut range = RangeOption::from((last_key.clone(), final_key.clone())); range.mode = StreamingMode::Iterator; range.limit = Some(10096); let mut stream = trx.get_ranges_keyvalues(range, true); let mut this_time = 0; while let Some(kv) = stream.next().await { if let Ok(kv) = kv { counter += 1; this_time += 1; last_key = kv.key().to_vec(); } else if let Err(e) = kv { eprintln!("err while counting {e}"); } } if this_time <= 10 { return SearchSrvcResponse::SiteCountResponse(SiteCountResponse { count: counter / 8, }); } } } SearchSrvcResponse::OtherError("couldn't retrieve count ):".to_string()) }