Compare commits


39 commits

Author SHA1 Message Date
822ea728a5 initial benchmark mode impl
Some checks failed
/ build-all-services (push) Failing after 3m34s
could do with some more in-depth info, but will suffice for now
2025-03-31 15:04:37 -07:00
b8a338d9ba Merge pull request 'feature: Implement AUTH_URL for authservice emails' (#10) from feature/evie/T116 into develop
All checks were successful
/ build-all-services (push) Successful in 9m24s
Reviewed-on: #10
2025-03-24 13:09:43 -07:00
c2e28c3791 Merge pull request 'fix: Fix incorrect background for classic theme image search' (#9) from fix/evie/T154 into develop
All checks were successful
/ build-all-services (push) Successful in 9m39s
Reviewed-on: #9
2025-03-24 11:49:18 -07:00
050d45ee2e Merge pull request 'feature: Implement random theme and theme enum' (#8) from features/evie/T155 into develop
Some checks failed
/ build-all-services (push) Has been cancelled
Reviewed-on: #8
2025-03-24 11:48:00 -07:00
39dfa6816e
feature: Implement AUTH_URL for authservice emails
All checks were successful
/ build-all-services (push) Successful in 9m14s
Fixes #T116
2025-03-20 16:59:31 -07:00
03314a53d1
fix: Fix incorrect background for classic theme image search
All checks were successful
/ build-all-services (push) Successful in 9m56s
Fixes #T154
2025-03-19 21:20:23 -07:00
1c6edf2039
revert: Revert changes to asklyphe-frontend/src/routes/admin.rs
All checks were successful
/ build-all-services (push) Successful in 9m51s
2025-03-19 21:03:11 -07:00
95ba628934
feature: Implement random theme and theme enum
Some checks failed
/ build-all-services (push) Has been cancelled
For task T155
2025-03-19 20:58:23 -07:00
96478fb5d2 Merge pull request 'vorebot timeouts seem stable' (#7) from feature/nikocs/vorebot-tweaks into develop
Some checks failed
/ build-all-services (push) Has been cancelled
Reviewed-on: #7
2025-03-17 10:40:53 -07:00
74c65d993d timeout if crawling takes too long
All checks were successful
/ build-all-services (push) Successful in 9m27s
default is 5 minutes before the task gets killed, in the future we
should probably have a more graceful shutdown method
2025-03-16 16:07:47 -07:00
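
A minimal sketch of the behaviour this commit message describes, assuming a tokio runtime; the function names are illustrative, not the actual vorebot code.

use std::time::Duration;
use tokio::time::timeout;

// Hypothetical stand-in for vorebot's real crawl task.
async fn crawl(_url: &str) -> Result<(), ()> {
    Ok(())
}

// Give the crawl a fixed budget (five minutes by default, per the commit
// message) and kill the task when the budget runs out.
async fn crawl_with_budget(url: &str) {
    match timeout(Duration::from_secs(5 * 60), crawl(url)).await {
        Ok(result) => println!("crawl of {url} finished: {result:?}"),
        Err(_) => eprintln!("crawl of {url} timed out and was killed"),
    }
}
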
9237d1a048 Merge pull request 'bring latest develop changes into vorebot-tweaks' (#6) from develop into feature/nikocs/vorebot-tweaks
All checks were successful
/ build-all-services (push) Successful in 9m26s
Reviewed-on: #6
2025-03-16 15:50:02 -07:00
4be68e4ba1 Merge pull request 'store crawler link relations' (#5) from feature/nikocs/vorebot-store-link-relations into develop
All checks were successful
/ build-all-services (push) Successful in 9m39s
Reviewed-on: #5
2025-03-16 15:49:22 -07:00
2463986e8d await ack
All checks were successful
/ build-all-services (push) Successful in 9m17s
2025-03-14 18:49:27 -07:00
38ef5a45f6 use suggested route
All checks were successful
/ build-all-services (push) Successful in 9m32s
2025-03-14 18:38:43 -07:00
0f6f0deb9c linkrelstore + site suggestion cli
All checks were successful
/ build-all-services (push) Successful in 9m21s
logs href relations in database, allowing for pagerank calculation in
the future.

sites can now be added to the queue by running ./vorebot <url> [damping]
default damping is 0.45 for suggested sites
2025-03-14 18:20:15 -07:00
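
A minimal sketch of the CLI described above, with illustrative names rather than the actual vorebot code:

// usage: ./vorebot <url> [damping]
fn parse_cli() -> Option<(String, f64)> {
    let mut args = std::env::args().skip(1);
    let url = args.next()?;
    // default damping of 0.45 for suggested sites, per the commit message
    let damping = args
        .next()
        .and_then(|d| d.parse::<f64>().ok())
        .unwrap_or(0.45);
    Some((url, damping))
}
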
9c159c7170 Merge pull request 'bring over latest vorebot tweaks' (#4) from feature/nikocs/vorebot-tweaks into develop
All checks were successful
/ build-all-services (push) Successful in 9m13s
Reviewed-on: #4
2025-03-14 18:03:28 -07:00
d341c66390 prioritize english
All checks were successful
/ build-all-services (push) Successful in 9m36s
2025-03-14 14:29:46 -07:00
c754338bf4 favor newer hostnames
All checks were successful
/ build-all-services (push) Successful in 9m30s
2025-03-14 12:13:45 -07:00
56d07057c9 Merge pull request 'proper proxy support for vorebot' (#3) from feature/nikocs/vorebot-proxy into develop
Some checks failed
/ build-all-services (push) Has been cancelled
Reviewed-on: #3
2025-03-14 12:12:16 -07:00
384464bdbc preemptive fix
All checks were successful
/ build-all-services (push) Successful in 9m20s
2025-03-13 19:45:46 -07:00
d45f13f030 add proxy support to vorebot
All checks were successful
/ build-all-services (push) Successful in 9m6s
2025-03-13 19:23:59 -07:00
927ce9d3ed Merge pull request 'add password support to vorebot' (#2) from feature/nikocs/nats-password-auth into develop
All checks were successful
/ build-all-services (push) Successful in 9m18s
Reviewed-on: #2
2025-03-13 19:21:39 -07:00
17054c0a9c add password support to vorebot
All checks were successful
/ build-all-services (push) Successful in 9m33s
2025-03-13 19:10:55 -07:00
b6f5ae177a Merge pull request 'CI' (#1) from feature/nikocs/ci into develop
All checks were successful
/ build-all-services (push) Successful in 9m19s
Reviewed-on: #1
2025-03-12 21:23:45 -07:00
83053c57a8 remove searchservice from workspace (old)
All checks were successful
/ build-all-services (push) Successful in 9m19s
2025-03-12 21:12:06 -07:00
1ad6dad9eb fix foundationdb dependency
Some checks failed
/ build-all-services (push) Has been cancelled
2025-03-12 21:09:35 -07:00
7de4cdc0d6 install clang
Some checks failed
/ build-all-services (push) Failing after 3m36s
2025-03-12 21:00:34 -07:00
ce60a00868 oops
Some checks failed
/ build-all-services (push) Failing after 2m31s
2025-03-12 20:57:28 -07:00
9f87a327e0 please
Some checks failed
/ build-all-services (push) Failing after 8s
2025-03-12 20:57:00 -07:00
f3ee26f24b nodejs, not node
Some checks failed
/ build-all-services (push) Failing after 4s
2025-03-12 20:56:32 -07:00
d6509cccbe changes in order for container to work
Some checks failed
/ build-all-services (push) Failing after 4s
2025-03-12 20:56:09 -07:00
1dc0a399a0 change to v3
Some checks failed
/ build-all-services (push) Failing after 14s
2025-03-12 20:52:56 -07:00
30b3c2352e rearrange some stuff
Some checks failed
/ build-all-services (push) Failing after 4s
2025-03-12 20:50:59 -07:00
949a49e5de nvm
Some checks failed
/ build-all-services (push) Failing after 47s
2025-03-12 20:49:01 -07:00
a0b31d3326 change image to docker.io
Some checks failed
/ build-all-services (push) Failing after 0s
2025-03-12 20:48:33 -07:00
a16782617a change image
Some checks failed
/ build-all-services (push) Failing after 1s
2025-03-12 20:48:05 -07:00
73463a8d21 initial CI impl
Some checks failed
/ build-all-services (push) Failing after 0s
2025-03-12 15:21:58 -07:00
906c2ed8df bring over new crawler and lyphedb 2025-03-12 12:52:24 -07:00
b1202cc889 bring over changes from develop repos 2025-03-12 12:36:37 -07:00
51 changed files with 3717 additions and 269 deletions


@ -0,0 +1,16 @@
inputs:
  service-name:
    description: 'name of the service to build and upload'
    required: true
runs:
  using: "composite"
  steps:
    - run: |
        mkdir -pv artifacts
        cargo build --release --bin ${{ inputs.service-name }}
        mv target/release/${{ inputs.service-name }} artifacts/
      shell: bash
    - uses: actions/upload-artifact@v3
      with:
        name: "${{ inputs.service-name }}"
        path: artifacts/


@ -0,0 +1,39 @@
on: [push]
jobs:
  build-all-services:
    runs-on: docker
    container:
      image: rust:1-bookworm
    steps:
      - run: |
          apt-get update -qq
          apt-get install -y -qq nodejs git clang
      - uses: actions/checkout@v3
      - id: asklyphe-auth-frontend
        uses: ./.forgejo/build-service
        with:
          service-name: "asklyphe-auth-frontend"
      - id: asklyphe-frontend
        uses: ./.forgejo/build-service
        with:
          service-name: "asklyphe-frontend"
      - id: authservice
        uses: ./.forgejo/build-service
        with:
          service-name: "authservice"
      - id: bingservice
        uses: ./.forgejo/build-service
        with:
          service-name: "bingservice"
      - id: googleservice
        uses: ./.forgejo/build-service
        with:
          service-name: "googleservice"
      - id: lyphedb
        uses: ./.forgejo/build-service
        with:
          service-name: "lyphedb"
      - id: vorebot
        uses: ./.forgejo/build-service
        with:
          service-name: "vorebot"

Cargo.lock (generated): 995 lines changed; diff suppressed because it is too large.


@ -1,2 +1,2 @@
[workspace]
members = ["asklyphe-common", "asklyphe-frontend", "searchservice", "asklyphe-auth-frontend", "unit_converter", "authservice", "authservice/migration", "authservice/entity", "bingservice", "googleservice"]
members = ["asklyphe-common", "asklyphe-frontend", "asklyphe-auth-frontend", "unit_converter", "authservice", "authservice/migration", "authservice/entity", "bingservice", "googleservice", "vorebot", "lyphedb", "lyphedb/ldbtesttool"]


@ -11,10 +11,16 @@ license-file = "LICENSE"
tokio = { version = "1.0", features = ["full"] }
chrono = "0.4.31"
serde = { version = "1.0", features = ["derive"] }
foundationdb = { version = "0.8.0", features = ["embedded-fdb-include"] }
lyphedb = { path = "../lyphedb" }
foundationdb = { version = "0.8.0", features = ["embedded-fdb-include"], optional = true }
log = "0.4.20"
rmp-serde = "1.1.2"
futures = "0.3.30"
async-nats = "0.38.0"
ulid = "1.1.0"
rand = "0.8.5"
rand = "0.8.5"
percent-encoding = "2.3.1"
sha-rs = "0.1.0"
[features]
default = []


@ -0,0 +1,112 @@
use crate::ldb::{construct_path, hash, DBConn};
use log::{error, warn};
use lyphedb::{KVList, KeyDirectory, LDBNatsMessage, LypheDBCommand, PropagationStrategy};
pub const INCOMINGSTORE: &str = "incomingstore";
pub const OUTGOINGSTORE: &str = "outgoingstore";
pub async fn a_linksto_b(db: &DBConn, a: &str, b: &str) -> Result<(), ()> {
let key_sets = vec![
(
construct_path(&[OUTGOINGSTORE, &hash(a)])
.as_bytes()
.to_vec(),
a.as_bytes().to_vec(),
),
(
construct_path(&[OUTGOINGSTORE, &hash(a), &hash(b)])
.as_bytes()
.to_vec(),
b.as_bytes().to_vec(),
),
(
construct_path(&[INCOMINGSTORE, &hash(b)])
.as_bytes()
.to_vec(),
b.as_bytes().to_vec(),
),
(
construct_path(&[INCOMINGSTORE, &hash(b), &hash(a)])
.as_bytes()
.to_vec(),
a.as_bytes().to_vec(),
),
];
let cmd = LDBNatsMessage::Command(LypheDBCommand::SetKeys(
KVList { kvs: key_sets },
PropagationStrategy::OnRead,
));
match db.query(cmd).await {
LDBNatsMessage::Success => Ok(()),
LDBNatsMessage::BadRequest => {
error!("bad request for a_linksto_b");
Err(())
}
LDBNatsMessage::NotFound => {
error!("not found for a_linksto_b");
Err(())
}
_ => {
warn!("lyphedb sent weird message as response, treating as error");
Err(())
}
}
}
pub async fn what_links_to_this(db: &DBConn, url: &str) -> Result<Vec<String>, ()> {
let path = construct_path(&[INCOMINGSTORE, &hash(url)])
.as_bytes()
.to_vec();
let cmd = LDBNatsMessage::Command(LypheDBCommand::GetKeyDirectory(KeyDirectory { key: path }));
match db.query(cmd).await {
LDBNatsMessage::Entries(kvs) => Ok(kvs
.kvs
.into_iter()
.map(|v| String::from_utf8_lossy(&v.1).to_string())
.collect()),
LDBNatsMessage::Success => {
warn!("lyphedb responded with \"success\" to what_links_to_this, treating as error");
Err(())
}
LDBNatsMessage::BadRequest => {
warn!("bad request for what_links_to_this");
Err(())
}
LDBNatsMessage::NotFound => Ok(vec![]),
_ => {
warn!("lyphedb sent weird message as response, treating as error");
Err(())
}
}
}
pub async fn what_links_from_this(db: &DBConn, url: &str) -> Result<Vec<String>, ()> {
let path = construct_path(&[OUTGOINGSTORE, &hash(url)])
.as_bytes()
.to_vec();
let cmd = LDBNatsMessage::Command(LypheDBCommand::GetKeyDirectory(KeyDirectory { key: path }));
match db.query(cmd).await {
LDBNatsMessage::Entries(kvs) => Ok(kvs
.kvs
.into_iter()
.map(|v| String::from_utf8_lossy(&v.1).to_string())
.collect()),
LDBNatsMessage::Success => {
warn!("lyphedb responded with \"success\" to what_links_from_this, treating as error");
Err(())
}
LDBNatsMessage::BadRequest => {
warn!("bad request for what_links_from_this");
Err(())
}
LDBNatsMessage::NotFound => Ok(vec![]),
_ => {
warn!("lyphedb sent weird message as response, treating as error");
Err(())
}
}
}
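
A hypothetical caller of the link-relation store above, assuming a connected DBConn; the URLs are illustrative.

use crate::ldb::linkrelstore::{a_linksto_b, what_links_to_this};
use crate::ldb::DBConn;

async fn example(db: &DBConn) -> Result<(), ()> {
    // record that page A links to page B (indexed in both directions)
    a_linksto_b(db, "https://a.example/", "https://b.example/").await?;
    // later, B's backlinks can be enumerated for a pagerank-style pass
    let backlinks = what_links_to_this(db, "https://b.example/").await?;
    assert!(backlinks.contains(&"https://a.example/".to_string()));
    Ok(())
}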


@ -0,0 +1,63 @@
use log::{error, warn};
use lyphedb::{KVList, KeyDirectory, LDBNatsMessage, LypheDBCommand, PropagationStrategy};
use crate::ldb::{construct_path, hash, DBConn};
pub const LINKSTORE: &str = "linkstore";
pub async fn add_url_to_linkwords(db: &DBConn, linkwords: &[&str], url: &str) -> Result<(), ()> {
let mut key_sets = Vec::new();
for linkword in linkwords {
let path = construct_path(&[LINKSTORE, linkword, &hash(url)]).as_bytes().to_vec();
key_sets.push((path, url.as_bytes().to_vec()));
}
let cmd = LDBNatsMessage::Command(LypheDBCommand::SetKeys(KVList {
kvs: key_sets,
}, PropagationStrategy::OnRead));
match db.query(cmd).await {
LDBNatsMessage::Success => {
Ok(())
}
LDBNatsMessage::BadRequest => {
error!("bad request for add_url_to_linkwords");
Err(())
}
LDBNatsMessage::NotFound => {
error!("not found for add_url_to_linkwords");
Err(())
}
_ => {
warn!("lyphedb sent weird message as response, treating as error");
Err(())
}
}
}
pub async fn get_urls_from_linkword(db: &DBConn, keyword: &str) -> Result<Vec<String>, ()> {
let path = construct_path(&[LINKSTORE, keyword]).as_bytes().to_vec();
let cmd = LDBNatsMessage::Command(LypheDBCommand::GetKeyDirectory(KeyDirectory {
key: path,
}));
match db.query(cmd).await {
LDBNatsMessage::Entries(kvs) => {
Ok(kvs.kvs.into_iter().map(|v| String::from_utf8_lossy(&v.1).to_string()).collect())
}
LDBNatsMessage::Success => {
warn!("lyphedb responded with \"success\" to get_urls_from_linkwords, treating as error");
Err(())
}
LDBNatsMessage::BadRequest => {
warn!("bad request for get_urls_from_linkwords");
Err(())
}
LDBNatsMessage::NotFound => {
Ok(vec![])
}
_ => {
warn!("lyphedb sent weird message as response, treating as error");
Err(())
}
}
}


@ -0,0 +1,63 @@
use log::{error, warn};
use lyphedb::{KVList, KeyDirectory, LDBNatsMessage, LypheDBCommand, PropagationStrategy};
use crate::ldb::{construct_path, hash, DBConn};
pub const METASTORE: &str = "metastore";
pub async fn add_url_to_metawords(db: &DBConn, metawords: &[&str], url: &str) -> Result<(), ()> {
let mut key_sets = Vec::new();
for metaword in metawords {
let path = construct_path(&[METASTORE, metaword, &hash(url)]).as_bytes().to_vec();
key_sets.push((path, url.as_bytes().to_vec()));
}
let cmd = LDBNatsMessage::Command(LypheDBCommand::SetKeys(KVList {
kvs: key_sets,
}, PropagationStrategy::OnRead));
match db.query(cmd).await {
LDBNatsMessage::Success => {
Ok(())
}
LDBNatsMessage::BadRequest => {
error!("bad request for add_url_to_metawords");
Err(())
}
LDBNatsMessage::NotFound => {
error!("not found for add_url_to_metawords");
Err(())
}
_ => {
warn!("lyphedb sent weird message as response, treating as error");
Err(())
}
}
}
pub async fn get_urls_from_metaword(db: &DBConn, metaword: &str) -> Result<Vec<String>, ()> {
let path = construct_path(&[METASTORE, metaword]).as_bytes().to_vec();
let cmd = LDBNatsMessage::Command(LypheDBCommand::GetKeyDirectory(KeyDirectory {
key: path,
}));
match db.query(cmd).await {
LDBNatsMessage::Entries(kvs) => {
Ok(kvs.kvs.into_iter().map(|v| String::from_utf8_lossy(&v.1).to_string()).collect())
}
LDBNatsMessage::Success => {
warn!("lyphedb responded with \"success\" to get_urls_from_metawords, treating as error");
Err(())
}
LDBNatsMessage::BadRequest => {
warn!("bad request for get_urls_from_metawords");
Err(())
}
LDBNatsMessage::NotFound => {
Ok(vec![])
}
_ => {
warn!("lyphedb sent weird message as response, treating as error");
Err(())
}
}
}


@ -0,0 +1,63 @@
pub mod wordstore;
pub mod sitestore;
pub mod linkstore;
pub mod metastore;
pub mod titlestore;
pub mod linkrelstore;
use std::hash::{DefaultHasher, Hasher};
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use futures::StreamExt;
use log::warn;
use lyphedb::LDBNatsMessage;
use percent_encoding::{percent_encode, AsciiSet, CONTROLS, NON_ALPHANUMERIC};
use sha_rs::{Sha, Sha256};
static NEXT_REPLY_ID: AtomicU64 = AtomicU64::new(0);
pub const NOT_ALLOWED_ASCII: AsciiSet = CONTROLS.add(b' ').add(b'/').add(b'.').add(b'\\');
pub fn hash(str: &str) -> String {
let hasher = Sha256::new();
hasher.digest(str.as_bytes())
}
pub fn construct_path(path_elements: &[&str]) -> String {
let mut buf = String::new();
buf.push_str("ASKLYPHE/");
for el in path_elements {
buf.push_str(&percent_encode(el.as_bytes(), &NOT_ALLOWED_ASCII).to_string());
buf.push('/');
}
buf.pop();
buf
}
#[derive(Clone)]
pub struct DBConn {
nats: async_nats::Client,
name: String,
}
impl DBConn {
pub fn new(nats: async_nats::Client, name: impl ToString) -> DBConn {
DBConn { nats, name: name.to_string() }
}
pub async fn query(&self, message: LDBNatsMessage) -> LDBNatsMessage {
let data = rmp_serde::to_vec(&message).unwrap();
let replyto = format!("ldb-reply-{}", NEXT_REPLY_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst));
let mut subscriber = self.nats.subscribe(replyto.clone()).await.expect("NATS ERROR");
self.nats.publish_with_reply(self.name.clone(), replyto, data.into()).await.expect("NATS ERROR");
if let Some(reply) = subscriber.next().await {
let reply = rmp_serde::from_slice::<LDBNatsMessage>(&reply.payload);
if reply.is_err() {
warn!("DECODED BAD MESSAGE FROM LYPHEDB: {}", reply.err().unwrap());
return LDBNatsMessage::NotFound;
}
return reply.unwrap();
}
LDBNatsMessage::NotFound
}
}
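
For reference, construct_path(&["wordstore", "rust"]) yields "ASKLYPHE/wordstore/rust": each element is percent-encoded, joined under the ASKLYPHE/ prefix, and the trailing slash is popped. A hypothetical lookup through DBConn::query using such a key:

use lyphedb::{KeyList, LDBNatsMessage, LypheDBCommand};

async fn example(db: &DBConn) {
    let key = construct_path(&["wordstore", "rust"]).as_bytes().to_vec();
    let cmd = LDBNatsMessage::Command(LypheDBCommand::GetKeys(KeyList { keys: vec![key] }));
    match db.query(cmd).await {
        LDBNatsMessage::Entries(kvs) => println!("{} entries", kvs.kvs.len()),
        // query() falls back to NotFound on decode errors or a closed subscription
        _ => eprintln!("no entries"),
    }
}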


@ -0,0 +1,178 @@
use crate::ldb::{construct_path, hash, DBConn};
use log::{error, warn};
use lyphedb::{KVList, KeyDirectory, KeyList, LDBNatsMessage, LypheDBCommand, PropagationStrategy};
pub const SITESTORE: &str = "sitestore";
pub const TITLE: &str = "title";
pub const DESCRIPTION: &str = "desc";
pub const KEYWORDS: &str = "keywords";
pub const PAGE_TEXT_RANKING: &str = "ptranks";
pub const PAGE_TEXT_RAW: &str = "textraw";
pub const DAMPING: &str = "damping";
pub async fn add_website(
db: &DBConn,
url: &str,
title: Option<String>,
description: Option<String>,
keywords: Option<Vec<String>>,
page_text_ranking: &[(String, f64)],
page_text_raw: String,
damping: f64,
) -> Result<(), ()> {
let keyurl = hash(url);
let mut kvs = vec![
(
construct_path(&[SITESTORE, &keyurl]).as_bytes().to_vec(),
url.as_bytes().to_vec(),
),
(
construct_path(&[SITESTORE, &keyurl, PAGE_TEXT_RANKING])
.as_bytes()
.to_vec(),
rmp_serde::to_vec(page_text_ranking).unwrap(),
),
(
construct_path(&[SITESTORE, &keyurl, PAGE_TEXT_RAW])
.as_bytes()
.to_vec(),
page_text_raw.as_bytes().to_vec(),
),
(
construct_path(&[SITESTORE, &keyurl, DAMPING])
.as_bytes()
.to_vec(),
damping.to_be_bytes().to_vec(),
),
];
if let Some(title) = title {
kvs.push((
construct_path(&[SITESTORE, &keyurl, TITLE]).as_bytes().to_vec(),
title.as_bytes().to_vec(),
))
}
if let Some(description) = description {
kvs.push((
construct_path(&[SITESTORE, &keyurl, DESCRIPTION])
.as_bytes()
.to_vec(),
description.as_bytes().to_vec(),
))
}
if let Some(keywords) = keywords {
kvs.push((
construct_path(&[SITESTORE, &keyurl, KEYWORDS])
.as_bytes()
.to_vec(),
rmp_serde::to_vec(&keywords).unwrap(),
))
}
let cmd = LDBNatsMessage::Command(LypheDBCommand::SetKeys(
KVList { kvs },
PropagationStrategy::OnRead,
));
match db.query(cmd).await {
LDBNatsMessage::Success => Ok(()),
LDBNatsMessage::BadRequest => {
error!("bad request for add_website");
Err(())
}
LDBNatsMessage::NotFound => {
error!("not found for add_website");
Err(())
}
_ => {
warn!("lyphedb sent weird message as response, treating as error");
Err(())
}
}
}
#[derive(Default, Debug, Clone)]
pub struct WebsiteData {
pub title: Option<String>,
pub description: Option<String>,
pub keywords: Option<Vec<String>>,
pub page_text_ranking: Vec<(String, f64)>,
pub page_text_raw: String,
pub damping: f64,
}
pub async fn get_website(db: &DBConn, url: &str) -> Result<WebsiteData, ()> {
let keyurl = hash(url);
let keys = [
construct_path(&[SITESTORE, &keyurl, TITLE]).as_bytes().to_vec(),
construct_path(&[SITESTORE, &keyurl, DESCRIPTION]).as_bytes().to_vec(),
construct_path(&[SITESTORE, &keyurl, KEYWORDS]).as_bytes().to_vec(),
construct_path(&[SITESTORE, &keyurl, PAGE_TEXT_RANKING]).as_bytes().to_vec(),
construct_path(&[SITESTORE, &keyurl, PAGE_TEXT_RAW]).as_bytes().to_vec(),
construct_path(&[SITESTORE, &keyurl, DAMPING]).as_bytes().to_vec(),
].to_vec();
let cmd = LDBNatsMessage::Command(LypheDBCommand::GetKeys(KeyList { keys }));
match db.query(cmd).await {
LDBNatsMessage::Entries(kvlist) => {
let mut data = WebsiteData::default();
for (key, value) in kvlist.kvs {
let key = String::from_utf8_lossy(&key).to_string();
match key.as_str() {
_ if key.ends_with(TITLE) => {
data.title = Some(String::from_utf8_lossy(&value).to_string());
}
_ if key.ends_with(DESCRIPTION) => {
data.description = Some(String::from_utf8_lossy(&value).to_string());
}
_ if key.ends_with(KEYWORDS) => {
let deser = rmp_serde::from_slice::<Vec<String>>(&value);
if let Err(e) = deser {
error!("bad keywords entry for {}, deser error: {:?}", key, e);
} else {
data.keywords = Some(deser.unwrap());
}
}
_ if key.ends_with(PAGE_TEXT_RANKING) => {
let deser = rmp_serde::from_slice::<Vec<(String, f64)>>(&value);
if let Err(e) = deser {
error!("bad page_text_ranking entry for {}, deser error: {:?}", key, e);
} else {
data.page_text_ranking = deser.unwrap();
}
}
_ if key.ends_with(PAGE_TEXT_RAW) => {
data.page_text_raw = String::from_utf8_lossy(&value).to_string();
}
_ if key.ends_with(DAMPING) => {
data.damping = f64::from_be_bytes(value.try_into().unwrap_or(0.85f64.to_be_bytes()));
}
_ => {
warn!("encountered weird returned key for get_website");
}
}
}
Ok(data)
}
LDBNatsMessage::Success => {
warn!("lyphedb responded with \"success\" to get_website, treating as error");
Err(())
}
LDBNatsMessage::BadRequest => {
error!("bad request for get_website");
Err(())
}
LDBNatsMessage::NotFound => {
warn!("not found for get_website");
Err(())
}
_ => {
warn!("lyphedb sent weird message as response, treating as error");
Err(())
}
}
}
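
A hypothetical round trip through the site store above, assuming a connected DBConn; every value besides the field order is illustrative.

use crate::ldb::sitestore::{add_website, get_website};
use crate::ldb::DBConn;

async fn example(db: &DBConn) -> Result<(), ()> {
    add_website(
        db,
        "https://example.com/",
        Some("Example".to_string()),         // title
        Some("an example page".to_string()), // description
        None,                                // keywords
        &[("example".to_string(), 1.0)],     // page_text_ranking
        "example page text".to_string(),     // page_text_raw
        0.85,                                // damping
    )
    .await?;
    let site = get_website(db, "https://example.com/").await?;
    assert_eq!(site.title.as_deref(), Some("Example"));
    Ok(())
}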


@ -0,0 +1,63 @@
use log::{error, warn};
use lyphedb::{KVList, KeyDirectory, LDBNatsMessage, LypheDBCommand, PropagationStrategy};
use crate::ldb::{construct_path, hash, DBConn};
pub const TITLESTORE: &str = "titlestore";
pub async fn add_url_to_titlewords(db: &DBConn, titlewords: &[&str], url: &str) -> Result<(), ()> {
let mut key_sets = Vec::new();
for titleword in titlewords {
let path = construct_path(&[TITLESTORE, titleword, &hash(url)]).as_bytes().to_vec();
key_sets.push((path, url.as_bytes().to_vec()));
}
let cmd = LDBNatsMessage::Command(LypheDBCommand::SetKeys(KVList {
kvs: key_sets,
}, PropagationStrategy::OnRead));
match db.query(cmd).await {
LDBNatsMessage::Success => {
Ok(())
}
LDBNatsMessage::BadRequest => {
error!("bad request for add_url_to_titlewords");
Err(())
}
LDBNatsMessage::NotFound => {
error!("not found for add_url_to_titlewords");
Err(())
}
_ => {
warn!("lyphedb sent weird message as response, treating as error");
Err(())
}
}
}
pub async fn get_urls_from_titleword(db: &DBConn, titleword: &str) -> Result<Vec<String>, ()> {
let path = construct_path(&[TITLESTORE, titleword]).as_bytes().to_vec();
let cmd = LDBNatsMessage::Command(LypheDBCommand::GetKeyDirectory(KeyDirectory {
key: path,
}));
match db.query(cmd).await {
LDBNatsMessage::Entries(kvs) => {
Ok(kvs.kvs.into_iter().map(|v| String::from_utf8_lossy(&v.1).to_string()).collect())
}
LDBNatsMessage::Success => {
warn!("lyphedb responded with \"success\" to get_urls_from_titlewords, treating as error");
Err(())
}
LDBNatsMessage::BadRequest => {
warn!("bad request for get_urls_from_titlewords");
Err(())
}
LDBNatsMessage::NotFound => {
Ok(vec![])
}
_ => {
warn!("lyphedb sent weird message as response, treating as error");
Err(())
}
}
}


@ -0,0 +1,64 @@
use log::{error, warn};
use lyphedb::{KVList, KeyDirectory, LDBNatsMessage, LypheDBCommand, PropagationStrategy};
use crate::ldb::{construct_path, hash, DBConn};
pub const WORDSTORE: &str = "wordstore";
pub async fn add_url_to_keywords(db: &DBConn, keywords: &[&str], url: &str) -> Result<(), ()> {
let mut key_sets = Vec::new();
for keyword in keywords {
let path = construct_path(&[WORDSTORE, keyword, &hash(url)]).as_bytes().to_vec();
let data = url.as_bytes().to_vec();
key_sets.push((path, data));
}
let cmd = LDBNatsMessage::Command(LypheDBCommand::SetKeys(KVList {
kvs: key_sets,
}, PropagationStrategy::OnRead));
match db.query(cmd).await {
LDBNatsMessage::Success => {
Ok(())
}
LDBNatsMessage::BadRequest => {
error!("bad request for add_url_to_keywords");
Err(())
}
LDBNatsMessage::NotFound => {
error!("not found for add_url_to_keywords");
Err(())
}
_ => {
warn!("lyphedb sent weird message as response, treating as error");
Err(())
}
}
}
pub async fn get_urls_from_keyword(db: &DBConn, keyword: &str) -> Result<Vec<String>, ()> {
let path = construct_path(&[WORDSTORE, keyword]).as_bytes().to_vec();
let cmd = LDBNatsMessage::Command(LypheDBCommand::GetKeyDirectory(KeyDirectory {
key: path,
}));
match db.query(cmd).await {
LDBNatsMessage::Entries(kvs) => {
Ok(kvs.kvs.into_iter().map(|v| String::from_utf8_lossy(&v.1).to_string()).collect())
}
LDBNatsMessage::Success => {
warn!("lyphedb responded with \"success\" to get_urls_from_keywords, treating as error");
Err(())
}
LDBNatsMessage::BadRequest => {
warn!("bad request for get_urls_from_keywords");
Err(())
}
LDBNatsMessage::NotFound => {
Ok(vec![])
}
_ => {
warn!("lyphedb sent weird message as response, treating as error");
Err(())
}
}
}


@ -12,8 +12,12 @@
*/
pub mod nats;
#[cfg(feature = "foundationdb")]
pub mod db;
pub mod ldb;
pub use lyphedb;
#[cfg(feature = "foundationdb")]
pub use foundationdb;
pub fn add(left: usize, right: usize) -> usize {


@ -18,7 +18,7 @@ pub const VOREBOT_NEWHOSTNAME_SERVICE: &str = "websiteparse_highpriority";
pub const VOREBOT_SUGGESTED_SERVICE: &str = "websiteparse_highestpriority";
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebParseRequest {
pub struct CrawlRequest {
pub url: String,
pub damping_factor: f32,
}
pub damping: f64,
}
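
A hedged sketch of publishing the renamed request, assuming the MessagePack-over-NATS convention used elsewhere in this codebase; the function and its error handling are illustrative, not actual vorebot code.

async fn suggest(nats: &async_nats::Client, url: String) {
    let req = CrawlRequest { url, damping: 0.45 };
    // rmp_serde matches how the other services in this repo encode messages
    let payload = rmp_serde::to_vec(&req).expect("serialize CrawlRequest");
    nats.publish(VOREBOT_SUGGESTED_SERVICE.to_string(), payload.into())
        .await
        .expect("publish CrawlRequest");
}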


@ -36,5 +36,7 @@ once_cell = "1.19.0"
chrono = "0.4.33"
rand = "0.8.5"
url_encoded_data = "0.6.1"
strum = "0.27.1"
strum_macros = "0.27.1"
env_logger = "*"


@ -343,14 +343,45 @@ pub async fn admin_invitecode(
}
let active_codes = match list_invite_codes(nats.clone(), token.clone(), false).await {
Ok(v) => v,
Ok(mut v) => {
for v in &mut v {
if let Some(used_by) = &v.used_by {
if used_by.len() > 32 {
v.used_by = Some(format!("{}...", &used_by[0..32]));
}
}
if v.creator.len() > 32 {
v.creator = format!("{}...", &v.creator[0..32]);
}
}
v
},
Err(e) => {
return e.into_response();
}
};
let used_codes = match list_invite_codes(nats.clone(), token.clone(), true).await {
Ok(v) => v.into_iter().map(|mut v| if v.used_at.is_none() { v.used_at = Some(String::from("unset")); v } else { v }).collect(),
Ok(v) => v.into_iter().map(|mut v| {
if let Some(used_by) = &v.used_by {
if used_by.len() > 32 {
v.used_by = Some(format!("{}...", &used_by[0..32]));
}
}
if v.creator.len() > 32 {
v.creator = format!("{}...", &v.creator[0..32]);
}
if v.used_at.is_none() {
v.used_at = Some(String::from("unset"));
v
} else {
v
}
}).collect(),
Err(e) => {
return e.into_response();
}
@ -626,4 +657,4 @@ pub async fn admin_user_list(
} else {
Redirect::to("/").into_response()
}
}
}
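
The same 32-character truncation appears in both branches above; a sketch of a shared helper, hypothetical and not part of the diff:

// Note: like the original, this slices by byte offset, so it assumes
// ASCII creator/used_by strings.
fn truncate_display(s: &str, max: usize) -> String {
    if s.len() > max {
        format!("{}...", &s[0..max])
    } else {
        s.to_string()
    }
}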


@ -27,6 +27,7 @@ use tracing::{debug, error};
use tracing::log::warn;
use crate::{Opts, ALPHA, BUILT_ON, GIT_COMMIT, VERSION, YEAR};
use crate::routes::index::FrontpageAnnouncement;
use crate::routes::Themes;
#[derive(Serialize, Debug)]
struct FullAnnouncement {
@ -96,7 +97,7 @@ pub struct AnnouncementTemplate {
built_on: String,
year: String,
alpha: bool,
theme: String,
theme: Themes,
announcement: FullAnnouncement,
}
@ -109,10 +110,10 @@ pub async fn announcement_full(Extension(nats): Extension<Arc<jetstream::Context
built_on: BUILT_ON.to_string(),
year: YEAR.to_string(),
alpha: ALPHA,
theme: "default".to_string(),
theme: Themes::Default,
announcement,
}.into_response()
} else {
StatusCode::NOT_FOUND.into_response()
}
}
}


@ -30,7 +30,7 @@ use tokio::sync::Mutex;
use tracing::error;
use tracing::log::warn;
use crate::{BUILT_ON, GIT_COMMIT, Opts, ALPHA, VERSION, WEBSITE_COUNT, YEAR};
use crate::routes::{authenticate_user, UserInfo};
use crate::routes::{authenticate_user, Themes, UserInfo};
#[derive(Serialize, Debug)]
pub struct FrontpageAnnouncement {
@ -102,7 +102,7 @@ pub fn frontpage_error(error: &str, auth_url: String) -> FrontpageTemplate {
year: YEAR.to_string(),
alpha: ALPHA,
count: WEBSITE_COUNT.load(Ordering::Relaxed),
theme: "default".to_string(),
theme: Themes::Default,
announcement: None,
}
}
@ -185,7 +185,7 @@ pub struct FrontpageTemplate {
year: String,
alpha: bool,
count: u64,
theme: String,
theme: Themes,
announcement: Option<FrontpageAnnouncement>,
}
@ -202,7 +202,7 @@ pub async fn frontpage(
year: YEAR.to_string(),
alpha: ALPHA,
count: WEBSITE_COUNT.load(Ordering::Relaxed),
theme: "default".to_string(),
theme: Themes::Default,
announcement,
}
}
@ -217,7 +217,7 @@ struct IndexTemplate {
year: String,
alpha: bool,
count: u64,
theme: String,
theme: Themes,
announcement: Option<FrontpageAnnouncement>,
}
@ -234,7 +234,7 @@ pub async fn index(
return (jar.remove("token"), frontpage_error(e.as_str(), opts.auth_url.clone())).into_response();
}
};
let theme = info.theme.clone();
let theme = info.get_theme();
let announcement = latest_announcement(nats.clone()).await;
@ -260,7 +260,7 @@ pub async fn index(
year: YEAR.to_string(),
alpha: ALPHA,
count: WEBSITE_COUNT.load(Ordering::Relaxed),
theme: "default".to_string(),
theme: Themes::Default,
announcement,
}.into_response()
}


@ -10,7 +10,8 @@
*
* You should have received a copy of the GNU Affero General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::fmt::Display;
use std::str::FromStr;
use std::sync::Arc;
use askama::Template;
use askama_axum::IntoResponse;
@ -21,7 +22,13 @@ use asklyphe_common::nats::comms::ServiceResponse;
use async_nats::jetstream;
use axum::http::StatusCode;
use serde::Serialize;
use tracing::error;
use strum::IntoEnumIterator;
use strum_macros::EnumIter;
use time::macros::utc_datetime;
use time::{OffsetDateTime, UtcDateTime};
use tracing::{debug, error};
const RANDOM_THEME_EPOCH: UtcDateTime = utc_datetime!(2025-03-19 00:00);
pub mod search;
pub mod index;
@ -30,7 +37,102 @@ pub mod user_settings;
pub mod admin;
pub mod announcement;
#[derive(Serialize)]
#[derive(Default, EnumIter, PartialEq, Eq, Copy, Clone)]
pub enum Themes {
Classic,
Dark,
#[default]
Default,
Freaky,
Gloss,
Oled,
Water,
Random
}
impl Themes {
pub fn get_all_themes() -> Vec<Themes> {
Self::iter().collect()
}
pub fn display_name(&self) -> String {
match self {
Themes::Classic => {
"classic".to_string()
}
Themes::Dark => {
"dark theme".to_string()
}
Themes::Default => {
"default theme".to_string()
}
Themes::Freaky => {
"freaky".to_string()
}
Themes::Gloss => {
"gloss".to_string()
}
Themes::Oled => {
"lights out".to_string()
}
Themes::Water => {
"water".to_string()
}
Themes::Random => {
"random".to_string()
}
}
}
pub fn internal_name(&self) -> String {
match self {
Themes::Classic => {
"classic".to_string()
}
Themes::Dark => {
"dark".to_string()
}
Themes::Default => {
"default".to_string()
}
Themes::Freaky => {
"freaky".to_string()
}
Themes::Gloss => {
"gloss".to_string()
}
Themes::Oled => {
"oled".to_string()
}
Themes::Water => {
"water".to_string()
}
Themes::Random => {
"random".to_string()
}
}
}
}
impl FromStr for Themes {
type Err = ();
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"classic" => Ok(Themes::Classic),
"dark" => Ok(Themes::Dark),
"default" => Ok(Themes::Default),
"freaky" => Ok(Themes::Freaky),
"gloss" => Ok(Themes::Gloss),
"oled" => Ok(Themes::Oled),
"water" => Ok(Themes::Water),
"random" => Ok(Themes::Random),
_ => Err(())
}
}
}
#[derive(Serialize, Clone)]
pub struct UserInfo {
pub username: String,
pub email: String,
@ -39,6 +141,27 @@ pub struct UserInfo {
pub administrator: bool,
}
impl UserInfo {
pub fn get_theme(&self) -> Themes {
let theme: Themes = self.theme.parse().unwrap_or_default();
if theme.eq(&Themes::Random) {
let possible_themes = Themes::get_all_themes();
let current_day = UtcDateTime::now();
let rand_value = (((current_day - RANDOM_THEME_EPOCH).as_seconds_f64() / 86400.0) % possible_themes.len() as f64) as usize;
*possible_themes.get(rand_value).unwrap_or(&Themes::Default)
} else {
theme
}
}
pub fn get_true_theme(&self) -> Themes {
self.theme.parse().unwrap()
}
}
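
get_theme maps the Random setting onto a deterministic theme of the day: whole days elapsed since RANDOM_THEME_EPOCH, modulo the number of themes. A worked example of that arithmetic, with an illustrative date:

fn main() {
    // 2025-03-27 00:00 UTC is exactly 8 days after the 2025-03-19 epoch, and
    // the Themes enum above has 8 variants, so the day index wraps to 0,
    // which is Themes::Classic in the EnumIter order.
    let seconds_since_epoch = 8.0 * 86_400.0; // (now - RANDOM_THEME_EPOCH)
    let theme_count = 8.0; // Themes::get_all_themes().len()
    let index = ((seconds_since_epoch / 86_400.0) % theme_count) as usize;
    assert_eq!(index, 0);
}

Note that get_all_themes() includes the Random variant itself, so the rotation can also land on "random" for a day.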
pub async fn authenticate_user(nats: Arc<jetstream::Context>, token: String) -> Result<UserInfo, String> {
let response = comms::query_service(
comms::Query::AuthService(AuthServiceQuery {
@ -114,4 +237,4 @@ pub struct NotFoundTemplate;
pub async fn not_found() -> impl IntoResponse {
(StatusCode::NOT_FOUND, NotFoundTemplate).into_response()
}
}


@ -12,7 +12,7 @@
*/
use crate::routes::index::frontpage_error;
use crate::routes::{authenticate_user, UserInfo};
use crate::routes::{authenticate_user, Themes, UserInfo};
use crate::searchbot::{gather_image_results, gather_search_results};
use crate::unit_converter;
use crate::unit_converter::UnitConversion;
@ -111,7 +111,7 @@ struct SearchTemplateJavascript {
built_on: String,
year: String,
alpha: bool,
theme: String,
theme: Themes,
}
pub async fn search_js(
@ -121,7 +121,7 @@ pub async fn search_js(
Extension(opts): Extension<Opts>,
) -> impl IntoResponse {
fn error_response(query: String, info: UserInfo, error: &str) -> SearchTemplateJavascript {
let theme = info.theme.clone();
let theme = info.get_theme();
let querystr = url_encoded_data::stringify(&[("q", query.as_str())]);
SearchTemplateJavascript {
info,
@ -175,7 +175,7 @@ pub async fn search_js(
query = query.replace("-complications", "");
}
let theme = info.theme.clone();
let theme = info.get_theme();
let querystr = url_encoded_data::stringify(&[("q", og_query.as_str())]);
SearchTemplateJavascript {
info,
@ -217,7 +217,7 @@ pub struct SearchTemplate {
pub built_on: String,
pub year: String,
pub alpha: bool,
pub theme: String,
pub theme: Themes,
}
pub async fn search_nojs(
@ -227,7 +227,7 @@ pub async fn search_nojs(
Extension(opts): Extension<Opts>,
) -> impl IntoResponse {
fn error_response(query: String, info: UserInfo, error: &str) -> SearchTemplate {
let theme = info.theme.clone();
let theme = info.get_theme();
let querystr = url_encoded_data::stringify(&[("q", query.as_str())]);
SearchTemplate {
info,
@ -416,7 +416,7 @@ pub struct ImageSearchTemplate {
pub built_on: String,
pub year: String,
pub alpha: bool,
pub theme: String,
pub theme: Themes,
}
pub async fn image_search(
jar: CookieJar,
@ -425,7 +425,7 @@ pub async fn image_search(
Extension(opts): Extension<Opts>,
) -> impl IntoResponse {
fn error_response(query: String, info: UserInfo, error: &str) -> ImageSearchTemplate {
let theme = info.theme.clone();
let theme = info.get_theme();
let querystr = url_encoded_data::stringify(&[("q", query.as_str())]);
ImageSearchTemplate {
info,


@ -27,49 +27,12 @@ use serde::Deserialize;
use tokio::sync::Mutex;
use tracing::error;
use crate::{BUILT_ON, GIT_COMMIT, Opts, ALPHA, VERSION, WEBSITE_COUNT, YEAR};
use crate::routes::{authenticate_user, UserInfo};
pub struct Theme<'a> {
pub value: &'a str,
pub name: &'a str,
}
pub static THEMES: &[Theme] = &[
Theme {
value: "default",
name: "default theme",
},
Theme {
value: "dark",
name: "dark theme",
},
Theme {
value: "oled",
name: "lights out",
},
Theme {
value: "classic",
name: "classic",
},
Theme {
value: "freaky",
name: "freaky",
},
Theme {
value: "water",
name: "water",
},
Theme {
value: "gloss",
name: "gloss",
},
];
use crate::routes::{authenticate_user, Themes, UserInfo};
#[derive(Template)]
#[template(path = "user_settings.html")]
pub struct SettingsTemplate {
themes: &'static [Theme<'static>],
themes: Vec<Themes>,
error: Option<String>,
info: UserInfo,
@ -80,7 +43,8 @@ pub struct SettingsTemplate {
year: String,
alpha: bool,
count: u64,
theme: String,
theme: Themes,
true_theme: Themes,
}
pub async fn user_settings(
@ -96,11 +60,11 @@ pub async fn user_settings(
return (jar.remove("token"), crate::routes::index::frontpage_error(e.as_str(), opts.auth_url.clone())).into_response();
}
};
let theme = info.theme.clone();
let theme = info.get_theme();
SettingsTemplate {
themes: THEMES,
themes: Themes::get_all_themes(),
error: None,
info,
info: info.clone(),
search_query: "".to_string(),
version: VERSION.to_string(),
git_commit: GIT_COMMIT.to_string(),
@ -109,6 +73,7 @@ pub async fn user_settings(
alpha: ALPHA,
count: WEBSITE_COUNT.load(Ordering::Relaxed),
theme,
true_theme: info.get_true_theme(),
}.into_response()
} else {
Redirect::temporary("/").into_response()
@ -126,11 +91,11 @@ pub async fn theme_change_post(
Extension(opts): Extension<Opts>,
Form(input): Form<ThemeChangeForm>,
) -> impl IntoResponse {
fn settings_error(info: UserInfo, theme: String, error: String) -> impl IntoResponse {
fn settings_error(info: UserInfo, theme: Themes, error: String) -> impl IntoResponse {
SettingsTemplate {
themes: THEMES,
themes: Themes::get_all_themes(),
error: Some(error),
info,
info: info.clone(),
search_query: "".to_string(),
version: VERSION.to_string(),
git_commit: GIT_COMMIT.to_string(),
@ -139,6 +104,7 @@ pub async fn theme_change_post(
alpha: ALPHA,
count: WEBSITE_COUNT.load(Ordering::Relaxed),
theme,
true_theme: info.get_true_theme(),
}.into_response()
}
@ -151,10 +117,12 @@ pub async fn theme_change_post(
}
};
let theme = info.theme.clone();
if !THEMES.iter().map(|v| v.value.to_string()).collect::<Vec<String>>().contains(&input.theme.clone().unwrap_or("default".to_string())) {
return settings_error(info, theme, "invalid input, please try again!".to_string()).into_response();
let theme = info.get_theme();
if let Some(theme_input) = &input.theme {
if !Themes::get_all_themes().iter().map(|x| x.internal_name().to_string()).collect::<Vec<String>>().contains(&theme_input) {
return settings_error(info, theme.clone(), "invalid input, please try again!".to_string()).into_response();
}
}
let response = comms::query_service(comms::Query::AuthService(AuthServiceQuery {
@ -200,4 +168,4 @@ pub async fn theme_change_post(
} else {
Redirect::to("/").into_response()
}
}
}


@ -341,7 +341,7 @@ pub async fn gather_search_results(nats: Arc<jetstream::Context>, query: &str, u
search_results.remove(rm - i);
}
let theme = user_info.theme.clone();
let theme = user_info.get_theme();
let querystr = url_encoded_data::stringify(&[("q", query)]);
SearchTemplate {
info: user_info,
@ -489,7 +489,7 @@ pub async fn gather_image_results(nats: Arc<jetstream::Context>, query: &str, us
result.src = format!("/imgproxy?{}", url);
}
let theme = user_info.theme.clone();
let theme = user_info.get_theme();
ImageSearchTemplate {
info: user_info,
error: None,


@ -0,0 +1,11 @@
.pagecontent {
background: url("/static/snow.gif");
border: 3px inset black;
color: black;
}
@media screen and (max-width: 800px) {
.pagecontent {
width: calc(100% - 4px);
}
}


@ -1,6 +1,6 @@
{% extends "admin/shell.html" %}
{% block title %}Home{% endblock %}
{% block title %}Invite Codes{% endblock %}
{% block head %}
{% endblock %}


@ -3,8 +3,8 @@
{% block title %}the best search engine{% endblock %}
{% block head %}
<link rel="stylesheet" href="/static/themes/{{theme}}/frontpage.css"/>
<link rel="stylesheet" href="/static/themes/{{theme}}/inline-announcement.css"/>
<link rel="stylesheet" href="/static/themes/{{theme.internal_name()}}/frontpage.css"/>
<link rel="stylesheet" href="/static/themes/{{theme.internal_name()}}/inline-announcement.css"/>
{% endblock %}
{% block page %}


@ -4,8 +4,8 @@
{% block head %}
<link rel="stylesheet" href="/static/themes/default/home.css"/>
<link rel="stylesheet" href="/static/themes/{{theme}}/home.css"/>
<link rel="stylesheet" href="/static/themes/{{theme}}/inline-announcement.css"/>
<link rel="stylesheet" href="/static/themes/{{theme.internal_name()}}/home.css"/>
<link rel="stylesheet" href="/static/themes/{{theme.internal_name()}}/inline-announcement.css"/>
{% endblock %}
{% block page %}
@ -71,4 +71,4 @@
</div>
{% include "ui/footer.html" %}
</div>
{% endblock %}
{% endblock %}


@ -4,7 +4,7 @@
{% block head %}
<link rel="stylesheet" href="/static/themes/default/imagesearch.css"/>
<link rel="stylesheet" href="/static/themes/{{theme}}/imagesearch.css"/>
<link rel="stylesheet" href="/static/themes/{{theme.internal_name()}}/imagesearch.css"/>
{% if search_query == "notnite" %}<link rel="stylesheet" href="/static/creature.css"/>{% endif %}
{% endblock %}
@ -61,4 +61,4 @@
{% include "ui/footer.html" %}
</div>
{% endblock %}
{% endblock %}


@ -4,7 +4,7 @@
{% block head %}
<link rel="stylesheet" href="/static/themes/default/search.css"/>
<link rel="stylesheet" href="/static/themes/{{theme}}/search.css"/>
<link rel="stylesheet" href="/static/themes/{{theme.internal_name()}}/search.css"/>
{% if search_query == "notnite" %}<link rel="stylesheet" href="/static/creature.css"/>{% endif %}
{% endblock %}
@ -97,4 +97,4 @@
{% include "ui/footer.html" %}
</div>
{% endblock %}
{% endblock %}


@ -4,7 +4,7 @@
{% block head %}
<link rel="stylesheet" href="/static/themes/default/search.css"/>
<link rel="stylesheet" href="/static/themes/{{theme}}/search.css"/>
<link rel="stylesheet" href="/static/themes/{{theme.internal_name()}}/search.css"/>
{% if search_query == "notnite" %}<link rel="stylesheet" href="/static/creature.css"/>{% endif %}
<script src="/static/js/search.js" defer></script>
{% endblock %}
@ -76,4 +76,4 @@
{% include "ui/footer.html" %}
</div>
{% endblock %}
{% endblock %}


@ -12,11 +12,11 @@
href="/static/osd.xml" />
<link rel="stylesheet" href="/static/themes/default/shell.css" />
<link rel="stylesheet" href="/static/themes/{{theme}}/shell.css" />
<link rel="stylesheet" href="/static/themes/{{theme.internal_name()}}/shell.css" />
{% block head %}{% endblock %}
</head>
<body>
{% block page %}{% endblock %}
</body>
</html>
</html>


@ -4,7 +4,7 @@
{% block head %}
<link rel="stylesheet" href="/static/themes/default/settings.css"/>
<link rel="stylesheet" href="/static/themes/{{theme}}/settings.css"/>
<link rel="stylesheet" href="/static/themes/{{theme.internal_name()}}/settings.css"/>
{% endblock %}
{% block page %}
@ -43,16 +43,15 @@
<div class="settings-row">
<div id="theme" class="settings-section">
<h2>theme</h2>
{% for t in themes %}
{%if theme==t.value%}
<p>your current theme is: "{{t.name}}"</p>
{%endif%}
{% endfor %}
<p>your current theme is: "{{true_theme.display_name()}}"</p>
{% if true_theme.internal_name() != theme.internal_name() %}
<p>today's random theme is {{ theme.display_name() }}</p>
{% endif %}
<form action="/user_settings/set_theme" method="post">
<label for="theme-selector">theme</label>
<select name="theme" id="theme-selector">
{% for t in themes %}
<option value="{{t.value}}" {%if theme==t.value%}selected{%endif%}>{{t.name}}</option>
<option value="{{t.internal_name()}}" {%if true_theme.internal_name()==t.internal_name()%}selected{%endif%}>{{t.display_name()}}</option>
{% endfor %}
</select>
<button type="submit" id="theme-submit">change theme!</button>
@ -65,4 +64,4 @@
{% include "ui/footer.html" %}
</div>
{% endblock %}
{% endblock %}


@ -457,6 +457,7 @@ pub async fn user_count(db: &DatabaseConnection) -> Result<usize, FetchUserError
/// returns the number of users in the database who are admins
pub async fn admin_count(db: &DatabaseConnection) -> Result<usize, FetchUserError> {
// dont fucking touch this, i don't know why it works but it does, it's actually evil
// note: doesn't work
Ok(user::Entity::find().filter(user::Column::Flags.into_expr().binary(BinOper::LShift, Expr::value(63 - 2)).lt(1 << (63 - 2)))
.count(db).await.map_err(|e| {
error!("DATABASE ERROR WHILE ADMINCOUNT: {e}");
@ -482,4 +483,4 @@ pub async fn email_list(db: &DatabaseConnection) -> Result<Vec<(String, String)>
})?.into_iter().map(|v| {
(v.username, v.email)
}).collect())
}
}


@ -15,6 +15,7 @@ mod process;
pub mod db;
mod email;
use std::env;
use std::string::ToString;
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
@ -36,6 +37,8 @@ pub static SMTP_URL: Lazy<String> = Lazy::new(|| std::env::var("SMTP_URL").expec
pub static SMTP_USERNAME: Lazy<String> = Lazy::new(|| std::env::var("SMTP_USERNAME").expect("NO SMTP_USERNAME DEFINED"));
pub static SMTP_PASSWORD: Lazy<String> = Lazy::new(|| std::env::var("SMTP_PASSWORD").expect("NO SMTP_PASSWORD DEFINED"));
pub static AUTH_URL: Lazy<String> = Lazy::new(|| std::env::var("AUTH_URL").unwrap_or("https://auth.asklyphe.com".to_string()));
pub static PROCESSES_HANDLED: AtomicU64 = AtomicU64::new(0);
pub static LAST_MESSAGE: AtomicI64 = AtomicI64::new(0);


@ -21,7 +21,7 @@ use crate::db::{invite_code, session, user};
use crate::db::invite_code::ConsumeInviteCodeError;
use crate::db::session::{CreateSessionError, DeleteSessionError, FetchSessionError};
use crate::db::user::{CreateUserError, DeleteUserError, EmailChangeError, FetchUserError, RegisterVerificationCodeError, VerifyEmailPassComboError, VerifyVerificationCodeError};
use crate::email;
use crate::{email, AUTH_URL};
fn generate_email_verification_code() -> String {
rand::thread_rng()
@ -121,7 +121,7 @@ pub async fn register_request(db: &DatabaseConnection, request: RegisterRequest)
}
debug!("verification code for {} is \"{}\"", request.username, verification_code);
email::send_verification_code_email(&request.email, &request.username, format!("https://auth.asklyphe.com/verify?username={}&token={}", request.username, verification_code).as_str());
email::send_verification_code_email(&request.email, &request.username, format!("{}/verify?username={}&token={}", AUTH_URL.as_str(), request.username, verification_code).as_str());
RegisterResponse::Success
}

lyphedb/.gitignore (vendored): new file, 30 lines

@ -0,0 +1,30 @@
# ---> Rust
# Generated by Cargo
# will have compiled files and executables
debug/
target/
# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
Cargo.lock
# These are backup files generated by rustfmt
**/*.rs.bk
# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb
# RustRover
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Added by cargo
/target
# don't commit employee test configs (:
husky_config.toml

lyphedb/Cargo.toml: new file, 18 lines

@ -0,0 +1,18 @@
[package]
name = "lyphedb"
version = "0.1.0"
edition = "2024"
[dependencies]
log = "0.4.20"
rmp-serde = "1.1.2"
futures = "0.3.30"
async-nats = "0.38.0"
tokio = { version = "1.0", features = ["full"] }
chrono = "0.4.31"
serde = { version = "1.0", features = ["derive"] }
rand = "0.9.0"
toml = "0.8.20"
env_logger = "0.11.6"
once_cell = "1.20.3"
percent-encoding = "2.3.1"

lyphedb/config.toml: new file, 10 lines

@ -0,0 +1,10 @@
name = "lyphedb-test"
write = true
master = "/lyphedb_master"
ram_limit = "1mb" # supported suffixes: b,kb,mb,gb
gc_limit = "512kb"
avg_entry_size = "1kb"
log = "debug"
nats_cert = "nats/nats.cert"
nats_key = "nats/nats.pem"
nats_url = "127.0.0.1:4222"


@ -0,0 +1,14 @@
[package]
name = "ldbtesttool"
version = "0.1.0"
edition = "2024"
[dependencies]
rmp-serde = "1.1.2"
futures = "0.3.30"
async-nats = "0.38.0"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
lyphedb = { path = "../" }
toml = "0.8.20"
rand = "0.9.0"


@ -0,0 +1,40 @@
use serde::Deserialize;
#[derive(Deserialize)]
struct ConfigFile {
pub name: String,
pub nats_cert: String,
pub nats_key: String,
pub nats_url: String,
}
#[derive(Clone)]
pub struct LypheDBConfig {
pub name: String,
pub nats_cert: String,
pub nats_key: String,
pub nats_url: String,
}
pub fn load_config(path: &str) -> LypheDBConfig {
let s = std::fs::read_to_string(path);
if let Err(e) = s {
panic!("failed to read config file: {}", e);
}
let s = s.unwrap();
let cnf = toml::from_str::<ConfigFile>(&s);
if let Err(e) = cnf {
panic!("failed to parse config file: {}", e);
}
let cnf = cnf.unwrap();
// quick checks and conversions
let mut config = LypheDBConfig {
name: cnf.name,
nats_cert: cnf.nats_cert,
nats_key: cnf.nats_key,
nats_url: cnf.nats_url,
};
config
}


@ -0,0 +1,114 @@
use crate::config::load_config;
use lyphedb::{KVList, KeyList, LDBNatsMessage, LypheDBCommand, PropagationStrategy};
use std::collections::BTreeMap;
use async_nats::Message;
use futures::StreamExt;
use tokio::sync::mpsc;
mod config;
#[tokio::main]
async fn main() {
let config = load_config(&std::env::args().nth(1).expect("please specify config file"));
let nats = async_nats::ConnectOptions::new()
.add_client_certificate(
config.nats_cert.as_str().into(),
config.nats_key.as_str().into(),
)
.connect(config.nats_url.as_str())
.await;
if let Err(e) = nats {
eprintln!("FATAL ERROR, COULDN'T CONNECT TO NATS: {}", e);
return;
}
let nats = nats.unwrap();
// test 1: create 10000 keys, send in chunks of 10
println!("test 1: create 10_000 keys, send in chunks of 100");
let (key_tx, mut key_rx) = mpsc::unbounded_channel();
let start = std::time::Instant::now();
let mut tasks = vec![];
for _ in 0..(10_000 / 100) {
let name = config.name.clone();
let nats = nats.clone();
let key_tx = key_tx.clone();
tasks.push(tokio::spawn(async move {
let mut keys = BTreeMap::new();
for _ in 0..100 {
let key = rand::random::<[u8; 16]>();
let data = rand::random::<[u8; 16]>();
key_tx.send((key, data)).unwrap();
keys.insert(key, data);
}
let data = rmp_serde::to_vec(&LDBNatsMessage::Command(LypheDBCommand::SetKeys(
KVList {
kvs: keys
.into_iter()
.map(|(k, v)| (k.to_vec(), v.to_vec()))
.collect(),
},
PropagationStrategy::Immediate,
)))
.unwrap();
let replyto_sub = "lyphedb_test_1".to_string();
let mut subscriber = nats.subscribe(replyto_sub.clone()).await.unwrap();
nats.publish_with_reply(name.clone(), replyto_sub, data.into()).await.unwrap();
if let Some(reply) = subscriber.next().await {
let reply = rmp_serde::from_slice::<LDBNatsMessage>(&reply.payload).unwrap();
if let LDBNatsMessage::Success = reply {} else {
eprintln!("failure");
}
}
}));
}
for task in tasks {
task.await.unwrap();
}
let end = std::time::Instant::now();
println!("test 1: {}ms", end.duration_since(start).as_millis());
let mut our_copy = BTreeMap::new();
let mut buffer = vec![];
key_rx.recv_many(&mut buffer, 10_000).await;
for (k, v) in buffer {
our_copy.insert(k, v);
}
// test 2: read back all keys and check for accuracy
println!("test 2: read back all keys and check for accuracy");
let kv: Vec<_> = our_copy.into_iter().map(|(k, v)| (k.to_vec(), v.to_vec())).collect();
let start = std::time::Instant::now();
let mut tasks = vec![];
for i in 0..100 {
let batch = kv[i * 100..((i + 1) * 100).min(kv.len())].to_vec();
let name = config.name.clone();
let nats = nats.clone();
tasks.push(tokio::spawn(async move {
let data = rmp_serde::to_vec(&LDBNatsMessage::Command(LypheDBCommand::GetKeys(
KeyList {
keys: batch.iter().map(|(k, _)| k.to_vec()).collect()
}
))).unwrap();
let replyto_sub = format!("lyphedb_test_2_{}", i);
let mut subscriber = nats.subscribe(replyto_sub.clone()).await.unwrap();
nats.publish_with_reply(name.clone(), replyto_sub, data.into()).await.unwrap();
if let Some(reply) = subscriber.next().await {
let reply = rmp_serde::from_slice::<LDBNatsMessage>(&reply.payload).unwrap();
if let LDBNatsMessage::Entries(kvlist) = reply {
// check all keys
for (k1, v1) in batch {
let v2 = kvlist.kvs.iter().find(|(k, v)| k == &k1).expect("key not found");
assert_eq!(v1, v2.1);
}
} else {
eprintln!("failure");
}
}
}));
}
for task in tasks {
task.await.unwrap();
}
let end = std::time::Instant::now();
println!("test 2: {}ms", end.duration_since(start).as_millis());
}

lyphedb/src/config.rs: new file, 129 lines

@ -0,0 +1,129 @@
use serde::Deserialize;
#[derive(Deserialize)]
struct ConfigFile {
pub name: String,
pub write: bool,
pub master: String,
pub ram_limit: String,
pub gc_limit: String,
pub avg_entry_size: String,
pub log: String,
pub nats_cert: String,
pub nats_key: String,
pub nats_url: String,
}
#[derive(Clone)]
pub struct LypheDBConfig {
pub name: String,
pub write: bool,
pub master: String,
pub ram_limit: usize,
pub gc_limit: usize,
pub avg_entry_size: usize,
pub log: String,
pub nats_cert: String,
pub nats_key: String,
pub nats_url: String,
}
pub fn load_config(path: &str) -> LypheDBConfig {
let s = std::fs::read_to_string(path);
if let Err(e) = s {
panic!("failed to read config file: {}", e);
}
let s = s.unwrap();
let cnf = toml::from_str::<ConfigFile>(&s);
if let Err(e) = cnf {
panic!("failed to parse config file: {}", e);
}
let cnf = cnf.unwrap();
// quick checks and conversions
let mut config = LypheDBConfig {
name: cnf.name,
write: cnf.write,
master: cnf.master,
ram_limit: 0,
gc_limit: 0,
avg_entry_size: 0,
log: cnf.log,
nats_cert: cnf.nats_cert,
nats_key: cnf.nats_key,
nats_url: cnf.nats_url,
};
if !(cnf.ram_limit.ends_with("b") &&
(
cnf.ram_limit.trim_end_matches("b").ends_with("k") ||
cnf.ram_limit.trim_end_matches("b").ends_with("m") ||
cnf.ram_limit.trim_end_matches("b").ends_with("g")
)
) {
panic!("invalid ram limit");
}
config.ram_limit = if cnf.ram_limit.ends_with("gb") {
cnf.ram_limit.trim_end_matches("gb").parse::<usize>().unwrap() * 1024 * 1024 * 1024
} else if cnf.ram_limit.ends_with("mb") {
cnf.ram_limit.trim_end_matches("mb").parse::<usize>().unwrap() * 1024 * 1024
} else if cnf.ram_limit.ends_with("kb") {
cnf.ram_limit.trim_end_matches("kb").parse::<usize>().unwrap() * 1024
} else {
cnf.ram_limit.trim_end_matches("b").parse::<usize>().unwrap()
};
if !(cnf.gc_limit.ends_with("b") &&
(
cnf.gc_limit.trim_end_matches("b").ends_with("k") ||
cnf.gc_limit.trim_end_matches("b").ends_with("m") ||
cnf.gc_limit.trim_end_matches("b").ends_with("g")
)
) {
panic!("invalid ram limit");
}
config.gc_limit = if cnf.gc_limit.ends_with("gb") {
cnf.gc_limit.trim_end_matches("gb").parse::<usize>().unwrap() * 1024 * 1024 * 1024
} else if cnf.gc_limit.ends_with("mb") {
cnf.gc_limit.trim_end_matches("mb").parse::<usize>().unwrap() * 1024 * 1024
} else if cnf.gc_limit.ends_with("kb") {
cnf.gc_limit.trim_end_matches("kb").parse::<usize>().unwrap() * 1024
} else {
cnf.gc_limit.trim_end_matches("b").parse::<usize>().unwrap()
};
if !(cnf.avg_entry_size.ends_with("b") &&
(
cnf.avg_entry_size.trim_end_matches("b").ends_with("k") ||
cnf.avg_entry_size.trim_end_matches("b").ends_with("m") ||
cnf.avg_entry_size.trim_end_matches("b").ends_with("g")
)
) {
panic!("invalid ram limit");
}
config.avg_entry_size = if cnf.avg_entry_size.ends_with("gb") {
cnf.avg_entry_size.trim_end_matches("gb").parse::<usize>().unwrap() * 1024 * 1024 * 1024
} else if cnf.avg_entry_size.ends_with("mb") {
cnf.avg_entry_size.trim_end_matches("mb").parse::<usize>().unwrap() * 1024 * 1024
} else if cnf.avg_entry_size.ends_with("kb") {
cnf.avg_entry_size.trim_end_matches("kb").parse::<usize>().unwrap() * 1024
} else {
cnf.avg_entry_size.trim_end_matches("b").parse::<usize>().unwrap()
};
if config.avg_entry_size > config.ram_limit {
panic!("avg entry size is larger than ram limit");
}
if config.gc_limit > config.ram_limit {
panic!("gc limit is larger than ram limit");
}
if config.log.is_empty() {
config.log = "info".to_string();
}
if config.log != "debug" && config.log != "info" && config.log != "warn" && config.log != "error" {
panic!("invalid log level");
}
config
}
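
All three size fields repeat the same suffix logic; a sketch of a factored-out helper, hypothetical and not part of the diff:

// Parse a human-readable size such as "512kb" or "1gb" into bytes.
fn parse_size(s: &str) -> Option<usize> {
    let (num, mult) = if let Some(n) = s.strip_suffix("gb") {
        (n, 1024 * 1024 * 1024)
    } else if let Some(n) = s.strip_suffix("mb") {
        (n, 1024 * 1024)
    } else if let Some(n) = s.strip_suffix("kb") {
        (n, 1024)
    } else if let Some(n) = s.strip_suffix("b") {
        (n, 1)
    } else {
        return None;
    };
    num.parse::<usize>().ok().map(|v| v * mult)
}

With this, parse_size("1mb") == Some(1_048_576), and each of the three validation blocks above collapses to a single call.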

lyphedb/src/dbimpl/mod.rs: new file, 321 lines

@ -0,0 +1,321 @@
use crate::config::LypheDBConfig;
use log::*;
use lyphedb::PropagationStrategy;
use once_cell::sync::Lazy;
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use percent_encoding::{percent_decode_str, percent_encode, AsciiSet, CONTROLS};
use tokio::sync::RwLock;
pub const NOT_ALLOWED_ASCII: AsciiSet = CONTROLS.add(b' ').add(b'/').add(b'.').add(b'\\');
pub type LDBCache = BTreeMap<Arc<Vec<u8>>, Arc<Vec<u8>>>;
/// Reads will read from this cache
pub static PRIMARY_CACHE: Lazy<RwLock<LDBCache>> =
Lazy::new(|| RwLock::new(BTreeMap::new()));
/// Writes will be written to this cache, and then depending on the propagation method, the
/// Primary cache will be set to a copy
pub static SECONDARY_CACHE: Lazy<RwLock<LDBCache>> =
Lazy::new(|| RwLock::new(BTreeMap::new()));
/// how often are keys accessed? this will influence the garbage collector
pub static KEY_ACCESS_COUNTER: Lazy<RwLock<BTreeMap<Arc<Vec<u8>>, AtomicU64>>> =
Lazy::new(|| RwLock::new(BTreeMap::new()));
pub fn key_to_path(config: &LypheDBConfig, key: &[u8]) -> PathBuf {
let mut path = PathBuf::new();
path.push(&config.master);
let mut fnbuf = Vec::new();
for (i, byte) in key.iter().enumerate() {
if *byte == b'/' {
let encoded = percent_encode(&fnbuf, &NOT_ALLOWED_ASCII).to_string();
path.push(encoded);
fnbuf.clear();
} else {
fnbuf.push(*byte);
}
}
if !fnbuf.is_empty() {
let encoded = percent_encode(&fnbuf, &NOT_ALLOWED_ASCII).to_string();
path.push(encoded);
fnbuf.clear();
}
path.push(".self");
path
}
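// e.g. with master = "/data", the key b"users/alice" maps to
// "/data/users/alice/.self", with each path segment percent-encoded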
#[derive(Debug)]
pub enum OperationError {
KeyCannotBeEmpty,
FilesystemPermissionError,
BadFilesystemEntry,
}
pub async fn set_key_disk(
config: &LypheDBConfig,
key: &[u8],
value: &[u8],
) -> Result<(), OperationError> {
if key.is_empty() {
return Err(OperationError::KeyCannotBeEmpty);
}
let path = key_to_path(config, key);
let directory = path.parent().ok_or(OperationError::KeyCannotBeEmpty)?;
if let Ok(directory_exists) = directory.try_exists() {
if !directory_exists {
std::fs::create_dir_all(directory).map_err(|e| {
warn!("couldn't create directory: {:?}", e);
OperationError::FilesystemPermissionError
})?;
}
} else {
return Err(OperationError::FilesystemPermissionError);
}
std::fs::write(path, value).map_err(|e| {
warn!("couldn't write file: {:?}", e);
OperationError::FilesystemPermissionError
})?;
Ok(())
}
pub async fn delete_key_disk(
config: &LypheDBConfig,
key: &[u8],
) -> Result<(), OperationError> {
if key.is_empty() {
return Err(OperationError::KeyCannotBeEmpty);
}
let path = key_to_path(config, key);
if let Ok(exists) = path.try_exists() {
if !exists {
return Ok(());
}
}
std::fs::remove_file(path).map_err(|e| {
warn!("couldn't remove file: {:?}", e);
OperationError::FilesystemPermissionError
})
}
pub async fn get_key_disk(
config: &LypheDBConfig,
key: &[u8],
) -> Result<Option<Vec<u8>>, OperationError> {
if key.is_empty() {
return Err(OperationError::KeyCannotBeEmpty);
}
let path = key_to_path(config, key);
if let Ok(exists) = path.try_exists() {
if !exists {
return Ok(None);
}
}
let data = std::fs::read(path).map_err(|e| {
warn!("couldn't read file: {:?}", e);
OperationError::FilesystemPermissionError
})?;
Ok(Some(data))
}
/// this function allows empty keys, so you can get all keys under root by doing
/// get_keys_under_keydir_disk(..., b"")
pub async fn get_keys_under_keydir_disk(
config: &LypheDBConfig,
keydir: &[u8],
) -> Result<Vec<Vec<u8>>, OperationError> {
let path = key_to_path(config, keydir);
let path = path.parent().expect("bad master");
let mut keys = Vec::new();
for entry in std::fs::read_dir(path).map_err(|e| {
warn!("couldn't read directory: {:?}", e);
OperationError::FilesystemPermissionError
})? {
let entry = entry.map_err(|e| {
warn!("couldn't read directory entry: {:?}", e);
OperationError::FilesystemPermissionError
})?;
let path = entry.path();
let filename = path
.to_str()
.ok_or(OperationError::FilesystemPermissionError)?;
if filename.ends_with(".self") {
// this is a value file, ignore
continue;
}
let filename = filename.trim_start_matches(&config.master);
let key = percent_decode_str(filename).collect();
keys.push(key);
}
Ok(keys)
}
pub async fn set_key(
config: LypheDBConfig,
key: &[u8],
value: &[u8],
strat: &PropagationStrategy,
) -> Result<(), OperationError> {
let k1 = Arc::new(key.to_vec());
let v1 = Arc::new(value.to_vec());
let disk_task = {
let k1 = k1.clone();
let v1 = v1.clone();
tokio::spawn(async move { set_key_disk(&config, &k1, &v1).await })
};
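// the disk write and the cache propagation run concurrently; if the disk
// write fails, the propagation task is aborted and the key is evicted from
// both caches (see the error path below)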
let prop_task = match strat {
PropagationStrategy::Immediate => {
let k1 = k1.clone();
let v1 = v1.clone();
tokio::spawn(async move {
let mut secondary_cache = SECONDARY_CACHE.write().await;
secondary_cache.insert(k1, v1);
let mut primary_cache = PRIMARY_CACHE.write().await;
*primary_cache = secondary_cache.clone();
})
}
PropagationStrategy::Timeout => {
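// stage the write in the secondary cache, then wait up to 60 seconds
// (polling every 10) for the primary lock before copying the secondary
// cache over it. note the inner tasks are detached, so aborting prop_task
// on a failed disk write does not cancel them once they have spawned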
let k1 = k1.clone();
let v1 = v1.clone();
tokio::spawn(async move {
tokio::spawn(async move {
{
let mut secondary_cache = SECONDARY_CACHE.write().await;
secondary_cache.insert(k1, v1);
}
tokio::spawn(async move {
let start = std::time::Instant::now();
loop {
if start.elapsed().as_secs() > 60 {
break;
}
let pc = PRIMARY_CACHE.try_write();
if pc.is_err() {
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
continue;
} else {
break;
}
}
let mut primary_cache = PRIMARY_CACHE.write().await;
let secondary_cache = SECONDARY_CACHE.read().await;
*primary_cache = secondary_cache.clone();
});
});
})
}
PropagationStrategy::OnRead => {
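// OnRead: rather than writing the new value into the caches, evict the
// key from both so the next read misses and loads the fresh value from disk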
let k1 = k1.clone();
tokio::spawn(async move {
{
let mut secondary_cache = SECONDARY_CACHE.write().await;
secondary_cache.remove(&k1);
}
{
let mut primary_cache = PRIMARY_CACHE.write().await;
primary_cache.remove(&k1);
}
})
}
};
if let Ok(Err(e)) = disk_task.await {
error!("couldn't set key on disk: {:?} ({:?})", e, key);
// undo propagation
prop_task.abort();
{
let mut primary_cache = PRIMARY_CACHE.write().await;
primary_cache.remove(&k1);
}
{
let mut secondary_cache = SECONDARY_CACHE.write().await;
secondary_cache.remove(&k1);
}
return Err(e);
}
let _ = prop_task.await;
Ok(())
}
pub async fn get_key(config: LypheDBConfig, key: &[u8]) -> Result<Option<Vec<u8>>, OperationError> {
let k1 = Arc::new(key.to_vec());
{
let k1 = k1.clone();
tokio::spawn(async move {
let mut access_counter = KEY_ACCESS_COUNTER.write().await;
let counter = access_counter.entry(k1.clone()).or_insert(AtomicU64::new(0));
if counter.load(Ordering::Relaxed) > 10000000 {
return;
}
counter.fetch_add(1, Ordering::SeqCst);
});
}
let disk_task = {
let k1 = k1.clone();
tokio::spawn(async move { get_key_disk(&config, &k1).await })
};
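// start the disk read optimistically; it is aborted below if the primary
// cache already holds the key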
{
// read from cache
let cache = PRIMARY_CACHE.read().await;
if let Some(val) = cache.get(&k1) {
disk_task.abort();
return Ok(Some(val.to_vec()));
}
}
//debug!("cache miss");
if let Ok(result) = disk_task.await {
if let Ok(Some(val)) = &result {
let val = Arc::new(val.to_vec());
tokio::spawn(async move {
{
let mut cache = SECONDARY_CACHE.write().await;
cache.insert(k1.clone(), val.clone());
}
{
let secondary_cache = SECONDARY_CACHE.read().await;
let mut primary_cache = PRIMARY_CACHE.write().await;
*primary_cache = secondary_cache.clone();
}
//debug!("cache insert");
});
}
result
} else {
Err(OperationError::FilesystemPermissionError)
}
}
pub async fn delete_key(config: LypheDBConfig, key: &[u8]) -> Result<(), OperationError> {
let k1 = Arc::new(key.to_vec());
let disk_task = {
let k1 = k1.clone();
tokio::spawn(async move { delete_key_disk(&config, &k1).await })
};
let prop_task = {
let k1 = k1.clone();
tokio::spawn(async move {
{
let mut key_access_counter = KEY_ACCESS_COUNTER.write().await;
key_access_counter.remove(&k1);
}
{
let mut secondary_cache = SECONDARY_CACHE.write().await;
secondary_cache.remove(&k1);
}
let mut primary_cache = PRIMARY_CACHE.write().await;
let secondary_cache = SECONDARY_CACHE.read().await;
*primary_cache = secondary_cache.clone();
})
};
prop_task.await.expect("couldn't propagate key deletion");
disk_task.await.expect("couldn't delete key")
}

49
lyphedb/src/lib.rs Normal file

@ -0,0 +1,49 @@
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum LDBNatsMessage {
Command(LypheDBCommand),
Entries(KVList),
Count(u64),
Success,
BadRequest,
NotFound,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum PropagationStrategy {
/// Reads will immediately be able to read this key's value as soon as it has been set
Immediate,
/// The value change will be queued along with others,
/// then in a period of either inactivity or a maximum time, all changes will be immediately
/// seen at once
Timeout,
/// The key will be removed from the cache, and thus the next read will cause a cache miss,
/// and the value will be loaded from disk
OnRead,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum LypheDBCommand {
SetKeys(KVList, PropagationStrategy),
GetKeys(KeyList),
CountKeys(KeyDirectory),
/// NOT RECURSIVE
GetKeyDirectory(KeyDirectory),
DeleteKeys(KeyList),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KVList {
pub kvs: Vec<(Vec<u8>, Vec<u8>)>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KeyList {
pub keys: Vec<Vec<u8>>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KeyDirectory {
pub key: Vec<u8>,
}
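// a minimal client-side sketch (an assumption, not part of this crate): encode a
// command with rmp-serde and send it over NATS request/reply, mirroring what
// lyphedb/src/main.rs expects. the subject must be the database's configured
// `name`; "lyphedb-dev" below is a placeholder.
//
// async fn set_example(nats: &async_nats::Client) -> Result<(), Box<dyn std::error::Error>> {
//     let cmd = LDBNatsMessage::Command(LypheDBCommand::SetKeys(
//         KVList { kvs: vec![(b"users/alice".to_vec(), b"hello world".to_vec())] },
//         PropagationStrategy::Immediate,
//     ));
//     let reply = nats.request("lyphedb-dev".to_string(), rmp_serde::to_vec(&cmd)?.into()).await?;
//     match rmp_serde::from_slice::<LDBNatsMessage>(&reply.payload)? {
//         LDBNatsMessage::Success => Ok(()),
//         other => Err(format!("unexpected reply: {other:?}").into()),
//     }
// }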

248
lyphedb/src/main.rs Normal file

@ -0,0 +1,248 @@
/*
keys are stored on disk like this
<master>/<keyname>/.self
so if i stored the key "/this/is/a/key" with the data "hello world", it'd look like this
<master>/this/is/a/key/.self -> "hello world"
*/
use crate::config::{LypheDBConfig, load_config};
use crate::dbimpl::{KEY_ACCESS_COUNTER, PRIMARY_CACHE, SECONDARY_CACHE};
use futures::StreamExt;
use log::{debug, error, info, warn};
use lyphedb::{KVList, LDBNatsMessage, LypheDBCommand};
mod config;
mod dbimpl;
pub async fn gc_thread(config: LypheDBConfig) {
loop {
tokio::time::sleep(std::time::Duration::from_secs(61)).await;
{
let cache = PRIMARY_CACHE.read().await;
let cache_size = cache.len() * config.avg_entry_size;
if cache_size > config.gc_limit {
debug!("gc triggered, cache size: {} bytes", cache_size);
let keycount_to_remove = cache.len() - config.gc_limit / config.avg_entry_size;
drop(cache);
let mut least_freq_keys = vec![];
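// gather roughly the keycount_to_remove least-accessed keys; the
// replacement below is first-fit rather than an exact bottom-k, which is
// cheap and close enough for eviction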
for (key, count) in KEY_ACCESS_COUNTER.read().await.iter() {
let count = count.load(std::sync::atomic::Ordering::Relaxed);
if least_freq_keys.len() < keycount_to_remove {
least_freq_keys.push((key.clone(), count));
} else {
for (other, oc) in least_freq_keys.iter_mut() {
if count < *oc {
*other = key.clone();
*oc = count;
break;
}
}
}
}
let mut cache = SECONDARY_CACHE.write().await;
for (key, _) in least_freq_keys.iter() {
cache.remove(key);
}
let mut primary_cache = PRIMARY_CACHE.write().await;
*primary_cache = cache.clone();
debug!(
"gc finished, cache size: {} bytes",
primary_cache.len() * config.avg_entry_size
);
}
}
}
}
pub async fn precache_keys_until_limit(config: LypheDBConfig) {
info!("precache started");
let mut precache_count = 0;
let mut precache_stack = dbimpl::get_keys_under_keydir_disk(&config, b"")
.await
.expect("couldn't get root keys");
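// depth-first walk of the on-disk key tree: keydirs are pushed onto the
// stack, leaf keys are read once via get_key to warm the cache, stopping
// once the estimated cache size reaches gc_limit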
while let Some(precache_key) = precache_stack.pop() {
{
let cache = PRIMARY_CACHE.read().await;
if cache.len() * config.avg_entry_size > config.gc_limit {
break;
}
if cache.len() % 10000 == 0 {
info!("precache size: {} mb", (cache.len() * config.avg_entry_size) / (1024*1024));
}
}
let children = dbimpl::get_keys_under_keydir_disk(&config, &precache_key)
.await
.expect("couldn't get children of key");
if !children.is_empty() {
precache_stack.extend(children);
} else {
let _value = dbimpl::get_key(config.clone(), &precache_key)
.await
.expect("couldn't get value of key");
precache_count += 1;
}
}
info!("precache finished, {} values precached", precache_count);
}
#[tokio::main]
async fn main() {
let config = load_config(&std::env::args().nth(1).expect("please specify config file"));
let logger = env_logger::builder()
.filter_level(match config.log.as_str() {
"debug" => log::LevelFilter::Debug,
"info" => log::LevelFilter::Info,
"warn" => log::LevelFilter::Warn,
"error" => log::LevelFilter::Error,
_ => unreachable!(),
})
.build();
log::set_boxed_logger(Box::new(logger)).unwrap();
log::set_max_level(match config.log.as_str() {
"debug" => log::LevelFilter::Debug,
"info" => log::LevelFilter::Info,
"warn" => log::LevelFilter::Warn,
"error" => log::LevelFilter::Error,
_ => unreachable!(),
});
info!("lyphedb started");
let nats = async_nats::ConnectOptions::new()
.add_client_certificate(
config.nats_cert.as_str().into(),
config.nats_key.as_str().into(),
)
.connect(config.nats_url.as_str())
.await;
if let Err(e) = nats {
error!("FATAL ERROR, COULDN'T CONNECT TO NATS: {}", e);
return;
}
let nats = nats.unwrap();
let mut subscriber = nats
.queue_subscribe(config.name.clone(), "lyphedb".to_string())
.await
.expect("couldn't subscribe to subject");
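// queue subscription: multiple lyphedb instances subscribed with the same
// "lyphedb" queue group have requests load-balanced between them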
info!("nats connected");
tokio::spawn(precache_keys_until_limit(config.clone()));
tokio::spawn(gc_thread(config.clone()));
while let Some(msg) = subscriber.next().await {
let config = config.clone();
let nats = nats.clone();
tokio::spawn(async move {
async fn bad_request(nats: &async_nats::Client, replyto: async_nats::Subject) {
let reply = rmp_serde::to_vec(&LDBNatsMessage::BadRequest).unwrap();
if let Err(e) = nats.publish(replyto, reply.into()).await {
warn!("couldn't send reply: {:?}", e);
}
}
let data = msg.payload.to_vec();
let data = rmp_serde::from_slice::<LDBNatsMessage>(&data);
if let Err(e) = data {
warn!("couldn't deserialize message: {:?}", e);
if let Some(replyto) = msg.reply {
bad_request(&nats, replyto).await;
}
return;
}
let data = data.unwrap();
match data {
LDBNatsMessage::Command(cmd) => match cmd {
LypheDBCommand::SetKeys(kvlist, propstrat) => {
for (key, value) in kvlist.kvs {
if let Err(e) =
dbimpl::set_key(config.clone(), &key, &value, &propstrat).await
{
warn!("couldn't set key: {:?}", e);
}
}
let reply = rmp_serde::to_vec(&LDBNatsMessage::Success).unwrap();
if let Err(e) = nats.publish(msg.reply.unwrap(), reply.into()).await {
warn!("couldn't send reply: {:?}", e);
}
}
LypheDBCommand::GetKeys(klist) => {
let mut reply = Vec::new();
for key in klist.keys {
if let Ok(Some(value)) = dbimpl::get_key(config.clone(), &key).await {
reply.push((key, value));
} else {
warn!("couldn't get key: {:?}", key);
}
}
let reply =
rmp_serde::to_vec(&LDBNatsMessage::Entries(KVList { kvs: reply }))
.unwrap();
if let Err(e) = nats.publish(msg.reply.unwrap(), reply.into()).await {
warn!("couldn't send reply: {:?}", e);
}
}
LypheDBCommand::CountKeys(keydir) => {
let keys = dbimpl::get_keys_under_keydir_disk(&config, &keydir.key)
.await
.expect("couldn't get keys under keydir");
let mut count = 0;
for key in keys {
let value = dbimpl::get_key(config.clone(), &key)
.await
.expect("couldn't get value of key");
if value.is_some() {
count += 1;
}
}
let reply = rmp_serde::to_vec(&LDBNatsMessage::Count(count)).unwrap();
if let Err(e) = nats.publish(msg.reply.unwrap(), reply.into()).await {
warn!("couldn't send reply: {:?}", e);
}
}
LypheDBCommand::GetKeyDirectory(keydir) => {
let mut reply = Vec::new();
let keys = dbimpl::get_keys_under_keydir_disk(&config, &keydir.key)
.await
.expect("couldn't get keys under keydir");
for key in keys {
let value = dbimpl::get_key(config.clone(), &key)
.await
.expect("couldn't get value of key");
if let Some(value) = value {
reply.push((key, value));
}
}
let reply =
rmp_serde::to_vec(&LDBNatsMessage::Entries(KVList { kvs: reply }))
.unwrap();
if let Err(e) = nats.publish(msg.reply.unwrap(), reply.into()).await {
warn!("couldn't send reply: {:?}", e);
}
}
LypheDBCommand::DeleteKeys(klist) => {
for key in klist.keys {
if let Err(e) = dbimpl::delete_key(config.clone(), &key).await {
warn!("couldn't delete key: {:?}", e);
}
}
let reply = rmp_serde::to_vec(&LDBNatsMessage::Success).unwrap();
if let Err(e) = nats.publish(msg.reply.unwrap(), reply.into()).await {
warn!("couldn't send reply: {:?}", e);
}
}
},
_ => {
warn!("bad request, not command");
if let Some(replyto) = msg.reply {
bad_request(&nats, replyto).await;
}
}
}
});
}
info!("lyphedb shutting down");
}


@ -8,7 +8,7 @@ license-file = "LICENSE"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
asklyphe-common = { path = "../asklyphe-common" }
asklyphe-common = { path = "../asklyphe-common", features = ["foundationdb"] }
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
rmp-serde = "1.1.2"


@ -16,7 +16,7 @@ use astro_float::{BigFloat, RoundingMode};
use once_cell::sync::Lazy;
use std::collections::BTreeMap;
pub const PRECISION: usize = 1024;
pub const PRECISION: usize = 2048;
// length unit -> value in meters
pub static LENGTH_STORE: Lazy<BTreeMap<LengthUnit, BigFloat>> = Lazy::new(|| {


@ -15,7 +15,7 @@ use crate::length_units::{LengthUnit, LENGTH_NAMES, LENGTH_STORE, PRECISION};
use crate::unit_defs::{ConvertTo, MetricPrefix};
use astro_float::{BigFloat, Consts, Radix, RoundingMode};
pub const MAX_PRECISION: usize = 1024;
pub const MAX_PRECISION: usize = 2048;
pub mod length_units;
pub mod unit_defs;

1
vorebot/.gitignore vendored Normal file

@ -0,0 +1 @@
/target

32
vorebot/Cargo.toml Normal file

@ -0,0 +1,32 @@
[package]
name = "vorebot"
version = "0.2.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
asklyphe-common = { path = "../asklyphe-common" }
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
rand = "0.8.5"
rmp-serde = "1.1.2"
base64 = "0.21.7"
image = "0.24.8"
isahc = "1.7.2"
ulid = "1.0.0"
async-nats = "0.38.0"
futures = "0.3.28"
chrono = "0.4.26"
once_cell = "1.18.0"
env_logger = "0.10.0"
log = "0.4.19"
mutex-timeouts = { version = "0.3.0", features = ["tokio"] }
prometheus_exporter = "0.8.5"
thirtyfour = "0.35.0"
stopwords = "0.1.1"
texting_robots = "0.2.2"
[features]
default = []
benchmark = []
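# the "benchmark" feature swaps the NATS/DB-backed crawler loop for a
# one-shot parse of a url passed on the command line (see src/main.rs)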

407
vorebot/src/main.rs Normal file

@ -0,0 +1,407 @@
mod webparse;
use asklyphe_common::nats::vorebot::{
VOREBOT_NEWHOSTNAME_SERVICE, VOREBOT_SERVICE, VOREBOT_SUGGESTED_SERVICE,
};
use async_nats::jetstream;
use async_nats::jetstream::consumer::PullConsumer;
use async_nats::jetstream::stream::RetentionPolicy;
use chrono::TimeZone;
use futures::StreamExt;
use log::{debug, error, info, warn};
use mutex_timeouts::tokio::MutexWithTimeoutAuto as Mutex;
use once_cell::sync::Lazy;
use prometheus_exporter::prometheus::core::{AtomicF64, GenericGauge};
use prometheus_exporter::prometheus::{register_counter, register_gauge, Counter};
use std::cmp::max;
use std::collections::{BTreeMap, BTreeSet};
use std::hash::{DefaultHasher, Hasher};
use std::io::Read;
use std::iter::Iterator;
use std::str::FromStr;
use std::string::ToString;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use async_nats::jetstream::kv;
use stopwords::{Language, Spark, Stopwords};
use thirtyfour::{CapabilitiesHelper, DesiredCapabilities, Proxy, WebDriver};
use thirtyfour::common::capabilities::firefox::FirefoxPreferences;
use tokio::task::JoinHandle;
#[cfg(not(feature = "benchmark"))]
use asklyphe_common::ldb::DBConn;
use asklyphe_common::nats::vorebot::CrawlRequest;
use crate::webparse::web_parse;
pub static NATS_URL: Lazy<String> =
Lazy::new(|| std::env::var("NATS_URL").expect("NO NATS URL DEFINED"));
pub static NATS_PASSWORD: Lazy<Option<String>> = Lazy::new(|| {
std::env::var("NATS_PASSWORD").ok()
});
pub static NATS_CERT: Lazy<String> = Lazy::new(|| std::env::var("NATS_CERT").expect("NO NATS_CERT DEFINED"));
pub static NATS_KEY: Lazy<String> = Lazy::new(|| std::env::var("NATS_KEY").expect("NO NATS_KEY DEFINED"));
pub static BROWSER_THREADS: Lazy<Vec<String>> =
Lazy::new(|| std::env::var("BROWSER_THREADS").expect("PLEASE LIST BROWSER_THREADS").split(',').map(|v| v.to_string()).collect());
pub static BROWSER_PROXY: Lazy<Option<String>> = Lazy::new(|| {
std::env::var("BROWSER_PROXY").ok()
});
pub static DB_NAME: Lazy<String> =
Lazy::new(|| std::env::var("DB_NAME").expect("PLEASE ADD DB_NAME"));
// in minutes
const DEFAULT_CRAWLER_TIMEOUT: u64 = 5;
pub static CRAWLER_TIMEOUT: Lazy<u64> =
Lazy::new(|| std::env::var("CRAWLER_TIMEOUT").map(|v| v.parse::<u64>().ok()).ok().flatten().unwrap_or(DEFAULT_CRAWLER_TIMEOUT));
pub static DOCUMENTS_CRAWLED: AtomicU64 = AtomicU64::new(0);
pub static LAST_MESSAGE: AtomicI64 = AtomicI64::new(0);
pub static LAST_TASK_COMPLETE: Lazy<Vec<Arc<AtomicI64>>> = Lazy::new(|| {
let max_threads: usize = BROWSER_THREADS.len();
let mut vals = vec![];
for _ in 0..max_threads {
// let db = Database::default().expect("couldn't connect to foundation db!");
// DBS.lock().await.push(Arc::new(db));
vals.push(Arc::new(AtomicI64::new(chrono::Utc::now().timestamp())));
}
vals
});
pub static USER_AGENT: Lazy<String> = Lazy::new(|| {
format!(
"Vorebot/{} (compatible; Googlebot/2.1; +https://voremicrocomputers.com/crawler.html)",
env!("CARGO_PKG_VERSION")
)
});
#[cfg(not(feature = "benchmark"))]
#[tokio::main]
async fn main() {
mutex_timeouts::tokio::GLOBAL_TOKIO_TIMEOUT.store(72, Ordering::Relaxed);
env_logger::init();
info!("began at {}", chrono::Utc::now().to_string());
let mut nats = async_nats::ConnectOptions::new();
if let Some(password) = NATS_PASSWORD.as_ref() {
nats = nats.user_and_password("vorebot".to_string(), password.to_string());
} else {
nats = nats.add_client_certificate(NATS_CERT.as_str().into(), NATS_KEY.as_str().into());
}
let nats = nats.connect(NATS_URL.as_str())
.await;
if let Err(e) = nats {
error!("FATAL ERROR, COULDN'T CONNECT TO NATS: {}", e);
return;
}
let nats = nats.unwrap();
let dbconn = DBConn::new(nats.clone(), DB_NAME.to_string());
let nats = jetstream::new(nats);
// fixme: remove this once we have proper site suggestion interface
let args = std::env::args().collect::<Vec<_>>();
if let Some(suggestion_site) = args.get(1) {
let damping = args.get(2).map(|v| v.parse::<f64>().expect("BAD FP")).unwrap_or(0.45);
warn!("suggesting {} with damping {}", suggestion_site, damping);
let ack = nats.publish(VOREBOT_SUGGESTED_SERVICE.to_string(), rmp_serde::to_vec(&CrawlRequest {
url: suggestion_site.to_string(),
damping,
}).unwrap().into()).await.unwrap();
ack.await.expect("FATAL ERROR");
return;
}
let mut tasks: Vec<(JoinHandle<()>, String, Arc<AtomicU64>)> = vec![];
let mut available_browsers: Vec<String> = BROWSER_THREADS.clone();
{
loop {
while tasks.len() < BROWSER_THREADS.len() {
let nats = nats.clone();
let browser = available_browsers.pop().expect("NO BROWSERS LEFT, THIS IS A FATAL BUG!");
let db = dbconn.clone();
let b = browser.clone();
let last_parse = Arc::new(AtomicU64::new(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()));
let lp = last_parse.clone();
tasks.push((tokio::spawn(async move {
let browser = b;
info!("using {}", browser);
info!("crawler spawned");
/* normal priority */
let consumer: PullConsumer = nats.get_or_create_stream(jetstream::stream::Config {
name: VOREBOT_SERVICE.to_string(),
subjects: vec![VOREBOT_SERVICE.to_string()],
retention: RetentionPolicy::WorkQueue,
..Default::default()
}).await
.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!")
.get_or_create_consumer("parser", jetstream::consumer::pull::Config {
durable_name: Some("parser".to_string()),
filter_subject: VOREBOT_SERVICE.to_string(),
..Default::default()
}).await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!");
let mut messages = consumer.messages().await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!");
/* higher priority (new hostnames) */
let higher_consumer: PullConsumer = nats.get_or_create_stream(jetstream::stream::Config {
name: VOREBOT_NEWHOSTNAME_SERVICE.to_string(),
subjects: vec![VOREBOT_NEWHOSTNAME_SERVICE.to_string()],
retention: RetentionPolicy::WorkQueue,
..Default::default()
}).await
.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!")
.get_or_create_consumer("highparser", jetstream::consumer::pull::Config {
durable_name: Some("highparser".to_string()),
filter_subject: VOREBOT_NEWHOSTNAME_SERVICE.to_string(),
..Default::default()
}).await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!");
let mut high_messages = higher_consumer.messages().await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!");
/* highest priority (user-suggested) */
let highest_consumer: PullConsumer = nats.get_or_create_stream(jetstream::stream::Config {
name: VOREBOT_SUGGESTED_SERVICE.to_string(),
subjects: vec![VOREBOT_SUGGESTED_SERVICE.to_string()],
retention: RetentionPolicy::WorkQueue,
..Default::default()
}).await
.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!")
.get_or_create_consumer("highestparser", jetstream::consumer::pull::Config {
durable_name: Some("highestparser".to_string()),
filter_subject: VOREBOT_SUGGESTED_SERVICE.to_string(),
..Default::default()
}).await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!");
let mut highest_messages = highest_consumer.messages().await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!");
let mut prefs = FirefoxPreferences::new();
prefs.set_user_agent(USER_AGENT.to_string()).unwrap();
let mut caps = DesiredCapabilities::firefox();
caps.set_preferences(prefs).unwrap();
if let Some(proxy) = BROWSER_PROXY.as_ref() {
caps.set_proxy(Proxy::Manual {
ftp_proxy: None,
http_proxy: Some(proxy.to_string()),
ssl_proxy: Some(proxy.to_string()),
socks_proxy: None,
socks_version: None,
socks_username: None,
socks_password: None,
no_proxy: None,
}).unwrap();
}
let driver = WebDriver::new(&browser, caps).await.unwrap();
info!("crawler ready");
loop {
lp.store(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(), Ordering::Relaxed);
tokio::select! {
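// note: without the `biased;` flag, tokio::select! polls ready branches in
// random order, so the three streams are drained concurrently rather than
// in strict priority order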
Some(highest) = StreamExt::next(&mut highest_messages) => {
if let Err(e) = highest {
warn!("error when recv js message! {e}");
} else {
let message = highest.unwrap();
if let Err(e) = message.ack().await {
warn!("failed acking message {e}");
}
let req = rmp_serde::from_slice::<CrawlRequest>(message.payload.as_ref());
if let Err(e) = req {
error!("BAD NATS REQUEST: {e}");
continue;
}
let req = req.unwrap();
info!("RECV USER SUGGESTION!");
let now = chrono::Utc::now().timestamp();
LAST_MESSAGE.store(now, Ordering::Relaxed);
let nats = nats.clone();
let mut bad = false;
driver.in_new_tab(|| async {
if web_parse(nats.clone(), db.clone(), &driver, &req.url, req.damping).await.is_err() {
warn!("temporary failure detected in parsing, requeuing");
nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&req).unwrap().into()).await.expect("FAILED TO REQUEUE");
bad = true;
}
Ok(())
}).await.unwrap();
if bad {
continue;
}
let crawled = DOCUMENTS_CRAWLED.fetch_add(1, Ordering::Relaxed) + 1;
if crawled % 100 == 0 {
info!("crawled {} pages!", crawled);
}
}
}
Some(high) = StreamExt::next(&mut high_messages) => {
if let Err(e) = high {
warn!("error when recv js message! {e}");
} else {
let message = high.unwrap();
if let Err(e) = message.ack().await {
warn!("failed acking message {e}");
}
let req = rmp_serde::from_slice::<CrawlRequest>(message.payload.as_ref());
if let Err(e) = req {
error!("BAD NATS REQUEST: {e}");
continue;
}
let req = req.unwrap();
info!("RECV HIGH PRIORITY!");
let now = chrono::Utc::now().timestamp();
LAST_MESSAGE.store(now, Ordering::Relaxed);
let nats = nats.clone();
let mut bad = false;
driver.in_new_tab(|| async {
if web_parse(nats.clone(), db.clone(), &driver, &req.url, req.damping).await.is_err() {
warn!("temporary failure detected in parsing, requeuing");
nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&req).unwrap().into()).await.expect("FAILED TO REQUEUE");
bad = true;
}
Ok(())
}).await.unwrap();
if bad {
continue;
}
let crawled = DOCUMENTS_CRAWLED.fetch_add(1, Ordering::Relaxed) + 1;
if crawled % 100 == 0 {
info!("crawled {} pages!", crawled);
}
}
}
Some(normal) = StreamExt::next(&mut messages) => {
if let Err(e) = normal {
warn!("error when recv js message! {e}");
} else {
let message = normal.unwrap();
if let Err(e) = message.ack().await {
warn!("failed acking message {e}");
}
let req = rmp_serde::from_slice::<CrawlRequest>(message.payload.as_ref());
if let Err(e) = req {
error!("BAD NATS REQUEST: {e}");
continue;
}
let req = req.unwrap();
let now = chrono::Utc::now().timestamp();
LAST_MESSAGE.store(now, Ordering::Relaxed);
let nats = nats.clone();
let mut hash = DefaultHasher::new();
hash.write(req.url.as_bytes());
let hash = hash.finish();
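// "dehomo" is a jetstream kv bucket holding a one-byte flag per url hash
// with a one-hour ttl, so the same url is not re-crawled within the hour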
let dehomo_bucket = nats.get_key_value("dehomo").await;
let dehomo_bucket = if dehomo_bucket.is_err() {
let dehomo_bucket = nats.create_key_value(kv::Config {
bucket: "dehomo".to_string(),
description: "prevent the same url from being scraped again too quickly".to_string(),
max_age: Duration::from_secs(60*60),
..Default::default()
}).await;
if let Err(e) = dehomo_bucket {
panic!("FAILED TO CREATE DEHOMO BUCKET!!! {e}");
} else {
dehomo_bucket.unwrap()
}
} else {
dehomo_bucket.unwrap()
};
if dehomo_bucket.get(hash.to_string()).await.ok().flatten().map(|v| *v.first().unwrap_or(&0) == 1).unwrap_or(false) {
info!("too soon to scrape {}", req.url);
continue;
}
let mut bad = false;
driver.in_new_tab(|| async {
if web_parse(nats.clone(), db.clone(), &driver, &req.url, req.damping).await.is_err() {
warn!("temporary failure detected in parsing, requeuing");
nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&req).unwrap().into()).await.expect("FAILED TO REQUEUE");
bad = true;
}
Ok(())
}).await.unwrap();
if bad {
continue;
}
dehomo_bucket.put(hash.to_string(), vec![1u8].into()).await.expect("failed to store dehomo");
let crawled = DOCUMENTS_CRAWLED.fetch_add(1, Ordering::Relaxed) + 1;
if crawled % 100 == 0 {
info!("crawled {} pages!", crawled);
}
}
}
}
}
}),
browser, last_parse.clone()
));
warn!("spawning new ingest thread");
}
let mut tasks_to_remove = vec![];
for task in tasks.iter() {
if task.0.is_finished() {
tasks_to_remove.push(task.1.clone());
available_browsers.push(task.1.clone());
}
let last_parse = task.2.load(Ordering::Relaxed);
if SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() > (*CRAWLER_TIMEOUT * 60) + last_parse {
// task has been taking too long
warn!("task taking too long! aborting!");
task.0.abort();
}
}
tasks.retain(|v| !tasks_to_remove.contains(&v.1));
tokio::time::sleep(Duration::from_secs(3)).await;
}
}
}
#[cfg(feature = "benchmark")]
pub struct DBConn {}
#[cfg(feature = "benchmark")]
pub struct Context {}
#[cfg(feature = "benchmark")]
#[tokio::main]
async fn main() {
mutex_timeouts::tokio::GLOBAL_TOKIO_TIMEOUT.store(72, Ordering::Relaxed);
eprintln!("BENCHMARK MODE ENABLED");
println!("BENCHMARK MODE ENABLED");
let args: Vec<String> = std::env::args().collect();
let test_site = args.get(1);
if test_site.is_none() {
panic!("PLEASE PROVIDE TEST SITE!")
}
let test_site = test_site.unwrap();
env_logger::init();
info!("began at {}", chrono::Utc::now().to_string());
let mut prefs = FirefoxPreferences::new();
prefs.set_user_agent(USER_AGENT.to_string()).unwrap();
let mut caps = DesiredCapabilities::firefox();
caps.set_preferences(prefs).unwrap();
let driver = WebDriver::new(&BROWSER_THREADS[0], caps).await.unwrap();
driver.in_new_tab(|| async {
web_parse(Context {}, DBConn {}, &driver, test_site, 0.85).await.expect("Failed to run web parse");
Ok(())
}).await.unwrap();
}

544
vorebot/src/webparse/mod.rs Normal file

@ -0,0 +1,544 @@
use crate::USER_AGENT;
#[cfg(feature = "benchmark")]
use crate::{Context, DBConn};
#[cfg(not(feature = "benchmark"))]
use asklyphe_common::ldb::DBConn;
use asklyphe_common::ldb::{linkrelstore, linkstore, metastore, sitestore, titlestore, wordstore};
use asklyphe_common::nats::vorebot::CrawlRequest;
use asklyphe_common::nats::vorebot::VOREBOT_SERVICE;
#[cfg(not(feature = "benchmark"))]
use async_nats::jetstream::{kv, Context};
use futures::AsyncReadExt;
use image::EncodableLayout;
use isahc::config::RedirectPolicy;
use isahc::prelude::Configurable;
use isahc::HttpClient;
use log::{debug, error, warn};
use std::collections::{BTreeMap, BTreeSet};
use std::hash::{DefaultHasher, Hasher};
use std::sync::atomic::AtomicBool;
use std::sync::{mpsc, Arc};
use std::time::Duration;
use stopwords::{Language, Spark, Stopwords};
use texting_robots::{get_robots_url, Robot};
use thirtyfour::{By, WebDriver};
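/// returns whether robots.txt permits the "Vorebot" user-agent token to fetch
/// `url`; Err(()) signals a robots.txt that could not be parsed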
pub fn allowed_to_crawl(robotstxt: &[u8], url: &str) -> Result<bool, ()> {
let robot1 = Robot::new("Vorebot", robotstxt);
if let Err(e) = robot1 {
warn!(
"potentially malformed robots.txt ({}), not crawling {}",
e, url
);
return Err(());
}
let robot1 = robot1.unwrap();
Ok(robot1.allowed(url))
}
// returns Err if we cannot access a page, but the error associated with it seems temporary (i.e. it's worth trying again later)
// otherwise, returns Ok
pub async fn web_parse(
nats: Context,
db: DBConn,
driver: &WebDriver,
url: &str,
damping: f64,
) -> Result<(), ()> {
driver.delete_all_cookies().await.map_err(|_| ())?;
#[cfg(not(feature = "benchmark"))]
let robots_bucket = {
let robots_bucket = nats.get_key_value("robots").await;
if robots_bucket.is_err() {
let robots_bucket = nats
.create_key_value(kv::Config {
bucket: "robots".to_string(),
description: "storage of robots.txt data for given hosts".to_string(),
..Default::default()
})
.await;
if let Err(e) = robots_bucket {
error!("could not create robots.txt bucket: {}", e);
None
} else {
Some(robots_bucket.unwrap())
}
} else {
robots_bucket.ok()
}
};
#[cfg(not(feature = "benchmark"))]
let hosts_bucket = {
let hosts_bucket = nats.get_key_value("hosts").await;
if hosts_bucket.is_err() {
let hosts_bucket = nats
.create_key_value(kv::Config {
bucket: "hosts".to_string(),
description: "prevent the same host from being scraped too quickly".to_string(),
max_age: Duration::from_secs(60 * 10),
..Default::default()
})
.await;
if let Err(e) = hosts_bucket {
error!("could not create hosts bucket: {}", e);
return Err(());
} else {
hosts_bucket.unwrap()
}
} else {
hosts_bucket.unwrap()
}
};
let robots_url = get_robots_url(url);
if robots_url.is_err() {
error!("could not get a robots.txt url from {}, not crawling", url);
return Ok(());
}
let robots_url = robots_url.unwrap();
let mut hash = DefaultHasher::new();
hash.write(robots_url.as_bytes());
let hash = hash.finish();
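// this hash of the robots.txt url (one per host) keys both the "hosts"
// rate-limit bucket and the "robots" cache bucket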
#[cfg(not(feature = "benchmark"))]
if let Ok(Some(host)) = hosts_bucket.get(hash.to_string()).await {
let count = *host.first().unwrap_or(&0);
if count > 10 {
warn!(
"scraping {} too quickly, avoiding for one minute",
robots_url
);
return Err(());
}
hosts_bucket
.put(hash.to_string(), vec![count + 1].into())
.await
.expect("COULDN'T INSERT INTO HOSTS BUCKET!");
} else {
hosts_bucket
.put(hash.to_string(), vec![1].into())
.await
.expect("COULDN'T INSERT INTO HOSTS BUCKET!");
}
let mut skip_robots_check = false;
#[cfg(not(feature = "benchmark"))]
if let Some(robots_bucket) = &robots_bucket {
if let Ok(Some(entry)) = robots_bucket.get(hash.to_string()).await {
if let Ok(res) = allowed_to_crawl(entry.as_bytes(), url) {
if !res {
debug!("robots.txt does not allow us to crawl {}", url);
return Ok(());
} else {
skip_robots_check = true;
}
}
}
}
if !skip_robots_check {
// check manually
debug!("checking new robots.txt \"{}\"", robots_url);
let client = HttpClient::builder()
.redirect_policy(RedirectPolicy::Limit(10))
.timeout(Duration::from_secs(60))
.build();
if let Err(e) = client {
error!("could not create new robots.txt httpclient: {}", e);
return Err(());
}
let client = client.unwrap();
let request = isahc::Request::get(&robots_url)
.header("user-agent", USER_AGENT.as_str())
.body(());
if let Err(e) = request {
error!("could not create robots.txt get request: {}", e);
return Ok(());
}
let request = request.unwrap();
let response = client.send_async(request).await;
if let Err(e) = response {
warn!("could not get robots.txt page: {}", e);
return Err(());
}
let mut response = response.unwrap();
if response.status() == 429 {
// too many requests
warn!("too many requests for {}", robots_url);
return Err(());
}
if response.status().is_server_error() {
// don't crawl at the moment
debug!("not crawling {} due to server error", robots_url);
return Err(());
}
let mut body = "".to_string();
if let Err(e) = response.body_mut().read_to_string(&mut body).await {
warn!("could not read from robots.txt response: {}", e);
return Err(());
}
if let Ok(res) = allowed_to_crawl(body.as_bytes(), url) {
#[cfg(not(feature = "benchmark"))]
if let Some(robots_bucket) = &robots_bucket {
if let Err(e) = robots_bucket
.put(hash.to_string(), body.as_bytes().to_vec().into())
.await
{
warn!("could not put robots.txt data: {}", e);
}
}
if !res {
debug!("robots.txt does not allow us to crawl {}", url);
return Ok(());
} else {
// we're allowed to crawl!
}
}
}
let start = std::time::Instant::now();
debug!("handling request for {}", url);
// check for bad status codes
// fixme: i hate this solution, can we get something that actually checks the browser's request?
let client = HttpClient::builder()
.redirect_policy(RedirectPolicy::Limit(10))
.timeout(Duration::from_secs(60))
.build();
if let Err(e) = client {
error!("could not create new badstatuscode httpclient: {}", e);
return Err(());
}
let client = client.unwrap();
let request = isahc::Request::get(url)
.header("user-agent", USER_AGENT.as_str())
.body(());
if let Err(e) = request {
error!("could not create badstatuscode get request: {}", e);
return Ok(());
}
let request = request.unwrap();
let response = client.send_async(request).await;
if let Err(e) = response {
warn!("could not get badstatuscode page: {}", e);
return Err(());
}
let mut response = response.unwrap();
if response.status() == 429 {
// too many requests
warn!("too many requests for {}", url);
return Err(());
}
if response.status().is_server_error() || response.status().is_client_error() {
// don't crawl at the moment
debug!(
"not crawling {} due to bad status code {}",
url,
response.status()
);
return Err(());
}
// i guess we're good
driver.goto(url).await.map_err(|_| ())?;
let html_element = driver.find(By::Tag("html")).await.map_err(|_| ())?;
if let Some(lang) = html_element.attr("lang").await.ok().flatten() {
if !lang.starts_with("en") && !lang.starts_with("unknown") {
// i.e. non-english language
// fixme: remove this once we start expanding to non-english-speaking markets?
warn!(
"skipping {} due to {} language (currently prioritizing english",
url, lang
);
return Err(());
}
}
let meta_elements = driver.find_all(By::Tag("meta")).await.map_err(|_| ())?;
let title = driver.title().await.map_err(|_| ())?;
let mut description = None;
let mut keywords = vec![];
for elem in meta_elements {
if let Ok(Some(name)) = elem.attr("name").await {
match name.as_str() {
"description" => {
if let Ok(Some(content)) = elem.attr("content").await {
description = Some(content);
}
}
"keywords" => {
if let Ok(Some(content)) = elem.attr("content").await {
keywords = content
.split(',')
.map(|v| v.to_lowercase())
.filter(|v| !v.is_empty())
.collect();
}
}
_ => {}
}
}
}
let body = driver.find(By::Tag("body")).await.map_err(|_| ())?;
let raw_page_content = body.text().await.map_err(|_| ())?;
async fn gather_elements_with_multiplier(
driver: &WebDriver,
wordmap: &mut BTreeMap<String, f64>,
stops: &BTreeSet<&&str>,
elements: &[&str],
multiplier: f64,
) {
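// scoring heuristic: each unique visible text block is weighted by its
// on-screen area per character ((w*h)/chars), scaled and squared, so
// visually prominent text contributes more; stopwords are divided by 100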
let mut elms = vec![];
for tag in elements {
elms.push(driver.find_all(By::Tag(*tag)).await);
}
let elms = elms.iter().flatten().flatten().collect::<Vec<_>>();
let mut sentences = vec![];
let mut sentence_set = BTreeSet::new();
debug!("processing elements...");
for node in elms {
let _ = node.scroll_into_view().await;
let boxmodel = node.rect().await;
if boxmodel.is_err() {
// not visible
continue;
}
let boxmodel = boxmodel.unwrap();
let current_text = node.text().await;
if current_text.is_err() {
// no text on this node
continue;
}
let current_text = current_text.unwrap().trim().to_string();
if current_text.is_empty() {
continue;
}
let sqs = (boxmodel.width * boxmodel.height).max(1.0); // no 0 divides pls (:
let ccount = current_text.chars().count() as f64;
let cssq = if ccount > 0.0 { sqs / ccount } else { 0.0 };
if sentence_set.contains(&current_text) {
continue;
}
sentence_set.insert(current_text.clone());
sentences.push((current_text, cssq));
}
for (sentence, cssq) in sentences {
let mut cssq = (cssq / 500.0).powi(2) * multiplier;
for word in sentence.split_whitespace() {
let word = word
.to_lowercase()
.trim_end_matches(|v: char| v.is_ascii_punctuation())
.to_string();
if stops.contains(&word.as_str()) {
// less valuable
cssq /= 100.0;
}
if let Some(wentry) = wordmap.get_mut(&word) {
*wentry += cssq;
} else {
if word.is_empty() {
continue;
}
wordmap.insert(word.to_string(), cssq);
}
}
}
}
let mut wordmap: BTreeMap<String, f64> = BTreeMap::new();
let stops: BTreeSet<_> = Spark::stopwords(Language::English)
.unwrap()
.iter()
.collect();
debug!("headers...");
gather_elements_with_multiplier(
driver,
&mut wordmap,
&stops,
&["h1", "h2", "h3", "h4", "h5", "h6"],
3.0,
)
.await;
debug!("paragraphs...");
gather_elements_with_multiplier(driver, &mut wordmap, &stops, &["p", "div"], 1.0).await;
let mut wordmap = wordmap.into_iter().collect::<Vec<_>>();
wordmap.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
let mut db_error_so_requeue_anyways = false;
let words = wordmap
.iter()
.map(|(word, _)| word.as_str())
.collect::<Vec<_>>();
#[allow(clippy::collapsible_if)]
if !words.is_empty() {
#[cfg(not(feature = "benchmark"))]
if wordstore::add_url_to_keywords(&db, &words, url)
.await
.is_err()
{
warn!("couldn't add {} to keywords!", url);
db_error_so_requeue_anyways = true;
}
}
let mut metawords = keywords.iter().map(|v| v.as_str()).collect::<Vec<_>>();
let desc2 = description.clone();
let desc2 = desc2.map(|v| {
v.to_lowercase()
.split_whitespace()
.map(String::from)
.collect::<Vec<_>>()
});
if let Some(description) = &desc2 {
for word in description {
let word = word.trim_end_matches(|v: char| v.is_ascii_punctuation());
if word.is_empty() {
continue;
}
metawords.push(word);
}
}
#[allow(clippy::collapsible_if)]
if !metawords.is_empty() {
#[cfg(not(feature = "benchmark"))]
if metastore::add_url_to_metawords(&db, &metawords, url)
.await
.is_err()
{
warn!("couldn't add {} to metawords!", url);
db_error_so_requeue_anyways = true;
}
}
let mut titlewords = vec![];
let title2 = title.clone();
let title2 = title2.to_lowercase();
for word in title2.split_whitespace() {
let word = word.trim_end_matches(|v: char| v.is_ascii_punctuation());
if word.is_empty() {
continue;
}
titlewords.push(word);
}
#[allow(clippy::collapsible_if)]
if !titlewords.is_empty() {
#[cfg(not(feature = "benchmark"))]
if titlestore::add_url_to_titlewords(&db, &titlewords, url)
.await
.is_err()
{
warn!("couldn't add {} to titlewords!", url);
db_error_so_requeue_anyways = true;
}
}
#[cfg(not(feature = "benchmark"))]
if sitestore::add_website(
&db,
url,
Some(title),
description,
if keywords.is_empty() {
None
} else {
Some(keywords)
},
&wordmap,
raw_page_content,
damping,
)
.await
.is_err()
{
warn!("couldn't add {} to sitestore!", url);
db_error_so_requeue_anyways = true;
}
debug!("finished with main site stuff for {}", url);
let linkelms = driver.find_all(By::Tag("a")).await.map_err(|_| ())?;
for linkelm in linkelms {
if linkelm.scroll_into_view().await.is_err() {
debug!("couldn't scroll into view!");
}
let href = linkelm.prop("href").await.map_err(|_| ())?;
if href.is_none() {
debug!("no href!");
continue;
}
let href = href.unwrap();
if href.contains('#') {
continue;
}
let linktext = linkelm.text().await.map_err(|_| ())?.to_lowercase();
let linkimgs = linkelm.find_all(By::Tag("img")).await.map_err(|_| ())?;
let mut alts = "".to_string();
for img in linkimgs {
if let Ok(Some(alt)) = img.attr("alt").await {
alts.push_str(&alt);
alts.push(' ');
}
}
let alts = alts.trim().to_lowercase();
let mut linkwords = vec![];
for word in linktext.split_whitespace() {
let word = word.trim_end_matches(|v: char| v.is_ascii_punctuation());
linkwords.push(word);
}
for word in alts.split_whitespace() {
let word = word.trim_end_matches(|v: char| v.is_ascii_punctuation());
linkwords.push(word);
}
#[allow(clippy::collapsible_if)]
if !linkwords.is_empty() {
#[cfg(not(feature = "benchmark"))]
if linkstore::add_url_to_linkwords(&db, &linkwords, &href)
.await
.is_err()
{
warn!("couldn't add {} to linkwords!", url);
}
}
#[cfg(not(feature = "benchmark"))]
if linkrelstore::a_linksto_b(&db, url, &href).await.is_err() {
warn!("couldn't perform a_linksto_b (a {url} b {href})");
}
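// every discovered link is queued as a normal-priority crawl with the
// default damping factor of 0.85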
#[cfg(not(feature = "benchmark"))]
nats.publish(
VOREBOT_SERVICE.to_string(),
rmp_serde::to_vec(&CrawlRequest {
url: href,
damping: 0.85,
})
.unwrap()
.into(),
)
.await
.unwrap();
}
let elapsed = start.elapsed().as_secs_f64();
debug!("crawled {} in {} seconds", url, elapsed);
Ok(())
}