diff --git a/Cargo.lock b/Cargo.lock index 324f355..89154f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -133,6 +133,12 @@ version = "1.0.97" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "argon2" version = "0.5.3" @@ -151,6 +157,12 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "ascii" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16" + [[package]] name = "askama" version = "0.12.1" @@ -236,6 +248,8 @@ dependencies = [ "foundationdb", "futures", "log", + "lyphedb", + "percent-encoding", "rand 0.8.5", "rmp-serde", "serde", @@ -406,7 +420,7 @@ version = "0.38.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76433c4de73442daedb3a59e991d94e85c14ebfc33db53dfcd347a21cd6ef4f8" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "futures", "memchr", @@ -651,6 +665,12 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + [[package]] name = "base64" version = "0.22.1" @@ -703,7 +723,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "rustc-hash", + "rustc-hash 1.1.0", "shlex", "syn 2.0.99", "which", @@ -726,6 +746,12 @@ dependencies = [ "url_encoded_data", ] +[[package]] +name = "bit_field" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"dc827186963e592360843fb5ba4b973e145841266c1357f7180c43526f2e5b61" + [[package]] name = "bitflags" version = "1.3.2" @@ -807,6 +833,17 @@ dependencies = [ "syn 2.0.99", ] +[[package]] +name = "bstr" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223" +dependencies = [ + "lazy_static", + "memchr", + "regex-automata 0.1.10", +] + [[package]] name = "bumpalo" version = "3.17.0" @@ -835,6 +872,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "bytemuck" +version = "1.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6b1fc10dbac614ebc03540c9dbd60e83887fda27794998c6528f1782047d540" + [[package]] name = "byteorder" version = "1.5.0" @@ -911,6 +954,12 @@ dependencies = [ "stacker", ] +[[package]] +name = "chunked_transfer" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e4de3bc4ea267985becf712dc6d9eed8b04c953b3fcfb339ebc87acd9804901" + [[package]] name = "clang-sys" version = "1.8.1" @@ -962,6 +1011,12 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" +[[package]] +name = "color_quant" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d7b894f5411737b7867f4827955924d7c254fc9f4d91a6aad6b097804b1018b" + [[package]] name = "colorchoice" version = "1.0.3" @@ -983,6 +1038,26 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "const_format" +version = "0.2.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "126f97965c8ad46d6d9163268ff28432e8f6a1196a55578867832e3049df63dd" +dependencies = [ + "const_format_proc_macros", +] + +[[package]] +name = 
"const_format_proc_macros" +version = "0.2.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d57c2eccfb16dbac1f4e61e206105db5820c9d26c3c472bc17c774259ef7744" +dependencies = [ + "proc-macro2", + "quote", + "unicode-xid", +] + [[package]] name = "cookie" version = "0.18.1" @@ -1044,6 +1119,34 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crc32fast" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-queue" version = "0.3.12" @@ -1059,6 +1162,12 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "crunchy" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43da5946c66ffcc7745f48db692ffbb10a83bfe0afd96235c5c2a4fb23994929" + [[package]] name = "crypto-common" version = "0.1.6" @@ -1308,7 +1417,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea3d894bbbab314476b265f9b2d46bf24b123a36dd0e96b06a1b49545b9d9dcc" dependencies = [ - "base64", + "base64 0.22.1", "memchr", ] @@ -1424,6 +1533,21 @@ 
dependencies = [ "pin-project-lite", ] +[[package]] +name = "exr" +version = "1.73.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f83197f59927b46c04a183a619b7c29df34e63e63c7869320862268c0ef687e0" +dependencies = [ + "bit_field", + "half", + "lebe", + "miniz_oxide", + "rayon-core", + "smallvec", + "zune-inflate", +] + [[package]] name = "fastrand" version = "1.9.0" @@ -1439,12 +1563,31 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "fdeflate" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e6853b52649d4ac5c0bd02320cddc5ba956bdb407c4b75a2c6b75bf51500f8c" +dependencies = [ + "simd-adler32", +] + [[package]] name = "fiat-crypto" version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" +[[package]] +name = "flate2" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11faaf5a5236997af9848be0bef4db95824b1d534ebc64d0f0c6cf3e67bd38dc" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "flume" version = "0.11.1" @@ -1722,8 +1865,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi 0.11.0+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -1738,6 +1883,16 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "gif" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fb2d69b19215e18bb912fa30f7ce15846e301408695e44e0ef719f1da9e19f2" +dependencies = [ + "color_quant", + "weezl", +] + [[package]] name = "gimli" version = "0.31.1" @@ -1779,6 +1934,16 @@ dependencies = [ 
"url_encoded_data", ] +[[package]] +name = "half" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" +dependencies = [ + "cfg-if", + "crunchy", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -2001,6 +2166,24 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.27.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d191583f3da1305256f22463b9bb0471acad48a4e534a5218b9963e9c1f59b2" +dependencies = [ + "futures-util", + "http 1.2.0", + "hyper", + "hyper-util", + "rustls", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tower-service", + "webpki-roots", +] + [[package]] name = "hyper-tls" version = "0.6.0" @@ -2204,6 +2387,24 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "image" +version = "0.24.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5690139d2f55868e080017335e4b94cb7414274c74f1669c84fb5feba2c9f69d" +dependencies = [ + "bytemuck", + "byteorder", + "color_quant", + "exr", + "gif", + "jpeg-decoder", + "num-traits", + "png", + "qoi", + "tiff", +] + [[package]] name = "indexmap" version = "2.7.1" @@ -2310,6 +2511,15 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "jpeg-decoder" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5d4a7da358eff58addd2877a45865158f0d78c911d43a5784ceb7bbf52833b0" +dependencies = [ + "rayon", +] + [[package]] name = "js-sys" version = "0.3.77" @@ -2344,13 +2554,33 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" +[[package]] +name = "ldbtesttool" +version = "0.1.0" +dependencies = [ + "async-nats", + "futures", 
+ "lyphedb", + "rand 0.9.0", + "rmp-serde", + "serde", + "tokio", + "toml", +] + +[[package]] +name = "lebe" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03087c2bad5e1034e8cace5926dec053fb3790248370865f5117a7d0213354c8" + [[package]] name = "lettre" version = "0.11.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d476fe7a4a798f392ce34947aa7d53d981127e37523c5251da3c927f7fa901f" dependencies = [ - "base64", + "base64 0.22.1", "chumsky", "email-encoding", "email_address", @@ -2470,6 +2700,24 @@ dependencies = [ "prost-types", ] +[[package]] +name = "lyphedb" +version = "0.1.0" +dependencies = [ + "async-nats", + "chrono", + "env_logger 0.11.6", + "futures", + "log", + "once_cell", + "percent-encoding", + "rand 0.9.0", + "rmp-serde", + "serde", + "tokio", + "toml", +] + [[package]] name = "mac" version = "0.1.1" @@ -2558,6 +2806,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e3e04debbb59698c15bacbb6d93584a8c0ca9cc3213cb423d31f760d8843ce5" dependencies = [ "adler2", + "simd-adler32", ] [[package]] @@ -2588,6 +2837,16 @@ dependencies = [ "version_check", ] +[[package]] +name = "mutex-timeouts" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2e9a1502e1bb4bbb96a448f684d77f17023e5a14ff49fc87c3161df6aaac2d9" +dependencies = [ + "once_cell", + "tokio", +] + [[package]] name = "native-tls" version = "0.2.14" @@ -3061,6 +3320,19 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "png" +version = "0.17.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82151a2fc869e011c153adc57cf2789ccb8d9906ce52c0b39a6b5697749d7526" +dependencies = [ + "bitflags 1.3.2", + "crc32fast", + "fdeflate", + "flate2", + "miniz_oxide", +] + [[package]] name = "polling" 
version = "2.8.0" @@ -3182,6 +3454,34 @@ dependencies = [ "yansi", ] +[[package]] +name = "prometheus" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "thiserror 1.0.69", +] + +[[package]] +name = "prometheus_exporter" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "caf17cbebe0bfdf4f279ef84eeefe0d50468b0b7116f078acf41d456e48fe81a" +dependencies = [ + "ascii", + "lazy_static", + "log", + "prometheus", + "thiserror 1.0.69", + "tiny_http", +] + [[package]] name = "prost" version = "0.13.5" @@ -3243,6 +3543,67 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "qoi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f6d64c71eb498fe9eae14ce4ec935c555749aef511cca85b5568910d6e48001" +dependencies = [ + "bytemuck", +] + +[[package]] +name = "quinn" +version = "0.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62e96808277ec6f97351a2380e6c25114bc9e67037775464979f3037c92d05ef" +dependencies = [ + "bytes", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash 2.1.1", + "rustls", + "socket2", + "thiserror 2.0.12", + "tokio", + "tracing", +] + +[[package]] +name = "quinn-proto" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2fe5ef3495d7d2e377ff17b1a8ce2ee2ec2a18cde8b6ad6619d65d0701c135d" +dependencies = [ + "bytes", + "getrandom 0.2.15", + "rand 0.8.5", + "ring", + "rustc-hash 2.1.1", + "rustls", + "rustls-pki-types", + "slab", + "thiserror 2.0.12", + "tinyvec", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-udp" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"e46f3055866785f6b92bc6164b76be02ca8f2eb4b002c0354b28cf4c119e5944" +dependencies = [ + "cfg_aliases", + "libc", + "once_cell", + "socket2", + "tracing", + "windows-sys 0.59.0", +] + [[package]] name = "quote" version = "1.0.39" @@ -3324,6 +3685,26 @@ dependencies = [ "getrandom 0.3.1", ] +[[package]] +name = "rayon" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.5.10" @@ -3392,7 +3773,7 @@ version = "0.12.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43e734407157c3c2034e0258f5e4473ddb361b1e85f95a66690d67264d7cd1da" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "futures-core", "futures-util", @@ -3400,6 +3781,7 @@ dependencies = [ "http-body", "http-body-util", "hyper", + "hyper-rustls", "hyper-tls", "hyper-util", "ipnet", @@ -3410,19 +3792,24 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", + "quinn", + "rustls", "rustls-pemfile", + "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", "sync_wrapper", "tokio", "tokio-native-tls", + "tokio-rustls", "tower", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", + "webpki-roots", "windows-registry", ] @@ -3539,6 +3926,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc-hash" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" + [[package]] name = "rustc_version" version = "0.4.1" @@ -3627,6 +4020,9 @@ name = "rustls-pki-types" version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c" +dependencies = [ + "web-time", +] [[package]] name = "rustls-webpki" @@ -3938,9 +4334,9 @@ dependencies = [ [[package]] name = "serde_bytes" -version = "0.11.16" +version = "0.11.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "364fec0df39c49a083c9a8a18a23a6bcfd9af130fe9fe321d18520a0d113e09e" +checksum = "8437fd221bde2d4ca316d61b90e337e9e702b3820b87d63caa9ba6c02bd06d96" dependencies = [ "serde", ] @@ -3962,6 +4358,7 @@ version = "1.0.140" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" dependencies = [ + "indexmap", "itoa", "memchr", "ryu", @@ -3998,6 +4395,15 @@ dependencies = [ "syn 2.0.99", ] +[[package]] +name = "serde_spanned" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87607cb1398ed59d48732e575a4c28a7a8ebf2454b964fe3f224f2afc07909e1" +dependencies = [ + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -4087,6 +4493,12 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "simd-adler32" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe" + [[package]] name = "simdutf8" version = "0.1.5" @@ -4270,7 +4682,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4560278f0e00ce64938540546f59f590d60beee33fffbd3b9cd47851e5fff233" dependencies = [ "atoi", - "base64", + "base64 0.22.1", "bigdecimal", "bitflags 2.9.0", "byteorder", @@ -4317,7 +4729,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "c5b98a57f363ed6764d5b3a12bfedf62f07aa16e1856a7ddc2a0bb190a959613" dependencies = [ "atoi", - "base64", + "base64 0.22.1", "bigdecimal", "bitflags 2.9.0", "byteorder", @@ -4404,6 +4816,16 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "stopwords" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e4508a6e132e6ea159112d42ed1f29927460dd45a118eed298d7666c81b713e" +dependencies = [ + "lazy_static", + "thiserror 1.0.69", +] + [[package]] name = "string_cache" version = "0.8.8" @@ -4429,6 +4851,15 @@ dependencies = [ "quote", ] +[[package]] +name = "stringmatch" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6aadc0801d92f0cdc26127c67c4b8766284f52a5ba22894f285e3101fa57d05d" +dependencies = [ + "regex", +] + [[package]] name = "stringprep" version = "0.1.5" @@ -4540,6 +4971,61 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "texting_robots" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82a718a28dda2e67ad6e0464597b58eae39e2e4d0451e03d1028d71e81bb4a" +dependencies = [ + "anyhow", + "bstr", + "lazy_static", + "nom 7.1.3", + "percent-encoding", + "regex", + "thiserror 1.0.69", + "url", +] + +[[package]] +name = "thirtyfour" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d1dd5271076f9cf5cb6a673497d1476cda2633e6cf2523edd00ac16218d651" +dependencies = [ + "arc-swap", + "async-trait", + "base64 0.22.1", + "bytes", + "cfg-if", + "const_format", + "futures-util", + "http 1.2.0", + "indexmap", + "paste", + "reqwest", + "serde", + "serde_json", + "serde_repr", + "stringmatch", + "thirtyfour-macros", + "thiserror 1.0.69", + "tokio", + "tracing", + "url", +] + +[[package]] 
+name = "thirtyfour-macros" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cf0ffc3ba4368e99597bd6afd83f4ff6febad66d9ae541ab46e697d32285fc0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.99", +] + [[package]] name = "thiserror" version = "1.0.69" @@ -4590,6 +5076,17 @@ dependencies = [ "once_cell", ] +[[package]] +name = "tiff" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba1310fcea54c6a9a4fd1aad794ecc02c31682f6bfbecdf460bf19533eed1e3e" +dependencies = [ + "flate2", + "jpeg-decoder", + "weezl", +] + [[package]] name = "time" version = "0.3.39" @@ -4621,6 +5118,19 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny_http" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5f8734c6d6943ad6df6b588d228a87b4af184998bcffa268ceddf05c2055a8c" +dependencies = [ + "ascii", + "chunked_transfer", + "log", + "time", + "url", +] + [[package]] name = "tinystr" version = "0.7.6" @@ -4725,7 +5235,7 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f591660438b3038dd04d16c938271c79e7e06260ad2ea2885a4861bfb238605d" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "futures-core", "futures-sink", @@ -4740,11 +5250,26 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "toml" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd87a5cdd6ffab733b2f74bc4fd7ee5fff6634124999ac278c35fc78c6120148" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit", +] + [[package]] name = "toml_datetime" version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41" +dependencies = [ + "serde", +] [[package]] name = "toml_edit" @@ -4753,6 +5278,8 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "17b4795ff5edd201c7cd6dca065ae59972ce77d1b80fa0a84d94950ece7d1474" dependencies = [ "indexmap", + "serde", + "serde_spanned", "toml_datetime", "winnow", ] @@ -4982,6 +5509,12 @@ version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "unit_converter" version = "0.1.0" @@ -5080,6 +5613,32 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "vorebot" +version = "0.2.0" +dependencies = [ + "asklyphe-common", + "async-nats", + "base64 0.21.7", + "chrono", + "env_logger 0.10.2", + "futures", + "image", + "isahc", + "log", + "mutex-timeouts", + "once_cell", + "prometheus_exporter", + "rand 0.8.5", + "rmp-serde", + "serde", + "stopwords", + "texting_robots", + "thirtyfour", + "tokio", + "ulid", +] + [[package]] name = "waker-fn" version = "1.2.0" @@ -5216,6 +5775,12 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "weezl" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53a85b86a771b1c87058196170769dd264f66c0782acf1ae6cc51bfd64b39082" + [[package]] name = "which" version = "4.4.2" @@ -5636,3 +6201,12 @@ dependencies = [ "quote", "syn 2.0.99", ] + +[[package]] +name = "zune-inflate" +version = "0.2.54" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73ab332fe2f6680068f3582b16a24f90ad7096d5d39b974d1c0aff0125116f02" +dependencies = [ + "simd-adler32", +] diff --git a/Cargo.toml b/Cargo.toml index 34a8a11..7e6e755 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,2 +1,2 
@@ [workspace] -members = ["asklyphe-common", "asklyphe-frontend", "searchservice", "asklyphe-auth-frontend", "unit_converter", "authservice", "authservice/migration", "authservice/entity", "bingservice", "googleservice"] +members = ["asklyphe-common", "asklyphe-frontend", "searchservice", "asklyphe-auth-frontend", "unit_converter", "authservice", "authservice/migration", "authservice/entity", "bingservice", "googleservice", "vorebot", "lyphedb", "lyphedb/ldbtesttool"] diff --git a/asklyphe-common/Cargo.toml b/asklyphe-common/Cargo.toml index b85d0b1..4ad48ed 100644 --- a/asklyphe-common/Cargo.toml +++ b/asklyphe-common/Cargo.toml @@ -11,10 +11,12 @@ license-file = "LICENSE" tokio = { version = "1.0", features = ["full"] } chrono = "0.4.31" serde = { version = "1.0", features = ["derive"] } +lyphedb = { path = "../lyphedb" } foundationdb = { version = "0.8.0", features = ["embedded-fdb-include"] } log = "0.4.20" rmp-serde = "1.1.2" futures = "0.3.30" async-nats = "0.38.0" ulid = "1.1.0" -rand = "0.8.5" \ No newline at end of file +rand = "0.8.5" +percent-encoding = "2.3.1" diff --git a/asklyphe-common/src/ldb/linkstore.rs b/asklyphe-common/src/ldb/linkstore.rs new file mode 100644 index 0000000..e5063b7 --- /dev/null +++ b/asklyphe-common/src/ldb/linkstore.rs @@ -0,0 +1,63 @@ +use log::{error, warn}; +use lyphedb::{KVList, KeyDirectory, LDBNatsMessage, LypheDBCommand, PropagationStrategy}; +use crate::ldb::{construct_path, hash, DBConn}; + +pub const LINKSTORE: &str = "linkstore"; + +pub async fn add_url_to_linkwords(db: &DBConn, linkwords: &[&str], url: &str) -> Result<(), ()> { + let mut key_sets = Vec::new(); + for linkword in linkwords { + let path = construct_path(&[LINKSTORE, linkword, &hash(url)]).as_bytes().to_vec(); + key_sets.push((path, url.as_bytes().to_vec())); + } + let cmd = LDBNatsMessage::Command(LypheDBCommand::SetKeys(KVList { + kvs: key_sets, + }, PropagationStrategy::OnRead)); + + match db.query(cmd).await { + LDBNatsMessage::Success => { + 
Ok(()) + } + LDBNatsMessage::BadRequest => { + error!("bad request for add_url_to_linkwords"); + Err(()) + } + LDBNatsMessage::NotFound => { + error!("not found for add_url_to_linkwords"); + Err(()) + } + + _ => { + warn!("lyphedb sent weird message as response, treating as error"); + Err(()) + } + } +} + +pub async fn get_urls_from_linkword(db: &DBConn, keyword: &str) -> Result, ()> { + let path = construct_path(&[LINKSTORE, keyword]).as_bytes().to_vec(); + let cmd = LDBNatsMessage::Command(LypheDBCommand::GetKeyDirectory(KeyDirectory { + key: path, + })); + + match db.query(cmd).await { + LDBNatsMessage::Entries(kvs) => { + Ok(kvs.kvs.into_iter().map(|v| String::from_utf8_lossy(&v.1).to_string()).collect()) + } + LDBNatsMessage::Success => { + warn!("lyphedb responded with \"success\" to get_urls_from_linkwords, treating as error"); + Err(()) + } + LDBNatsMessage::BadRequest => { + warn!("bad request for get_urls_from_linkwords"); + Err(()) + } + LDBNatsMessage::NotFound => { + Ok(vec![]) + } + _ => { + warn!("lyphedb sent weird message as response, treating as error"); + Err(()) + } + } +} \ No newline at end of file diff --git a/asklyphe-common/src/ldb/metastore.rs b/asklyphe-common/src/ldb/metastore.rs new file mode 100644 index 0000000..3307a52 --- /dev/null +++ b/asklyphe-common/src/ldb/metastore.rs @@ -0,0 +1,63 @@ +use log::{error, warn}; +use lyphedb::{KVList, KeyDirectory, LDBNatsMessage, LypheDBCommand, PropagationStrategy}; +use crate::ldb::{construct_path, hash, DBConn}; + +pub const METASTORE: &str = "metastore"; + +pub async fn add_url_to_metawords(db: &DBConn, metawords: &[&str], url: &str) -> Result<(), ()> { + let mut key_sets = Vec::new(); + for metaword in metawords { + let path = construct_path(&[METASTORE, metaword, &hash(url)]).as_bytes().to_vec(); + key_sets.push((path, url.as_bytes().to_vec())); + } + let cmd = LDBNatsMessage::Command(LypheDBCommand::SetKeys(KVList { + kvs: key_sets, + }, PropagationStrategy::OnRead)); + + match 
db.query(cmd).await { + LDBNatsMessage::Success => { + Ok(()) + } + LDBNatsMessage::BadRequest => { + error!("bad request for add_url_to_metawords"); + Err(()) + } + LDBNatsMessage::NotFound => { + error!("not found for add_url_to_metawords"); + Err(()) + } + + _ => { + warn!("lyphedb sent weird message as response, treating as error"); + Err(()) + } + } +} + +pub async fn get_urls_from_metaword(db: &DBConn, metaword: &str) -> Result, ()> { + let path = construct_path(&[METASTORE, metaword]).as_bytes().to_vec(); + let cmd = LDBNatsMessage::Command(LypheDBCommand::GetKeyDirectory(KeyDirectory { + key: path, + })); + + match db.query(cmd).await { + LDBNatsMessage::Entries(kvs) => { + Ok(kvs.kvs.into_iter().map(|v| String::from_utf8_lossy(&v.1).to_string()).collect()) + } + LDBNatsMessage::Success => { + warn!("lyphedb responded with \"success\" to get_urls_from_metawords, treating as error"); + Err(()) + } + LDBNatsMessage::BadRequest => { + warn!("bad request for get_urls_from_metawords"); + Err(()) + } + LDBNatsMessage::NotFound => { + Ok(vec![]) + } + _ => { + warn!("lyphedb sent weird message as response, treating as error"); + Err(()) + } + } +} \ No newline at end of file diff --git a/asklyphe-common/src/ldb/mod.rs b/asklyphe-common/src/ldb/mod.rs new file mode 100644 index 0000000..098e00d --- /dev/null +++ b/asklyphe-common/src/ldb/mod.rs @@ -0,0 +1,62 @@ +pub mod wordstore; +pub mod sitestore; +pub mod linkstore; +pub mod metastore; +pub mod titlestore; + +use std::hash::{DefaultHasher, Hasher}; +use std::sync::Arc; +use std::sync::atomic::AtomicU64; +use futures::StreamExt; +use log::warn; +use lyphedb::LDBNatsMessage; +use percent_encoding::{percent_encode, AsciiSet, CONTROLS, NON_ALPHANUMERIC}; + +static NEXT_REPLY_ID: AtomicU64 = AtomicU64::new(0); + +pub const NOT_ALLOWED_ASCII: AsciiSet = CONTROLS.add(b' ').add(b'/').add(b'.').add(b'\\'); + +pub fn hash(str: &str) -> String { + let mut hasher = DefaultHasher::new(); + hasher.write(str.as_bytes()); + 
hasher.finish().to_string() +} + +pub fn construct_path(path_elements: &[&str]) -> String { + let mut buf = String::new(); + buf.push_str("ASKLYPHE/"); + for el in path_elements { + buf.push_str(&percent_encode(el.as_bytes(), &NOT_ALLOWED_ASCII).to_string()); + buf.push('/'); + } + buf.pop(); + buf +} + +#[derive(Clone)] +pub struct DBConn { + nats: async_nats::Client, + name: String, +} + +impl DBConn { + pub fn new(nats: async_nats::Client, name: impl ToString) -> DBConn { + DBConn { nats, name: name.to_string() } + } + + pub async fn query(&self, message: LDBNatsMessage) -> LDBNatsMessage { + let data = rmp_serde::to_vec(&message).unwrap(); + let replyto = format!("ldb-reply-{}", NEXT_REPLY_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst)); + let mut subscriber = self.nats.subscribe(replyto.clone()).await.expect("NATS ERROR"); + self.nats.publish_with_reply(self.name.clone(), replyto, data.into()).await.expect("NATS ERROR"); + if let Some(reply) = subscriber.next().await { + let reply = rmp_serde::from_slice::(&reply.payload); + if reply.is_err() { + warn!("DECODED BAD MESSAGE FROM LYPHEDB: {}", reply.err().unwrap()); + return LDBNatsMessage::NotFound; + } + return reply.unwrap(); + } + LDBNatsMessage::NotFound + } +} \ No newline at end of file diff --git a/asklyphe-common/src/ldb/sitestore.rs b/asklyphe-common/src/ldb/sitestore.rs new file mode 100644 index 0000000..a99d5f3 --- /dev/null +++ b/asklyphe-common/src/ldb/sitestore.rs @@ -0,0 +1,178 @@ +use crate::ldb::{construct_path, hash, DBConn}; +use log::{error, warn}; +use lyphedb::{KVList, KeyDirectory, KeyList, LDBNatsMessage, LypheDBCommand, PropagationStrategy}; + +pub const SITESTORE: &str = "sitestore"; + +pub const TITLE: &str = "title"; +pub const DESCRIPTION: &str = "desc"; +pub const KEYWORDS: &str = "keywords"; +pub const PAGE_TEXT_RANKING: &str = "ptranks"; +pub const PAGE_TEXT_RAW: &str = "textraw"; +pub const DAMPING: &str = "damping"; + +pub async fn add_website( + db: &DBConn, + url: 
&str, + title: Option, + description: Option, + keywords: Option>, + page_text_ranking: &[(String, f64)], + page_text_raw: String, + damping: f64, +) -> Result<(), ()> { + let keyurl = hash(url); + let mut kvs = vec![ + ( + construct_path(&[SITESTORE, &keyurl]).as_bytes().to_vec(), + url.as_bytes().to_vec(), + ), + ( + construct_path(&[SITESTORE, &keyurl, PAGE_TEXT_RANKING]) + .as_bytes() + .to_vec(), + rmp_serde::to_vec(page_text_ranking).unwrap(), + ), + ( + construct_path(&[SITESTORE, &keyurl, PAGE_TEXT_RAW]) + .as_bytes() + .to_vec(), + page_text_raw.as_bytes().to_vec(), + ), + ( + construct_path(&[SITESTORE, &keyurl, DAMPING]) + .as_bytes() + .to_vec(), + damping.to_be_bytes().to_vec(), + ), + ]; + + if let Some(title) = title { + kvs.push(( + construct_path(&[SITESTORE, &keyurl, TITLE]).as_bytes().to_vec(), + title.as_bytes().to_vec(), + )) + } + if let Some(description) = description { + kvs.push(( + construct_path(&[SITESTORE, &keyurl, DESCRIPTION]) + .as_bytes() + .to_vec(), + description.as_bytes().to_vec(), + )) + } + if let Some(keywords) = keywords { + kvs.push(( + construct_path(&[SITESTORE, &keyurl, KEYWORDS]) + .as_bytes() + .to_vec(), + rmp_serde::to_vec(&keywords).unwrap(), + )) + } + + let cmd = LDBNatsMessage::Command(LypheDBCommand::SetKeys( + KVList { kvs }, + PropagationStrategy::OnRead, + )); + + match db.query(cmd).await { + LDBNatsMessage::Success => Ok(()), + LDBNatsMessage::BadRequest => { + error!("bad request for add_website"); + Err(()) + } + LDBNatsMessage::NotFound => { + error!("not found for add_website"); + Err(()) + } + + _ => { + warn!("lyphedb sent weird message as response, treating as error"); + Err(()) + } + } +} + +#[derive(Default, Debug, Clone)] +pub struct WebsiteData { + pub title: Option, + pub description: Option, + pub keywords: Option>, + pub page_text_ranking: Vec<(String, f64)>, + pub page_text_raw: String, + pub damping: f64, +} + +pub async fn get_website(db: &DBConn, url: &str) -> Result { + let keyurl = 
hash(url); + let keys = [ + construct_path(&[SITESTORE, &keyurl, TITLE]).as_bytes().to_vec(), + construct_path(&[SITESTORE, &keyurl, DESCRIPTION]).as_bytes().to_vec(), + construct_path(&[SITESTORE, &keyurl, KEYWORDS]).as_bytes().to_vec(), + construct_path(&[SITESTORE, &keyurl, PAGE_TEXT_RANKING]).as_bytes().to_vec(), + construct_path(&[SITESTORE, &keyurl, PAGE_TEXT_RAW]).as_bytes().to_vec(), + construct_path(&[SITESTORE, &keyurl, DAMPING]).as_bytes().to_vec(), + ].to_vec(); + + let cmd = LDBNatsMessage::Command(LypheDBCommand::GetKeys(KeyList { keys })); + + match db.query(cmd).await { + LDBNatsMessage::Entries(kvlist) => { + let mut data = WebsiteData::default(); + for (key, value) in kvlist.kvs { + let key = String::from_utf8_lossy(&key).to_string(); + match key.as_str() { + _ if key.ends_with(TITLE) => { + data.title = Some(String::from_utf8_lossy(&value).to_string()); + } + _ if key.ends_with(DESCRIPTION) => { + data.description = Some(String::from_utf8_lossy(&value).to_string()); + } + _ if key.ends_with(KEYWORDS) => { + let deser = rmp_serde::from_slice::>(&value); + if let Err(e) = deser { + error!("bad keywords entry for {}, deser error: {:?}", key, e); + } else { + data.keywords = Some(deser.unwrap()); + } + } + _ if key.ends_with(PAGE_TEXT_RANKING) => { + let deser = rmp_serde::from_slice::>(&value); + if let Err(e) = deser { + error!("bad page_text_ranking entry for {}, deser error: {:?}", key, e); + } else { + data.page_text_ranking = deser.unwrap(); + } + } + _ if key.ends_with(PAGE_TEXT_RAW) => { + data.page_text_raw = String::from_utf8_lossy(&value).to_string(); + } + _ if key.ends_with(DAMPING) => { + data.damping = f64::from_be_bytes(value.try_into().unwrap_or(0.85f64.to_be_bytes())); + } + + _ => { + warn!("encountered weird returned key for get_website"); + } + } + } + Ok(data) + } + LDBNatsMessage::Success => { + warn!("lyphedb responded with \"success\" to get_website, treating as error"); + Err(()) + } + LDBNatsMessage::BadRequest => { + 
error!("bad request for get_website"); + Err(()) + } + LDBNatsMessage::NotFound => { + warn!("not found for get_website"); + Err(()) + } + _ => { + warn!("lyphedb sent weird message as response, treating as error"); + Err(()) + } + } +} diff --git a/asklyphe-common/src/ldb/titlestore.rs b/asklyphe-common/src/ldb/titlestore.rs new file mode 100644 index 0000000..8c6c455 --- /dev/null +++ b/asklyphe-common/src/ldb/titlestore.rs @@ -0,0 +1,63 @@ +use log::{error, warn}; +use lyphedb::{KVList, KeyDirectory, LDBNatsMessage, LypheDBCommand, PropagationStrategy}; +use crate::ldb::{construct_path, hash, DBConn}; + +pub const TITLESTORE: &str = "titlestore"; + +pub async fn add_url_to_titlewords(db: &DBConn, titlewords: &[&str], url: &str) -> Result<(), ()> { + let mut key_sets = Vec::new(); + for titleword in titlewords { + let path = construct_path(&[TITLESTORE, titleword, &hash(url)]).as_bytes().to_vec(); + key_sets.push((path, url.as_bytes().to_vec())); + } + let cmd = LDBNatsMessage::Command(LypheDBCommand::SetKeys(KVList { + kvs: key_sets, + }, PropagationStrategy::OnRead)); + + match db.query(cmd).await { + LDBNatsMessage::Success => { + Ok(()) + } + LDBNatsMessage::BadRequest => { + error!("bad request for add_url_to_titlewords"); + Err(()) + } + LDBNatsMessage::NotFound => { + error!("not found for add_url_to_titlewords"); + Err(()) + } + + _ => { + warn!("lyphedb sent weird message as response, treating as error"); + Err(()) + } + } +} + +pub async fn get_urls_from_titleword(db: &DBConn, titleword: &str) -> Result, ()> { + let path = construct_path(&[TITLESTORE, titleword]).as_bytes().to_vec(); + let cmd = LDBNatsMessage::Command(LypheDBCommand::GetKeyDirectory(KeyDirectory { + key: path, + })); + + match db.query(cmd).await { + LDBNatsMessage::Entries(kvs) => { + Ok(kvs.kvs.into_iter().map(|v| String::from_utf8_lossy(&v.1).to_string()).collect()) + } + LDBNatsMessage::Success => { + warn!("lyphedb responded with \"success\" to get_urls_from_titlewords, treating 
as error"); + Err(()) + } + LDBNatsMessage::BadRequest => { + warn!("bad request for get_urls_from_titlewords"); + Err(()) + } + LDBNatsMessage::NotFound => { + Ok(vec![]) + } + _ => { + warn!("lyphedb sent weird message as response, treating as error"); + Err(()) + } + } +} \ No newline at end of file diff --git a/asklyphe-common/src/ldb/wordstore.rs b/asklyphe-common/src/ldb/wordstore.rs new file mode 100644 index 0000000..22ea584 --- /dev/null +++ b/asklyphe-common/src/ldb/wordstore.rs @@ -0,0 +1,64 @@ +use log::{error, warn}; +use lyphedb::{KVList, KeyDirectory, LDBNatsMessage, LypheDBCommand, PropagationStrategy}; +use crate::ldb::{construct_path, hash, DBConn}; + +pub const WORDSTORE: &str = "wordstore"; + +pub async fn add_url_to_keywords(db: &DBConn, keywords: &[&str], url: &str) -> Result<(), ()> { + let mut key_sets = Vec::new(); + for keyword in keywords { + let path = construct_path(&[WORDSTORE, keyword, &hash(url)]).as_bytes().to_vec(); + let data = url.as_bytes().to_vec(); + key_sets.push((path, data)); + } + let cmd = LDBNatsMessage::Command(LypheDBCommand::SetKeys(KVList { + kvs: key_sets, + }, PropagationStrategy::OnRead)); + + match db.query(cmd).await { + LDBNatsMessage::Success => { + Ok(()) + } + LDBNatsMessage::BadRequest => { + error!("bad request for add_url_to_keywords"); + Err(()) + } + LDBNatsMessage::NotFound => { + error!("not found for add_url_to_keywords"); + Err(()) + } + + _ => { + warn!("lyphedb sent weird message as response, treating as error"); + Err(()) + } + } +} + +pub async fn get_urls_from_keyword(db: &DBConn, keyword: &str) -> Result, ()> { + let path = construct_path(&[WORDSTORE, keyword]).as_bytes().to_vec(); + let cmd = LDBNatsMessage::Command(LypheDBCommand::GetKeyDirectory(KeyDirectory { + key: path, + })); + + match db.query(cmd).await { + LDBNatsMessage::Entries(kvs) => { + Ok(kvs.kvs.into_iter().map(|v| String::from_utf8_lossy(&v.1).to_string()).collect()) + } + LDBNatsMessage::Success => { + warn!("lyphedb 
responded with \"success\" to get_urls_from_keywords, treating as error"); + Err(()) + } + LDBNatsMessage::BadRequest => { + warn!("bad request for get_urls_from_keywords"); + Err(()) + } + LDBNatsMessage::NotFound => { + Ok(vec![]) + } + _ => { + warn!("lyphedb sent weird message as response, treating as error"); + Err(()) + } + } +} \ No newline at end of file diff --git a/asklyphe-common/src/lib.rs b/asklyphe-common/src/lib.rs index 1684fad..d87c55c 100644 --- a/asklyphe-common/src/lib.rs +++ b/asklyphe-common/src/lib.rs @@ -13,7 +13,9 @@ pub mod nats; pub mod db; +pub mod ldb; +pub use lyphedb; pub use foundationdb; pub fn add(left: usize, right: usize) -> usize { diff --git a/asklyphe-common/src/nats/vorebot.rs b/asklyphe-common/src/nats/vorebot.rs index 4c0e0f6..085a7da 100644 --- a/asklyphe-common/src/nats/vorebot.rs +++ b/asklyphe-common/src/nats/vorebot.rs @@ -18,7 +18,7 @@ pub const VOREBOT_NEWHOSTNAME_SERVICE: &str = "websiteparse_highpriority"; pub const VOREBOT_SUGGESTED_SERVICE: &str = "websiteparse_highestpriority"; #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct WebParseRequest { +pub struct CrawlRequest { pub url: String, - pub damping_factor: f32, -} \ No newline at end of file + pub damping: f64, +} diff --git a/lyphedb/.gitignore b/lyphedb/.gitignore new file mode 100644 index 0000000..faa7d35 --- /dev/null +++ b/lyphedb/.gitignore @@ -0,0 +1,30 @@ +# ---> Rust +# Generated by Cargo +# will have compiled files and executables +debug/ +target/ + +# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries +# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html +Cargo.lock + +# These are backup files generated by rustfmt +**/*.rs.bk + +# MSVC Windows builds of rustc generate these, which store debugging information +*.pdb + +# RustRover +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at 
https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ + + +# Added by cargo + +/target + +# don't commit employee test configs (: +husky_config.toml \ No newline at end of file diff --git a/lyphedb/Cargo.toml b/lyphedb/Cargo.toml new file mode 100644 index 0000000..c659b6d --- /dev/null +++ b/lyphedb/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "lyphedb" +version = "0.1.0" +edition = "2024" + +[dependencies] +log = "0.4.20" +rmp-serde = "1.1.2" +futures = "0.3.30" +async-nats = "0.38.0" +tokio = { version = "1.0", features = ["full"] } +chrono = "0.4.31" +serde = { version = "1.0", features = ["derive"] } +rand = "0.9.0" +toml = "0.8.20" +env_logger = "0.11.6" +once_cell = "1.20.3" +percent-encoding = "2.3.1" \ No newline at end of file diff --git a/lyphedb/config.toml b/lyphedb/config.toml new file mode 100644 index 0000000..a070e5e --- /dev/null +++ b/lyphedb/config.toml @@ -0,0 +1,10 @@ +name = "lyphedb-test" +write = true +master = "/lyphedb_master" +ram_limit = "1mb" # supported suffixes: b,kb,mb,gb +gc_limit = "512kb" +avg_entry_size = "1kb" +log = "debug" +nats_cert = "nats/nats.cert" +nats_key = "nats/nats.pem" +nats_url = "127.0.0.1:4222" \ No newline at end of file diff --git a/lyphedb/ldbtesttool/Cargo.toml b/lyphedb/ldbtesttool/Cargo.toml new file mode 100644 index 0000000..54f7fc9 --- /dev/null +++ b/lyphedb/ldbtesttool/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "ldbtesttool" +version = "0.1.0" +edition = "2024" + +[dependencies] +rmp-serde = "1.1.2" +futures = "0.3.30" +async-nats = "0.38.0" +tokio = { version = "1.0", features = ["full"] } +serde = { version = "1.0", features = ["derive"] } +lyphedb = { path = "../" } +toml = "0.8.20" +rand = "0.9.0" \ No newline at end of file diff --git a/lyphedb/ldbtesttool/src/config.rs 
b/lyphedb/ldbtesttool/src/config.rs new file mode 100644 index 0000000..29a5487 --- /dev/null +++ b/lyphedb/ldbtesttool/src/config.rs @@ -0,0 +1,40 @@ +use serde::Deserialize; + +#[derive(Deserialize)] +struct ConfigFile { + pub name: String, + pub nats_cert: String, + pub nats_key: String, + pub nats_url: String, +} + +#[derive(Clone)] +pub struct LypheDBConfig { + pub name: String, + pub nats_cert: String, + pub nats_key: String, + pub nats_url: String, +} + +pub fn load_config(path: &str) -> LypheDBConfig { + let s = std::fs::read_to_string(path); + if let Err(e) = s { + panic!("failed to read config file: {}", e); + } + let s = s.unwrap(); + let cnf = toml::from_str::(&s); + if let Err(e) = cnf { + panic!("failed to parse config file: {}", e); + } + let cnf = cnf.unwrap(); + + // quick checks and conversions + let mut config = LypheDBConfig { + name: cnf.name, + nats_cert: cnf.nats_cert, + nats_key: cnf.nats_key, + nats_url: cnf.nats_url, + }; + + config +} \ No newline at end of file diff --git a/lyphedb/ldbtesttool/src/main.rs b/lyphedb/ldbtesttool/src/main.rs new file mode 100644 index 0000000..cea33e4 --- /dev/null +++ b/lyphedb/ldbtesttool/src/main.rs @@ -0,0 +1,114 @@ +use crate::config::load_config; +use lyphedb::{KVList, KeyList, LDBNatsMessage, LypheDBCommand, PropagationStrategy}; +use std::collections::BTreeMap; +use async_nats::Message; +use futures::StreamExt; +use tokio::sync::mpsc; + +mod config; + +#[tokio::main] +async fn main() { + let config = load_config(&std::env::args().nth(1).expect("please specify config file")); + let nats = async_nats::ConnectOptions::new() + .add_client_certificate( + config.nats_cert.as_str().into(), + config.nats_key.as_str().into(), + ) + .connect(config.nats_url.as_str()) + .await; + if let Err(e) = nats { + eprintln!("FATAL ERROR, COULDN'T CONNECT TO NATS: {}", e); + return; + } + let nats = nats.unwrap(); + + // test 1: create 10000 keys, send in chunks of 10 + println!("test 1: create 10_000 keys, send in 
chunks of 100"); + let (key_tx, mut key_rx) = mpsc::unbounded_channel(); + let start = std::time::Instant::now(); + let mut tasks = vec![]; + for _ in 0..(10_000 / 100) { + let name = config.name.clone(); + let nats = nats.clone(); + let key_tx = key_tx.clone(); + tasks.push(tokio::spawn(async move { + let mut keys = BTreeMap::new(); + for _ in 0..100 { + let key = rand::random::<[u8; 16]>(); + let data = rand::random::<[u8; 16]>(); + key_tx.send((key, data)).unwrap(); + keys.insert(key, data); + } + let data = rmp_serde::to_vec(&LDBNatsMessage::Command(LypheDBCommand::SetKeys( + KVList { + kvs: keys + .into_iter() + .map(|(k, v)| (k.to_vec(), v.to_vec())) + .collect(), + }, + PropagationStrategy::Immediate, + ))) + .unwrap(); + let replyto_sub = "lyphedb_test_1".to_string(); + let mut subscriber = nats.subscribe(replyto_sub.clone()).await.unwrap(); + nats.publish_with_reply(name.clone(), replyto_sub, data.into()).await.unwrap(); + if let Some(reply) = subscriber.next().await { + let reply = rmp_serde::from_slice::(&reply.payload).unwrap(); + if let LDBNatsMessage::Success = reply {} else { + eprintln!("failure"); + } + } + })); + } + for task in tasks { + task.await.unwrap(); + } + let end = std::time::Instant::now(); + println!("test 1: {}ms", end.duration_since(start).as_millis()); + let mut our_copy = BTreeMap::new(); + let mut buffer = vec![]; + key_rx.recv_many(&mut buffer, 10_000).await; + for (k, v) in buffer { + our_copy.insert(k, v); + } + + // test 2: read back all keys and check for accuracy + println!("test 2: read back all keys and check for accuracy"); + let kv: Vec<_> = our_copy.into_iter().map(|(k, v)| (k.to_vec(), v.to_vec())).collect(); + + let start = std::time::Instant::now(); + let mut tasks = vec![]; + for i in 0..100 { + let batch = kv[i * 100..((i + 1) * 100).min(kv.len())].to_vec(); + let name = config.name.clone(); + let nats = nats.clone(); + tasks.push(tokio::spawn(async move { + let data = 
rmp_serde::to_vec(&LDBNatsMessage::Command(LypheDBCommand::GetKeys( + KeyList { + keys: batch.iter().map(|(k, _)| k.to_vec()).collect() + } + ))).unwrap(); + let replyto_sub = format!("lyphedb_test_2_{}", i); + let mut subscriber = nats.subscribe(replyto_sub.clone()).await.unwrap(); + nats.publish_with_reply(name.clone(), replyto_sub, data.into()).await.unwrap(); + if let Some(reply) = subscriber.next().await { + let reply = rmp_serde::from_slice::(&reply.payload).unwrap(); + if let LDBNatsMessage::Entries(kvlist) = reply { + // check all keys + for (k1, v1) in batch { + let v2 = kvlist.kvs.iter().find(|(k, v)| k == &k1).expect("key not found"); + assert_eq!(v1, v2.1); + } + } else { + eprintln!("failure"); + } + } + })); + } + for task in tasks { + task.await.unwrap(); + } + let end = std::time::Instant::now(); + println!("test 2: {}ms", end.duration_since(start).as_millis()); +} diff --git a/lyphedb/src/config.rs b/lyphedb/src/config.rs new file mode 100644 index 0000000..5e9568f --- /dev/null +++ b/lyphedb/src/config.rs @@ -0,0 +1,129 @@ +use serde::Deserialize; + +#[derive(Deserialize)] +struct ConfigFile { + pub name: String, + pub write: bool, + pub master: String, + pub ram_limit: String, + pub gc_limit: String, + pub avg_entry_size: String, + pub log: String, + pub nats_cert: String, + pub nats_key: String, + pub nats_url: String, +} + +#[derive(Clone)] +pub struct LypheDBConfig { + pub name: String, + pub write: bool, + pub master: String, + pub ram_limit: usize, + pub gc_limit: usize, + pub avg_entry_size: usize, + pub log: String, + pub nats_cert: String, + pub nats_key: String, + pub nats_url: String, +} + +pub fn load_config(path: &str) -> LypheDBConfig { + let s = std::fs::read_to_string(path); + if let Err(e) = s { + panic!("failed to read config file: {}", e); + } + let s = s.unwrap(); + let cnf = toml::from_str::(&s); + if let Err(e) = cnf { + panic!("failed to parse config file: {}", e); + } + let cnf = cnf.unwrap(); + + // quick checks and 
conversions + let mut config = LypheDBConfig { + name: cnf.name, + write: cnf.write, + master: cnf.master, + ram_limit: 0, + gc_limit: 0, + avg_entry_size: 0, + log: cnf.log, + nats_cert: cnf.nats_cert, + nats_key: cnf.nats_key, + nats_url: cnf.nats_url, + }; + + if !(cnf.ram_limit.ends_with("b") && + ( + cnf.ram_limit.trim_end_matches("b").ends_with("k") || + cnf.ram_limit.trim_end_matches("b").ends_with("m") || + cnf.ram_limit.trim_end_matches("b").ends_with("g") + ) + ) { + panic!("invalid ram limit"); + } + config.ram_limit = if cnf.ram_limit.ends_with("gb") { + cnf.ram_limit.trim_end_matches("gb").parse::().unwrap() * 1024 * 1024 * 1024 + } else if cnf.ram_limit.ends_with("mb") { + cnf.ram_limit.trim_end_matches("mb").parse::().unwrap() * 1024 * 1024 + } else if cnf.ram_limit.ends_with("kb") { + cnf.ram_limit.trim_end_matches("kb").parse::().unwrap() * 1024 + } else { + cnf.ram_limit.trim_end_matches("b").parse::().unwrap() + }; + + if !(cnf.gc_limit.ends_with("b") && + ( + cnf.gc_limit.trim_end_matches("b").ends_with("k") || + cnf.gc_limit.trim_end_matches("b").ends_with("m") || + cnf.gc_limit.trim_end_matches("b").ends_with("g") + ) + ) { + panic!("invalid ram limit"); + } + config.gc_limit = if cnf.gc_limit.ends_with("gb") { + cnf.gc_limit.trim_end_matches("gb").parse::().unwrap() * 1024 * 1024 * 1024 + } else if cnf.gc_limit.ends_with("mb") { + cnf.gc_limit.trim_end_matches("mb").parse::().unwrap() * 1024 * 1024 + } else if cnf.gc_limit.ends_with("kb") { + cnf.gc_limit.trim_end_matches("kb").parse::().unwrap() * 1024 + } else { + cnf.gc_limit.trim_end_matches("b").parse::().unwrap() + }; + + if !(cnf.avg_entry_size.ends_with("b") && + ( + cnf.avg_entry_size.trim_end_matches("b").ends_with("k") || + cnf.avg_entry_size.trim_end_matches("b").ends_with("m") || + cnf.avg_entry_size.trim_end_matches("b").ends_with("g") + ) + ) { + panic!("invalid ram limit"); + } + config.avg_entry_size = if cnf.avg_entry_size.ends_with("gb") { + 
cnf.avg_entry_size.trim_end_matches("gb").parse::().unwrap() * 1024 * 1024 * 1024 + } else if cnf.avg_entry_size.ends_with("mb") { + cnf.avg_entry_size.trim_end_matches("mb").parse::().unwrap() * 1024 * 1024 + } else if cnf.avg_entry_size.ends_with("kb") { + cnf.avg_entry_size.trim_end_matches("kb").parse::().unwrap() * 1024 + } else { + cnf.avg_entry_size.trim_end_matches("b").parse::().unwrap() + }; + + if config.avg_entry_size > config.ram_limit { + panic!("avg entry size is larger than ram limit"); + } + if config.gc_limit > config.ram_limit { + panic!("gc limit is larger than ram limit"); + } + + if config.log.is_empty() { + config.log = "info".to_string(); + } + if config.log != "debug" && config.log != "info" && config.log != "warn" && config.log != "error" { + panic!("invalid log level"); + } + + config +} \ No newline at end of file diff --git a/lyphedb/src/dbimpl/mod.rs b/lyphedb/src/dbimpl/mod.rs new file mode 100644 index 0000000..73952fd --- /dev/null +++ b/lyphedb/src/dbimpl/mod.rs @@ -0,0 +1,321 @@ +use crate::config::LypheDBConfig; +use log::*; +use lyphedb::PropagationStrategy; +use once_cell::sync::Lazy; +use std::borrow::Cow; +use std::collections::BTreeMap; +use std::path::PathBuf; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use percent_encoding::{percent_decode_str, percent_encode, AsciiSet, CONTROLS}; +use tokio::sync::RwLock; +pub const NOT_ALLOWED_ASCII: AsciiSet = CONTROLS.add(b' ').add(b'/').add(b'.').add(b'\\'); + +pub type LDBCache = BTreeMap>, Arc>>; + +/// Reads will read from this cache +pub static PRIMARY_CACHE: Lazy> = + Lazy::new(|| RwLock::new(BTreeMap::new())); +/// Writes will be written to this cache, and then depending on the propagation method, the +/// Primary cache will be set to a copy +pub static SECONDARY_CACHE: Lazy> = + Lazy::new(|| RwLock::new(BTreeMap::new())); +/// how often are keys accessed? 
this will influence the garbage collector +pub static KEY_ACCESS_COUNTER: Lazy>, AtomicU64>>> = + Lazy::new(|| RwLock::new(BTreeMap::new())); + +pub fn key_to_path(config: &LypheDBConfig, key: &[u8]) -> PathBuf { + let mut path = PathBuf::new(); + path.push(&config.master); + + let mut fnbuf = Vec::new(); + for (i, byte) in key.iter().enumerate() { + if *byte == b'/' { + let encoded = percent_encode(&fnbuf, &NOT_ALLOWED_ASCII).to_string(); + path.push(encoded); + fnbuf.clear(); + } else { + fnbuf.push(*byte); + } + } + if !fnbuf.is_empty() { + let encoded = percent_encode(&fnbuf, &NOT_ALLOWED_ASCII).to_string(); + path.push(encoded); + fnbuf.clear(); + } + + path.push(".self"); + + path +} + +#[derive(Debug)] +pub enum OperationError { + KeyCannotBeEmpty, + FilesystemPermissionError, + BadFilesystemEntry, +} + +pub async fn set_key_disk( + config: &LypheDBConfig, + key: &[u8], + value: &[u8], +) -> Result<(), OperationError> { + if key.is_empty() { + return Err(OperationError::KeyCannotBeEmpty); + } + let path = key_to_path(config, key); + let directory = path.parent().ok_or(OperationError::KeyCannotBeEmpty)?; + if let Ok(directory_exists) = directory.try_exists() { + if !directory_exists { + std::fs::create_dir_all(directory).map_err(|e| { + warn!("couldn't create directory: {:?}", e); + OperationError::FilesystemPermissionError + })?; + } + } else { + return Err(OperationError::FilesystemPermissionError); + } + + std::fs::write(path, value).map_err(|e| { + warn!("couldn't write file: {:?}", e); + OperationError::FilesystemPermissionError + })?; + + Ok(()) +} + +pub async fn delete_key_disk( + config: &LypheDBConfig, + key: &[u8], +) -> Result<(), OperationError> { + if key.is_empty() { + return Err(OperationError::KeyCannotBeEmpty); + } + let path = key_to_path(config, key); + if let Ok(exists) = path.try_exists() { + if !exists { + return Ok(()); + } + } + std::fs::remove_file(path).map_err(|e| { + warn!("couldn't remove file: {:?}", e); + 
OperationError::FilesystemPermissionError + }) +} + +pub async fn get_key_disk( + config: &LypheDBConfig, + key: &[u8], +) -> Result>, OperationError> { + if key.is_empty() { + return Err(OperationError::KeyCannotBeEmpty); + } + let path = key_to_path(config, key); + if let Ok(exists) = path.try_exists() { + if !exists { + return Ok(None); + } + } + let data = std::fs::read(path).map_err(|e| { + warn!("couldn't read file: {:?}", e); + OperationError::FilesystemPermissionError + })?; + Ok(Some(data)) +} + +/// this function allows empty keys, so you can get all keys under root by doing +/// get_keys_under_keydir_disk(..., b"") +pub async fn get_keys_under_keydir_disk( + config: &LypheDBConfig, + keydir: &[u8], +) -> Result>, OperationError> { + let path = key_to_path(config, keydir); + let path = path.parent().expect("bad master"); + let mut keys = Vec::new(); + for entry in std::fs::read_dir(path).map_err(|e| { + warn!("couldn't read directory: {:?}", e); + OperationError::FilesystemPermissionError + })? 
{ + let entry = entry.map_err(|e| { + warn!("couldn't read directory entry: {:?}", e); + OperationError::FilesystemPermissionError + })?; + let path = entry.path(); + let filename = path + .to_str() + .ok_or(OperationError::FilesystemPermissionError)?; + if filename.ends_with(".self") { + // this is a value file, ignore + continue; + } + let filename = filename.trim_start_matches(&config.master); + let key = percent_decode_str(filename).collect(); + keys.push(key); + } + + Ok(keys) +} + +pub async fn set_key( + config: LypheDBConfig, + key: &[u8], + value: &[u8], + strat: &PropagationStrategy, +) -> Result<(), OperationError> { + let k1 = Arc::new(key.to_vec()); + let v1 = Arc::new(value.to_vec()); + let disk_task = { + let k1 = k1.clone(); + let v1 = v1.clone(); + tokio::spawn(async move { set_key_disk(&config, &k1, &v1).await }) + }; + + let prop_task = match strat { + PropagationStrategy::Immediate => { + let k1 = k1.clone(); + let v1 = v1.clone(); + tokio::spawn(async move { + let mut secondary_cache = SECONDARY_CACHE.write().await; + secondary_cache.insert(k1, v1); + let mut primary_cache = PRIMARY_CACHE.write().await; + *primary_cache = secondary_cache.clone(); + }) + } + PropagationStrategy::Timeout => { + let k1 = k1.clone(); + let v1 = v1.clone(); + tokio::spawn(async move { + tokio::spawn(async move { + { + let mut secondary_cache = SECONDARY_CACHE.write().await; + secondary_cache.insert(k1, v1); + } + tokio::spawn(async move { + let start = std::time::Instant::now(); + loop { + if start.elapsed().as_secs() > 60 { + break; + } + let pc = PRIMARY_CACHE.try_write(); + if pc.is_err() { + tokio::time::sleep(std::time::Duration::from_secs(10)).await; + continue; + } else { + break; + } + } + let mut primary_cache = PRIMARY_CACHE.write().await; + let secondary_cache = SECONDARY_CACHE.read().await; + *primary_cache = secondary_cache.clone(); + }); + }); + }) + } + PropagationStrategy::OnRead => { + let k1 = k1.clone(); + tokio::spawn(async move { + { + let mut 
/// Cached read of `key`.
///
/// Kicks off the disk read and the access-counter bump concurrently, then
/// consults the primary cache: on a hit the disk read is aborted and the
/// cached value returned; on a miss the disk result is awaited and, when
/// present, promoted into the caches by a background task.
pub async fn get_key(config: LypheDBConfig, key: &[u8]) -> Result<Option<Vec<u8>>, OperationError> {
    let k1 = Arc::new(key.to_vec());
    {
        // bump the access counter off the hot path; the count is capped so a
        // single very hot key stops paying for SeqCst increments
        let k1 = k1.clone();
        tokio::spawn(async move {
            let mut access_counter = KEY_ACCESS_COUNTER.write().await;
            let counter = access_counter.entry(k1.clone()).or_insert(AtomicU64::new(0));
            if counter.load(Ordering::Relaxed) > 10000000 {
                return;
            }
            counter.fetch_add(1, Ordering::SeqCst);
        });
    }
    // start the disk read eagerly so a cache miss doesn't pay extra latency;
    // it is aborted below if the cache already has the value
    let disk_task = {
        let k1 = k1.clone();
        tokio::spawn(async move { get_key_disk(&config, &k1).await })
    };
    {
        // read from cache
        let cache = PRIMARY_CACHE.read().await;
        if let Some(val) = cache.get(&k1) {
            disk_task.abort();
            return Ok(Some(val.to_vec()));
        }
    }
    //debug!("cache miss");
    if let Ok(result) = disk_task.await {
        if let Ok(Some(val)) = &result {
            // promote the disk value: insert into the secondary cache, then
            // publish a fresh copy of it as the primary cache
            let val = Arc::new(val.to_vec());
            tokio::spawn(async move {
                {
                    let mut cache = SECONDARY_CACHE.write().await;
                    cache.insert(k1.clone(), val.clone());
                }
                {
                    let secondary_cache = SECONDARY_CACHE.read().await;
                    let mut primary_cache = PRIMARY_CACHE.write().await;
                    *primary_cache = secondary_cache.clone();
                }
                //debug!("cache insert");
            });
        }
        result
    } else {
        // the disk task panicked or was cancelled
        Err(OperationError::FilesystemPermissionError)
    }
}
/// Top-level message envelope exchanged with lyphedb over NATS.
///
/// Clients send `Command(..)` and receive exactly one of the other
/// variants as the reply.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum LDBNatsMessage {
    /// A request for the database to execute.
    Command(LypheDBCommand),
    /// Reply carrying key/value results (GetKeys, GetKeyDirectory).
    Entries(KVList),
    /// Reply carrying a key count (CountKeys).
    Count(u64),
    /// Generic success reply (SetKeys, DeleteKeys).
    Success,
    /// The request could not be decoded or was malformed.
    BadRequest,
    /// The requested data does not exist.
    NotFound,
}

/// Controls how quickly a write becomes visible to readers.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum PropagationStrategy {
    /// Reads will immediately be able to read this key's value as soon as it has been set
    Immediate,
    /// The value change will be queued along with others,
    /// then in a period of either inactivity or a maximum time, all changes will be immediately
    /// seen at once
    Timeout,
    /// The key will be removed from the cache, and thus the next read will cause a cache miss,
    /// and the value will be loaded from disk
    OnRead,
}

/// Operations lyphedb can execute.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum LypheDBCommand {
    /// Write the given key/value pairs with the given propagation strategy.
    SetKeys(KVList, PropagationStrategy),
    /// Read the values of the given keys.
    GetKeys(KeyList),
    /// Count the keys directly under a key directory.
    CountKeys(KeyDirectory),
    /// NOT RECURSIVE
    GetKeyDirectory(KeyDirectory),
    /// Remove the given keys.
    DeleteKeys(KeyList),
}

/// A list of raw key/value pairs.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KVList {
    pub kvs: Vec<(Vec<u8>, Vec<u8>)>,
}

