Merge pull request 'vorebot timeouts seem stable' (#7) from feature/nikocs/vorebot-tweaks into develop
	
		
			
	
		
	
	
		
	
		
			Some checks failed
		
		
	
	
		
			
				
	
				/ build-all-services (push) Has been cancelled
				
			
		
		
	
	
				
					
				
			
		
			Some checks failed
		
		
	
	/ build-all-services (push) Has been cancelled
				
			Reviewed-on: #7
This commit is contained in:
		
						commit
						96478fb5d2
					
				
					 1 changed files with 17 additions and 3 deletions
				
			
		|  | @ -22,7 +22,7 @@ use std::str::FromStr; | |||
| use std::string::ToString; | ||||
| use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; | ||||
| use std::sync::Arc; | ||||
| use std::time::Duration; | ||||
| use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; | ||||
| use async_nats::jetstream::kv; | ||||
| use stopwords::{Language, Spark, Stopwords}; | ||||
| use thirtyfour::{CapabilitiesHelper, DesiredCapabilities, Proxy, WebDriver}; | ||||
|  | @ -47,6 +47,11 @@ pub static BROWSER_PROXY: Lazy<Option<String>> = Lazy::new(|| { | |||
| pub static DB_NAME: Lazy<String> = | ||||
|     Lazy::new(|| std::env::var("DB_NAME").expect("PLEASE ADD DB_NAME")); | ||||
| 
 | ||||
| // crawler task timeout, in minutes (default below; overridable via the CRAWLER_TIMEOUT env var)
 | ||||
| const DEFAULT_CRAWLER_TIMEOUT: u64 = 5; | ||||
| pub static CRAWLER_TIMEOUT: Lazy<u64> = | ||||
|     Lazy::new(|| std::env::var("CRAWLER_TIMEOUT").map(|v| v.parse::<u64>().ok()).ok().flatten().unwrap_or(DEFAULT_CRAWLER_TIMEOUT)); | ||||
| 
 | ||||
| pub static DOCUMENTS_CRAWLED: AtomicU64 = AtomicU64::new(0); | ||||
| pub static LAST_MESSAGE: AtomicI64 = AtomicI64::new(0); | ||||
| 
 | ||||
|  | @ -108,7 +113,7 @@ async fn main() { | |||
|         return; | ||||
|     } | ||||
| 
 | ||||
|     let mut tasks: Vec<(JoinHandle<()>, String)> = vec![]; | ||||
|     let mut tasks: Vec<(JoinHandle<()>, String, Arc<AtomicU64>)> = vec![]; | ||||
|     let mut available_browsers: Vec<String> = BROWSER_THREADS.clone(); | ||||
| 
 | ||||
|     { | ||||
|  | @ -118,6 +123,8 @@ async fn main() { | |||
|                 let browser = available_browsers.pop().expect("NO BROWSERS LEFT, THIS IS A FATAL BUG!"); | ||||
|                 let db = dbconn.clone(); | ||||
|                 let b = browser.clone(); | ||||
|                 let last_parse = Arc::new(AtomicU64::new(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs())); | ||||
|                 let lp = last_parse.clone(); | ||||
|                 tasks.push((tokio::spawn(async move { | ||||
|                     let browser = b; | ||||
|                     info!("using {}", browser); | ||||
|  | @ -188,6 +195,7 @@ async fn main() { | |||
|                     info!("crawler ready"); | ||||
| 
 | ||||
|                     loop { | ||||
|                         lp.store(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(), Ordering::Relaxed); | ||||
|                         tokio::select! { | ||||
|                             Some(highest) = StreamExt::next(&mut highest_messages) => { | ||||
|                                 if let Err(e) = highest { | ||||
|  | @ -337,7 +345,7 @@ async fn main() { | |||
|                         } | ||||
|                     } | ||||
|                 }), | ||||
|                             browser | ||||
|                             browser, last_parse.clone() | ||||
|                 )); | ||||
|                 warn!("spawning new injest thread"); | ||||
|             } | ||||
|  | @ -347,6 +355,12 @@ async fn main() { | |||
|                     tasks_to_remove.push(task.1.clone()); | ||||
|                     available_browsers.push(task.1.clone()); | ||||
|                 } | ||||
|                 let last_parse = task.2.load(Ordering::Relaxed); | ||||
|                 if SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() > (*CRAWLER_TIMEOUT * 60) + last_parse { | ||||
|                     // the task's last recorded loop timestamp is more than CRAWLER_TIMEOUT minutes old; treat it as stalled
 | ||||
|                     warn!("task taking too long! aborting!"); | ||||
|                     task.0.abort(); | ||||
|                 } | ||||
|             } | ||||
|             tasks.retain(|v| !tasks_to_remove.contains(&v.1)); | ||||
|             tokio::time::sleep(Duration::from_secs(3)).await; | ||||
|  |  | |||
		Loading…
	
	Add table
		
		Reference in a new issue