initial work on rewriting stuff; awaiting changes
All checks were successful
build-all-services (push): successful in 9m21s
crawlers need to be storing which links link where for pageranking to work; looks like we have to start over on crawling! :D
This commit is contained in: parent 56d07057c9, commit 0a7db89373
13 changed files with 1595 additions and 140 deletions
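For context on the commit message: pagerank needs the reverse link graph. A page's score is a sum over the pages that link *to* it, so a crawler that discards link targets after queueing them can never supply that sum; the crawler has to persist "which links link where". A minimal sketch of one damped pagerank iteration, with hypothetical names (this is not the repo's schema):

use std::collections::HashMap;

// Hypothetical link-graph snapshot: for each page, the pages that link TO it,
// plus each page's outgoing-link count.
struct LinkGraph {
    incoming: HashMap<String, Vec<String>>,
    outgoing_count: HashMap<String, usize>,
}

// One damped pagerank iteration. The inner sum walks the *incoming* edges,
// which is exactly the information the crawler now has to record.
fn pagerank_step(graph: &LinkGraph, ranks: &HashMap<String, f64>) -> HashMap<String, f64> {
    let n = ranks.len() as f64;
    let d = 0.85; // standard damping factor
    ranks
        .keys()
        .map(|page| {
            let from_links: f64 = graph
                .incoming
                .get(page)
                .into_iter()
                .flatten()
                .map(|src| {
                    let out = (*graph.outgoing_count.get(src).unwrap_or(&1)).max(1) as f64;
                    ranks.get(src).copied().unwrap_or(0.0) / out
                })
                .sum();
            (page.clone(), (1.0 - d) / n + d * from_links)
        })
        .collect()
}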
Cargo.lock (generated): 19 changes
@@ -4276,6 +4276,25 @@ version = "4.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b"
+
+[[package]]
+name = "searchservice"
+version = "0.2.0"
+dependencies = [
+ "asklyphe-common",
+ "async-nats",
+ "async-recursion",
+ "chrono",
+ "env_logger 0.10.2",
+ "futures",
+ "log",
+ "once_cell",
+ "rand 0.8.5",
+ "rmp-serde",
+ "serde",
+ "tokio",
+ "ulid",
+]
 
 [[package]]
 name = "security-framework"
 version = "2.11.1"
@@ -1,2 +1,2 @@
 [workspace]
-members = ["asklyphe-common", "asklyphe-frontend", "asklyphe-auth-frontend", "unit_converter", "authservice", "authservice/migration", "authservice/entity", "bingservice", "googleservice", "vorebot", "lyphedb", "lyphedb/ldbtesttool"]
+members = ["asklyphe-common", "asklyphe-frontend", "asklyphe-auth-frontend", "unit_converter", "authservice", "authservice/migration", "authservice/entity", "bingservice", "googleservice", "vorebot", "lyphedb", "lyphedb/ldbtesttool", "searchservice"]
@@ -176,3 +176,35 @@ pub async fn get_website(db: &DBConn, url: &str) -> Result<WebsiteData, ()> {
         }
     }
 }
+
+pub async fn count_websites(db: &DBConn) -> Result<u64, ()> {
+    let key = construct_path(&[SITESTORE]).as_bytes().to_vec();
+
+    let cmd = LDBNatsMessage::Command(LypheDBCommand::CountKeys(KeyDirectory { key }));
+
+    match db.query(cmd).await {
+        LDBNatsMessage::Count(c) => {
+            Ok(c)
+        }
+        LDBNatsMessage::Entries(_) => {
+            warn!("lyphedb responded with \"entries\" to count_websites, treating as error");
+            Err(())
+        }
+        LDBNatsMessage::Success => {
+            warn!("lyphedb responded with \"success\" to count_websites, treating as error");
+            Err(())
+        }
+        LDBNatsMessage::BadRequest => {
+            error!("bad request for count_websites");
+            Err(())
+        }
+        LDBNatsMessage::NotFound => {
+            warn!("not found for count_websites");
+            Err(())
+        }
+        _ => {
+            warn!("lyphedb sent weird message as response, treating as error");
+            Err(())
+        }
+    }
+}
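With lyphedb doing the counting server-side, callers get a plain Result<u64, ()>, and every unexpected protocol reply is logged before being collapsed into Err(()). A hedged usage sketch (hypothetical caller, assuming the imports this commit already uses):

use asklyphe_common::ldb::{sitestore, DBConn};
use log::{info, warn};

// Hypothetical caller of the new helper; count_websites is the function added above.
async fn log_site_count(db: &DBConn) {
    match sitestore::count_websites(db).await {
        Ok(count) => info!("sitestore currently holds {count} websites"),
        Err(()) => warn!("couldn't count websites; the lyphedb response was logged above"),
    }
}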
@@ -1,6 +1,6 @@
 [package]
 name = "searchservice"
-version = "0.1.0"
+version = "0.2.0"
 edition = "2021"
 license = "AGPL-3"
 license-file = "LICENSE"
@@ -8,7 +8,7 @@ license-file = "LICENSE"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-asklyphe-common = { path = "../asklyphe-common", features = ["foundationdb"] }
+asklyphe-common = { path = "../asklyphe-common" }
 tokio = { version = "1.0", features = ["full"] }
 serde = { version = "1.0", features = ["derive"] }
 rmp-serde = "1.1.2"
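Dropping features = ["foundationdb"] here is what actually cuts searchservice loose from FoundationDB: Cargo features are additive, and asklyphe-common presumably gates its FoundationDB-backed modules behind that flag, something like the sketch below (assumed shape, not asklyphe-common's actual source):

// Assumed shape of the feature gate inside asklyphe-common's lib.rs;
// with the flag off, none of this is compiled into searchservice's build.
#[cfg(feature = "foundationdb")]
pub mod foundationdb; // re-exported FoundationDB client

#[cfg(feature = "foundationdb")]
pub mod db; // the FoundationDB-backed helpers the old code imported

pub mod ldb; // the lyphedb client stays unconditional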
@@ -12,16 +12,13 @@
  */
 
 use std::collections::{BTreeMap, BTreeSet, VecDeque};
-use std::ops::Mul;
+use std::hash::{DefaultHasher, Hasher};
-use async_recursion::async_recursion;
 use once_cell::sync::Lazy;
 use std::sync::{Arc, Mutex};
 use std::sync::atomic::{AtomicUsize, Ordering};
-use log::{debug, info};
 use rand::Rng;
-use asklyphe_common::db;
-use asklyphe_common::db::{STRHASH};
-use asklyphe_common::foundationdb::Database;
+use log::*;
+use asklyphe_common::ldb::DBConn;
 use asklyphe_common::nats::searchservice;
 
 pub struct SearchResult {
@@ -29,56 +26,20 @@ pub struct SearchResult {
-    pub title: Option<String>,
-    pub description: Option<String>,
-    pub url_contains_result: Option<usize>,
-    pub word_occurs: f64,
-    pub authorword_occurs: f64,
-    pub descriptionword_occurs: f64,
-    pub keyword_occurs: f64,
-    pub sitename_occurs: f64,
-    pub linkword_occurs: f64,
-    pub pagerank: f64,
-    pub relavence: f64,
-    pub words_contained: BTreeSet<usize>,
-    pub total_query_words: f64,
-    pub words: BTreeMap<usize, usize>,
-    pub closest_match: f64,
-    pub phrase_match_count: f64,
-    pub match_acc: f64,
-    pub match_c: f64,
-    pub highest_match: f64,
 }
-
-impl SearchResult {
-    pub fn relavence(&self, no_separator_flag: bool) -> f64 {
-        if !no_separator_flag {
-            ((self.word_occurs * 1.3) + (self.descriptionword_occurs * 1.2) + (self.keyword_occurs * 0.5) + (self.authorword_occurs * 1.2) + (self.sitename_occurs * 1.3) + (self.linkword_occurs * 2.9) + (self.pagerank.powi(4)).max(0.001))
-                .max(0.01)
-                *
-                if self.total_query_words > 1.0 {
-                    ((1.0001 - if self.match_c > 0.0 { (self.match_acc / self.match_c) / self.highest_match.max(0.01) } else { 0.1 })
-                        + if !self.words_contained.is_empty() { (self.total_query_words / self.words_contained.len() as f64).max(0.002) } else { 1.0 }).max(0.001)
-                        * self.phrase_match_count.max(0.02).powi(5)
-                } else {
-                    1.0
-                }
-        } else {
-            if self.total_query_words > 1.0 {
-                (1.0001 - if self.match_c > 0.0 { (self.match_acc / self.match_c) / self.highest_match.max(0.01) } else { 0.1 })
-                    * self.phrase_match_count.max(0.02).powi(8)
-            } else {
-                1.0
-            }
-        }
-    }
-}
+type STRHASH = u64;
 
 pub static PRECALCED_PAGERANKS: Lazy<Mutex<BTreeMap<STRHASH, f64>>> = Lazy::new(|| Mutex::new(BTreeMap::new()));
 pub static CACHED_PAGERANKS: Lazy<Mutex<BTreeMap<STRHASH, f64>>> = Lazy::new(|| Mutex::new(BTreeMap::new()));
 pub static VISITED_PAGERANKS: Lazy<Mutex<BTreeSet<STRHASH>>> = Lazy::new(|| Mutex::new(BTreeSet::new()));
 pub static VISITED_PAGERANKS2: Lazy<Mutex<BTreeSet<STRHASH>>> = Lazy::new(|| Mutex::new(BTreeSet::new()));
 pub static HASH_CACHE: Lazy<HashCache<String, STRHASH>> = Lazy::new(|| HashCache::new(Arc::new(Mutex::new(BTreeMap::new()))));
 pub static UNHASH_CACHE: Lazy<HashCache<STRHASH, String>> = Lazy::new(|| HashCache::new(Arc::new(Mutex::new(BTreeMap::new()))));
 pub static TITLE_CACHE: Lazy<HashCache<STRHASH, Option<String>>> = Lazy::new(|| HashCache::new(Arc::new(Mutex::new(BTreeMap::new()))));
 pub static DESC_CACHE: Lazy<HashCache<STRHASH, Option<String>>> = Lazy::new(|| HashCache::new(Arc::new(Mutex::new(BTreeMap::new()))));
 
+pub fn hash(data: &str) -> STRHASH {
+    let mut hash = DefaultHasher::new();
+    hash.write(data.as_bytes());
+    hash.finish()
+}
 
 pub struct HashCache<K: Ord, V: Clone> {
     inner: Arc<Mutex<BTreeMap<K, V>>>,
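The deleted relavence() method was the old ranking formula: weighted per-field occurrence scores (link text at 2.9, body and site name at 1.3, description and author at 1.2, keywords at 0.5) plus pagerank raised to the 4th power, all scaled by phrase-match and word-coverage penalties for multi-word queries. A simplified restatement of just the single-word case, as a paraphrase (this is not the committed code):

// Simplified paraphrase of the removed scorer's field weighting for a
// single-word query; the multi-word penalty terms are omitted here.
fn base_relevance(word: f64, desc: f64, keyword: f64, author: f64,
                  sitename: f64, linkword: f64, pagerank: f64) -> f64 {
    ((word * 1.3) + (desc * 1.2) + (keyword * 0.5) + (author * 1.2)
        + (sitename * 1.3) + (linkword * 2.9) + pagerank.powi(4).max(0.001))
        .max(0.01) // floor so a page never scores exactly zero
}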
@@ -122,44 +83,12 @@ impl<K: Ord, V: Clone> HashCache<K, V> {
     }
 }
 
-pub async fn cached_hash(db: &Database, key: String) -> STRHASH {
-    HASH_CACHE.entry(key.clone()).or_insert(db::foa_strhash(db, &key)).await
-}
-
-pub async fn cached_unhash(db: &Database, key: STRHASH) -> String {
-    UNHASH_CACHE.entry(key).or_insert(db::unhash(db, key)).await
-}
-
-pub async fn cached_title(db: &Database, key: STRHASH) -> Option<String> {
-    if let Some(title) = TITLE_CACHE.entry(key).or_insert(db::info_title(db, key)).await {
-        Some(title)
-    } else {
-        TITLE_CACHE.unconditional(key).or_insert(db::info_title(db, key)).await
-    }
-}
-
-pub async fn cached_desc(db: &Database, key: STRHASH) -> Option<String> {
-    if let Some(desc) = DESC_CACHE.entry(key).or_insert(db::info_description(db, key)).await {
-        Some(desc)
-    } else {
-        DESC_CACHE.unconditional(key).or_insert(db::info_description(db, key)).await
-    }
-}
-
-#[async_recursion]
-pub async fn pagerank(db: &Database, url: STRHASH) -> f64 {
-    if let Some(precalc) = PRECALCED_PAGERANKS.lock().unwrap().get(&url) {
+pub async fn pagerank(db: &DBConn, url: &str) -> f64 {
+    let uhash = hash(url);
+    if let Some(precalc) = PRECALCED_PAGERANKS.lock().unwrap().get(&uhash) {
         return *precalc;
     }
-    if let Some(precalc_db) = db::page_pagerank(db, url).await {
-        //debug!("url {} in db {}", url, precalc_db);
-        if precalc_db == 0.0 {
-            //debug!("but 0 ):");
-        } else {
-            CACHED_PAGERANKS.lock().unwrap().insert(url, precalc_db);
-            return precalc_db;
-        }
-    }
-    let mut accum = 0.0;
-    let incoming = {
-        db::page_links_entering(db, url).await
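HashCache's body is collapsed out of this diff, but its call sites (CACHE.entry(key).or_insert(future).await) read like a memoized async lookup: return the cached value on a hit, and only await the supplied database future on a miss. A minimal sketch of that pattern under that assumption (not the actual HashCache implementation):

use std::collections::BTreeMap;
use std::future::Future;
use std::sync::Mutex;

// Minimal memoized async lookup in the spirit of HashCache::entry().or_insert().
async fn get_or_compute<K, V, F>(cache: &Mutex<BTreeMap<K, V>>, key: K, compute: F) -> V
where
    K: Ord,
    V: Clone,
    F: Future<Output = V>,
{
    if let Some(hit) = cache.lock().unwrap().get(&key).cloned() {
        return hit; // cache hit: the future is never polled
    }
    let value = compute.await; // miss: do the (database) work exactly once here
    cache.lock().unwrap().insert(key, value.clone());
    value
}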
@@ -27,11 +27,10 @@ use async_nats::jetstream::consumer::PullConsumer;
 use async_nats::jetstream::stream::RetentionPolicy;
 use futures::StreamExt;
 use log::{error, info, warn};
-use once_cell::sync::Lazy;
+use once_cell::sync::{Lazy, OnceCell};
 use tokio::sync::Mutex;
 use tokio::task::JoinHandle;
-use asklyphe_common::{db, foundationdb};
-use asklyphe_common::foundationdb::Database;
+use asklyphe_common::ldb::DBConn;
 use asklyphe_common::nats::comms;
 use asklyphe_common::nats::comms::{Service, ServiceResponse};
 use asklyphe_common::nats::searchservice::SearchSrvcResponse;
@@ -39,13 +38,16 @@ use asklyphe_common::nats::searchservice::SearchSrvcResponse;
 pub static NATS_URL: Lazy<String> = Lazy::new(|| std::env::var("NATS_URL").expect("NO NATS DEFINED"));
 pub static NATS_CERT: Lazy<String> = Lazy::new(|| std::env::var("NATS_CERT").expect("NO NATS_CERT DEFINED"));
 pub static NATS_KEY: Lazy<String> = Lazy::new(|| std::env::var("NATS_KEY").expect("NO NATS_KEY DEFINED"));
+pub static DB_NAME: Lazy<String> =
+    Lazy::new(|| std::env::var("DB_NAME").expect("PLEASE ADD DB_NAME"));
 
 pub static PROCESSES_HANDLED: AtomicU64 = AtomicU64::new(0);
 pub static LAST_MESSAGE: AtomicI64 = AtomicI64::new(0);
 
+static DB_CONN: OnceCell<DBConn> = OnceCell::new();
+
 #[tokio::main]
 async fn main() {
-    let _guard = unsafe { foundationdb::boot() };
     env_logger::init();
     info!("searchservice began at {}", chrono::Utc::now().to_string());
     let nats = async_nats::ConnectOptions::new()
@@ -57,12 +59,14 @@ async fn main() {
         return;
     }
     let nats = nats.unwrap();
+    let dbconn = DBConn::new(nats.clone(), DB_NAME.to_string());
+    let _ = DB_CONN.set(dbconn);
     let nats = jetstream::new(nats);
 
     info!("searchbot spawned");
 
     async fn on_recv(query: comms::Query) -> ServiceResponse {
-        let db = Database::default().expect("couldn't connect to foundation db!");
+        let db = DB_CONN.get().unwrap().clone();
         let now = chrono::Utc::now().timestamp();
         LAST_MESSAGE.store(now, Ordering::Relaxed);
 
@@ -85,7 +89,4 @@ async fn main() {
     if let Err(e) = comms::subscribe_service(Service::SearchService, &nats, Arc::new(on_recv)).await {
        error!("failed to subscribe to searchservice nats! reason {:?}", e);
     }
-
-    // we will respawn tasks if they crash
-    drop(_guard);
 }
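The DB_CONN OnceCell is how the commit threads the NATS-backed connection into on_recv: on_recv is a plain fn handed to subscribe_service, so it cannot capture locals from main; instead main sets the global exactly once and each request clones a cheap handle out of it. Reduced to its shape (stand-in type, not the repo's code):

use once_cell::sync::OnceCell;

#[derive(Clone)]
struct Conn; // stand-in for DBConn, which is Clone

static CONN: OnceCell<Conn> = OnceCell::new();

fn handler() {
    // get() is lock-free after initialization; clone() hands out a cheap handle
    let _conn = CONN.get().expect("CONN set in main").clone();
}

fn main() {
    // set() succeeds only once; later calls return Err with the rejected value
    let _ = CONN.set(Conn);
    handler();
}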
@@ -16,15 +16,12 @@ use async_nats::{jetstream, Subject};
 use log::{debug, error, warn};
 use tokio::sync::Mutex;
 use futures::StreamExt;
-use asklyphe_common::db::tables::{INFO_TABLE, WEBSITE_TABLE};
-use asklyphe_common::foundationdb::{Database, KeySelector, RangeOption};
-use asklyphe_common::foundationdb::options::{StreamingMode, TransactionOption};
-use asklyphe_common::foundationdb::tuple::{pack, Subspace};
+use asklyphe_common::ldb::{sitestore, DBConn};
 use asklyphe_common::nats::searchservice;
 use asklyphe_common::nats::searchservice::{SearchSrvcQuery, SearchSrvcRequest, SearchSrvcResponse, SearchRequest, SiteCountResponse, SearchResponse};
 use crate::{algorithm, hacks};
 
-pub async fn process(query: SearchSrvcQuery, db: Database) -> SearchSrvcResponse {
+pub async fn process(query: SearchSrvcQuery, db: DBConn) -> SearchSrvcResponse {
     // a future is used so that the whole program doesn't die if an algorithm panics
     let response = tokio::spawn(async move {
         match query.request {
@@ -45,7 +42,7 @@ pub async fn process(query: SearchSrvcQuery, db: DBConn) -> SearchSrvcResponse {
     }
 }
 
-pub async fn search_request(req: SearchRequest, db: &Database) -> SearchSrvcResponse {
+pub async fn search_request(req: SearchRequest, db: &DBConn) -> SearchSrvcResponse {
     let words_initial: Vec<String> = req.query.split_whitespace().map(|s| s.to_string()).collect();
     let mut words = vec![];
     let mut no_separator_flag = false;
@@ -75,44 +72,12 @@ pub async fn search_request(req: SearchRequest, db: &DBConn) -> SearchSrvcResponse {
     }
 }
 
-pub async fn count_websites(db: &Database) -> SearchSrvcResponse {
-    let mut counter: u64 = 0;
-    let subspace = Subspace::from(WEBSITE_TABLE);
-    let mut last_key = subspace.range().0;
-    let final_key = subspace.range().1;
-
-    for _failsafe in 0..10000 {
-        let trx = db.create_trx();
-        if let Err(e) = trx {
-            error!("DATABASE ERROR page_links_exiting_count {e}");
-        } else {
-            let trx = trx.unwrap();
-            // link -> from -> *
-            let mut range = RangeOption::from((last_key.clone(), final_key.clone()));
-            range.mode = StreamingMode::Iterator;
-            range.limit = Some(10096);
-
-            let mut stream = trx.get_ranges_keyvalues(range, true);
-
-            let mut this_time = 0;
-
-            while let Some(kv) = stream.next().await {
-                if let Ok(kv) = kv {
-                    counter += 1;
-                    this_time += 1;
-                    last_key = kv.key().to_vec();
-                } else if let Err(e) = kv {
-                    eprintln!("err while counting {e}");
-                }
-            }
-
-            if this_time <= 10 {
-                return SearchSrvcResponse::SiteCountResponse(SiteCountResponse {
-                    count: counter / 8,
-                });
-            }
-        }
-    }
-
+pub async fn count_websites(db: &DBConn) -> SearchSrvcResponse {
+    if let Ok(count) = sitestore::count_websites(db).await {
+        SearchSrvcResponse::SiteCountResponse(SiteCountResponse {
+            count,
+        })
+    } else {
+        SearchSrvcResponse::OtherError("couldn't retrieve count ):".to_string())
+    }
-    SearchSrvcResponse::OtherError("couldn't retrieve count ):".to_string())
 }
searchservice_old/.gitignore (vendored, new file): 2 additions
@@ -0,0 +1,2 @@
/target
.idea
searchservice_old/Cargo.toml (new file): 23 additions
@@ -0,0 +1,23 @@
[package]
name = "searchservice"
version = "0.1.0"
edition = "2021"
license = "AGPL-3"
license-file = "LICENSE"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
asklyphe-common = { path = "../asklyphe-common", features = ["foundationdb"] }
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
rmp-serde = "1.1.2"
async-nats = "0.38.0"
chrono = "0.4.26"
once_cell = "1.18.0"
ulid = "1.0.0"
rand = "0.8.5"
async-recursion = "1.0.5"
futures = "0.3.30"
log = "0.4.20"
env_logger = "0.10.2"
searchservice_old/src/algorithm.rs (new file): 1213 additions
File diff suppressed because it is too large
searchservice_old/src/hacks.rs (new file): 62 additions
@@ -0,0 +1,62 @@
/*
 * searchservice hacks.rs
 * - awful awful solutions to our issues
 *
 * Copyright (C) 2025 Real Microsoft, LLC
 *
 * This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

use std::ops::{ RangeInclusive};

const BURMESE_RANGE: RangeInclusive<char> = '\u{1000}'..='\u{104f}';
const CHINESE_RANGE1: RangeInclusive<char> = '\u{4e00}'..='\u{9fff}';
const CHINESE_RANGE2: RangeInclusive<char> = '\u{3400}'..='\u{4dbf}';
const CHINESE_RANGE3: RangeInclusive<char> = '\u{20000}'..='\u{2a6df}';
const CHINESE_RANGE4: RangeInclusive<char> = '\u{2A700}'..='\u{2B73F}';
const CHINESE_RANGE5: RangeInclusive<char> = '\u{2B740}'..='\u{2B81F}';
const CHINESE_RANGE6: RangeInclusive<char> = '\u{2B820}'..='\u{2CEAF}';
const CHINESE_RANGE7: RangeInclusive<char> = '\u{2CEB0}'..='\u{2EBEF}';
const CHINESE_RANGE8: RangeInclusive<char> = '\u{30000}'..='\u{3134F}';
const CHINESE_RANGE9: RangeInclusive<char> = '\u{31350}'..='\u{323AF}';
const CHINESE_RANGE10: RangeInclusive<char> = '\u{2EBF0}'..='\u{2EE5F}';
const CHINESE_RANGE11: RangeInclusive<char> = '\u{F900}'..='\u{FAFF}';
const JAPANESE_RANGE1: RangeInclusive<char> = '\u{3040}'..='\u{309F}';
/// KATAKANA
const JAPANESE_RANGE2: RangeInclusive<char> = '\u{30A0}'..='\u{30FF}';
const JAVANESE_RANGE: RangeInclusive<char> = '\u{A980}'..='\u{A9DF}';
const KHMER_RANGE1: RangeInclusive<char> = '\u{1780}'..='\u{17FF}';
const KHMER_RANGE2: RangeInclusive<char> = '\u{19E0}'..='\u{19FF}';
const LAO_RANGE: RangeInclusive<char> = '\u{0E80}'..='\u{0EFF}';
const PHAGSPA_RANGE: RangeInclusive<char> = '\u{A840}'..='\u{A87F}';
const TAITHAM_RANGE: RangeInclusive<char> = '\u{1A20}'..='\u{1AAF}';
const THAI_RANGE: RangeInclusive<char> = '\u{0E00}'..='\u{E07F}';
const TIBETAN_RANGE: RangeInclusive<char> = '\u{0F00}'..='\u{0FFF}';
const NO_WORD_BOUNDRIES: &[RangeInclusive<char>] = &[
    BURMESE_RANGE,
    CHINESE_RANGE1, CHINESE_RANGE2, CHINESE_RANGE3, CHINESE_RANGE4, CHINESE_RANGE5, CHINESE_RANGE6, CHINESE_RANGE7, CHINESE_RANGE8, CHINESE_RANGE9, CHINESE_RANGE10, CHINESE_RANGE11,
    JAPANESE_RANGE1, JAPANESE_RANGE2,
    JAVANESE_RANGE,
    KHMER_RANGE1, KHMER_RANGE2,
    LAO_RANGE,
    PHAGSPA_RANGE,
    TAITHAM_RANGE,
    THAI_RANGE,
    TIBETAN_RANGE,
];

pub fn is_from_language_that_doesnt_use_word_separators(str: &str) -> bool {
    for c in str.chars() {
        for range in NO_WORD_BOUNDRIES {
            if range.contains(&c) {
                return true;
            }
        }
    }

    false
}
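These ranges feed a single predicate, which process.rs uses to decide whether a query word should be split into individual characters. A quick property check (a test written here for illustration, not part of the commit):

#[test]
fn detects_scripts_without_word_separators() {
    // 日本語 falls inside the CJK ranges above; plain ASCII does not
    assert!(is_from_language_that_doesnt_use_word_separators("日本語"));
    assert!(!is_from_language_that_doesnt_use_word_separators("hello"));
}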
searchservice_old/src/main.rs (new file): 91 additions
@@ -0,0 +1,91 @@
/*
 * searchservice main.rs
 * - entrypoint for the asklyphe search service
 *
 * Copyright (C) 2025 Real Microsoft, LLC
 *
 * This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

pub mod algorithm;
pub mod hacks;
mod process;

use std::cmp::max;
use std::future::Future;
use std::io::Read;
use std::string::ToString;
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::time::Duration;
use async_nats::jetstream;
use async_nats::jetstream::consumer::PullConsumer;
use async_nats::jetstream::stream::RetentionPolicy;
use futures::StreamExt;
use log::{error, info, warn};
use once_cell::sync::Lazy;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use asklyphe_common::{db, foundationdb};
use asklyphe_common::foundationdb::Database;
use asklyphe_common::nats::comms;
use asklyphe_common::nats::comms::{Service, ServiceResponse};
use asklyphe_common::nats::searchservice::SearchSrvcResponse;

pub static NATS_URL: Lazy<String> = Lazy::new(|| std::env::var("NATS_URL").expect("NO NATS DEFINED"));
pub static NATS_CERT: Lazy<String> = Lazy::new(|| std::env::var("NATS_CERT").expect("NO NATS_CERT DEFINED"));
pub static NATS_KEY: Lazy<String> = Lazy::new(|| std::env::var("NATS_KEY").expect("NO NATS_KEY DEFINED"));

pub static PROCESSES_HANDLED: AtomicU64 = AtomicU64::new(0);
pub static LAST_MESSAGE: AtomicI64 = AtomicI64::new(0);

#[tokio::main]
async fn main() {
    let _guard = unsafe { foundationdb::boot() };
    env_logger::init();
    info!("searchservice began at {}", chrono::Utc::now().to_string());
    let nats = async_nats::ConnectOptions::new()
        .add_client_certificate(NATS_CERT.as_str().into(), NATS_KEY.as_str().into())
        .connect(NATS_URL.as_str())
        .await;
    if let Err(e) = nats {
        error!("FATAL ERROR, COULDN'T CONNECT TO NATS: {}", e);
        return;
    }
    let nats = nats.unwrap();
    let nats = jetstream::new(nats);

    info!("searchbot spawned");

    async fn on_recv(query: comms::Query) -> ServiceResponse {
        let db = Database::default().expect("couldn't connect to foundation db!");
        let now = chrono::Utc::now().timestamp();
        LAST_MESSAGE.store(now, Ordering::Relaxed);

        let response = match query {
            comms::Query::SearchService(query) => {
                process::process(query, db).await
            }
            _ => {
                SearchSrvcResponse::InvalidRequest
            }
        };

        if PROCESSES_HANDLED.load(Ordering::Relaxed) % 100 == 0 {
            info!("handled {} requests!", PROCESSES_HANDLED.load(Ordering::Relaxed));
        }
        PROCESSES_HANDLED.fetch_add(1, Ordering::Relaxed);
        ServiceResponse::SearchService(response)
    }

    if let Err(e) = comms::subscribe_service(Service::SearchService, &nats, Arc::new(on_recv)).await {
        error!("failed to subscribe to searchservice nats! reason {:?}", e);
    }

    // we will respawn tasks if they crash
    drop(_guard);
}
searchservice_old/src/process.rs (new file): 118 additions
@@ -0,0 +1,118 @@
/*
 * searchservice process.rs
 * - route incoming nats requests to their specific functions
 *
 * Copyright (C) 2025 Real Microsoft, LLC
 *
 * This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

use std::sync::Arc;
use async_nats::{jetstream, Subject};
use log::{debug, error, warn};
use tokio::sync::Mutex;
use futures::StreamExt;
use asklyphe_common::db::tables::{INFO_TABLE, WEBSITE_TABLE};
use asklyphe_common::foundationdb::{Database, KeySelector, RangeOption};
use asklyphe_common::foundationdb::options::{StreamingMode, TransactionOption};
use asklyphe_common::foundationdb::tuple::{pack, Subspace};
use asklyphe_common::nats::searchservice;
use asklyphe_common::nats::searchservice::{SearchSrvcQuery, SearchSrvcRequest, SearchSrvcResponse, SearchRequest, SiteCountResponse, SearchResponse};
use crate::{algorithm, hacks};

pub async fn process(query: SearchSrvcQuery, db: Database) -> SearchSrvcResponse {
    // a future is used so that the whole program doesn't die if an algorithm panics
    let response = tokio::spawn(async move {
        match query.request {
            SearchSrvcRequest::SearchRequest(req) => {
                search_request(req, &db).await
            }
            SearchSrvcRequest::SiteCountRequest => {
                count_websites(&db).await
            }
    }}).await;

    if let Ok(response) = response {
        response
    } else if let Err(e) = response {
        SearchSrvcResponse::OtherError(e.to_string())
    } else {
        unreachable!()
    }
}

pub async fn search_request(req: SearchRequest, db: &Database) -> SearchSrvcResponse {
    let words_initial: Vec<String> = req.query.split_whitespace().map(|s| s.to_string()).collect();
    let mut words = vec![];
    let mut no_separator_flag = false;
    for word in words_initial {
        if hacks::is_from_language_that_doesnt_use_word_separators(&word) {
            words.extend(word.chars().map(|c| c.to_string()).collect::<Vec<String>>());
            no_separator_flag = true;
        } else {
            words.push(word);
        }
    }

    match algorithm::search(&db, words, no_separator_flag).await {
        Some(results) => {
            SearchSrvcResponse::SearchResponse(results)
        }
        None => {
            SearchSrvcResponse::SearchResponse(SearchResponse {
                results: vec![],
                blocked: vec![],
                pagerank_time_seconds: 0.0,
                total_query_seconds: 0.0,
                max_relevance: 0.0,
                exact_phrase_found: false,
            })
        }
    }
}

pub async fn count_websites(db: &Database) -> SearchSrvcResponse {
    let mut counter: u64 = 0;
    let subspace = Subspace::from(WEBSITE_TABLE);
    let mut last_key = subspace.range().0;
    let final_key = subspace.range().1;

    for _failsafe in 0..10000 {
        let trx = db.create_trx();
        if let Err(e) = trx {
            error!("DATABASE ERROR page_links_exiting_count {e}");
        } else {
            let trx = trx.unwrap();
            // link -> from -> *
            let mut range = RangeOption::from((last_key.clone(), final_key.clone()));
            range.mode = StreamingMode::Iterator;
            range.limit = Some(10096);

            let mut stream = trx.get_ranges_keyvalues(range, true);

            let mut this_time = 0;

            while let Some(kv) = stream.next().await {
                if let Ok(kv) = kv {
                    counter += 1;
                    this_time += 1;
                    last_key = kv.key().to_vec();
                } else if let Err(e) = kv {
                    eprintln!("err while counting {e}");
                }
            }

            if this_time <= 10 {
                return SearchSrvcResponse::SiteCountResponse(SiteCountResponse {
                    count: counter / 8,
                });
            }
        }
    }

    SearchSrvcResponse::OtherError("couldn't retrieve count ):".to_string())
}
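Worth noting how the panic isolation in process() works: tokio::spawn moves the algorithm onto its own task, so a panic surfaces as Err(JoinError) at the .await instead of unwinding through the whole service. The mechanism in isolation (a sketch, not repo code):

#[tokio::main]
async fn main() {
    // run the "algorithm" on its own task; a panic is caught at the JoinHandle
    // rather than unwinding into the caller
    let result: Result<(), tokio::task::JoinError> = tokio::spawn(async {
        panic!("algorithm bug");
    })
    .await;
    // the service keeps running and can turn the failure into an error response
    assert!(result.as_ref().err().map(|e| e.is_panic()).unwrap_or(false));
    println!("caught: {:?}", result.err().map(|e| e.to_string()));
}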