From 822ea728a5c64896d538c829febabbc72f00403f Mon Sep 17 00:00:00 2001
From: husky
Date: Mon, 31 Mar 2025 15:04:37 -0700
Subject: [PATCH] initial benchmark mode impl

could do with some more in-depth info, but will suffice for now
---
 vorebot/Cargo.toml          |   6 +-
 vorebot/src/main.rs         |  67 ++++++++-------
 vorebot/src/webparse/mod.rs | 166 +++++++++++++++++++++++-------------
 3 files changed, 151 insertions(+), 88 deletions(-)

diff --git a/vorebot/Cargo.toml b/vorebot/Cargo.toml
index 00ae58e..d2b3ab8 100644
--- a/vorebot/Cargo.toml
+++ b/vorebot/Cargo.toml
@@ -25,4 +25,8 @@ mutex-timeouts = { version = "0.3.0", features = ["tokio"] }
 prometheus_exporter = "0.8.5"
 thirtyfour = "0.35.0"
 stopwords = "0.1.1"
-texting_robots = "0.2.2"
\ No newline at end of file
+texting_robots = "0.2.2"
+
+[features]
+default = []
+benchmark = []
\ No newline at end of file
diff --git a/vorebot/src/main.rs b/vorebot/src/main.rs
index cbad6e3..201dbee 100644
--- a/vorebot/src/main.rs
+++ b/vorebot/src/main.rs
@@ -28,6 +28,7 @@ use stopwords::{Language, Spark, Stopwords};
 use thirtyfour::{CapabilitiesHelper, DesiredCapabilities, Proxy, WebDriver};
 use thirtyfour::common::capabilities::firefox::FirefoxPreferences;
 use tokio::task::JoinHandle;
+#[cfg(not(feature = "benchmark"))]
 use asklyphe_common::ldb::DBConn;
 use asklyphe_common::nats::vorebot::CrawlRequest;
 use crate::webparse::web_parse;
@@ -73,6 +74,8 @@ pub static USER_AGENT: Lazy<String> = Lazy::new(|| {
     )
 });
 
+
+#[cfg(not(feature = "benchmark"))]
 #[tokio::main]
 async fn main() {
     mutex_timeouts::tokio::GLOBAL_TOKIO_TIMEOUT.store(72, Ordering::Relaxed);
@@ -367,34 +370,40 @@ async fn main() {
         }
     }
 }
+#[cfg(feature = "benchmark")]
+pub struct DBConn {}
+#[cfg(feature = "benchmark")]
+pub struct Context {}
 
-//#[tokio::main]
-//async fn main() {
-//    mutex_timeouts::tokio::GLOBAL_TOKIO_TIMEOUT.store(72, Ordering::Relaxed);
-//
-//    env_logger::init();
-//    info!("began at {}", chrono::Utc::now().to_string());
-//
-//    let nats = async_nats::connect(NATS_URL.as_str()).await;
-//    if let Err(e) = nats {
-//        error!("FATAL ERROR, COULDN'T CONNECT TO NATS: {}", e);
-//        return;
-//    }
-//    let nats = nats.unwrap();
-//
-//    let dbconn = DBConn::new(nats.clone(), "lyphedb-test");
-//
-//    let nats = jetstream::new(nats);
-//
-//    let mut prefs = FirefoxPreferences::new();
-//    prefs.set_user_agent(USER_AGENT.to_string()).unwrap();
-//    let mut caps = DesiredCapabilities::firefox();
-//    caps.set_preferences(prefs).unwrap();
-//    let driver = WebDriver::new(&BROWSER_THREADS[0], caps).await.unwrap();
-//
-//    driver.in_new_tab(|| async {
-//        web_parse(nats.clone(), dbconn.clone(), &driver, "https://asklyphe.com/", 0.85).await.expect("Failed to run web parse");
-//
-//        Ok(())
-//    }).await.unwrap();
-//}
+#[cfg(feature = "benchmark")]
+#[tokio::main]
+async fn main() {
+    mutex_timeouts::tokio::GLOBAL_TOKIO_TIMEOUT.store(72, Ordering::Relaxed);
+
+    eprintln!("BENCHMARK MODE ENABLED");
+    println!("BENCHMARK MODE ENABLED");
+
+    let args: Vec<String> = std::env::args().collect();
+    let test_site = args.get(1);
+    if test_site.is_none() {
+        panic!("PLEASE PROVIDE TEST SITE!")
+    }
+    let test_site = test_site.unwrap();
+
+    env_logger::init();
+    info!("began at {}", chrono::Utc::now().to_string());
+
+    let mut prefs = FirefoxPreferences::new();
+    prefs.set_user_agent(USER_AGENT.to_string()).unwrap();
+    let mut caps = DesiredCapabilities::firefox();
+    caps.set_preferences(prefs).unwrap();
+    let driver = WebDriver::new(&BROWSER_THREADS[0], caps).await.unwrap();
+
+    driver.in_new_tab(|| async {
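+        // Context {} and DBConn {} are the unit stubs defined above; with the NATS
+        // and lyphedb paths compiled out, this run times crawling and parsing alone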
+        web_parse(Context {}, DBConn {}, &driver, test_site, 0.85).await.expect("Failed to run web parse");
+
+        Ok(())
+    }).await.unwrap();
+}
diff --git a/vorebot/src/webparse/mod.rs b/vorebot/src/webparse/mod.rs
index 219361b..4e87a39 100644
--- a/vorebot/src/webparse/mod.rs
+++ b/vorebot/src/webparse/mod.rs
@@ -1,7 +1,13 @@
 use crate::USER_AGENT;
-use asklyphe_common::ldb::{linkrelstore, linkstore, metastore, sitestore, titlestore, wordstore, DBConn};
-use async_nats::jetstream;
-use async_nats::jetstream::kv;
+#[cfg(feature = "benchmark")]
+use crate::{Context, DBConn};
+#[cfg(not(feature = "benchmark"))]
+use asklyphe_common::ldb::DBConn;
+use asklyphe_common::ldb::{linkrelstore, linkstore, metastore, sitestore, titlestore, wordstore};
+use asklyphe_common::nats::vorebot::CrawlRequest;
+use asklyphe_common::nats::vorebot::VOREBOT_SERVICE;
+#[cfg(not(feature = "benchmark"))]
+use async_nats::jetstream::{kv, Context};
 use futures::AsyncReadExt;
 use image::EncodableLayout;
 use isahc::config::RedirectPolicy;
@@ -16,8 +22,6 @@ use std::time::Duration;
 use stopwords::{Language, Spark, Stopwords};
 use texting_robots::{get_robots_url, Robot};
 use thirtyfour::{By, WebDriver};
-use asklyphe_common::nats::vorebot::CrawlRequest;
-use asklyphe_common::nats::vorebot::VOREBOT_SERVICE;
 
 pub fn allowed_to_crawl(robotstxt: &[u8], url: &str) -> Result<bool, ()> {
     let robot1 = Robot::new("Vorebot", robotstxt);
@@ -35,50 +39,58 @@ pub fn allowed_to_crawl(robotstxt: &[u8], url: &str) -> Result<bool, ()> {
 // returns Err if we cannot access a page, but the error associated with it seems temporary (i.e. it's worth trying again later)
 // otherwise, returns Ok
 pub async fn web_parse(
-    nats: jetstream::Context,
+    nats: Context,
     db: DBConn,
     driver: &WebDriver,
     url: &str,
     damping: f64,
 ) -> Result<(), ()> {
-    driver.delete_all_cookies().await.map_err(|_| ())?;
-    let robots_bucket = nats.get_key_value("robots").await;
-    let robots_bucket = if robots_bucket.is_err() {
-        let robots_bucket = nats
-            .create_key_value(kv::Config {
-                bucket: "robots".to_string(),
-                description: "storage of robots.txt data for given hosts".to_string(),
-                ..Default::default()
-            })
-            .await;
-        if let Err(e) = robots_bucket {
-            error!("could not create robots.txt bucket: {}", e);
-            None
+
+    #[cfg(not(feature = "benchmark"))]
+    let robots_bucket = {
+        let robots_bucket = nats.get_key_value("robots").await;
+        if robots_bucket.is_err() {
+            let robots_bucket = nats
+                .create_key_value(kv::Config {
+                    bucket: "robots".to_string(),
+                    description: "storage of robots.txt data for given hosts".to_string(),
+                    ..Default::default()
+                })
+                .await;
+            if let Err(e) = robots_bucket {
+                error!("could not create robots.txt bucket: {}", e);
+                None
+            } else {
+                Some(robots_bucket.unwrap())
+            }
         } else {
-            Some(robots_bucket.unwrap())
+            robots_bucket.ok()
         }
-    } else {
-        robots_bucket.ok()
     };
 
-    let hosts_bucket = nats.get_key_value("hosts").await;
-    let hosts_bucket = if hosts_bucket.is_err() {
-        let hosts_bucket = nats
-            .create_key_value(kv::Config {
-                bucket: "hosts".to_string(),
-                description: "prevent the same host from being scraped too quickly".to_string(),
-                max_age: Duration::from_secs(60 * 10),
-                ..Default::default()
-            })
-            .await;
-        if let Err(e) = hosts_bucket {
-            error!("could not create hosts bucket: {}", e);
-            return Err(());
+    #[cfg(not(feature = "benchmark"))]
+    let hosts_bucket = {
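+        // per-host fetch counters; entries expire after ten minutes, and hosts seen
+        // too often are skipped for a while (benchmark builds bypass this entirely)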
+        let hosts_bucket = nats.get_key_value("hosts").await;
+        if hosts_bucket.is_err() {
+            let hosts_bucket = nats
+                .create_key_value(kv::Config {
+                    bucket: "hosts".to_string(),
+                    description: "prevent the same host from being scraped too quickly".to_string(),
+                    max_age: Duration::from_secs(60 * 10),
+                    ..Default::default()
+                })
+                .await;
+            if let Err(e) = hosts_bucket {
+                error!("could not create hosts bucket: {}", e);
+                return Err(());
+            } else {
+                hosts_bucket.unwrap()
+            }
         } else {
             hosts_bucket.unwrap()
         }
-    } else {
-        hosts_bucket.unwrap()
     };
 
     let robots_url = get_robots_url(url);
@@ -91,18 +101,29 @@
     hash.write(robots_url.as_bytes());
     let hash = hash.finish();
 
+    #[cfg(not(feature = "benchmark"))]
     if let Ok(Some(host)) = hosts_bucket.get(hash.to_string()).await {
         let count = *host.first().unwrap_or(&0);
         if count > 10 {
-            warn!("scraping {} too quickly, avoiding for one minute", robots_url);
+            warn!(
+                "scraping {} too quickly, avoiding for one minute",
+                robots_url
+            );
             return Err(());
         }
-        hosts_bucket.put(hash.to_string(), vec![count + 1].into()).await.expect("COULDN'T INSERT INTO HOSTS BUCKET!");
+        hosts_bucket
+            .put(hash.to_string(), vec![count + 1].into())
+            .await
+            .expect("COULDN'T INSERT INTO HOSTS BUCKET!");
     } else {
-        hosts_bucket.put(hash.to_string(), vec![1].into()).await.expect("COULDN'T INSERT INTO HOSTS BUCKET!");
+        hosts_bucket
+            .put(hash.to_string(), vec![1].into())
+            .await
+            .expect("COULDN'T INSERT INTO HOSTS BUCKET!");
     }
 
     let mut skip_robots_check = false;
+    #[cfg(not(feature = "benchmark"))]
     if let Some(robots_bucket) = &robots_bucket {
         if let Ok(Some(entry)) = robots_bucket.get(hash.to_string()).await {
             if let Ok(res) = allowed_to_crawl(entry.as_bytes(), url) {
@@ -160,6 +181,7 @@
         }
 
         if let Ok(res) = allowed_to_crawl(body.as_bytes(), url) {
+            #[cfg(not(feature = "benchmark"))]
             if let Some(robots_bucket) = &robots_bucket {
                 if let Err(e) = robots_bucket
                     .put(hash.to_string(), body.as_bytes().to_vec().into())
@@ -213,7 +235,12 @@
     }
     if response.status().is_server_error() || response.status().is_client_error() {
         // don't crawl at the moment
-        debug!("not crawling {} due to bad status code {}", url, response.status());
+        debug!(
+            "not crawling {} due to bad status code {}",
+            url,
+            response.status()
+        );
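+        // per the doc comment on web_parse, Err marks this as temporary, so the
+        // caller may retry the page later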
- warn!("skipping {} due to {} language (currently prioritizing english", url, lang); + warn!( + "skipping {} due to {} language (currently prioritizing english", + url, lang + ); return Err(()); } } @@ -334,11 +363,17 @@ pub async fn web_parse( .collect(); debug!("headers..."); - gather_elements_with_multiplier(driver, &mut wordmap, &stops, &["h1","h2","h3","h4","h5","h6"], 3.0) - .await; + gather_elements_with_multiplier( + driver, + &mut wordmap, + &stops, + &["h1", "h2", "h3", "h4", "h5", "h6"], + 3.0, + ) + .await; debug!("paragraphs..."); - gather_elements_with_multiplier(driver, &mut wordmap, &stops, &["p","div"], 1.0).await; + gather_elements_with_multiplier(driver, &mut wordmap, &stops, &["p", "div"], 1.0).await; let mut wordmap = wordmap.into_iter().collect::>(); wordmap.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap()); @@ -351,6 +386,7 @@ pub async fn web_parse( .collect::>(); #[allow(clippy::collapsible_if)] if !words.is_empty() { + #[cfg(not(feature = "benchmark"))] if wordstore::add_url_to_keywords(&db, &words, url) .await .is_err() @@ -379,6 +415,7 @@ pub async fn web_parse( } #[allow(clippy::collapsible_if)] if !metawords.is_empty() { + #[cfg(not(feature = "benchmark"))] if metastore::add_url_to_metawords(&db, &metawords, url) .await .is_err() @@ -400,6 +437,7 @@ pub async fn web_parse( } #[allow(clippy::collapsible_if)] if !titlewords.is_empty() { + #[cfg(not(feature = "benchmark"))] if titlestore::add_url_to_titlewords(&db, &titlewords, url) .await .is_err() @@ -409,6 +447,7 @@ pub async fn web_parse( } } + #[cfg(not(feature = "benchmark"))] if sitestore::add_website( &db, url, @@ -421,7 +460,7 @@ pub async fn web_parse( }, &wordmap, raw_page_content, - damping + damping, ) .await .is_err() @@ -429,7 +468,7 @@ pub async fn web_parse( warn!("couldn't add {} to sitestore!", url); db_error_so_requeue_anyways = true; } - + debug!("finished with main site stuff for {}", url); let linkelms = driver.find_all(By::Tag("a")).await.map_err(|_| ())?; @@ -466,27 +505,40 @@ pub async fn web_parse( let word = word.trim_end_matches(|v: char| v.is_ascii_punctuation()); linkwords.push(word); } - + #[allow(clippy::collapsible_if)] if !linkwords.is_empty() { - if linkstore::add_url_to_linkwords(&db, &linkwords, &href).await.is_err() { + #[cfg(not(feature = "benchmark"))] + if linkstore::add_url_to_linkwords(&db, &linkwords, &href) + .await + .is_err() + { warn!("couldn't add {} to linkwords!", url); } } - + + #[cfg(not(feature = "benchmark"))] if linkrelstore::a_linksto_b(&db, url, &href).await.is_err() { warn!("couldn't perform a_linksto_b (a {url} b {href})"); } - nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&CrawlRequest { - url: href, - damping: 0.85, - }).unwrap().into()).await.unwrap(); + #[cfg(not(feature = "benchmark"))] + nats.publish( + VOREBOT_SERVICE.to_string(), + rmp_serde::to_vec(&CrawlRequest { + url: href, + damping: 0.85, + }) + .unwrap() + .into(), + ) + .await + .unwrap(); } - + let elapsed = start.elapsed().as_secs_f64(); - + debug!("crawled {} in {} seconds", url, elapsed); - + Ok(()) }