// asklyphe/vorebot/src/webparse/mod.rs

//! linkrelstore + site suggestion cli:
//! logs href relations in the database, allowing for pagerank calculation
//! in the future.
//!
//! sites can now be added to the queue by running `./vorebot <url> [damping]`;
//! default damping is 0.45 for suggested sites.

use crate::USER_AGENT;
use asklyphe_common::ldb::{linkrelstore, linkstore, metastore, sitestore, titlestore, wordstore, DBConn};
use async_nats::jetstream;
use async_nats::jetstream::kv;
use futures::AsyncReadExt;
use image::EncodableLayout;
use isahc::config::RedirectPolicy;
use isahc::prelude::Configurable;
use isahc::HttpClient;
use log::{debug, error, warn};
use std::collections::{BTreeMap, BTreeSet};
use std::hash::{DefaultHasher, Hasher};
use std::sync::atomic::AtomicBool;
use std::sync::{mpsc, Arc};
use std::time::Duration;
use stopwords::{Language, Spark, Stopwords};
use texting_robots::{get_robots_url, Robot};
use thirtyfour::{By, WebDriver};
use asklyphe_common::nats::vorebot::CrawlRequest;
use asklyphe_common::nats::vorebot::VOREBOT_SERVICE;
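
/// returns Ok(true) if the "Vorebot" user-agent may crawl `url` according to
/// the given robots.txt contents, or Err(()) if the robots.txt could not be
/// parsed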
pub fn allowed_to_crawl(robotstxt: &[u8], url: &str) -> Result<bool, ()> {
    let robot1 = Robot::new("Vorebot", robotstxt);
    if let Err(e) = robot1 {
        warn!(
            "potentially malformed robots.txt ({}), not crawling {}",
            e, url
        );
        return Err(());
    }
    let robot1 = robot1.unwrap();
    Ok(robot1.allowed(url))
}

/// returns Err if we cannot access a page, but the error associated with it
/// seems temporary (i.e. it's worth trying again later);
/// otherwise, returns Ok
pub async fn web_parse(
    nats: jetstream::Context,
    db: DBConn,
    driver: &WebDriver,
    url: &str,
    damping: f64,
) -> Result<(), ()> {
    driver.delete_all_cookies().await.map_err(|_| ())?;
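
    // robots.txt data is cached per-host in a NATS key-value bucket so we
    // don't have to refetch it for every page on the same host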
    let robots_bucket = nats.get_key_value("robots").await;
    let robots_bucket = if robots_bucket.is_err() {
        let robots_bucket = nats
            .create_key_value(kv::Config {
                bucket: "robots".to_string(),
                description: "storage of robots.txt data for given hosts".to_string(),
                ..Default::default()
            })
            .await;
        if let Err(e) = robots_bucket {
            error!("could not create robots.txt bucket: {}", e);
            None
        } else {
            Some(robots_bucket.unwrap())
        }
    } else {
        robots_bucket.ok()
    };
    let hosts_bucket = nats.get_key_value("hosts").await;
    let hosts_bucket = if hosts_bucket.is_err() {
        let hosts_bucket = nats
            .create_key_value(kv::Config {
                bucket: "hosts".to_string(),
                description: "prevent the same host from being scraped too quickly".to_string(),
                max_age: Duration::from_secs(60 * 10),
                ..Default::default()
            })
            .await;
        if let Err(e) = hosts_bucket {
            error!("could not create hosts bucket: {}", e);
            return Err(());
        } else {
            hosts_bucket.unwrap()
        }
    } else {
        hosts_bucket.unwrap()
    };
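
    // both buckets are keyed by a hash of the robots.txt url, which is
    // effectively a per-host key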
    let robots_url = get_robots_url(url);
    if robots_url.is_err() {
        error!("could not get a robots.txt url from {}, not crawling", url);
        return Ok(());
    }
    let robots_url = robots_url.unwrap();
    let mut hash = DefaultHasher::new();
    hash.write(robots_url.as_bytes());
    let hash = hash.finish();
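
    // per-host rate limiting: entries in the hosts bucket expire after ten
    // minutes, so a host we have hit more than ten times recently is skipped
    // (and the request requeued) until its counter ages out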
    if let Ok(Some(host)) = hosts_bucket.get(hash.to_string()).await {
        let count = *host.first().unwrap_or(&0);
        if count > 10 {
            warn!("scraping {} too quickly, avoiding for ten minutes", robots_url);
            return Err(());
        }
        hosts_bucket.put(hash.to_string(), vec![count + 1].into()).await.expect("COULDN'T INSERT INTO HOSTS BUCKET!");
    } else {
        hosts_bucket.put(hash.to_string(), vec![1].into()).await.expect("COULDN'T INSERT INTO HOSTS BUCKET!");
    }
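
    // if we have a cached robots.txt for this host, consult it and skip the
    // network fetch below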
    let mut skip_robots_check = false;
    if let Some(robots_bucket) = &robots_bucket {
        if let Ok(Some(entry)) = robots_bucket.get(hash.to_string()).await {
            if let Ok(res) = allowed_to_crawl(entry.as_bytes(), url) {
                if !res {
                    debug!("robots.txt does not allow us to crawl {}", url);
                    return Ok(());
                } else {
                    skip_robots_check = true;
                }
            }
        }
    }
    if !skip_robots_check {
        // check manually
        debug!("checking new robots.txt \"{}\"", robots_url);
        let client = HttpClient::builder()
            .redirect_policy(RedirectPolicy::Limit(10))
            .timeout(Duration::from_secs(60))
            .build();
        if let Err(e) = client {
            error!("could not create new robots.txt httpclient: {}", e);
            return Err(());
        }
        let client = client.unwrap();
        let request = isahc::Request::get(&robots_url)
            .header("user-agent", USER_AGENT.as_str())
            .body(());
        if let Err(e) = request {
            error!("could not create robots.txt get request: {}", e);
            return Ok(());
        }
        let request = request.unwrap();
        let response = client.send_async(request).await;
        if let Err(e) = response {
            warn!("could not get robots.txt page: {}", e);
            return Err(());
        }
        let mut response = response.unwrap();
        if response.status() == 429 {
            // too many requests
            warn!("too many requests for {}", robots_url);
            return Err(());
        }
        if response.status().is_server_error() {
            // don't crawl at the moment
            debug!("not crawling {} due to server error", robots_url);
            return Err(());
        }
        let mut body = "".to_string();
        if let Err(e) = response.body_mut().read_to_string(&mut body).await {
            warn!("could not read from robots.txt response: {}", e);
            return Err(());
        }
        if let Ok(res) = allowed_to_crawl(body.as_bytes(), url) {
            if let Some(robots_bucket) = &robots_bucket {
                if let Err(e) = robots_bucket
                    .put(hash.to_string(), body.as_bytes().to_vec().into())
                    .await
                {
                    warn!("could not put robots.txt data: {}", e);
                }
            }
            if !res {
                debug!("robots.txt does not allow us to crawl {}", url);
                return Ok(());
            } else {
                // we're allowed to crawl!
            }
        }
    }

    let start = std::time::Instant::now();
    debug!("handling request for {}", url);
    // check for bad status codes
    // fixme: i hate this solution, can we get something that actually checks the browser's request?
    let client = HttpClient::builder()
        .redirect_policy(RedirectPolicy::Limit(10))
        .timeout(Duration::from_secs(60))
        .build();
    if let Err(e) = client {
        error!("could not create new badstatuscode httpclient: {}", e);
        return Err(());
    }
    let client = client.unwrap();
    let request = isahc::Request::get(url)
        .header("user-agent", USER_AGENT.as_str())
        .body(());
    if let Err(e) = request {
        error!("could not create badstatuscode get request: {}", e);
        return Ok(());
    }
    let request = request.unwrap();
    let response = client.send_async(request).await;
    if let Err(e) = response {
        warn!("could not get badstatuscode page: {}", e);
        return Err(());
    }
    let mut response = response.unwrap();
    if response.status() == 429 {
        // too many requests
        warn!("too many requests for {}", url);
        return Err(());
    }
    if response.status().is_server_error() || response.status().is_client_error() {
        // don't crawl at the moment
        debug!("not crawling {} due to bad status code {}", url, response.status());
        return Err(());
    }
    // i guess we're good
    driver.goto(url).await.map_err(|_| ())?;
    let html_element = driver.find(By::Tag("html")).await.map_err(|_| ())?;
    if let Some(lang) = html_element.attr("lang").await.ok().flatten() {
        if !lang.starts_with("en") && !lang.starts_with("unknown") {
            // i.e. non-english language
            // fixme: remove this once we start expanding to non-english-speaking markets?
            warn!("skipping {} due to {} language (currently prioritizing english)", url, lang);
            return Err(());
        }
    }
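
    // pull the page description and keyword list from its <meta> tags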
    let meta_elements = driver.find_all(By::Tag("meta")).await.map_err(|_| ())?;
    let title = driver.title().await.map_err(|_| ())?;
    let mut description = None;
    let mut keywords = vec![];
    for elem in meta_elements {
        if let Ok(Some(name)) = elem.attr("name").await {
            match name.as_str() {
                "description" => {
                    if let Ok(Some(content)) = elem.attr("content").await {
                        description = Some(content);
                    }
                }
                "keywords" => {
                    if let Ok(Some(content)) = elem.attr("content").await {
                        keywords = content
                            .split(',')
                            .map(|v| v.to_lowercase())
                            .filter(|v| !v.is_empty())
                            .collect();
                    }
                }
                _ => {}
            }
        }
    }
    let body = driver.find(By::Tag("body")).await.map_err(|_| ())?;
    let raw_page_content = body.text().await.map_err(|_| ())?;
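
    /// weights every word found in the given tags by the on-screen area per
    /// character of its element, so visually prominent text contributes more,
    /// then scales the result by `multiplier` (e.g. headers get a boost);
    /// stopwords are heavily devalued and duplicate text blocks are skipped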
    async fn gather_elements_with_multiplier(
        driver: &WebDriver,
        wordmap: &mut BTreeMap<String, f64>,
        stops: &BTreeSet<&&str>,
        elements: &[&str],
        multiplier: f64,
    ) {
        let mut elms = vec![];
        for tag in elements {
            elms.push(driver.find_all(By::Tag(*tag)).await);
        }
        let elms = elms.iter().flatten().flatten().collect::<Vec<_>>();
        let mut sentences = vec![];
        let mut sentence_set = BTreeSet::new();
        debug!("processing elements...");
        for node in elms {
            let _ = node.scroll_into_view().await;
            let boxmodel = node.rect().await;
            if boxmodel.is_err() {
                // not visible
                continue;
            }
            let boxmodel = boxmodel.unwrap();
            let current_text = node.text().await;
            if current_text.is_err() {
                // no text on this node
                continue;
            }
            let current_text = current_text.unwrap().trim().to_string();
            if current_text.is_empty() {
                continue;
            }
            let sqs = (boxmodel.width * boxmodel.height).max(1.0); // no 0 divides pls (:
            let ccount = current_text.chars().count() as f64;
            let cssq = if ccount > 0.0 { sqs / ccount } else { 0.0 };
            if sentence_set.contains(&current_text) {
                continue;
            }
            sentence_set.insert(current_text.clone());
            sentences.push((current_text, cssq));
        }
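
        // `cssq` is the element's on-screen area per character; squashing it
        // by 500 and squaring rewards visually large text superlinearly and
        // suppresses tiny text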
        for (sentence, cssq) in sentences {
            let cssq = (cssq / 500.0).powi(2) * multiplier;
            for word in sentence.split_whitespace() {
                let word = word
                    .to_lowercase()
                    .trim_end_matches(|v: char| v.is_ascii_punctuation())
                    .to_string();
                if word.is_empty() {
                    continue;
                }
                // stopwords are less valuable; devalue only this word rather
                // than permanently dividing the weight for the rest of the sentence
                let weight = if stops.contains(&word.as_str()) {
                    cssq / 100.0
                } else {
                    cssq
                };
                *wordmap.entry(word).or_insert(0.0) += weight;
            }
        }
    }

    let mut wordmap: BTreeMap<String, f64> = BTreeMap::new();
    let stops: BTreeSet<_> = Spark::stopwords(Language::English)
        .unwrap()
        .iter()
        .collect();
    debug!("headers...");
    gather_elements_with_multiplier(driver, &mut wordmap, &stops, &["h1", "h2", "h3", "h4", "h5", "h6"], 3.0)
        .await;
    debug!("paragraphs...");
    gather_elements_with_multiplier(driver, &mut wordmap, &stops, &["p", "div"], 1.0).await;
    let mut wordmap = wordmap.into_iter().collect::<Vec<_>>();
    // total_cmp avoids the panic that partial_cmp().unwrap() would hit on NaN
    wordmap.sort_by(|a, b| b.1.total_cmp(&a.1));
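
    // store the weighted page words, meta words, and title words in their
    // respective indexes; db failures are flagged but don't abort the crawl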
    let mut db_error_so_requeue_anyways = false;
    let words = wordmap
        .iter()
        .map(|(word, _)| word.as_str())
        .collect::<Vec<_>>();
    #[allow(clippy::collapsible_if)]
    if !words.is_empty() {
        if wordstore::add_url_to_keywords(&db, &words, url)
            .await
            .is_err()
        {
            warn!("couldn't add {} to keywords!", url);
            db_error_so_requeue_anyways = true;
        }
    }
    let mut metawords = keywords.iter().map(|v| v.as_str()).collect::<Vec<_>>();
    let desc2 = description.clone();
    let desc2 = desc2.map(|v| {
        v.to_lowercase()
            .split_whitespace()
            .map(String::from)
            .collect::<Vec<_>>()
    });
    if let Some(description) = &desc2 {
        for word in description {
            let word = word.trim_end_matches(|v: char| v.is_ascii_punctuation());
            if word.is_empty() {
                continue;
            }
            metawords.push(word);
        }
    }
    #[allow(clippy::collapsible_if)]
    if !metawords.is_empty() {
        if metastore::add_url_to_metawords(&db, &metawords, url)
            .await
            .is_err()
        {
            warn!("couldn't add {} to metawords!", url);
            db_error_so_requeue_anyways = true;
        }
    }
    let mut titlewords = vec![];
    let title2 = title.clone();
    let title2 = title2.to_lowercase();
    for word in title2.split_whitespace() {
        let word = word.trim_end_matches(|v: char| v.is_ascii_punctuation());
        if word.is_empty() {
            continue;
        }
        titlewords.push(word);
    }
    #[allow(clippy::collapsible_if)]
    if !titlewords.is_empty() {
        if titlestore::add_url_to_titlewords(&db, &titlewords, url)
            .await
            .is_err()
        {
            warn!("couldn't add {} to titlewords!", url);
            db_error_so_requeue_anyways = true;
        }
    }
    if sitestore::add_website(
        &db,
        url,
        Some(title),
        description,
        if keywords.is_empty() {
            None
        } else {
            Some(keywords)
        },
        &wordmap,
        raw_page_content,
        damping,
    )
    .await
    .is_err()
    {
        warn!("couldn't add {} to sitestore!", url);
        db_error_so_requeue_anyways = true;
    }
    debug!("finished with main site stuff for {}", url);
    let linkelms = driver.find_all(By::Tag("a")).await.map_err(|_| ())?;
    for linkelm in linkelms {
        if linkelm.scroll_into_view().await.is_err() {
            debug!("couldn't scroll into view!");
        }
        let href = linkelm.prop("href").await.map_err(|_| ())?;
        if href.is_none() {
            debug!("no href!");
            continue;
        }
        let href = href.unwrap();
        if href.contains('#') {
            continue;
        }
        let linktext = linkelm.text().await.map_err(|_| ())?.to_lowercase();
        let linkimgs = linkelm.find_all(By::Tag("img")).await.map_err(|_| ())?;
        let mut alts = "".to_string();
        for img in linkimgs {
            if let Ok(Some(alt)) = img.attr("alt").await {
                alts.push_str(&alt);
                alts.push(' ');
            }
        }
        let alts = alts.trim().to_lowercase();
        let mut linkwords = vec![];
        for word in linktext.split_whitespace() {
            let word = word.trim_end_matches(|v: char| v.is_ascii_punctuation());
            linkwords.push(word);
        }
        for word in alts.split_whitespace() {
            let word = word.trim_end_matches(|v: char| v.is_ascii_punctuation());
            linkwords.push(word);
        }
        #[allow(clippy::collapsible_if)]
        if !linkwords.is_empty() {
            if linkstore::add_url_to_linkwords(&db, &linkwords, &href).await.is_err() {
                warn!("couldn't add {} to linkwords!", href);
            }
        }
        if linkrelstore::a_linksto_b(&db, url, &href).await.is_err() {
            warn!("couldn't perform a_linksto_b (a {url} b {href})");
        }
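
        // discovered links are enqueued with the standard 0.85 damping factor
        // (suggested sites submitted via the cli default to 0.45)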
        nats.publish(
            VOREBOT_SERVICE.to_string(),
            rmp_serde::to_vec(&CrawlRequest {
                url: href,
                damping: 0.85,
            })
            .unwrap()
            .into(),
        )
        .await
        .unwrap();
    }
    let elapsed = start.elapsed().as_secs_f64();
    debug!("crawled {} in {} seconds", url, elapsed);
    Ok(())
}