use crate::config::load_config; use lyphedb::{KVList, KeyList, LDBNatsMessage, LypheDBCommand, PropagationStrategy}; use std::collections::BTreeMap; use async_nats::Message; use futures::StreamExt; use tokio::sync::mpsc; mod config; #[tokio::main] async fn main() { let config = load_config(&std::env::args().nth(1).expect("please specify config file")); let nats = async_nats::ConnectOptions::new() .add_client_certificate( config.nats_cert.as_str().into(), config.nats_key.as_str().into(), ) .connect(config.nats_url.as_str()) .await; if let Err(e) = nats { eprintln!("FATAL ERROR, COULDN'T CONNECT TO NATS: {}", e); return; } let nats = nats.unwrap(); // test 1: create 10000 keys, send in chunks of 10 println!("test 1: create 10_000 keys, send in chunks of 100"); let (key_tx, mut key_rx) = mpsc::unbounded_channel(); let start = std::time::Instant::now(); let mut tasks = vec![]; for _ in 0..(10_000 / 100) { let name = config.name.clone(); let nats = nats.clone(); let key_tx = key_tx.clone(); tasks.push(tokio::spawn(async move { let mut keys = BTreeMap::new(); for _ in 0..100 { let key = rand::random::<[u8; 16]>(); let data = rand::random::<[u8; 16]>(); key_tx.send((key, data)).unwrap(); keys.insert(key, data); } let data = rmp_serde::to_vec(&LDBNatsMessage::Command(LypheDBCommand::SetKeys( KVList { kvs: keys .into_iter() .map(|(k, v)| (k.to_vec(), v.to_vec())) .collect(), }, PropagationStrategy::Immediate, ))) .unwrap(); let replyto_sub = "lyphedb_test_1".to_string(); let mut subscriber = nats.subscribe(replyto_sub.clone()).await.unwrap(); nats.publish_with_reply(name.clone(), replyto_sub, data.into()).await.unwrap(); if let Some(reply) = subscriber.next().await { let reply = rmp_serde::from_slice::(&reply.payload).unwrap(); if let LDBNatsMessage::Success = reply {} else { eprintln!("failure"); } } })); } for task in tasks { task.await.unwrap(); } let end = std::time::Instant::now(); println!("test 1: {}ms", end.duration_since(start).as_millis()); let mut our_copy = BTreeMap::new(); let mut buffer = vec![]; key_rx.recv_many(&mut buffer, 10_000).await; for (k, v) in buffer { our_copy.insert(k, v); } // test 2: read back all keys and check for accuracy println!("test 2: read back all keys and check for accuracy"); let kv: Vec<_> = our_copy.into_iter().map(|(k, v)| (k.to_vec(), v.to_vec())).collect(); let start = std::time::Instant::now(); let mut tasks = vec![]; for i in 0..100 { let batch = kv[i * 100..((i + 1) * 100).min(kv.len())].to_vec(); let name = config.name.clone(); let nats = nats.clone(); tasks.push(tokio::spawn(async move { let data = rmp_serde::to_vec(&LDBNatsMessage::Command(LypheDBCommand::GetKeys( KeyList { keys: batch.iter().map(|(k, _)| k.to_vec()).collect() } ))).unwrap(); let replyto_sub = format!("lyphedb_test_2_{}", i); let mut subscriber = nats.subscribe(replyto_sub.clone()).await.unwrap(); nats.publish_with_reply(name.clone(), replyto_sub, data.into()).await.unwrap(); if let Some(reply) = subscriber.next().await { let reply = rmp_serde::from_slice::(&reply.payload).unwrap(); if let LDBNatsMessage::Entries(kvlist) = reply { // check all keys for (k1, v1) in batch { let v2 = kvlist.kvs.iter().find(|(k, v)| k == &k1).expect("key not found"); assert_eq!(v1, v2.1); } } else { eprintln!("failure"); } } })); } for task in tasks { task.await.unwrap(); } let end = std::time::Instant::now(); println!("test 2: {}ms", end.duration_since(start).as_millis()); }