forked from asklyphe-public/asklyphe
92 lines
3.4 KiB
Rust
92 lines
3.4 KiB
Rust
|
|
/*
 * searchservice main.rs
 * - entrypoint for the asklyphe search service
 *
 * Copyright (C) 2025 Real Microsoft, LLC
 *
 * This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
 */
|
||
|
|
|
||
|
|
pub mod algorithm;
|
||
|
|
pub mod hacks;
|
||
|
|
mod process;
|
||
|
|
|
||
|
|
use std::cmp::max;
|
||
|
|
use std::future::Future;
|
||
|
|
use std::io::Read;
|
||
|
|
use std::string::ToString;
|
||
|
|
use std::sync::Arc;
|
||
|
|
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
|
||
|
|
use std::time::Duration;
|
||
|
|
use async_nats::jetstream;
|
||
|
|
use async_nats::jetstream::consumer::PullConsumer;
|
||
|
|
use async_nats::jetstream::stream::RetentionPolicy;
|
||
|
|
use futures::StreamExt;
|
||
|
|
use log::{error, info, warn};
|
||
|
|
use once_cell::sync::Lazy;
|
||
|
|
use tokio::sync::Mutex;
|
||
|
|
use tokio::task::JoinHandle;
|
||
|
|
use asklyphe_common::{db, foundationdb};
|
||
|
|
use asklyphe_common::foundationdb::Database;
|
||
|
|
use asklyphe_common::nats::comms;
|
||
|
|
use asklyphe_common::nats::comms::{Service, ServiceResponse};
|
||
|
|
use asklyphe_common::nats::searchservice::SearchSrvcResponse;
|
||
|
|
|
||
|
|
/// Address of the NATS server, read from the `NATS_URL` environment variable on first access.
///
/// # Panics
/// The first access panics if `NATS_URL` is unset or is not valid Unicode.
pub static NATS_URL: Lazy<String> = Lazy::new(|| std::env::var("NATS_URL").expect("NO NATS_URL DEFINED"));
|
||
|
|
/// Client certificate argument for the NATS connection, read from the `NATS_CERT`
/// environment variable on first access (handed to `add_client_certificate` in `main`).
///
/// # Panics
/// The first access panics if `NATS_CERT` is unset or is not valid Unicode.
pub static NATS_CERT: Lazy<String> = Lazy::new(|| std::env::var("NATS_CERT").expect("NO NATS_CERT DEFINED"));
|
||
|
|
/// Client key argument for the NATS connection, read from the `NATS_KEY`
/// environment variable on first access (handed to `add_client_certificate` in `main`).
///
/// # Panics
/// The first access panics if `NATS_KEY` is unset or is not valid Unicode.
pub static NATS_KEY: Lazy<String> = Lazy::new(|| std::env::var("NATS_KEY").expect("NO NATS_KEY DEFINED"));
|
||
|
|
|
||
|
|
/// Running count of requests handled by `on_recv` since the process started.
pub static PROCESSES_HANDLED: AtomicU64 = AtomicU64::new(0);
|
||
|
|
/// Unix timestamp (seconds, UTC) of the most recent request seen by `on_recv`;
/// stays at 0 until the first request arrives.
pub static LAST_MESSAGE: AtomicI64 = AtomicI64::new(0);
|
||
|
|
|
||
|
|
#[tokio::main]
|
||
|
|
async fn main() {
|
||
|
|
let _guard = unsafe { foundationdb::boot() };
|
||
|
|
env_logger::init();
|
||
|
|
info!("searchservice began at {}", chrono::Utc::now().to_string());
|
||
|
|
let nats = async_nats::ConnectOptions::new()
|
||
|
|
.add_client_certificate(NATS_CERT.as_str().into(), NATS_KEY.as_str().into())
|
||
|
|
.connect(NATS_URL.as_str())
|
||
|
|
.await;
|
||
|
|
if let Err(e) = nats {
|
||
|
|
error!("FATAL ERROR, COULDN'T CONNECT TO NATS: {}", e);
|
||
|
|
return;
|
||
|
|
}
|
||
|
|
let nats = nats.unwrap();
|
||
|
|
let nats = jetstream::new(nats);
|
||
|
|
|
||
|
|
info!("searchbot spawned");
|
||
|
|
|
||
|
|
async fn on_recv(query: comms::Query) -> ServiceResponse {
|
||
|
|
let db = Database::default().expect("couldn't connect to foundation db!");
|
||
|
|
let now = chrono::Utc::now().timestamp();
|
||
|
|
LAST_MESSAGE.store(now, Ordering::Relaxed);
|
||
|
|
|
||
|
|
let response = match query {
|
||
|
|
comms::Query::SearchService(query) => {
|
||
|
|
process::process(query, db).await
|
||
|
|
}
|
||
|
|
_ => {
|
||
|
|
SearchSrvcResponse::InvalidRequest
|
||
|
|
}
|
||
|
|
};
|
||
|
|
|
||
|
|
if PROCESSES_HANDLED.load(Ordering::Relaxed) % 100 == 0 {
|
||
|
|
info!("handled {} requests!", PROCESSES_HANDLED.load(Ordering::Relaxed));
|
||
|
|
}
|
||
|
|
PROCESSES_HANDLED.fetch_add(1, Ordering::Relaxed);
|
||
|
|
ServiceResponse::SearchService(response)
|
||
|
|
}
|
||
|
|
|
||
|
|
if let Err(e) = comms::subscribe_service(Service::SearchService, &nats, Arc::new(on_recv)).await {
|
||
|
|
error!("failed to subscribe to searchservice nats! reason {:?}", e);
|
||
|
|
}
|
||
|
|
|
||
|
|
// we will respawn tasks if they crash
|
||
|
|
drop(_guard);
|
||
|
|
}
|