/// A list of raw keys.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KeyList {
    pub keys: Vec<Vec<u8>>,
}
Debug, Serialize, Deserialize)] +pub struct KeyDirectory { + pub key: Vec, +} \ No newline at end of file diff --git a/lyphedb/src/main.rs b/lyphedb/src/main.rs new file mode 100644 index 0000000..8c55848 --- /dev/null +++ b/lyphedb/src/main.rs @@ -0,0 +1,248 @@ +/* +keys are stored on disk like this +//.self + +so if i stored the key "/this/is/a/key" with the data "hello world", it'd look like this + +/this/is/a/key/.self -> "hello world" + */ + +use crate::config::{LypheDBConfig, load_config}; +use crate::dbimpl::{KEY_ACCESS_COUNTER, PRIMARY_CACHE, SECONDARY_CACHE}; +use futures::StreamExt; +use log::{debug, error, info, warn}; +use lyphedb::{KVList, LDBNatsMessage, LypheDBCommand}; + +mod config; +mod dbimpl; + +pub async fn gc_thread(config: LypheDBConfig) { + loop { + tokio::time::sleep(std::time::Duration::from_secs(61)).await; + { + let cache = PRIMARY_CACHE.read().await; + let cache_size = cache.len() * config.avg_entry_size; + if cache_size > config.gc_limit { + debug!("gc triggered, cache size: {} bytes", cache_size); + let keycount_to_remove = cache.len() - config.gc_limit / config.avg_entry_size; + drop(cache); + let mut least_freq_keys = vec![]; + for (key, count) in KEY_ACCESS_COUNTER.read().await.iter() { + let count = count.load(std::sync::atomic::Ordering::Relaxed); + if least_freq_keys.len() < keycount_to_remove { + least_freq_keys.push((key.clone(), count)); + } else { + for (other, oc) in least_freq_keys.iter_mut() { + if count < *oc { + *other = key.clone(); + *oc = count; + break; + } + } + } + } + let mut cache = SECONDARY_CACHE.write().await; + for (key, _) in least_freq_keys.iter() { + cache.remove(key); + } + let mut primary_cache = PRIMARY_CACHE.write().await; + *primary_cache = cache.clone(); + debug!( + "gc finished, cache size: {} bytes", + primary_cache.len() * config.avg_entry_size + ); + } + } + } +} + +pub async fn precache_keys_until_limit(config: LypheDBConfig) { + info!("precache started"); + let mut precache_count = 0; + let 
mut precache_stack = dbimpl::get_keys_under_keydir_disk(&config, b"") + .await + .expect("couldn't get root keys"); + while let Some(precache_key) = precache_stack.pop() { + { + let cache = PRIMARY_CACHE.read().await; + if cache.len() * config.avg_entry_size > config.gc_limit { + break; + } + if cache.len() % 10000 == 0 { + info!("precache size: {} mb", (cache.len() * config.avg_entry_size) / (1024*1024)); + } + } + let children = dbimpl::get_keys_under_keydir_disk(&config, &precache_key) + .await + .expect("couldn't get children of key"); + if !children.is_empty() { + precache_stack.extend(children); + } else { + let _value = dbimpl::get_key(config.clone(), &precache_key) + .await + .expect("couldn't get value of key"); + precache_count += 1; + } + } + info!("precache finished, {} values precached", precache_count); +} + +#[tokio::main] +async fn main() { + let config = load_config(&std::env::args().nth(1).expect("please specify config file")); + let logger = env_logger::builder() + .filter_level(match config.log.as_str() { + "debug" => log::LevelFilter::Debug, + "info" => log::LevelFilter::Info, + "warn" => log::LevelFilter::Warn, + "error" => log::LevelFilter::Error, + _ => unreachable!(), + }) + .build(); + log::set_boxed_logger(Box::new(logger)).unwrap(); + log::set_max_level(match config.log.as_str() { + "debug" => log::LevelFilter::Debug, + "info" => log::LevelFilter::Info, + "warn" => log::LevelFilter::Warn, + "error" => log::LevelFilter::Error, + _ => unreachable!(), + }); + info!("lyphedb started"); + + let nats = async_nats::ConnectOptions::new() + .add_client_certificate( + config.nats_cert.as_str().into(), + config.nats_key.as_str().into(), + ) + .connect(config.nats_url.as_str()) + .await; + if let Err(e) = nats { + error!("FATAL ERROR, COULDN'T CONNECT TO NATS: {}", e); + return; + } + let nats = nats.unwrap(); + + let mut subscriber = nats + .queue_subscribe(config.name.clone(), "lyphedb".to_string()) + .await + .expect("couldn't subscribe to 
subject"); + + info!("nats connected"); + + tokio::spawn(precache_keys_until_limit(config.clone())); + tokio::spawn(gc_thread(config.clone())); + + while let Some(msg) = subscriber.next().await { + let config = config.clone(); + let nats = nats.clone(); + tokio::spawn(async move { + async fn bad_request(nats: &async_nats::Client, replyto: async_nats::Subject) { + let reply = rmp_serde::to_vec(&LDBNatsMessage::BadRequest).unwrap(); + if let Err(e) = nats.publish(replyto, reply.into()).await { + warn!("couldn't send reply: {:?}", e); + } + } + + let data = msg.payload.to_vec(); + let data = rmp_serde::from_slice::(&data); + if let Err(e) = data { + warn!("couldn't deserialize message: {:?}", e); + if let Some(replyto) = msg.reply { + bad_request(&nats, replyto).await; + } + return; + } + let data = data.unwrap(); + match data { + LDBNatsMessage::Command(cmd) => match cmd { + LypheDBCommand::SetKeys(kvlist, propstrat) => { + for (key, value) in kvlist.kvs { + if let Err(e) = + dbimpl::set_key(config.clone(), &key, &value, &propstrat).await + { + warn!("couldn't set key: {:?}", e); + } + } + let reply = rmp_serde::to_vec(&LDBNatsMessage::Success).unwrap(); + if let Err(e) = nats.publish(msg.reply.unwrap(), reply.into()).await { + warn!("couldn't send reply: {:?}", e); + } + } + LypheDBCommand::GetKeys(klist) => { + let mut reply = Vec::new(); + for key in klist.keys { + if let Ok(Some(value)) = dbimpl::get_key(config.clone(), &key).await { + reply.push((key, value)); + } else { + warn!("couldn't get key: {:?}", key); + } + } + let reply = + rmp_serde::to_vec(&LDBNatsMessage::Entries(KVList { kvs: reply })) + .unwrap(); + if let Err(e) = nats.publish(msg.reply.unwrap(), reply.into()).await { + warn!("couldn't send reply: {:?}", e); + } + } + LypheDBCommand::CountKeys(keydir) => { + let keys = dbimpl::get_keys_under_keydir_disk(&config, &keydir.key) + .await + .expect("couldn't get keys under keydir"); + let mut count = 0; + for key in keys { + let value = 
dbimpl::get_key(config.clone(), &key) + .await + .expect("couldn't get value of key"); + if value.is_some() { + count += 1; + } + } + let reply = rmp_serde::to_vec(&LDBNatsMessage::Count(count)).unwrap(); + if let Err(e) = nats.publish(msg.reply.unwrap(), reply.into()).await { + warn!("couldn't send reply: {:?}", e); + } + } + LypheDBCommand::GetKeyDirectory(keydir) => { + let mut reply = Vec::new(); + let keys = dbimpl::get_keys_under_keydir_disk(&config, &keydir.key) + .await + .expect("couldn't get keys under keydir"); + for key in keys { + let value = dbimpl::get_key(config.clone(), &key) + .await + .expect("couldn't get value of key"); + if let Some(value) = value { + reply.push((key, value)); + } + } + let reply = + rmp_serde::to_vec(&LDBNatsMessage::Entries(KVList { kvs: reply })) + .unwrap(); + if let Err(e) = nats.publish(msg.reply.unwrap(), reply.into()).await { + warn!("couldn't send reply: {:?}", e); + } + } + LypheDBCommand::DeleteKeys(klist) => { + for key in klist.keys { + if let Err(e) = dbimpl::delete_key(config.clone(), &key).await { + warn!("couldn't delete key: {:?}", e); + } + } + let reply = rmp_serde::to_vec(&LDBNatsMessage::Success).unwrap(); + if let Err(e) = nats.publish(msg.reply.unwrap(), reply.into()).await { + warn!("couldn't send reply: {:?}", e); + } + } + }, + _ => { + warn!("bad request, not command"); + if let Some(replyto) = msg.reply { + bad_request(&nats, replyto).await; + } + } + } + }); + } + + info!("lyphedb shutting down"); +} diff --git a/vorebot/.gitignore b/vorebot/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/vorebot/.gitignore @@ -0,0 +1 @@ +/target diff --git a/vorebot/Cargo.toml b/vorebot/Cargo.toml new file mode 100644 index 0000000..00ae58e --- /dev/null +++ b/vorebot/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "vorebot" +version = "0.2.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] 
+asklyphe-common = { path = "../asklyphe-common" } +tokio = { version = "1.0", features = ["full"] } +serde = { version = "1.0", features = ["derive"] } +rand = "0.8.5" +rmp-serde = "1.1.2" +base64 = "0.21.7" +image = "0.24.8" +isahc = "1.7.2" +ulid = "1.0.0" +async-nats = "0.38.0" +futures = "0.3.28" +chrono = "0.4.26" +once_cell = "1.18.0" +env_logger = "0.10.0" +log = "0.4.19" +mutex-timeouts = { version = "0.3.0", features = ["tokio"] } +prometheus_exporter = "0.8.5" +thirtyfour = "0.35.0" +stopwords = "0.1.1" +texting_robots = "0.2.2" \ No newline at end of file diff --git a/vorebot/src/main.rs b/vorebot/src/main.rs new file mode 100644 index 0000000..2c1db87 --- /dev/null +++ b/vorebot/src/main.rs @@ -0,0 +1,352 @@ +mod webparse; + +use asklyphe_common::db; +use asklyphe_common::nats::vorebot::{ + VOREBOT_NEWHOSTNAME_SERVICE, VOREBOT_SERVICE, VOREBOT_SUGGESTED_SERVICE, +}; +use async_nats::jetstream; +use async_nats::jetstream::consumer::PullConsumer; +use async_nats::jetstream::stream::RetentionPolicy; +use chrono::TimeZone; +use futures::StreamExt; +use log::{debug, error, info, warn}; +use mutex_timeouts::tokio::MutexWithTimeoutAuto as Mutex; +use once_cell::sync::Lazy; +use prometheus_exporter::prometheus::core::{AtomicF64, GenericGauge}; +use prometheus_exporter::prometheus::{register_counter, register_gauge, Counter}; +use std::cmp::max; +use std::collections::{BTreeMap, BTreeSet}; +use std::hash::{DefaultHasher, Hasher}; +use std::io::Read; +use std::iter::Iterator; +use std::str::FromStr; +use std::string::ToString; +use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use async_nats::jetstream::kv; +use stopwords::{Language, Spark, Stopwords}; +use thirtyfour::{CapabilitiesHelper, DesiredCapabilities, Proxy, WebDriver}; +use thirtyfour::common::capabilities::firefox::FirefoxPreferences; +use tokio::task::JoinHandle; +use asklyphe_common::ldb::DBConn; +use 
asklyphe_common::nats::vorebot::CrawlRequest; +use crate::webparse::web_parse; + +pub static NATS_URL: Lazy = + Lazy::new(|| std::env::var("NATS_URL").expect("NO NATS URL DEFINED")); +pub static NATS_CERT: Lazy = Lazy::new(|| std::env::var("NATS_CERT").expect("NO NATS_CERT DEFINED")); +pub static NATS_KEY: Lazy = Lazy::new(|| std::env::var("NATS_KEY").expect("NO NATS_KEY DEFINED")); +pub static BROWSER_THREADS: Lazy> = + Lazy::new(|| std::env::var("BROWSER_THREADS").expect("PLEASE LIST BROWSER_THREADS").split(',').map(|v| v.to_string()).collect()); +pub static DB_NAME: Lazy = + Lazy::new(|| std::env::var("DB_NAME").expect("PLEASE ADD DB_NAME")); + +pub static DOCUMENTS_CRAWLED: AtomicU64 = AtomicU64::new(0); +pub static LAST_MESSAGE: AtomicI64 = AtomicI64::new(0); + +pub static LAST_TASK_COMPLETE: Lazy>> = Lazy::new(|| { + let max_threads: usize = BROWSER_THREADS.len(); + let mut vals = vec![]; + for i in 0..max_threads { + // let db = Database::default().expect("couldn't connect to foundation db!"); + // DBS.lock().await.push(Arc::new(db)); + vals.push(Arc::new(AtomicI64::new(chrono::Utc::now().timestamp()))); + } + vals +}); + +pub static USER_AGENT: Lazy = Lazy::new(|| { + format!( + "Vorebot/{} (compatible; Googlebot/2.1; +https://voremicrocomputers.com/crawler.html)", + env!("CARGO_PKG_VERSION") + ) +}); + +#[tokio::main] +async fn main() { + mutex_timeouts::tokio::GLOBAL_TOKIO_TIMEOUT.store(72, Ordering::Relaxed); + + env_logger::init(); + info!("began at {}", chrono::Utc::now().to_string()); + + let nats = async_nats::ConnectOptions::new() + .add_client_certificate(NATS_CERT.as_str().into(), NATS_KEY.as_str().into()) + .connect(NATS_URL.as_str()) + .await; + if let Err(e) = nats { + error!("FATAL ERROR, COULDN'T CONNECT TO NATS: {}", e); + return; + } + let nats = nats.unwrap(); + let dbconn = DBConn::new(nats.clone(), DB_NAME.to_string()); + let nats = jetstream::new(nats); + + let mut tasks: Vec<(JoinHandle<()>, String)> = vec![]; + let mut 
available_browsers: Vec = BROWSER_THREADS.clone(); + + { + loop { + while tasks.len() < BROWSER_THREADS.len() { + let nats = nats.clone(); + let browser = available_browsers.pop().expect("NO BROWSERS LEFT, THIS IS A FATAL BUG!"); + let db = dbconn.clone(); + let b = browser.clone(); + tasks.push((tokio::spawn(async move { + let browser = b; + info!("using {}", browser); + info!("crawler spawned"); + + /* normal priority */ + let consumer: PullConsumer = nats.get_or_create_stream(jetstream::stream::Config { + name: VOREBOT_SERVICE.to_string(), + subjects: vec![VOREBOT_SERVICE.to_string()], + retention: RetentionPolicy::WorkQueue, + ..Default::default() + }).await + .expect("FATAL! FAILED TO SUBSCRIBE TO NATS!") + .get_or_create_consumer("parser", jetstream::consumer::pull::Config { + durable_name: Some("parser".to_string()), + filter_subject: VOREBOT_SERVICE.to_string(), + ..Default::default() + }).await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!"); + let mut messages = consumer.messages().await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!"); + + /* higher priority (new hostnames) */ + let higher_consumer: PullConsumer = nats.get_or_create_stream(jetstream::stream::Config { + name: VOREBOT_NEWHOSTNAME_SERVICE.to_string(), + subjects: vec![VOREBOT_NEWHOSTNAME_SERVICE.to_string()], + retention: RetentionPolicy::WorkQueue, + ..Default::default() + }).await + .expect("FATAL! FAILED TO SUBSCRIBE TO NATS!") + .get_or_create_consumer("highparser", jetstream::consumer::pull::Config { + durable_name: Some("highparser".to_string()), + filter_subject: VOREBOT_NEWHOSTNAME_SERVICE.to_string(), + ..Default::default() + }).await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!"); + let mut high_messages = higher_consumer.messages().await.expect("FATAL! 
FAILED TO SUBSCRIBE TO NATS!"); + + /* highest priority (user-suggested) */ + let highest_consumer: PullConsumer = nats.get_or_create_stream(jetstream::stream::Config { + name: VOREBOT_SUGGESTED_SERVICE.to_string(), + subjects: vec![VOREBOT_SUGGESTED_SERVICE.to_string()], + retention: RetentionPolicy::WorkQueue, + ..Default::default() + }).await + .expect("FATAL! FAILED TO SUBSCRIBE TO NATS!") + .get_or_create_consumer("highestparser", jetstream::consumer::pull::Config { + durable_name: Some("highestparser".to_string()), + filter_subject: VOREBOT_SUGGESTED_SERVICE.to_string(), + ..Default::default() + }).await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!"); + let mut highest_messages = highest_consumer.messages().await.expect("FATAL! FAILED TO SUBSCRIBE TO NATS!"); + + let mut prefs = FirefoxPreferences::new(); + prefs.set_user_agent(USER_AGENT.to_string()).unwrap(); + let mut caps = DesiredCapabilities::firefox(); + caps.set_preferences(prefs).unwrap(); + let driver = WebDriver::new(&browser, caps).await.unwrap(); + info!("crawler ready"); + + loop { + tokio::select! { + Some(highest) = StreamExt::next(&mut highest_messages) => { + if let Err(e) = highest { + warn!("error when recv js message! 
{e}"); + } else { + let message = highest.unwrap(); + if let Err(e) = message.ack().await { + warn!("failed acking message {e}"); + } + let req = rmp_serde::from_slice::(message.payload.as_ref()); + if let Err(e) = req { + error!("BAD NATS REQUEST: {e}"); + continue; + } + let req = req.unwrap(); + info!("RECV USER SUGGESTION!"); + let now = chrono::Utc::now().timestamp(); + LAST_MESSAGE.store(now, Ordering::Relaxed); + let nats = nats.clone(); + + let mut bad = false; + + driver.in_new_tab(|| async { + if web_parse(nats.clone(), db.clone(), &driver, &req.url, req.damping).await.is_err() { + warn!("temporary failure detected in parsing, requeuing"); + nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&req).unwrap().into()).await.expect("FAILED TO REQUEUE"); + bad = true; + } + Ok(()) + }).await.unwrap(); + + if bad { + continue; + } + + if DOCUMENTS_CRAWLED.load(Ordering::Relaxed) % 100 == 0 { + DOCUMENTS_CRAWLED.fetch_add(1, Ordering::Relaxed); + info!("crawled {} pages!", DOCUMENTS_CRAWLED.load(Ordering::Relaxed)); + } + } + } + Some(high) = StreamExt::next(&mut high_messages) => { + if let Err(e) = high { + warn!("error when recv js message! 
{e}"); + } else { + let message = high.unwrap(); + if let Err(e) = message.ack().await { + warn!("failed acking message {e}"); + } + let req = rmp_serde::from_slice::(message.payload.as_ref()); + if let Err(e) = req { + error!("BAD NATS REQUEST: {e}"); + continue; + } + let req = req.unwrap(); + info!("RECV HIGH PRIORITY!"); + let now = chrono::Utc::now().timestamp(); + LAST_MESSAGE.store(now, Ordering::Relaxed); + let nats = nats.clone(); + let mut bad = false; + + driver.in_new_tab(|| async { + if web_parse(nats.clone(), db.clone(), &driver, &req.url, req.damping).await.is_err() { + warn!("temporary failure detected in parsing, requeuing"); + nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&req).unwrap().into()).await.expect("FAILED TO REQUEUE"); + bad = true; + } + Ok(()) + }).await.unwrap(); + + if bad { + continue; + } + + if DOCUMENTS_CRAWLED.load(Ordering::Relaxed) % 100 == 0 { + DOCUMENTS_CRAWLED.fetch_add(1, Ordering::Relaxed); + info!("crawled {} pages!", DOCUMENTS_CRAWLED.load(Ordering::Relaxed)); + } + } + } + Some(normal) = StreamExt::next(&mut messages) => { + if let Err(e) = normal { + warn!("error when recv js message! 
{e}"); + } else { + let message = normal.unwrap(); + if let Err(e) = message.ack().await { + warn!("failed acking message {e}"); + } + let req = rmp_serde::from_slice::(message.payload.as_ref()); + if let Err(e) = req { + error!("BAD NATS REQUEST: {e}"); + continue; + } + let req = req.unwrap(); + let now = chrono::Utc::now().timestamp(); + LAST_MESSAGE.store(now, Ordering::Relaxed); + let nats = nats.clone(); + + let mut hash = DefaultHasher::new(); + hash.write(req.url.as_bytes()); + let hash = hash.finish(); + + let dehomo_bucket = nats.get_key_value("dehomo").await; + let dehomo_bucket = if dehomo_bucket.is_err() { + let dehomo_bucket = nats.create_key_value(kv::Config { + bucket: "dehomo".to_string(), + description: "prevent the same url from being scraped again too quickly".to_string(), + max_age: Duration::from_secs(60*60), + ..Default::default() + }).await; + if let Err(e) = dehomo_bucket { + panic!("FAILED TO CREATE DEHOMO BUCKET!!! {e}"); + } else { + dehomo_bucket.unwrap() + } + } else { + dehomo_bucket.unwrap() + }; + if dehomo_bucket.get(hash.to_string()).await.ok().flatten().map(|v| *v.first().unwrap_or(&0) == 1).unwrap_or(false) { + info!("too soon to scrape {}", req.url); + continue; + } + + let mut bad = false; + + driver.in_new_tab(|| async { + if web_parse(nats.clone(), db.clone(), &driver, &req.url, req.damping).await.is_err() { + warn!("temporary failure detected in parsing, requeuing"); + nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&req).unwrap().into()).await.expect("FAILED TO REQUEUE"); + bad = true; + } + Ok(()) + }).await.unwrap(); + + if bad { + continue; + } + + dehomo_bucket.put(hash.to_string(), vec![1u8].into()).await.expect("failed to store dehomo"); + + if DOCUMENTS_CRAWLED.load(Ordering::Relaxed) % 100 == 0 { + DOCUMENTS_CRAWLED.fetch_add(1, Ordering::Relaxed); + info!("crawled {} pages!", DOCUMENTS_CRAWLED.load(Ordering::Relaxed)); + } + } + } + } + } + }), + browser + )); + warn!("spawning new injest thread"); + } 
+ let mut tasks_to_remove = vec![]; + for task in tasks.iter() { + if task.0.is_finished() { + tasks_to_remove.push(task.1.clone()); + available_browsers.push(task.1.clone()); + } + } + tasks.retain(|v| !tasks_to_remove.contains(&v.1)); + tokio::time::sleep(Duration::from_secs(3)).await; + + //while let Some(p) = initial.pop() { + // nats.publish(VOREBOT_SUGGESTED_SERVICE.to_string(), rmp_serde::to_vec(&p).unwrap().into()).await.unwrap(); + //} + } + } +} + +//#[tokio::main] +//async fn main() { +// mutex_timeouts::tokio::GLOBAL_TOKIO_TIMEOUT.store(72, Ordering::Relaxed); +// +// env_logger::init(); +// info!("began at {}", chrono::Utc::now().to_string()); +// +// let nats = async_nats::connect(NATS_URL.as_str()).await; +// if let Err(e) = nats { +// error!("FATAL ERROR, COULDN'T CONNECT TO NATS: {}", e); +// return; +// } +// let nats = nats.unwrap(); +// +// let dbconn = DBConn::new(nats.clone(), "lyphedb-test"); +// +// let nats = jetstream::new(nats); +// +// let mut prefs = FirefoxPreferences::new(); +// prefs.set_user_agent(USER_AGENT.to_string()).unwrap(); +// let mut caps = DesiredCapabilities::firefox(); +// caps.set_preferences(prefs).unwrap(); +// let driver = WebDriver::new(&BROWSER_THREADS[0], caps).await.unwrap(); +// +// driver.in_new_tab(|| async { +// web_parse(nats.clone(), dbconn.clone(), &driver, "https://asklyphe.com/", 0.85).await.expect("Failed to run web parse"); +// +// Ok(()) +// }).await.unwrap(); +//} diff --git a/vorebot/src/webparse/mod.rs b/vorebot/src/webparse/mod.rs new file mode 100644 index 0000000..d6d6580 --- /dev/null +++ b/vorebot/src/webparse/mod.rs @@ -0,0 +1,479 @@ +use crate::USER_AGENT; +use asklyphe_common::ldb::{linkstore, metastore, sitestore, titlestore, wordstore, DBConn}; +use async_nats::jetstream; +use async_nats::jetstream::kv; +use futures::AsyncReadExt; +use image::EncodableLayout; +use isahc::config::RedirectPolicy; +use isahc::prelude::Configurable; +use isahc::HttpClient; +use log::{debug, error, warn}; +use 
std::collections::{BTreeMap, BTreeSet}; +use std::hash::{DefaultHasher, Hasher}; +use std::sync::atomic::AtomicBool; +use std::sync::{mpsc, Arc}; +use std::time::Duration; +use stopwords::{Language, Spark, Stopwords}; +use texting_robots::{get_robots_url, Robot}; +use thirtyfour::{By, WebDriver}; +use asklyphe_common::nats::vorebot::CrawlRequest; +use asklyphe_common::nats::vorebot::VOREBOT_SERVICE; + +pub fn allowed_to_crawl(robotstxt: &[u8], url: &str) -> Result { + let robot1 = Robot::new("Vorebot", robotstxt); + if let Err(e) = robot1 { + warn!( + "potentially malformed robots.txt ({}), not crawling {}", + e, url + ); + return Err(()); + } + let robot1 = robot1.unwrap(); + Ok(robot1.allowed(url)) +} + +// returns Err if we cannot access a page, but the error associated with it seems temporary (i.e. it's worth trying again later) +// otherwise, returns Ok +pub async fn web_parse( + nats: jetstream::Context, + db: DBConn, + driver: &WebDriver, + url: &str, + damping: f64, +) -> Result<(), ()> { + + driver.delete_all_cookies().await.map_err(|_| ())?; + let robots_bucket = nats.get_key_value("robots").await; + let robots_bucket = if robots_bucket.is_err() { + let robots_bucket = nats + .create_key_value(kv::Config { + bucket: "robots".to_string(), + description: "storage of robots.txt data for given hosts".to_string(), + ..Default::default() + }) + .await; + if let Err(e) = robots_bucket { + error!("could not create robots.txt bucket: {}", e); + None + } else { + Some(robots_bucket.unwrap()) + } + } else { + robots_bucket.ok() + }; + let hosts_bucket = nats.get_key_value("hosts").await; + let hosts_bucket = if hosts_bucket.is_err() { + let hosts_bucket = nats + .create_key_value(kv::Config { + bucket: "hosts".to_string(), + description: "prevent the same host from being scraped too quickly".to_string(), + max_age: Duration::from_secs(60 * 5), + ..Default::default() + }) + .await; + if let Err(e) = hosts_bucket { + error!("could not create hosts bucket: {}", e); + 
return Err(()); + } else { + hosts_bucket.unwrap() + } + } else { + hosts_bucket.unwrap() + }; + + let robots_url = get_robots_url(url); + if robots_url.is_err() { + error!("could not get a robots.txt url from {}, not crawling", url); + return Ok(()); + } + let robots_url = robots_url.unwrap(); + let mut hash = DefaultHasher::new(); + hash.write(robots_url.as_bytes()); + let hash = hash.finish(); + + if let Ok(Some(host)) = hosts_bucket.get(hash.to_string()).await { + let count = *host.first().unwrap_or(&0); + if count > 100 { + warn!("scraping {} too quickly, avoiding for one minute", robots_url); + return Err(()); + } + hosts_bucket.put(hash.to_string(), vec![count + 1].into()).await.expect("COULDN'T INSERT INTO HOSTS BUCKET!"); + } else { + hosts_bucket.put(hash.to_string(), vec![1].into()).await.expect("COULDN'T INSERT INTO HOSTS BUCKET!"); + } + + let mut skip_robots_check = false; + if let Some(robots_bucket) = &robots_bucket { + if let Ok(Some(entry)) = robots_bucket.get(hash.to_string()).await { + if let Ok(res) = allowed_to_crawl(entry.as_bytes(), url) { + if !res { + debug!("robots.txt does not allow us to crawl {}", url); + return Ok(()); + } else { + skip_robots_check = true; + } + } + } + } + + if !skip_robots_check { + // check manually + debug!("checking new robots.txt \"{}\"", robots_url); + let client = HttpClient::builder() + .redirect_policy(RedirectPolicy::Limit(10)) + .timeout(Duration::from_secs(60)) + .build(); + if let Err(e) = client { + error!("could not create new robots.txt httpclient: {}", e); + return Err(()); + } + let client = client.unwrap(); + let request = isahc::Request::get(&robots_url) + .header("user-agent", USER_AGENT.as_str()) + .body(()); + if let Err(e) = request { + error!("could not create robots.txt get request: {}", e); + return Ok(()); + } + let request = request.unwrap(); + let response = client.send_async(request).await; + if let Err(e) = response { + warn!("could not get robots.txt page: {}", e); + return Err(()); 
+ } + let mut response = response.unwrap(); + if response.status() == 429 { + // too many requests + warn!("too many requests for {}", robots_url); + return Err(()); + } + if response.status().is_server_error() { + // don't crawl at the moment + debug!("not crawling {} due to server error", robots_url); + return Err(()); + } + + let mut body = "".to_string(); + if let Err(e) = response.body_mut().read_to_string(&mut body).await { + warn!("could not read from robots.txt response: {}", e); + return Err(()); + } + + if let Ok(res) = allowed_to_crawl(body.as_bytes(), url) { + if let Some(robots_bucket) = &robots_bucket { + if let Err(e) = robots_bucket + .put(hash.to_string(), body.as_bytes().to_vec().into()) + .await + { + warn!("could not put robots.txt data: {}", e); + } + } + + if !res { + debug!("robots.txt does not allow us to crawl {}", url); + return Ok(()); + } else { + // we're allowed to crawl! + } + } + } + + let start = std::time::Instant::now(); + debug!("handling request for {}", url); + + // check for bad status codes + // fixme: i hate this solution, can we get something that actually checks the browser's request? 
+ let client = HttpClient::builder() + .redirect_policy(RedirectPolicy::Limit(10)) + .timeout(Duration::from_secs(60)) + .build(); + if let Err(e) = client { + error!("could not create new badstatuscode httpclient: {}", e); + return Err(()); + } + let client = client.unwrap(); + let request = isahc::Request::get(url) + .header("user-agent", USER_AGENT.as_str()) + .body(()); + if let Err(e) = request { + error!("could not create badstatuscode get request: {}", e); + return Ok(()); + } + let request = request.unwrap(); + let response = client.send_async(request).await; + if let Err(e) = response { + warn!("could not get badstatuscode page: {}", e); + return Err(()); + } + let mut response = response.unwrap(); + if response.status() == 429 { + // too many requests + warn!("too many requests for {}", url); + return Err(()); + } + if response.status().is_server_error() || response.status().is_client_error() { + // don't crawl at the moment + debug!("not crawling {} due to bad status code {}", url, response.status()); + return Err(()); + } + + // i guess we're good + driver.goto(url).await.map_err(|_| ())?; + + let meta_elements = driver.find_all(By::Tag("meta")).await.map_err(|_| ())?; + + let title = driver.title().await.map_err(|_| ())?; + let mut description = None; + let mut keywords = vec![]; + for elem in meta_elements { + if let Ok(Some(name)) = elem.attr("name").await { + match name.as_str() { + "description" => { + if let Ok(Some(content)) = elem.attr("content").await { + description = Some(content); + } + } + "keywords" => { + if let Ok(Some(content)) = elem.attr("content").await { + keywords = content + .split(',') + .map(|v| v.to_lowercase()) + .filter(|v| !v.is_empty()) + .collect(); + } + } + _ => {} + } + } + } + + let body = driver.find(By::Tag("body")).await.map_err(|_| ())?; + let raw_page_content = body.text().await.map_err(|_| ())?; + + async fn gather_elements_with_multiplier( + driver: &WebDriver, + wordmap: &mut BTreeMap, + stops: 
&BTreeSet<&&str>, + elements: &[&str], + multiplier: f64, + ) { + let mut elms = vec![]; + for tag in elements { + elms.push(driver.find_all(By::Tag(*tag)).await); + } + let elms = elms.iter().flatten().flatten().collect::>(); + let mut sentences = vec![]; + let mut sentence_set = BTreeSet::new(); + + debug!("processing elements..."); + for node in elms { + let _ = node.scroll_into_view().await; + let boxmodel = node.rect().await; + if boxmodel.is_err() { + // not visible + continue; + } + let boxmodel = boxmodel.unwrap(); + let current_text = node.text().await; + if current_text.is_err() { + // no text on this node + continue; + } + let current_text = current_text.unwrap().trim().to_string(); + if current_text.is_empty() { + continue; + } + let sqs = (boxmodel.width * boxmodel.height).max(1.0); // no 0 divides pls (: + let ccount = current_text.chars().count() as f64; + let cssq = if ccount > 0.0 { sqs / ccount } else { 0.0 }; + if sentence_set.contains(¤t_text) { + continue; + } + sentence_set.insert(current_text.clone()); + sentences.push((current_text, cssq)); + } + + for (sentence, cssq) in sentences { + let mut cssq = (cssq / 500.0).powi(2) * multiplier; + for word in sentence.split_whitespace() { + let word = word + .to_lowercase() + .trim_end_matches(|v: char| v.is_ascii_punctuation()) + .to_string(); + if stops.contains(&word.as_str()) { + // less valuable + cssq /= 100.0; + } + if let Some(wentry) = wordmap.get_mut(&word) { + *wentry += cssq; + } else { + if word.is_empty() { + continue; + } + wordmap.insert(word.to_string(), cssq); + } + } + } + } + + let mut wordmap: BTreeMap = BTreeMap::new(); + let stops: BTreeSet<_> = Spark::stopwords(Language::English) + .unwrap() + .iter() + .collect(); + + debug!("headers..."); + gather_elements_with_multiplier(driver, &mut wordmap, &stops, &["h1","h2","h3","h4","h5","h6"], 3.0) + .await; + + debug!("paragraphs..."); + gather_elements_with_multiplier(driver, &mut wordmap, &stops, &["p","div"], 1.0).await; + + let 
mut wordmap = wordmap.into_iter().collect::>(); + wordmap.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap()); + + let mut db_error_so_requeue_anyways = false; + + let words = wordmap + .iter() + .map(|(word, _)| word.as_str()) + .collect::>(); + #[allow(clippy::collapsible_if)] + if !words.is_empty() { + if wordstore::add_url_to_keywords(&db, &words, url) + .await + .is_err() + { + warn!("couldn't add {} to keywords!", url); + db_error_so_requeue_anyways = true; + } + } + + let mut metawords = keywords.iter().map(|v| v.as_str()).collect::>(); + let desc2 = description.clone(); + let desc2 = desc2.map(|v| { + v.to_lowercase() + .split_whitespace() + .map(String::from) + .collect::>() + }); + if let Some(description) = &desc2 { + for word in description { + let word = word.trim_end_matches(|v: char| v.is_ascii_punctuation()); + if word.is_empty() { + continue; + } + metawords.push(word); + } + } + #[allow(clippy::collapsible_if)] + if !metawords.is_empty() { + if metastore::add_url_to_metawords(&db, &metawords, url) + .await + .is_err() + { + warn!("couldn't add {} to metawords!", url); + db_error_so_requeue_anyways = true; + } + } + + let mut titlewords = vec![]; + let title2 = title.clone(); + let title2 = title2.to_lowercase(); + for word in title2.split_whitespace() { + let word = word.trim_end_matches(|v: char| v.is_ascii_punctuation()); + if word.is_empty() { + continue; + } + titlewords.push(word); + } + #[allow(clippy::collapsible_if)] + if !titlewords.is_empty() { + if titlestore::add_url_to_titlewords(&db, &titlewords, url) + .await + .is_err() + { + warn!("couldn't add {} to titlewords!", url); + db_error_so_requeue_anyways = true; + } + } + + if sitestore::add_website( + &db, + url, + Some(title), + description, + if keywords.is_empty() { + None + } else { + Some(keywords) + }, + &wordmap, + raw_page_content, + damping + ) + .await + .is_err() + { + warn!("couldn't add {} to sitestore!", url); + db_error_so_requeue_anyways = true; + } + + debug!("finished 
with main site stuff for {}", url); + + let linkelms = driver.find_all(By::Tag("a")).await.map_err(|_| ())?; + + for linkelm in linkelms { + if linkelm.scroll_into_view().await.is_err() { + debug!("couldn't scroll into view!"); + } + let href = linkelm.prop("href").await.map_err(|_| ())?; + if href.is_none() { + debug!("no href!"); + continue; + } + let href = href.unwrap(); + if href.contains('#') { + continue; + } + let linktext = linkelm.text().await.map_err(|_| ())?.to_lowercase(); + let linkimgs = linkelm.find_all(By::Tag("img")).await.map_err(|_| ())?; + let mut alts = "".to_string(); + for img in linkimgs { + if let Ok(Some(alt)) = img.attr("alt").await { + alts.push_str(&alt); + alts.push(' '); + } + } + let alts = alts.trim().to_lowercase(); + let mut linkwords = vec![]; + for word in linktext.split_whitespace() { + let word = word.trim_end_matches(|v: char| v.is_ascii_punctuation()); + linkwords.push(word); + } + for word in alts.split_whitespace() { + let word = word.trim_end_matches(|v: char| v.is_ascii_punctuation()); + linkwords.push(word); + } + + #[allow(clippy::collapsible_if)] + if !linkwords.is_empty() { + if linkstore::add_url_to_linkwords(&db, &linkwords, &href).await.is_err() { + warn!("couldn't add {} to linkwords!", url); + } + } + + nats.publish(VOREBOT_SERVICE.to_string(), rmp_serde::to_vec(&CrawlRequest { + url: href, + damping: 0.85, + }).unwrap().into()).await.unwrap(); + } + + let elapsed = start.elapsed().as_secs_f64(); + + debug!("crawled {} in {} seconds", url, elapsed); + + // todo: queue links to be crawled + + Ok(()) +}