diff --git a/vorebot/src/main.rs b/vorebot/src/main.rs index 7661dbd..cbad6e3 100644 --- a/vorebot/src/main.rs +++ b/vorebot/src/main.rs @@ -22,7 +22,7 @@ use std::str::FromStr; use std::string::ToString; use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use async_nats::jetstream::kv; use stopwords::{Language, Spark, Stopwords}; use thirtyfour::{CapabilitiesHelper, DesiredCapabilities, Proxy, WebDriver}; @@ -47,6 +47,11 @@ pub static BROWSER_PROXY: Lazy> = Lazy::new(|| { pub static DB_NAME: Lazy = Lazy::new(|| std::env::var("DB_NAME").expect("PLEASE ADD DB_NAME")); +// in minutes +const DEFAULT_CRAWLER_TIMEOUT: u64 = 5; +pub static CRAWLER_TIMEOUT: Lazy = + Lazy::new(|| std::env::var("CRAWLER_TIMEOUT").map(|v| v.parse::().ok()).ok().flatten().unwrap_or(DEFAULT_CRAWLER_TIMEOUT)); + pub static DOCUMENTS_CRAWLED: AtomicU64 = AtomicU64::new(0); pub static LAST_MESSAGE: AtomicI64 = AtomicI64::new(0); @@ -108,7 +113,7 @@ async fn main() { return; } - let mut tasks: Vec<(JoinHandle<()>, String)> = vec![]; + let mut tasks: Vec<(JoinHandle<()>, String, Arc)> = vec![]; let mut available_browsers: Vec = BROWSER_THREADS.clone(); { @@ -118,6 +123,8 @@ async fn main() { let browser = available_browsers.pop().expect("NO BROWSERS LEFT, THIS IS A FATAL BUG!"); let db = dbconn.clone(); let b = browser.clone(); + let last_parse = Arc::new(AtomicU64::new(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs())); + let lp = last_parse.clone(); tasks.push((tokio::spawn(async move { let browser = b; info!("using {}", browser); @@ -188,6 +195,7 @@ async fn main() { info!("crawler ready"); loop { + lp.store(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(), Ordering::Relaxed); tokio::select! { Some(highest) = StreamExt::next(&mut highest_messages) => { if let Err(e) = highest { @@ -337,7 +345,7 @@ async fn main() { } } }), - browser + browser, last_parse.clone() )); warn!("spawning new injest thread"); } @@ -347,6 +355,12 @@ async fn main() { tasks_to_remove.push(task.1.clone()); available_browsers.push(task.1.clone()); } + let last_parse = task.2.load(Ordering::Relaxed); + if SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() > (*CRAWLER_TIMEOUT * 60) + last_parse { + // task has been taking too long + warn!("task taking too long! aborting!"); + task.0.abort(); + } } tasks.retain(|v| !tasks_to_remove.contains(&v.1)); tokio::time::sleep(Duration::from_secs(3)).await;