From 0f6f0deb9c96b4a1e9adf9d33ebb6fbe375b97fa Mon Sep 17 00:00:00 2001
From: husky
Date: Fri, 14 Mar 2025 18:20:15 -0700
Subject: [PATCH] linkrelstore + site suggestion cli

logs href relations in the database, allowing for pagerank calculation in the
future. sites can now be added to the queue by running ./vorebot <url> [damping];
the default damping is 0.45 for suggested sites.
---
 Cargo.lock                              |  10 +++
 asklyphe-common/Cargo.toml              |   1 +
 asklyphe-common/src/ldb/linkrelstore.rs | 112 ++++++++++++++++++++++++
 asklyphe-common/src/ldb/mod.rs          |   7 +-
 vorebot/src/main.rs                     |  17 +++-
 vorebot/src/webparse/mod.rs             |   8 +-
 6 files changed, 145 insertions(+), 10 deletions(-)
 create mode 100644 asklyphe-common/src/ldb/linkrelstore.rs

diff --git a/Cargo.lock b/Cargo.lock
index c6d6f10..0432eea 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -253,6 +253,7 @@ dependencies = [
  "rand 0.8.5",
  "rmp-serde",
  "serde",
+ "sha-rs",
  "tokio",
  "ulid",
 ]
@@ -4439,6 +4440,15 @@ dependencies = [
  "stable_deref_trait",
 ]
 
+[[package]]
+name = "sha-rs"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "683c4940af5e947166f4da5f60f7ea30c26799b2189bec6e3d64cc3aa8f0f9de"
+dependencies = [
+ "hex",
+]
+
 [[package]]
 name = "sha1"
 version = "0.10.6"
diff --git a/asklyphe-common/Cargo.toml b/asklyphe-common/Cargo.toml
index 39a9691..6027a83 100644
--- a/asklyphe-common/Cargo.toml
+++ b/asklyphe-common/Cargo.toml
@@ -20,6 +20,7 @@ async-nats = "0.38.0"
 ulid = "1.1.0"
 rand = "0.8.5"
 percent-encoding = "2.3.1"
+sha-rs = "0.1.0"
 
 [features]
 default = []
\ No newline at end of file
diff --git a/asklyphe-common/src/ldb/linkrelstore.rs b/asklyphe-common/src/ldb/linkrelstore.rs
new file mode 100644
index 0000000..3a5112f
--- /dev/null
+++ b/asklyphe-common/src/ldb/linkrelstore.rs
@@ -0,0 +1,112 @@
+use crate::ldb::{construct_path, hash, DBConn};
+use log::{error, warn};
+use lyphedb::{KVList, KeyDirectory, LDBNatsMessage, LypheDBCommand, PropagationStrategy};
+
+pub const INCOMINGSTORE: &str = "incomingstore";
+pub const OUTGOINGSTORE: &str = "outgoingstore";
+
+pub async fn a_linksto_b(db: &DBConn, a: &str, b: &str) -> Result<(), ()> {
+    let key_sets = vec![
+        (
+            construct_path(&[OUTGOINGSTORE, &hash(a)])
+                .as_bytes()
+                .to_vec(),
+            a.as_bytes().to_vec(),
+        ),
+        (
+            construct_path(&[OUTGOINGSTORE, &hash(a), &hash(b)])
+                .as_bytes()
+                .to_vec(),
+            b.as_bytes().to_vec(),
+        ),
+        (
+            construct_path(&[INCOMINGSTORE, &hash(b)])
+                .as_bytes()
+                .to_vec(),
+            b.as_bytes().to_vec(),
+        ),
+        (
+            construct_path(&[INCOMINGSTORE, &hash(b), &hash(a)])
+                .as_bytes()
+                .to_vec(),
+            a.as_bytes().to_vec(),
+        ),
+    ];
+    let cmd = LDBNatsMessage::Command(LypheDBCommand::SetKeys(
+        KVList { kvs: key_sets },
+        PropagationStrategy::OnRead,
+    ));
+
+    match db.query(cmd).await {
+        LDBNatsMessage::Success => Ok(()),
+        LDBNatsMessage::BadRequest => {
+            error!("bad request for a_linksto_b");
+            Err(())
+        }
+        LDBNatsMessage::NotFound => {
+            error!("not found for a_linksto_b");
+            Err(())
+        }
+
+        _ => {
+            warn!("lyphedb sent weird message as response, treating as error");
+            Err(())
+        }
+    }
+}
+
+pub async fn what_links_to_this(db: &DBConn, url: &str) -> Result<Vec<String>, ()> {
+    let path = construct_path(&[INCOMINGSTORE, &hash(url)])
+        .as_bytes()
+        .to_vec();
+    let cmd = LDBNatsMessage::Command(LypheDBCommand::GetKeyDirectory(KeyDirectory { key: path }));
+
+    match db.query(cmd).await {
+        LDBNatsMessage::Entries(kvs) => Ok(kvs
+            .kvs
+            .into_iter()
+            .map(|v| String::from_utf8_lossy(&v.1).to_string())
+            .collect()),
+        LDBNatsMessage::Success => {
warn!("lyphedb responded with \"success\" to what_links_to_this, treating as error"); + Err(()) + } + LDBNatsMessage::BadRequest => { + warn!("bad request for what_links_to_this"); + Err(()) + } + LDBNatsMessage::NotFound => Ok(vec![]), + _ => { + warn!("lyphedb sent weird message as response, treating as error"); + Err(()) + } + } +} + +pub async fn what_links_from_this(db: &DBConn, url: &str) -> Result, ()> { + let path = construct_path(&[OUTGOINGSTORE, &hash(url)]) + .as_bytes() + .to_vec(); + let cmd = LDBNatsMessage::Command(LypheDBCommand::GetKeyDirectory(KeyDirectory { key: path })); + + match db.query(cmd).await { + LDBNatsMessage::Entries(kvs) => Ok(kvs + .kvs + .into_iter() + .map(|v| String::from_utf8_lossy(&v.1).to_string()) + .collect()), + LDBNatsMessage::Success => { + warn!("lyphedb responded with \"success\" to what_links_from_this, treating as error"); + Err(()) + } + LDBNatsMessage::BadRequest => { + warn!("bad request for what_links_from_this"); + Err(()) + } + LDBNatsMessage::NotFound => Ok(vec![]), + _ => { + warn!("lyphedb sent weird message as response, treating as error"); + Err(()) + } + } +} diff --git a/asklyphe-common/src/ldb/mod.rs b/asklyphe-common/src/ldb/mod.rs index 098e00d..633c8cb 100644 --- a/asklyphe-common/src/ldb/mod.rs +++ b/asklyphe-common/src/ldb/mod.rs @@ -3,6 +3,7 @@ pub mod sitestore; pub mod linkstore; pub mod metastore; pub mod titlestore; +pub mod linkrelstore; use std::hash::{DefaultHasher, Hasher}; use std::sync::Arc; @@ -11,15 +12,15 @@ use futures::StreamExt; use log::warn; use lyphedb::LDBNatsMessage; use percent_encoding::{percent_encode, AsciiSet, CONTROLS, NON_ALPHANUMERIC}; +use sha_rs::{Sha, Sha256}; static NEXT_REPLY_ID: AtomicU64 = AtomicU64::new(0); pub const NOT_ALLOWED_ASCII: AsciiSet = CONTROLS.add(b' ').add(b'/').add(b'.').add(b'\\'); pub fn hash(str: &str) -> String { - let mut hasher = DefaultHasher::new(); - hasher.write(str.as_bytes()); - hasher.finish().to_string() + let hasher = Sha256::new(); + hasher.digest(str.as_bytes()) } pub fn construct_path(path_elements: &[&str]) -> String { diff --git a/vorebot/src/main.rs b/vorebot/src/main.rs index c845b3f..4872750 100644 --- a/vorebot/src/main.rs +++ b/vorebot/src/main.rs @@ -93,6 +93,19 @@ async fn main() { let dbconn = DBConn::new(nats.clone(), DB_NAME.to_string()); let nats = jetstream::new(nats); + // fixme: remove this once we have proper site suggestion interface + let args = std::env::args().collect::>(); + if let Some(suggestion_site) = args.get(1) { + let damping = args.get(2).map(|v| v.parse::().expect("BAD FP")).unwrap_or(0.45); + warn!("suggesting {} with damping {}", suggestion_site, damping); + + nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&CrawlRequest { + url: suggestion_site.to_string(), + damping, + }).unwrap().into()).await.unwrap(); + return; + } + let mut tasks: Vec<(JoinHandle<()>, String)> = vec![]; let mut available_browsers: Vec = BROWSER_THREADS.clone(); @@ -335,10 +348,6 @@ async fn main() { } tasks.retain(|v| !tasks_to_remove.contains(&v.1)); tokio::time::sleep(Duration::from_secs(3)).await; - - //while let Some(p) = initial.pop() { - // nats.publish(VOREBOT_SUGGESTED_SERVICE.to_string(), rmp_serde::to_vec(&p).unwrap().into()).await.unwrap(); - //} } } } diff --git a/vorebot/src/webparse/mod.rs b/vorebot/src/webparse/mod.rs index 0d34b0c..219361b 100644 --- a/vorebot/src/webparse/mod.rs +++ b/vorebot/src/webparse/mod.rs @@ -1,5 +1,5 @@ use crate::USER_AGENT; -use asklyphe_common::ldb::{linkstore, metastore, sitestore, titlestore, 
+use asklyphe_common::ldb::{linkrelstore, linkstore, metastore, sitestore, titlestore, wordstore, DBConn};
 use async_nats::jetstream;
 use async_nats::jetstream::kv;
 use futures::AsyncReadExt;
@@ -473,6 +473,10 @@ pub async fn web_parse(
                     warn!("couldn't add {} to linkwords!", url);
                 }
             }
+
+            if linkrelstore::a_linksto_b(&db, url, &href).await.is_err() {
+                warn!("couldn't perform a_linksto_b (a {url} b {href})");
+            }
 
             nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&CrawlRequest {
                 url: href,
@@ -484,7 +488,5 @@ pub async fn web_parse(
 
     debug!("crawled {} in {} seconds", url, elapsed);
 
-    // todo: queue links to be crawled
-
     Ok(())
 }
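
A minimal sketch of how the link relations recorded above could feed the
pagerank calculation mentioned in the commit message. It assumes the
outgoing-link sets (what linkrelstore::what_links_from_this returns) have
already been collected into an in-memory map; the pagerank function, the
iteration count, and the use of CrawlRequest's damping value in the standard
pagerank formula are illustrative assumptions, not part of this patch.

use std::collections::HashMap;

/// One damped pagerank pass over an in-memory outgoing-link map, i.e. the
/// same a -> [b, ...] relation that linkrelstore::a_linksto_b records.
fn pagerank(
    outgoing: &HashMap<String, Vec<String>>,
    damping: f64,
    iterations: usize,
) -> HashMap<String, f64> {
    let n = outgoing.len() as f64;
    // start every known page with an equal share of rank
    let mut ranks: HashMap<String, f64> =
        outgoing.keys().map(|k| (k.clone(), 1.0 / n)).collect();

    for _ in 0..iterations {
        // every page keeps a base amount of rank, the rest is redistributed
        let mut next: HashMap<String, f64> = outgoing
            .keys()
            .map(|k| (k.clone(), (1.0 - damping) / n))
            .collect();
        for (page, links) in outgoing {
            if links.is_empty() {
                continue;
            }
            // a page splits its damped rank evenly across its outgoing links
            let share = damping * ranks[page] / links.len() as f64;
            for linked in links {
                // only count links to pages we actually know about
                if let Some(rank) = next.get_mut(linked) {
                    *rank += share;
                }
            }
        }
        ranks = next;
    }
    ranks
}

fn main() {
    // toy graph: a links to b and c, b links to c, c links back to a
    let mut outgoing: HashMap<String, Vec<String>> = HashMap::new();
    outgoing.insert("a".into(), vec!["b".into(), "c".into()]);
    outgoing.insert("b".into(), vec!["c".into()]);
    outgoing.insert("c".into(), vec!["a".into()]);

    for (page, rank) in pagerank(&outgoing, 0.45, 20) {
        println!("{page}: {rank:.4}");
    }
}

With this toy graph, "c" ends up ranked highest because both "a" and "b" link
to it; real link sets retrieved via what_links_from_this would be fed through
the same iteration.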