From 0a7db89373789b6e6f2ebdf01645826d036bb9d1 Mon Sep 17 00:00:00 2001
From: husky
Date: Fri, 14 Mar 2025 18:00:37 -0700
Subject: [PATCH] initial work on rewriting stuff; awaiting crawler changes:
 crawlers need to be storing which links link where for pageranking to work;
 looks like we have to start over on crawling! :D

---
 Cargo.lock                           |   19 +
 Cargo.toml                           |    2 +-
 asklyphe-common/src/ldb/sitestore.rs |   32 +
 searchservice/Cargo.toml             |    4 +-
 searchservice/src/algorithm.rs       |   97 +-
 searchservice/src/main.rs            |   17 +-
 searchservice/src/process.rs         |   55 +-
 searchservice_old/.gitignore         |    2 +
 searchservice_old/Cargo.toml         |   23 +
 searchservice_old/src/algorithm.rs   | 1213 ++++++++++++++++++++++++++
 searchservice_old/src/hacks.rs       |   62 ++
 searchservice_old/src/main.rs        |   91 ++
 searchservice_old/src/process.rs     |  118 +++
 13 files changed, 1595 insertions(+), 140 deletions(-)
 create mode 100644 searchservice_old/.gitignore
 create mode 100644 searchservice_old/Cargo.toml
 create mode 100644 searchservice_old/src/algorithm.rs
 create mode 100644 searchservice_old/src/hacks.rs
 create mode 100644 searchservice_old/src/main.rs
 create mode 100644 searchservice_old/src/process.rs

diff --git a/Cargo.lock b/Cargo.lock
index c6d6f10..f46be2e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4276,6 +4276,25 @@ version = "4.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b"
 
+[[package]]
+name = "searchservice"
+version = "0.2.0"
+dependencies = [
+ "asklyphe-common",
+ "async-nats",
+ "async-recursion",
+ "chrono",
+ "env_logger 0.10.2",
+ "futures",
+ "log",
+ "once_cell",
+ "rand 0.8.5",
+ "rmp-serde",
+ "serde",
+ "tokio",
+ "ulid",
+]
+
 [[package]]
 name = "security-framework"
 version = "2.11.1"
diff --git a/Cargo.toml b/Cargo.toml
index c56cf91..519317b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,2 +1,2 @@
 [workspace]
-members = ["asklyphe-common", "asklyphe-frontend", "asklyphe-auth-frontend", "unit_converter", "authservice", "authservice/migration", "authservice/entity", "bingservice", "googleservice", "vorebot", "lyphedb", "lyphedb/ldbtesttool"]
+members = ["asklyphe-common", "asklyphe-frontend", "asklyphe-auth-frontend", "unit_converter", "authservice", "authservice/migration", "authservice/entity", "bingservice", "googleservice", "vorebot", "lyphedb", "lyphedb/ldbtesttool", "searchservice"]
diff --git a/asklyphe-common/src/ldb/sitestore.rs b/asklyphe-common/src/ldb/sitestore.rs
index a99d5f3..0003849 100644
--- a/asklyphe-common/src/ldb/sitestore.rs
+++ b/asklyphe-common/src/ldb/sitestore.rs
@@ -176,3 +176,35 @@ pub async fn get_website(db: &DBConn, url: &str) -> Result {
         }
     }
 }
+
+pub async fn count_websites(db: &DBConn) -> Result<u64, ()> {
+    let key = construct_path(&[SITESTORE]).as_bytes().to_vec();
+
+    let cmd = LDBNatsMessage::Command(LypheDBCommand::CountKeys(KeyDirectory { key }));
+
+    match db.query(cmd).await {
+        LDBNatsMessage::Count(c) => {
+            Ok(c)
+        }
+        LDBNatsMessage::Entries(_) => {
+            warn!("lyphedb responded with \"entries\" to count_websites, treating as error");
+            Err(())
+        }
+        LDBNatsMessage::Success => {
+            warn!("lyphedb responded with \"success\" to count_websites, treating as error");
+            Err(())
+        }
+        LDBNatsMessage::BadRequest => {
+            error!("bad request for count_websites");
+            Err(())
+        }
+        LDBNatsMessage::NotFound => {
+            warn!("not found for count_websites");
+            Err(())
+        }
+        _ => {
+            warn!("lyphedb sent weird message as response, treating as error");
+            Err(())
+        }
+    }
+}
\ No newline at end of file
diff --git a/searchservice/Cargo.toml b/searchservice/Cargo.toml
index ad88aa4..6350c0e 100644
--- a/searchservice/Cargo.toml
+++ b/searchservice/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "searchservice"
-version = "0.1.0"
+version = "0.2.0"
 edition = "2021"
 license = "AGPL-3"
 license-file = "LICENSE"
@@ -8,7 +8,7 @@ license-file = "LICENSE"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-asklyphe-common = { path = "../asklyphe-common", features = ["foundationdb"] }
+asklyphe-common = { path = "../asklyphe-common" }
 tokio = { version = "1.0", features = ["full"] }
 serde = { version = "1.0", features = ["derive"] }
 rmp-serde = "1.1.2"
diff --git a/searchservice/src/algorithm.rs b/searchservice/src/algorithm.rs
index 54c0b6a..3ef3d5c 100644
--- a/searchservice/src/algorithm.rs
+++ b/searchservice/src/algorithm.rs
@@ -12,16 +12,13 @@
  */
 
 use std::collections::{BTreeMap, BTreeSet, VecDeque};
-use std::ops::Mul;
+use std::hash::{DefaultHasher, Hasher};
 use async_recursion::async_recursion;
 use once_cell::sync::Lazy;
 use std::sync::{Arc, Mutex};
 use std::sync::atomic::{AtomicUsize, Ordering};
-use log::{debug, info};
-use rand::Rng;
-use asklyphe_common::db;
-use asklyphe_common::db::{STRHASH};
-use asklyphe_common::foundationdb::Database;
+use log::*;
+use asklyphe_common::ldb::DBConn;
 use asklyphe_common::nats::searchservice;
 
 pub struct SearchResult {
@@ -29,56 +26,20 @@ pub struct SearchResult {
     pub title: Option<String>,
     pub description: Option<String>,
     pub url_contains_result: Option<usize>,
-    pub word_occurs: f64,
-    pub authorword_occurs: f64,
-    pub descriptionword_occurs: f64,
-    pub keyword_occurs: f64,
-    pub sitename_occurs: f64,
-    pub linkword_occurs: f64,
     pub pagerank: f64,
     pub relavence: f64,
-    pub words_contained: BTreeSet<usize>,
-    pub total_query_words: f64,
-    pub words: BTreeMap,
-    pub closest_match: f64,
-    pub phrase_match_count: f64,
-    pub match_acc: f64,
-    pub match_c: f64,
-    pub highest_match: f64,
 }
 
-impl SearchResult {
-    pub fn relavence(&self, no_separator_flag: bool) -> f64 {
-        if !no_separator_flag {
-            ((self.word_occurs * 1.3) + (self.descriptionword_occurs * 1.2) + (self.keyword_occurs * 0.5) + (self.authorword_occurs * 1.2) + (self.sitename_occurs * 1.3) + (self.linkword_occurs * 2.9) + (self.pagerank.powi(4)).max(0.001))
-                .max(0.01)
-                *
-                if self.total_query_words > 1.0 {
-                    ((1.0001 - if self.match_c > 0.0 { (self.match_acc / self.match_c) / self.highest_match.max(0.01) } else { 0.1 })
-                        + if !self.words_contained.is_empty() { (self.total_query_words / self.words_contained.len() as f64).max(0.002) } else { 1.0 }).max(0.001)
-                        * self.phrase_match_count.max(0.02).powi(5)
-                } else {
-                    1.0
-                }
-        } else {
-            if self.total_query_words > 1.0 {
-                (1.0001 - if self.match_c > 0.0 { (self.match_acc / self.match_c) / self.highest_match.max(0.01) } else { 0.1 })
-                    * self.phrase_match_count.max(0.02).powi(8)
-            } else {
-                1.0
-            }
-        }
-    }
-}
+type STRHASH = u64;
 
 pub static PRECALCED_PAGERANKS: Lazy<Mutex<BTreeMap<STRHASH, f64>>> = Lazy::new(|| Mutex::new(BTreeMap::new()));
 pub static CACHED_PAGERANKS: Lazy<Mutex<BTreeMap<STRHASH, f64>>> = Lazy::new(|| Mutex::new(BTreeMap::new()));
-pub static VISITED_PAGERANKS: Lazy<Mutex<BTreeSet<STRHASH>>> = Lazy::new(|| Mutex::new(BTreeSet::new()));
-pub static VISITED_PAGERANKS2: Lazy<Mutex<BTreeSet<STRHASH>>> = Lazy::new(|| Mutex::new(BTreeSet::new()));
-pub static HASH_CACHE: Lazy<HashCache<String, STRHASH>> = Lazy::new(|| HashCache::new(Arc::new(Mutex::new(BTreeMap::new()))));
-pub static UNHASH_CACHE: Lazy<HashCache<STRHASH, String>> = Lazy::new(|| HashCache::new(Arc::new(Mutex::new(BTreeMap::new()))));
-pub static TITLE_CACHE: Lazy<HashCache<STRHASH, Option<String>>> = Lazy::new(|| HashCache::new(Arc::new(Mutex::new(BTreeMap::new()))));
-pub static DESC_CACHE: Lazy<HashCache<STRHASH, Option<String>>> = Lazy::new(|| HashCache::new(Arc::new(Mutex::new(BTreeMap::new()))));
+
+pub fn hash(data: &str) -> STRHASH {
+    let mut hash = DefaultHasher::new();
+    hash.write(data.as_bytes());
+    hash.finish()
+}
 
 pub struct HashCache {
     inner: Arc<Mutex<BTreeMap<K, V>>>,
 }
@@ -122,44 +83,12 @@ impl HashCache {
     }
 }
 
