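//! vorebot: the AskLyphe web crawler.
//!
//! Connects to NATS JetStream, pulls msgpack-encoded `CrawlRequest`s from
//! three work queues (user-suggested, new-hostname, and normal priority),
//! renders each page in Firefox over WebDriver, and feeds the result to
//! `webparse::web_parse` for indexing.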
mod webparse;

use crate::webparse::web_parse;
use asklyphe_common::ldb::DBConn;
use asklyphe_common::nats::vorebot::{
    CrawlRequest, VOREBOT_NEWHOSTNAME_SERVICE, VOREBOT_SERVICE, VOREBOT_SUGGESTED_SERVICE,
};
use async_nats::jetstream;
use async_nats::jetstream::consumer::PullConsumer;
use async_nats::jetstream::kv;
use async_nats::jetstream::stream::RetentionPolicy;
use chrono::TimeZone;
use futures::StreamExt;
use log::{debug, error, info, warn};
use mutex_timeouts::tokio::MutexWithTimeoutAuto as Mutex;
use once_cell::sync::Lazy;
use prometheus_exporter::prometheus::core::{AtomicF64, GenericGauge};
use prometheus_exporter::prometheus::{register_counter, register_gauge, Counter};
use std::cmp::max;
use std::collections::{BTreeMap, BTreeSet};
use std::hash::{DefaultHasher, Hasher};
use std::io::Read;
use std::iter::Iterator;
use std::str::FromStr;
use std::string::ToString;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use stopwords::{Language, Spark, Stopwords};
use thirtyfour::common::capabilities::firefox::FirefoxPreferences;
use thirtyfour::{CapabilitiesHelper, DesiredCapabilities, Proxy, WebDriver};
use tokio::task::JoinHandle;
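
// All configuration comes from the environment:
//
//   NATS_URL         address of the NATS server (required)
//   NATS_PASSWORD    password for the "vorebot" NATS user; when unset,
//                    client-certificate auth via NATS_CERT/NATS_KEY is used
//   NATS_CERT        path to the client certificate (used when no password)
//   NATS_KEY         path to the client key (used when no password)
//   BROWSER_THREADS  comma-separated WebDriver endpoints; one worker task is
//                    spawned per endpoint
//   DB_NAME          name of the lyphedb database to write to
//
// A hypothetical invocation (all values illustrative):
//
//   NATS_URL=nats://127.0.0.1:4222 NATS_PASSWORD=hunter2 \
//   BROWSER_THREADS=http://localhost:4444,http://localhost:4445 \
//   DB_NAME=asklyphe RUST_LOG=info ./vorebot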

pub static NATS_URL: Lazy<String> =
    Lazy::new(|| std::env::var("NATS_URL").expect("NO NATS URL DEFINED"));
pub static NATS_PASSWORD: Lazy<Option<String>> =
    Lazy::new(|| std::env::var("NATS_PASSWORD").ok());
pub static NATS_CERT: Lazy<String> =
    Lazy::new(|| std::env::var("NATS_CERT").expect("NO NATS_CERT DEFINED"));
pub static NATS_KEY: Lazy<String> =
    Lazy::new(|| std::env::var("NATS_KEY").expect("NO NATS_KEY DEFINED"));
pub static BROWSER_THREADS: Lazy<Vec<String>> = Lazy::new(|| {
    std::env::var("BROWSER_THREADS")
        .expect("PLEASE LIST BROWSER_THREADS")
        .split(',')
        .map(|v| v.to_string())
        .collect()
});
pub static DB_NAME: Lazy<String> =
    Lazy::new(|| std::env::var("DB_NAME").expect("PLEASE ADD DB_NAME"));
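
// Process-wide liveness/throughput counters: total pages crawled, and the
// timestamp of the last message handled (updated by every worker).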
pub static DOCUMENTS_CRAWLED: AtomicU64 = AtomicU64::new(0);
pub static LAST_MESSAGE: AtomicI64 = AtomicI64::new(0);

pub static LAST_TASK_COMPLETE: Lazy<Vec<Arc<AtomicI64>>> = Lazy::new(|| {
    let max_threads: usize = BROWSER_THREADS.len();
    let mut vals = vec![];
    for _ in 0..max_threads {
        // let db = Database::default().expect("couldn't connect to foundation db!");
        // DBS.lock().await.push(Arc::new(db));
        vals.push(Arc::new(AtomicI64::new(chrono::Utc::now().timestamp())));
    }
    vals
});

pub static USER_AGENT: Lazy<String> = Lazy::new(|| {
    format!(
        "Vorebot/{} (compatible; Googlebot/2.1; +https://voremicrocomputers.com/crawler.html)",
        env!("CARGO_PKG_VERSION")
    )
});
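
// main() acts as a supervisor: it spawns one long-lived worker task per
// WebDriver endpoint in BROWSER_THREADS, then loops forever, returning the
// endpoint of any finished (i.e. crashed) worker to the pool and respawning
// a replacement a few seconds later.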
#[tokio::main]
async fn main() {
    mutex_timeouts::tokio::GLOBAL_TOKIO_TIMEOUT.store(72, Ordering::Relaxed);

    env_logger::init();
    info!("began at {}", chrono::Utc::now().to_string());

    let mut nats = async_nats::ConnectOptions::new();

    if let Some(password) = NATS_PASSWORD.as_ref() {
        nats = nats.user_and_password("vorebot".to_string(), password.to_string());
    } else {
        nats = nats.add_client_certificate(NATS_CERT.as_str().into(), NATS_KEY.as_str().into());
    }

    let nats = nats.connect(NATS_URL.as_str()).await;
    if let Err(e) = nats {
        error!("FATAL ERROR, COULDN'T CONNECT TO NATS: {}", e);
        return;
    }
    let nats = nats.unwrap();
    let dbconn = DBConn::new(nats.clone(), DB_NAME.to_string());
    let nats = jetstream::new(nats);

    let mut tasks: Vec<(JoinHandle<()>, String)> = vec![];
    let mut available_browsers: Vec<String> = BROWSER_THREADS.clone();

    {
        loop {
            while tasks.len() < BROWSER_THREADS.len() {
                let nats = nats.clone();
                let browser = available_browsers.pop().expect("NO BROWSERS LEFT, THIS IS A FATAL BUG!");
                let db = dbconn.clone();
                let b = browser.clone();
                tasks.push((tokio::spawn(async move {
                    let browser = b;
                    info!("using {}", browser);
                    info!("crawler spawned");

                    /* normal priority */
                    let consumer: PullConsumer = nats.get_or_create_stream(jetstream::stream::Config {
                        name: VOREBOT_SERVICE.to_string(),
                        subjects: vec![VOREBOT_SERVICE.to_string()],
                        retention: RetentionPolicy::WorkQueue,
                        ..Default::default()
                    }).await
                    .expect("FATAL! FAILED TO SUBSCRIBE TO NATS!")
                    .get_or_create_consumer("parser", jetstream::consumer::pull::Config {
                        durable_name: Some("parser".to_string()),
                        filter_subject: VOREBOT_SERVICE.to_string(),
                        ..Default::default()
                    }).await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!");
                    let mut messages = consumer.messages().await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!");

                    /* higher priority (new hostnames) */
                    let higher_consumer: PullConsumer = nats.get_or_create_stream(jetstream::stream::Config {
                        name: VOREBOT_NEWHOSTNAME_SERVICE.to_string(),
                        subjects: vec![VOREBOT_NEWHOSTNAME_SERVICE.to_string()],
                        retention: RetentionPolicy::WorkQueue,
                        ..Default::default()
                    }).await
                    .expect("FATAL! FAILED TO SUBSCRIBE TO NATS!")
                    .get_or_create_consumer("highparser", jetstream::consumer::pull::Config {
                        durable_name: Some("highparser".to_string()),
                        filter_subject: VOREBOT_NEWHOSTNAME_SERVICE.to_string(),
                        ..Default::default()
                    }).await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!");
                    let mut high_messages = higher_consumer.messages().await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!");

                    /* highest priority (user-suggested) */
                    let highest_consumer: PullConsumer = nats.get_or_create_stream(jetstream::stream::Config {
                        name: VOREBOT_SUGGESTED_SERVICE.to_string(),
                        subjects: vec![VOREBOT_SUGGESTED_SERVICE.to_string()],
                        retention: RetentionPolicy::WorkQueue,
                        ..Default::default()
                    }).await
                    .expect("FATAL! FAILED TO SUBSCRIBE TO NATS!")
                    .get_or_create_consumer("highestparser", jetstream::consumer::pull::Config {
                        durable_name: Some("highestparser".to_string()),
                        filter_subject: VOREBOT_SUGGESTED_SERVICE.to_string(),
                        ..Default::default()
                    }).await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!");
                    let mut highest_messages = highest_consumer.messages().await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!");

                    let mut prefs = FirefoxPreferences::new();
                    prefs.set_user_agent(USER_AGENT.to_string()).unwrap();
                    let mut caps = DesiredCapabilities::firefox();
                    caps.set_preferences(prefs).unwrap();
                    let driver = WebDriver::new(&browser, caps).await.unwrap();
                    info!("crawler ready");
                    loop {
                        tokio::select! {
                            Some(highest) = StreamExt::next(&mut highest_messages) => {
                                if let Err(e) = highest {
                                    warn!("error when recv jetstream message! {e}");
                                } else {
                                    let message = highest.unwrap();
                                    if let Err(e) = message.ack().await {
                                        warn!("failed acking message {e}");
                                    }
                                    let req = rmp_serde::from_slice::<CrawlRequest>(message.payload.as_ref());
                                    if let Err(e) = req {
                                        error!("BAD NATS REQUEST: {e}");
                                        continue;
                                    }
                                    let req = req.unwrap();
                                    info!("RECV USER SUGGESTION!");
                                    let now = chrono::Utc::now().timestamp();
                                    LAST_MESSAGE.store(now, Ordering::Relaxed);
                                    let nats = nats.clone();

                                    let mut bad = false;

                                    driver.in_new_tab(|| async {
                                        if web_parse(nats.clone(), db.clone(), &driver, &req.url, req.damping).await.is_err() {
                                            warn!("temporary failure detected in parsing, requeuing");
                                            nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&req).unwrap().into()).await.expect("FAILED TO REQUEUE");
                                            bad = true;
                                        }
                                        Ok(())
                                    }).await.unwrap();

                                    if bad {
                                        continue;
                                    }

                                    // count every crawled page; log every hundredth
                                    let crawled = DOCUMENTS_CRAWLED.fetch_add(1, Ordering::Relaxed) + 1;
                                    if crawled % 100 == 0 {
                                        info!("crawled {} pages!", crawled);
                                    }
                                }
                            }
                            Some(high) = StreamExt::next(&mut high_messages) => {
                                if let Err(e) = high {
                                    warn!("error when recv jetstream message! {e}");
                                } else {
                                    let message = high.unwrap();
                                    if let Err(e) = message.ack().await {
                                        warn!("failed acking message {e}");
                                    }
                                    let req = rmp_serde::from_slice::<CrawlRequest>(message.payload.as_ref());
                                    if let Err(e) = req {
                                        error!("BAD NATS REQUEST: {e}");
                                        continue;
                                    }
                                    let req = req.unwrap();
                                    info!("RECV HIGH PRIORITY!");
                                    let now = chrono::Utc::now().timestamp();
                                    LAST_MESSAGE.store(now, Ordering::Relaxed);
                                    let nats = nats.clone();
                                    let mut bad = false;

                                    driver.in_new_tab(|| async {
                                        if web_parse(nats.clone(), db.clone(), &driver, &req.url, req.damping).await.is_err() {
                                            warn!("temporary failure detected in parsing, requeuing");
                                            nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&req).unwrap().into()).await.expect("FAILED TO REQUEUE");
                                            bad = true;
                                        }
                                        Ok(())
                                    }).await.unwrap();

                                    if bad {
                                        continue;
                                    }

                                    let crawled = DOCUMENTS_CRAWLED.fetch_add(1, Ordering::Relaxed) + 1;
                                    if crawled % 100 == 0 {
                                        info!("crawled {} pages!", crawled);
                                    }
                                }
                            }
                            Some(normal) = StreamExt::next(&mut messages) => {
                                if let Err(e) = normal {
                                    warn!("error when recv jetstream message! {e}");
                                } else {
                                    let message = normal.unwrap();
                                    if let Err(e) = message.ack().await {
                                        warn!("failed acking message {e}");
                                    }
                                    let req = rmp_serde::from_slice::<CrawlRequest>(message.payload.as_ref());
                                    if let Err(e) = req {
                                        error!("BAD NATS REQUEST: {e}");
                                        continue;
                                    }
                                    let req = req.unwrap();
                                    let now = chrono::Utc::now().timestamp();
                                    LAST_MESSAGE.store(now, Ordering::Relaxed);
                                    let nats = nats.clone();

                                    let mut hash = DefaultHasher::new();
                                    hash.write(req.url.as_bytes());
                                    let hash = hash.finish();
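
                                    // "dehomo" is a JetStream KV bucket used as a simple rate
                                    // limiter: the key is the URL's 64-bit hash, the value a
                                    // single 1 byte, and entries expire after an hour, so a URL
                                    // crawled less than an hour ago is skipped below.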
                                    let dehomo_bucket = nats.get_key_value("dehomo").await;
                                    let dehomo_bucket = if dehomo_bucket.is_err() {
                                        let dehomo_bucket = nats.create_key_value(kv::Config {
                                            bucket: "dehomo".to_string(),
                                            description: "prevent the same url from being scraped again too quickly".to_string(),
                                            max_age: Duration::from_secs(60 * 60),
                                            ..Default::default()
                                        }).await;
                                        if let Err(e) = dehomo_bucket {
                                            panic!("FAILED TO CREATE DEHOMO BUCKET!!! {e}");
                                        } else {
                                            dehomo_bucket.unwrap()
                                        }
                                    } else {
                                        dehomo_bucket.unwrap()
                                    };
                                    if dehomo_bucket.get(hash.to_string()).await.ok().flatten().map(|v| *v.first().unwrap_or(&0) == 1).unwrap_or(false) {
                                        info!("too soon to scrape {}", req.url);
                                        continue;
                                    }

                                    let mut bad = false;

                                    driver.in_new_tab(|| async {
                                        if web_parse(nats.clone(), db.clone(), &driver, &req.url, req.damping).await.is_err() {
                                            warn!("temporary failure detected in parsing, requeuing");
                                            nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&req).unwrap().into()).await.expect("FAILED TO REQUEUE");
                                            bad = true;
                                        }
                                        Ok(())
                                    }).await.unwrap();

                                    if bad {
                                        continue;
                                    }

                                    dehomo_bucket.put(hash.to_string(), vec![1u8].into()).await.expect("failed to store dehomo");

                                    let crawled = DOCUMENTS_CRAWLED.fetch_add(1, Ordering::Relaxed) + 1;
                                    if crawled % 100 == 0 {
                                        info!("crawled {} pages!", crawled);
                                    }
                                }
                            }
                        }
                    }
                }), browser));
                warn!("spawning new ingest thread");
            }
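
            // reap finished (i.e. crashed) workers: hand their WebDriver endpoint
            // back to the pool so the while loop above respawns a replacement on
            // the next pass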
            let mut tasks_to_remove = vec![];
            for task in tasks.iter() {
                if task.0.is_finished() {
                    tasks_to_remove.push(task.1.clone());
                    available_browsers.push(task.1.clone());
                }
            }
            tasks.retain(|v| !tasks_to_remove.contains(&v.1));
            tokio::time::sleep(Duration::from_secs(3)).await;

            //while let Some(p) = initial.pop() {
            //    nats.publish(VOREBOT_SUGGESTED_SERVICE.to_string(), rmp_serde::to_vec(&p).unwrap().into()).await.unwrap();
            //}
        }
    }
}
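
// An earlier single-shot entry point, kept commented out for reference: it
// crawls one hard-coded page with the first configured browser and exits.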
//#[tokio::main]
//async fn main() {
//    mutex_timeouts::tokio::GLOBAL_TOKIO_TIMEOUT.store(72, Ordering::Relaxed);
//
//    env_logger::init();
//    info!("began at {}", chrono::Utc::now().to_string());
//
//    let nats = async_nats::connect(NATS_URL.as_str()).await;
//    if let Err(e) = nats {
//        error!("FATAL ERROR, COULDN'T CONNECT TO NATS: {}", e);
//        return;
//    }
//    let nats = nats.unwrap();
//
//    let dbconn = DBConn::new(nats.clone(), "lyphedb-test");
//
//    let nats = jetstream::new(nats);
//
//    let mut prefs = FirefoxPreferences::new();
//    prefs.set_user_agent(USER_AGENT.to_string()).unwrap();
//    let mut caps = DesiredCapabilities::firefox();
//    caps.set_preferences(prefs).unwrap();
//    let driver = WebDriver::new(&BROWSER_THREADS[0], caps).await.unwrap();
//
//    driver.in_new_tab(|| async {
//        web_parse(nats.clone(), dbconn.clone(), &driver, "https://asklyphe.com/", 0.85).await.expect("Failed to run web parse");
//
//        Ok(())
//    }).await.unwrap();
//}