linkrelstore + site suggestion cli
All checks were successful
build-all-services (push) Successful in 9m21s
Logs href relations in the database, allowing for PageRank calculation in the future. Sites can now be added to the queue by running ./vorebot <url> [damping]; the default damping for suggested sites is 0.45.
This commit is contained in:
parent 9c159c7170
commit 0f6f0deb9c
6 changed files with 145 additions and 10 deletions

10  Cargo.lock  (generated)

@@ -253,6 +253,7 @@ dependencies = [
  "rand 0.8.5",
  "rmp-serde",
  "serde",
+ "sha-rs",
  "tokio",
  "ulid",
 ]
@@ -4439,6 +4440,15 @@ dependencies = [
  "stable_deref_trait",
 ]
 
+[[package]]
+name = "sha-rs"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "683c4940af5e947166f4da5f60f7ea30c26799b2189bec6e3d64cc3aa8f0f9de"
+dependencies = [
+ "hex",
+]
+
 [[package]]
 name = "sha1"
 version = "0.10.6"

@@ -20,6 +20,7 @@ async-nats = "0.38.0"
 ulid = "1.1.0"
 rand = "0.8.5"
 percent-encoding = "2.3.1"
+sha-rs = "0.1.0"
 
 [features]
 default = []

112  asklyphe-common/src/ldb/linkrelstore.rs  (new file)

@@ -0,0 +1,112 @@
+use crate::ldb::{construct_path, hash, DBConn};
+use log::{error, warn};
+use lyphedb::{KVList, KeyDirectory, LDBNatsMessage, LypheDBCommand, PropagationStrategy};
+
+pub const INCOMINGSTORE: &str = "incomingstore";
+pub const OUTGOINGSTORE: &str = "outgoingstore";
+
+pub async fn a_linksto_b(db: &DBConn, a: &str, b: &str) -> Result<(), ()> {
+    let key_sets = vec![
+        (
+            construct_path(&[OUTGOINGSTORE, &hash(a)])
+                .as_bytes()
+                .to_vec(),
+            a.as_bytes().to_vec(),
+        ),
+        (
+            construct_path(&[OUTGOINGSTORE, &hash(a), &hash(b)])
+                .as_bytes()
+                .to_vec(),
+            b.as_bytes().to_vec(),
+        ),
+        (
+            construct_path(&[INCOMINGSTORE, &hash(b)])
+                .as_bytes()
+                .to_vec(),
+            b.as_bytes().to_vec(),
+        ),
+        (
+            construct_path(&[INCOMINGSTORE, &hash(b), &hash(a)])
+                .as_bytes()
+                .to_vec(),
+            a.as_bytes().to_vec(),
+        ),
+    ];
+    let cmd = LDBNatsMessage::Command(LypheDBCommand::SetKeys(
+        KVList { kvs: key_sets },
+        PropagationStrategy::OnRead,
+    ));
+
+    match db.query(cmd).await {
+        LDBNatsMessage::Success => Ok(()),
+        LDBNatsMessage::BadRequest => {
+            error!("bad request for a_linksto_b");
+            Err(())
+        }
+        LDBNatsMessage::NotFound => {
+            error!("not found for a_linksto_b");
+            Err(())
+        }
+
+        _ => {
+            warn!("lyphedb sent weird message as response, treating as error");
+            Err(())
+        }
+    }
+}
+
+pub async fn what_links_to_this(db: &DBConn, url: &str) -> Result<Vec<String>, ()> {
+    let path = construct_path(&[INCOMINGSTORE, &hash(url)])
+        .as_bytes()
+        .to_vec();
+    let cmd = LDBNatsMessage::Command(LypheDBCommand::GetKeyDirectory(KeyDirectory { key: path }));
+
+    match db.query(cmd).await {
+        LDBNatsMessage::Entries(kvs) => Ok(kvs
+            .kvs
+            .into_iter()
+            .map(|v| String::from_utf8_lossy(&v.1).to_string())
+            .collect()),
+        LDBNatsMessage::Success => {
+            warn!("lyphedb responded with \"success\" to what_links_to_this, treating as error");
+            Err(())
+        }
+        LDBNatsMessage::BadRequest => {
+            warn!("bad request for what_links_to_this");
+            Err(())
+        }
+        LDBNatsMessage::NotFound => Ok(vec![]),
+        _ => {
+            warn!("lyphedb sent weird message as response, treating as error");
+            Err(())
+        }
+    }
+}
+
+pub async fn what_links_from_this(db: &DBConn, url: &str) -> Result<Vec<String>, ()> {
+    let path = construct_path(&[OUTGOINGSTORE, &hash(url)])
+        .as_bytes()
+        .to_vec();
+    let cmd = LDBNatsMessage::Command(LypheDBCommand::GetKeyDirectory(KeyDirectory { key: path }));
+
+    match db.query(cmd).await {
+        LDBNatsMessage::Entries(kvs) => Ok(kvs
+            .kvs
+            .into_iter()
+            .map(|v| String::from_utf8_lossy(&v.1).to_string())
+            .collect()),
+        LDBNatsMessage::Success => {
+            warn!("lyphedb responded with \"success\" to what_links_from_this, treating as error");
+            Err(())
+        }
+        LDBNatsMessage::BadRequest => {
+            warn!("bad request for what_links_from_this");
+            Err(())
+        }
+        LDBNatsMessage::NotFound => Ok(vec![]),
+        _ => {
+            warn!("lyphedb sent weird message as response, treating as error");
+            Err(())
+        }
+    }
+}
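
The commit message calls out PageRank as the eventual consumer of these link relations. As a rough sketch only (not part of this commit), a single PageRank-style update for one URL could be layered on top of what_links_to_this and what_links_from_this; the uniform starting score and the single-pass shape below are assumptions, not anything present in the codebase:

```rust
use asklyphe_common::ldb::{linkrelstore, DBConn};

/// Hypothetical helper: one PageRank-style update for `url`, assuming every
/// inbound page currently holds a uniform score of 1.0. A real calculation
/// would iterate this over the whole link graph until the scores converge.
pub async fn approximate_rank(db: &DBConn, url: &str, damping: f64) -> Result<f64, ()> {
    let incoming = linkrelstore::what_links_to_this(db, url).await?;
    let mut contribution = 0.0;
    for page in &incoming {
        // each inbound page spreads its (assumed) score across its outgoing links
        let outgoing = linkrelstore::what_links_from_this(db, page).await?;
        contribution += 1.0 / outgoing.len().max(1) as f64;
    }
    Ok((1.0 - damping) + damping * contribution)
}
```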

@@ -3,6 +3,7 @@ pub mod sitestore;
 pub mod linkstore;
 pub mod metastore;
 pub mod titlestore;
+pub mod linkrelstore;
 
 use std::hash::{DefaultHasher, Hasher};
 use std::sync::Arc;
@@ -11,15 +12,15 @@ use futures::StreamExt;
 use log::warn;
 use lyphedb::LDBNatsMessage;
 use percent_encoding::{percent_encode, AsciiSet, CONTROLS, NON_ALPHANUMERIC};
+use sha_rs::{Sha, Sha256};
 
 static NEXT_REPLY_ID: AtomicU64 = AtomicU64::new(0);
 
 pub const NOT_ALLOWED_ASCII: AsciiSet = CONTROLS.add(b' ').add(b'/').add(b'.').add(b'\\');
 
 pub fn hash(str: &str) -> String {
-    let mut hasher = DefaultHasher::new();
-    hasher.write(str.as_bytes());
-    hasher.finish().to_string()
+    let hasher = Sha256::new();
+    hasher.digest(str.as_bytes())
 }
 
 pub fn construct_path(path_elements: &[&str]) -> String {
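
Two things change in this file: linkrelstore is exported, and hash() now returns a SHA-256 digest (via the new sha-rs dependency) instead of a DefaultHasher value. For orientation, the sketch below spells out the four keys that a_linksto_b derives from hash() and construct_path(); the exact separator and digest encoding come from those helpers, which are only partially shown here, so treat this as illustrative only:

```rust
use asklyphe_common::ldb::{construct_path, hash};
use asklyphe_common::ldb::linkrelstore::{INCOMINGSTORE, OUTGOINGSTORE};

// Illustrative only: the four keys written for "a links to b". The stored
// values are the plain URLs, which is what lets what_links_to_this and
// what_links_from_this map directory listings straight back to strings.
fn link_keys(a: &str, b: &str) -> [String; 4] {
    [
        construct_path(&[OUTGOINGSTORE, &hash(a)]),            // value: a
        construct_path(&[OUTGOINGSTORE, &hash(a), &hash(b)]),  // value: b
        construct_path(&[INCOMINGSTORE, &hash(b)]),            // value: b
        construct_path(&[INCOMINGSTORE, &hash(b), &hash(a)]),  // value: a
    ]
}
```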

@@ -93,6 +93,19 @@ async fn main() {
     let dbconn = DBConn::new(nats.clone(), DB_NAME.to_string());
     let nats = jetstream::new(nats);
 
+    // fixme: remove this once we have proper site suggestion interface
+    let args = std::env::args().collect::<Vec<_>>();
+    if let Some(suggestion_site) = args.get(1) {
+        let damping = args.get(2).map(|v| v.parse::<f64>().expect("BAD FP")).unwrap_or(0.45);
+        warn!("suggesting {} with damping {}", suggestion_site, damping);
+
+        nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&CrawlRequest {
+            url: suggestion_site.to_string(),
+            damping,
+        }).unwrap().into()).await.unwrap();
+        return;
+    }
+
     let mut tasks: Vec<(JoinHandle<()>, String)> = vec![];
     let mut available_browsers: Vec<String> = BROWSER_THREADS.clone();
 
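
Sites are suggested by running ./vorebot <url> [damping]; the damping argument is optional and defaults to 0.45, as the commit message notes. The parse above panics on a malformed value via expect("BAD FP"), which is acceptable for a temporary interface; purely as an illustration (not the commit's code), a non-panicking variant of that argument handling could look like:

```rust
/// Hypothetical alternative to the expect("BAD FP") parse: fall back to the
/// suggested-site default of 0.45 when the damping argument is missing or
/// does not parse as a float.
fn damping_from_args() -> f64 {
    std::env::args()
        .nth(2)
        .and_then(|v| v.parse::<f64>().ok())
        .unwrap_or(0.45)
}
```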

@@ -335,10 +348,6 @@ async fn main() {
             }
             tasks.retain(|v| !tasks_to_remove.contains(&v.1));
             tokio::time::sleep(Duration::from_secs(3)).await;
-
-            //while let Some(p) = initial.pop() {
-            //    nats.publish(VOREBOT_SUGGESTED_SERVICE.to_string(), rmp_serde::to_vec(&p).unwrap().into()).await.unwrap();
-            //}
         }
     }
 }

@@ -1,5 +1,5 @@
 use crate::USER_AGENT;
-use asklyphe_common::ldb::{linkstore, metastore, sitestore, titlestore, wordstore, DBConn};
+use asklyphe_common::ldb::{linkrelstore, linkstore, metastore, sitestore, titlestore, wordstore, DBConn};
 use async_nats::jetstream;
 use async_nats::jetstream::kv;
 use futures::AsyncReadExt;
@@ -473,6 +473,10 @@ pub async fn web_parse(
                 warn!("couldn't add {} to linkwords!", url);
             }
         }
+
+        if linkrelstore::a_linksto_b(&db, url, &href).await.is_err() {
+            warn!("couldn't perform a_linksto_b (a {url} b {href})");
+        }
 
         nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&CrawlRequest {
             url: href,
@@ -484,7 +488,5 @@ pub async fn web_parse(
 
     debug!("crawled {} in {} seconds", url, elapsed);
 
-    // todo: queue links to be crawled
-
     Ok(())
 }