Compare commits
No commits in common. "9237d1a048b77b2464ac57adae823f9fe60fda20" and "d341c66390e38401a2f17522fc89977e2f72226c" have entirely different histories.
9237d1a048
...
d341c66390
6 changed files with 10 additions and 147 deletions
10
Cargo.lock
generated
10
Cargo.lock
generated
|
@ -253,7 +253,6 @@ dependencies = [
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
"rmp-serde",
|
"rmp-serde",
|
||||||
"serde",
|
"serde",
|
||||||
"sha-rs",
|
|
||||||
"tokio",
|
"tokio",
|
||||||
"ulid",
|
"ulid",
|
||||||
]
|
]
|
||||||
|
@ -4440,15 +4439,6 @@ dependencies = [
|
||||||
"stable_deref_trait",
|
"stable_deref_trait",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "sha-rs"
|
|
||||||
version = "0.1.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "683c4940af5e947166f4da5f60f7ea30c26799b2189bec6e3d64cc3aa8f0f9de"
|
|
||||||
dependencies = [
|
|
||||||
"hex",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "sha1"
|
name = "sha1"
|
||||||
version = "0.10.6"
|
version = "0.10.6"
|
||||||
|
|
|
@ -20,7 +20,6 @@ async-nats = "0.38.0"
|
||||||
ulid = "1.1.0"
|
ulid = "1.1.0"
|
||||||
rand = "0.8.5"
|
rand = "0.8.5"
|
||||||
percent-encoding = "2.3.1"
|
percent-encoding = "2.3.1"
|
||||||
sha-rs = "0.1.0"
|
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = []
|
default = []
|
|
@ -1,112 +0,0 @@
|
||||||
use crate::ldb::{construct_path, hash, DBConn};
|
|
||||||
use log::{error, warn};
|
|
||||||
use lyphedb::{KVList, KeyDirectory, LDBNatsMessage, LypheDBCommand, PropagationStrategy};
|
|
||||||
|
|
||||||
pub const INCOMINGSTORE: &str = "incomingstore";
|
|
||||||
pub const OUTGOINGSTORE: &str = "outgoingstore";
|
|
||||||
|
|
||||||
pub async fn a_linksto_b(db: &DBConn, a: &str, b: &str) -> Result<(), ()> {
|
|
||||||
let key_sets = vec![
|
|
||||||
(
|
|
||||||
construct_path(&[OUTGOINGSTORE, &hash(a)])
|
|
||||||
.as_bytes()
|
|
||||||
.to_vec(),
|
|
||||||
a.as_bytes().to_vec(),
|
|
||||||
),
|
|
||||||
(
|
|
||||||
construct_path(&[OUTGOINGSTORE, &hash(a), &hash(b)])
|
|
||||||
.as_bytes()
|
|
||||||
.to_vec(),
|
|
||||||
b.as_bytes().to_vec(),
|
|
||||||
),
|
|
||||||
(
|
|
||||||
construct_path(&[INCOMINGSTORE, &hash(b)])
|
|
||||||
.as_bytes()
|
|
||||||
.to_vec(),
|
|
||||||
b.as_bytes().to_vec(),
|
|
||||||
),
|
|
||||||
(
|
|
||||||
construct_path(&[INCOMINGSTORE, &hash(b), &hash(a)])
|
|
||||||
.as_bytes()
|
|
||||||
.to_vec(),
|
|
||||||
a.as_bytes().to_vec(),
|
|
||||||
),
|
|
||||||
];
|
|
||||||
let cmd = LDBNatsMessage::Command(LypheDBCommand::SetKeys(
|
|
||||||
KVList { kvs: key_sets },
|
|
||||||
PropagationStrategy::OnRead,
|
|
||||||
));
|
|
||||||
|
|
||||||
match db.query(cmd).await {
|
|
||||||
LDBNatsMessage::Success => Ok(()),
|
|
||||||
LDBNatsMessage::BadRequest => {
|
|
||||||
error!("bad request for a_linksto_b");
|
|
||||||
Err(())
|
|
||||||
}
|
|
||||||
LDBNatsMessage::NotFound => {
|
|
||||||
error!("not found for a_linksto_b");
|
|
||||||
Err(())
|
|
||||||
}
|
|
||||||
|
|
||||||
_ => {
|
|
||||||
warn!("lyphedb sent weird message as response, treating as error");
|
|
||||||
Err(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn what_links_to_this(db: &DBConn, url: &str) -> Result<Vec<String>, ()> {
|
|
||||||
let path = construct_path(&[INCOMINGSTORE, &hash(url)])
|
|
||||||
.as_bytes()
|
|
||||||
.to_vec();
|
|
||||||
let cmd = LDBNatsMessage::Command(LypheDBCommand::GetKeyDirectory(KeyDirectory { key: path }));
|
|
||||||
|
|
||||||
match db.query(cmd).await {
|
|
||||||
LDBNatsMessage::Entries(kvs) => Ok(kvs
|
|
||||||
.kvs
|
|
||||||
.into_iter()
|
|
||||||
.map(|v| String::from_utf8_lossy(&v.1).to_string())
|
|
||||||
.collect()),
|
|
||||||
LDBNatsMessage::Success => {
|
|
||||||
warn!("lyphedb responded with \"success\" to what_links_to_this, treating as error");
|
|
||||||
Err(())
|
|
||||||
}
|
|
||||||
LDBNatsMessage::BadRequest => {
|
|
||||||
warn!("bad request for what_links_to_this");
|
|
||||||
Err(())
|
|
||||||
}
|
|
||||||
LDBNatsMessage::NotFound => Ok(vec![]),
|
|
||||||
_ => {
|
|
||||||
warn!("lyphedb sent weird message as response, treating as error");
|
|
||||||
Err(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn what_links_from_this(db: &DBConn, url: &str) -> Result<Vec<String>, ()> {
|
|
||||||
let path = construct_path(&[OUTGOINGSTORE, &hash(url)])
|
|
||||||
.as_bytes()
|
|
||||||
.to_vec();
|
|
||||||
let cmd = LDBNatsMessage::Command(LypheDBCommand::GetKeyDirectory(KeyDirectory { key: path }));
|
|
||||||
|
|
||||||
match db.query(cmd).await {
|
|
||||||
LDBNatsMessage::Entries(kvs) => Ok(kvs
|
|
||||||
.kvs
|
|
||||||
.into_iter()
|
|
||||||
.map(|v| String::from_utf8_lossy(&v.1).to_string())
|
|
||||||
.collect()),
|
|
||||||
LDBNatsMessage::Success => {
|
|
||||||
warn!("lyphedb responded with \"success\" to what_links_from_this, treating as error");
|
|
||||||
Err(())
|
|
||||||
}
|
|
||||||
LDBNatsMessage::BadRequest => {
|
|
||||||
warn!("bad request for what_links_from_this");
|
|
||||||
Err(())
|
|
||||||
}
|
|
||||||
LDBNatsMessage::NotFound => Ok(vec![]),
|
|
||||||
_ => {
|
|
||||||
warn!("lyphedb sent weird message as response, treating as error");
|
|
||||||
Err(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -3,7 +3,6 @@ pub mod sitestore;
|
||||||
pub mod linkstore;
|
pub mod linkstore;
|
||||||
pub mod metastore;
|
pub mod metastore;
|
||||||
pub mod titlestore;
|
pub mod titlestore;
|
||||||
pub mod linkrelstore;
|
|
||||||
|
|
||||||
use std::hash::{DefaultHasher, Hasher};
|
use std::hash::{DefaultHasher, Hasher};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
@ -12,15 +11,15 @@ use futures::StreamExt;
|
||||||
use log::warn;
|
use log::warn;
|
||||||
use lyphedb::LDBNatsMessage;
|
use lyphedb::LDBNatsMessage;
|
||||||
use percent_encoding::{percent_encode, AsciiSet, CONTROLS, NON_ALPHANUMERIC};
|
use percent_encoding::{percent_encode, AsciiSet, CONTROLS, NON_ALPHANUMERIC};
|
||||||
use sha_rs::{Sha, Sha256};
|
|
||||||
|
|
||||||
static NEXT_REPLY_ID: AtomicU64 = AtomicU64::new(0);
|
static NEXT_REPLY_ID: AtomicU64 = AtomicU64::new(0);
|
||||||
|
|
||||||
pub const NOT_ALLOWED_ASCII: AsciiSet = CONTROLS.add(b' ').add(b'/').add(b'.').add(b'\\');
|
pub const NOT_ALLOWED_ASCII: AsciiSet = CONTROLS.add(b' ').add(b'/').add(b'.').add(b'\\');
|
||||||
|
|
||||||
pub fn hash(str: &str) -> String {
|
pub fn hash(str: &str) -> String {
|
||||||
let hasher = Sha256::new();
|
let mut hasher = DefaultHasher::new();
|
||||||
hasher.digest(str.as_bytes())
|
hasher.write(str.as_bytes());
|
||||||
|
hasher.finish().to_string()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn construct_path(path_elements: &[&str]) -> String {
|
pub fn construct_path(path_elements: &[&str]) -> String {
|
||||||
|
|
|
@ -93,21 +93,6 @@ async fn main() {
|
||||||
let dbconn = DBConn::new(nats.clone(), DB_NAME.to_string());
|
let dbconn = DBConn::new(nats.clone(), DB_NAME.to_string());
|
||||||
let nats = jetstream::new(nats);
|
let nats = jetstream::new(nats);
|
||||||
|
|
||||||
// fixme: remove this once we have proper site suggestion interface
|
|
||||||
let args = std::env::args().collect::<Vec<_>>();
|
|
||||||
if let Some(suggestion_site) = args.get(1) {
|
|
||||||
let damping = args.get(2).map(|v| v.parse::<f64>().expect("BAD FP")).unwrap_or(0.45);
|
|
||||||
warn!("suggesting {} with damping {}", suggestion_site, damping);
|
|
||||||
|
|
||||||
let ack = nats.publish(VOREBOT_SUGGESTED_SERVICE.to_string(), rmp_serde::to_vec(&CrawlRequest {
|
|
||||||
url: suggestion_site.to_string(),
|
|
||||||
damping,
|
|
||||||
}).unwrap().into()).await.unwrap();
|
|
||||||
|
|
||||||
ack.await.expect("FATAL ERROR");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut tasks: Vec<(JoinHandle<()>, String)> = vec![];
|
let mut tasks: Vec<(JoinHandle<()>, String)> = vec![];
|
||||||
let mut available_browsers: Vec<String> = BROWSER_THREADS.clone();
|
let mut available_browsers: Vec<String> = BROWSER_THREADS.clone();
|
||||||
|
|
||||||
|
@ -350,6 +335,10 @@ async fn main() {
|
||||||
}
|
}
|
||||||
tasks.retain(|v| !tasks_to_remove.contains(&v.1));
|
tasks.retain(|v| !tasks_to_remove.contains(&v.1));
|
||||||
tokio::time::sleep(Duration::from_secs(3)).await;
|
tokio::time::sleep(Duration::from_secs(3)).await;
|
||||||
|
|
||||||
|
//while let Some(p) = initial.pop() {
|
||||||
|
// nats.publish(VOREBOT_SUGGESTED_SERVICE.to_string(), rmp_serde::to_vec(&p).unwrap().into()).await.unwrap();
|
||||||
|
//}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use crate::USER_AGENT;
|
use crate::USER_AGENT;
|
||||||
use asklyphe_common::ldb::{linkrelstore, linkstore, metastore, sitestore, titlestore, wordstore, DBConn};
|
use asklyphe_common::ldb::{linkstore, metastore, sitestore, titlestore, wordstore, DBConn};
|
||||||
use async_nats::jetstream;
|
use async_nats::jetstream;
|
||||||
use async_nats::jetstream::kv;
|
use async_nats::jetstream::kv;
|
||||||
use futures::AsyncReadExt;
|
use futures::AsyncReadExt;
|
||||||
|
@ -474,10 +474,6 @@ pub async fn web_parse(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if linkrelstore::a_linksto_b(&db, url, &href).await.is_err() {
|
|
||||||
warn!("couldn't perform a_linksto_b (a {url} b {href})");
|
|
||||||
}
|
|
||||||
|
|
||||||
nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&CrawlRequest {
|
nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&CrawlRequest {
|
||||||
url: href,
|
url: href,
|
||||||
damping: 0.85,
|
damping: 0.85,
|
||||||
|
@ -488,5 +484,7 @@ pub async fn web_parse(
|
||||||
|
|
||||||
debug!("crawled {} in {} seconds", url, elapsed);
|
debug!("crawled {} in {} seconds", url, elapsed);
|
||||||
|
|
||||||
|
// todo: queue links to be crawled
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue