pub mod wordstore; pub mod sitestore; pub mod linkstore; pub mod metastore; pub mod titlestore; use std::hash::{DefaultHasher, Hasher}; use std::sync::Arc; use std::sync::atomic::AtomicU64; use futures::StreamExt; use log::warn; use lyphedb::LDBNatsMessage; use percent_encoding::{percent_encode, AsciiSet, CONTROLS, NON_ALPHANUMERIC}; static NEXT_REPLY_ID: AtomicU64 = AtomicU64::new(0); pub const NOT_ALLOWED_ASCII: AsciiSet = CONTROLS.add(b' ').add(b'/').add(b'.').add(b'\\'); pub fn hash(str: &str) -> String { let mut hasher = DefaultHasher::new(); hasher.write(str.as_bytes()); hasher.finish().to_string() } pub fn construct_path(path_elements: &[&str]) -> String { let mut buf = String::new(); buf.push_str("ASKLYPHE/"); for el in path_elements { buf.push_str(&percent_encode(el.as_bytes(), &NOT_ALLOWED_ASCII).to_string()); buf.push('/'); } buf.pop(); buf } #[derive(Clone)] pub struct DBConn { nats: async_nats::Client, name: String, } impl DBConn { pub fn new(nats: async_nats::Client, name: impl ToString) -> DBConn { DBConn { nats, name: name.to_string() } } pub async fn query(&self, message: LDBNatsMessage) -> LDBNatsMessage { let data = rmp_serde::to_vec(&message).unwrap(); let replyto = format!("ldb-reply-{}", NEXT_REPLY_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst)); let mut subscriber = self.nats.subscribe(replyto.clone()).await.expect("NATS ERROR"); self.nats.publish_with_reply(self.name.clone(), replyto, data.into()).await.expect("NATS ERROR"); if let Some(reply) = subscriber.next().await { let reply = rmp_serde::from_slice::(&reply.payload); if reply.is_err() { warn!("DECODED BAD MESSAGE FROM LYPHEDB: {}", reply.err().unwrap()); return LDBNatsMessage::NotFound; } return reply.unwrap(); } LDBNatsMessage::NotFound } }