-pub async fn cached_hash(db: &Database, key: String) -> STRHASH {
-    HASH_CACHE.entry(key.clone()).or_insert(db::foa_strhash(db, &key)).await
-}
-
-pub async fn cached_unhash(db: &Database, key: STRHASH) -> String {
-    UNHASH_CACHE.entry(key).or_insert(db::unhash(db, key)).await
-}
-
-pub async fn cached_title(db: &Database, key: STRHASH) -> Option<String> {
-    if let Some(title) = TITLE_CACHE.entry(key).or_insert(db::info_title(db, key)).await {
-        Some(title)
-    } else {
-        TITLE_CACHE.unconditional(key).or_insert(db::info_title(db, key)).await
-    }
-}
-
-pub async fn cached_desc(db: &Database, key: STRHASH) -> Option<String> {
-    if let Some(desc) = DESC_CACHE.entry(key).or_insert(db::info_description(db, key)).await {
-        Some(desc)
-    } else {
-        DESC_CACHE.unconditional(key).or_insert(db::info_description(db, key)).await
-    }
-}
-
 #[async_recursion]
-pub async fn pagerank(db: &Database, url: STRHASH) -> f64 {
-    if let Some(precalc) = PRECALCED_PAGERANKS.lock().unwrap().get(&url) {
+pub async fn pagerank(db: &DBConn, url: &str) -> f64 {
+    let uhash = hash(url);
+    if let Some(precalc) = PRECALCED_PAGERANKS.lock().unwrap().get(&uhash) {
         return *precalc;
     }
-    if let Some(precalc_db) = db::page_pagerank(db, url).await {
-        //debug!("url {} in db {}", url, precalc_db);
-        if precalc_db == 0.0 {
-            //debug!("but 0 ):");
-        } else {
-            CACHED_PAGERANKS.lock().unwrap().insert(url, precalc_db);
-            return precalc_db;
-        }
-    }
     let mut accum = 0.0;
     let incoming = {
         db::page_links_entering(db, url).await
diff --git a/searchservice/src/main.rs b/searchservice/src/main.rs
index 570f820..027e68a 100644
--- a/searchservice/src/main.rs
+++ b/searchservice/src/main.rs
@@ -27,11 +27,10 @@ use async_nats::jetstream::consumer::PullConsumer;
 use async_nats::jetstream::stream::RetentionPolicy;
 use futures::StreamExt;
 use log::{error, info, warn};
-use once_cell::sync::Lazy;
+use once_cell::sync::{Lazy, OnceCell};
 use tokio::sync::Mutex;
 use tokio::task::JoinHandle;
-use asklyphe_common::{db, foundationdb};
-use asklyphe_common::foundationdb::Database;
+use asklyphe_common::ldb::DBConn;
 use asklyphe_common::nats::comms;
 use asklyphe_common::nats::comms::{Service, ServiceResponse};
 use asklyphe_common::nats::searchservice::SearchSrvcResponse;
@@ -39,13 +38,16 @@ use asklyphe_common::nats::searchservice::SearchSrvcResponse;
 pub static NATS_URL: Lazy<String> = Lazy::new(|| std::env::var("NATS_URL").expect("NO NATS DEFINED"));
 pub static NATS_CERT: Lazy<String> = Lazy::new(|| std::env::var("NATS_CERT").expect("NO NATS_CERT DEFINED"));
 pub static NATS_KEY: Lazy<String> = Lazy::new(|| std::env::var("NATS_KEY").expect("NO NATS_KEY DEFINED"));
+pub static DB_NAME: Lazy<String> =
+    Lazy::new(|| std::env::var("DB_NAME").expect("PLEASE ADD DB_NAME"));
 
 pub static PROCESSES_HANDLED: AtomicU64 = AtomicU64::new(0);
 pub static LAST_MESSAGE: AtomicI64 = AtomicI64::new(0);
 
+static DB_CONN: OnceCell<DBConn> = OnceCell::new();
+
 #[tokio::main]
 async fn main() {
-    let _guard = unsafe { foundationdb::boot() };
     env_logger::init();
     info!("searchservice began at {}", chrono::Utc::now().to_string());
     let nats = async_nats::ConnectOptions::new()
@@ -57,12 +59,14 @@ async fn main() {
         return;
     }
     let nats = nats.unwrap();
+    let dbconn = DBConn::new(nats.clone(), DB_NAME.to_string());
+    let _ = DB_CONN.set(dbconn);
     let nats = jetstream::new(nats);
 
     info!("searchbot spawned");
 
     async fn on_recv(query: comms::Query) -> ServiceResponse {
-        let db = Database::default().expect("couldn't connect to foundation db!");
+        let db = DB_CONN.get().unwrap().clone();
         let now = chrono::Utc::now().timestamp();
         LAST_MESSAGE.store(now, Ordering::Relaxed);
 
@@ -85,7 +89,4 @@ async fn main() {
     if let Err(e) = comms::subscribe_service(Service::SearchService, &nats, Arc::new(on_recv)).await {
         error!("failed to subscribe to searchservice nats! reason {:?}", e);
     }
-
-    // we will respawn tasks if they crash
-    drop(_guard);
 }
diff --git a/searchservice/src/process.rs b/searchservice/src/process.rs
index 8a990f3..537efb8 100644
--- a/searchservice/src/process.rs
+++ b/searchservice/src/process.rs
@@ -16,15 +16,12 @@ use async_nats::{jetstream, Subject};
 use log::{debug, error, warn};
 use tokio::sync::Mutex;
 use futures::StreamExt;
-use asklyphe_common::db::tables::{INFO_TABLE, WEBSITE_TABLE};
-use asklyphe_common::foundationdb::{Database, KeySelector, RangeOption};
-use asklyphe_common::foundationdb::options::{StreamingMode, TransactionOption};
-use asklyphe_common::foundationdb::tuple::{pack, Subspace};
+use asklyphe_common::ldb::{sitestore, DBConn};
 use asklyphe_common::nats::searchservice;
 use asklyphe_common::nats::searchservice::{SearchSrvcQuery, SearchSrvcRequest, SearchSrvcResponse, SearchRequest, SiteCountResponse, SearchResponse};
 use crate::{algorithm, hacks};
 
-pub async fn process(query: SearchSrvcQuery, db: Database) -> SearchSrvcResponse {
+pub async fn process(query: SearchSrvcQuery, db: DBConn) -> SearchSrvcResponse {
     // a future is used so that the whole program doesn't die if an algorithm panics
     let response = tokio::spawn(async move {
         match query.request {
@@ -45,7 +42,7 @@ pub async fn process(query: SearchSrvcQuery, db: Database) -> SearchSrvcResponse {
     }
 }
 
-pub async fn search_request(req: SearchRequest, db: &Database) -> SearchSrvcResponse {
+pub async fn search_request(req: SearchRequest, db: &DBConn) -> SearchSrvcResponse {
     let words_initial: Vec<String> = req.query.split_whitespace().map(|s| s.to_string()).collect();
     let mut words = vec![];
     let mut no_separator_flag = false;
@@ -75,44 +72,12 @@ pub async fn search_request(req: SearchRequest, db: &Database) -> SearchSrvcResponse {
     }
 }
 
-pub async fn count_websites(db: &Database) -> SearchSrvcResponse {
-    let mut counter: u64 = 0;
-    let subspace = Subspace::from(WEBSITE_TABLE);
-    let mut last_key = subspace.range().0;
-    let final_key = subspace.range().1;
-
-    for _failsafe in 0..10000 {
-        let trx = db.create_trx();
-        if let Err(e) = trx {
-            error!("DATABASE ERROR page_links_exiting_count {e}");
-        } else {
-            let trx = trx.unwrap();
-            // link -> from -> *
-            let mut range = RangeOption::from((last_key.clone(), final_key.clone()));
-            range.mode = StreamingMode::Iterator;
-            range.limit = Some(10096);
-
-            let mut stream = trx.get_ranges_keyvalues(range, true);
-
-            let mut this_time = 0;
-
-            while let Some(kv) = stream.next().await {
-                if let Ok(kv) = kv {
-                    counter += 1;
-                    this_time += 1;
-                    last_key = kv.key().to_vec();
-                } else if let Err(e) = kv {
-                    eprintln!("err while counting {e}");
-                }
-            }
-
-            if this_time <= 10 {
-                return SearchSrvcResponse::SiteCountResponse(SiteCountResponse {
-                    count: counter / 8,
-                });
-            }
-        }
-
+pub async fn count_websites(db: &DBConn) -> SearchSrvcResponse {
+    if let Ok(count) = sitestore::count_websites(db).await {
+        SearchSrvcResponse::SiteCountResponse(SiteCountResponse {
+            count,
+        })
+    } else {
+        SearchSrvcResponse::OtherError("couldn't retrieve count ):".to_string())
     }
-    SearchSrvcResponse::OtherError("couldn't retrieve count ):".to_string())
 }
\ No newline at end of file
diff --git a/searchservice_old/.gitignore b/searchservice_old/.gitignore
new file mode 100644
index 0000000..3a8cabc
--- /dev/null
+++ b/searchservice_old/.gitignore
@@ -0,0 +1,2 @@
+/target
+.idea
diff --git a/searchservice_old/Cargo.toml b/searchservice_old/Cargo.toml
new file mode 100644
index 0000000..ad88aa4
--- /dev/null
+++ b/searchservice_old/Cargo.toml
@@ -0,0 +1,23 @@
+[package]
+name = "searchservice"
+version = "0.1.0"
+edition = "2021"
+license = "AGPL-3"
+license-file = "LICENSE"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+asklyphe-common = { path = "../asklyphe-common", features = ["foundationdb"] }
+tokio = { version = "1.0", features = ["full"] }
+serde = { version = "1.0", features = ["derive"] }
+rmp-serde = "1.1.2"
+async-nats = "0.38.0"
+chrono = "0.4.26"
+once_cell = "1.18.0"
+ulid = "1.0.0"
+rand = "0.8.5"
+async-recursion = "1.0.5"
+futures = "0.3.30"
+log = "0.4.20"
+env_logger = "0.10.2"
\ No newline at end of file
diff --git a/searchservice_old/src/algorithm.rs b/searchservice_old/src/algorithm.rs
new file mode 100644
index 0000000..54c0b6a
--- /dev/null
+++ b/searchservice_old/src/algorithm.rs
@@ -0,0 +1,1213 @@
+/*
+ * searchservice algorithm.rs
+ * - how search works
+ *
+ * Copyright (C) 2025 Real Microsoft, LLC
+ *
+ * This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
+*/ + +use std::collections::{BTreeMap, BTreeSet, VecDeque}; +use std::ops::Mul; +use async_recursion::async_recursion; +use once_cell::sync::Lazy; +use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use log::{debug, info}; +use rand::Rng; +use asklyphe_common::db; +use asklyphe_common::db::{STRHASH}; +use asklyphe_common::foundationdb::Database; +use asklyphe_common::nats::searchservice; + +pub struct SearchResult { + pub url: String, + pub title: Option, + pub description: Option, + pub url_contains_result: Option, + pub word_occurs: f64, + pub authorword_occurs: f64, + pub descriptionword_occurs: f64, + pub keyword_occurs: f64, + pub sitename_occurs: f64, + pub linkword_occurs: f64, + pub pagerank: f64, + pub relavence: f64, + pub words_contained: BTreeSet, + pub total_query_words: f64, + pub words: BTreeMap, + pub closest_match: f64, + pub phrase_match_count: f64, + pub match_acc: f64, + pub match_c: f64, + pub highest_match: f64, +} + +impl SearchResult { + pub fn relavence(&self, no_separator_flag: bool) -> f64 { + if !no_separator_flag { + ((self.word_occurs * 1.3) + (self.descriptionword_occurs * 1.2) + (self.keyword_occurs * 0.5) + (self.authorword_occurs * 1.2) + (self.sitename_occurs * 1.3) + (self.linkword_occurs * 2.9) + (self.pagerank.powi(4)).max(0.001)) + .max(0.01) + * + if self.total_query_words > 1.0 { + ((1.0001 - if self.match_c > 0.0 { (self.match_acc / self.match_c) / self.highest_match.max(0.01) } else { 0.1 }) + + if !self.words_contained.is_empty() { (self.total_query_words / self.words_contained.len() as f64).max(0.002) } else { 1.0 }).max(0.001) + * self.phrase_match_count.max(0.02).powi(5) + } else { + 1.0 + } + } else { + if self.total_query_words > 1.0 { + (1.0001 - if self.match_c > 0.0 { (self.match_acc / self.match_c) / self.highest_match.max(0.01) } else { 0.1 }) + * self.phrase_match_count.max(0.02).powi(8) + } else { + 1.0 + } + } + } +} + +pub static PRECALCED_PAGERANKS: Lazy>> = Lazy::new(|| Mutex::new(BTreeMap::new())); +pub static CACHED_PAGERANKS: Lazy>> = Lazy::new(|| Mutex::new(BTreeMap::new())); +pub static VISITED_PAGERANKS: Lazy>> = Lazy::new(|| Mutex::new(BTreeSet::new())); +pub static VISITED_PAGERANKS2: Lazy>> = Lazy::new(|| Mutex::new(BTreeSet::new())); +pub static HASH_CACHE: Lazy> = Lazy::new(|| HashCache::new(Arc::new(Mutex::new(BTreeMap::new())))); +pub static UNHASH_CACHE: Lazy> = Lazy::new(|| HashCache::new(Arc::new(Mutex::new(BTreeMap::new())))); +pub static TITLE_CACHE: Lazy>> = Lazy::new(|| HashCache::new(Arc::new(Mutex::new(BTreeMap::new())))); +pub static DESC_CACHE: Lazy>> = Lazy::new(|| HashCache::new(Arc::new(Mutex::new(BTreeMap::new())))); + +pub struct HashCache { + inner: Arc>>, +} + +pub enum EntryBuilder { + Found(V), + NeedsInsert((K, Arc>>)), +} + +impl EntryBuilder { + pub async fn or_insert(self, value: impl futures::Future) -> V { + match self { + EntryBuilder::Found(v) => { v } + EntryBuilder::NeedsInsert((key, inner)) => { + let value = (value).await; + inner.lock().unwrap().insert(key, value.clone()); + value + } + } + } +} + +impl HashCache { + pub fn new(inner: Arc>>) -> Self { + Self { + inner, + } + } + + pub fn entry(&self, key: K) -> EntryBuilder { + if let Some(value) = self.inner.lock().unwrap().get(&key).cloned() { + EntryBuilder::Found(value) + } else { + EntryBuilder::NeedsInsert((key, self.inner.clone())) + } + } + + pub fn unconditional(&self, key: K) -> EntryBuilder { + EntryBuilder::NeedsInsert((key, self.inner.clone())) + } +} + +pub async fn cached_hash(db: &Database, 
key: String) -> STRHASH { + HASH_CACHE.entry(key.clone()).or_insert(db::foa_strhash(db, &key)).await +} + +pub async fn cached_unhash(db: &Database, key: STRHASH) -> String { + UNHASH_CACHE.entry(key).or_insert(db::unhash(db, key)).await +} + +pub async fn cached_title(db: &Database, key: STRHASH) -> Option { + if let Some(title) = TITLE_CACHE.entry(key).or_insert(db::info_title(db, key)).await { + Some(title) + } else { + TITLE_CACHE.unconditional(key).or_insert(db::info_title(db, key)).await + } +} + +pub async fn cached_desc(db: &Database, key: STRHASH) -> Option { + if let Some(desc) = DESC_CACHE.entry(key).or_insert(db::info_description(db, key)).await { + Some(desc) + } else { + DESC_CACHE.unconditional(key).or_insert(db::info_description(db, key)).await + } +} + +#[async_recursion] +pub async fn pagerank(db: &Database, url: STRHASH) -> f64 { + if let Some(precalc) = PRECALCED_PAGERANKS.lock().unwrap().get(&url) { + return *precalc; + } + if let Some(precalc_db) = db::page_pagerank(db, url).await { + //debug!("url {} in db {}", url, precalc_db); + if precalc_db == 0.0 { + //debug!("but 0 ):"); + } else { + CACHED_PAGERANKS.lock().unwrap().insert(url, precalc_db); + return precalc_db; + } + } + let mut accum = 0.0; + let incoming = { + db::page_links_entering(db, url).await + }; + let d = { + db::page_damping(db, url).await.unwrap_or(0.85) + }; + let d = (1.0 - d).max(0.0); + for url in incoming { + if PRECALCED_PAGERANKS.lock().unwrap().get(&url).is_none() && VISITED_PAGERANKS2.lock().unwrap().contains(&url) { + continue; + } + let c = { + db::page_links_exiting_count(db, url).await + }; + if c == 0 { continue; } + if let Some(precalc) = PRECALCED_PAGERANKS.lock().unwrap().get(&url) { + if *precalc != 0.0 { + accum += *precalc / c as f64; + continue; + } + } + VISITED_PAGERANKS2.lock().unwrap().insert(url); + let pr = pagerank(db, url).await; + accum += pr / c as f64; + } + + let pr = (1.0 - d) + (d * accum); + db::document_set_pagerank(db, url, pr, chrono::Utc::now().timestamp()).await; + PRECALCED_PAGERANKS.lock().unwrap().insert(url, pr); + CACHED_PAGERANKS.lock().unwrap().insert(url, pr); + pr +} + +pub const MAX_PAGERANK_APPROX_DEPTH: u64 = 1; + +#[async_recursion] +pub async fn pagerank_approx(db: &Database, url: STRHASH, depth: u64) -> f64 { + if depth > MAX_PAGERANK_APPROX_DEPTH { + return 0.8; + } + if let Some(precalc) = PRECALCED_PAGERANKS.lock().unwrap().get(&url) { + return *precalc; + } else if let Some(precalc) = CACHED_PAGERANKS.lock().unwrap().get(&url) { + return *precalc; + } + + let mut not_in_db = false; + let mut in_db_but_zero = false; + + if let Some(precalc_db) = db::page_pagerank(db, url).await { + //debug!("url {} in db {}", url, precalc_db); + if precalc_db == 0.0 { + //debug!("but 0 ):"); + + // uncomment when we want to eventually try to recalc 0.0 prs + //not_in_db = true; + //in_db_but_zero = true; + if depth == 0 { + tokio::spawn(async move { + info!("task spawned to calc real pagerank..."); + let db = Database::default().expect("couldn't connect to foundation db!"); + let pr = pagerank(&db, url).await; + info!("finished calculating {} real pagerank: {}", url, pr); + }); + } + }// else { + CACHED_PAGERANKS.lock().unwrap().insert(url, precalc_db); + return precalc_db; + //} + } else { + not_in_db = true; + } + + // spawn task to eventually calculate real pagerank + if depth == 0 && not_in_db { + tokio::spawn(async move { + //info!("task spawned to calc real pagerank..."); + let db = Database::default().expect("couldn't connect to foundation db!"); + 
pagerank(&db, url).await; + //info!("finished calculating {} real pagerank: {}", url, pr); + }); + } + + if in_db_but_zero { + CACHED_PAGERANKS.lock().unwrap().insert(url, 0.0); + return 0.0; + } + + let mut accum = 0.0; + let incoming = { + db::page_links_entering(db, url).await + }; + let d = { + db::page_damping(db, url).await.unwrap_or(0.85) + }; + let d = (1.0 - d).max(0.0); + for url in incoming { + if PRECALCED_PAGERANKS.lock().unwrap().get(&url).is_none() && CACHED_PAGERANKS.lock().unwrap().get(&url).is_none() && VISITED_PAGERANKS.lock().unwrap().contains(&url) { + continue; + } + let c = { + db::page_links_exiting_count(db, url).await + }; + if c == 0 { continue; } + if let Some(precalc) = PRECALCED_PAGERANKS.lock().unwrap().get(&url) { + accum += *precalc / c as f64; + continue; + } + if let Some(precalc) = CACHED_PAGERANKS.lock().unwrap().get(&url) { + accum += *precalc / c as f64; + continue; + } + VISITED_PAGERANKS.lock().unwrap().insert(url); + let pr = pagerank_approx(db, url, depth + 1).await; + CACHED_PAGERANKS.lock().unwrap().insert(url, pr); + accum += pr / c as f64; + } + + let pr = (1.0 - d) + (d * accum); + CACHED_PAGERANKS.lock().unwrap().insert(url, pr); + pr +} + +fn multiword_penalty(dist_a: f64, dist_b: f64) -> f64 { + if (dist_b - dist_a).is_sign_negative() { + // second comes before first, not good! + (dist_b - dist_a) + } else { + 1.0 - (dist_b - dist_a) + } +} + +pub async fn search(db: &Database, args: Vec, no_separator_flag: bool) -> Option { + let mut results: BTreeMap = BTreeMap::new(); + let start_t = chrono::Utc::now(); + + if args.is_empty() { + return None; + } + + let args_count = args.len(); + + let arg_words = args.clone(); + + //let word = format!("_{}_", word); + + let first_query = args.first().cloned(); + + let mut hash = vec![]; + for (i, word) in args.into_iter().enumerate() { + hash.push((i, cached_hash(db, word.clone()).await)); + //hash.push((i, cached_hash(db, format!("{}.", word)).await)); + //hash.push((i, cached_hash(db, format!(".{}", word)).await)); + //hash.push((i, cached_hash(db, format!(".{}.", word)).await)); + //hash.push((i, cached_hash(db, format!("{}s", word)).await)); + } + + //let hash: Vec<(usize, STRHASH)> = hash.into_iter().filter_map(|v| v.1.map(|b| (v.0, b))).collect(); + + if hash.is_empty() { + println!("none in database"); + return None; + } + + let first_query = first_query.unwrap(); + + let SECONDARY_WORDS = [ + db::hash("how"), + db::hash("is"), + db::hash("are"), + db::hash("the"), + db::hash("a"), + db::hash("when"), + db::hash("what"), + db::hash("why"), + db::hash("to"), + db::hash("where"), + db::hash("from"), + db::hash("best"), + db::hash("for"), + db::hash("like"), + ]; + + let mut secondary_indices = BTreeSet::new(); + + let mut word_occurs = BTreeMap::new(); + for hash in &hash { + if SECONDARY_WORDS.contains(&hash.1) { + secondary_indices.insert(hash.0); + } + word_occurs.entry(hash.0).or_insert(VecDeque::new()).extend( + db::word_occurs(db, hash.1).await); + } + //let mut authorword_occurs = BTreeMap::new(); + let mut descriptionword_occurs = BTreeMap::new(); + let mut keyword_occurs = BTreeMap::new(); + //let mut sitenameword_occurs = BTreeMap::new(); + for hash in &hash { + //authorword_occurs.entry(hash.0).or_insert(VecDeque::new()).extend( + // db::metaword_occurs(db, db::hash("author"), hash.1).await); + descriptionword_occurs.entry(hash.0).or_insert(VecDeque::new()).extend( + db::metaword_occurs(db, db::hash("description"), hash.1).await); + 
keyword_occurs.entry(hash.0).or_insert(VecDeque::new()).extend( + db::metaword_occurs(db, db::hash("keywords"), hash.1).await); + //sitenameword_occurs.entry(hash.0).or_insert(VecDeque::new()).extend( + // db::metaword_occurs(db, db::hash("site_name"), hash.1).await); + } + let mut linkword_occurs = BTreeMap::new(); + for hash in &hash { + linkword_occurs.entry(hash.0).or_insert(VecDeque::new()).extend( + db::linkword_occurs(db, hash.1).await); + } + + let mut urls = vec![]; + + for (_, vals) in &word_occurs { + for (url, _) in vals { + urls.push(*url); + } + } + for (_, vals) in &linkword_occurs { + for (url, _) in vals { + urls.push(*url); + } + } + for (_, vals) in &descriptionword_occurs { + for (url, _) in vals { + urls.push(*url); + } + } + for (_, vals) in &keyword_occurs { + for (url, _) in vals { + urls.push(*url); + } + } + + let mut useless_urls = vec![]; + + // we want to remove urls that aren't contained in every word index (i.e. urls that don't contain every word of the search query) + // however, we don't want to remove the url if it's not contained in a secondary index (i.e. if someone searches "best x", + // we don't want to remove results that don't contain the word "best") + // we also don't want to remove results if the page doesn't have that word, but the linkwords / metawords does + for url in &urls { + // for every word index... + for (index, vals) in &word_occurs { + // don't worry about secondary indices + if secondary_indices.contains(index) { + continue; + } + // collect urls + let urls: Vec = vals.iter().map(|(url, _)| *url).collect(); + // is this url not in the words bucket for this index? + if !urls.contains(url) { + // does another bucket contain it for this index? + let mut found_elsewhere = 0; + if linkword_occurs.get(index).unwrap().iter() + .map(|(url, _)| *url).collect::>().contains(url) { + found_elsewhere += 1; + } + if descriptionword_occurs.get(index).unwrap().iter() + .map(|(url, _)| *url).collect::>().contains(url) { + found_elsewhere += 1; + } + if keyword_occurs.get(index).unwrap().iter() + .map(|(url, _)| *url).collect::>().contains(url) { + found_elsewhere += 1; + } + + if found_elsewhere < 2 { + // not found anywhere else, thus this url doesn't match every meaningful word in the query + useless_urls.push(*url); + } + } + } + } + + //for (_, vals) in &authorword_occurs { + // for (url, _) in vals { + // urls.push(*url); + // } + //} + //for (_, vals) in &sitenameword_occurs { + // for (url, _) in vals { + // urls.push(*url); + // } + //} + //for url in &urls { + // for (_, vals) in &linkword_occurs { + // let urls: Vec = vals.iter().map(|(u, _)| *u).collect(); + // if !urls.contains(url) { + // useless_urls.push(*url); + // } + // } + //} + + urls.sort(); + urls.dedup(); + urls.retain(|u| !useless_urls.contains(u)); + + for vals in word_occurs.values_mut() { + vals.retain(|(u, _)| !useless_urls.contains(u)); + } + for vals in linkword_occurs.values_mut() { + vals.retain(|(u, _)| !useless_urls.contains(u)); + } + for vals in descriptionword_occurs.values_mut() { + vals.retain(|(u, _)| !useless_urls.contains(u)); + } + for vals in keyword_occurs.values_mut() { + vals.retain(|(u, _)| !useless_urls.contains(u)); + } + + let start = chrono::Utc::now(); + + let mut word_count = Arc::new(Mutex::new(BTreeMap::new())); + + let mut allowed = Arc::new(AtomicUsize::new(0)); + let mut pr_acc = 0.0; + + const ALLOWED_BEFORE_PRUNE: usize = 512; + const ALLOWED_BEFORE_MEGAPRUNE: usize = 777; + const ALLOWED_BEFORE_GIGAPRUNE: usize = 2048; + //const 
ICKY_WORDS: &[&str] = &[ + // "distrowatch.com", // distrowatch is taking up too many results at the moment, remove this later + // "mariowiki.com", // mariowiki is taking up too many results at the moment, remove this later + // "wired.com", // we have too many wired articles + // "wired.cz", // we are very targeted at an english audience, they probably don't want czech wired articles at the moment + // "neocities.org/browse?", // people probably don't want to visit neocities tag lists + // "https://distrowatch.com/?language=", // people probably aren't looking for the distrowatch homepage in a random language + // "https://distrowatch.com/weekly.php/weekly.php?issue=", // a lot of results are unrelated distrowatch weekly posts + // "terms", // people probably aren't looking for tos pages + // "statement", // people probably aren't looking for tos pages + // "3cx", // nonenglish voip company, takes up unrelated search queries + // "1377x", // phishing site pretending to be 1337x, temporary fix until we can implement something like site blocking + // "//kickasstorrents.", // kickasstorrents has been down for years, only remaining sites are likely phishing scams + // "//kickasstorrent.", // kickasstorrents has been down for years, only remaining sites are likely phishing scams + // "//katcr.to", // fake kickasstorrents site + // "//kat.am", // fake kickasstorrents site + // "//kikass.to", // fake kickasstorrents site + // "//thepiratebays.com", // fake thepiratebay site + // ".fandom.com", // fuck fandom.com (todo: remove this since ultimately, it should be the user's choice to block fandom) + //]; + + // todo: since our list is so small this is okay for now, but we should cache this in the future + let deranks = Arc::new(db::get_deranked_websites(db).await); + + let mut initial_pruned = Arc::new(Mutex::new(vec![])); + // url, reason + let mut blocked: Arc>> = Arc::new(Mutex::new(vec![])); + + debug!("checking {} urls", urls.len()); + + let mut url_tasks = vec![]; + for (i, url) in urls.into_iter().enumerate() { + let allowed = allowed.clone(); + let blocked = blocked.clone(); + let initial_pruned = initial_pruned.clone(); + let word_count = word_count.clone(); + let arg_words = arg_words.clone(); + let deranks = deranks.clone(); + url_tasks.push(tokio::spawn(async move { + if i > ALLOWED_BEFORE_GIGAPRUNE { + initial_pruned.lock().unwrap().push(url); + return; + } + let db = Database::default().expect("FAILED TO CREATE NEW FDB HANDLE"); + let surl = { + cached_unhash(&db, url).await + }.to_lowercase(); + let mut contains_query_word = false; + for w in &arg_words { + if surl.contains(w) { + contains_query_word = true; + break; + } + } + let mut prepruned = false; + for (_, derank) in deranks.iter() { + if surl.contains(&derank.urlmatch) || surl.contains(&derank.urlmatch.replace("//", ".")) { + if let Some(and) = &derank.and { + if !surl.contains(and) { + continue; + } + } + if let Some(unless) = &derank.unless { + if surl.contains(unless) { + continue; + } + } + if !contains_query_word && + (i > ALLOWED_BEFORE_MEGAPRUNE || (i > ALLOWED_BEFORE_PRUNE && derank.amount < 0.85)) && + !contains_query_word { + initial_pruned.lock().unwrap().push(url); + prepruned = true; + } + if derank.amount == 0.0 { + initial_pruned.lock().unwrap().push(url); + blocked.lock().unwrap().push((surl.clone(), derank.comment.clone())); + prepruned = true; + } + } + } + if prepruned { + return; + } + let pr = pagerank_approx(&db, url, 0).await; + if i > ALLOWED_BEFORE_PRUNE { + let mut contains_query_word = false; + 
for w in &arg_words { + if surl.contains(w) { + contains_query_word = true; + break; + } + } + if contains_query_word || (pr > (((pr_acc / i as f64).max(0.01)) * if i > ALLOWED_BEFORE_MEGAPRUNE { 4.0 } else { 1.0 }) || (i < ALLOWED_BEFORE_MEGAPRUNE)) { + let wc = db::page_word_count(&db, url).await; + word_count.lock().unwrap().insert(url, wc); + pr_acc += pr; + allowed.fetch_add(1, Ordering::Relaxed); + } else { + initial_pruned.lock().unwrap().push(url); + } + } else { + let wc = db::page_word_count(&db, url).await; + word_count.lock().unwrap().insert(url, wc); + pr_acc += pr; + allowed.fetch_add(1, Ordering::Relaxed); + } + //let url = { + //}; + //debug!("{} pr: {}", url, pr); + //debug!("{} wc: {}", url, wc); + + // precache values + cached_unhash(&db, url).await; + cached_title(&db, url).await; + cached_desc(&db, url).await; + })); + } + + for url_task in url_tasks { + url_task.await.expect("url task failure"); + } + + let initial_pruned = initial_pruned.lock().unwrap().clone(); + let word_count = word_count.lock().unwrap().clone(); + + debug!("pruned {} results ({pr_acc}, {})", initial_pruned.len(), allowed.load(Ordering::Relaxed)); + + for vals in word_occurs.values_mut() { + vals.retain(|(u, _)| !initial_pruned.contains(u)); + } + for vals in linkword_occurs.values_mut() { + vals.retain(|(u, _)| !initial_pruned.contains(u)); + } + for vals in descriptionword_occurs.values_mut() { + vals.retain(|(u, _)| !initial_pruned.contains(u)); + } + for vals in keyword_occurs.values_mut() { + vals.retain(|(u, _)| !initial_pruned.contains(u)); + } + + let pagerank_secs = chrono::Utc::now().signed_duration_since(start).num_milliseconds() as f64 / 1000.0; + + info!("pageranks in {} secs", pagerank_secs); + + //word_occurs.sort_by(|a, b| { + // let av = *PRECALCED_PAGERANKS.lock().unwrap().get(&b.1.0).unwrap(); + // av.total_cmp(PRECALCED_PAGERANKS.lock().unwrap().get(&a.1.0).unwrap()) + //}); + //metaword_occurs.sort_by(|a, b| { + // let av = *PRECALCED_PAGERANKS.lock().unwrap().get(&b.1.0).unwrap(); + // av.total_cmp(PRECALCED_PAGERANKS.lock().unwrap().get(&a.1.0).unwrap()) + //}); + //linkword_occurs.sort_by(|a, b| { + // let av = *PRECALCED_PAGERANKS.lock().unwrap().get(&b.1.0).unwrap(); + // av.total_cmp(PRECALCED_PAGERANKS.lock().unwrap().get(&a.1.0).unwrap()) + //}); + + pub const MULTIWORD_PENALTY: f64 = 0.0001; + pub const PHRASE_PERCENTAGE: f64 = 0.7; + + for ((position, vals)) in word_occurs.iter() { + if let Some(vals2) = word_occurs.get(&(*position + 1)) { + for (url_a, occurs_a) in vals { + let url = { + cached_unhash(db, *url_a).await + }; + let occurs = u64::from_be_bytes(occurs_a.split_at(std::mem::size_of::()).0.try_into().unwrap()); + let mut acc = 0.0; + let mut distacc = 0.0; + let mut distc = 0; + let mut smallest = f64::MAX; + let mut largest = 0.0; + let mut phrase_matches = 0; + for i in 0..occurs { + let offset = (i * 2) + 8; + let dist = occurs_a[offset as usize] as f64 / 255.0; + for (url_b, occurs_b) in vals2 { + if url_a != url_b { + continue; + } + let occurs2 = u64::from_be_bytes(occurs_b.split_at(std::mem::size_of::()).0.try_into().unwrap()); + for j in 0..occurs2 { + let offset2 = (j * 2) + 8; + let dist2 = occurs_b[offset2 as usize] as f64 / 255.0; + distacc += (dist2 - dist); + if (dist2 - dist) >= 0.0 && (dist2 - dist) < smallest { + smallest = (dist2 - dist); + } + if (dist2 - dist) >= 0.0 && (dist2 - dist) > largest { + largest = (dist2 - dist); + } + if (dist2 - dist) >= 0.0 && (dist2 - dist) <= (2.1 / (*word_count.get(url_a).unwrap_or(&1) as f64).max(1.0)) 
{ + phrase_matches += 1; + } + distc += 1; + acc += ((-(2.0 * dist + 1.0).log(1.2) / 30.0) + 1.0) * multiword_penalty(dist, dist2); + } + } + } + let res = results.entry(*url_a).or_insert(SearchResult { + url: url.clone(), + title: cached_title(db, *url_a).await, + description: cached_desc(db, *url_a).await, + url_contains_result: if url.contains(&first_query) { Some(url.len()) } else { None }, + word_occurs: acc, + authorword_occurs: 0.0, + descriptionword_occurs: 0.0, + keyword_occurs: 0.0, + sitename_occurs: 0.0, + linkword_occurs: 0.0, + pagerank: 0.0, + relavence: 0.0, + words_contained: BTreeSet::new(), + total_query_words: args_count as f64, + words: Default::default(), + closest_match: f64::MAX, + phrase_match_count: 0.0, + match_acc: 0.0, + match_c: 0.0, + highest_match: 0.0, + }); + res.word_occurs = acc; + res.words_contained.insert(*position); + res.match_acc += distacc; + res.match_c += distc as f64; + if smallest < res.closest_match { + res.closest_match = smallest; + } + if largest < res.highest_match { + res.highest_match = largest; + } + res.phrase_match_count += phrase_matches as f64; + *res.words.entry(*position).or_insert(0) += 1; + } + } else { + for (urlv, occursv) in vals { + let url = { + cached_unhash(db, *urlv).await + }; + let occurs = u64::from_be_bytes(occursv.split_at(std::mem::size_of::()).0.try_into().unwrap()); + let mut acc = 0.0; + let mut distc = 0; + for i in 0..occurs { + let offset = (i * 2) + 8; + let dist = occursv[offset as usize] as f64 / 255.0; + //distc += 1; + acc += ((-(2.0 * dist + 1.0).log(1.2) / 30.0) + 1.0); + } + let res = results.entry(*urlv).or_insert(SearchResult { + url: url.clone(), + title: cached_title(db, *urlv).await, + description: cached_desc(db, *urlv).await, + url_contains_result: if url.contains(&first_query) { Some(url.len()) } else { None }, + word_occurs: acc, + authorword_occurs: 0.0, + descriptionword_occurs: 0.0, + keyword_occurs: 0.0, + sitename_occurs: 0.0, + linkword_occurs: 0.0, + pagerank: 0.0, + relavence: 0.0, + words_contained: BTreeSet::new(), + total_query_words: args_count as f64, + words: Default::default(), + closest_match: 0.0, + phrase_match_count: 0.0, + match_acc: 0.0, + match_c: 0.0, + highest_match: 0.0, + }); + res.word_occurs = acc; + res.words_contained.insert(*position); + *res.words.entry(*position).or_insert(0) += 1; + } + } + } + + for ((position, vals)) in linkword_occurs.iter() { + if let Some(vals2) = linkword_occurs.get(&(*position + 1)) { + for (url_a, occurs_a) in vals { + let url = { + cached_unhash(db, *url_a).await + }; + let occurs = u64::from_be_bytes(occurs_a.split_at(std::mem::size_of::()).0.try_into().unwrap()); + let mut acc = 0.0; + let mut distacc = 0.0; + let mut distc = 0; + let mut smallest = f64::MAX; + let mut largest = 0.0; + let mut phrase_matches = 0; + for i in 0..occurs { + let offset = (i) + 8; + let dist = occurs_a[offset as usize] as f64 / 255.0; + for (url_b, occurs_b) in vals2 { + if url_a != url_b { + distc -= 4; + continue; + } + let occurs2 = u64::from_be_bytes(occurs_b.split_at(std::mem::size_of::()).0.try_into().unwrap()); + for j in 0..occurs2 { + let offset2 = (j) + 8; + let dist2 = occurs_b[offset2 as usize] as f64 / 255.0; + distacc += (dist2 - dist) * 12.0; + distc += 1; + if (dist2 - dist) < smallest { + smallest = (dist2 - dist); + } + if (dist2 - dist) >= 0.0 && (dist2 - dist) > largest { + largest = (dist2 - dist); + } + if (dist2 - dist) >= 0.0 && (dist2 - dist) <= (2.1 / (*word_count.get(url_a).unwrap_or(&1) as f64).max(20.0)) { + phrase_matches 
+= 1; + } + acc += ((-(2.0 * dist + 1.0).log(1.2) / 30.0) + 1.0) * multiword_penalty(dist, dist2); + } + } + } + let res = results.entry(*url_a).or_insert(SearchResult { + url: url.clone(), + title: cached_title(db, *url_a).await, + description: cached_desc(db, *url_a).await, + url_contains_result: if url.contains(&first_query) { Some(url.len()) } else { None }, + word_occurs: 0.0, + authorword_occurs: 0.0, + descriptionword_occurs: 0.0, + keyword_occurs: 0.0, + sitename_occurs: 0.0, + linkword_occurs: acc, + pagerank: 0.0, + relavence: 0.0, + words_contained: BTreeSet::new(), + total_query_words: args_count as f64, + words: Default::default(), + closest_match: f64::MAX, + phrase_match_count: 0.0, + match_acc: 0.0, + match_c: 0.0, + highest_match: 0.0, + }); + res.linkword_occurs = acc; + res.match_acc += distacc; + res.match_c += distc as f64; + if smallest < res.closest_match { + res.closest_match = smallest; + } + res.phrase_match_count += phrase_matches as f64; + *res.words.entry(*position).or_insert(0) += 1; + } + } else { + for (url_a, occurs_a) in vals { + let url = { + cached_unhash(db, *url_a).await + }; + let occurs = u64::from_be_bytes(occurs_a.split_at(std::mem::size_of::()).0.try_into().unwrap()); + let mut acc = 0.0; + let mut distc = 0; + for i in 0..occurs { + let offset = (i) + 8; + let dist = occurs_a[offset as usize] as f64 / 255.0; + //distc += 1; + acc += ((-(2.0 * dist + 1.0).log(1.2) / 30.0) + 1.0); + } + let res = results.entry(*url_a).or_insert(SearchResult { + url: url.clone(), + title: cached_title(db, *url_a).await, + description: cached_desc(db, *url_a).await, + url_contains_result: if url.contains(&first_query) { Some(url.len()) } else { None }, + word_occurs: 0.0, + authorword_occurs: 0.0, + descriptionword_occurs: 0.0, + keyword_occurs: 0.0, + sitename_occurs: 0.0, + linkword_occurs: acc, + pagerank: 0.0, + relavence: 0.0, + words_contained: BTreeSet::new(), + total_query_words: args_count as f64, + words: Default::default(), + closest_match: 0.0, + phrase_match_count: 0.0, + match_acc: 0.0, + match_c: 0.0, + highest_match: 0.0, + }); + res.linkword_occurs = acc; + res.words_contained.insert(*position); + *res.words.entry(*position).or_insert(0) += 1; + } + } + } + + for ((position, vals)) in descriptionword_occurs.iter() { + if let Some(vals2) = descriptionword_occurs.get(&(*position + 1)) { + for (url_a, occurs_a) in vals { + let url = { + cached_unhash(db, *url_a).await + }; + let occurs = u64::from_be_bytes(occurs_a.split_at(std::mem::size_of::()).0.try_into().unwrap()); + let mut acc = 0.0; + let mut distacc = 0.0; + let mut distc = 0; + let mut smallest = f64::MAX; + let mut largest = 0.0; + let mut phrase_matches = 0; + for i in 0..occurs { + let offset = (i) + 8; + let dist = occurs_a[offset as usize] as f64 / 255.0; + for (url_b, occurs_b) in vals2 { + if url_a != url_b { + distc -= 4; + continue; + } + let occurs2 = u64::from_be_bytes(occurs_b.split_at(std::mem::size_of::()).0.try_into().unwrap()); + for j in 0..occurs2 { + let offset2 = (j) + 8; + let dist2 = occurs_b[offset2 as usize] as f64 / 255.0; + distacc += (dist2 - dist) * 12.0; + distc += 1; + if (dist2 - dist) < smallest { + smallest = (dist2 - dist); + } + if (dist2 - dist) >= 0.0 && (dist2 - dist) > largest { + largest = (dist2 - dist); + } + if (dist2 - dist) >= 0.0 && (dist2 - dist) <= (2.1 / (*word_count.get(url_a).unwrap_or(&1) as f64).max(20.0)) { + phrase_matches += 1; + } + acc += ((-(2.0 * dist + 1.0).log(1.2) / 30.0) + 1.0) * multiword_penalty(dist, dist2); + } + } + } + let 
res = results.entry(*url_a).or_insert(SearchResult { + url: url.clone(), + title: cached_title(db, *url_a).await, + description: cached_desc(db, *url_a).await, + url_contains_result: if url.contains(&first_query) { Some(url.len()) } else { None }, + word_occurs: 0.0, + authorword_occurs: 0.0, + descriptionword_occurs: 0.0, + keyword_occurs: 0.0, + sitename_occurs: 0.0, + linkword_occurs: acc, + pagerank: 0.0, + relavence: 0.0, + words_contained: BTreeSet::new(), + total_query_words: args_count as f64, + words: Default::default(), + closest_match: f64::MAX, + phrase_match_count: 0.0, + match_acc: 0.0, + match_c: 0.0, + highest_match: 0.0, + }); + res.descriptionword_occurs = acc; + res.match_acc += distacc; + res.match_c += distc as f64; + if smallest < res.closest_match { + res.closest_match = smallest; + } + res.phrase_match_count += phrase_matches as f64; + *res.words.entry(*position).or_insert(0) += 1; + } + } else { + for (url_a, occurs_a) in vals { + let url = { + cached_unhash(db, *url_a).await + }; + let occurs = u64::from_be_bytes(occurs_a.split_at(std::mem::size_of::()).0.try_into().unwrap()); + let mut acc = 0.0; + let mut distc = 0; + for i in 0..occurs { + let offset = (i) + 8; + let dist = occurs_a[offset as usize] as f64 / 255.0; + //distc += 1; + acc += ((-(2.0 * dist + 1.0).log(1.2) / 30.0) + 1.0); + } + let res = results.entry(*url_a).or_insert(SearchResult { + url: url.clone(), + title: cached_title(db, *url_a).await, + description: cached_desc(db, *url_a).await, + url_contains_result: if url.contains(&first_query) { Some(url.len()) } else { None }, + word_occurs: 0.0, + authorword_occurs: 0.0, + descriptionword_occurs: 0.0, + keyword_occurs: 0.0, + sitename_occurs: 0.0, + linkword_occurs: acc, + pagerank: 0.0, + relavence: 0.0, + words_contained: BTreeSet::new(), + total_query_words: args_count as f64, + words: Default::default(), + closest_match: 0.0, + phrase_match_count: 0.0, + match_acc: 0.0, + match_c: 0.0, + highest_match: 0.0, + }); + res.descriptionword_occurs = acc; + res.words_contained.insert(*position); + *res.words.entry(*position).or_insert(0) += 1; + } + } + } + + for ((position, vals)) in keyword_occurs.iter() { + if let Some(vals2) = keyword_occurs.get(&(*position + 1)) { + for (url_a, occurs_a) in vals { + let url = { + cached_unhash(db, *url_a).await + }; + let occurs = u64::from_be_bytes(occurs_a.split_at(std::mem::size_of::()).0.try_into().unwrap()); + let mut acc = 0.0; + let mut distacc = 0.0; + let mut distc = 0; + let mut smallest = f64::MAX; + let mut largest = 0.0; + let mut phrase_matches = 0; + for i in 0..occurs { + let offset = (i) + 8; + let dist = occurs_a[offset as usize] as f64 / 255.0; + for (url_b, occurs_b) in vals2 { + if url_a != url_b { + distc -= 4; + continue; + } + let occurs2 = u64::from_be_bytes(occurs_b.split_at(std::mem::size_of::()).0.try_into().unwrap()); + for j in 0..occurs2 { + let offset2 = (j) + 8; + let dist2 = occurs_b[offset2 as usize] as f64 / 255.0; + distacc += (dist2 - dist) * 12.0; + distc += 1; + if (dist2 - dist) < smallest { + smallest = (dist2 - dist); + } + if (dist2 - dist) >= 0.0 && (dist2 - dist) > largest { + largest = (dist2 - dist); + } + if (dist2 - dist) >= 0.0 && (dist2 - dist) <= (2.1 / (*word_count.get(url_a).unwrap_or(&1) as f64).max(20.0)) { + phrase_matches += 1; + } + acc += ((-(2.0 * dist + 1.0).log(1.2) / 30.0) + 1.0) * multiword_penalty(dist, dist2); + } + } + } + let res = results.entry(*url_a).or_insert(SearchResult { + url: url.clone(), + title: cached_title(db, *url_a).await, + 
description: cached_desc(db, *url_a).await, + url_contains_result: if url.contains(&first_query) { Some(url.len()) } else { None }, + word_occurs: 0.0, + authorword_occurs: 0.0, + descriptionword_occurs: 0.0, + keyword_occurs: 0.0, + sitename_occurs: 0.0, + linkword_occurs: acc, + pagerank: 0.0, + relavence: 0.0, + words_contained: BTreeSet::new(), + total_query_words: args_count as f64, + words: Default::default(), + closest_match: f64::MAX, + phrase_match_count: 0.0, + match_acc: 0.0, + match_c: 0.0, + highest_match: 0.0, + }); + res.keyword_occurs = acc; + res.match_acc += distacc; + res.match_c += distc as f64; + if smallest < res.closest_match { + res.closest_match = smallest; + } + res.phrase_match_count += phrase_matches as f64; + *res.words.entry(*position).or_insert(0) += 1; + } + } else { + for (url_a, occurs_a) in vals { + let url = { + cached_unhash(db, *url_a).await + }; + let occurs = u64::from_be_bytes(occurs_a.split_at(std::mem::size_of::()).0.try_into().unwrap()); + let mut acc = 0.0; + let mut distc = 0; + for i in 0..occurs { + let offset = (i) + 8; + let dist = occurs_a[offset as usize] as f64 / 255.0; + //distc += 1; + acc += ((-(2.0 * dist + 1.0).log(1.2) / 30.0) + 1.0); + } + let res = results.entry(*url_a).or_insert(SearchResult { + url: url.clone(), + title: cached_title(db, *url_a).await, + description: cached_desc(db, *url_a).await, + url_contains_result: if url.contains(&first_query) { Some(url.len()) } else { None }, + word_occurs: 0.0, + authorword_occurs: 0.0, + descriptionword_occurs: 0.0, + keyword_occurs: 0.0, + sitename_occurs: 0.0, + linkword_occurs: acc, + pagerank: 0.0, + relavence: 0.0, + words_contained: BTreeSet::new(), + total_query_words: args_count as f64, + words: Default::default(), + closest_match: 0.0, + phrase_match_count: 0.0, + match_acc: 0.0, + match_c: 0.0, + highest_match: 0.0, + }); + res.keyword_occurs = acc; + res.words_contained.insert(*position); + *res.words.entry(*position).or_insert(0) += 1; + } + } + } + + let mut max_relavence = 0.0; + let mut longest_url = 1; + let mut max_words = 1; + + let mut phrase_found = false; + + // todo: move to database + + results.iter_mut().for_each(|v| { + v.1.pagerank = *PRECALCED_PAGERANKS.lock().unwrap().get(v.0).unwrap_or( + CACHED_PAGERANKS.lock().unwrap().get(v.0).unwrap() + ); + + //debug!("{},{}", v.1.phrase_match_count, args_count); + if v.1.phrase_match_count + 1.0 >= args_count as f64 { + phrase_found = true; + } + if v.1.url.len() > longest_url { + longest_url = v.1.url.len(); + } + let mut wc = 1; + for word in &arg_words { + if v.1.url.to_lowercase().contains(word) { + wc += 1; + } + } + if wc > max_words { + max_words = wc; + } + + v.1.relavence = v.1.relavence(no_separator_flag); + + //debug!("{} -> {}/{}", v.1.url, v.1.words_contained, v.1.total_query_words); + if v.1.relavence > max_relavence { + max_relavence = v.1.relavence; + } + }); + + results.iter_mut().for_each(|v| { + let url_multiplier = ((v.1.url.len() as f64 * 2.3) / (longest_url as f64)); + //debug!("url {} multiplier {}", v.1.url, url_multiplier); + let mut word_multiplier = 1; + for word in &arg_words { + if v.1.url.to_lowercase().contains(word) { + word_multiplier += 1; + } + } + let word_multiplier = (word_multiplier as f64 * 4.0) / max_words as f64; + + let mut icky_found = 1.0; + for (_, derank) in deranks.iter() { + if v.1.url.contains(&derank.urlmatch) { + if let Some(and) = &derank.and { + if !v.1.url.contains(and) { + continue; + } + } + if let Some(unless) = &derank.unless { + if v.1.url.contains(unless) { 
+ continue; + } + } + icky_found *= derank.amount; + } + } + + v.1.relavence *= word_multiplier.powi(8).min((max_words as f64).powi(8)); + v.1.relavence /= url_multiplier.max(1.0).powi(4); + v.1.relavence *= icky_found; + }); + + //results.iter_mut().for_each(|v| v.relevance /= max_occurs as f64); + + let mut results: Vec = results.into_iter().map(|v| v.1).collect(); + + max_relavence = 0.0; + results.sort_by(|a, b| { + if a.relavence > max_relavence { + max_relavence = a.relavence; + } + if b.relavence > max_relavence { + max_relavence = a.relavence; + } + b.relavence.total_cmp(&a.relavence) + }); + + let mut results_final = vec![]; + for result in results { + results_final.push(searchservice::SearchResult { + url: result.url, + relevance: result.relavence, + title: result.title, + description: result.description, + }); + } + + let time = chrono::Utc::now().signed_duration_since(start_t).num_milliseconds() as f64 / 1000.0; + + let blocked = blocked.lock().unwrap(); + + Some(searchservice::SearchResponse { + results: results_final, + blocked: blocked.clone(), + pagerank_time_seconds: pagerank_secs, + total_query_seconds: time, + max_relevance: max_relavence, + exact_phrase_found: phrase_found, + }) +} \ No newline at end of file diff --git a/searchservice_old/src/hacks.rs b/searchservice_old/src/hacks.rs new file mode 100644 index 0000000..2134715 --- /dev/null +++ b/searchservice_old/src/hacks.rs @@ -0,0 +1,62 @@ +/* + * searchservice hacks.rs + * - awful awful solutions to our issues + * + * Copyright (C) 2025 Real Microsoft, LLC + * + * This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License along with this program. If not, see . 
+*/ + +use std::ops::{ RangeInclusive}; + +const BURMESE_RANGE: RangeInclusive = '\u{1000}'..='\u{104f}'; +const CHINESE_RANGE1: RangeInclusive = '\u{4e00}'..='\u{9fff}'; +const CHINESE_RANGE2: RangeInclusive = '\u{3400}'..='\u{4dbf}'; +const CHINESE_RANGE3: RangeInclusive = '\u{20000}'..='\u{2a6df}'; +const CHINESE_RANGE4: RangeInclusive = '\u{2A700}'..='\u{2B73F}'; +const CHINESE_RANGE5: RangeInclusive = '\u{2B740}'..='\u{2B81F}'; +const CHINESE_RANGE6: RangeInclusive = '\u{2B820}'..='\u{2CEAF}'; +const CHINESE_RANGE7: RangeInclusive = '\u{2CEB0}'..='\u{2EBEF}'; +const CHINESE_RANGE8: RangeInclusive = '\u{30000}'..='\u{3134F}'; +const CHINESE_RANGE9: RangeInclusive = '\u{31350}'..='\u{323AF}'; +const CHINESE_RANGE10: RangeInclusive = '\u{2EBF0}'..='\u{2EE5F}'; +const CHINESE_RANGE11: RangeInclusive = '\u{F900}'..='\u{FAFF}'; +const JAPANESE_RANGE1: RangeInclusive = '\u{3040}'..='\u{309F}'; +/// KATAKANA +const JAPANESE_RANGE2: RangeInclusive = '\u{30A0}'..='\u{30FF}'; +const JAVANESE_RANGE: RangeInclusive = '\u{A980}'..='\u{A9DF}'; +const KHMER_RANGE1: RangeInclusive = '\u{1780}'..='\u{17FF}'; +const KHMER_RANGE2: RangeInclusive = '\u{19E0}'..='\u{19FF}'; +const LAO_RANGE: RangeInclusive = '\u{0E80}'..='\u{0EFF}'; +const PHAGSPA_RANGE: RangeInclusive = '\u{A840}'..='\u{A87F}'; +const TAITHAM_RANGE: RangeInclusive = '\u{1A20}'..='\u{1AAF}'; +const THAI_RANGE: RangeInclusive = '\u{0E00}'..='\u{E07F}'; +const TIBETAN_RANGE: RangeInclusive = '\u{0F00}'..='\u{0FFF}'; +const NO_WORD_BOUNDRIES: &[RangeInclusive] = &[ + BURMESE_RANGE, + CHINESE_RANGE1, CHINESE_RANGE2, CHINESE_RANGE3, CHINESE_RANGE4, CHINESE_RANGE5, CHINESE_RANGE6, CHINESE_RANGE7, CHINESE_RANGE8, CHINESE_RANGE9, CHINESE_RANGE10, CHINESE_RANGE11, + JAPANESE_RANGE1, JAPANESE_RANGE2, + JAVANESE_RANGE, + KHMER_RANGE1, KHMER_RANGE2, + LAO_RANGE, + PHAGSPA_RANGE, + TAITHAM_RANGE, + THAI_RANGE, + TIBETAN_RANGE, +]; + +pub fn is_from_language_that_doesnt_use_word_separators(str: &str) -> bool { + for c in str.chars() { + for range in NO_WORD_BOUNDRIES { + if range.contains(&c) { + return true; + } + } + } + + false +} \ No newline at end of file diff --git a/searchservice_old/src/main.rs b/searchservice_old/src/main.rs new file mode 100644 index 0000000..570f820 --- /dev/null +++ b/searchservice_old/src/main.rs @@ -0,0 +1,91 @@ +/* + * searchservice main.rs + * - entrypoint for the asklyphe search service + * + * Copyright (C) 2025 Real Microsoft, LLC + * + * This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License along with this program. If not, see . 
+*/ + +pub mod algorithm; +pub mod hacks; +mod process; + +use std::cmp::max; +use std::future::Future; +use std::io::Read; +use std::string::ToString; +use std::sync::Arc; +use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; +use std::time::Duration; +use async_nats::jetstream; +use async_nats::jetstream::consumer::PullConsumer; +use async_nats::jetstream::stream::RetentionPolicy; +use futures::StreamExt; +use log::{error, info, warn}; +use once_cell::sync::Lazy; +use tokio::sync::Mutex; +use tokio::task::JoinHandle; +use asklyphe_common::{db, foundationdb}; +use asklyphe_common::foundationdb::Database; +use asklyphe_common::nats::comms; +use asklyphe_common::nats::comms::{Service, ServiceResponse}; +use asklyphe_common::nats::searchservice::SearchSrvcResponse; + +pub static NATS_URL: Lazy = Lazy::new(|| std::env::var("NATS_URL").expect("NO NATS DEFINED")); +pub static NATS_CERT: Lazy = Lazy::new(|| std::env::var("NATS_CERT").expect("NO NATS_CERT DEFINED")); +pub static NATS_KEY: Lazy = Lazy::new(|| std::env::var("NATS_KEY").expect("NO NATS_KEY DEFINED")); + +pub static PROCESSES_HANDLED: AtomicU64 = AtomicU64::new(0); +pub static LAST_MESSAGE: AtomicI64 = AtomicI64::new(0); + +#[tokio::main] +async fn main() { + let _guard = unsafe { foundationdb::boot() }; + env_logger::init(); + info!("searchservice began at {}", chrono::Utc::now().to_string()); + let nats = async_nats::ConnectOptions::new() + .add_client_certificate(NATS_CERT.as_str().into(), NATS_KEY.as_str().into()) + .connect(NATS_URL.as_str()) + .await; + if let Err(e) = nats { + error!("FATAL ERROR, COULDN'T CONNECT TO NATS: {}", e); + return; + } + let nats = nats.unwrap(); + let nats = jetstream::new(nats); + + info!("searchbot spawned"); + + async fn on_recv(query: comms::Query) -> ServiceResponse { + let db = Database::default().expect("couldn't connect to foundation db!"); + let now = chrono::Utc::now().timestamp(); + LAST_MESSAGE.store(now, Ordering::Relaxed); + + let response = match query { + comms::Query::SearchService(query) => { + process::process(query, db).await + } + _ => { + SearchSrvcResponse::InvalidRequest + } + }; + + if PROCESSES_HANDLED.load(Ordering::Relaxed) % 100 == 0 { + info!("handled {} requests!", PROCESSES_HANDLED.load(Ordering::Relaxed)); + } + PROCESSES_HANDLED.fetch_add(1, Ordering::Relaxed); + ServiceResponse::SearchService(response) + } + + if let Err(e) = comms::subscribe_service(Service::SearchService, &nats, Arc::new(on_recv)).await { + error!("failed to subscribe to searchservice nats! reason {:?}", e); + } + + // we will respawn tasks if they crash + drop(_guard); +} diff --git a/searchservice_old/src/process.rs b/searchservice_old/src/process.rs new file mode 100644 index 0000000..8a990f3 --- /dev/null +++ b/searchservice_old/src/process.rs @@ -0,0 +1,118 @@ +/* + * searchservice process.rs + * - route incoming nats requests to their specific functions + * + * Copyright (C) 2025 Real Microsoft, LLC + * + * This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License along with this program. If not, see . 
+*/ + +use std::sync::Arc; +use async_nats::{jetstream, Subject}; +use log::{debug, error, warn}; +use tokio::sync::Mutex; +use futures::StreamExt; +use asklyphe_common::db::tables::{INFO_TABLE, WEBSITE_TABLE}; +use asklyphe_common::foundationdb::{Database, KeySelector, RangeOption}; +use asklyphe_common::foundationdb::options::{StreamingMode, TransactionOption}; +use asklyphe_common::foundationdb::tuple::{pack, Subspace}; +use asklyphe_common::nats::searchservice; +use asklyphe_common::nats::searchservice::{SearchSrvcQuery, SearchSrvcRequest, SearchSrvcResponse, SearchRequest, SiteCountResponse, SearchResponse}; +use crate::{algorithm, hacks}; + +pub async fn process(query: SearchSrvcQuery, db: Database) -> SearchSrvcResponse { + // a future is used so that the whole program doesn't die if an algorithm panics + let response = tokio::spawn(async move { + match query.request { + SearchSrvcRequest::SearchRequest(req) => { + search_request(req, &db).await + } + SearchSrvcRequest::SiteCountRequest => { + count_websites(&db).await + } + }}).await; + + if let Ok(response) = response { + response + } else if let Err(e) = response { + SearchSrvcResponse::OtherError(e.to_string()) + } else { + unreachable!() + } +} + +pub async fn search_request(req: SearchRequest, db: &Database) -> SearchSrvcResponse { + let words_initial: Vec = req.query.split_whitespace().map(|s| s.to_string()).collect(); + let mut words = vec![]; + let mut no_separator_flag = false; + for word in words_initial { + if hacks::is_from_language_that_doesnt_use_word_separators(&word) { + words.extend(word.chars().map(|c| c.to_string()).collect::>()); + no_separator_flag = true; + } else { + words.push(word); + } + } + + match algorithm::search(&db, words, no_separator_flag).await { + Some(results) => { + SearchSrvcResponse::SearchResponse(results) + } + None => { + SearchSrvcResponse::SearchResponse(SearchResponse { + results: vec![], + blocked: vec![], + pagerank_time_seconds: 0.0, + total_query_seconds: 0.0, + max_relevance: 0.0, + exact_phrase_found: false, + }) + } + } +} + +pub async fn count_websites(db: &Database) -> SearchSrvcResponse { + let mut counter: u64 = 0; + let subspace = Subspace::from(WEBSITE_TABLE); + let mut last_key = subspace.range().0; + let final_key = subspace.range().1; + + for _failsafe in 0..10000 { + let trx = db.create_trx(); + if let Err(e) = trx { + error!("DATABASE ERROR page_links_exiting_count {e}"); + } else { + let trx = trx.unwrap(); + // link -> from -> * + let mut range = RangeOption::from((last_key.clone(), final_key.clone())); + range.mode = StreamingMode::Iterator; + range.limit = Some(10096); + + let mut stream = trx.get_ranges_keyvalues(range, true); + + let mut this_time = 0; + + while let Some(kv) = stream.next().await { + if let Ok(kv) = kv { + counter += 1; + this_time += 1; + last_key = kv.key().to_vec(); + } else if let Err(e) = kv { + eprintln!("err while counting {e}"); + } + } + + if this_time <= 10 { + return SearchSrvcResponse::SiteCountResponse(SiteCountResponse { + count: counter / 8, + }); + } + } + + } + SearchSrvcResponse::OtherError("couldn't retrieve count ):".to_string()) +} \ No newline at end of file