asklyphe/vorebot/src/main.rs
husky 1ad6dad9eb
Some checks failed
/ build-all-services (push) Has been cancelled
fix foundationdb dependency
2025-03-12 21:09:35 -07:00

351 lines
18 KiB
Rust

mod webparse;
use asklyphe_common::nats::vorebot::{
VOREBOT_NEWHOSTNAME_SERVICE, VOREBOT_SERVICE, VOREBOT_SUGGESTED_SERVICE,
};
use async_nats::jetstream;
use async_nats::jetstream::consumer::PullConsumer;
use async_nats::jetstream::stream::RetentionPolicy;
use chrono::TimeZone;
use futures::StreamExt;
use log::{debug, error, info, warn};
use mutex_timeouts::tokio::MutexWithTimeoutAuto as Mutex;
use once_cell::sync::Lazy;
use prometheus_exporter::prometheus::core::{AtomicF64, GenericGauge};
use prometheus_exporter::prometheus::{register_counter, register_gauge, Counter};
use std::cmp::max;
use std::collections::{BTreeMap, BTreeSet};
use std::hash::{DefaultHasher, Hasher};
use std::io::Read;
use std::iter::Iterator;
use std::str::FromStr;
use std::string::ToString;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use async_nats::jetstream::kv;
use stopwords::{Language, Spark, Stopwords};
use thirtyfour::{CapabilitiesHelper, DesiredCapabilities, Proxy, WebDriver};
use thirtyfour::common::capabilities::firefox::FirefoxPreferences;
use tokio::task::JoinHandle;
use asklyphe_common::ldb::DBConn;
use asklyphe_common::nats::vorebot::CrawlRequest;
use crate::webparse::web_parse;
/// NATS server URL, taken from the `NATS_URL` environment variable.
///
/// Evaluated lazily; the first access panics if the variable cannot be read.
pub static NATS_URL: Lazy<String> = Lazy::new(|| {
    let url = std::env::var("NATS_URL");
    url.expect("NO NATS URL DEFINED")
});

/// Value of the `NATS_CERT` environment variable; `main` passes it to
/// `add_client_certificate` as the client certificate argument.
///
/// Evaluated lazily; the first access panics if the variable cannot be read.
pub static NATS_CERT: Lazy<String> = Lazy::new(|| {
    let cert = std::env::var("NATS_CERT");
    cert.expect("NO NATS_CERT DEFINED")
});

/// Value of the `NATS_KEY` environment variable; `main` passes it to
/// `add_client_certificate` as the client key argument.
///
/// Evaluated lazily; the first access panics if the variable cannot be read.
pub static NATS_KEY: Lazy<String> = Lazy::new(|| {
    let key = std::env::var("NATS_KEY");
    key.expect("NO NATS_KEY DEFINED")
});

/// WebDriver endpoints, one per crawler task, parsed from the comma-separated
/// `BROWSER_THREADS` environment variable.
///
/// Entries are taken verbatim: no trimming is applied and empty entries are kept.
/// Evaluated lazily; the first access panics if the variable cannot be read.
pub static BROWSER_THREADS: Lazy<Vec<String>> = Lazy::new(|| {
    let raw = std::env::var("BROWSER_THREADS").expect("PLEASE LIST BROWSER_THREADS");
    raw.split(',').map(String::from).collect()
});

/// Database name handed to `DBConn::new`, taken from the `DB_NAME` environment variable.
///
/// Evaluated lazily; the first access panics if the variable cannot be read.
pub static DB_NAME: Lazy<String> = Lazy::new(|| {
    let name = std::env::var("DB_NAME");
    name.expect("PLEASE ADD DB_NAME")
});

/// Process-wide counter of crawled documents, updated by the crawler tasks.
pub static DOCUMENTS_CRAWLED: AtomicU64 = AtomicU64::new(0);

