asklyphe/asklyphe-common/src/ldb/mod.rs

63 lines
2 KiB
Rust
Raw Normal View History

2025-03-12 12:52:24 -07:00
pub mod wordstore;
pub mod sitestore;
pub mod linkstore;
pub mod metastore;
pub mod titlestore;
pub mod linkrelstore;
2025-03-12 12:52:24 -07:00
use std::hash::{DefaultHasher, Hasher};
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use futures::StreamExt;
use log::warn;
use lyphedb::LDBNatsMessage;
use percent_encoding::{percent_encode, AsciiSet, CONTROLS, NON_ALPHANUMERIC};
use sha_rs::{Sha, Sha256};
2025-03-12 12:52:24 -07:00
static NEXT_REPLY_ID: AtomicU64 = AtomicU64::new(0);
pub const NOT_ALLOWED_ASCII: AsciiSet = CONTROLS.add(b' ').add(b'/').add(b'.').add(b'\\');
pub fn hash(str: &str) -> String {
let hasher = Sha256::new();
hasher.digest(str.as_bytes())
2025-03-12 12:52:24 -07:00
}
pub fn construct_path(path_elements: &[&str]) -> String {
let mut buf = String::new();
buf.push_str("ASKLYPHE/");
for el in path_elements {
buf.push_str(&percent_encode(el.as_bytes(), &NOT_ALLOWED_ASCII).to_string());
buf.push('/');
}
buf.pop();
buf
}
#[derive(Clone)]
pub struct DBConn {
nats: async_nats::Client,
name: String,
}
impl DBConn {
pub fn new(nats: async_nats::Client, name: impl ToString) -> DBConn {
DBConn { nats, name: name.to_string() }
}
pub async fn query(&self, message: LDBNatsMessage) -> LDBNatsMessage {
let data = rmp_serde::to_vec(&message).unwrap();
let replyto = format!("ldb-reply-{}", NEXT_REPLY_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst));
let mut subscriber = self.nats.subscribe(replyto.clone()).await.expect("NATS ERROR");
self.nats.publish_with_reply(self.name.clone(), replyto, data.into()).await.expect("NATS ERROR");
if let Some(reply) = subscriber.next().await {
let reply = rmp_serde::from_slice::<LDBNatsMessage>(&reply.payload);
if reply.is_err() {
warn!("DECODED BAD MESSAGE FROM LYPHEDB: {}", reply.err().unwrap());
return LDBNatsMessage::NotFound;
}
return reply.unwrap();
}
LDBNatsMessage::NotFound
}
}