/* keys are stored on disk like this //.self so if i stored the key "/this/is/a/key" with the data "hello world", it'd look like this /this/is/a/key/.self -> "hello world" */ use crate::config::{LypheDBConfig, load_config}; use crate::dbimpl::{KEY_ACCESS_COUNTER, PRIMARY_CACHE, SECONDARY_CACHE}; use futures::StreamExt; use log::{debug, error, info, warn}; use lyphedb::{KVList, LDBNatsMessage, LypheDBCommand}; mod config; mod dbimpl; pub async fn gc_thread(config: LypheDBConfig) { loop { tokio::time::sleep(std::time::Duration::from_secs(61)).await; { let cache = PRIMARY_CACHE.read().await; let cache_size = cache.len() * config.avg_entry_size; if cache_size > config.gc_limit { debug!("gc triggered, cache size: {} bytes", cache_size); let keycount_to_remove = cache.len() - config.gc_limit / config.avg_entry_size; drop(cache); let mut least_freq_keys = vec![]; for (key, count) in KEY_ACCESS_COUNTER.read().await.iter() { let count = count.load(std::sync::atomic::Ordering::Relaxed); if least_freq_keys.len() < keycount_to_remove { least_freq_keys.push((key.clone(), count)); } else { for (other, oc) in least_freq_keys.iter_mut() { if count < *oc { *other = key.clone(); *oc = count; break; } } } } let mut cache = SECONDARY_CACHE.write().await; for (key, _) in least_freq_keys.iter() { cache.remove(key); } let mut primary_cache = PRIMARY_CACHE.write().await; *primary_cache = cache.clone(); debug!( "gc finished, cache size: {} bytes", primary_cache.len() * config.avg_entry_size ); } } } } pub async fn precache_keys_until_limit(config: LypheDBConfig) { info!("precache started"); let mut precache_count = 0; let mut precache_stack = dbimpl::get_keys_under_keydir_disk(&config, b"") .await .expect("couldn't get root keys"); while let Some(precache_key) = precache_stack.pop() { { let cache = PRIMARY_CACHE.read().await; if cache.len() * config.avg_entry_size > config.gc_limit { break; } if cache.len() % 10000 == 0 { info!("precache size: {} mb", (cache.len() * config.avg_entry_size) / (1024*1024)); } } let children = dbimpl::get_keys_under_keydir_disk(&config, &precache_key) .await .expect("couldn't get children of key"); if !children.is_empty() { precache_stack.extend(children); } else { let _value = dbimpl::get_key(config.clone(), &precache_key) .await .expect("couldn't get value of key"); precache_count += 1; } } info!("precache finished, {} values precached", precache_count); } #[tokio::main] async fn main() { let config = load_config(&std::env::args().nth(1).expect("please specify config file")); let logger = env_logger::builder() .filter_level(match config.log.as_str() { "debug" => log::LevelFilter::Debug, "info" => log::LevelFilter::Info, "warn" => log::LevelFilter::Warn, "error" => log::LevelFilter::Error, _ => unreachable!(), }) .build(); log::set_boxed_logger(Box::new(logger)).unwrap(); log::set_max_level(match config.log.as_str() { "debug" => log::LevelFilter::Debug, "info" => log::LevelFilter::Info, "warn" => log::LevelFilter::Warn, "error" => log::LevelFilter::Error, _ => unreachable!(), }); info!("lyphedb started"); let nats = async_nats::ConnectOptions::new() .add_client_certificate( config.nats_cert.as_str().into(), config.nats_key.as_str().into(), ) .connect(config.nats_url.as_str()) .await; if let Err(e) = nats { error!("FATAL ERROR, COULDN'T CONNECT TO NATS: {}", e); return; } let nats = nats.unwrap(); let mut subscriber = nats .queue_subscribe(config.name.clone(), "lyphedb".to_string()) .await .expect("couldn't subscribe to subject"); info!("nats connected"); tokio::spawn(precache_keys_until_limit(config.clone())); tokio::spawn(gc_thread(config.clone())); while let Some(msg) = subscriber.next().await { let config = config.clone(); let nats = nats.clone(); tokio::spawn(async move { async fn bad_request(nats: &async_nats::Client, replyto: async_nats::Subject) { let reply = rmp_serde::to_vec(&LDBNatsMessage::BadRequest).unwrap(); if let Err(e) = nats.publish(replyto, reply.into()).await { warn!("couldn't send reply: {:?}", e); } } let data = msg.payload.to_vec(); let data = rmp_serde::from_slice::(&data); if let Err(e) = data { warn!("couldn't deserialize message: {:?}", e); if let Some(replyto) = msg.reply { bad_request(&nats, replyto).await; } return; } let data = data.unwrap(); match data { LDBNatsMessage::Command(cmd) => match cmd { LypheDBCommand::SetKeys(kvlist, propstrat) => { for (key, value) in kvlist.kvs { if let Err(e) = dbimpl::set_key(config.clone(), &key, &value, &propstrat).await { warn!("couldn't set key: {:?}", e); } } let reply = rmp_serde::to_vec(&LDBNatsMessage::Success).unwrap(); if let Err(e) = nats.publish(msg.reply.unwrap(), reply.into()).await { warn!("couldn't send reply: {:?}", e); } } LypheDBCommand::GetKeys(klist) => { let mut reply = Vec::new(); for key in klist.keys { if let Ok(Some(value)) = dbimpl::get_key(config.clone(), &key).await { reply.push((key, value)); } else { warn!("couldn't get key: {:?}", key); } } let reply = rmp_serde::to_vec(&LDBNatsMessage::Entries(KVList { kvs: reply })) .unwrap(); if let Err(e) = nats.publish(msg.reply.unwrap(), reply.into()).await { warn!("couldn't send reply: {:?}", e); } } LypheDBCommand::CountKeys(keydir) => { let keys = dbimpl::get_keys_under_keydir_disk(&config, &keydir.key) .await .expect("couldn't get keys under keydir"); let mut count = 0; for key in keys { let value = dbimpl::get_key(config.clone(), &key) .await .expect("couldn't get value of key"); if value.is_some() { count += 1; } } let reply = rmp_serde::to_vec(&LDBNatsMessage::Count(count)).unwrap(); if let Err(e) = nats.publish(msg.reply.unwrap(), reply.into()).await { warn!("couldn't send reply: {:?}", e); } } LypheDBCommand::GetKeyDirectory(keydir) => { let mut reply = Vec::new(); let keys = dbimpl::get_keys_under_keydir_disk(&config, &keydir.key) .await .expect("couldn't get keys under keydir"); for key in keys { let value = dbimpl::get_key(config.clone(), &key) .await .expect("couldn't get value of key"); if let Some(value) = value { reply.push((key, value)); } } let reply = rmp_serde::to_vec(&LDBNatsMessage::Entries(KVList { kvs: reply })) .unwrap(); if let Err(e) = nats.publish(msg.reply.unwrap(), reply.into()).await { warn!("couldn't send reply: {:?}", e); } } LypheDBCommand::DeleteKeys(klist) => { for key in klist.keys { if let Err(e) = dbimpl::delete_key(config.clone(), &key).await { warn!("couldn't delete key: {:?}", e); } } let reply = rmp_serde::to_vec(&LDBNatsMessage::Success).unwrap(); if let Err(e) = nats.publish(msg.reply.unwrap(), reply.into()).await { warn!("couldn't send reply: {:?}", e); } } }, _ => { warn!("bad request, not command"); if let Some(replyto) = msg.reply { bad_request(&nats, replyto).await; } } } }); } info!("lyphedb shutting down"); }