From 693e7f2e6abe74ed23d8c27106e3099adef2bd03 Mon Sep 17 00:00:00 2001 From: Alex Chi Date: Thu, 18 Jan 2024 14:50:12 +0800 Subject: [PATCH] finish leveled compaction Signed-off-by: Alex Chi --- Cargo.lock | 37 +++ README.md | 2 +- mini-lsm-starter/src/lsm_storage.rs | 3 - mini-lsm-week-1/src/lsm_storage.rs | 8 +- mini-lsm-week-1/src/table.rs | 21 -- mini-lsm/Cargo.toml | 1 + mini-lsm/src/bin/compaction_simulator.rs | 294 +++++++++++++++++++++-- mini-lsm/src/compact.rs | 6 +- mini-lsm/src/compact/leveled.rs | 213 +++++++++++++++- mini-lsm/src/compact/simple_leveled.rs | 2 - mini-lsm/src/compact/tiered.rs | 2 +- mini-lsm/src/lsm_storage.rs | 8 +- mini-lsm/src/table.rs | 50 ++-- 13 files changed, 543 insertions(+), 104 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 28cdde4..564f69a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -405,6 +405,7 @@ dependencies = [ "moka", "ouroboros", "parking_lot", + "rand", "tempfile", ] @@ -543,6 +544,12 @@ dependencies = [ "windows-sys 0.42.0", ] +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -612,6 +619,36 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + [[package]] name = "raw-cpuid" version = "10.6.0" diff --git a/README.md b/README.md index 8ccfb90..640c4c4 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ We are working on a new version of the mini-lsm tutorial that is split into 3 we | 1.6 | Storage Engine - Write Path | ✅ | ✅ | ✅ | | 2.1 | Compaction - Get Started | ✅ | 🚧 | 🚧 | | 2.2 | Compaction Strategy - Tiered | ✅ | | | -| 2.3 | Compaction Strategy - Leveled | 🚧 | | | +| 2.3 | Compaction Strategy - Leveled | ✅ | | | | 2.4 | Manifest | | | | | 2.5 | Write-Ahead Log | | | | | 2.6 | Bloom Filter and Key Compression | | | | diff --git a/mini-lsm-starter/src/lsm_storage.rs b/mini-lsm-starter/src/lsm_storage.rs index 076b900..7a4de8d 100644 --- a/mini-lsm-starter/src/lsm_storage.rs +++ b/mini-lsm-starter/src/lsm_storage.rs @@ -73,9 +73,6 @@ impl LsmStorage { } /// Persist data to disk. - /// - /// In day 3: flush the current memtable to disk as L0 SST. - /// In day 6: call `fsync` on WAL. pub fn sync(&self) -> Result<()> { unimplemented!() } diff --git a/mini-lsm-week-1/src/lsm_storage.rs b/mini-lsm-week-1/src/lsm_storage.rs index 538234d..a55bba0 100644 --- a/mini-lsm-week-1/src/lsm_storage.rs +++ b/mini-lsm-week-1/src/lsm_storage.rs @@ -131,9 +131,6 @@ impl LsmStorage { } /// Persist data to disk. - /// - /// In day 3: flush the current memtable to disk as L0 SST. - /// In day 6: call `fsync` on WAL. pub fn sync(&self) -> Result<()> { let _flush_lock = self.flush_lock.lock(); @@ -221,9 +218,6 @@ impl LsmStorage { let iter = TwoMergeIterator::create(memtable_iter, table_iter)?; - Ok(FusedIterator::new(LsmIterator::new( - iter, - map_bound(upper), - )?)) + Ok(FusedIterator::new(LsmIterator::new(iter, map_bound(upper))?)) } } diff --git a/mini-lsm-week-1/src/table.rs b/mini-lsm-week-1/src/table.rs index 831c65b..d388540 100644 --- a/mini-lsm-week-1/src/table.rs +++ b/mini-lsm-week-1/src/table.rs @@ -61,27 +61,6 @@ impl BlockMeta { /// A file object. /// /// Before day 4, it should look like: -/// -/// ```ignore -/// pub struct FileObject(Bytes); -/// -/// impl FileObject { -/// pub fn read(&self, offset: u64, len: u64) -> Result> { -/// Ok(self.0[offset as usize..(offset + len) as usize].to_vec()) -/// } -/// pub fn size(&self) -> u64 { -/// self.0.len() as u64 -/// } -/// -/// pub fn create(_path: &Path, data: Vec) -> Result { -/// Ok(FileObject(data.into())) -/// } -/// -/// pub fn open(_path: &Path) -> Result { -/// unimplemented!() -/// } -/// } -/// ``` pub struct FileObject(File, u64); impl FileObject { diff --git a/mini-lsm/Cargo.toml b/mini-lsm/Cargo.toml index a008bde..7c5f3e9 100644 --- a/mini-lsm/Cargo.toml +++ b/mini-lsm/Cargo.toml @@ -18,6 +18,7 @@ parking_lot = "0.12" ouroboros = "0.15" moka = "0.9" clap = { version = "4.4.17", features = ["derive"] } +rand = "0.8.5" [dev-dependencies] tempfile = "3" diff --git a/mini-lsm/src/bin/compaction_simulator.rs b/mini-lsm/src/bin/compaction_simulator.rs index 0eda300..328bf1e 100644 --- a/mini-lsm/src/bin/compaction_simulator.rs +++ b/mini-lsm/src/bin/compaction_simulator.rs @@ -1,13 +1,15 @@ use std::collections::HashMap; use std::sync::Arc; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use clap::Parser; use mini_lsm::compact::{ - SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, TieredCompactionController, - TieredCompactionOptions, + LeveledCompactionController, LeveledCompactionOptions, SimpleLeveledCompactionController, + SimpleLeveledCompactionOptions, TieredCompactionController, TieredCompactionOptions, }; use mini_lsm::lsm_storage::LsmStorageInner; use mini_lsm::mem_table::MemTable; +use mini_lsm::table::SsTable; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] @@ -38,7 +40,22 @@ enum Args { #[clap(long, default_value = "50")] iterations: usize, }, - Leveled {}, + Leveled { + #[clap(long)] + dump_real_id: bool, + #[clap(long, default_value = "2")] + level0_file_num_compaction_trigger: usize, + #[clap(long, default_value = "2")] + level_size_multiplier: usize, + #[clap(long, default_value = "4")] + max_levels: usize, + #[clap(long, default_value = "128")] + base_level_size_mb: usize, + #[clap(long, default_value = "50")] + iterations: usize, + #[clap(long, default_value = "32")] + sst_size_mb: usize, + }, } pub struct MockStorage { @@ -74,12 +91,13 @@ impl MockStorage { id } - pub fn flush_sst_to_l0(&mut self) { + pub fn flush_sst_to_l0(&mut self) -> usize { let id = self.generate_sst_id(); self.snapshot.l0_sstables.push(id); self.file_list.insert(id, id); self.total_flushes += 1; self.total_writes += 1; + id } pub fn flush_sst_to_new_tier(&mut self) { @@ -97,7 +115,30 @@ impl MockStorage { } } - pub fn dump_original_id(&self, always_show_l0: bool) { + fn check_keys(&self) { + for (level, files) in &self.snapshot.levels { + if files.len() >= 2 { + for id in 0..(files.len() - 1) { + let this_file = self.snapshot.sstables[&files[id]].clone(); + let next_file = self.snapshot.sstables[&files[id + 1]].clone(); + if this_file.last_key() >= next_file.first_key() { + panic!( + "invalid file arrangement in L{}: id={}, range={:x}..={:x}; id={}, range={:x}..={:x}", + level, + this_file.sst_id(), + this_file.first_key().clone().get_u64(), + this_file.last_key().clone().get_u64(), + next_file.sst_id(), + next_file.first_key().clone().get_u64(), + next_file.last_key().clone().get_u64() + ); + } + } + } + } + } + + pub fn dump_original_id(&self, always_show_l0: bool, with_key: bool) { if !self.snapshot.l0_sstables.is_empty() || always_show_l0 { println!( "L0 ({}): {:?}", @@ -112,9 +153,12 @@ impl MockStorage { files.iter().map(|x| self.file_list[x]).collect::>() ); } + if with_key { + self.check_keys(); + } } - pub fn dump_real_id(&self, always_show_l0: bool) { + pub fn dump_real_id(&self, always_show_l0: bool, with_key: bool) { if !self.snapshot.l0_sstables.is_empty() || always_show_l0 { println!( "L0 ({}): {:?}", @@ -125,9 +169,47 @@ impl MockStorage { for (level, files) in &self.snapshot.levels { println!("L{level} ({}): {:?}", files.len(), files); } + if with_key { + self.check_keys(); + } } } +fn generate_random_key_range() -> (Bytes, Bytes) { + use rand::Rng; + let mut rng = rand::thread_rng(); + let begin: usize = rng.gen_range(0..(1 << 31)); + let end: usize = begin + rng.gen_range((1 << 10)..(1 << 31)); + let mut begin_bytes = BytesMut::new(); + let mut end_bytes = BytesMut::new(); + begin_bytes.put_u64(begin as u64); + end_bytes.put_u64(end as u64); + (begin_bytes.into(), end_bytes.into()) +} + +fn generate_random_split( + mut begin_bytes: Bytes, + mut end_bytes: Bytes, + split: usize, +) -> Vec<(Bytes, Bytes)> { + let begin = begin_bytes.get_u64(); + let end = end_bytes.get_u64(); + let len = end - begin + 1; + let mut result = Vec::new(); + let split = split as u64; + assert!(len >= split, "well, this is unfortunate... run again!"); + for i in 0..split { + let nb = begin + len * i / split; + let ne = begin + len * (i + 1) / split - 1; + let mut begin_bytes = BytesMut::new(); + let mut end_bytes = BytesMut::new(); + begin_bytes.put_u64(nb as u64); + end_bytes.put_u64(ne as u64); + result.push((begin_bytes.into(), end_bytes.into())); + } + result +} + fn main() { let args = Args::parse(); match args { @@ -154,9 +236,9 @@ fn main() { storage.flush_sst_to_l0(); println!("--- After Flush ---"); if dump_real_id { - storage.dump_real_id(true); + storage.dump_real_id(true, false); } else { - storage.dump_original_id(true); + storage.dump_original_id(true, false); } let mut num_compactions = 0; while let Some(task) = controller.generate_compaction_task(&storage.snapshot) { @@ -189,9 +271,9 @@ fn main() { storage.remove(&del); println!("--- After Compaction ---"); if dump_real_id { - storage.dump_real_id(true); + storage.dump_real_id(true, false); } else { - storage.dump_original_id(true); + storage.dump_original_id(true, false); } num_compactions += 1; if num_compactions >= max_levels * 2 { @@ -212,7 +294,7 @@ fn main() { storage.total_writes as f64 / storage.total_flushes as f64 ); println!( - "Space Amplification: {}/{}={:.3}x", + "Maximum Space Usage: {}/{}={:.3}x", max_space, storage.total_flushes, max_space as f64 / storage.total_flushes as f64 @@ -251,13 +333,13 @@ fn main() { storage.flush_sst_to_new_tier(); println!("--- After Flush ---"); if dump_real_id { - storage.dump_real_id(false); + storage.dump_real_id(false, false); } else { - storage.dump_original_id(false); + storage.dump_original_id(false, false); } - let task = controller.generate_compaction_task(&storage.snapshot); println!("--- Compaction Task ---"); - if let Some(task) = task { + let mut num_compactions = 0; + while let Some(task) = controller.generate_compaction_task(&storage.snapshot) { let mut sst_ids = Vec::new(); for (tier_id, files) in &task.tiers { for file in files { @@ -276,12 +358,19 @@ fn main() { storage.remove(&del); println!("--- After Compaction ---"); if dump_real_id { - storage.dump_real_id(false); + storage.dump_real_id(false, false); } else { - storage.dump_original_id(false); + storage.dump_original_id(false, false); } - } else { + num_compactions += 1; + if num_compactions >= level0_file_num_compaction_trigger * 3 { + panic!("compaction does not converge?"); + } + } + if num_compactions == 0 { println!("no compaction triggered"); + } else { + println!("{num_compactions} compaction triggered in this iteration"); } max_space = max_space.max(storage.file_list.len()); println!("--- Statistics ---"); @@ -292,7 +381,173 @@ fn main() { storage.total_writes as f64 / storage.total_flushes as f64 ); println!( - "Space Amplification: {}/{}={:.3}x", + "Maximum Space Usage: {}/{}={:.3}x", + max_space, + storage.total_flushes, + max_space as f64 / storage.total_flushes as f64 + ); + println!( + "Read Amplification: {}x", + storage.snapshot.l0_sstables.len() + + storage + .snapshot + .levels + .iter() + .filter(|(_, f)| !f.is_empty()) + .count() + ); + println!(); + } + } + Args::Leveled { + dump_real_id, + level0_file_num_compaction_trigger, + level_size_multiplier, + max_levels, + base_level_size_mb, + iterations, + sst_size_mb, + } => { + let controller = LeveledCompactionController::new(LeveledCompactionOptions { + level0_file_num_compaction_trigger, + level_size_multiplier, + max_levels, + base_level_size_mb, + }); + + let mut storage = MockStorage::new(); + for i in 0..max_levels { + storage.snapshot.levels.push((i + 1, Vec::new())); + } + let mut max_space = 0; + for i in 0..iterations { + println!("=== Iteration {i} ==="); + let id = storage.flush_sst_to_l0(); + let (first_key, last_key) = generate_random_key_range(); + storage.snapshot.sstables.insert( + id, + Arc::new(SsTable::create_meta_only( + id, + sst_size_mb as u64 * 1024 * 1024, + first_key, + last_key, + )), + ); + println!("--- After Flush ---"); + if dump_real_id { + storage.dump_real_id(false, true); + } else { + storage.dump_original_id(false, true); + } + let mut num_compactions = 0; + while let Some(task) = controller.generate_compaction_task(&storage.snapshot) { + let mut sst_ids = Vec::new(); + let split_num = task.upper_level_sst_ids.len() + task.lower_level_sst_ids.len(); + let mut first_keys = Vec::new(); + let mut last_keys = Vec::new(); + for file in task + .upper_level_sst_ids + .iter() + .chain(task.lower_level_sst_ids.iter()) + { + first_keys.push(storage.snapshot.sstables[file].first_key().clone()); + last_keys.push(storage.snapshot.sstables[file].last_key().clone()); + } + let begin = first_keys.into_iter().min().unwrap(); + let end = last_keys.into_iter().max().unwrap(); + let splits = generate_random_split(begin, end, split_num); + for (id, file) in task + .upper_level_sst_ids + .iter() + .chain(task.lower_level_sst_ids.iter()) + .enumerate() + { + let new_sst_id = storage.generate_sst_id(); + sst_ids.push(new_sst_id); + storage.file_list.insert(new_sst_id, *file); + storage.total_writes += 1; + storage.snapshot.sstables.insert( + new_sst_id, + Arc::new(SsTable::create_meta_only( + new_sst_id, + sst_size_mb as u64 * 1024 * 1024, + splits[id].0.clone(), + splits[id].1.clone(), + )), + ); + } + print!( + "Upper L{} [{}] ", + task.upper_level.unwrap_or_default(), + task.upper_level_sst_ids + .iter() + .map(|id| format!( + "{}.sst {:x}..={:x}", + id, + storage.snapshot.sstables[id].first_key().clone().get_u64(), + storage.snapshot.sstables[id].last_key().clone().get_u64() + )) + .collect::>() + .join(", ") + ); + print!( + "Lower L{} [{}] ", + task.lower_level, + task.lower_level_sst_ids + .iter() + .map(|id| format!( + "{}.sst {:x}..={:x}", + id, + storage.snapshot.sstables[id].first_key().clone().get_u64(), + storage.snapshot.sstables[id].last_key().clone().get_u64() + )) + .collect::>() + .join(", ") + ); + println!( + "-> [{}]", + sst_ids + .iter() + .map(|id| format!( + "{}.sst {:x}..={:x}", + id, + storage.snapshot.sstables[id].first_key().clone().get_u64(), + storage.snapshot.sstables[id].last_key().clone().get_u64() + )) + .collect::>() + .join(", ") + ); + max_space = max_space.max(storage.file_list.len()); + let (snapshot, del) = + controller.apply_compaction_result(&storage.snapshot, &task, &sst_ids); + storage.snapshot = snapshot; + storage.remove(&del); + println!("--- After Compaction ---"); + if dump_real_id { + storage.dump_real_id(true, true); + } else { + storage.dump_original_id(true, true); + } + num_compactions += 1; + if num_compactions >= storage.file_list.len() * max_levels { + panic!("compaction does not converge?"); + } + } + if num_compactions == 0 { + println!("no compaction triggered"); + } else { + println!("{num_compactions} compaction triggered in this iteration"); + } + max_space = max_space.max(storage.file_list.len()); + println!("--- Statistics ---"); + println!( + "Write Amplification: {}/{}={:.3}x", + storage.total_writes, + storage.total_flushes, + storage.total_writes as f64 / storage.total_flushes as f64 + ); + println!( + "Maximum Space Usage: {}/{}={:.3}x", max_space, storage.total_flushes, max_space as f64 / storage.total_flushes as f64 @@ -310,6 +565,5 @@ fn main() { println!(); } } - Args::Leveled {} => {} } } diff --git a/mini-lsm/src/compact.rs b/mini-lsm/src/compact.rs index e32723f..22e9573 100644 --- a/mini-lsm/src/compact.rs +++ b/mini-lsm/src/compact.rs @@ -5,7 +5,7 @@ mod tiered; use std::sync::Arc; use anyhow::Result; -pub use leveled::{LeveledCompactionController, LeveledCompactionTask}; +pub use leveled::{LeveledCompactionController, LeveledCompactionOptions, LeveledCompactionTask}; pub use simple_leveled::{ SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, SimpleLeveledCompactionTask, }; @@ -37,9 +37,7 @@ impl LsmStorage { let mut iters = Vec::new(); iters.reserve(tables.len()); for table in tables.iter() { - iters.push(Box::new(SsTableIterator::create_and_seek_to_first( - table.clone(), - )?)); + iters.push(Box::new(SsTableIterator::create_and_seek_to_first(table.clone())?)); } let mut iter = MergeIterator::create(iters); diff --git a/mini-lsm/src/compact/leveled.rs b/mini-lsm/src/compact/leveled.rs index 8fff439..ab64678 100644 --- a/mini-lsm/src/compact/leveled.rs +++ b/mini-lsm/src/compact/leveled.rs @@ -1,18 +1,150 @@ +use std::collections::HashSet; + use crate::lsm_storage::LsmStorageInner; pub struct LeveledCompactionTask { // if upper_level is `None`, then it is L0 compaction - upper_level: Option, - upper_level_sst_ids: Vec, - lower_level: usize, - lower_level_sst_ids: Vec, + pub upper_level: Option, + pub upper_level_sst_ids: Vec, + pub lower_level: usize, + pub lower_level_sst_ids: Vec, } -pub struct LeveledCompactionController {} +pub struct LeveledCompactionOptions { + pub level_size_multiplier: usize, + pub level0_file_num_compaction_trigger: usize, + pub max_levels: usize, + pub base_level_size_mb: usize, +} + +pub struct LeveledCompactionController { + options: LeveledCompactionOptions, +} impl LeveledCompactionController { - pub fn generate_compaction_task(&self, snapshot: &LsmStorageInner) -> LeveledCompactionTask { - unimplemented!() + pub fn new(options: LeveledCompactionOptions) -> Self { + Self { options } + } + + fn find_overlapping_ssts( + &self, + snapshot: &LsmStorageInner, + sst_ids: &[usize], + in_level: usize, + ) -> Vec { + let begin_key = sst_ids + .iter() + .map(|id| snapshot.sstables[id].first_key()) + .min() + .cloned() + .unwrap(); + let end_key = sst_ids + .iter() + .map(|id| snapshot.sstables[id].last_key()) + .max() + .cloned() + .unwrap(); + let mut overlap_ssts = Vec::new(); + for sst_id in &snapshot.levels[in_level - 1].1 { + let sst = &snapshot.sstables[sst_id]; + let first_key = sst.first_key(); + let last_key = sst.last_key(); + if !(last_key < &begin_key || first_key > &end_key) { + overlap_ssts.push(*sst_id); + } + } + overlap_ssts + } + + pub fn generate_compaction_task( + &self, + snapshot: &LsmStorageInner, + ) -> Option { + // step 1: compute target level size + let mut target_level_size = (0..self.options.max_levels).map(|_| 0).collect::>(); // exclude level 0 + let mut real_level_size = Vec::with_capacity(self.options.max_levels); + let mut base_level = self.options.max_levels; + for i in 0..self.options.max_levels { + real_level_size.push( + snapshot.levels[i] + .1 + .iter() + .map(|x| snapshot.sstables.get(x).unwrap().table_size()) + .sum::() as usize, + ); + } + let base_level_size_bytes = self.options.base_level_size_mb as usize * 1024 * 1024; + + // select base level and compute target level size + target_level_size[self.options.max_levels - 1] = + real_level_size[self.options.max_levels - 1].max(base_level_size_bytes); + for i in (0..(self.options.max_levels - 1)).rev() { + let next_level_size = target_level_size[i + 1]; + let this_level_size = next_level_size / self.options.level_size_multiplier; + if next_level_size > base_level_size_bytes { + target_level_size[i] = this_level_size; + } + if target_level_size[i] > 0 { + base_level = i + 1; + } + } + + println!( + "target level sizes: {:?}, real level sizes: {:?}, base_level: {}", + target_level_size + .iter() + .map(|x| format!("{}MB", x / 1024 / 1024)) + .collect::>(), + real_level_size + .iter() + .map(|x| format!("{}MB", x / 1024 / 1024)) + .collect::>(), + base_level, + ); + + // Flush L0 SST is the top priority + if snapshot.l0_sstables.len() >= self.options.level0_file_num_compaction_trigger { + println!("flush L0 SST to base level {}", base_level); + return Some(LeveledCompactionTask { + upper_level: None, + upper_level_sst_ids: snapshot.l0_sstables.clone(), + lower_level: base_level, + lower_level_sst_ids: self.find_overlapping_ssts( + snapshot, + &snapshot.l0_sstables, + base_level, + ), + }); + } + + let mut priorities = Vec::with_capacity(self.options.max_levels); + for level in 0..self.options.max_levels { + let prio = real_level_size[level] as f64 / target_level_size[level] as f64; + if prio > 1.0 { + priorities.push((prio, level + 1)); + } + } + priorities.sort_by(|a, b| a.partial_cmp(b).unwrap().reverse()); + let priority = priorities.first(); + if let Some((_, level)) = priority { + let level = *level; + let selected_sst = snapshot.levels[level - 1].1.iter().min().copied().unwrap(); // select the oldest sst to compact + println!( + "compaction triggered by priority: {level} out of {:?}, select {selected_sst} for compaction", + priorities + ); + return Some(LeveledCompactionTask { + upper_level: Some(level), + upper_level_sst_ids: vec![selected_sst], + lower_level: level + 1, + lower_level_sst_ids: self.find_overlapping_ssts( + snapshot, + &[selected_sst], + level + 1, + ), + }); + } + None } pub fn apply_compaction_result( @@ -21,6 +153,71 @@ impl LeveledCompactionController { task: &LeveledCompactionTask, output: &[usize], ) -> (LsmStorageInner, Vec) { - unimplemented!() + let mut snapshot = snapshot.clone(); + let mut files_to_remove = Vec::new(); + let mut upper_level_sst_ids_set = task + .upper_level_sst_ids + .iter() + .copied() + .collect::>(); + let mut lower_level_sst_ids_set = task + .lower_level_sst_ids + .iter() + .copied() + .collect::>(); + if let Some(upper_level) = task.upper_level { + let new_upper_level_ssts = + snapshot.levels[upper_level - 1] + .1 + .iter() + .filter_map(|x| { + if upper_level_sst_ids_set.remove(x) { + return None; + } + Some(*x) + }) + .collect::>(); + assert!(upper_level_sst_ids_set.is_empty()); + snapshot.levels[upper_level - 1].1 = new_upper_level_ssts; + } else { + let new_l0_ssts = snapshot + .l0_sstables + .iter() + .filter_map(|x| { + if upper_level_sst_ids_set.remove(x) { + return None; + } + Some(*x) + }) + .collect::>(); + assert!(upper_level_sst_ids_set.is_empty()); + snapshot.l0_sstables = new_l0_ssts; + } + + files_to_remove.extend(&task.upper_level_sst_ids); + files_to_remove.extend(&task.lower_level_sst_ids); + + let mut new_lower_level_ssts = snapshot.levels[task.lower_level - 1] + .1 + .iter() + .filter_map(|x| { + if lower_level_sst_ids_set.remove(x) { + return None; + } + Some(*x) + }) + .collect::>(); + assert!(lower_level_sst_ids_set.is_empty()); + new_lower_level_ssts.extend(output); + new_lower_level_ssts.sort_by(|x, y| { + snapshot + .sstables + .get(x) + .unwrap() + .first_key() + .cmp(snapshot.sstables.get(y).unwrap().first_key()) + }); + snapshot.levels[task.lower_level - 1].1 = new_lower_level_ssts; + (snapshot, files_to_remove) } } diff --git a/mini-lsm/src/compact/simple_leveled.rs b/mini-lsm/src/compact/simple_leveled.rs index a0caa31..66f16dd 100644 --- a/mini-lsm/src/compact/simple_leveled.rs +++ b/mini-lsm/src/compact/simple_leveled.rs @@ -1,5 +1,3 @@ -use std::collections::HashMap; - use crate::lsm_storage::LsmStorageInner; pub struct SimpleLeveledCompactionOptions { diff --git a/mini-lsm/src/compact/tiered.rs b/mini-lsm/src/compact/tiered.rs index 6f1b5b3..a6750c2 100644 --- a/mini-lsm/src/compact/tiered.rs +++ b/mini-lsm/src/compact/tiered.rs @@ -73,7 +73,7 @@ impl TieredCompactionController { } // trying to reduce sorted runs without respecting size ratio let num_tiers_to_take = - snapshot.levels.len() - self.options.level0_file_num_compaction_trigger + 1; + snapshot.levels.len() - self.options.level0_file_num_compaction_trigger + 2; println!("compaction triggered by reducing sorted runs"); return Some(TieredCompactionTask { tiers: snapshot diff --git a/mini-lsm/src/lsm_storage.rs b/mini-lsm/src/lsm_storage.rs index ebd2ca9..2bcf69c 100644 --- a/mini-lsm/src/lsm_storage.rs +++ b/mini-lsm/src/lsm_storage.rs @@ -136,9 +136,6 @@ impl LsmStorage { } /// Persist data to disk. - /// - /// In day 3: flush the current memtable to disk as L0 SST. - /// In day 6: call `fsync` on WAL. pub fn sync(&self) -> Result<()> { let _flush_lock = self.flush_lock.lock(); @@ -226,9 +223,6 @@ impl LsmStorage { let iter = TwoMergeIterator::create(memtable_iter, table_iter)?; - Ok(FusedIterator::new(LsmIterator::new( - iter, - map_bound(upper), - )?)) + Ok(FusedIterator::new(LsmIterator::new(iter, map_bound(upper))?)) } } diff --git a/mini-lsm/src/table.rs b/mini-lsm/src/table.rs index bd50002..8a56bf9 100644 --- a/mini-lsm/src/table.rs +++ b/mini-lsm/src/table.rs @@ -10,7 +10,7 @@ pub use builder::SsTableBuilder; use bytes::{Buf, BufMut, Bytes}; pub use iterator::SsTableIterator; -use crate::block::{self, Block}; +use crate::block::Block; use crate::lsm_storage::BlockCache; #[derive(Clone, Debug, PartialEq, Eq)] @@ -75,27 +75,6 @@ impl BlockMeta { /// A file object. /// /// Before day 4, it should look like: -/// -/// ```ignore -/// pub struct FileObject(Bytes); -/// -/// impl FileObject { -/// pub fn read(&self, offset: u64, len: u64) -> Result> { -/// Ok(self.0[offset as usize..(offset + len) as usize].to_vec()) -/// } -/// pub fn size(&self) -> u64 { -/// self.0.len() as u64 -/// } -/// -/// pub fn create(_path: &Path, data: Vec) -> Result { -/// Ok(FileObject(data.into())) -/// } -/// -/// pub fn open(_path: &Path) -> Result { -/// unimplemented!() -/// } -/// } -/// ``` pub struct FileObject(Option, u64); impl FileObject { @@ -162,13 +141,8 @@ impl SsTable { } /// Create a mock SST with only first key + last key metadata - pub fn create_meta_only( - id: usize, - file_size: u64, - first_key: Bytes, - last_key: Bytes, - ) -> Result { - Ok(Self { + pub fn create_meta_only(id: usize, file_size: u64, first_key: Bytes, last_key: Bytes) -> Self { + Self { file: FileObject(None, file_size), block_metas: vec![], block_meta_offset: 0, @@ -176,7 +150,7 @@ impl SsTable { block_cache: None, first_key, last_key, - }) + } } /// Read a block from the disk. @@ -215,6 +189,22 @@ impl SsTable { pub fn num_of_blocks(&self) -> usize { self.block_metas.len() } + + pub fn first_key(&self) -> &Bytes { + &self.first_key + } + + pub fn last_key(&self) -> &Bytes { + &self.last_key + } + + pub fn table_size(&self) -> u64 { + self.file.1 + } + + pub fn sst_id(&self) -> usize { + self.id + } } #[cfg(test)]