Timeout if crawling takes too long
All checks were successful
/ build-all-services (push) Successful in 9m27s
All checks were successful
/ build-all-services (push) Successful in 9m27s
The default is 5 minutes before the task gets killed; in the future we should probably have a more graceful shutdown method.
This commit is contained in:
parent
9237d1a048
commit
74c65d993d
1 changed file with 17 additions and 3 deletions
|
@ -22,7 +22,7 @@ use std::str::FromStr;
|
||||||
use std::string::ToString;
|
use std::string::ToString;
|
||||||
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
||||||
use async_nats::jetstream::kv;
|
use async_nats::jetstream::kv;
|
||||||
use stopwords::{Language, Spark, Stopwords};
|
use stopwords::{Language, Spark, Stopwords};
|
||||||
use thirtyfour::{CapabilitiesHelper, DesiredCapabilities, Proxy, WebDriver};
|
use thirtyfour::{CapabilitiesHelper, DesiredCapabilities, Proxy, WebDriver};
|
||||||
|
@ -47,6 +47,11 @@ pub static BROWSER_PROXY: Lazy<Option<String>> = Lazy::new(|| {
|
||||||
pub static DB_NAME: Lazy<String> =
|
pub static DB_NAME: Lazy<String> =
|
||||||
Lazy::new(|| std::env::var("DB_NAME").expect("PLEASE ADD DB_NAME"));
|
Lazy::new(|| std::env::var("DB_NAME").expect("PLEASE ADD DB_NAME"));
|
||||||
|
|
||||||
|
// in minutes
|
||||||
|
const DEFAULT_CRAWLER_TIMEOUT: u64 = 5;
|
||||||
|
pub static CRAWLER_TIMEOUT: Lazy<u64> =
|
||||||
|
Lazy::new(|| std::env::var("CRAWLER_TIMEOUT").map(|v| v.parse::<u64>().ok()).ok().flatten().unwrap_or(DEFAULT_CRAWLER_TIMEOUT));
|
||||||
|
|
||||||
pub static DOCUMENTS_CRAWLED: AtomicU64 = AtomicU64::new(0);
|
pub static DOCUMENTS_CRAWLED: AtomicU64 = AtomicU64::new(0);
|
||||||
pub static LAST_MESSAGE: AtomicI64 = AtomicI64::new(0);
|
pub static LAST_MESSAGE: AtomicI64 = AtomicI64::new(0);
|
||||||
|
|
||||||
|
@ -108,7 +113,7 @@ async fn main() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut tasks: Vec<(JoinHandle<()>, String)> = vec![];
|
let mut tasks: Vec<(JoinHandle<()>, String, Arc<AtomicU64>)> = vec![];
|
||||||
let mut available_browsers: Vec<String> = BROWSER_THREADS.clone();
|
let mut available_browsers: Vec<String> = BROWSER_THREADS.clone();
|
||||||
|
|
||||||
{
|
{
|
||||||
|
@ -118,6 +123,8 @@ async fn main() {
|
||||||
let browser = available_browsers.pop().expect("NO BROWSERS LEFT, THIS IS A FATAL BUG!");
|
let browser = available_browsers.pop().expect("NO BROWSERS LEFT, THIS IS A FATAL BUG!");
|
||||||
let db = dbconn.clone();
|
let db = dbconn.clone();
|
||||||
let b = browser.clone();
|
let b = browser.clone();
|
||||||
|
let last_parse = Arc::new(AtomicU64::new(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()));
|
||||||
|
let lp = last_parse.clone();
|
||||||
tasks.push((tokio::spawn(async move {
|
tasks.push((tokio::spawn(async move {
|
||||||
let browser = b;
|
let browser = b;
|
||||||
info!("using {}", browser);
|
info!("using {}", browser);
|
||||||
|
@ -188,6 +195,7 @@ async fn main() {
|
||||||
info!("crawler ready");
|
info!("crawler ready");
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
lp.store(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(), Ordering::Relaxed);
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
Some(highest) = StreamExt::next(&mut highest_messages) => {
|
Some(highest) = StreamExt::next(&mut highest_messages) => {
|
||||||
if let Err(e) = highest {
|
if let Err(e) = highest {
|
||||||
|
@ -337,7 +345,7 @@ async fn main() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}),
|
}),
|
||||||
browser
|
browser, last_parse.clone()
|
||||||
));
|
));
|
||||||
warn!("spawning new injest thread");
|
warn!("spawning new injest thread");
|
||||||
}
|
}
|
||||||
|
@ -347,6 +355,12 @@ async fn main() {
|
||||||
tasks_to_remove.push(task.1.clone());
|
tasks_to_remove.push(task.1.clone());
|
||||||
available_browsers.push(task.1.clone());
|
available_browsers.push(task.1.clone());
|
||||||
}
|
}
|
||||||
|
let last_parse = task.2.load(Ordering::Relaxed);
|
||||||
|
if SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() > (*CRAWLER_TIMEOUT * 60) + last_parse {
|
||||||
|
// task has been taking too long
|
||||||
|
warn!("task taking too long! aborting!");
|
||||||
|
task.0.abort();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
tasks.retain(|v| !tasks_to_remove.contains(&v.1));
|
tasks.retain(|v| !tasks_to_remove.contains(&v.1));
|
||||||
tokio::time::sleep(Duration::from_secs(3)).await;
|
tokio::time::sleep(Duration::from_secs(3)).await;
|
||||||
|
|
Loading…
Add table
Reference in a new issue