From f93a8a1bd8e493f006e68ee1cc5ec68e3f3bb4f9 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Wed, 17 Jan 2024 14:51:15 +0800 Subject: [PATCH] add tiered compaction + compaction simulator Signed-off-by: Alex Chi Z --- README.md | 2 +- mini-lsm/src/bin/compaction_simulator.rs | 145 +++++++++++++++++++---- mini-lsm/src/compact.rs | 2 +- mini-lsm/src/compact/tiered.rs | 114 +++++++++++++++++- 4 files changed, 234 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index fb9e31d..fb7b4b4 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,7 @@ We are working on a new version of the mini-lsm tutorial that is split into 3 we | 1.5 | Storage Engine - Read Path | ✅ | ✅ | ✅ | | 1.6 | Storage Engine - Write Path | ✅ | ✅ | ✅ | | 2.1 | Compaction Framework | ✅ | 🚧 | 🚧 | -| 2.2 | Compaction Strategy - Tiered | 🚧 | | | +| 2.2 | Compaction Strategy - Tiered | ✅ | | | | 2.3 | Compaction Strategy - Leveled | 🚧 | | | | 2.4 | Manifest | | | | | 2.5 | Write-Ahead Log | | | | diff --git a/mini-lsm/src/bin/compaction_simulator.rs b/mini-lsm/src/bin/compaction_simulator.rs index 54a3e1c..9079e28 100644 --- a/mini-lsm/src/bin/compaction_simulator.rs +++ b/mini-lsm/src/bin/compaction_simulator.rs @@ -1,22 +1,38 @@ -use std::collections::HashSet; +use std::collections::HashMap; use std::sync::Arc; use clap::Parser; -use mini_lsm::compact::TieredCompactionController; +use mini_lsm::compact::{TieredCompactionController, TieredCompactionOptions}; use mini_lsm::lsm_storage::LsmStorageInner; use mini_lsm::mem_table::MemTable; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] enum Args { - Tiered {}, + Tiered { + #[clap(long)] + dump_real_id: bool, + #[clap(long, default_value = "3")] + level0_file_num_compaction_trigger: usize, + #[clap(long, default_value = "200")] + max_size_amplification_percent: usize, + #[clap(long, default_value = "1")] + size_ratio: usize, + #[clap(long, default_value = "2")] + min_merge_width: usize, + #[clap(long, default_value = "50")] + iterations: usize, + }, Leveled {}, } pub struct MockStorage { snapshot: LsmStorageInner, next_sst_id: usize, - file_list: HashSet, + /// Maps SST ID to the original flushed SST ID + file_list: HashMap, + total_flushes: usize, + total_writes: usize, } impl MockStorage { @@ -32,6 +48,8 @@ impl MockStorage { snapshot, next_sst_id: 0, file_list: Default::default(), + total_flushes: 0, + total_writes: 0, } } @@ -41,22 +59,52 @@ impl MockStorage { id } - pub fn flush_sst(&mut self) { + pub fn flush_sst_to_l0(&mut self) { let id = self.generate_sst_id(); self.snapshot.l0_sstables.push(id); - self.file_list.insert(id); + self.file_list.insert(id, id); + self.total_flushes += 1; + self.total_writes += 1; + } + + pub fn flush_sst_to_new_tier(&mut self) { + let id = self.generate_sst_id(); + self.snapshot.levels.insert(0, (id, vec![id])); + self.file_list.insert(id, id); + self.total_flushes += 1; + self.total_writes += 1; } pub fn remove(&mut self, files_to_remove: &[usize]) { for file_id in files_to_remove { - self.file_list.remove(file_id); + let ret = self.file_list.remove(file_id); + assert!(ret.is_some(), "failed to remove file {}", file_id); } } - pub fn dump(&self) { - print!("L0: {:?}", self.snapshot.l0_sstables); + pub fn dump_original_id(&self) { + if !self.snapshot.l0_sstables.is_empty() { + println!("L0: {:?}", self.snapshot.l0_sstables); + } for (level, files) in &self.snapshot.levels { - print!("L{level}: {:?}", files); + println!( + "L{level} ({}): {:?}", + files.len(), + files.iter().map(|x| self.file_list[x]).collect::>() + ); + } + } + + pub fn dump_real_id(&self) { + if !self.snapshot.l0_sstables.is_empty() { + println!("L0: {:?}", self.snapshot.l0_sstables); + } + for (level, files) in &self.snapshot.levels { + println!( + "L{level} ({}): {:?}", + files.len(), + files.iter().map(|x| self.file_list[x]).collect::>() + ); } } } @@ -64,19 +112,74 @@ impl MockStorage { fn main() { let args = Args::parse(); match args { - Args::Tiered {} => { - let controller = TieredCompactionController {}; + Args::Tiered { + dump_real_id, + level0_file_num_compaction_trigger, + max_size_amplification_percent, + size_ratio, + min_merge_width, + iterations, + } => { + let controller = TieredCompactionController::new(TieredCompactionOptions { + level0_file_num_compaction_trigger, + max_size_amplification_percent, + size_ratio, + min_merge_width, + }); let mut storage = MockStorage::new(); - for i in 0..500 { - println!("Iteration {i}"); - storage.flush_sst(); + let mut max_space = 0; + for i in 0..iterations { + println!("=== Iteration {i} ==="); + storage.flush_sst_to_new_tier(); + println!("--- After Flush ---"); + if dump_real_id { + storage.dump_real_id(); + } else { + storage.dump_original_id(); + } let task = controller.generate_compaction_task(&storage.snapshot); - let sst_id = storage.generate_sst_id(); - let (snapshot, del) = - controller.apply_compaction_result(&storage.snapshot, &task, &[sst_id]); - storage.snapshot = snapshot; - storage.remove(&del); - storage.dump(); + println!("--- Compaction Task ---"); + if let Some(task) = task { + let mut sst_ids = Vec::new(); + for (tier_id, files) in &task.tiers { + for file in files { + let new_sst_id = storage.generate_sst_id(); + sst_ids.push(new_sst_id); + storage.file_list.insert(new_sst_id, *file); + storage.total_writes += 1; + } + print!("L{} {:?} ", tier_id, files); + } + println!("-> {:?}", sst_ids); + max_space = max_space.max(storage.file_list.len()); + let (snapshot, del) = + controller.apply_compaction_result(&storage.snapshot, &task, &sst_ids); + storage.snapshot = snapshot; + storage.remove(&del); + println!("--- After Compaction ---"); + if dump_real_id { + storage.dump_real_id(); + } else { + storage.dump_original_id(); + } + } else { + println!("no compaction triggered"); + } + max_space = max_space.max(storage.file_list.len()); + println!("--- Statistics ---"); + println!( + "Write Amplification: {}/{}={:.3}x", + storage.total_writes, + storage.total_flushes, + storage.total_writes as f64 / storage.total_flushes as f64 + ); + println!( + "Space Amplification: {}/{}={:.3}x", + max_space, + storage.total_flushes, + max_space as f64 / storage.total_flushes as f64 + ); + println!(); } } Args::Leveled {} => {} diff --git a/mini-lsm/src/compact.rs b/mini-lsm/src/compact.rs index dc560cc..f198b75 100644 --- a/mini-lsm/src/compact.rs +++ b/mini-lsm/src/compact.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use anyhow::Result; pub use leveled::{LeveledCompactionController, LeveledCompactionTask}; -pub use tiered::{TieredCompactionController, TieredCompactionTask}; +pub use tiered::{TieredCompactionController, TieredCompactionOptions, TieredCompactionTask}; use crate::iterators::merge_iterator::MergeIterator; use crate::iterators::StorageIterator; diff --git a/mini-lsm/src/compact/tiered.rs b/mini-lsm/src/compact/tiered.rs index 4b83aa3..dfde993 100644 --- a/mini-lsm/src/compact/tiered.rs +++ b/mini-lsm/src/compact/tiered.rs @@ -1,15 +1,88 @@ +use std::collections::HashMap; + use crate::lsm_storage::LsmStorageInner; -use crate::table::SsTable; pub struct TieredCompactionTask { - tiers: Vec, + pub tiers: Vec<(usize, Vec)>, } -pub struct TieredCompactionController {} +pub struct TieredCompactionOptions { + pub level0_file_num_compaction_trigger: usize, + pub max_size_amplification_percent: usize, + pub size_ratio: usize, + pub min_merge_width: usize, +} + +pub struct TieredCompactionController { + options: TieredCompactionOptions, +} impl TieredCompactionController { - pub fn generate_compaction_task(&self, snapshot: &LsmStorageInner) -> TieredCompactionTask { - return TieredCompactionTask { tiers: Vec::new() }; + pub fn new(options: TieredCompactionOptions) -> Self { + Self { options } + } + + pub fn generate_compaction_task( + &self, + snapshot: &LsmStorageInner, + ) -> Option { + assert!( + snapshot.l0_sstables.is_empty(), + "should not add l0 ssts in tiered compaction" + ); + if snapshot.levels.len() < self.options.level0_file_num_compaction_trigger { + return None; + } + // compaction triggered by space amplification ratio + let mut size = 0; + for id in 0..(snapshot.levels.len() - 1) { + size += snapshot.levels[id].1.len(); + } + let space_amp_ratio = + (size as f64) / (snapshot.levels.last().unwrap().1.len() as f64) * 100.0; + if space_amp_ratio >= self.options.max_size_amplification_percent as f64 { + println!( + "compaction triggered by space amplification ratio: {}", + space_amp_ratio + ); + return Some(TieredCompactionTask { + tiers: snapshot.levels.clone(), + }); + } + let size_ratio_trigger = (100.0 + self.options.size_ratio as f64) / 100.0; + // compaction triggered by size ratio + let mut size = 0; + for id in 0..(snapshot.levels.len() - 1) { + size += snapshot.levels[id].1.len(); + let next_level_size = snapshot.levels[id + 1].1.len(); + let current_size_ratio = size as f64 / next_level_size as f64; + if current_size_ratio >= size_ratio_trigger && id + 2 >= self.options.min_merge_width { + println!( + "compaction triggered by size ratio: {}", + current_size_ratio * 100.0 + ); + return Some(TieredCompactionTask { + tiers: snapshot + .levels + .iter() + .take(id + 2) + .cloned() + .collect::>(), + }); + } + } + // trying to reduce sorted runs without respecting size ratio + let num_tiers_to_take = + snapshot.levels.len() - self.options.level0_file_num_compaction_trigger + 1; + println!("compaction triggered by reducing sorted runs"); + return Some(TieredCompactionTask { + tiers: snapshot + .levels + .iter() + .take(num_tiers_to_take) + .cloned() + .collect::>(), + }); } pub fn apply_compaction_result( @@ -18,6 +91,35 @@ impl TieredCompactionController { task: &TieredCompactionTask, output: &[usize], ) -> (LsmStorageInner, Vec) { - (snapshot.clone(), Vec::new()) + assert!( + snapshot.l0_sstables.is_empty(), + "should not add l0 ssts in tiered compaction" + ); + let mut snapshot = snapshot.clone(); + let mut tier_to_remove = task + .tiers + .iter() + .map(|(x, y)| (*x, y)) + .collect::>(); + let mut levels = Vec::new(); + let mut new_tier_added = false; + let mut files_to_remove = Vec::new(); + for (tier_id, files) in &snapshot.levels { + if let Some(ffiles) = tier_to_remove.remove(tier_id) { + // the tier should be removed + assert_eq!(ffiles, files, "file changed after issuing compaction task"); + files_to_remove.extend(ffiles.iter().copied()); + } else { + // retain the tier + levels.push((*tier_id, files.clone())); + } + if tier_to_remove.is_empty() && !new_tier_added { + // add the compacted tier to the LSM tree + new_tier_added = true; + levels.push((output[0], output.to_vec())); + } + } + snapshot.levels = levels; + (snapshot, files_to_remove) } }