/// UTC timestamp (as returned by `chrono::Utc::now().timestamp()`) of the most recently
/// decoded crawl request; stays `0` until the first request arrives.
pub static LAST_MESSAGE: AtomicI64 = AtomicI64::new(0);
/// One timestamp slot per configured browser thread.
///
/// Every slot starts out holding the UTC timestamp at which this static was first
/// accessed (each slot samples the clock separately). The number of slots equals
/// `BROWSER_THREADS.len()`.
pub static LAST_TASK_COMPLETE: Lazy<Vec<Arc<AtomicI64>>> = Lazy::new(|| {
    (0..BROWSER_THREADS.len())
        .map(|_| Arc::new(AtomicI64::new(chrono::Utc::now().timestamp())))
        .collect()
});
/// User-agent string installed into the Firefox preferences of every crawler browser.
///
/// Embeds this crate's version, resolved at compile time from `CARGO_PKG_VERSION`.
pub static USER_AGENT: Lazy<String> = Lazy::new(|| {
    let version = env!("CARGO_PKG_VERSION");
    format!(
        "Vorebot/{} (compatible; Googlebot/2.1; +https://voremicrocomputers.com/crawler.html)",
        version
    )
});
#[tokio::main]
async fn main() {
mutex_timeouts::tokio::GLOBAL_TOKIO_TIMEOUT.store(72, Ordering::Relaxed);
env_logger::init();
info!("began at {}", chrono::Utc::now().to_string());
let nats = async_nats::ConnectOptions::new()
.add_client_certificate(NATS_CERT.as_str().into(), NATS_KEY.as_str().into())
.connect(NATS_URL.as_str())
.await;
if let Err(e) = nats {
error!("FATAL ERROR, COULDN'T CONNECT TO NATS: {}", e);
return;
}
let nats = nats.unwrap();
let dbconn = DBConn::new(nats.clone(), DB_NAME.to_string());
let nats = jetstream::new(nats);
let mut tasks: Vec<(JoinHandle<()>, String)> = vec![];
let mut available_browsers: Vec<String> = BROWSER_THREADS.clone();
{
loop {
while tasks.len() < BROWSER_THREADS.len() {
let nats = nats.clone();
let browser = available_browsers.pop().expect("NO BROWSERS LEFT, THIS IS A FATAL BUG!");
let db = dbconn.clone();
let b = browser.clone();
tasks.push((tokio::spawn(async move {
let browser = b;
info!("using {}", browser);
info!("crawler spawned");
/* normal priority */
let consumer: PullConsumer = nats.get_or_create_stream(jetstream::stream::Config {
name: VOREBOT_SERVICE.to_string(),
subjects: vec![VOREBOT_SERVICE.to_string()],
retention: RetentionPolicy::WorkQueue,
..Default::default()
}).await
.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!")
.get_or_create_consumer("parser", jetstream::consumer::pull::Config {
durable_name: Some("parser".to_string()),
filter_subject: VOREBOT_SERVICE.to_string(),
..Default::default()
}).await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!");
let mut messages = consumer.messages().await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!");
/* higher priority (new hostnames) */
let higher_consumer: PullConsumer = nats.get_or_create_stream(jetstream::stream::Config {
name: VOREBOT_NEWHOSTNAME_SERVICE.to_string(),
subjects: vec![VOREBOT_NEWHOSTNAME_SERVICE.to_string()],
retention: RetentionPolicy::WorkQueue,
..Default::default()
}).await
.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!")
.get_or_create_consumer("highparser", jetstream::consumer::pull::Config {
durable_name: Some("highparser".to_string()),
filter_subject: VOREBOT_NEWHOSTNAME_SERVICE.to_string(),
..Default::default()
}).await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!");
let mut high_messages = higher_consumer.messages().await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!");
/* highest priority (user-suggested) */
let highest_consumer: PullConsumer = nats.get_or_create_stream(jetstream::stream::Config {
name: VOREBOT_SUGGESTED_SERVICE.to_string(),
subjects: vec![VOREBOT_SUGGESTED_SERVICE.to_string()],
retention: RetentionPolicy::WorkQueue,
..Default::default()
}).await
.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!")
.get_or_create_consumer("highestparser", jetstream::consumer::pull::Config {
durable_name: Some("highestparser".to_string()),
filter_subject: VOREBOT_SUGGESTED_SERVICE.to_string(),
..Default::default()
}).await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!");
let mut highest_messages = highest_consumer.messages().await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!");
let mut prefs = FirefoxPreferences::new();
prefs.set_user_agent(USER_AGENT.to_string()).unwrap();
let mut caps = DesiredCapabilities::firefox();
caps.set_preferences(prefs).unwrap();
let driver = WebDriver::new(&browser, caps).await.unwrap();
info!("crawler ready");
loop {
tokio::select! {
Some(highest) = StreamExt::next(&mut highest_messages) => {
if let Err(e) = highest {
warn!("error when recv js message! {e}");
} else {
let message = highest.unwrap();
if let Err(e) = message.ack().await {
warn!("failed acking message {e}");
}
let req = rmp_serde::from_slice::<CrawlRequest>(message.payload.as_ref());
if let Err(e) = req {
error!("BAD NATS REQUEST: {e}");
continue;
}
let req = req.unwrap();
info!("RECV USER SUGGESTION!");
let now = chrono::Utc::now().timestamp();
LAST_MESSAGE.store(now, Ordering::Relaxed);
let nats = nats.clone();
let mut bad = false;
driver.in_new_tab(|| async {
if web_parse(nats.clone(), db.clone(), &driver, &req.url, req.damping).await.is_err() {
warn!("temporary failure detected in parsing, requeuing");
nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&req).unwrap().into()).await.expect("FAILED TO REQUEUE");
bad = true;
}
Ok(())
}).await.unwrap();
if bad {
continue;
}
if DOCUMENTS_CRAWLED.load(Ordering::Relaxed) % 100 == 0 {
DOCUMENTS_CRAWLED.fetch_add(1, Ordering::Relaxed);
info!("crawled {} pages!", DOCUMENTS_CRAWLED.load(Ordering::Relaxed));
}
}
}
Some(high) = StreamExt::next(&mut high_messages) => {
if let Err(e) = high {
warn!("error when recv js message! {e}");
} else {
let message = high.unwrap();
if let Err(e) = message.ack().await {
warn!("failed acking message {e}");
}
let req = rmp_serde::from_slice::<CrawlRequest>(message.payload.as_ref());
if let Err(e) = req {
error!("BAD NATS REQUEST: {e}");
continue;
}
let req = req.unwrap();
info!("RECV HIGH PRIORITY!");
let now = chrono::Utc::now().timestamp();
LAST_MESSAGE.store(now, Ordering::Relaxed);
let nats = nats.clone();
let mut bad = false;
driver.in_new_tab(|| async {
if web_parse(nats.clone(), db.clone(), &driver, &req.url, req.damping).await.is_err() {
warn!("temporary failure detected in parsing, requeuing");
nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&req).unwrap().into()).await.expect("FAILED TO REQUEUE");
bad = true;
}
Ok(())
}).await.unwrap();
if bad {
continue;
}
if DOCUMENTS_CRAWLED.load(Ordering::Relaxed) % 100 == 0 {
DOCUMENTS_CRAWLED.fetch_add(1, Ordering::Relaxed);
info!("crawled {} pages!", DOCUMENTS_CRAWLED.load(Ordering::Relaxed));
}
}
}
Some(normal) = StreamExt::next(&mut messages) => {
if let Err(e) = normal {
warn!("error when recv js message! {e}");
} else {
let message = normal.unwrap();
if let Err(e) = message.ack().await {
warn!("failed acking message {e}");
}
let req = rmp_serde::from_slice::<CrawlRequest>(message.payload.as_ref());
if let Err(e) = req {
error!("BAD NATS REQUEST: {e}");
continue;
}
let req = req.unwrap();
let now = chrono::Utc::now().timestamp();
LAST_MESSAGE.store(now, Ordering::Relaxed);
let nats = nats.clone();
let mut hash = DefaultHasher::new();
hash.write(req.url.as_bytes());
let hash = hash.finish();
let dehomo_bucket = nats.get_key_value("dehomo").await;
let dehomo_bucket = if dehomo_bucket.is_err() {
let dehomo_bucket = nats.create_key_value(kv::Config {
bucket: "dehomo".to_string(),
description: "prevent the same url from being scraped again too quickly".to_string(),
max_age: Duration::from_secs(60*60),
..Default::default()
}).await;
if let Err(e) = dehomo_bucket {
panic!("FAILED TO CREATE DEHOMO BUCKET!!! {e}");
} else {
dehomo_bucket.unwrap()
}
} else {
dehomo_bucket.unwrap()
};
if dehomo_bucket.get(hash.to_string()).await.ok().flatten().map(|v| *v.first().unwrap_or(&0) == 1).unwrap_or(false) {
info!("too soon to scrape {}", req.url);
continue;
}
let mut bad = false;
driver.in_new_tab(|| async {
if web_parse(nats.clone(), db.clone(), &driver, &req.url, req.damping).await.is_err() {
warn!("temporary failure detected in parsing, requeuing");
nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&req).unwrap().into()).await.expect("FAILED TO REQUEUE");
bad = true;
}
Ok(())
}).await.unwrap();
if bad {
continue;
}
dehomo_bucket.put(hash.to_string(), vec![1u8].into()).await.expect("failed to store dehomo");
if DOCUMENTS_CRAWLED.load(Ordering::Relaxed) % 100 == 0 {
DOCUMENTS_CRAWLED.fetch_add(1, Ordering::Relaxed);
info!("crawled {} pages!", DOCUMENTS_CRAWLED.load(Ordering::Relaxed));
}
}
}
}
}
}),
browser
));
warn!("spawning new injest thread");
}
let mut tasks_to_remove = vec![];
for task in tasks.iter() {
if task.0.is_finished() {
tasks_to_remove.push(task.1.clone());
available_browsers.push(task.1.clone());
}
}
tasks.retain(|v| !tasks_to_remove.contains(&v.1));
tokio::time::sleep(Duration::from_secs(3)).await;
//while let Some(p) = initial.pop() {
// nats.publish(VOREBOT_SUGGESTED_SERVICE.to_string(), rmp_serde::to_vec(&p).unwrap().into()).await.unwrap();
//}
}
}
}
//#[tokio::main]
//async fn main() {
// mutex_timeouts::tokio::GLOBAL_TOKIO_TIMEOUT.store(72, Ordering::Relaxed);
//
// env_logger::init();
// info!("began at {}", chrono::Utc::now().to_string());
//
// let nats = async_nats::connect(NATS_URL.as_str()).await;
// if let Err(e) = nats {
// error!("FATAL ERROR, COULDN'T CONNECT TO NATS: {}", e);
// return;
// }
// let nats = nats.unwrap();
//
// let dbconn = DBConn::new(nats.clone(), "lyphedb-test");
//
// let nats = jetstream::new(nats);
//
// let mut prefs = FirefoxPreferences::new();
// prefs.set_user_agent(USER_AGENT.to_string()).unwrap();
// let mut caps = DesiredCapabilities::firefox();
// caps.set_preferences(prefs).unwrap();
// let driver = WebDriver::new(&BROWSER_THREADS[0], caps).await.unwrap();
//
// driver.in_new_tab(|| async {
// web_parse(nats.clone(), dbconn.clone(), &driver, "https://asklyphe.com/", 0.85).await.expect("Failed to run web parse");
//
// Ok(())
// }).await.unwrap();
//}