t179: crawler benchmark mode #12
3 changed files with 151 additions and 88 deletions

@@ -25,4 +25,8 @@ mutex-timeouts = { version = "0.3.0", features = ["tokio"] }
prometheus_exporter = "0.8.5"
thirtyfour = "0.35.0"
stopwords = "0.1.1"
texting_robots = "0.2.2"

[features]
default = []
benchmark = []
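
The new `benchmark` feature is additive and disabled by default (`default = []`), so regular builds are unaffected; it has to be switched on explicitly at build time with Cargo's standard flag, e.g. `cargo build --features benchmark`.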

@@ -28,6 +28,7 @@ use stopwords::{Language, Spark, Stopwords};
use thirtyfour::{CapabilitiesHelper, DesiredCapabilities, Proxy, WebDriver};
use thirtyfour::common::capabilities::firefox::FirefoxPreferences;
use tokio::task::JoinHandle;
#[cfg(not(feature = "benchmark"))]
use asklyphe_common::ldb::DBConn;
use asklyphe_common::nats::vorebot::CrawlRequest;
use crate::webparse::web_parse;

@@ -73,6 +74,8 @@ pub static USER_AGENT: Lazy<String> = Lazy::new(|| {
    )
});

#[cfg(not(feature = "benchmark"))]
#[tokio::main]
async fn main() {
    mutex_timeouts::tokio::GLOBAL_TOKIO_TIMEOUT.store(72, Ordering::Relaxed);

@@ -367,34 +370,38 @@ async fn main() {
        }
    }
}
#[cfg(feature = "benchmark")]
pub struct DBConn {}
#[cfg(feature = "benchmark")]
pub struct Context {}

//#[tokio::main]
//async fn main() {
//    mutex_timeouts::tokio::GLOBAL_TOKIO_TIMEOUT.store(72, Ordering::Relaxed);
//
//    env_logger::init();
//    info!("began at {}", chrono::Utc::now().to_string());
//
//    let nats = async_nats::connect(NATS_URL.as_str()).await;
//    if let Err(e) = nats {
//        error!("FATAL ERROR, COULDN'T CONNECT TO NATS: {}", e);
//        return;
//    }
//    let nats = nats.unwrap();
//
//    let dbconn = DBConn::new(nats.clone(), "lyphedb-test");
//
//    let nats = jetstream::new(nats);
//
//    let mut prefs = FirefoxPreferences::new();
//    prefs.set_user_agent(USER_AGENT.to_string()).unwrap();
//    let mut caps = DesiredCapabilities::firefox();
//    caps.set_preferences(prefs).unwrap();
//    let driver = WebDriver::new(&BROWSER_THREADS[0], caps).await.unwrap();
//
//    driver.in_new_tab(|| async {
//        web_parse(nats.clone(), dbconn.clone(), &driver, "https://asklyphe.com/", 0.85).await.expect("Failed to run web parse");
//
//        Ok(())
//    }).await.unwrap();
//}

#[cfg(feature = "benchmark")]
#[tokio::main]
async fn main() {
    mutex_timeouts::tokio::GLOBAL_TOKIO_TIMEOUT.store(72, Ordering::Relaxed);

    eprintln!("BENCHMARK MODE ENABLED");
    println!("BENCHMARK MODE ENABLED");

    let args: Vec<String> = std::env::args().collect();
    let test_site = args.get(1);
    if test_site.is_none() {
        panic!("PLEASE PROVIDE TEST SITE!")
    }
    let test_site = test_site.unwrap();

    env_logger::init();
    info!("began at {}", chrono::Utc::now().to_string());

    let mut prefs = FirefoxPreferences::new();
    prefs.set_user_agent(USER_AGENT.to_string()).unwrap();
    let mut caps = DesiredCapabilities::firefox();
    caps.set_preferences(prefs).unwrap();
    let driver = WebDriver::new(&BROWSER_THREADS[0], caps).await.unwrap();

    driver.in_new_tab(|| async {
        web_parse(Context {}, DBConn {}, &driver, test_site, 0.85).await.expect("Failed to run web parse");

        Ok(())
    }).await.unwrap();
}
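
The empty `DBConn`/`Context` stubs above exist so that `web_parse(Context {}, DBConn {}, ...)` still type-checks once the real NATS and database handles are compiled out; every call that would actually touch them is behind `#[cfg(not(feature = "benchmark"))]`. A minimal self-contained sketch of that pattern (names here are hypothetical, not from this repo):

#[cfg(feature = "benchmark")]
pub struct Handle {} // empty stand-in: benchmark builds never perform I/O

#[cfg(not(feature = "benchmark"))]
pub struct Handle { /* real connection state lives here */ }

pub fn index_page(db: &Handle, url: &str) {
    // the persistence call carries the same gate, so the stub
    // is never actually used for anything
    #[cfg(not(feature = "benchmark"))]
    persist(db, url);
    let _ = (db, url);
}

#[cfg(not(feature = "benchmark"))]
fn persist(_db: &Handle, _url: &str) { /* ... */ }

With that in place, something along the lines of `cargo run --features benchmark -- https://example.com` should crawl exactly one site and skip every store; note the benchmark main still expects a WebDriver endpoint reachable at `BROWSER_THREADS[0]`.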

@@ -1,7 +1,13 @@
use crate::USER_AGENT;
use asklyphe_common::ldb::{linkrelstore, linkstore, metastore, sitestore, titlestore, wordstore, DBConn};
use async_nats::jetstream;
use async_nats::jetstream::kv;
#[cfg(feature = "benchmark")]
use crate::{Context, DBConn};
#[cfg(not(feature = "benchmark"))]
use asklyphe_common::ldb::DBConn;
use asklyphe_common::ldb::{linkrelstore, linkstore, metastore, sitestore, titlestore, wordstore};
use asklyphe_common::nats::vorebot::CrawlRequest;
use asklyphe_common::nats::vorebot::VOREBOT_SERVICE;
#[cfg(not(feature = "benchmark"))]
use async_nats::jetstream::{kv, Context};
use futures::AsyncReadExt;
use image::EncodableLayout;
use isahc::config::RedirectPolicy;

@@ -16,8 +22,6 @@ use std::time::Duration;
use stopwords::{Language, Spark, Stopwords};
use texting_robots::{get_robots_url, Robot};
use thirtyfour::{By, WebDriver};
use asklyphe_common::nats::vorebot::CrawlRequest;
use asklyphe_common::nats::vorebot::VOREBOT_SERVICE;

pub fn allowed_to_crawl(robotstxt: &[u8], url: &str) -> Result<bool, ()> {
    let robot1 = Robot::new("Vorebot", robotstxt);
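
The hunk cuts off before the body, but the signature alone supports a hedged usage sketch, assuming the function is in scope (the robots.txt bytes and URL below are invented for illustration; presumably `Err(())` signals a robots.txt that could not be evaluated):

let robotstxt = b"User-agent: *\nDisallow: /private/";
match allowed_to_crawl(robotstxt, "https://example.com/private/page") {
    Ok(true) => println!("crawl permitted"),
    Ok(false) => println!("blocked by robots.txt"),
    Err(()) => println!("could not evaluate robots.txt"),
}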

@@ -35,50 +39,56 @@ pub fn allowed_to_crawl(robotstxt: &[u8], url: &str) -> Result<bool, ()> {
// returns Err if we cannot access a page, but the error associated with it seems temporary (i.e. it's worth trying again later)
// otherwise, returns Ok
pub async fn web_parse(
    nats: jetstream::Context,
    nats: Context,
    db: DBConn,
    driver: &WebDriver,
    url: &str,
    damping: f64,
) -> Result<(), ()> {

    driver.delete_all_cookies().await.map_err(|_| ())?;
    let robots_bucket = nats.get_key_value("robots").await;
    let robots_bucket = if robots_bucket.is_err() {
        let robots_bucket = nats
            .create_key_value(kv::Config {
                bucket: "robots".to_string(),
                description: "storage of robots.txt data for given hosts".to_string(),
                ..Default::default()
            })
            .await;
        if let Err(e) = robots_bucket {
            error!("could not create robots.txt bucket: {}", e);
            None

    #[cfg(not(feature = "benchmark"))]
    let robots_bucket = {
        let robots_bucket = nats.get_key_value("robots").await;
        if robots_bucket.is_err() {
            let robots_bucket = nats
                .create_key_value(kv::Config {
                    bucket: "robots".to_string(),
                    description: "storage of robots.txt data for given hosts".to_string(),
                    ..Default::default()
                })
                .await;
            if let Err(e) = robots_bucket {
                error!("could not create robots.txt bucket: {}", e);
                None
        } else {
            Some(robots_bucket.unwrap())
        }
    } else {
        Some(robots_bucket.unwrap())
            } else {
                robots_bucket.ok()
            }
        } else {
            robots_bucket.ok()
        }
    };
    let hosts_bucket = nats.get_key_value("hosts").await;
    let hosts_bucket = if hosts_bucket.is_err() {
        let hosts_bucket = nats
            .create_key_value(kv::Config {
                bucket: "hosts".to_string(),
                description: "prevent the same host from being scraped too quickly".to_string(),
                max_age: Duration::from_secs(60 * 10),
                ..Default::default()
            })
            .await;
        if let Err(e) = hosts_bucket {
            error!("could not create hosts bucket: {}", e);
            return Err(());
    #[cfg(not(feature = "benchmark"))]
    let hosts_bucket = {
        let hosts_bucket = nats.get_key_value("hosts").await;
        if hosts_bucket.is_err() {
            let hosts_bucket = nats
                .create_key_value(kv::Config {
                    bucket: "hosts".to_string(),
                    description: "prevent the same host from being scraped too quickly".to_string(),
                    max_age: Duration::from_secs(60 * 10),
                    ..Default::default()
                })
                .await;
            if let Err(e) = hosts_bucket {
                error!("could not create hosts bucket: {}", e);
                return Err(());
            } else {
                hosts_bucket.unwrap()
            }
        } else {
            hosts_bucket.unwrap()
        }
    } else {
        hosts_bucket.unwrap()
    };

    let robots_url = get_robots_url(url);
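
Note the shape of this refactor: the whole `let robots_bucket = ...` / `let hosts_bucket = ...` bindings sit behind statement-level `#[cfg(not(feature = "benchmark"))]`, so in benchmark builds those names never exist at all, and every later use has to carry the same gate. A tiny freestanding illustration of why (contents hypothetical):

fn main() {
    #[cfg(not(feature = "benchmark"))]
    let mut seen: Vec<String> = Vec::new(); // binding vanishes entirely in benchmark builds

    for url in ["https://example.com/a", "https://example.com/b"] {
        println!("crawling {url}");
        #[cfg(not(feature = "benchmark"))]
        seen.push(url.to_string()); // each use needs the same gate, or the build breaks
    }

    #[cfg(not(feature = "benchmark"))]
    println!("stored {} urls", seen.len());
}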

@@ -91,18 +101,29 @@ pub async fn web_parse(
    hash.write(robots_url.as_bytes());
    let hash = hash.finish();

    #[cfg(not(feature = "benchmark"))]
    if let Ok(Some(host)) = hosts_bucket.get(hash.to_string()).await {
        let count = *host.first().unwrap_or(&0);
        if count > 10 {
            warn!("scraping {} too quickly, avoiding for one minute", robots_url);
            warn!(
                "scraping {} too quickly, avoiding for one minute",
                robots_url
            );
            return Err(());
        }
        hosts_bucket.put(hash.to_string(), vec![count + 1].into()).await.expect("COULDN'T INSERT INTO HOSTS BUCKET!");
        hosts_bucket
            .put(hash.to_string(), vec![count + 1].into())
            .await
            .expect("COULDN'T INSERT INTO HOSTS BUCKET!");
    } else {
        hosts_bucket.put(hash.to_string(), vec![1].into()).await.expect("COULDN'T INSERT INTO HOSTS BUCKET!");
        hosts_bucket
            .put(hash.to_string(), vec![1].into())
            .await
            .expect("COULDN'T INSERT INTO HOSTS BUCKET!");
    }

    let mut skip_robots_check = false;
    #[cfg(not(feature = "benchmark"))]
    if let Some(robots_bucket) = &robots_bucket {
        if let Ok(Some(entry)) = robots_bucket.get(hash.to_string()).await {
            if let Ok(res) = allowed_to_crawl(entry.as_bytes(), url) {
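
The throttle here is a counter keyed by the hash of the host's robots.txt URL: bump it on every visit, bail out once it passes 10, and let the bucket's `max_age` of ten minutes age entries out. The same logic as an in-memory sketch (standalone; expiry omitted):

use std::collections::HashMap;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn visits_too_fast(counts: &mut HashMap<u64, u8>, robots_url: &str) -> bool {
    let mut hasher = DefaultHasher::new();
    robots_url.hash(&mut hasher);
    let key = hasher.finish(); // mirrors the hash.write(...) / hash.finish() above
    let count = counts.entry(key).or_insert(0);
    *count += 1;
    *count > 10 // same cutoff as the diff; the real KV entry also expires via max_age
}

fn main() {
    let mut counts = HashMap::new();
    for _ in 0..11 {
        let _ = visits_too_fast(&mut counts, "https://example.com/robots.txt");
    }
    // the next visit is past the cap and would be skipped
    assert!(visits_too_fast(&mut counts, "https://example.com/robots.txt"));
}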

@@ -160,6 +181,7 @@ pub async fn web_parse(
    }

    if let Ok(res) = allowed_to_crawl(body.as_bytes(), url) {
        #[cfg(not(feature = "benchmark"))]
        if let Some(robots_bucket) = &robots_bucket {
            if let Err(e) = robots_bucket
                .put(hash.to_string(), body.as_bytes().to_vec().into())

@@ -213,7 +235,11 @@ pub async fn web_parse(
    }
    if response.status().is_server_error() || response.status().is_client_error() {
        // don't crawl at the moment
        debug!("not crawling {} due to bad status code {}", url, response.status());
        debug!(
            "not crawling {} due to bad status code {}",
            url,
            response.status()
        );
        return Err(());
    }


@@ -226,7 +252,10 @@ pub async fn web_parse(
        if !lang.starts_with("en") && !lang.starts_with("unknown") {
            // i.e. non-english language
            // fixme: remove this once we start expanding to non-english-speaking markets?
            warn!("skipping {} due to {} language (currently prioritizing english", url, lang);
            warn!(
                "skipping {} due to {} language (currently prioritizing english)",
                url, lang
            );
            return Err(());
        }
    }

@@ -334,11 +363,17 @@ pub async fn web_parse(
        .collect();

    debug!("headers...");
    gather_elements_with_multiplier(driver, &mut wordmap, &stops, &["h1","h2","h3","h4","h5","h6"], 3.0)
        .await;
    gather_elements_with_multiplier(
        driver,
        &mut wordmap,
        &stops,
        &["h1", "h2", "h3", "h4", "h5", "h6"],
        3.0,
    )
    .await;

    debug!("paragraphs...");
    gather_elements_with_multiplier(driver, &mut wordmap, &stops, &["p","div"], 1.0).await;
    gather_elements_with_multiplier(driver, &mut wordmap, &stops, &["p", "div"], 1.0).await;

    let mut wordmap = wordmap.into_iter().collect::<Vec<_>>();
    wordmap.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
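
The multipliers (3.0 for `h1`-`h6`, 1.0 for `p`/`div`) imply `wordmap` is a weighted word-frequency table that the sort above then ranks. A guess at the accumulation step, reduced to a runnable toy (function name hypothetical):

use std::collections::HashMap;

fn add_words(wordmap: &mut HashMap<String, f64>, text: &str, multiplier: f64) {
    for word in text.split_whitespace() {
        *wordmap.entry(word.to_lowercase()).or_insert(0.0) += multiplier;
    }
}

fn main() {
    let mut wordmap: HashMap<String, f64> = HashMap::new();
    add_words(&mut wordmap, "Crawler Benchmark Mode", 3.0); // heading weight
    add_words(&mut wordmap, "the benchmark crawls a single site", 1.0); // body weight
    assert_eq!(wordmap.get("benchmark"), Some(&4.0)); // 3.0 heading + 1.0 body
}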

@@ -351,6 +386,7 @@ pub async fn web_parse(
        .collect::<Vec<_>>();
    #[allow(clippy::collapsible_if)]
    if !words.is_empty() {
        #[cfg(not(feature = "benchmark"))]
        if wordstore::add_url_to_keywords(&db, &words, url)
            .await
            .is_err()

@@ -379,6 +415,7 @@ pub async fn web_parse(
    }
    #[allow(clippy::collapsible_if)]
    if !metawords.is_empty() {
        #[cfg(not(feature = "benchmark"))]
        if metastore::add_url_to_metawords(&db, &metawords, url)
            .await
            .is_err()

@@ -400,6 +437,7 @@ pub async fn web_parse(
    }
    #[allow(clippy::collapsible_if)]
    if !titlewords.is_empty() {
        #[cfg(not(feature = "benchmark"))]
        if titlestore::add_url_to_titlewords(&db, &titlewords, url)
            .await
            .is_err()

@@ -409,6 +447,7 @@ pub async fn web_parse(
        }
    }

    #[cfg(not(feature = "benchmark"))]
    if sitestore::add_website(
        &db,
        url,

@@ -421,7 +460,7 @@ pub async fn web_parse(
        },
        &wordmap,
        raw_page_content,
        damping
        damping,
    )
    .await
    .is_err()

@@ -429,7 +468,7 @@ pub async fn web_parse(
        warn!("couldn't add {} to sitestore!", url);
        db_error_so_requeue_anyways = true;
    }

    debug!("finished with main site stuff for {}", url);

    let linkelms = driver.find_all(By::Tag("a")).await.map_err(|_| ())?;

@@ -466,27 +505,40 @@ pub async fn web_parse(
            let word = word.trim_end_matches(|v: char| v.is_ascii_punctuation());
            linkwords.push(word);
        }

        #[allow(clippy::collapsible_if)]
        if !linkwords.is_empty() {
            if linkstore::add_url_to_linkwords(&db, &linkwords, &href).await.is_err() {
            #[cfg(not(feature = "benchmark"))]
            if linkstore::add_url_to_linkwords(&db, &linkwords, &href)
                .await
                .is_err()
            {
                warn!("couldn't add {} to linkwords!", url);
            }
        }

        #[cfg(not(feature = "benchmark"))]
        if linkrelstore::a_linksto_b(&db, url, &href).await.is_err() {
            warn!("couldn't perform a_linksto_b (a {url} b {href})");
        }

        nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&CrawlRequest {
            url: href,
            damping: 0.85,
        }).unwrap().into()).await.unwrap();
        #[cfg(not(feature = "benchmark"))]
        nats.publish(
            VOREBOT_SERVICE.to_string(),
            rmp_serde::to_vec(&CrawlRequest {
                url: href,
                damping: 0.85,
            })
            .unwrap()
            .into(),
        )
        .await
        .unwrap();
    }

    let elapsed = start.elapsed().as_secs_f64();

    debug!("crawled {} in {} seconds", url, elapsed);

    Ok(())
}