diff --git a/README.md b/README.md
index fb7b4b4..9bf38c3 100644
--- a/README.md
+++ b/README.md
@@ -37,7 +37,7 @@ We are working on a new version of the mini-lsm tutorial that is split into 3 we
 | 1.4 | Merge Iterators | ✅ | ✅ | ✅ |
 | 1.5 | Storage Engine - Read Path | ✅ | ✅ | ✅ |
 | 1.6 | Storage Engine - Write Path | ✅ | ✅ | ✅ |
-| 2.1 | Compaction Framework | ✅ | 🚧 | 🚧 |
+| 2.1 | Simple Compaction Strategy | ✅ | 🚧 | 🚧 |
 | 2.2 | Compaction Strategy - Tiered | ✅ | | |
 | 2.3 | Compaction Strategy - Leveled | 🚧 | | |
 | 2.4 | Manifest | | | |
diff --git a/mini-lsm/src/bin/compaction_simulator.rs b/mini-lsm/src/bin/compaction_simulator.rs
index 9079e28..34c8a42 100644
--- a/mini-lsm/src/bin/compaction_simulator.rs
+++ b/mini-lsm/src/bin/compaction_simulator.rs
@@ -2,13 +2,28 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use clap::Parser;
-use mini_lsm::compact::{TieredCompactionController, TieredCompactionOptions};
+use mini_lsm::compact::{
+    SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, TieredCompactionController,
+    TieredCompactionOptions,
+};
 use mini_lsm::lsm_storage::LsmStorageInner;
 use mini_lsm::mem_table::MemTable;
 
 #[derive(Parser, Debug)]
 #[command(author, version, about, long_about = None)]
 enum Args {
+    Simple {
+        #[clap(long)]
+        dump_real_id: bool,
+        #[clap(long, default_value = "2")]
+        level0_file_num_compaction_trigger: usize,
+        #[clap(long, default_value = "3")]
+        max_levels: usize,
+        #[clap(long, default_value = "200")]
+        size_ratio_percent: usize,
+        #[clap(long, default_value = "50")]
+        iterations: usize,
+    },
     Tiered {
         #[clap(long)]
         dump_real_id: bool,
@@ -82,9 +97,13 @@ impl MockStorage {
         }
     }
 
-    pub fn dump_original_id(&self) {
-        if !self.snapshot.l0_sstables.is_empty() {
-            println!("L0: {:?}", self.snapshot.l0_sstables);
+    pub fn dump_original_id(&self, always_show_l0: bool) {
+        if !self.snapshot.l0_sstables.is_empty() || always_show_l0 {
+            println!(
+                "L0 ({}): {:?}",
+                self.snapshot.l0_sstables.len(),
+                self.snapshot.l0_sstables,
+            );
         }
         for (level, files) in &self.snapshot.levels {
             println!(
@@ -95,16 +114,16 @@ impl MockStorage {
         }
     }
 
-    pub fn dump_real_id(&self) {
-        if !self.snapshot.l0_sstables.is_empty() {
-            println!("L0: {:?}", self.snapshot.l0_sstables);
+    pub fn dump_real_id(&self, always_show_l0: bool) {
+        if !self.snapshot.l0_sstables.is_empty() || always_show_l0 {
+            println!(
+                "L0 ({}): {:?}",
+                self.snapshot.l0_sstables.len(),
+                self.snapshot.l0_sstables,
+            );
         }
         for (level, files) in &self.snapshot.levels {
-            println!(
-                "L{level} ({}): {:?}",
-                files.len(),
-                files.iter().map(|x| self.file_list[x]).collect::<Vec<_>>()
-            );
+            println!("L{level} ({}): {:?}", files.len(), files);
         }
     }
 }
@@ -112,6 +131,102 @@ impl MockStorage {
 fn main() {
     let args = Args::parse();
     match args {
+        Args::Simple {
+            dump_real_id,
+            size_ratio_percent,
+            iterations,
+            level0_file_num_compaction_trigger,
+            max_levels,
+        } => {
+            let mut controller =
+                SimpleLeveledCompactionController::new(SimpleLeveledCompactionOptions {
+                    size_ratio_percent,
+                    level0_file_num_compaction_trigger,
+                    max_levels,
+                });
+            let mut storage = MockStorage::new();
+            for i in 0..max_levels {
+                storage.snapshot.levels.push((i + 1, Vec::new()));
+            }
+            let mut max_space = 0;
+            for i in 0..iterations {
+                println!("=== Iteration {i} ===");
+                storage.flush_sst_to_l0();
+                println!("--- After Flush ---");
+                if dump_real_id {
+                    storage.dump_real_id(true);
+                } else {
+                    storage.dump_original_id(true);
+                }
+                let mut num_compactions = 0;
+                while let Some(task) = controller.generate_compaction_task(&storage.snapshot) {
+                    num_compactions += 1;
+                    println!("--- Compaction Task ---");
+                    let mut sst_ids = Vec::new();
+                    for file in task
+                        .upper_level_sst_ids
+                        .iter()
+                        .chain(task.lower_level_sst_ids.iter())
+                    {
+                        let new_sst_id = storage.generate_sst_id();
+                        sst_ids.push(new_sst_id);
+                        storage.file_list.insert(new_sst_id, *file);
+                        storage.total_writes += 1;
+                    }
+                    print!(
+                        "Upper L{} {:?} ",
+                        task.upper_level.unwrap_or_default(),
+                        task.upper_level_sst_ids
+                    );
+                    print!(
+                        "Lower L{} {:?} ",
+                        task.lower_level, task.lower_level_sst_ids
+                    );
+                    println!("-> {:?}", sst_ids);
+                    max_space = max_space.max(storage.file_list.len());
+                    let (snapshot, del) =
+                        controller.apply_compaction_result(&storage.snapshot, &task, &sst_ids);
+                    storage.snapshot = snapshot;
+                    storage.remove(&del);
+                    println!("--- After Compaction ---");
+                    if dump_real_id {
+                        storage.dump_real_id(true);
+                    } else {
+                        storage.dump_original_id(true);
+                    }
+                }
+                if num_compactions == 0 {
+                    println!("no compaction triggered");
+                } else {
+                    println!("{num_compactions} compaction triggered in this iteration");
+                }
+                max_space = max_space.max(storage.file_list.len());
+                println!("--- Statistics ---");
+                println!(
+                    "Write Amplification: {}/{}={:.3}x",
+                    storage.total_writes,
+                    storage.total_flushes,
+                    storage.total_writes as f64 / storage.total_flushes as f64
+                );
+                println!(
+                    "Space Amplification: {}/{}={:.3}x",
+                    max_space,
+                    storage.total_flushes,
+                    max_space as f64 / storage.total_flushes as f64
+                );
+                println!(
+                    "Read Amplification: {}x",
+                    storage.snapshot.l0_sstables.len()
+                        + storage
+                            .snapshot
+                            .levels
+                            .iter()
+                            .filter(|(_, f)| !f.is_empty())
+                            .count()
+                );
+                println!();
+            }
+        }
         Args::Tiered {
             dump_real_id,
             level0_file_num_compaction_trigger,
@@ -133,9 +248,9 @@ fn main() {
                 storage.flush_sst_to_new_tier();
                 println!("--- After Flush ---");
                 if dump_real_id {
-                    storage.dump_real_id();
+                    storage.dump_real_id(false);
                 } else {
-                    storage.dump_original_id();
+                    storage.dump_original_id(false);
                 }
                 let task = controller.generate_compaction_task(&storage.snapshot);
                 println!("--- Compaction Task ---");
@@ -158,9 +273,9 @@ fn main() {
                     storage.remove(&del);
                     println!("--- After Compaction ---");
                     if dump_real_id {
-                        storage.dump_real_id();
+                        storage.dump_real_id(false);
                     } else {
-                        storage.dump_original_id();
+                        storage.dump_original_id(false);
                     }
                 } else {
                     println!("no compaction triggered");
diff --git a/mini-lsm/src/compact.rs b/mini-lsm/src/compact.rs
index f198b75..e32723f 100644
--- a/mini-lsm/src/compact.rs
+++ b/mini-lsm/src/compact.rs
@@ -1,10 +1,14 @@
 mod leveled;
+mod simple_leveled;
 mod tiered;
 
 use std::sync::Arc;
 
 use anyhow::Result;
 pub use leveled::{LeveledCompactionController, LeveledCompactionTask};
+pub use simple_leveled::{
+    SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, SimpleLeveledCompactionTask,
+};
 pub use tiered::{TieredCompactionController, TieredCompactionOptions, TieredCompactionTask};
 
 use crate::iterators::merge_iterator::MergeIterator;
diff --git a/mini-lsm/src/compact/leveled.rs b/mini-lsm/src/compact/leveled.rs
index 539657e..8fff439 100644
--- a/mini-lsm/src/compact/leveled.rs
+++ b/mini-lsm/src/compact/leveled.rs
@@ -1,7 +1,8 @@
 use crate::lsm_storage::LsmStorageInner;
 
 pub struct LeveledCompactionTask {
-    upper_level: usize,
+    // if upper_level is `None`, then it is L0 compaction
+    upper_level: Option<usize>,
     upper_level_sst_ids: Vec<usize>,
     lower_level: usize,
     lower_level_sst_ids: Vec<usize>,
diff --git a/mini-lsm/src/compact/simple_leveled.rs b/mini-lsm/src/compact/simple_leveled.rs
new file mode 100644
index 0000000..e04d68f
--- /dev/null
+++ b/mini-lsm/src/compact/simple_leveled.rs
@@ -0,0 +1,100 @@
+use std::collections::HashMap;
+
+use crate::lsm_storage::LsmStorageInner;
+
+pub struct SimpleLeveledCompactionOptions {
+    pub size_ratio_percent: usize,
+    pub level0_file_num_compaction_trigger: usize,
+    pub max_levels: usize,
+}
+
+pub struct SimpleLeveledCompactionTask {
+    // if upper_level is `None`, then it is L0 compaction
+    pub upper_level: Option<usize>,
+    pub upper_level_sst_ids: Vec<usize>,
+    pub lower_level: usize,
+    pub lower_level_sst_ids: Vec<usize>,
+}
+
+pub struct SimpleLeveledCompactionController {
+    options: SimpleLeveledCompactionOptions,
+}
+
+impl SimpleLeveledCompactionController {
+    pub fn new(options: SimpleLeveledCompactionOptions) -> Self {
+        Self { options }
+    }
+
+    pub fn generate_compaction_task(
+        &mut self,
+        snapshot: &LsmStorageInner,
+    ) -> Option<SimpleLeveledCompactionTask> {
+        let mut level_sizes = Vec::new();
+        level_sizes.push(snapshot.l0_sstables.len());
+        for (_, files) in &snapshot.levels {
+            level_sizes.push(files.len());
+        }
+
+        for i in 0..self.options.max_levels {
+            if i == 0
+                && snapshot.l0_sstables.len() < self.options.level0_file_num_compaction_trigger
+            {
+                continue;
+            }
+
+            let lower_level = i + 1;
+            let size_ratio = level_sizes[lower_level] as f64 / level_sizes[i] as f64;
+            if size_ratio < self.options.size_ratio_percent as f64 / 100.0 {
+                println!(
+                    "compaction triggered at level {} and {} with size ratio {}",
+                    i, lower_level, size_ratio
+                );
+                return Some(SimpleLeveledCompactionTask {
+                    upper_level: if i == 0 { None } else { Some(i) },
+                    upper_level_sst_ids: if i == 0 {
+                        snapshot.l0_sstables.clone()
+                    } else {
+                        snapshot.levels[i - 1].1.clone()
+                    },
+                    lower_level,
+                    lower_level_sst_ids: snapshot.levels[lower_level - 1].1.clone(),
+                });
+            }
+        }
+        None
+    }
+
+    pub fn apply_compaction_result(
+        &self,
+        snapshot: &LsmStorageInner,
+        task: &SimpleLeveledCompactionTask,
+        output: &[usize],
+    ) -> (LsmStorageInner, Vec<usize>) {
+        let mut snapshot = snapshot.clone();
+        let mut files_to_remove = Vec::new();
+        if let Some(upper_level) = task.upper_level {
+            assert_eq!(
+                task.upper_level_sst_ids,
+                snapshot.levels[upper_level - 1].1,
+                "sst mismatched"
+            );
+            files_to_remove.extend(&snapshot.levels[upper_level - 1].1);
+            snapshot.levels[upper_level - 1].1.clear();
+        } else {
+            assert_eq!(
+                task.upper_level_sst_ids, snapshot.l0_sstables,
+                "sst mismatched"
+            );
+            files_to_remove.extend(&snapshot.l0_sstables);
+            snapshot.l0_sstables.clear();
+        }
+        assert_eq!(
+            task.lower_level_sst_ids,
+            snapshot.levels[task.lower_level - 1].1,
+            "sst mismatched"
+        );
+        files_to_remove.extend(&snapshot.levels[task.lower_level - 1].1);
+        snapshot.levels[task.lower_level - 1].1 = output.to_vec();
+        (snapshot, files_to_remove)
+    }
+}