initial benchmark mode impl

could do with some more in-depth info, but will suffice for now
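In short, since the diff is dense: the new `benchmark` cargo feature stubs out `DBConn` and `Context` as empty structs, gates every NATS/lyphedb access behind `#[cfg(not(feature = "benchmark"))]`, and swaps in an alternate `main` that crawls a single test site passed as the first CLI argument (presumably invoked as `cargo run --features benchmark -- <url>`).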
husky 2025-03-31 15:04:37 -07:00
parent b8a338d9ba
commit 822ea728a5
3 changed files with 151 additions and 88 deletions


@@ -25,4 +25,8 @@ mutex-timeouts = { version = "0.3.0", features = ["tokio"] }
 prometheus_exporter = "0.8.5"
 thirtyfour = "0.35.0"
 stopwords = "0.1.1"
 texting_robots = "0.2.2"
+
+[features]
+default = []
+benchmark = []


@@ -28,6 +28,7 @@ use stopwords::{Language, Spark, Stopwords};
 use thirtyfour::{CapabilitiesHelper, DesiredCapabilities, Proxy, WebDriver};
 use thirtyfour::common::capabilities::firefox::FirefoxPreferences;
 use tokio::task::JoinHandle;
+#[cfg(not(feature = "benchmark"))]
 use asklyphe_common::ldb::DBConn;
 use asklyphe_common::nats::vorebot::CrawlRequest;
 use crate::webparse::web_parse;
@@ -73,6 +74,8 @@ pub static USER_AGENT: Lazy<String> = Lazy::new(|| {
     )
 });
 
+#[cfg(not(feature = "benchmark"))]
 #[tokio::main]
 async fn main() {
     mutex_timeouts::tokio::GLOBAL_TOKIO_TIMEOUT.store(72, Ordering::Relaxed);
@@ -367,34 +370,38 @@ async fn main() {
         }
     }
 }
 
-//#[tokio::main]
-//async fn main() {
-//    mutex_timeouts::tokio::GLOBAL_TOKIO_TIMEOUT.store(72, Ordering::Relaxed);
-//    env_logger::init();
-//    info!("began at {}", chrono::Utc::now().to_string());
-//    println!("BENCHMARK MODE ENABLED");
-//    let nats = async_nats::connect(NATS_URL.as_str()).await;
-//    if let Err(e) = nats {
-//        error!("FATAL ERROR, COULDN'T CONNECT TO NATS: {}", e);
-//        return;
-//    }
-//    let nats = nats.unwrap();
-//    let dbconn = DBConn::new(nats.clone(), "lyphedb-test");
-//    let nats = jetstream::new(nats);
-//
-//    let mut prefs = FirefoxPreferences::new();
-//    prefs.set_user_agent(USER_AGENT.to_string()).unwrap();
-//    let mut caps = DesiredCapabilities::firefox();
-//    caps.set_preferences(prefs).unwrap();
-//    let driver = WebDriver::new(&BROWSER_THREADS[0], caps).await.unwrap();
-//
-//    driver.in_new_tab(|| async {
-//        web_parse(nats.clone(), dbconn.clone(), &driver, "https://asklyphe.com/", 0.85).await.expect("Failed to run web parse");
-//
-//        Ok(())
-//    }).await.unwrap();
-//}
+#[cfg(feature = "benchmark")]
+pub struct DBConn {}
+#[cfg(feature = "benchmark")]
+pub struct Context {}
+
+#[cfg(feature = "benchmark")]
+#[tokio::main]
+async fn main() {
+    mutex_timeouts::tokio::GLOBAL_TOKIO_TIMEOUT.store(72, Ordering::Relaxed);
+    eprintln!("BENCHMARK MODE ENABLED");
+
+    let args: Vec<String> = std::env::args().collect();
+    let test_site = args.get(1);
+    if test_site.is_none() {
+        panic!("PLEASE PROVIDE TEST SITE!")
+    }
+    let test_site = test_site.unwrap();
+
+    env_logger::init();
+    info!("began at {}", chrono::Utc::now().to_string());
+
+    let mut prefs = FirefoxPreferences::new();
+    prefs.set_user_agent(USER_AGENT.to_string()).unwrap();
+    let mut caps = DesiredCapabilities::firefox();
+    caps.set_preferences(prefs).unwrap();
+    let driver = WebDriver::new(&BROWSER_THREADS[0], caps).await.unwrap();
+
+    driver.in_new_tab(|| async {
+        web_parse(Context {}, DBConn {}, &driver, test_site, 0.85).await.expect("Failed to run web parse");
+
+        Ok(())
+    }).await.unwrap();
+}
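A note on why the empty `DBConn {}` / `Context {}` stubs above compile: every call site that touches the real types is feature-gated too, so each build sees exactly one definition of each name. A minimal self-contained sketch of the same cfg-based type swap (names here are illustrative, not this repo's code):

// sketch.rs: cfg-based type swapping, standalone

#[cfg(not(feature = "benchmark"))]
mod db {
    // stand-in for a real backend handle (asklyphe_common::ldb::DBConn in the diff)
    pub struct DBConn {
        pub addr: String,
    }
    impl DBConn {
        pub fn put(&self, key: &str, val: &str) {
            println!("real write to {}: {} = {}", self.addr, key, val);
        }
    }
}

#[cfg(not(feature = "benchmark"))]
use db::DBConn;

// with `--features benchmark`, the handle is a zero-sized stub and every
// call site that would write through it is compiled out, so timings only
// cover fetch + parse work
#[cfg(feature = "benchmark")]
pub struct DBConn {}

fn main() {
    #[cfg(not(feature = "benchmark"))]
    {
        let db = DBConn { addr: "lyphedb-test".into() };
        db.put("some_key", "some_value");
    }
    #[cfg(feature = "benchmark")]
    let _db = DBConn {};

    println!("benchmark feature active: {}", cfg!(feature = "benchmark"));
}

The upside of this approach over a runtime flag is that benchmark builds cannot accidentally touch production storage; the downside, visible throughout the diff below, is a cfg attribute at every call site.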


@@ -1,7 +1,13 @@
 use crate::USER_AGENT;
-use asklyphe_common::ldb::{linkrelstore, linkstore, metastore, sitestore, titlestore, wordstore, DBConn};
-use async_nats::jetstream;
-use async_nats::jetstream::kv;
+#[cfg(feature = "benchmark")]
+use crate::{Context, DBConn};
+#[cfg(not(feature = "benchmark"))]
+use asklyphe_common::ldb::DBConn;
+use asklyphe_common::ldb::{linkrelstore, linkstore, metastore, sitestore, titlestore, wordstore};
+use asklyphe_common::nats::vorebot::CrawlRequest;
+use asklyphe_common::nats::vorebot::VOREBOT_SERVICE;
+#[cfg(not(feature = "benchmark"))]
+use async_nats::jetstream::{kv, Context};
 use futures::AsyncReadExt;
 use image::EncodableLayout;
 use isahc::config::RedirectPolicy;
@@ -16,8 +22,6 @@ use std::time::Duration;
 use stopwords::{Language, Spark, Stopwords};
 use texting_robots::{get_robots_url, Robot};
 use thirtyfour::{By, WebDriver};
-use asklyphe_common::nats::vorebot::CrawlRequest;
-use asklyphe_common::nats::vorebot::VOREBOT_SERVICE;
 
 pub fn allowed_to_crawl(robotstxt: &[u8], url: &str) -> Result<bool, ()> {
     let robot1 = Robot::new("Vorebot", robotstxt);
@@ -35,50 +39,56 @@ pub fn allowed_to_crawl(robotstxt: &[u8], url: &str) -> Result<bool, ()> {
 // returns Err if we cannot access a page, but the error associated with it seems temporary (i.e. it's worth trying again later)
 // otherwise, returns Ok
 pub async fn web_parse(
-    nats: jetstream::Context,
+    nats: Context,
     db: DBConn,
     driver: &WebDriver,
     url: &str,
     damping: f64,
 ) -> Result<(), ()> {
     driver.delete_all_cookies().await.map_err(|_| ())?;
 
-    let robots_bucket = nats.get_key_value("robots").await;
-    let robots_bucket = if robots_bucket.is_err() {
-        let robots_bucket = nats
-            .create_key_value(kv::Config {
-                bucket: "robots".to_string(),
-                description: "storage of robots.txt data for given hosts".to_string(),
-                ..Default::default()
-            })
-            .await;
-        if let Err(e) = robots_bucket {
-            error!("could not create robots.txt bucket: {}", e);
-            None
-        } else {
-            Some(robots_bucket.unwrap())
-        }
-    } else {
-        robots_bucket.ok()
-    };
+    #[cfg(not(feature = "benchmark"))]
+    let robots_bucket = {
+        let robots_bucket = nats.get_key_value("robots").await;
+        if robots_bucket.is_err() {
+            let robots_bucket = nats
+                .create_key_value(kv::Config {
+                    bucket: "robots".to_string(),
+                    description: "storage of robots.txt data for given hosts".to_string(),
+                    ..Default::default()
+                })
+                .await;
+            if let Err(e) = robots_bucket {
+                error!("could not create robots.txt bucket: {}", e);
+                None
+            } else {
+                Some(robots_bucket.unwrap())
+            }
+        } else {
+            robots_bucket.ok()
+        }
+    };
 
-    let hosts_bucket = nats.get_key_value("hosts").await;
-    let hosts_bucket = if hosts_bucket.is_err() {
-        let hosts_bucket = nats
-            .create_key_value(kv::Config {
-                bucket: "hosts".to_string(),
-                description: "prevent the same host from being scraped too quickly".to_string(),
-                max_age: Duration::from_secs(60 * 10),
-                ..Default::default()
-            })
-            .await;
-        if let Err(e) = hosts_bucket {
-            error!("could not create hosts bucket: {}", e);
-            return Err(());
-        } else {
-            hosts_bucket.unwrap()
-        }
-    } else {
-        hosts_bucket.unwrap()
-    };
+    #[cfg(not(feature = "benchmark"))]
+    let hosts_bucket = {
+        let hosts_bucket = nats.get_key_value("hosts").await;
+        if hosts_bucket.is_err() {
+            let hosts_bucket = nats
+                .create_key_value(kv::Config {
+                    bucket: "hosts".to_string(),
+                    description: "prevent the same host from being scraped too quickly".to_string(),
+                    max_age: Duration::from_secs(60 * 10),
+                    ..Default::default()
+                })
+                .await;
+            if let Err(e) = hosts_bucket {
+                error!("could not create hosts bucket: {}", e);
+                return Err(());
+            } else {
+                hosts_bucket.unwrap()
+            }
+        } else {
+            hosts_bucket.unwrap()
+        }
+    };
 
     let robots_url = get_robots_url(url);
@@ -91,18 +101,29 @@ pub async fn web_parse(
     hash.write(robots_url.as_bytes());
     let hash = hash.finish();
 
+    #[cfg(not(feature = "benchmark"))]
     if let Ok(Some(host)) = hosts_bucket.get(hash.to_string()).await {
         let count = *host.first().unwrap_or(&0);
         if count > 10 {
-            warn!("scraping {} too quickly, avoiding for one minute", robots_url);
+            warn!(
+                "scraping {} too quickly, avoiding for one minute",
+                robots_url
+            );
             return Err(());
         }
-        hosts_bucket.put(hash.to_string(), vec![count + 1].into()).await.expect("COULDN'T INSERT INTO HOSTS BUCKET!");
+        hosts_bucket
+            .put(hash.to_string(), vec![count + 1].into())
+            .await
+            .expect("COULDN'T INSERT INTO HOSTS BUCKET!");
     } else {
-        hosts_bucket.put(hash.to_string(), vec![1].into()).await.expect("COULDN'T INSERT INTO HOSTS BUCKET!");
+        hosts_bucket
+            .put(hash.to_string(), vec![1].into())
+            .await
+            .expect("COULDN'T INSERT INTO HOSTS BUCKET!");
     }
 
     let mut skip_robots_check = false;
+    #[cfg(not(feature = "benchmark"))]
     if let Some(robots_bucket) = &robots_bucket {
         if let Ok(Some(entry)) = robots_bucket.get(hash.to_string()).await {
             if let Ok(res) = allowed_to_crawl(entry.as_bytes(), url) {
@@ -160,6 +181,7 @@ pub async fn web_parse(
     }
 
     if let Ok(res) = allowed_to_crawl(body.as_bytes(), url) {
+        #[cfg(not(feature = "benchmark"))]
         if let Some(robots_bucket) = &robots_bucket {
             if let Err(e) = robots_bucket
                 .put(hash.to_string(), body.as_bytes().to_vec().into())
@@ -213,7 +235,11 @@ pub async fn web_parse(
     }
     if response.status().is_server_error() || response.status().is_client_error() {
         // don't crawl at the moment
-        debug!("not crawling {} due to bad status code {}", url, response.status());
+        debug!(
+            "not crawling {} due to bad status code {}",
+            url,
+            response.status()
+        );
         return Err(());
     }
@@ -226,7 +252,10 @@ pub async fn web_parse(
         if !lang.starts_with("en") && !lang.starts_with("unknown") {
             // i.e. non-english language
             // fixme: remove this once we start expanding to non-english-speaking markets?
-            warn!("skipping {} due to {} language (currently prioritizing english", url, lang);
+            warn!(
+                "skipping {} due to {} language (currently prioritizing english",
+                url, lang
+            );
             return Err(());
         }
     }
@@ -334,11 +363,17 @@ pub async fn web_parse(
         .collect();
 
     debug!("headers...");
-    gather_elements_with_multiplier(driver, &mut wordmap, &stops, &["h1","h2","h3","h4","h5","h6"], 3.0)
-        .await;
+    gather_elements_with_multiplier(
+        driver,
+        &mut wordmap,
+        &stops,
+        &["h1", "h2", "h3", "h4", "h5", "h6"],
+        3.0,
+    )
+    .await;
     debug!("paragraphs...");
-    gather_elements_with_multiplier(driver, &mut wordmap, &stops, &["p","div"], 1.0).await;
+    gather_elements_with_multiplier(driver, &mut wordmap, &stops, &["p", "div"], 1.0).await;
 
     let mut wordmap = wordmap.into_iter().collect::<Vec<_>>();
     wordmap.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
@@ -351,6 +386,7 @@ pub async fn web_parse(
         .collect::<Vec<_>>();
     #[allow(clippy::collapsible_if)]
     if !words.is_empty() {
+        #[cfg(not(feature = "benchmark"))]
         if wordstore::add_url_to_keywords(&db, &words, url)
             .await
             .is_err()
@@ -379,6 +415,7 @@ pub async fn web_parse(
     }
     #[allow(clippy::collapsible_if)]
     if !metawords.is_empty() {
+        #[cfg(not(feature = "benchmark"))]
         if metastore::add_url_to_metawords(&db, &metawords, url)
             .await
             .is_err()
@@ -400,6 +437,7 @@ pub async fn web_parse(
     }
     #[allow(clippy::collapsible_if)]
     if !titlewords.is_empty() {
+        #[cfg(not(feature = "benchmark"))]
         if titlestore::add_url_to_titlewords(&db, &titlewords, url)
             .await
             .is_err()
@@ -409,6 +447,7 @@ pub async fn web_parse(
         }
     }
 
+    #[cfg(not(feature = "benchmark"))]
     if sitestore::add_website(
         &db,
         url,
@@ -421,7 +460,7 @@ pub async fn web_parse(
         },
         &wordmap,
         raw_page_content,
-        damping
+        damping,
     )
     .await
     .is_err()
@@ -429,7 +468,7 @@ pub async fn web_parse(
         warn!("couldn't add {} to sitestore!", url);
        db_error_so_requeue_anyways = true;
     }
-    
+
     debug!("finished with main site stuff for {}", url);
 
     let linkelms = driver.find_all(By::Tag("a")).await.map_err(|_| ())?;
@@ -466,27 +505,40 @@ pub async fn web_parse(
             let word = word.trim_end_matches(|v: char| v.is_ascii_punctuation());
             linkwords.push(word);
         }
 
         #[allow(clippy::collapsible_if)]
         if !linkwords.is_empty() {
-            if linkstore::add_url_to_linkwords(&db, &linkwords, &href).await.is_err() {
+            #[cfg(not(feature = "benchmark"))]
+            if linkstore::add_url_to_linkwords(&db, &linkwords, &href)
+                .await
+                .is_err()
+            {
                 warn!("couldn't add {} to linkwords!", url);
             }
         }
 
+        #[cfg(not(feature = "benchmark"))]
         if linkrelstore::a_linksto_b(&db, url, &href).await.is_err() {
             warn!("couldn't perform a_linksto_b (a {url} b {href})");
         }
 
-        nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&CrawlRequest {
-            url: href,
-            damping: 0.85,
-        }).unwrap().into()).await.unwrap();
+        #[cfg(not(feature = "benchmark"))]
+        nats.publish(
+            VOREBOT_SERVICE.to_string(),
+            rmp_serde::to_vec(&CrawlRequest {
+                url: href,
+                damping: 0.85,
+            })
+            .unwrap()
+            .into(),
+        )
+        .await
+        .unwrap();
     }
 
     let elapsed = start.elapsed().as_secs_f64();
     debug!("crawled {} in {} seconds", url, elapsed);
 
     Ok(())
 }
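The robots.txt gate (`allowed_to_crawl`) stays active in both builds. For reference, a minimal standalone sketch of the texting_robots calls it builds on (`Robot::new` plus `allowed`; the robots.txt body below is made up, and Cargo.toml above already pins texting_robots = "0.2.2"):

// standalone check of the robots.txt gate logic
use texting_robots::Robot;

fn main() {
    // made-up robots.txt body for illustration
    let robotstxt = b"User-agent: *\nDisallow: /private/\n";
    // "Vorebot" is the agent name the crawler checks against
    let robot = Robot::new("Vorebot", robotstxt).unwrap();
    assert!(robot.allowed("https://example.com/"));
    assert!(!robot.allowed("https://example.com/private/page"));
    println!("robots gate behaves as expected");
}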