use crate::config::LypheDBConfig; use log::*; use lyphedb::PropagationStrategy; use once_cell::sync::Lazy; use std::borrow::Cow; use std::collections::BTreeMap; use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use percent_encoding::{percent_decode_str, percent_encode, AsciiSet, CONTROLS}; use tokio::sync::RwLock; pub const NOT_ALLOWED_ASCII: AsciiSet = CONTROLS.add(b' ').add(b'/').add(b'.').add(b'\\'); pub type LDBCache = BTreeMap>, Arc>>; /// Reads will read from this cache pub static PRIMARY_CACHE: Lazy> = Lazy::new(|| RwLock::new(BTreeMap::new())); /// Writes will be written to this cache, and then depending on the propagation method, the /// Primary cache will be set to a copy pub static SECONDARY_CACHE: Lazy> = Lazy::new(|| RwLock::new(BTreeMap::new())); /// how often are keys accessed? this will influence the garbage collector pub static KEY_ACCESS_COUNTER: Lazy>, AtomicU64>>> = Lazy::new(|| RwLock::new(BTreeMap::new())); pub fn key_to_path(config: &LypheDBConfig, key: &[u8]) -> PathBuf { let mut path = PathBuf::new(); path.push(&config.master); let mut fnbuf = Vec::new(); for (i, byte) in key.iter().enumerate() { if *byte == b'/' { let encoded = percent_encode(&fnbuf, &NOT_ALLOWED_ASCII).to_string(); path.push(encoded); fnbuf.clear(); } else { fnbuf.push(*byte); } } if !fnbuf.is_empty() { let encoded = percent_encode(&fnbuf, &NOT_ALLOWED_ASCII).to_string(); path.push(encoded); fnbuf.clear(); } path.push(".self"); path } #[derive(Debug)] pub enum OperationError { KeyCannotBeEmpty, FilesystemPermissionError, BadFilesystemEntry, } pub async fn set_key_disk( config: &LypheDBConfig, key: &[u8], value: &[u8], ) -> Result<(), OperationError> { if key.is_empty() { return Err(OperationError::KeyCannotBeEmpty); } let path = key_to_path(config, key); let directory = path.parent().ok_or(OperationError::KeyCannotBeEmpty)?; if let Ok(directory_exists) = directory.try_exists() { if !directory_exists { std::fs::create_dir_all(directory).map_err(|e| { warn!("couldn't create directory: {:?}", e); OperationError::FilesystemPermissionError })?; } } else { return Err(OperationError::FilesystemPermissionError); } std::fs::write(path, value).map_err(|e| { warn!("couldn't write file: {:?}", e); OperationError::FilesystemPermissionError })?; Ok(()) } pub async fn delete_key_disk( config: &LypheDBConfig, key: &[u8], ) -> Result<(), OperationError> { if key.is_empty() { return Err(OperationError::KeyCannotBeEmpty); } let path = key_to_path(config, key); if let Ok(exists) = path.try_exists() { if !exists { return Ok(()); } } std::fs::remove_file(path).map_err(|e| { warn!("couldn't remove file: {:?}", e); OperationError::FilesystemPermissionError }) } pub async fn get_key_disk( config: &LypheDBConfig, key: &[u8], ) -> Result>, OperationError> { if key.is_empty() { return Err(OperationError::KeyCannotBeEmpty); } let path = key_to_path(config, key); if let Ok(exists) = path.try_exists() { if !exists { return Ok(None); } } let data = std::fs::read(path).map_err(|e| { warn!("couldn't read file: {:?}", e); OperationError::FilesystemPermissionError })?; Ok(Some(data)) } /// this function allows empty keys, so you can get all keys under root by doing /// get_keys_under_keydir_disk(..., b"") pub async fn get_keys_under_keydir_disk( config: &LypheDBConfig, keydir: &[u8], ) -> Result>, OperationError> { let path = key_to_path(config, keydir); let path = path.parent().expect("bad master"); let mut keys = Vec::new(); for entry in std::fs::read_dir(path).map_err(|e| { warn!("couldn't read directory: {:?}", e); OperationError::FilesystemPermissionError })? { let entry = entry.map_err(|e| { warn!("couldn't read directory entry: {:?}", e); OperationError::FilesystemPermissionError })?; let path = entry.path(); let filename = path .to_str() .ok_or(OperationError::FilesystemPermissionError)?; if filename.ends_with(".self") { // this is a value file, ignore continue; } let filename = filename.trim_start_matches(&config.master); let key = percent_decode_str(filename).collect(); keys.push(key); } Ok(keys) } pub async fn set_key( config: LypheDBConfig, key: &[u8], value: &[u8], strat: &PropagationStrategy, ) -> Result<(), OperationError> { let k1 = Arc::new(key.to_vec()); let v1 = Arc::new(value.to_vec()); let disk_task = { let k1 = k1.clone(); let v1 = v1.clone(); tokio::spawn(async move { set_key_disk(&config, &k1, &v1).await }) }; let prop_task = match strat { PropagationStrategy::Immediate => { let k1 = k1.clone(); let v1 = v1.clone(); tokio::spawn(async move { let mut secondary_cache = SECONDARY_CACHE.write().await; secondary_cache.insert(k1, v1); let mut primary_cache = PRIMARY_CACHE.write().await; *primary_cache = secondary_cache.clone(); }) } PropagationStrategy::Timeout => { let k1 = k1.clone(); let v1 = v1.clone(); tokio::spawn(async move { tokio::spawn(async move { { let mut secondary_cache = SECONDARY_CACHE.write().await; secondary_cache.insert(k1, v1); } tokio::spawn(async move { let start = std::time::Instant::now(); loop { if start.elapsed().as_secs() > 60 { break; } let pc = PRIMARY_CACHE.try_write(); if pc.is_err() { tokio::time::sleep(std::time::Duration::from_secs(10)).await; continue; } else { break; } } let mut primary_cache = PRIMARY_CACHE.write().await; let secondary_cache = SECONDARY_CACHE.read().await; *primary_cache = secondary_cache.clone(); }); }); }) } PropagationStrategy::OnRead => { let k1 = k1.clone(); tokio::spawn(async move { { let mut secondary_cache = SECONDARY_CACHE.write().await; secondary_cache.remove(&k1); } { let mut primary_cache = PRIMARY_CACHE.write().await; primary_cache.remove(&k1); } }) } }; if let Ok(Err(e)) = disk_task.await { error!("couldn't set key on disk: {:?} ({:?})", e, key); // undo propagation prop_task.abort(); { let mut primary_cache = PRIMARY_CACHE.write().await; primary_cache.remove(&k1); } { let mut secondary_cache = SECONDARY_CACHE.write().await; secondary_cache.remove(&k1); } return Err(e); } let _ = prop_task.await; Ok(()) } pub async fn get_key(config: LypheDBConfig, key: &[u8]) -> Result>, OperationError> { let k1 = Arc::new(key.to_vec()); { let k1 = k1.clone(); tokio::spawn(async move { let mut access_counter = KEY_ACCESS_COUNTER.write().await; let counter = access_counter.entry(k1.clone()).or_insert(AtomicU64::new(0)); if counter.load(Ordering::Relaxed) > 10000000 { return; } counter.fetch_add(1, Ordering::SeqCst); }); } let disk_task = { let k1 = k1.clone(); tokio::spawn(async move { get_key_disk(&config, &k1).await }) }; { // read from cache let cache = PRIMARY_CACHE.read().await; if let Some(val) = cache.get(&k1) { disk_task.abort(); return Ok(Some(val.to_vec())); } } //debug!("cache miss"); if let Ok(result) = disk_task.await { if let Ok(Some(val)) = &result { let val = Arc::new(val.to_vec()); tokio::spawn(async move { { let mut cache = SECONDARY_CACHE.write().await; cache.insert(k1.clone(), val.clone()); } { let secondary_cache = SECONDARY_CACHE.read().await; let mut primary_cache = PRIMARY_CACHE.write().await; *primary_cache = secondary_cache.clone(); } //debug!("cache insert"); }); } result } else { Err(OperationError::FilesystemPermissionError) } } pub async fn delete_key(config: LypheDBConfig, key: &[u8]) -> Result<(), OperationError> { let k1 = Arc::new(key.to_vec()); let disk_task = { let k1 = k1.clone(); tokio::spawn(async move { delete_key_disk(&config, &k1).await }) }; let prop_task = { let k1 = k1.clone(); tokio::spawn(async move { { let mut key_access_counter = KEY_ACCESS_COUNTER.write().await; key_access_counter.remove(&k1); } { let mut secondary_cache = SECONDARY_CACHE.write().await; secondary_cache.remove(&k1); } let mut primary_cache = PRIMARY_CACHE.write().await; let secondary_cache = SECONDARY_CACHE.read().await; *primary_cache = secondary_cache.clone(); }) }; prop_task.await.expect("couldn't delete key"); disk_task.await.expect("couldn't delete key") }