feat: refine the CLI tool (#33)

* implement a repl

* remove debug log
This commit is contained in:
Yu Lei
2024-01-30 17:04:25 +08:00
committed by GitHub
parent 83545ab5dc
commit 71342d4384
5 changed files with 449 additions and 95 deletions

View File

@@ -19,6 +19,8 @@ crossbeam-channel = "0.5.11"
serde_json = { version = "1.0" }
serde = { version = "1.0", features = ["derive"] }
farmhash = "1"
nom = "7.1.3"
rustyline = "13.0.0"
[dev-dependencies]
tempfile = "3"

View File

@@ -1,4 +1,6 @@
mod wrapper;
use rustyline::DefaultEditor;
use wrapper::mini_lsm_wrapper;
use anyhow::Result;
@@ -11,6 +13,7 @@ use mini_lsm_wrapper::compact::{
use mini_lsm_wrapper::iterators::StorageIterator;
use mini_lsm_wrapper::lsm_storage::{LsmStorageOptions, MiniLsm};
use std::path::PathBuf;
use std::sync::Arc;
#[derive(Debug, Clone, ValueEnum)]
enum CompactionStrategy {
@@ -33,6 +36,279 @@ struct Args {
serializable: bool,
}
struct ReplHandler {
epoch: u64,
lsm: Arc<MiniLsm>,
}
impl ReplHandler {
fn handle(&mut self, command: &Command) -> Result<()> {
match command {
Command::Fill { begin, end } => {
for i in *begin..=*end {
self.lsm.put(
format!("{}", i).as_bytes(),
format!("value{}@{}", i, self.epoch).as_bytes(),
)?;
}
println!(
"{} values filled with epoch {}",
end - begin + 1,
self.epoch
);
}
Command::Del { key } => {
self.lsm.delete(key.as_bytes())?;
println!("{} deleted", key);
}
Command::Get { key } => {
if let Some(value) = self.lsm.get(key.as_bytes())? {
println!("{}={:?}", key, value);
} else {
println!("{} not exist", key);
}
}
Command::Scan { begin, end } => match (begin, end) {
(None, None) => {
let mut iter = self
.lsm
.scan(std::ops::Bound::Unbounded, std::ops::Bound::Unbounded)?;
let mut cnt = 0;
while iter.is_valid() {
println!(
"{:?}={:?}",
Bytes::copy_from_slice(iter.key()),
Bytes::copy_from_slice(iter.value()),
);
iter.next()?;
cnt += 1;
}
println!();
println!("{} keys scanned", cnt);
}
(Some(begin), Some(end)) => {
let mut iter = self.lsm.scan(
std::ops::Bound::Included(begin.as_bytes()),
std::ops::Bound::Included(end.as_bytes()),
)?;
let mut cnt = 0;
while iter.is_valid() {
println!(
"{:?}={:?}",
Bytes::copy_from_slice(iter.key()),
Bytes::copy_from_slice(iter.value()),
);
iter.next()?;
cnt += 1;
}
println!();
println!("{} keys scanned", cnt);
}
_ => {
println!("invalid command");
}
},
Command::Dump => {
self.lsm.dump_structure();
println!("dump success");
}
Command::Flush => {
self.lsm.force_flush()?;
println!("flush success");
}
Command::FullCompaction => {
self.lsm.force_full_compaction()?;
println!("full compaction success");
}
Command::Quit | Command::Close => std::process::exit(0),
};
self.epoch += 1;
Ok(())
}
}
#[derive(Debug)]
enum Command {
Fill {
begin: u64,
end: u64,
},
Del {
key: String,
},
Get {
key: String,
},
Scan {
begin: Option<String>,
end: Option<String>,
},
Dump,
Flush,
FullCompaction,
Quit,
Close,
}
impl Command {
pub fn parse(input: &str) -> Result<Self> {
use nom::bytes::complete::*;
use nom::character::complete::*;
use nom::branch::*;
use nom::combinator::*;
use nom::sequence::*;
let uint = |i| {
map_res(digit1::<&str, nom::error::Error<_>>, |s: &str| {
s.parse()
.map_err(|_| nom::error::Error::new(s, nom::error::ErrorKind::Digit))
})(i)
};
let string = |i| {
map(take_till1(|c: char| c.is_whitespace()), |s: &str| {
s.to_string()
})(i)
};
let fill = |i| {
map(
tuple((tag_no_case("fill"), space1, uint, space1, uint)),
|(_, _, key, _, value)| Command::Fill {
begin: key,
end: value,
},
)(i)
};
let del = |i| {
map(
tuple((tag_no_case("del"), space1, string)),
|(_, _, key)| Command::Del { key },
)(i)
};
let get = |i| {
map(
tuple((tag_no_case("get"), space1, string)),
|(_, _, key)| Command::Get { key },
)(i)
};
let scan = |i| {
map(
tuple((
tag_no_case("scan"),
opt(tuple((space1, string, space1, string))),
)),
|(_, opt_args)| {
let (begin, end) = opt_args
.map_or((None, None), |(_, begin, _, end)| (Some(begin), Some(end)));
Command::Scan { begin, end }
},
)(i)
};
let command = |i| {
alt((
fill,
del,
get,
scan,
map(tag_no_case("dump"), |_| Command::Dump),
map(tag_no_case("flush"), |_| Command::Flush),
map(tag_no_case("full_compaction"), |_| Command::FullCompaction),
map(tag_no_case("quit"), |_| Command::Quit),
map(tag_no_case("close"), |_| Command::Close),
))(i)
};
command(input)
.map(|(_, c)| c)
.map_err(|e| anyhow::anyhow!("{}", e))
}
}
struct Repl {
app_name: String,
description: String,
prompt: String,
handler: ReplHandler,
editor: DefaultEditor,
}
impl Repl {
pub fn run(mut self) -> Result<()> {
self.bootstrap()?;
loop {
let readline = self.editor.readline(&self.prompt)?;
if readline.trim().is_empty() {
// Skip noop
continue;
}
let command = Command::parse(&readline)?;
self.handler.handle(&command)?;
self.editor.add_history_entry(readline)?;
}
}
fn bootstrap(&mut self) -> Result<()> {
println!("Welcome to {}!", self.app_name);
println!("{}", self.description);
println!();
Ok(())
}
}
struct ReplBuilder {
app_name: String,
description: String,
prompt: String,
}
impl ReplBuilder {
pub fn new() -> Self {
Self {
app_name: "mini-lsm-cli".to_string(),
description: "A CLI for mini-lsm".to_string(),
prompt: "mini-lsm-cli> ".to_string(),
}
}
pub fn app_name(mut self, app_name: &str) -> Self {
self.app_name = app_name.to_string();
self
}
pub fn description(mut self, description: &str) -> Self {
self.description = description.to_string();
self
}
pub fn prompt(mut self, prompt: &str) -> Self {
self.prompt = prompt.to_string();
self
}
pub fn build(self, handler: ReplHandler) -> Result<Repl> {
Ok(Repl {
app_name: self.app_name,
description: self.description,
prompt: self.prompt,
editor: DefaultEditor::new()?,
handler,
})
}
}
fn main() -> Result<()> {
let args = Args::parse();
let lsm = MiniLsm::open(
@@ -69,97 +345,13 @@ fn main() -> Result<()> {
serializable: args.serializable,
},
)?;
let mut epoch = 0;
loop {
let mut line = String::new();
std::io::stdin().read_line(&mut line)?;
let line = line.trim().to_string();
if line.starts_with("fill ") {
let Some((_, options)) = line.split_once(' ') else {
println!("invalid command");
continue;
};
let Some((begin, end)) = options.split_once(' ') else {
println!("invalid command");
continue;
};
let begin = begin.parse::<u64>()?;
let end = end.parse::<u64>()?;
for i in begin..=end {
lsm.put(
format!("{}", i).as_bytes(),
format!("value{}@{}", i, epoch).as_bytes(),
)?;
}
let repl = ReplBuilder::new()
.app_name("mini-lsm-cli")
.description("A CLI for mini-lsm")
.prompt("mini-lsm-cli> ")
.build(ReplHandler { epoch: 0, lsm })?;
println!("{} values filled with epoch {}", end - begin + 1, epoch);
} else if line.starts_with("del ") {
let Some((_, key)) = line.split_once(' ') else {
println!("invalid command");
continue;
};
lsm.delete(key.as_bytes())?;
} else if line.starts_with("get ") {
let Some((_, key)) = line.split_once(' ') else {
println!("invalid command");
continue;
};
if let Some(value) = lsm.get(key.as_bytes())? {
println!("{}={:?}", key, value);
} else {
println!("{} not exist", key);
}
} else if line == "scan" {
let mut iter = lsm.scan(std::ops::Bound::Unbounded, std::ops::Bound::Unbounded)?;
let mut cnt = 0;
while iter.is_valid() {
println!(
"{:?}={:?}",
Bytes::copy_from_slice(iter.key()),
Bytes::copy_from_slice(iter.value()),
);
iter.next()?;
cnt += 1;
}
println!("{} keys scanned", cnt);
} else if line.starts_with("scan ") {
let Some((_, rest)) = line.split_once(' ') else {
println!("invalid command");
continue;
};
let Some((begin_key, end_key)) = rest.split_once(' ') else {
println!("invalid command");
continue;
};
let mut iter = lsm.scan(
std::ops::Bound::Included(begin_key.as_bytes()),
std::ops::Bound::Included(end_key.as_bytes()),
)?;
let mut cnt = 0;
while iter.is_valid() {
println!(
"{:?}={:?}",
Bytes::copy_from_slice(iter.key()),
Bytes::copy_from_slice(iter.value()),
);
iter.next()?;
cnt += 1;
}
println!("{} keys scanned", cnt);
} else if line == "dump" {
lsm.dump_structure();
} else if line == "flush" {
lsm.force_flush()?;
} else if line == "full_compaction" {
lsm.force_full_compaction()?;
} else if line == "quit" || line == "close" {
lsm.close()?;
break;
} else {
println!("invalid command: {}", line);
}
epoch += 1;
}
repl.run()?;
Ok(())
}