mod webparse; use asklyphe_common::nats::vorebot::{ VOREBOT_NEWHOSTNAME_SERVICE, VOREBOT_SERVICE, VOREBOT_SUGGESTED_SERVICE, }; use async_nats::jetstream; use async_nats::jetstream::consumer::PullConsumer; use async_nats::jetstream::stream::RetentionPolicy; use chrono::TimeZone; use futures::StreamExt; use log::{debug, error, info, warn}; use mutex_timeouts::tokio::MutexWithTimeoutAuto as Mutex; use once_cell::sync::Lazy; use prometheus_exporter::prometheus::core::{AtomicF64, GenericGauge}; use prometheus_exporter::prometheus::{register_counter, register_gauge, Counter}; use std::cmp::max; use std::collections::{BTreeMap, BTreeSet}; use std::hash::{DefaultHasher, Hasher}; use std::io::Read; use std::iter::Iterator; use std::str::FromStr; use std::string::ToString; use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; use async_nats::jetstream::kv; use stopwords::{Language, Spark, Stopwords}; use thirtyfour::{CapabilitiesHelper, DesiredCapabilities, Proxy, WebDriver}; use thirtyfour::common::capabilities::firefox::FirefoxPreferences; use tokio::task::JoinHandle; use asklyphe_common::ldb::DBConn; use asklyphe_common::nats::vorebot::CrawlRequest; use crate::webparse::web_parse; pub static NATS_URL: Lazy = Lazy::new(|| std::env::var("NATS_URL").expect("NO NATS URL DEFINED")); pub static NATS_CERT: Lazy = Lazy::new(|| std::env::var("NATS_CERT").expect("NO NATS_CERT DEFINED")); pub static NATS_KEY: Lazy = Lazy::new(|| std::env::var("NATS_KEY").expect("NO NATS_KEY DEFINED")); pub static BROWSER_THREADS: Lazy> = Lazy::new(|| std::env::var("BROWSER_THREADS").expect("PLEASE LIST BROWSER_THREADS").split(',').map(|v| v.to_string()).collect()); pub static DB_NAME: Lazy = Lazy::new(|| std::env::var("DB_NAME").expect("PLEASE ADD DB_NAME")); pub static DOCUMENTS_CRAWLED: AtomicU64 = AtomicU64::new(0); pub static LAST_MESSAGE: AtomicI64 = AtomicI64::new(0); pub static LAST_TASK_COMPLETE: Lazy>> = Lazy::new(|| { let max_threads: usize = BROWSER_THREADS.len(); let mut vals = vec![]; for i in 0..max_threads { // let db = Database::default().expect("couldn't connect to foundation db!"); // DBS.lock().await.push(Arc::new(db)); vals.push(Arc::new(AtomicI64::new(chrono::Utc::now().timestamp()))); } vals }); pub static USER_AGENT: Lazy = Lazy::new(|| { format!( "Vorebot/{} (compatible; Googlebot/2.1; +https://voremicrocomputers.com/crawler.html)", env!("CARGO_PKG_VERSION") ) }); #[tokio::main] async fn main() { mutex_timeouts::tokio::GLOBAL_TOKIO_TIMEOUT.store(72, Ordering::Relaxed); env_logger::init(); info!("began at {}", chrono::Utc::now().to_string()); let nats = async_nats::ConnectOptions::new() .add_client_certificate(NATS_CERT.as_str().into(), NATS_KEY.as_str().into()) .connect(NATS_URL.as_str()) .await; if let Err(e) = nats { error!("FATAL ERROR, COULDN'T CONNECT TO NATS: {}", e); return; } let nats = nats.unwrap(); let dbconn = DBConn::new(nats.clone(), DB_NAME.to_string()); let nats = jetstream::new(nats); let mut tasks: Vec<(JoinHandle<()>, String)> = vec![]; let mut available_browsers: Vec = BROWSER_THREADS.clone(); { loop { while tasks.len() < BROWSER_THREADS.len() { let nats = nats.clone(); let browser = available_browsers.pop().expect("NO BROWSERS LEFT, THIS IS A FATAL BUG!"); let db = dbconn.clone(); let b = browser.clone(); tasks.push((tokio::spawn(async move { let browser = b; info!("using {}", browser); info!("crawler spawned"); /* normal priority */ let consumer: PullConsumer = nats.get_or_create_stream(jetstream::stream::Config { name: VOREBOT_SERVICE.to_string(), subjects: vec![VOREBOT_SERVICE.to_string()], retention: RetentionPolicy::WorkQueue, ..Default::default() }).await .expect("FATAL! FAILED TO SUBSCRIBE TO NATS!") .get_or_create_consumer("parser", jetstream::consumer::pull::Config { durable_name: Some("parser".to_string()), filter_subject: VOREBOT_SERVICE.to_string(), ..Default::default() }).await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!"); let mut messages = consumer.messages().await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!"); /* higher priority (new hostnames) */ let higher_consumer: PullConsumer = nats.get_or_create_stream(jetstream::stream::Config { name: VOREBOT_NEWHOSTNAME_SERVICE.to_string(), subjects: vec![VOREBOT_NEWHOSTNAME_SERVICE.to_string()], retention: RetentionPolicy::WorkQueue, ..Default::default() }).await .expect("FATAL! FAILED TO SUBSCRIBE TO NATS!") .get_or_create_consumer("highparser", jetstream::consumer::pull::Config { durable_name: Some("highparser".to_string()), filter_subject: VOREBOT_NEWHOSTNAME_SERVICE.to_string(), ..Default::default() }).await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!"); let mut high_messages = higher_consumer.messages().await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!"); /* highest priority (user-suggested) */ let highest_consumer: PullConsumer = nats.get_or_create_stream(jetstream::stream::Config { name: VOREBOT_SUGGESTED_SERVICE.to_string(), subjects: vec![VOREBOT_SUGGESTED_SERVICE.to_string()], retention: RetentionPolicy::WorkQueue, ..Default::default() }).await .expect("FATAL! FAILED TO SUBSCRIBE TO NATS!") .get_or_create_consumer("highestparser", jetstream::consumer::pull::Config { durable_name: Some("highestparser".to_string()), filter_subject: VOREBOT_SUGGESTED_SERVICE.to_string(), ..Default::default() }).await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!"); let mut highest_messages = highest_consumer.messages().await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!"); let mut prefs = FirefoxPreferences::new(); prefs.set_user_agent(USER_AGENT.to_string()).unwrap(); let mut caps = DesiredCapabilities::firefox(); caps.set_preferences(prefs).unwrap(); let driver = WebDriver::new(&browser, caps).await.unwrap(); info!("crawler ready"); loop { tokio::select! { Some(highest) = StreamExt::next(&mut highest_messages) => { if let Err(e) = highest { warn!("error when recv js message! {e}"); } else { let message = highest.unwrap(); if let Err(e) = message.ack().await { warn!("failed acking message {e}"); } let req = rmp_serde::from_slice::(message.payload.as_ref()); if let Err(e) = req { error!("BAD NATS REQUEST: {e}"); continue; } let req = req.unwrap(); info!("RECV USER SUGGESTION!"); let now = chrono::Utc::now().timestamp(); LAST_MESSAGE.store(now, Ordering::Relaxed); let nats = nats.clone(); let mut bad = false; driver.in_new_tab(|| async { if web_parse(nats.clone(), db.clone(), &driver, &req.url, req.damping).await.is_err() { warn!("temporary failure detected in parsing, requeuing"); nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&req).unwrap().into()).await.expect("FAILED TO REQUEUE"); bad = true; } Ok(()) }).await.unwrap(); if bad { continue; } if DOCUMENTS_CRAWLED.load(Ordering::Relaxed) % 100 == 0 { DOCUMENTS_CRAWLED.fetch_add(1, Ordering::Relaxed); info!("crawled {} pages!", DOCUMENTS_CRAWLED.load(Ordering::Relaxed)); } } } Some(high) = StreamExt::next(&mut high_messages) => { if let Err(e) = high { warn!("error when recv js message! {e}"); } else { let message = high.unwrap(); if let Err(e) = message.ack().await { warn!("failed acking message {e}"); } let req = rmp_serde::from_slice::(message.payload.as_ref()); if let Err(e) = req { error!("BAD NATS REQUEST: {e}"); continue; } let req = req.unwrap(); info!("RECV HIGH PRIORITY!"); let now = chrono::Utc::now().timestamp(); LAST_MESSAGE.store(now, Ordering::Relaxed); let nats = nats.clone(); let mut bad = false; driver.in_new_tab(|| async { if web_parse(nats.clone(), db.clone(), &driver, &req.url, req.damping).await.is_err() { warn!("temporary failure detected in parsing, requeuing"); nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&req).unwrap().into()).await.expect("FAILED TO REQUEUE"); bad = true; } Ok(()) }).await.unwrap(); if bad { continue; } if DOCUMENTS_CRAWLED.load(Ordering::Relaxed) % 100 == 0 { DOCUMENTS_CRAWLED.fetch_add(1, Ordering::Relaxed); info!("crawled {} pages!", DOCUMENTS_CRAWLED.load(Ordering::Relaxed)); } } } Some(normal) = StreamExt::next(&mut messages) => { if let Err(e) = normal { warn!("error when recv js message! {e}"); } else { let message = normal.unwrap(); if let Err(e) = message.ack().await { warn!("failed acking message {e}"); } let req = rmp_serde::from_slice::(message.payload.as_ref()); if let Err(e) = req { error!("BAD NATS REQUEST: {e}"); continue; } let req = req.unwrap(); let now = chrono::Utc::now().timestamp(); LAST_MESSAGE.store(now, Ordering::Relaxed); let nats = nats.clone(); let mut hash = DefaultHasher::new(); hash.write(req.url.as_bytes()); let hash = hash.finish(); let dehomo_bucket = nats.get_key_value("dehomo").await; let dehomo_bucket = if dehomo_bucket.is_err() { let dehomo_bucket = nats.create_key_value(kv::Config { bucket: "dehomo".to_string(), description: "prevent the same url from being scraped again too quickly".to_string(), max_age: Duration::from_secs(60*60), ..Default::default() }).await; if let Err(e) = dehomo_bucket { panic!("FAILED TO CREATE DEHOMO BUCKET!!! {e}"); } else { dehomo_bucket.unwrap() } } else { dehomo_bucket.unwrap() }; if dehomo_bucket.get(hash.to_string()).await.ok().flatten().map(|v| *v.first().unwrap_or(&0) == 1).unwrap_or(false) { info!("too soon to scrape {}", req.url); continue; } let mut bad = false; driver.in_new_tab(|| async { if web_parse(nats.clone(), db.clone(), &driver, &req.url, req.damping).await.is_err() { warn!("temporary failure detected in parsing, requeuing"); nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&req).unwrap().into()).await.expect("FAILED TO REQUEUE"); bad = true; } Ok(()) }).await.unwrap(); if bad { continue; } dehomo_bucket.put(hash.to_string(), vec![1u8].into()).await.expect("failed to store dehomo"); if DOCUMENTS_CRAWLED.load(Ordering::Relaxed) % 100 == 0 { DOCUMENTS_CRAWLED.fetch_add(1, Ordering::Relaxed); info!("crawled {} pages!", DOCUMENTS_CRAWLED.load(Ordering::Relaxed)); } } } } } }), browser )); warn!("spawning new injest thread"); } let mut tasks_to_remove = vec![]; for task in tasks.iter() { if task.0.is_finished() { tasks_to_remove.push(task.1.clone()); available_browsers.push(task.1.clone()); } } tasks.retain(|v| !tasks_to_remove.contains(&v.1)); tokio::time::sleep(Duration::from_secs(3)).await; //while let Some(p) = initial.pop() { // nats.publish(VOREBOT_SUGGESTED_SERVICE.to_string(), rmp_serde::to_vec(&p).unwrap().into()).await.unwrap(); //} } } } //#[tokio::main] //async fn main() { // mutex_timeouts::tokio::GLOBAL_TOKIO_TIMEOUT.store(72, Ordering::Relaxed); // // env_logger::init(); // info!("began at {}", chrono::Utc::now().to_string()); // // let nats = async_nats::connect(NATS_URL.as_str()).await; // if let Err(e) = nats { // error!("FATAL ERROR, COULDN'T CONNECT TO NATS: {}", e); // return; // } // let nats = nats.unwrap(); // // let dbconn = DBConn::new(nats.clone(), "lyphedb-test"); // // let nats = jetstream::new(nats); // // let mut prefs = FirefoxPreferences::new(); // prefs.set_user_agent(USER_AGENT.to_string()).unwrap(); // let mut caps = DesiredCapabilities::firefox(); // caps.set_preferences(prefs).unwrap(); // let driver = WebDriver::new(&BROWSER_THREADS[0], caps).await.unwrap(); // // driver.in_new_tab(|| async { // web_parse(nats.clone(), dbconn.clone(), &driver, "https://asklyphe.com/", 0.85).await.expect("Failed to run web parse"); // // Ok(()) // }).await.unwrap(); //}