t179: crawler benchmark mode #12

Open
husky wants to merge 1 commit from feature/nikocs/t179-crawler-benchmark-mode into develop
3 changed files with 151 additions and 88 deletions

View file

@@ -25,4 +25,8 @@ mutex-timeouts = { version = "0.3.0", features = ["tokio"] }
prometheus_exporter = "0.8.5"
thirtyfour = "0.35.0"
stopwords = "0.1.1"
texting_robots = "0.2.2"
[features]
default = []
benchmark = []
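The new `benchmark` Cargo feature is the switch that gates everything else in this diff; presumably the benchmark binary is produced with something like `cargo run --features benchmark -- <test site>`, matching the argument handling in the new main below. A minimal sketch of the compile-time selection such a feature flag enables (illustrative names, not the crawler's own):

// With `--features benchmark`, the benchmark item is compiled in and the
// production item is compiled out; each build contains exactly one `entry`.
#[cfg(not(feature = "benchmark"))]
fn entry() {
    println!("production path: consume the NATS queue and write to the DB");
}

#[cfg(feature = "benchmark")]
fn entry() {
    println!("benchmark path: crawl a single site given on the command line");
}

fn main() {
    entry();
}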

View file

@@ -28,6 +28,7 @@ use stopwords::{Language, Spark, Stopwords};
use thirtyfour::{CapabilitiesHelper, DesiredCapabilities, Proxy, WebDriver};
use thirtyfour::common::capabilities::firefox::FirefoxPreferences;
use tokio::task::JoinHandle;
#[cfg(not(feature = "benchmark"))]
use asklyphe_common::ldb::DBConn;
use asklyphe_common::nats::vorebot::CrawlRequest;
use crate::webparse::web_parse;
@@ -73,6 +74,8 @@ pub static USER_AGENT: Lazy<String> = Lazy::new(|| {
)
});
#[cfg(not(feature = "benchmark"))]
#[tokio::main]
async fn main() {
mutex_timeouts::tokio::GLOBAL_TOKIO_TIMEOUT.store(72, Ordering::Relaxed);
@@ -367,34 +370,38 @@ async fn main() {
}
}
}
#[cfg(feature = "benchmark")]
pub struct DBConn {}
#[cfg(feature = "benchmark")]
pub struct Context {}
//#[tokio::main]
//async fn main() {
// mutex_timeouts::tokio::GLOBAL_TOKIO_TIMEOUT.store(72, Ordering::Relaxed);
//
// env_logger::init();
// info!("began at {}", chrono::Utc::now().to_string());
//
// let nats = async_nats::connect(NATS_URL.as_str()).await;
// if let Err(e) = nats {
// error!("FATAL ERROR, COULDN'T CONNECT TO NATS: {}", e);
// return;
// }
// let nats = nats.unwrap();
//
// let dbconn = DBConn::new(nats.clone(), "lyphedb-test");
//
// let nats = jetstream::new(nats);
//
// let mut prefs = FirefoxPreferences::new();
// prefs.set_user_agent(USER_AGENT.to_string()).unwrap();
// let mut caps = DesiredCapabilities::firefox();
// caps.set_preferences(prefs).unwrap();
// let driver = WebDriver::new(&BROWSER_THREADS[0], caps).await.unwrap();
//
// driver.in_new_tab(|| async {
// web_parse(nats.clone(), dbconn.clone(), &driver, "https://asklyphe.com/", 0.85).await.expect("Failed to run web parse");
//
// Ok(())
// }).await.unwrap();
//}
#[cfg(feature = "benchmark")]
#[tokio::main]
async fn main() {
mutex_timeouts::tokio::GLOBAL_TOKIO_TIMEOUT.store(72, Ordering::Relaxed);
eprintln!("BENCHMARK MODE ENABLED");
println!("BENCHMARK MODE ENABLED");
let args: Vec<String> = std::env::args().collect();
let test_site = args.get(1);
if test_site.is_none() {
panic!("PLEASE PROVIDE TEST SITE!")
}
let test_site = test_site.unwrap();
env_logger::init();
info!("began at {}", chrono::Utc::now().to_string());
let mut prefs = FirefoxPreferences::new();
prefs.set_user_agent(USER_AGENT.to_string()).unwrap();
let mut caps = DesiredCapabilities::firefox();
caps.set_preferences(prefs).unwrap();
let driver = WebDriver::new(&BROWSER_THREADS[0], caps).await.unwrap();
driver.in_new_tab(|| async {
web_parse(Context {}, DBConn {}, &driver, test_site, 0.85).await.expect("Failed to run web parse");
Ok(())
}).await.unwrap();
}
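The empty `DBConn {}` and `Context {}` structs above are what let this benchmark main call `web_parse` with its usual signature: the stubs stand in for the real NATS and database handles, while every call site that would actually use them is compiled out in webparse below. A distilled sketch of that pattern, with illustrative names rather than the crawler's own types:

// In benchmark builds the real handle is swapped for an empty stub, so
// callers and signatures stay identical while persistence is compiled out.
#[cfg(not(feature = "benchmark"))]
struct DbHandle { /* real connection state would live here */ }

#[cfg(feature = "benchmark")]
struct DbHandle {}

fn process(db: &DbHandle, input: &str) {
    let parsed = input.trim(); // parsing work runs in both build modes

    #[cfg(not(feature = "benchmark"))]
    {
        let _ = (db, parsed); // only normal builds would write to the database
    }
    #[cfg(feature = "benchmark")]
    {
        let _ = (db, parsed); // benchmark builds just exercise the parse
    }
}

fn main() {
    process(&DbHandle {}, "  example input  ");
}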

View file

@@ -1,7 +1,13 @@
use crate::USER_AGENT;
use asklyphe_common::ldb::{linkrelstore, linkstore, metastore, sitestore, titlestore, wordstore, DBConn};
use async_nats::jetstream;
use async_nats::jetstream::kv;
#[cfg(feature = "benchmark")]
use crate::{Context, DBConn};
#[cfg(not(feature = "benchmark"))]
use asklyphe_common::ldb::DBConn;
use asklyphe_common::ldb::{linkrelstore, linkstore, metastore, sitestore, titlestore, wordstore};
use asklyphe_common::nats::vorebot::CrawlRequest;
use asklyphe_common::nats::vorebot::VOREBOT_SERVICE;
#[cfg(not(feature = "benchmark"))]
use async_nats::jetstream::{kv, Context};
use futures::AsyncReadExt;
use image::EncodableLayout;
use isahc::config::RedirectPolicy;
@@ -16,8 +22,6 @@ use std::time::Duration;
use stopwords::{Language, Spark, Stopwords};
use texting_robots::{get_robots_url, Robot};
use thirtyfour::{By, WebDriver};
use asklyphe_common::nats::vorebot::CrawlRequest;
use asklyphe_common::nats::vorebot::VOREBOT_SERVICE;
pub fn allowed_to_crawl(robotstxt: &[u8], url: &str) -> Result<bool, ()> {
let robot1 = Robot::new("Vorebot", robotstxt);
@@ -35,50 +39,56 @@ pub fn allowed_to_crawl(robotstxt: &[u8], url: &str) -> Result<bool, ()> {
// returns Err if we cannot access a page, but the error associated with it seems temporary (i.e. it's worth trying again later)
// otherwise, returns Ok
pub async fn web_parse(
nats: jetstream::Context,
nats: Context,
db: DBConn,
driver: &WebDriver,
url: &str,
damping: f64,
) -> Result<(), ()> {
driver.delete_all_cookies().await.map_err(|_| ())?;
let robots_bucket = nats.get_key_value("robots").await;
let robots_bucket = if robots_bucket.is_err() {
let robots_bucket = nats
.create_key_value(kv::Config {
bucket: "robots".to_string(),
description: "storage of robots.txt data for given hosts".to_string(),
..Default::default()
})
.await;
if let Err(e) = robots_bucket {
error!("could not create robots.txt bucket: {}", e);
None
#[cfg(not(feature = "benchmark"))]
let robots_bucket = {
let robots_bucket = nats.get_key_value("robots").await;
if robots_bucket.is_err() {
let robots_bucket = nats
.create_key_value(kv::Config {
bucket: "robots".to_string(),
description: "storage of robots.txt data for given hosts".to_string(),
..Default::default()
})
.await;
if let Err(e) = robots_bucket {
error!("could not create robots.txt bucket: {}", e);
None
} else {
Some(robots_bucket.unwrap())
}
} else {
Some(robots_bucket.unwrap())
robots_bucket.ok()
}
} else {
robots_bucket.ok()
};
let hosts_bucket = nats.get_key_value("hosts").await;
let hosts_bucket = if hosts_bucket.is_err() {
let hosts_bucket = nats
.create_key_value(kv::Config {
bucket: "hosts".to_string(),
description: "prevent the same host from being scraped too quickly".to_string(),
max_age: Duration::from_secs(60 * 10),
..Default::default()
})
.await;
if let Err(e) = hosts_bucket {
error!("could not create hosts bucket: {}", e);
return Err(());
#[cfg(not(feature = "benchmark"))]
let hosts_bucket = {
let hosts_bucket = nats.get_key_value("hosts").await;
if hosts_bucket.is_err() {
let hosts_bucket = nats
.create_key_value(kv::Config {
bucket: "hosts".to_string(),
description: "prevent the same host from being scraped too quickly".to_string(),
max_age: Duration::from_secs(60 * 10),
..Default::default()
})
.await;
if let Err(e) = hosts_bucket {
error!("could not create hosts bucket: {}", e);
return Err(());
} else {
hosts_bucket.unwrap()
}
} else {
hosts_bucket.unwrap()
}
} else {
hosts_bucket.unwrap()
};
let robots_url = get_robots_url(url);
@@ -91,18 +101,29 @@ pub async fn web_parse(
hash.write(robots_url.as_bytes());
let hash = hash.finish();
#[cfg(not(feature = "benchmark"))]
if let Ok(Some(host)) = hosts_bucket.get(hash.to_string()).await {
let count = *host.first().unwrap_or(&0);
if count > 10 {
warn!("scraping {} too quickly, avoiding for one minute", robots_url);
warn!(
"scraping {} too quickly, avoiding for one minute",
robots_url
);
return Err(());
}
hosts_bucket.put(hash.to_string(), vec![count + 1].into()).await.expect("COULDN'T INSERT INTO HOSTS BUCKET!");
hosts_bucket
.put(hash.to_string(), vec![count + 1].into())
.await
.expect("COULDN'T INSERT INTO HOSTS BUCKET!");
} else {
hosts_bucket.put(hash.to_string(), vec![1].into()).await.expect("COULDN'T INSERT INTO HOSTS BUCKET!");
hosts_bucket
.put(hash.to_string(), vec![1].into())
.await
.expect("COULDN'T INSERT INTO HOSTS BUCKET!");
}
let mut skip_robots_check = false;
#[cfg(not(feature = "benchmark"))]
if let Some(robots_bucket) = &robots_bucket {
if let Ok(Some(entry)) = robots_bucket.get(hash.to_string()).await {
if let Ok(res) = allowed_to_crawl(entry.as_bytes(), url) {
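For context on what the benchmark build skips here: in normal builds the `hosts` bucket (created above with a ten-minute `max_age`) counts crawls per hashed host, and the crawler backs off once the count exceeds 10. A rough in-memory sketch of the same throttling idea, with illustrative names and no NATS involved:

use std::collections::HashMap;
use std::time::{Duration, Instant};

const WINDOW: Duration = Duration::from_secs(60 * 10); // mirrors the bucket's max_age
const LIMIT: u8 = 10;

// Per-host crawl counter with expiry, standing in for the "hosts" KV bucket.
struct HostThrottle {
    counts: HashMap<u64, (u8, Instant)>,
}

impl HostThrottle {
    fn allow(&mut self, host_hash: u64) -> bool {
        let now = Instant::now();
        let entry = self.counts.entry(host_hash).or_insert((0, now));
        if now.duration_since(entry.1) > WINDOW {
            *entry = (0, now); // window expired, start a fresh count
        }
        if entry.0 > LIMIT {
            return false; // host scraped too quickly, avoid it for now
        }
        entry.0 += 1;
        true
    }
}

fn main() {
    let mut throttle = HostThrottle { counts: HashMap::new() };
    for i in 0..12 {
        println!("request {i}: allowed = {}", throttle.allow(42));
    }
}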
@@ -160,6 +181,7 @@ pub async fn web_parse(
}
if let Ok(res) = allowed_to_crawl(body.as_bytes(), url) {
#[cfg(not(feature = "benchmark"))]
if let Some(robots_bucket) = &robots_bucket {
if let Err(e) = robots_bucket
.put(hash.to_string(), body.as_bytes().to_vec().into())
@@ -213,7 +235,11 @@ pub async fn web_parse(
}
if response.status().is_server_error() || response.status().is_client_error() {
// don't crawl at the moment
debug!("not crawling {} due to bad status code {}", url, response.status());
debug!(
"not crawling {} due to bad status code {}",
url,
response.status()
);
return Err(());
}
@@ -226,7 +252,10 @@ pub async fn web_parse(
if !lang.starts_with("en") && !lang.starts_with("unknown") {
// i.e. non-english language
// fixme: remove this once we start expanding to non-english-speaking markets?
warn!("skipping {} due to {} language (currently prioritizing english", url, lang);
warn!(
"skipping {} due to {} language (currently prioritizing english",
url, lang
);
return Err(());
}
}
@@ -334,11 +363,17 @@ pub async fn web_parse(
.collect();
debug!("headers...");
gather_elements_with_multiplier(driver, &mut wordmap, &stops, &["h1","h2","h3","h4","h5","h6"], 3.0)
.await;
gather_elements_with_multiplier(
driver,
&mut wordmap,
&stops,
&["h1", "h2", "h3", "h4", "h5", "h6"],
3.0,
)
.await;
debug!("paragraphs...");
gather_elements_with_multiplier(driver, &mut wordmap, &stops, &["p","div"], 1.0).await;
gather_elements_with_multiplier(driver, &mut wordmap, &stops, &["p", "div"], 1.0).await;
let mut wordmap = wordmap.into_iter().collect::<Vec<_>>();
wordmap.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
@@ -351,6 +386,7 @@ pub async fn web_parse(
.collect::<Vec<_>>();
#[allow(clippy::collapsible_if)]
if !words.is_empty() {
#[cfg(not(feature = "benchmark"))]
if wordstore::add_url_to_keywords(&db, &words, url)
.await
.is_err()
@@ -379,6 +415,7 @@ pub async fn web_parse(
}
#[allow(clippy::collapsible_if)]
if !metawords.is_empty() {
#[cfg(not(feature = "benchmark"))]
if metastore::add_url_to_metawords(&db, &metawords, url)
.await
.is_err()
@@ -400,6 +437,7 @@ pub async fn web_parse(
}
#[allow(clippy::collapsible_if)]
if !titlewords.is_empty() {
#[cfg(not(feature = "benchmark"))]
if titlestore::add_url_to_titlewords(&db, &titlewords, url)
.await
.is_err()
@@ -409,6 +447,7 @@ pub async fn web_parse(
}
}
#[cfg(not(feature = "benchmark"))]
if sitestore::add_website(
&db,
url,
@@ -421,7 +460,7 @@ pub async fn web_parse(
},
&wordmap,
raw_page_content,
damping
damping,
)
.await
.is_err()
@@ -429,7 +468,7 @@ pub async fn web_parse(
warn!("couldn't add {} to sitestore!", url);
db_error_so_requeue_anyways = true;
}
debug!("finished with main site stuff for {}", url);
let linkelms = driver.find_all(By::Tag("a")).await.map_err(|_| ())?;
@@ -466,27 +505,40 @@ pub async fn web_parse(
let word = word.trim_end_matches(|v: char| v.is_ascii_punctuation());
linkwords.push(word);
}
#[allow(clippy::collapsible_if)]
if !linkwords.is_empty() {
if linkstore::add_url_to_linkwords(&db, &linkwords, &href).await.is_err() {
#[cfg(not(feature = "benchmark"))]
if linkstore::add_url_to_linkwords(&db, &linkwords, &href)
.await
.is_err()
{
warn!("couldn't add {} to linkwords!", url);
}
}
#[cfg(not(feature = "benchmark"))]
if linkrelstore::a_linksto_b(&db, url, &href).await.is_err() {
warn!("couldn't perform a_linksto_b (a {url} b {href})");
}
nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&CrawlRequest {
url: href,
damping: 0.85,
}).unwrap().into()).await.unwrap();
#[cfg(not(feature = "benchmark"))]
nats.publish(
VOREBOT_SERVICE.to_string(),
rmp_serde::to_vec(&CrawlRequest {
url: href,
damping: 0.85,
})
.unwrap()
.into(),
)
.await
.unwrap();
}
let elapsed = start.elapsed().as_secs_f64();
debug!("crawled {} in {} seconds", url, elapsed);
Ok(())
}
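With NATS publishing and all of the datastore writes compiled out, a benchmark run reduces to driving the browser and parsing, and the existing `crawled {} in {} seconds` debug line is effectively the measurement. If repeated runs were wanted, a small wrapper along these lines could average them (hypothetical harness, not part of this change):

use std::time::Instant;

// Hypothetical helper: time `runs` executions of a closure and report the mean.
fn bench<F: FnMut()>(runs: u32, mut f: F) -> f64 {
    let start = Instant::now();
    for _ in 0..runs {
        f();
    }
    start.elapsed().as_secs_f64() / runs as f64
}

fn main() {
    let avg = bench(5, || {
        // stand-in for one web_parse pass against the test site
        std::thread::sleep(std::time::Duration::from_millis(10));
    });
    println!("average seconds per run: {avg:.3}");
}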