diff --git a/.gitignore b/.gitignore index 87bc59f..cc2f724 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target .vscode/ sync-tmp/ +mini-lsm.db/ diff --git a/Cargo.lock b/Cargo.lock index 564f69a..7d3645d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -203,11 +203,10 @@ dependencies = [ [[package]] name = "crossbeam-channel" -version = "0.5.6" +version = "0.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521" +checksum = "176dc175b78f56c0f321911d9c8eb2b77a78a4860b9c19db83835fea1a46649b" dependencies = [ - "cfg-if", "crossbeam-utils", ] @@ -238,12 +237,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.14" +version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fb766fa798726286dbbb842f174001dab8abc7b627a1dd86e0b7222a95d929f" -dependencies = [ - "cfg-if", -] +checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" [[package]] name = "duct" @@ -400,6 +396,7 @@ dependencies = [ "arc-swap", "bytes", "clap", + "crossbeam-channel", "crossbeam-epoch", "crossbeam-skiplist", "moka", diff --git a/README.md b/README.md index 9d9b998..675b202 100644 --- a/README.md +++ b/README.md @@ -43,12 +43,13 @@ We are working on a new version of the mini-lsm tutorial that is split into 3 we | 1.4 | Merge Iterators | ✅ | ✅ | ✅ | | 1.5 | Storage Engine - Read Path | ✅ | ✅ | ✅ | | 1.6 | Storage Engine - Write Path | ✅ | ✅ | ✅ | -| 2.1 | Compaction - Get Started | ✅ | 🚧 | 🚧 | -| 2.2 | Compaction Strategy - Tiered | ✅ | | | -| 2.3 | Compaction Strategy - Leveled | ✅ | | | -| 2.4 | Manifest | | | | -| 2.5 | Write-Ahead Log | | | | -| 2.6 | Bloom Filter and Key Compression | | | | +| 1.7 | Bloom Filter and Key Compression | | | | +| 2.1 | Compaction Introduction | ✅ | 🚧 | 🚧 | +| 2.2 | Compaction Strategy - Simple | ✅ | 🚧 | 🚧 | +| 2.3 | Compaction Strategy - Tiered | ✅ | | | +| 2.4 | Compaction Strategy - Leveled | ✅ | | | +| 2.5 | Manifest | | | | +| 2.6 | Write-Ahead Log | | | | | 3.1 | Timestamp Encoding + Prefix Bloom Filter | | | | | 3.2 | Snapshot Read | | | | | 3.3 | Watermark and Garbage Collection | | | | diff --git a/mini-lsm/Cargo.toml b/mini-lsm/Cargo.toml index 7c5f3e9..2194ab7 100644 --- a/mini-lsm/Cargo.toml +++ b/mini-lsm/Cargo.toml @@ -19,6 +19,7 @@ ouroboros = "0.15" moka = "0.9" clap = { version = "4.4.17", features = ["derive"] } rand = "0.8.5" +crossbeam-channel = "0.5.11" [dev-dependencies] tempfile = "3" diff --git a/mini-lsm/src/bin/compaction_simulator.rs b/mini-lsm/src/bin/compaction_simulator.rs index 2e43851..f9e36f5 100644 --- a/mini-lsm/src/bin/compaction_simulator.rs +++ b/mini-lsm/src/bin/compaction_simulator.rs @@ -323,7 +323,7 @@ fn main() { iterations, } => { let controller = TieredCompactionController::new(TieredCompactionOptions { - level0_file_num_compaction_trigger, + num_tiers: level0_file_num_compaction_trigger, max_size_amplification_percent, size_ratio, min_merge_width, diff --git a/mini-lsm/src/bin/minilsm_cli.rs b/mini-lsm/src/bin/minilsm_cli.rs new file mode 100644 index 0000000..0258828 --- /dev/null +++ b/mini-lsm/src/bin/minilsm_cli.rs @@ -0,0 +1,62 @@ +use std::time::Duration; + +use anyhow::Result; +use mini_lsm::compact::{CompactionOptions, SimpleLeveledCompactionOptions}; +use mini_lsm::lsm_storage::{LsmStorageOptions, MiniLsm}; + +fn main() -> Result<()> { + let lsm = MiniLsm::open( + "mini-lsm.db", + LsmStorageOptions { + block_size: 4096, + target_sst_size: 2 << 20, + compaction_options: CompactionOptions::Simple(SimpleLeveledCompactionOptions { + size_ratio_percent: 200, + level0_file_num_compaction_trigger: 2, + max_levels: 4, + }), + }, + )?; + let mut epoch = 0; + loop { + let mut line = String::new(); + std::io::stdin().read_line(&mut line)?; + let line = line.trim().to_string(); + if line.starts_with("fill ") { + let Some((_, options)) = line.split_once(' ') else { + println!("invalid command"); + continue; + }; + let Some((begin, end)) = options.split_once(' ') else { + println!("invalid command"); + continue; + }; + let begin = begin.parse::()?; + let end = end.parse::()?; + + for i in begin..=end { + lsm.put( + format!("{}", i).as_bytes(), + format!("value{}@{}", i, epoch).as_bytes(), + )?; + } + + println!("{} values filled with epoch {}", end - begin + 1, epoch); + } else if line.starts_with("get ") { + let Some((_, key)) = line.split_once(' ') else { + println!("invalid command"); + continue; + }; + if let Some(value) = lsm.get(key.as_bytes())? { + println!("{}={:?}", key, value); + } else { + println!("{} not exist", key); + } + } else if line == "flush" { + lsm.force_flush_imm_memtables()?; + } else { + println!("invalid command: {}", line); + } + epoch += 1; + } +} diff --git a/mini-lsm/src/compact.rs b/mini-lsm/src/compact.rs index 6b31546..05b5e53 100644 --- a/mini-lsm/src/compact.rs +++ b/mini-lsm/src/compact.rs @@ -3,6 +3,7 @@ mod simple_leveled; mod tiered; use std::sync::Arc; +use std::time::Duration; use anyhow::Result; pub use leveled::{LeveledCompactionController, LeveledCompactionOptions, LeveledCompactionTask}; @@ -13,18 +14,25 @@ pub use tiered::{TieredCompactionController, TieredCompactionOptions, TieredComp use crate::iterators::merge_iterator::MergeIterator; use crate::iterators::StorageIterator; -use crate::lsm_storage::LsmStorageInner; +use crate::lsm_storage::{LsmStorageInner, LsmStorageState}; use crate::table::{SsTable, SsTableBuilder, SsTableIterator}; pub(crate) enum CompactionTask { Leveled(LeveledCompactionTask), Tiered(TieredCompactionTask), Simple(SimpleLeveledCompactionTask), + ForceFullCompaction(Vec), } -struct CompactOptions { - block_size: usize, - target_sst_size: usize, +impl CompactionTask { + fn compact_to_bottom_level(&self) -> bool { + match self { + CompactionTask::ForceFullCompaction(_) => true, + CompactionTask::Leveled(task) => task.is_lower_level_bottom_level, + CompactionTask::Simple(task) => task.is_lower_level_bottom_level, + CompactionTask::Tiered(task) => task.bottom_tier_included, + } + } } pub(crate) enum CompactionController { @@ -34,6 +42,43 @@ pub(crate) enum CompactionController { NoCompaction, } +impl CompactionController { + fn generate_compaction_task(&self, snapshot: &LsmStorageState) -> Option { + match self { + CompactionController::Leveled(ctrl) => ctrl + .generate_compaction_task(&snapshot) + .map(CompactionTask::Leveled), + CompactionController::Simple(ctrl) => ctrl + .generate_compaction_task(&snapshot) + .map(CompactionTask::Simple), + CompactionController::Tiered(ctrl) => ctrl + .generate_compaction_task(&snapshot) + .map(CompactionTask::Tiered), + CompactionController::NoCompaction => unreachable!(), + } + } + + fn apply_compaction_result( + &self, + snapshot: &LsmStorageState, + task: &CompactionTask, + output: &[usize], + ) -> (LsmStorageState, Vec) { + match (self, task) { + (CompactionController::Leveled(ctrl), CompactionTask::Leveled(task)) => { + ctrl.apply_compaction_result(&snapshot, task, output) + } + (CompactionController::Simple(ctrl), CompactionTask::Simple(task)) => { + ctrl.apply_compaction_result(&snapshot, task, output) + } + (CompactionController::Tiered(ctrl), CompactionTask::Tiered(task)) => { + ctrl.apply_compaction_result(&snapshot, task, output) + } + _ => unreachable!(), + } + } +} + impl CompactionController { pub fn flush_to_l0(&self) -> bool { if let Self::Leveled(_) | Self::Simple(_) | Self::NoCompaction = self { @@ -57,12 +102,37 @@ pub enum CompactionOptions { } impl LsmStorageInner { - #[allow(dead_code)] - fn compact( - &self, - tables: Vec>, - options: CompactOptions, - ) -> Result>> { + fn compact(&self, task: &CompactionTask) -> Result>> { + let table_ids = match task { + CompactionTask::Leveled(task) => task + .lower_level_sst_ids + .iter() + .copied() + .chain(task.upper_level_sst_ids.iter().copied()) + .collect::>(), + CompactionTask::Simple(task) => task + .lower_level_sst_ids + .iter() + .copied() + .chain(task.upper_level_sst_ids.iter().copied()) + .collect::>(), + CompactionTask::Tiered(task) => task + .tiers + .iter() + .map(|(_, files)| files) + .flatten() + .copied() + .collect::>(), + CompactionTask::ForceFullCompaction(l0_ssts) => l0_ssts.clone(), + }; + let tables: Vec> = { + let state = self.state.read(); + table_ids + .iter() + .map(|id| state.sstables.get(id).unwrap().clone()) + .collect::>() + }; + let mut iters = Vec::new(); iters.reserve(tables.len()); for table in tables.iter() { @@ -75,11 +145,11 @@ impl LsmStorageInner { let mut builder = None; let mut new_sst = vec![]; - let compact_to_bottom_level = false; + let compact_to_bottom_level = task.compact_to_bottom_level(); while iter.is_valid() { if builder.is_none() { - builder = Some(SsTableBuilder::new(options.block_size)); + builder = Some(SsTableBuilder::new(self.options.block_size)); } let builder_inner = builder.as_mut().unwrap(); if compact_to_bottom_level { @@ -91,7 +161,7 @@ impl LsmStorageInner { } iter.next()?; - if builder_inner.estimated_size() >= options.target_sst_size { + if builder_inner.estimated_size() >= self.options.target_sst_size { let sst_id = self.next_sst_id(); // lock dropped here let builder = builder.take().unwrap(); let sst = Arc::new(builder.build( @@ -114,10 +184,67 @@ impl LsmStorageInner { Ok(new_sst) } + fn trigger_compaction(&self) -> Result<()> { + let snapshot = { + let state = self.state.read(); + state.clone() + }; + let task = self + .compaction_controller + .generate_compaction_task(&snapshot); + let Some(task) = task else { + return Ok(()); + }; + println!("running compaction task"); + let sstables = self.compact(&task)?; + let output = sstables.iter().map(|x| x.sst_id()).collect::>(); + let ssts_to_remove = { + let _state_lock = self.state_lock.lock(); + let (mut snapshot, files_to_remove) = self + .compaction_controller + .apply_compaction_result(&self.state.read(), &task, &output); + let mut ssts_to_remove = Vec::with_capacity(files_to_remove.len()); + for file_to_remove in &files_to_remove { + let result = snapshot.sstables.remove(file_to_remove); + assert!(result.is_some()); + ssts_to_remove.push(result.unwrap()); + } + for file_to_add in sstables { + let result = snapshot.sstables.insert(file_to_add.sst_id(), file_to_add); + assert!(result.is_none()); + } + let mut state = self.state.write(); + *state = Arc::new(snapshot); + ssts_to_remove + }; + for sst in ssts_to_remove { + std::fs::remove_file(self.path_of_sst(sst.sst_id()))?; + } + Ok(()) + } + pub(crate) fn spawn_compaction_thread( self: &Arc, - rx: std::sync::mpsc::Receiver<()>, + rx: crossbeam_channel::Receiver<()>, ) -> Result>> { + if let CompactionOptions::Leveled(_) + | CompactionOptions::Simple(_) + | CompactionOptions::Tiered(_) = self.options.compaction_options + { + let this = self.clone(); + let handle = std::thread::spawn(move || { + let ticker = crossbeam_channel::tick(Duration::from_millis(50)); + loop { + crossbeam_channel::select! { + recv(ticker) -> _ => if let Err(e) = this.trigger_compaction() { + eprintln!("compaction failed: {}", e); + }, + recv(rx) -> _ => return + } + } + }); + return Ok(Some(handle)); + } Ok(None) } } diff --git a/mini-lsm/src/compact/leveled.rs b/mini-lsm/src/compact/leveled.rs index a25b94a..df0f031 100644 --- a/mini-lsm/src/compact/leveled.rs +++ b/mini-lsm/src/compact/leveled.rs @@ -8,6 +8,7 @@ pub struct LeveledCompactionTask { pub upper_level_sst_ids: Vec, pub lower_level: usize, pub lower_level_sst_ids: Vec, + pub is_lower_level_bottom_level: bool, } #[derive(Debug, Clone)] @@ -115,6 +116,7 @@ impl LeveledCompactionController { &snapshot.l0_sstables, base_level, ), + is_lower_level_bottom_level: base_level == self.options.max_levels, }); } @@ -143,6 +145,7 @@ impl LeveledCompactionController { &[selected_sst], level + 1, ), + is_lower_level_bottom_level: level + 1 == self.options.max_levels, }); } None diff --git a/mini-lsm/src/compact/simple_leveled.rs b/mini-lsm/src/compact/simple_leveled.rs index 295a907..f414339 100644 --- a/mini-lsm/src/compact/simple_leveled.rs +++ b/mini-lsm/src/compact/simple_leveled.rs @@ -13,6 +13,7 @@ pub struct SimpleLeveledCompactionTask { pub upper_level_sst_ids: Vec, pub lower_level: usize, pub lower_level_sst_ids: Vec, + pub is_lower_level_bottom_level: bool, } pub struct SimpleLeveledCompactionController { @@ -57,6 +58,7 @@ impl SimpleLeveledCompactionController { }, lower_level, lower_level_sst_ids: snapshot.levels[lower_level - 1].1.clone(), + is_lower_level_bottom_level: lower_level == self.options.max_levels, }); } } diff --git a/mini-lsm/src/compact/tiered.rs b/mini-lsm/src/compact/tiered.rs index 492431e..d7ed52a 100644 --- a/mini-lsm/src/compact/tiered.rs +++ b/mini-lsm/src/compact/tiered.rs @@ -4,11 +4,12 @@ use crate::lsm_storage::LsmStorageState; pub struct TieredCompactionTask { pub tiers: Vec<(usize, Vec)>, + pub bottom_tier_included: bool, } #[derive(Debug, Clone)] pub struct TieredCompactionOptions { - pub level0_file_num_compaction_trigger: usize, + pub num_tiers: usize, pub max_size_amplification_percent: usize, pub size_ratio: usize, pub min_merge_width: usize, @@ -31,7 +32,7 @@ impl TieredCompactionController { snapshot.l0_sstables.is_empty(), "should not add l0 ssts in tiered compaction" ); - if snapshot.levels.len() < self.options.level0_file_num_compaction_trigger { + if snapshot.levels.len() < self.options.num_tiers { return None; } // compaction triggered by space amplification ratio @@ -48,6 +49,7 @@ impl TieredCompactionController { ); return Some(TieredCompactionTask { tiers: snapshot.levels.clone(), + bottom_tier_included: true, }); } let size_ratio_trigger = (100.0 + self.options.size_ratio as f64) / 100.0; @@ -69,12 +71,12 @@ impl TieredCompactionController { .take(id + 2) .cloned() .collect::>(), + bottom_tier_included: id + 2 >= snapshot.levels.len(), }); } } // trying to reduce sorted runs without respecting size ratio - let num_tiers_to_take = - snapshot.levels.len() - self.options.level0_file_num_compaction_trigger + 2; + let num_tiers_to_take = snapshot.levels.len() - self.options.num_tiers + 2; println!("compaction triggered by reducing sorted runs"); return Some(TieredCompactionTask { tiers: snapshot @@ -83,6 +85,7 @@ impl TieredCompactionController { .take(num_tiers_to_take) .cloned() .collect::>(), + bottom_tier_included: snapshot.levels.len() >= num_tiers_to_take, }); } diff --git a/mini-lsm/src/lsm_storage.rs b/mini-lsm/src/lsm_storage.rs index b1c9736..337a8af 100644 --- a/mini-lsm/src/lsm_storage.rs +++ b/mini-lsm/src/lsm_storage.rs @@ -64,9 +64,9 @@ impl LsmStorageState { } pub struct LsmStorageOptions { - block_size: usize, - target_sst_size: usize, - compaction_options: CompactionOptions, + pub block_size: usize, + pub target_sst_size: usize, + pub compaction_options: CompactionOptions, } impl LsmStorageOptions { @@ -82,17 +82,17 @@ impl LsmStorageOptions { /// The storage interface of the LSM tree. pub(crate) struct LsmStorageInner { pub(crate) state: Arc>>, - state_lock: Mutex<()>, + pub(crate) state_lock: Mutex<()>, path: PathBuf, pub(crate) block_cache: Arc, next_sst_id: AtomicUsize, - options: Arc, - compaction_controller: CompactionController, + pub(crate) options: Arc, + pub(crate) compaction_controller: CompactionController, } pub struct MiniLsm { inner: Arc, - compaction_notifier: std::sync::mpsc::Sender<()>, + compaction_notifier: crossbeam_channel::Sender<()>, compaction_thread: Mutex>>, } @@ -116,7 +116,7 @@ impl MiniLsm { pub fn open(path: impl AsRef, options: LsmStorageOptions) -> Result> { let inner = Arc::new(LsmStorageInner::open(path, options)?); - let (tx, rx) = std::sync::mpsc::channel(); + let (tx, rx) = crossbeam_channel::unbounded(); let compaction_thread = inner.spawn_compaction_thread(rx)?; Ok(Arc::new(Self { inner, @@ -144,6 +144,10 @@ impl MiniLsm { ) -> Result> { self.inner.scan(lower, upper) } + + pub fn force_flush_imm_memtables(&self) -> Result<()> { + self.inner.force_flush_imm_memtables() + } } impl LsmStorageInner { @@ -153,10 +157,14 @@ impl LsmStorageInner { } pub(crate) fn open(path: impl AsRef, options: LsmStorageOptions) -> Result { + let path = path.as_ref(); + if !path.exists() { + std::fs::create_dir_all(path)?; + } Ok(Self { state: Arc::new(RwLock::new(Arc::new(LsmStorageState::create(&options)))), state_lock: Mutex::new(()), - path: path.as_ref().to_path_buf(), + path: path.to_path_buf(), block_cache: Arc::new(BlockCache::new(1 << 20)), // 4GB block cache, next_sst_id: AtomicUsize::new(1), compaction_controller: match &options.compaction_options { @@ -291,6 +299,7 @@ impl LsmStorageInner { // In tiered compaction, create a new tier snapshot.levels.insert(0, (sst_id, vec![sst_id])); } + println!("flushed {}.sst with size={}", sst_id, sst.table_size()); snapshot.sstables.insert(sst_id, sst); // Update the snapshot. *guard = Arc::new(snapshot); diff --git a/mini-lsm/src/table.rs b/mini-lsm/src/table.rs index 8a56bf9..eb8301b 100644 --- a/mini-lsm/src/table.rs +++ b/mini-lsm/src/table.rs @@ -37,7 +37,7 @@ impl BlockMeta { // The size of key length estimated_size += std::mem::size_of::(); // The size of actual key - estimated_size += meta.first_key.len(); + estimated_size += meta.last_key.len(); } // Reserve the space to improve performance, especially when the size of incoming data is // large