From 9473c893304fabb046eb0837a858087e1f39e27b Mon Sep 17 00:00:00 2001 From: Alex Chi Date: Wed, 24 Jan 2024 14:32:13 +0800 Subject: [PATCH] checkin part 2 solution Signed-off-by: Alex Chi --- Cargo.lock | 60 +-- README.md | 4 +- .../src/bin/compaction-simulator.rs | 1 + mini-lsm-starter/src/bin/mini-lsm-cli.rs | 15 +- mini-lsm-starter/src/lsm_storage.rs | 14 +- mini-lsm-starter/src/mem_table.rs | 5 + mini-lsm/Cargo.toml | 3 +- mini-lsm/src/block.rs | 7 +- mini-lsm/src/block/builder.rs | 29 +- mini-lsm/src/block/iterator.rs | 33 +- mini-lsm/src/compact.rs | 230 +++++++++--- mini-lsm/src/compact/simple_leveled.rs | 10 + mini-lsm/src/iterators.rs | 9 +- mini-lsm/src/iterators/concat_iterator.rs | 122 ++++++ mini-lsm/src/iterators/merge_iterator.rs | 20 +- mini-lsm/src/iterators/two_merge_iterator.rs | 4 + mini-lsm/src/lsm_iterator.rs | 63 +++- mini-lsm/src/lsm_storage.rs | 353 ++++++++++++++---- mini-lsm/src/mem_table.rs | 44 ++- mini-lsm/src/table.rs | 28 +- mini-lsm/src/table/bloom.rs | 113 ++++++ mini-lsm/src/table/builder.rs | 17 +- mini-lsm/src/tests.rs | 10 +- mini-lsm/src/tests/harness.rs | 3 +- mini-lsm/src/tests/week1_day6.rs | 1 - 25 files changed, 945 insertions(+), 253 deletions(-) create mode 100644 mini-lsm/src/iterators/concat_iterator.rs create mode 100644 mini-lsm/src/table/bloom.rs diff --git a/Cargo.lock b/Cargo.lock index 9615396..7d51a01 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,12 +2,6 @@ # It is not intended for manual editing. version = 3 -[[package]] -name = "Inflector" -version = "0.11.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3" - [[package]] name = "aliasable" version = "0.1.3" @@ -420,8 +414,9 @@ dependencies = [ "crossbeam-channel", "crossbeam-epoch", "crossbeam-skiplist", + "farmhash", "moka", - "ouroboros 0.15.5", + "ouroboros", "parking_lot", "rand", "serde", @@ -442,7 +437,7 @@ dependencies = [ "crossbeam-skiplist", "farmhash", "moka", - "ouroboros 0.18.2", + "ouroboros", "parking_lot", "rand", "serde", @@ -509,16 +504,6 @@ dependencies = [ "windows-sys 0.42.0", ] -[[package]] -name = "ouroboros" -version = "0.15.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfbb50b356159620db6ac971c6d5c9ab788c9cc38a6f49619fca2a27acb062ca" -dependencies = [ - "aliasable", - "ouroboros_macro 0.15.5", -] - [[package]] name = "ouroboros" version = "0.18.2" @@ -526,23 +511,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a50b637ffd883b2733a8483599fb6136b9dcedaa1850f7ac08b9b6f9f2061208" dependencies = [ "aliasable", - "ouroboros_macro 0.18.2", + "ouroboros_macro", "static_assertions", ] -[[package]] -name = "ouroboros_macro" -version = "0.15.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a0d9d1a6191c4f391f87219d1ea42b23f09ee84d64763cd05ee6ea88d9f384d" -dependencies = [ - "Inflector", - "proc-macro-error", - "proc-macro2", - "quote", - "syn 1.0.107", -] - [[package]] name = "ouroboros_macro" version = "0.18.2" @@ -586,30 +558,6 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" -[[package]] -name = "proc-macro-error" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" -dependencies = [ - "proc-macro-error-attr", - "proc-macro2", - "quote", - "syn 
1.0.107", - "version_check", -] - -[[package]] -name = "proc-macro-error-attr" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" -dependencies = [ - "proc-macro2", - "quote", - "version_check", -] - [[package]] name = "proc-macro2" version = "1.0.76" diff --git a/README.md b/README.md index 200cf16..1ee1293 100644 --- a/README.md +++ b/README.md @@ -59,8 +59,8 @@ We are working on a new version of the mini-lsm tutorial that is split into 3 we | 2.2 | Compaction Strategy - Simple | ✅ | ✅ | ✅ | | 2.3 | Compaction Strategy - Tiered | ✅ | ✅ | ✅ | | 2.4 | Compaction Strategy - Leveled | ✅ | ✅ | ✅ | -| 2.5 | Manifest | ✅ | 🚧 | 🚧 | -| 2.6 | Write-Ahead Log | ✅ | 🚧 | 🚧 | +| 2.5 | Manifest | ✅ | ✅ | 🚧 | +| 2.6 | Write-Ahead Log | ✅ | ✅ | 🚧 | | 2.7 | Batch Write + Checksum | | | | | 3.1 | Timestamp Key Encoding + New Block Format | | | | | 3.2 | Prefix Bloom Filter | | | | diff --git a/mini-lsm-starter/src/bin/compaction-simulator.rs b/mini-lsm-starter/src/bin/compaction-simulator.rs index ced74a7..f91058f 100644 --- a/mini-lsm-starter/src/bin/compaction-simulator.rs +++ b/mini-lsm-starter/src/bin/compaction-simulator.rs @@ -343,6 +343,7 @@ fn main() { } else { storage.dump_original_id(false, false); } + println!("--- Compaction Task ---"); let mut num_compactions = 0; while let Some(task) = { println!("--- Compaction Task ---"); diff --git a/mini-lsm-starter/src/bin/mini-lsm-cli.rs b/mini-lsm-starter/src/bin/mini-lsm-cli.rs index 5a07863..ba4e0c5 100644 --- a/mini-lsm-starter/src/bin/mini-lsm-cli.rs +++ b/mini-lsm-starter/src/bin/mini-lsm-cli.rs @@ -107,6 +107,19 @@ fn main() -> Result<()> { } else { println!("{} not exist", key); } + } else if line == "scan" { + let mut iter = lsm.scan(std::ops::Bound::Unbounded, std::ops::Bound::Unbounded)?; + let mut cnt = 0; + while iter.is_valid() { + println!( + "{:?}={:?}", + Bytes::copy_from_slice(iter.key()), + Bytes::copy_from_slice(iter.value()), + ); + iter.next()?; + cnt += 1; + } + println!("{} keys scanned", cnt); } else if line.starts_with("scan ") { let Some((_, rest)) = line.split_once(' ') else { println!("invalid command"); @@ -137,7 +150,7 @@ fn main() -> Result<()> { lsm.force_flush()?; } else if line == "full_compaction" { lsm.force_full_compaction()?; - } else if line == "quit" { + } else if line == "quit" || line == "close" { lsm.close()?; break; } else { diff --git a/mini-lsm-starter/src/lsm_storage.rs b/mini-lsm-starter/src/lsm_storage.rs index 9e50d37..21f7025 100644 --- a/mini-lsm-starter/src/lsm_storage.rs +++ b/mini-lsm-starter/src/lsm_storage.rs @@ -166,10 +166,16 @@ impl MiniLsm { self.inner.scan(lower, upper) } + /// Only call this in test cases due to race conditions pub fn force_flush(&self) -> Result<()> { - self.inner - .force_freeze_memtable(&self.inner.state_lock.lock())?; - self.inner.force_flush_next_imm_memtable() + if !self.inner.state.read().memtable.is_empty() { + self.inner + .force_freeze_memtable(&self.inner.state_lock.lock())?; + } + if !self.inner.state.read().imm_memtables.is_empty() { + self.inner.force_flush_next_imm_memtable()?; + } + Ok(()) } pub fn force_full_compaction(&self) -> Result<()> { @@ -247,7 +253,7 @@ impl LsmStorageInner { Self::path_of_wal_static(&self.path, id) } - fn sync_dir(&self) -> Result<()> { + pub(super) fn sync_dir(&self) -> Result<()> { unimplemented!() } diff --git a/mini-lsm-starter/src/mem_table.rs b/mini-lsm-starter/src/mem_table.rs index 12b8619..2070707 100644 --- 
a/mini-lsm-starter/src/mem_table.rs +++ b/mini-lsm-starter/src/mem_table.rs @@ -88,6 +88,11 @@ impl MemTable { self.approximate_size .load(std::sync::atomic::Ordering::Relaxed) } + + /// Only use this function when closing the database + pub fn is_empty(&self) -> bool { + self.map.is_empty() + } } type SkipMapRangeIter<'a> = diff --git a/mini-lsm/Cargo.toml b/mini-lsm/Cargo.toml index 7bbedde..441baa3 100644 --- a/mini-lsm/Cargo.toml +++ b/mini-lsm/Cargo.toml @@ -16,13 +16,14 @@ bytes = "1" crossbeam-epoch = "0.9" crossbeam-skiplist = "0.1" parking_lot = "0.12" -ouroboros = "0.15" +ouroboros = "0.18" moka = "0.9" clap = { version = "4.4.17", features = ["derive"] } rand = "0.8.5" crossbeam-channel = "0.5.11" serde_json = { version = "1.0" } serde = { version = "1.0", features = ["derive"] } +farmhash = "1" [dev-dependencies] tempfile = "3" diff --git a/mini-lsm/src/block.rs b/mini-lsm/src/block.rs index fe4e035..932c4c6 100644 --- a/mini-lsm/src/block.rs +++ b/mini-lsm/src/block.rs @@ -10,8 +10,8 @@ pub(crate) const SIZEOF_U16: usize = std::mem::size_of::(); /// A block is the smallest unit of read and caching in LSM tree. It is a collection of sorted /// key-value pairs. pub struct Block { - data: Vec, - offsets: Vec, + pub(crate) data: Vec, + pub(crate) offsets: Vec, } impl Block { @@ -41,6 +41,3 @@ impl Block { Self { data, offsets } } } - -#[cfg(test)] -mod tests; diff --git a/mini-lsm/src/block/builder.rs b/mini-lsm/src/block/builder.rs index f69ec43..5b13c25 100644 --- a/mini-lsm/src/block/builder.rs +++ b/mini-lsm/src/block/builder.rs @@ -10,6 +10,22 @@ pub struct BlockBuilder { data: Vec, /// The expected block size. block_size: usize, + /// The first key in the block + first_key: Vec, +} + +fn compute_overlap(first_key: &[u8], key: &[u8]) -> usize { + let mut i = 0; + loop { + if i >= first_key.len() || i >= key.len() { + break; + } + if first_key[i] != key[i] { + break; + } + i += 1; + } + i } impl BlockBuilder { @@ -19,6 +35,7 @@ impl BlockBuilder { offsets: Vec::new(), data: Vec::new(), block_size, + first_key: Vec::new(), } } @@ -38,14 +55,22 @@ impl BlockBuilder { } // Add the offset of the data into the offset array. self.offsets.push(self.data.len() as u16); + let overlap = compute_overlap(&self.first_key, key); + // Encode key overlap. + self.data.put_u16(overlap as u16); // Encode key length. - self.data.put_u16(key.len() as u16); + self.data.put_u16((key.len() - overlap) as u16); // Encode key content. - self.data.put(key); + self.data.put(&key[overlap..]); // Encode value length. self.data.put_u16(value.len() as u16); // Encode value content. self.data.put(value); + + if self.first_key.is_empty() { + self.first_key = key.to_vec(); + } + true } diff --git a/mini-lsm/src/block/iterator.rs b/mini-lsm/src/block/iterator.rs index 79d1393..72de838 100644 --- a/mini-lsm/src/block/iterator.rs +++ b/mini-lsm/src/block/iterator.rs @@ -2,6 +2,8 @@ use std::sync::Arc; use bytes::Buf; +use crate::block::SIZEOF_U16; + use super::Block; /// Iterates on a block. 
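 ///
 /// Each entry in a block is laid out as (matching the prefix-compression encoding in
 /// `block/builder.rs` above):
 /// `| key_overlap_len (u16) | rest_key_len (u16) | rest_key | value_len (u16) | value |`,
 /// where `key_overlap_len` is the length of the prefix shared with the first key of the block.
 /// For example (an illustrative pair of keys, not from the tutorial), if the first key is
 /// `apple` and a later key is `apricot`, the overlap is 2 (`ap`) and only `ricot` is stored.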
@@ -10,18 +12,31 @@ pub struct BlockIterator { block: Arc, /// the current key at the iterator position key: Vec, - /// the current value at the iterator position - value: Vec, + /// the value range from the block + value_range: (usize, usize), /// the current index at the iterator position idx: usize, + /// the first key in the block + first_key: Vec, +} + +impl Block { + fn get_first_key(&self) -> Vec { + let mut buf = &self.data[..]; + buf.get_u16(); + let key_len = buf.get_u16(); + let key = &buf[..key_len as usize]; + key.to_vec() + } } impl BlockIterator { fn new(block: Arc) -> Self { Self { + first_key: block.get_first_key(), block, key: Vec::new(), - value: Vec::new(), + value_range: (0, 0), idx: 0, } } @@ -49,7 +64,7 @@ impl BlockIterator { /// Returns the value of the current entry. pub fn value(&self) -> &[u8] { debug_assert!(!self.key.is_empty(), "invalid iterator"); - &self.value + &self.block.data[self.value_range.0..self.value_range.1] } /// Returns true if the iterator is valid. @@ -66,7 +81,7 @@ impl BlockIterator { fn seek_to(&mut self, idx: usize) { if idx >= self.block.offsets.len() { self.key.clear(); - self.value.clear(); + self.value_range = (0, 0); return; } let offset = self.block.offsets[idx] as usize; @@ -86,16 +101,18 @@ impl BlockIterator { let mut entry = &self.block.data[offset..]; // Since `get_u16()` will automatically move the ptr 2 bytes ahead here, // we don't need to manually advance it + let overlap_len = entry.get_u16() as usize; let key_len = entry.get_u16() as usize; let key = entry[..key_len].to_vec(); entry.advance(key_len); self.key.clear(); + self.key.extend(&self.first_key[..overlap_len]); self.key.extend(key); let value_len = entry.get_u16() as usize; - let value = entry[..value_len].to_vec(); + let value_offset_begin = offset + SIZEOF_U16 + SIZEOF_U16 + key_len + SIZEOF_U16; + let value_offset_end = value_offset_begin + value_len; + self.value_range = (value_offset_begin, value_offset_end); entry.advance(value_len); - self.value.clear(); - self.value.extend(value); } /// Seek to the first key that is >= `key`. diff --git a/mini-lsm/src/compact.rs b/mini-lsm/src/compact.rs index 3907db8..fca211c 100644 --- a/mini-lsm/src/compact.rs +++ b/mini-lsm/src/compact.rs @@ -1,7 +1,10 @@ +#![allow(dead_code)] // REMOVE THIS LINE after fully implementing this functionality + mod leveled; mod simple_leveled; mod tiered; +use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; @@ -13,7 +16,9 @@ pub use simple_leveled::{ }; pub use tiered::{TieredCompactionController, TieredCompactionOptions, TieredCompactionTask}; +use crate::iterators::concat_iterator::SstConcatIterator; use crate::iterators::merge_iterator::MergeIterator; +use crate::iterators::two_merge_iterator::TwoMergeIterator; use crate::iterators::StorageIterator; use crate::lsm_storage::{LsmStorageInner, LsmStorageState}; use crate::manifest::ManifestRecord; @@ -24,13 +29,16 @@ pub enum CompactionTask { Leveled(LeveledCompactionTask), Tiered(TieredCompactionTask), Simple(SimpleLeveledCompactionTask), - ForceFullCompaction(Vec), + ForceFullCompaction { + l0_sstables: Vec, + l1_sstables: Vec, + }, } impl CompactionTask { fn compact_to_bottom_level(&self) -> bool { match self { - CompactionTask::ForceFullCompaction(_) => true, + CompactionTask::ForceFullCompaction { .. 
} => true, CompactionTask::Leveled(task) => task.is_lower_level_bottom_level, CompactionTask::Simple(task) => task.is_lower_level_bottom_level, CompactionTask::Tiered(task) => task.bottom_tier_included, @@ -105,50 +113,13 @@ pub enum CompactionOptions { } impl LsmStorageInner { - fn compact(&self, task: &CompactionTask) -> Result>> { - let table_ids = match task { - CompactionTask::Leveled(task) => task - .lower_level_sst_ids - .iter() - .copied() - .chain(task.upper_level_sst_ids.iter().copied()) - .collect::>(), - CompactionTask::Simple(task) => task - .lower_level_sst_ids - .iter() - .copied() - .chain(task.upper_level_sst_ids.iter().copied()) - .collect::>(), - CompactionTask::Tiered(task) => task - .tiers - .iter() - .map(|(_, files)| files) - .flatten() - .copied() - .collect::>(), - CompactionTask::ForceFullCompaction(l0_ssts) => l0_ssts.clone(), - }; - let tables: Vec> = { - let state = self.state.read(); - table_ids - .iter() - .map(|id| state.sstables.get(id).unwrap().clone()) - .collect::>() - }; - - let mut iters = Vec::new(); - iters.reserve(tables.len()); - for table in tables.iter() { - iters.push(Box::new(SsTableIterator::create_and_seek_to_first( - table.clone(), - )?)); - } - let mut iter = MergeIterator::create(iters); - + fn compact_generate_sst_from_iter( + &self, + mut iter: impl StorageIterator, + compact_to_bottom_level: bool, + ) -> Result>> { let mut builder = None; - let mut new_sst = vec![]; - - let compact_to_bottom_level = task.compact_to_bottom_level(); + let mut new_sst = Vec::new(); while iter.is_valid() { if builder.is_none() { @@ -165,7 +136,7 @@ impl LsmStorageInner { iter.next()?; if builder_inner.estimated_size() >= self.options.target_sst_size { - let sst_id = self.next_sst_id(); // lock dropped here + let sst_id = self.next_sst_id(); let builder = builder.take().unwrap(); let sst = Arc::new(builder.build( sst_id, @@ -187,6 +158,98 @@ impl LsmStorageInner { Ok(new_sst) } + fn compact(&self, task: &CompactionTask) -> Result>> { + let snapshot = { + let state = self.state.read(); + state.clone() + }; + match task { + CompactionTask::ForceFullCompaction { + l0_sstables, + l1_sstables, + } => { + let mut l0_iters = Vec::with_capacity(l0_sstables.len()); + for id in l0_sstables.iter() { + l0_iters.push(Box::new(SsTableIterator::create_and_seek_to_first( + snapshot.sstables.get(id).unwrap().clone(), + )?)); + } + let mut l1_iters = Vec::with_capacity(l1_sstables.len()); + for id in l1_sstables.iter() { + l1_iters.push(snapshot.sstables.get(id).unwrap().clone()); + } + let iter = TwoMergeIterator::create( + MergeIterator::create(l0_iters), + SstConcatIterator::create_and_seek_to_first(l1_iters)?, + )?; + self.compact_generate_sst_from_iter(iter, task.compact_to_bottom_level()) + } + CompactionTask::Simple(SimpleLeveledCompactionTask { + upper_level, + upper_level_sst_ids, + lower_level: _, + lower_level_sst_ids, + .. + }) + | CompactionTask::Leveled(LeveledCompactionTask { + upper_level, + upper_level_sst_ids, + lower_level: _, + lower_level_sst_ids, + .. 
+            }) => match upper_level {
+                Some(_) => {
+                    let mut upper_ssts = Vec::with_capacity(upper_level_sst_ids.len());
+                    for id in upper_level_sst_ids.iter() {
+                        upper_ssts.push(snapshot.sstables.get(id).unwrap().clone());
+                    }
+                    let upper_iter = SstConcatIterator::create_and_seek_to_first(upper_ssts)?;
+                    let mut lower_ssts = Vec::with_capacity(lower_level_sst_ids.len());
+                    for id in lower_level_sst_ids.iter() {
+                        lower_ssts.push(snapshot.sstables.get(id).unwrap().clone());
+                    }
+                    let lower_iter = SstConcatIterator::create_and_seek_to_first(lower_ssts)?;
+                    self.compact_generate_sst_from_iter(
+                        TwoMergeIterator::create(upper_iter, lower_iter)?,
+                        task.compact_to_bottom_level(),
+                    )
+                }
+                None => {
+                    let mut upper_iters = Vec::with_capacity(upper_level_sst_ids.len());
+                    for id in upper_level_sst_ids.iter() {
+                        upper_iters.push(Box::new(SsTableIterator::create_and_seek_to_first(
+                            snapshot.sstables.get(id).unwrap().clone(),
+                        )?));
+                    }
+                    let upper_iter = MergeIterator::create(upper_iters);
+                    let mut lower_ssts = Vec::with_capacity(lower_level_sst_ids.len());
+                    for id in lower_level_sst_ids.iter() {
+                        lower_ssts.push(snapshot.sstables.get(id).unwrap().clone());
+                    }
+                    let lower_iter = SstConcatIterator::create_and_seek_to_first(lower_ssts)?;
+                    self.compact_generate_sst_from_iter(
+                        TwoMergeIterator::create(upper_iter, lower_iter)?,
+                        task.compact_to_bottom_level(),
+                    )
+                }
+            },
+            CompactionTask::Tiered(TieredCompactionTask { tiers, .. }) => {
+                let mut iters = Vec::with_capacity(tiers.len());
+                for (_, tier_sst_ids) in tiers {
+                    let mut ssts = Vec::with_capacity(tier_sst_ids.len());
+                    for id in tier_sst_ids.iter() {
+                        ssts.push(snapshot.sstables.get(id).unwrap().clone());
+                    }
+                    iters.push(Box::new(SstConcatIterator::create_and_seek_to_first(ssts)?));
+                }
+                self.compact_generate_sst_from_iter(
+                    MergeIterator::create(iters),
+                    task.compact_to_bottom_level(),
+                )
+            }
+        }
+    }
+
     pub fn force_full_compaction(&self) -> Result<()> {
         let CompactionOptions::NoCompaction = self.options.compaction_options else {
             panic!("full compaction can only be called when compaction is not enabled")
         };
@@ -195,15 +258,19 @@ impl LsmStorageInner {
             let state = self.state.read();
             state.clone()
         };
-        let mut original_sstables = snapshot.l0_sstables.clone();
-        original_sstables.reverse(); // is this correct?
- let sstables = self.compact(&CompactionTask::ForceFullCompaction( - original_sstables.clone(), - ))?; + + let l0_sstables = snapshot.l0_sstables.clone(); + let l1_sstables = snapshot.levels[0].1.clone(); + let compaction_task = CompactionTask::ForceFullCompaction { + l0_sstables: l0_sstables.clone(), + l1_sstables: l1_sstables.clone(), + }; + let sstables = self.compact(&compaction_task)?; + { let _state_lock = self.state_lock.lock(); let mut state = self.state.read().as_ref().clone(); - for sst in original_sstables.iter() { + for sst in l0_sstables.iter().chain(l1_sstables.iter()) { let result = state.sstables.remove(sst); assert!(result.is_some()); } @@ -213,11 +280,20 @@ impl LsmStorageInner { let result = state.sstables.insert(new_sst.sst_id(), new_sst); assert!(result.is_none()); } - state.l0_sstables = ids; + assert_eq!(l1_sstables, state.levels[0].1); + state.levels[0].1 = ids; + let mut l0_sstables_map = l0_sstables.iter().copied().collect::>(); + state.l0_sstables = state + .l0_sstables + .iter() + .filter(|x| !l0_sstables_map.remove(x)) + .copied() + .collect::>(); + assert!(l0_sstables_map.is_empty()); *self.state.write() = Arc::new(state); } - for sst in original_sstables { - std::fs::remove_file(self.path_of_sst(sst))?; + for sst in l0_sstables.iter().chain(l1_sstables.iter()) { + std::fs::remove_file(self.path_of_sst(*sst))?; } Ok(()) } @@ -235,6 +311,7 @@ impl LsmStorageInner { }; println!("running compaction task: {:?}", task); let sstables = self.compact(&task)?; + let files_added = sstables.len(); let output = sstables.iter().map(|x| x.sst_id()).collect::>(); let ssts_to_remove = { let state_lock = self.state_lock.lock(); @@ -244,7 +321,7 @@ impl LsmStorageInner { let mut ssts_to_remove = Vec::with_capacity(files_to_remove.len()); for file_to_remove in &files_to_remove { let result = snapshot.sstables.remove(file_to_remove); - assert!(result.is_some()); + assert!(result.is_some(), "cannot remove {}.sst", file_to_remove); ssts_to_remove.push(result.unwrap()); } let mut new_sst_ids = Vec::new(); @@ -255,13 +332,24 @@ impl LsmStorageInner { } let mut state = self.state.write(); *state = Arc::new(snapshot); + drop(state); + self.sync_dir()?; self.manifest + .as_ref() + .unwrap() .add_record(&state_lock, ManifestRecord::Compaction(task, new_sst_ids))?; ssts_to_remove }; + println!( + "compaction finished: {} files removed, {} files added", + ssts_to_remove.len(), + files_added + ); for sst in ssts_to_remove { std::fs::remove_file(self.path_of_sst(sst.sst_id()))?; } + self.sync_dir()?; + Ok(()) } @@ -289,4 +377,34 @@ impl LsmStorageInner { } Ok(None) } + + fn trigger_flush(&self) -> Result<()> { + if { + let state = self.state.read(); + state.imm_memtables.len() >= self.options.num_memtable_limit + } { + self.force_flush_next_imm_memtable()?; + } + + Ok(()) + } + + pub(crate) fn spawn_flush_thread( + self: &Arc, + rx: crossbeam_channel::Receiver<()>, + ) -> Result>> { + let this = self.clone(); + let handle = std::thread::spawn(move || { + let ticker = crossbeam_channel::tick(Duration::from_millis(50)); + loop { + crossbeam_channel::select! 
{
+                    recv(ticker) -> _ => if let Err(e) = this.trigger_flush() {
+                        eprintln!("flush failed: {}", e);
+                    },
+                    recv(rx) -> _ => return
+                }
+            }
+        });
+        Ok(Some(handle))
+    }
 }
diff --git a/mini-lsm/src/compact/simple_leveled.rs b/mini-lsm/src/compact/simple_leveled.rs
index affc70e..5f16a4e 100644
--- a/mini-lsm/src/compact/simple_leveled.rs
+++ b/mini-lsm/src/compact/simple_leveled.rs
@@ -28,6 +28,9 @@ impl SimpleLeveledCompactionController {
         Self { options }
     }
 
+    /// Generates a compaction task.
+    ///
+    /// Returns `None` if no compaction needs to be scheduled. The order of the SST ids in the compaction task vector matters.
     pub fn generate_compaction_task(
         &self,
         snapshot: &LsmStorageState,
@@ -68,6 +71,13 @@ impl SimpleLeveledCompactionController {
         None
     }
 
+    /// Apply the compaction result.
+    ///
+    /// The compactor will call this function with the compaction task and the list of SST ids generated. This function applies the
+    /// result and generates a new LSM state. The function should only change `l0_sstables` and `levels` without changing memtables
+    /// and the `sstables` hash map. Though there should only be one thread running compaction jobs, you should think about the case
+    /// where an L0 SST gets flushed while the compactor generates new SSTs, and with that in mind, you should do some sanity checks
+    /// in your implementation.
     pub fn apply_compaction_result(
         &self,
         snapshot: &LsmStorageState,
diff --git a/mini-lsm/src/iterators.rs b/mini-lsm/src/iterators.rs
index bffb820..56a36a4 100644
--- a/mini-lsm/src/iterators.rs
+++ b/mini-lsm/src/iterators.rs
@@ -1,3 +1,4 @@
+pub mod concat_iterator;
 pub mod merge_iterator;
 pub mod two_merge_iterator;
 
@@ -13,7 +14,9 @@ pub trait StorageIterator {
     /// Move to the next position.
     fn next(&mut self) -> anyhow::Result<()>;
-}
 
-#[cfg(test)]
-mod tests;
+    /// Number of underlying active iterators for this iterator.
+    fn num_active_iterators(&self) -> usize {
+        1
+    }
+}
diff --git a/mini-lsm/src/iterators/concat_iterator.rs b/mini-lsm/src/iterators/concat_iterator.rs
new file mode 100644
index 0000000..298d10b
--- /dev/null
+++ b/mini-lsm/src/iterators/concat_iterator.rs
@@ -0,0 +1,122 @@
+use std::sync::Arc;
+
+use anyhow::Result;
+
+use crate::table::{SsTable, SsTableIterator};
+
+use super::StorageIterator;
+
+/// Concatenates multiple iterators ordered by key; the key ranges of the iterators must not
+/// overlap. We do not create the underlying SST iterators eagerly when initializing this
+/// iterator, so as to reduce the overhead of seeking.
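+///
+/// A minimal usage sketch (illustrative only; `sst1..sst3` are assumed to be `Arc<SsTable>`s
+/// with disjoint, increasing key ranges):
+///
+/// ```ignore
+/// let mut iter = SstConcatIterator::create_and_seek_to_key(vec![sst1, sst2, sst3], b"e")?;
+/// // Only the SST whose range may contain "e" is opened here; the following SSTs are
+/// // opened lazily as `next` exhausts the current one.
+/// while iter.is_valid() {
+///     iter.next()?;
+/// }
+/// ```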
+pub struct SstConcatIterator {
+    current: Option<SsTableIterator>,
+    next_sst_idx: usize,
+    sstables: Vec<Arc<SsTable>>,
+}
+
+impl SstConcatIterator {
+    fn check_sst_valid(sstables: &[Arc<SsTable>]) {
+        for sst in sstables {
+            assert!(sst.first_key() <= sst.last_key());
+        }
+        if !sstables.is_empty() {
+            for i in 0..(sstables.len() - 1) {
+                assert!(sstables[i].last_key() < sstables[i + 1].first_key());
+            }
+        }
+    }
+
+    pub fn create_and_seek_to_first(sstables: Vec<Arc<SsTable>>) -> Result<Self> {
+        Self::check_sst_valid(&sstables);
+        if sstables.is_empty() {
+            return Ok(Self {
+                current: None,
+                next_sst_idx: 0,
+                sstables,
+            });
+        }
+        let mut iter = Self {
+            current: Some(SsTableIterator::create_and_seek_to_first(
+                sstables[0].clone(),
+            )?),
+            next_sst_idx: 1,
+            sstables,
+        };
+        iter.move_until_valid()?;
+        Ok(iter)
+    }
+
+    pub fn create_and_seek_to_key(sstables: Vec<Arc<SsTable>>, key: &[u8]) -> Result<Self> {
+        Self::check_sst_valid(&sstables);
+        let idx: usize = sstables
+            .partition_point(|table| table.first_key() <= key)
+            .saturating_sub(1);
+        if idx >= sstables.len() {
+            return Ok(Self {
+                current: None,
+                next_sst_idx: sstables.len(),
+                sstables,
+            });
+        }
+        let mut iter = Self {
+            current: Some(SsTableIterator::create_and_seek_to_key(
+                sstables[idx].clone(),
+                key,
+            )?),
+            next_sst_idx: idx + 1,
+            sstables,
+        };
+        iter.move_until_valid()?;
+        Ok(iter)
+    }
+
+    fn move_until_valid(&mut self) -> Result<()> {
+        loop {
+            if let Some(iter) = self.current.as_mut() {
+                if iter.is_valid() {
+                    break;
+                }
+                if self.next_sst_idx >= self.sstables.len() {
+                    self.current = None;
+                } else {
+                    self.current = Some(SsTableIterator::create_and_seek_to_first(
+                        self.sstables[self.next_sst_idx].clone(),
+                    )?);
+                    self.next_sst_idx += 1;
+                }
+            } else {
+                break;
+            }
+        }
+        Ok(())
+    }
+}
+
+impl StorageIterator for SstConcatIterator {
+    fn key(&self) -> &[u8] {
+        self.current.as_ref().unwrap().key()
+    }
+
+    fn value(&self) -> &[u8] {
+        self.current.as_ref().unwrap().value()
+    }
+
+    fn is_valid(&self) -> bool {
+        if let Some(current) = &self.current {
+            assert!(current.is_valid());
+            true
+        } else {
+            false
+        }
+    }
+
+    fn next(&mut self) -> Result<()> {
+        self.current.as_mut().unwrap().next()?;
+        self.move_until_valid()?;
+        Ok(())
+    }
+
+    fn num_active_iterators(&self) -> usize {
+        1
+    }
+}
diff --git a/mini-lsm/src/iterators/merge_iterator.rs b/mini-lsm/src/iterators/merge_iterator.rs
index e7ee35b..0e5b10b 100644
--- a/mini-lsm/src/iterators/merge_iterator.rs
+++ b/mini-lsm/src/iterators/merge_iterator.rs
@@ -77,13 +77,11 @@ impl<I: StorageIterator> MergeIterator<I> {
 
 impl<I: StorageIterator> StorageIterator for MergeIterator<I> {
     fn key(&self) -> &[u8] {
-        unsafe { self.current.as_ref().unwrap_unchecked() }.1.key()
+        self.current.as_ref().unwrap().1.key()
     }
 
     fn value(&self) -> &[u8] {
-        unsafe { self.current.as_ref().unwrap_unchecked() }
-            .1
-            .value()
+        self.current.as_ref().unwrap().1.value()
     }
 
     fn is_valid(&self) -> bool {
@@ -94,7 +92,7 @@ impl<I: StorageIterator> StorageIterator for MergeIterator<I> {
     }
 
     fn next(&mut self) -> Result<()> {
-        let current = unsafe { self.current.as_mut().unwrap_unchecked() };
+        let current = self.current.as_mut().unwrap();
         // Pop iterators out of the heap if they have the same key as the current iterator.
        while let Some(mut inner_iter) = self.iters.peek_mut() {
            debug_assert!(
@@ -136,4 +134,16 @@ impl<I: StorageIterator> StorageIterator for MergeIterator<I> {
 
         Ok(())
     }
+
+    fn num_active_iterators(&self) -> usize {
+        self.iters
+            .iter()
+            .map(|x| x.1.num_active_iterators())
+            .sum::<usize>()
+            + self
+                .current
+                .as_ref()
+                .map(|x| x.1.num_active_iterators())
+                .unwrap_or(0)
+    }
 }
diff --git a/mini-lsm/src/iterators/two_merge_iterator.rs b/mini-lsm/src/iterators/two_merge_iterator.rs
index 7115a4a..b7b33df 100644
--- a/mini-lsm/src/iterators/two_merge_iterator.rs
+++ b/mini-lsm/src/iterators/two_merge_iterator.rs
@@ -77,4 +77,8 @@ impl<A: StorageIterator, B: StorageIterator> StorageIterator for TwoMergeIterator<A, B> {
         self.choose_a = Self::choose_a(&self.a, &self.b);
         Ok(())
     }
+
+    fn num_active_iterators(&self) -> usize {
+        self.a.num_active_iterators() + self.b.num_active_iterators()
+    }
 }
diff --git a/mini-lsm/src/lsm_iterator.rs b/mini-lsm/src/lsm_iterator.rs
index df49609..1a1817b 100644
--- a/mini-lsm/src/lsm_iterator.rs
+++ b/mini-lsm/src/lsm_iterator.rs
@@ -1,19 +1,23 @@
 use std::ops::Bound;
 
-use anyhow::Result;
+use anyhow::{bail, Result};
 use bytes::Bytes;
 
+use crate::iterators::concat_iterator::SstConcatIterator;
 use crate::iterators::merge_iterator::MergeIterator;
 use crate::iterators::two_merge_iterator::TwoMergeIterator;
 use crate::iterators::StorageIterator;
 use crate::mem_table::MemTableIterator;
 use crate::table::SsTableIterator;
 
-type LsmIteratorInner =
-    TwoMergeIterator<MergeIterator<MemTableIterator>, MergeIterator<SsTableIterator>>;
+/// Represents the internal type for an LSM iterator. This type will change multiple times throughout the tutorial.
+type LsmIteratorInner = TwoMergeIterator<
+    TwoMergeIterator<MergeIterator<MemTableIterator>, MergeIterator<SsTableIterator>>,
+    MergeIterator<SstConcatIterator>,
+>;
 
 pub struct LsmIterator {
-    iter: LsmIteratorInner,
+    inner: LsmIteratorInner,
     end_bound: Bound<Bytes>,
     is_valid: bool,
 }
@@ -22,7 +26,7 @@ impl LsmIterator {
     pub(crate) fn new(iter: LsmIteratorInner, end_bound: Bound<Bytes>) -> Result<Self> {
         let mut iter = Self {
             is_valid: iter.is_valid(),
-            iter,
+            inner: iter,
             end_bound,
         };
         iter.move_to_non_delete()?;
@@ -30,21 +34,21 @@ impl LsmIterator {
     }
 
     fn next_inner(&mut self) -> Result<()> {
-        self.iter.next()?;
-        if !self.iter.is_valid() {
+        self.inner.next()?;
+        if !self.inner.is_valid() {
             self.is_valid = false;
             return Ok(());
         }
         match self.end_bound.as_ref() {
             Bound::Unbounded => {}
-            Bound::Included(key) => self.is_valid = self.iter.key() <= key.as_ref(),
-            Bound::Excluded(key) => self.is_valid = self.iter.key() < key.as_ref(),
+            Bound::Included(key) => self.is_valid = self.inner.key() <= key.as_ref(),
+            Bound::Excluded(key) => self.is_valid = self.inner.key() < key.as_ref(),
         }
         Ok(())
     }
 
     fn move_to_non_delete(&mut self) -> Result<()> {
-        while self.is_valid() && self.iter.value().is_empty() {
+        while self.is_valid() && self.inner.value().is_empty() {
             self.next_inner()?;
         }
         Ok(())
@@ -57,11 +61,11 @@ impl StorageIterator for LsmIterator {
     }
 
     fn key(&self) -> &[u8] {
-        self.iter.key()
+        self.inner.key()
     }
 
     fn value(&self) -> &[u8] {
-        self.iter.value()
+        self.inner.value()
     }
 
     fn next(&mut self) -> Result<()> {
@@ -69,38 +73,63 @@ impl StorageIterator for LsmIterator {
         self.move_to_non_delete()?;
         Ok(())
     }
+
+    fn num_active_iterators(&self) -> usize {
+        self.inner.num_active_iterators()
+    }
 }
 
 /// A wrapper around an existing iterator that prevents users from calling `next` when the iterator is
-/// invalid.
+/// invalid. If an iterator is already invalid, `next` does not do anything. If `next` returns an error,
+/// `is_valid` should return false, and `next` should always return an error.
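+///
+/// A sketch of the intended contract (illustrative; `inner` is any `StorageIterator`):
+///
+/// ```ignore
+/// let mut iter = FusedIterator::new(inner);
+/// if iter.next().is_err() {
+///     assert!(!iter.is_valid()); // the iterator is fused after an error...
+///     assert!(iter.next().is_err()); // ...and every further `next` keeps failing
+/// }
+/// ```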
 pub struct FusedIterator<I: StorageIterator> {
     iter: I,
+    has_errored: bool,
 }
 
 impl<I: StorageIterator> FusedIterator<I> {
     pub fn new(iter: I) -> Self {
-        Self { iter }
+        Self {
+            iter,
+            has_errored: false,
+        }
     }
 }
 
 impl<I: StorageIterator> StorageIterator for FusedIterator<I> {
     fn is_valid(&self) -> bool {
-        self.iter.is_valid()
+        !self.has_errored && self.iter.is_valid()
     }
 
     fn key(&self) -> &[u8] {
+        if self.has_errored || !self.iter.is_valid() {
+            panic!("invalid access to the underlying iterator");
+        }
         self.iter.key()
     }
 
     fn value(&self) -> &[u8] {
+        if self.has_errored || !self.iter.is_valid() {
+            panic!("invalid access to the underlying iterator");
+        }
         self.iter.value()
     }
 
     fn next(&mut self) -> Result<()> {
-        // only move when the iterator is valid
+        // only move when the iterator is valid and not errored
+        if self.has_errored {
+            bail!("the iterator is tainted");
+        }
         if self.iter.is_valid() {
-            self.iter.next()?;
+            if let Err(e) = self.iter.next() {
+                self.has_errored = true;
+                return Err(e);
+            }
         }
         Ok(())
     }
+
+    fn num_active_iterators(&self) -> usize {
+        self.iter.num_active_iterators()
+    }
 }
diff --git a/mini-lsm/src/lsm_storage.rs b/mini-lsm/src/lsm_storage.rs
index a453bba..4be7319 100644
--- a/mini-lsm/src/lsm_storage.rs
+++ b/mini-lsm/src/lsm_storage.rs
@@ -7,13 +7,14 @@ use std::sync::Arc;
 
 use anyhow::{Context, Result};
 use bytes::Bytes;
-use parking_lot::{Mutex, RwLock};
+use parking_lot::{Mutex, MutexGuard, RwLock};
 
 use crate::block::Block;
 use crate::compact::{
     CompactionController, CompactionOptions, LeveledCompactionController, LeveledCompactionOptions,
     SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, TieredCompactionController,
 };
+use crate::iterators::concat_iterator::SstConcatIterator;
 use crate::iterators::merge_iterator::MergeIterator;
 use crate::iterators::two_merge_iterator::TwoMergeIterator;
 use crate::iterators::StorageIterator;
@@ -24,13 +25,14 @@ use crate::table::{FileObject, SsTable, SsTableBuilder, SsTableIterator};
 
 pub type BlockCache = moka::sync::Cache<(usize, usize), Arc<Block>>;
 
+/// Represents the state of the storage engine.
 #[derive(Clone)]
 pub struct LsmStorageState {
     /// The current memtable.
     pub memtable: Arc<MemTable>,
-    /// Immutable memtables, from earliest to latest.
+    /// Immutable memtables, from latest to earliest.
    pub imm_memtables: Vec<Arc<MemTable>>,
-    /// L0 SSTs, from earliest to latest.
+    /// L0 SSTs, from latest to earliest.
    pub l0_sstables: Vec<usize>,
     /// SsTables sorted by key range; L1 - L_max for leveled compaction, or tiers for tiered
     /// compaction.
@@ -47,7 +49,8 @@ impl LsmStorageState { ..=*max_levels) .map(|level| (level, Vec::new())) .collect::>(), - CompactionOptions::Tiered(_) | CompactionOptions::NoCompaction => Vec::new(), + CompactionOptions::Tiered(_) => Vec::new(), + CompactionOptions::NoCompaction => vec![(1, Vec::new())], }; Self { memtable: Arc::new(MemTable::create(0)), @@ -60,8 +63,11 @@ impl LsmStorageState { } pub struct LsmStorageOptions { + // Block size in bytes pub block_size: usize, + // SST size in bytes, also the approximate memtable capacity limit pub target_sst_size: usize, + // Maximum number of memtables in memory, flush to L0 when exceeding this limit pub num_memtable_limit: usize, pub compaction_options: CompactionOptions, pub enable_wal: bool, @@ -74,9 +80,50 @@ impl LsmStorageOptions { target_sst_size: 2 << 20, compaction_options: CompactionOptions::NoCompaction, enable_wal: false, - num_memtable_limit: 3, + num_memtable_limit: 50, } } + + pub fn default_for_week1_day6_test() -> Self { + Self { + block_size: 4096, + target_sst_size: 2 << 20, + compaction_options: CompactionOptions::NoCompaction, + enable_wal: false, + num_memtable_limit: 2, + } + } +} + +fn range_overlap( + user_begin: Bound<&[u8]>, + user_end: Bound<&[u8]>, + table_begin: &[u8], + table_end: &[u8], +) -> bool { + match user_end { + Bound::Excluded(key) if key <= table_begin => { + return false; + } + Bound::Included(key) if key < table_begin => { + return false; + } + _ => {} + } + match user_begin { + Bound::Excluded(key) if key >= table_end => { + return false; + } + Bound::Included(key) if key > table_end => { + return false; + } + _ => {} + } + true +} + +fn key_within(user_key: &[u8], table_begin: &[u8], table_end: &[u8]) -> bool { + table_begin <= user_key && user_key <= table_end } /// The storage interface of the LSM tree. @@ -88,18 +135,26 @@ pub(crate) struct LsmStorageInner { next_sst_id: AtomicUsize, pub(crate) options: Arc, pub(crate) compaction_controller: CompactionController, - pub(crate) manifest: Manifest, + pub(crate) manifest: Option, } +/// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM. pub struct MiniLsm { pub(crate) inner: Arc, + /// Notifies the L0 flush thread to stop working. (In week 1 day 6) + flush_notifier: crossbeam_channel::Sender<()>, + /// The handle for the compaction thread. (In week 1 day 6) + flush_thread: Mutex>>, + /// Notifies the compaction thread to stop working. (In week 2) compaction_notifier: crossbeam_channel::Sender<()>, + /// The handle for the compaction thread. 
(In week 2) compaction_thread: Mutex>>, } impl Drop for MiniLsm { fn drop(&mut self) { self.compaction_notifier.send(()).ok(); + self.flush_notifier.send(()).ok(); } } @@ -107,22 +162,59 @@ impl MiniLsm { pub fn close(&self) -> Result<()> { self.inner.sync_dir()?; self.compaction_notifier.send(()).ok(); + self.flush_notifier.send(()).ok(); + + if self.inner.options.enable_wal { + self.inner.sync()?; + self.inner.sync_dir()?; + return Ok(()); + } + let mut compaction_thread = self.compaction_thread.lock(); if let Some(compaction_thread) = compaction_thread.take() { compaction_thread .join() .map_err(|e| anyhow::anyhow!("{:?}", e))?; } + let mut flush_thread = self.flush_thread.lock(); + if let Some(flush_thread) = flush_thread.take() { + flush_thread + .join() + .map_err(|e| anyhow::anyhow!("{:?}", e))?; + } + + // create memtable and skip updating manifest + if !self.inner.state.read().memtable.is_empty() { + self.inner + .freeze_memtable_with_memtable(Arc::new(MemTable::create( + self.inner.next_sst_id(), + )))?; + } + + while { + let snapshot = self.inner.state.read(); + !snapshot.imm_memtables.is_empty() + } { + self.inner.force_flush_next_imm_memtable()?; + } + self.inner.sync_dir()?; + Ok(()) } + /// Start the storage engine by either loading an existing directory or creating a new one if the directory does + /// not exist. pub fn open(path: impl AsRef, options: LsmStorageOptions) -> Result> { let inner = Arc::new(LsmStorageInner::open(path, options)?); - let (tx, rx) = crossbeam_channel::unbounded(); + let (tx1, rx) = crossbeam_channel::unbounded(); let compaction_thread = inner.spawn_compaction_thread(rx)?; + let (tx2, rx) = crossbeam_channel::unbounded(); + let flush_thread = inner.spawn_flush_thread(rx)?; Ok(Arc::new(Self { inner, - compaction_notifier: tx, + flush_notifier: tx2, + flush_thread: Mutex::new(flush_thread), + compaction_notifier: tx1, compaction_thread: Mutex::new(compaction_thread), })) } @@ -139,6 +231,10 @@ impl MiniLsm { self.inner.delete(key) } + pub fn sync(&self) -> Result<()> { + self.inner.sync() + } + pub fn scan( &self, lower: Bound<&[u8]>, @@ -147,9 +243,16 @@ impl MiniLsm { self.inner.scan(lower, upper) } + /// Only call this in test cases due to race conditions pub fn force_flush(&self) -> Result<()> { - self.inner.force_freeze_memtable()?; - self.inner.force_flush_next_imm_memtable() + if !self.inner.state.read().memtable.is_empty() { + self.inner + .force_freeze_memtable(&self.inner.state_lock.lock())?; + } + if !self.inner.state.read().imm_memtables.is_empty() { + self.inner.force_flush_next_imm_memtable()?; + } + Ok(()) } pub fn force_full_compaction(&self) -> Result<()> { @@ -163,6 +266,8 @@ impl LsmStorageInner { .fetch_add(1, std::sync::atomic::Ordering::SeqCst) } + /// Start the storage engine by either loading an existing directory or creating a new one if the directory does + /// not exist. 
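+    ///
+    /// A minimal usage sketch (the path and the option preset are illustrative):
+    ///
+    /// ```ignore
+    /// let storage = LsmStorageInner::open(
+    ///     "lsm.db",
+    ///     LsmStorageOptions::default_for_week1_day6_test(),
+    /// )?;
+    /// ```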
pub(crate) fn open(path: impl AsRef, options: LsmStorageOptions) -> Result { let mut state = LsmStorageState::create(&options); let path = path.as_ref(); @@ -204,10 +309,15 @@ impl LsmStorageInner { ManifestRecord::Flush(sst_id) => { let res = memtables.remove(&sst_id); assert!(res, "memtable not exist?"); - state.l0_sstables.insert(0, sst_id); + if compaction_controller.flush_to_l0() { + state.l0_sstables.insert(0, sst_id); + } else { + state.levels.insert(0, (sst_id, vec![sst_id])); + } + next_sst_id = next_sst_id.max(sst_id); } ManifestRecord::NewMemtable(x) => { - next_sst_id = x + 1; + next_sst_id = next_sst_id.max(x); memtables.insert(x); } ManifestRecord::Compaction(task, output) => { @@ -215,9 +325,13 @@ impl LsmStorageInner { compaction_controller.apply_compaction_result(&state, &task, &output); // TODO: apply remove again state = new_state; + next_sst_id = + next_sst_id.max(output.iter().max().copied().unwrap_or_default()); } } } + + let mut sst_cnt = 0; // recover SSTs for table_id in state .l0_sstables @@ -232,15 +346,24 @@ impl LsmStorageInner { .context("failed to open SST")?, )?; state.sstables.insert(table_id, Arc::new(sst)); + sst_cnt += 1; } + println!("{} SSTs opened", sst_cnt); + + next_sst_id += 1; + // recover memtables if options.enable_wal { + let mut wal_cnt = 0; for id in memtables.iter() { let memtable = MemTable::recover_from_wal(*id, Self::path_of_wal_static(path, *id))?; - state.imm_memtables.insert(0, Arc::new(memtable)); - next_sst_id = *id + 1; + if !memtable.is_empty() { + state.imm_memtables.insert(0, Arc::new(memtable)); + wal_cnt += 1; + } } + println!("{} WALs recovered", wal_cnt); state.memtable = Arc::new(MemTable::create_with_wal( next_sst_id, Self::path_of_wal_static(path, next_sst_id), @@ -260,7 +383,7 @@ impl LsmStorageInner { block_cache, next_sst_id: AtomicUsize::new(next_sst_id), compaction_controller, - manifest, + manifest: Some(manifest), options: options.into(), }; storage.sync_dir()?; @@ -268,6 +391,10 @@ impl LsmStorageInner { Ok(storage) } + pub fn sync(&self) -> Result<()> { + self.state.read().memtable.sync_wal() + } + /// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter. 
pub fn get(&self, key: &[u8]) -> Result> { let snapshot = { @@ -294,19 +421,47 @@ impl LsmStorageInner { return Ok(Some(value)); } } - let mut iters = Vec::with_capacity(snapshot.l0_sstables.len()); - for table in snapshot - .l0_sstables - .iter() - .chain(snapshot.levels.iter().map(|(_, files)| files).flatten()) - { - iters.push(Box::new(SsTableIterator::create_and_seek_to_key( - snapshot.sstables[table].clone(), - key, - )?)); + + let mut l0_iters = Vec::with_capacity(snapshot.l0_sstables.len()); + + let keep_table = |key: &[u8], table: &SsTable| { + if key_within(key, table.first_key(), table.last_key()) { + if let Some(bloom) = &table.bloom { + if bloom.may_contain(farmhash::fingerprint32(key)) { + return true; + } + } else { + return true; + } + } + false + }; + + for table in snapshot.l0_sstables.iter() { + let table = snapshot.sstables[table].clone(); + if keep_table(key, &table) { + l0_iters.push(Box::new(SsTableIterator::create_and_seek_to_key( + table, key, + )?)); + } } - let iter = MergeIterator::create(iters); - if iter.is_valid() && iter.key() == key { + let l0_iter = MergeIterator::create(l0_iters); + let mut level_iters = Vec::with_capacity(snapshot.levels.len()); + for (_, level_sst_ids) in &snapshot.levels { + let mut level_ssts = Vec::with_capacity(snapshot.levels[0].1.len()); + for table in level_sst_ids { + let table = snapshot.sstables[table].clone(); + if keep_table(key, &table) { + level_ssts.push(table); + } + } + let level_iter = SstConcatIterator::create_and_seek_to_key(level_ssts, key)?; + level_iters.push(Box::new(level_iter)); + } + + let iter = TwoMergeIterator::create(l0_iter, MergeIterator::create(level_iters))?; + + if iter.is_valid() && iter.key() == key && !iter.value().is_empty() { return Ok(Some(Bytes::copy_from_slice(iter.value()))); } Ok(None) @@ -317,8 +472,14 @@ impl LsmStorageInner { assert!(!value.is_empty(), "value cannot be empty"); assert!(!key.is_empty(), "key cannot be empty"); - let guard = self.state.read(); - guard.memtable.put(key, value)?; + let size; + { + let guard = self.state.read(); + guard.memtable.put(key, value)?; + size = guard.memtable.approximate_size(); + } + + self.try_freeze(size)?; Ok(()) } @@ -327,9 +488,28 @@ impl LsmStorageInner { pub fn delete(&self, key: &[u8]) -> Result<()> { assert!(!key.is_empty(), "key cannot be empty"); - let guard = self.state.read(); - guard.memtable.put(key, b"")?; + let size; + { + let guard = self.state.read(); + guard.memtable.put(key, b"")?; + size = guard.memtable.approximate_size(); + } + self.try_freeze(size)?; + + Ok(()) + } + + fn try_freeze(&self, estimated_size: usize) -> Result<()> { + if estimated_size >= self.options.target_sst_size { + let state_lock = self.state_lock.lock(); + let guard = self.state.read(); + // the memtable could have already been frozen, check again to ensure we really need to freeze + if guard.memtable.approximate_size() >= self.options.target_sst_size { + drop(guard); + self.force_freeze_memtable(&state_lock)?; + } + } Ok(()) } @@ -349,39 +529,46 @@ impl LsmStorageInner { Self::path_of_wal_static(&self.path, id) } - fn sync_dir(&self) -> Result<()> { + pub(super) fn sync_dir(&self) -> Result<()> { File::open(&self.path)?.sync_all()?; Ok(()) } - /// Force freeze the current memetable to an immutable memtable - pub fn force_freeze_memtable(&self) -> Result<()> { - let state_lock = self.state_lock.lock(); + fn freeze_memtable_with_memtable(&self, memtable: Arc) -> Result<()> { + let mut guard = self.state.write(); + // Swap the current memtable with a new 
one. + let mut snapshot = guard.as_ref().clone(); + let old_memtable = std::mem::replace(&mut snapshot.memtable, memtable); + // Add the memtable to the immutable memtables. + snapshot.imm_memtables.insert(0, old_memtable.clone()); + // Update the snapshot. + *guard = Arc::new(snapshot); - let memtable_id = self.next_sst_id(); - let memtable = Arc::new(if self.options.enable_wal { - let mt = MemTable::create_with_wal(memtable_id, self.path_of_wal(memtable_id))?; - self.sync_dir()?; - mt - } else { - MemTable::create(memtable_id) - }); - - let old_memtable; - { - let mut guard = self.state.write(); - // Swap the current memtable with a new one. - let mut snapshot = guard.as_ref().clone(); - old_memtable = std::mem::replace(&mut snapshot.memtable, memtable); - // Add the memtable to the immutable memtables. - snapshot.imm_memtables.insert(0, old_memtable.clone()); - // Update the snapshot. - *guard = Arc::new(snapshot); - } + drop(guard); old_memtable.sync_wal()?; - self.manifest - .add_record(&state_lock, ManifestRecord::NewMemtable(memtable_id))?; + Ok(()) + } + + /// Force freeze the current memtable to an immutable memtable + pub fn force_freeze_memtable(&self, state_lock_observer: &MutexGuard<'_, ()>) -> Result<()> { + let memtable_id = self.next_sst_id(); + let memtable = if self.options.enable_wal { + Arc::new(MemTable::create_with_wal( + memtable_id, + self.path_of_wal(memtable_id), + )?) + } else { + Arc::new(MemTable::create(memtable_id)) + }; + + self.freeze_memtable_with_memtable(memtable)?; + + self.manifest.as_ref().unwrap().add_record( + state_lock_observer, + ManifestRecord::NewMemtable(memtable_id), + )?; + self.sync_dir()?; Ok(()) } @@ -436,6 +623,8 @@ impl LsmStorageInner { } self.manifest + .as_ref() + .unwrap() .add_record(&state_lock, ManifestRecord::Flush(sst_id))?; self.sync_dir()?; @@ -462,30 +651,52 @@ impl LsmStorageInner { let memtable_iter = MergeIterator::create(memtable_iters); let mut table_iters = Vec::with_capacity(snapshot.l0_sstables.len()); - for table_id in snapshot - .l0_sstables - .iter() - .chain(snapshot.levels.iter().map(|(_, files)| files).flatten()) - { + for table_id in snapshot.l0_sstables.iter() { let table = snapshot.sstables[table_id].clone(); - let iter = match lower { - Bound::Included(key) => SsTableIterator::create_and_seek_to_key(table, key)?, + if range_overlap(lower, upper, table.first_key(), table.last_key()) { + let iter = match lower { + Bound::Included(key) => SsTableIterator::create_and_seek_to_key(table, key)?, + Bound::Excluded(key) => { + let mut iter = SsTableIterator::create_and_seek_to_key(table, key)?; + if iter.is_valid() && iter.key() == key { + iter.next()?; + } + iter + } + Bound::Unbounded => SsTableIterator::create_and_seek_to_first(table)?, + }; + + table_iters.push(Box::new(iter)); + } + } + + let l0_iter = MergeIterator::create(table_iters); + let mut level_iters = Vec::with_capacity(snapshot.levels.len()); + for (_, level_sst_ids) in &snapshot.levels { + let mut level_ssts = Vec::with_capacity(level_sst_ids.len()); + for table in level_sst_ids { + let table = snapshot.sstables[table].clone(); + if range_overlap(lower, upper, table.first_key(), table.last_key()) { + level_ssts.push(table); + } + } + + let level_iter = match lower { + Bound::Included(key) => SstConcatIterator::create_and_seek_to_key(level_ssts, key)?, Bound::Excluded(key) => { - let mut iter = SsTableIterator::create_and_seek_to_key(table, key)?; + let mut iter = SstConcatIterator::create_and_seek_to_key(level_ssts, key)?; if iter.is_valid() && 
iter.key() == key { iter.next()?; } iter } - Bound::Unbounded => SsTableIterator::create_and_seek_to_first(table)?, + Bound::Unbounded => SstConcatIterator::create_and_seek_to_first(level_ssts)?, }; - - table_iters.push(Box::new(iter)); + level_iters.push(Box::new(level_iter)); } - let table_iter = MergeIterator::create(table_iters); - - let iter = TwoMergeIterator::create(memtable_iter, table_iter)?; + let iter = TwoMergeIterator::create(memtable_iter, l0_iter)?; + let iter = TwoMergeIterator::create(iter, MergeIterator::create(level_iters))?; Ok(FusedIterator::new(LsmIterator::new( iter, diff --git a/mini-lsm/src/mem_table.rs b/mini-lsm/src/mem_table.rs index 08a0490..361ccb5 100644 --- a/mini-lsm/src/mem_table.rs +++ b/mini-lsm/src/mem_table.rs @@ -1,5 +1,8 @@ +#![allow(dead_code)] // REMOVE THIS LINE after fully implementing this functionality + use std::ops::Bound; use std::path::Path; +use std::sync::atomic::AtomicUsize; use std::sync::Arc; use anyhow::Result; @@ -12,13 +15,18 @@ use crate::iterators::StorageIterator; use crate::table::SsTableBuilder; use crate::wal::Wal; -/// A basic mem-table based on crossbeam-skiplist +/// A basic mem-table based on crossbeam-skiplist. +/// +/// An initial implementation of memtable is part of week 1, day 1. It will be incrementally implemented in other +/// chapters of week 1 and week 2. pub struct MemTable { map: Arc>, wal: Option, id: usize, + approximate_size: Arc, } +/// Create a bound of `Bytes` from a bound of `&[u8]`. pub(crate) fn map_bound(bound: Bound<&[u8]>) -> Bound { match bound { Bound::Included(x) => Bound::Included(Bytes::copy_from_slice(x)), @@ -34,6 +42,7 @@ impl MemTable { id, map: Arc::new(SkipMap::new()), wal: None, + approximate_size: Arc::new(AtomicUsize::new(0)), } } @@ -43,6 +52,7 @@ impl MemTable { id, map: Arc::new(SkipMap::new()), wal: Some(Wal::create(path.as_ref())?), + approximate_size: Arc::new(AtomicUsize::new(0)), }) } @@ -53,6 +63,7 @@ impl MemTable { id, wal: Some(Wal::recover(path.as_ref(), &map)?), map, + approximate_size: Arc::new(AtomicUsize::new(0)), }) } @@ -62,9 +73,15 @@ impl MemTable { } /// Put a key-value pair into the mem-table. + /// + /// In week 1, day 1, simply put the key-value pair into the skipmap. + /// In week 2, day 6, also flush the data to WAL. pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { + let estimated_size = key.len() + value.len(); self.map .insert(Bytes::copy_from_slice(key), Bytes::copy_from_slice(value)); + self.approximate_size + .fetch_add(estimated_size, std::sync::atomic::Ordering::Relaxed); if let Some(ref wal) = self.wal { wal.put(key, value)?; } @@ -84,7 +101,7 @@ impl MemTable { let mut iter = MemTableIteratorBuilder { map: self.map.clone(), iter_builder: |map| map.range((lower, upper)), - item: (Bytes::from_static(&[]), Bytes::from_static(&[])), + item: (Bytes::new(), Bytes::new()), } .build(); let entry = iter.with_iter_mut(|iter| MemTableIterator::entry_to_item(iter.next())); @@ -92,7 +109,7 @@ impl MemTable { iter } - /// Flush the mem-table to SSTable. + /// Flush the mem-table to SSTable. Implement in week 1 day 6. 
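+    ///
+    /// A sketch of the intended call site (the names are illustrative):
+    ///
+    /// ```ignore
+    /// let mut builder = SsTableBuilder::new(block_size);
+    /// memtable.flush(&mut builder)?;
+    /// let sst = builder.build(sst_id, Some(block_cache), path)?;
+    /// ```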
pub fn flush(&self, builder: &mut SsTableBuilder) -> Result<()> { for entry in self.map.iter() { builder.add(&entry.key()[..], &entry.value()[..]); @@ -103,18 +120,34 @@ impl MemTable { pub fn id(&self) -> usize { self.id } + + pub fn approximate_size(&self) -> usize { + self.approximate_size + .load(std::sync::atomic::Ordering::Relaxed) + } + + /// Only use this function when closing the database + pub fn is_empty(&self) -> bool { + self.map.is_empty() + } } type SkipMapRangeIter<'a> = crossbeam_skiplist::map::Range<'a, Bytes, (Bound, Bound), Bytes, Bytes>; -/// An iterator over a range of `SkipMap`. +/// An iterator over a range of `SkipMap`. This is a self-referential structure and please refer to week 1, day 2 +/// chapter for more information. +/// +/// This is part of week 1, day 2. #[self_referencing] pub struct MemTableIterator { + /// Stores a reference to the skipmap. map: Arc>, + /// Stores a skipmap iterator that refers to the lifetime of `MemTableIterator` itself. #[borrows(map)] #[not_covariant] iter: SkipMapRangeIter<'this>, + /// Stores the current key-value pair. item: (Bytes, Bytes), } @@ -145,6 +178,3 @@ impl StorageIterator for MemTableIterator { Ok(()) } } - -#[cfg(test)] -mod tests; diff --git a/mini-lsm/src/table.rs b/mini-lsm/src/table.rs index fc749cd..fecb101 100644 --- a/mini-lsm/src/table.rs +++ b/mini-lsm/src/table.rs @@ -1,3 +1,4 @@ +pub(crate) mod bloom; mod builder; mod iterator; @@ -13,6 +14,8 @@ pub use iterator::SsTableIterator; use crate::block::Block; use crate::lsm_storage::BlockCache; +use self::bloom::Bloom; + #[derive(Clone, Debug, PartialEq, Eq)] pub struct BlockMeta { /// Offset of this data block. @@ -107,16 +110,20 @@ impl FileObject { } } +/// An SSTable. pub struct SsTable { - file: FileObject, - block_meta: Vec, - block_meta_offset: usize, + /// The actual storage unit of SsTable, the format is as above. + pub(crate) file: FileObject, + /// The meta blocks that hold info for data blocks. + pub(crate) block_meta: Vec, + /// The offset that indicates the start point of meta blocks in `file`. + pub(crate) block_meta_offset: usize, id: usize, block_cache: Option>, first_key: Bytes, last_key: Bytes, + pub(crate) bloom: Option, } - impl SsTable { #[cfg(test)] pub(crate) fn open_for_test(file: FileObject) -> Result { @@ -126,9 +133,13 @@ impl SsTable { /// Open SSTable from a file. 
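+    ///
+    /// The on-disk layout decoded below is
+    /// `| data blocks | block meta | meta offset (u32) | bloom filter | bloom offset (u32) |`:
+    /// the last 4 bytes of the file locate the bloom filter, and the 4 bytes immediately
+    /// before the bloom filter locate the block metadata.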
pub fn open(id: usize, block_cache: Option>, file: FileObject) -> Result { let len = file.size(); - let raw_meta_offset = file.read(len - 4, 4)?; + let raw_bloom_offset = file.read(len - 4, 4)?; + let bloom_offset = (&raw_bloom_offset[..]).get_u32() as u64; + let raw_bloom = file.read(bloom_offset, len - 4 - bloom_offset)?; + let bloom_filter = Bloom::decode(&raw_bloom); + let raw_meta_offset = file.read(bloom_offset - 4, 4)?; let block_meta_offset = (&raw_meta_offset[..]).get_u32() as u64; - let raw_meta = file.read(block_meta_offset, len - 4 - block_meta_offset)?; + let raw_meta = file.read(block_meta_offset, bloom_offset - 4 - block_meta_offset)?; let block_meta = BlockMeta::decode_block_meta(&raw_meta[..]); Ok(Self { file, @@ -138,6 +149,7 @@ impl SsTable { block_meta_offset: block_meta_offset as usize, id, block_cache, + bloom: Some(bloom_filter), }) } @@ -151,6 +163,7 @@ impl SsTable { block_cache: None, first_key, last_key, + bloom: None, } } @@ -207,6 +220,3 @@ impl SsTable { self.id } } - -#[cfg(test)] -mod tests; diff --git a/mini-lsm/src/table/bloom.rs b/mini-lsm/src/table/bloom.rs new file mode 100644 index 0000000..5b41d8e --- /dev/null +++ b/mini-lsm/src/table/bloom.rs @@ -0,0 +1,113 @@ +// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. + +use bytes::{BufMut, Bytes, BytesMut}; + +/// Implements a bloom filter +pub struct Bloom { + /// data of filter in bits + pub(crate) filter: Bytes, + /// number of hash functions + pub(crate) k: u8, +} + +pub trait BitSlice { + fn get_bit(&self, idx: usize) -> bool; + fn bit_len(&self) -> usize; +} + +pub trait BitSliceMut { + fn set_bit(&mut self, idx: usize, val: bool); +} + +impl> BitSlice for T { + fn get_bit(&self, idx: usize) -> bool { + let pos = idx / 8; + let offset = idx % 8; + (self.as_ref()[pos] & (1 << offset)) != 0 + } + + fn bit_len(&self) -> usize { + self.as_ref().len() * 8 + } +} + +impl> BitSliceMut for T { + fn set_bit(&mut self, idx: usize, val: bool) { + let pos = idx / 8; + let offset = idx % 8; + if val { + self.as_mut()[pos] |= 1 << offset; + } else { + self.as_mut()[pos] &= !(1 << offset); + } + } +} + +impl Bloom { + /// Decode a bloom filter + pub fn decode(buf: &[u8]) -> Self { + let filter = &buf[..buf.len() - 1]; + let k = buf[buf.len() - 1]; + Self { + filter: filter.to_vec().into(), + k, + } + } + + /// Encode a bloom filter + pub fn encode(&self, buf: &mut Vec) { + buf.extend(&self.filter); + buf.put_u8(self.k); + } + + /// Get bloom filter bits per key from entries count and FPR + pub fn bloom_bits_per_key(entries: usize, false_positive_rate: f64) -> usize { + let size = + -1.0 * (entries as f64) * false_positive_rate.ln() / std::f64::consts::LN_2.powi(2); + let locs = (size / (entries as f64)).ceil(); + locs as usize + } + + /// Build bloom filter from key hashes + pub fn build_from_key_hashes(keys: &[u32], bits_per_key: usize) -> Self { + let k = (bits_per_key as f64 * 0.69) as u32; + let k = k.min(30).max(1); + let nbits = (keys.len() * bits_per_key).max(64); + let nbytes = (nbits + 7) / 8; + let nbits = nbytes * 8; + let mut filter = BytesMut::with_capacity(nbytes); + filter.resize(nbytes, 0); + for h in keys { + let mut h = *h; + let delta = (h >> 17) | (h << 15); + for _ in 0..k { + let bit_pos = (h as usize) % nbits; + filter.set_bit(bit_pos, true); + h = h.wrapping_add(delta); + } + } + Self { + filter: filter.freeze(), + k: k as u8, + } + } + + /// Check if a bloom filter may contain some data + pub fn may_contain(&self, mut h: u32) -> bool { + if self.k > 30 { + // potential new 
encoding for short bloom filters + true + } else { + let nbits = self.filter.bit_len(); + let delta = (h >> 17) | (h << 15); + for _ in 0..self.k { + let bit_pos = h % (nbits as u32); + if !self.filter.get_bit(bit_pos as usize) { + return false; + } + h = h.wrapping_add(delta); + } + true + } + } +} diff --git a/mini-lsm/src/table/builder.rs b/mini-lsm/src/table/builder.rs index f95deb1..718c3c8 100644 --- a/mini-lsm/src/table/builder.rs +++ b/mini-lsm/src/table/builder.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use anyhow::Result; use bytes::BufMut; +use super::bloom::Bloom; use super::{BlockMeta, FileObject, SsTable}; use crate::block::BlockBuilder; use crate::lsm_storage::BlockCache; @@ -14,8 +15,9 @@ pub struct SsTableBuilder { first_key: Vec, last_key: Vec, data: Vec, - pub(super) meta: Vec, + pub(crate) meta: Vec, block_size: usize, + key_hashes: Vec, } impl SsTableBuilder { @@ -28,6 +30,7 @@ impl SsTableBuilder { last_key: Vec::new(), block_size, builder: BlockBuilder::new(block_size), + key_hashes: Vec::new(), } } @@ -38,6 +41,8 @@ impl SsTableBuilder { self.first_key.extend(key); } + self.key_hashes.push(farmhash::fingerprint32(key)); + if self.builder.add(key, value) { self.last_key.clear(); self.last_key.extend(key); @@ -71,7 +76,7 @@ impl SsTableBuilder { self.data.extend(encoded_block); } - /// Builds the SSTable and writes it to the given path. + /// Builds the SSTable and writes it to the given path. Use the `FileObject` structure to manipulate the disk objects. pub fn build( mut self, id: usize, @@ -83,6 +88,13 @@ impl SsTableBuilder { let meta_offset = buf.len(); BlockMeta::encode_block_meta(&self.meta, &mut buf); buf.put_u32(meta_offset as u32); + let bloom = Bloom::build_from_key_hashes( + &self.key_hashes, + Bloom::bloom_bits_per_key(self.key_hashes.len(), 0.01), + ); + let bloom_offset = buf.len(); + bloom.encode(&mut buf); + buf.put_u32(bloom_offset as u32); let file = FileObject::create(path.as_ref(), buf)?; Ok(SsTable { id, @@ -92,6 +104,7 @@ impl SsTableBuilder { block_meta: self.meta, block_meta_offset: meta_offset, block_cache, + bloom: Some(bloom), }) } diff --git a/mini-lsm/src/tests.rs b/mini-lsm/src/tests.rs index 8b13789..e6ec3ed 100644 --- a/mini-lsm/src/tests.rs +++ b/mini-lsm/src/tests.rs @@ -1 +1,9 @@ - +mod harness; +mod week1_day1; +mod week1_day2; +mod week1_day3; +mod week1_day4; +mod week1_day5; +mod week1_day6; +mod week1_day7; +mod week2_day1; diff --git a/mini-lsm/src/tests/harness.rs b/mini-lsm/src/tests/harness.rs index 6b4f4a9..8ee0d44 100644 --- a/mini-lsm/src/tests/harness.rs +++ b/mini-lsm/src/tests/harness.rs @@ -124,10 +124,9 @@ pub fn generate_sst( builder.build(id, block_cache, path.as_ref()).unwrap() } - pub fn sync(storage: &LsmStorageInner) { storage .force_freeze_memtable(&storage.state_lock.lock()) .unwrap(); storage.force_flush_next_imm_memtable().unwrap(); -} \ No newline at end of file +} diff --git a/mini-lsm/src/tests/week1_day6.rs b/mini-lsm/src/tests/week1_day6.rs index 6b63faa..0e8758a 100644 --- a/mini-lsm/src/tests/week1_day6.rs +++ b/mini-lsm/src/tests/week1_day6.rs @@ -11,7 +11,6 @@ use crate::{ lsm_storage::{LsmStorageInner, LsmStorageOptions, MiniLsm}, }; - #[test] fn test_task1_storage_scan() { let dir = tempdir().unwrap();