From 753e6d4f9ee68f42acba86952d5964b857c92139 Mon Sep 17 00:00:00 2001 From: Alex Chi Date: Thu, 25 Jan 2024 12:07:53 +0800 Subject: [PATCH] checkin initial MVCC codebase Signed-off-by: Alex Chi --- Cargo.lock | 21 + Cargo.toml | 2 +- mini-lsm-mvcc/Cargo.toml | 41 + mini-lsm-mvcc/README.md | 3 + mini-lsm-mvcc/src/bin/compaction-simulator.rs | 1 + mini-lsm-mvcc/src/bin/mini-lsm-cli.rs | 1 + mini-lsm-mvcc/src/bin/wrapper.rs | 6 + mini-lsm-mvcc/src/block.rs | 43 + mini-lsm-mvcc/src/block/builder.rs | 94 +++ mini-lsm-mvcc/src/block/iterator.rs | 137 ++++ mini-lsm-mvcc/src/compact.rs | 409 ++++++++++ mini-lsm-mvcc/src/compact/leveled.rs | 229 ++++++ mini-lsm-mvcc/src/compact/simple_leveled.rs | 114 +++ mini-lsm-mvcc/src/compact/tiered.rs | 135 ++++ mini-lsm-mvcc/src/debug.rs | 17 + mini-lsm-mvcc/src/iterators.rs | 26 + .../src/iterators/concat_iterator.rs | 123 +++ mini-lsm-mvcc/src/iterators/merge_iterator.rs | 155 ++++ .../iterators/tests/merge_iterator_test.rs | 137 ++++ .../tests/two_merge_iterator_test.rs | 129 +++ .../src/iterators/two_merge_iterator.rs | 94 +++ mini-lsm-mvcc/src/key.rs | 176 +++++ mini-lsm-mvcc/src/lib.rs | 14 + mini-lsm-mvcc/src/lsm_iterator.rs | 139 ++++ mini-lsm-mvcc/src/lsm_storage.rs | 737 ++++++++++++++++++ mini-lsm-mvcc/src/manifest.rs | 74 ++ mini-lsm-mvcc/src/mem_table.rs | 184 +++++ mini-lsm-mvcc/src/table.rs | 228 ++++++ mini-lsm-mvcc/src/table/bloom.rs | 113 +++ mini-lsm-mvcc/src/table/builder.rs | 112 +++ mini-lsm-mvcc/src/table/iterator.rs | 105 +++ mini-lsm-mvcc/src/tests.rs | 9 + mini-lsm-mvcc/src/tests/harness.rs | 1 + mini-lsm-mvcc/src/tests/week1_day1.rs | 1 + mini-lsm-mvcc/src/tests/week1_day2.rs | 1 + mini-lsm-mvcc/src/tests/week1_day3.rs | 1 + mini-lsm-mvcc/src/tests/week1_day4.rs | 1 + mini-lsm-mvcc/src/tests/week1_day5.rs | 1 + mini-lsm-mvcc/src/tests/week1_day6.rs | 1 + mini-lsm-mvcc/src/tests/week1_day7.rs | 1 + mini-lsm-mvcc/src/tests/week2_day1.rs | 1 + mini-lsm-mvcc/src/wal.rs | 70 ++ mini-lsm/README.md | 4 +- 43 files changed, 3889 insertions(+), 2 deletions(-) create mode 100644 mini-lsm-mvcc/Cargo.toml create mode 100644 mini-lsm-mvcc/README.md create mode 120000 mini-lsm-mvcc/src/bin/compaction-simulator.rs create mode 120000 mini-lsm-mvcc/src/bin/mini-lsm-cli.rs create mode 100644 mini-lsm-mvcc/src/bin/wrapper.rs create mode 100644 mini-lsm-mvcc/src/block.rs create mode 100644 mini-lsm-mvcc/src/block/builder.rs create mode 100644 mini-lsm-mvcc/src/block/iterator.rs create mode 100644 mini-lsm-mvcc/src/compact.rs create mode 100644 mini-lsm-mvcc/src/compact/leveled.rs create mode 100644 mini-lsm-mvcc/src/compact/simple_leveled.rs create mode 100644 mini-lsm-mvcc/src/compact/tiered.rs create mode 100644 mini-lsm-mvcc/src/debug.rs create mode 100644 mini-lsm-mvcc/src/iterators.rs create mode 100644 mini-lsm-mvcc/src/iterators/concat_iterator.rs create mode 100644 mini-lsm-mvcc/src/iterators/merge_iterator.rs create mode 100644 mini-lsm-mvcc/src/iterators/tests/merge_iterator_test.rs create mode 100644 mini-lsm-mvcc/src/iterators/tests/two_merge_iterator_test.rs create mode 100644 mini-lsm-mvcc/src/iterators/two_merge_iterator.rs create mode 100644 mini-lsm-mvcc/src/key.rs create mode 100644 mini-lsm-mvcc/src/lib.rs create mode 100644 mini-lsm-mvcc/src/lsm_iterator.rs create mode 100644 mini-lsm-mvcc/src/lsm_storage.rs create mode 100644 mini-lsm-mvcc/src/manifest.rs create mode 100644 mini-lsm-mvcc/src/mem_table.rs create mode 100644 mini-lsm-mvcc/src/table.rs create mode 100644 mini-lsm-mvcc/src/table/bloom.rs create mode 100644 
mini-lsm-mvcc/src/table/builder.rs create mode 100644 mini-lsm-mvcc/src/table/iterator.rs create mode 100644 mini-lsm-mvcc/src/tests.rs create mode 120000 mini-lsm-mvcc/src/tests/harness.rs create mode 120000 mini-lsm-mvcc/src/tests/week1_day1.rs create mode 120000 mini-lsm-mvcc/src/tests/week1_day2.rs create mode 120000 mini-lsm-mvcc/src/tests/week1_day3.rs create mode 120000 mini-lsm-mvcc/src/tests/week1_day4.rs create mode 120000 mini-lsm-mvcc/src/tests/week1_day5.rs create mode 120000 mini-lsm-mvcc/src/tests/week1_day6.rs create mode 120000 mini-lsm-mvcc/src/tests/week1_day7.rs create mode 120000 mini-lsm-mvcc/src/tests/week2_day1.rs create mode 100644 mini-lsm-mvcc/src/wal.rs diff --git a/Cargo.lock b/Cargo.lock index 20bb872..ca0ca2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -424,6 +424,27 @@ dependencies = [ "tempfile", ] +[[package]] +name = "mini-lsm-mvcc" +version = "0.2.0" +dependencies = [ + "anyhow", + "arc-swap", + "bytes", + "clap", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-skiplist", + "farmhash", + "moka", + "ouroboros", + "parking_lot", + "rand", + "serde", + "serde_json", + "tempfile", +] + [[package]] name = "mini-lsm-starter" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 36db65f..2c31f07 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["mini-lsm", "xtask", "mini-lsm-starter"] +members = ["mini-lsm", "xtask", "mini-lsm-starter", "mini-lsm-mvcc"] resolver = "2" [workspace.package] diff --git a/mini-lsm-mvcc/Cargo.toml b/mini-lsm-mvcc/Cargo.toml new file mode 100644 index 0000000..92fc798 --- /dev/null +++ b/mini-lsm-mvcc/Cargo.toml @@ -0,0 +1,41 @@ +[package] +name = "mini-lsm-mvcc" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +description = "A tutorial for building an LSM tree storage engine in a week." + + +[dependencies] +anyhow = "1" +arc-swap = "1" +bytes = "1" +crossbeam-epoch = "0.9" +crossbeam-skiplist = "0.1" +parking_lot = "0.12" +ouroboros = "0.18" +moka = "0.9" +clap = { version = "4.4.17", features = ["derive"] } +rand = "0.8.5" +crossbeam-channel = "0.5.11" +serde_json = { version = "1.0" } +serde = { version = "1.0", features = ["derive"] } +farmhash = "1" + +[dev-dependencies] +tempfile = "3" + +[[bin]] +name = "mini-lsm-cli-mvcc-ref" +path = "src/bin/mini-lsm-cli.rs" + +[[bin]] +name = "mini-lsm-wrapper-mvcc-ref" +path = "src/bin/wrapper.rs" + +[[bin]] +name = "compaction-simulator-mvcc-ref" +path = "src/bin/compaction-simulator.rs" diff --git a/mini-lsm-mvcc/README.md b/mini-lsm-mvcc/README.md new file mode 100644 index 0000000..632cfa2 --- /dev/null +++ b/mini-lsm-mvcc/README.md @@ -0,0 +1,3 @@ +# Week 3 Solution + +This is the solution of Mini-LSM week 3. 
diff --git a/mini-lsm-mvcc/src/bin/compaction-simulator.rs b/mini-lsm-mvcc/src/bin/compaction-simulator.rs new file mode 120000 index 0000000..c76876c --- /dev/null +++ b/mini-lsm-mvcc/src/bin/compaction-simulator.rs @@ -0,0 +1 @@ +../../../mini-lsm-starter/src/bin/compaction-simulator.rs \ No newline at end of file diff --git a/mini-lsm-mvcc/src/bin/mini-lsm-cli.rs b/mini-lsm-mvcc/src/bin/mini-lsm-cli.rs new file mode 120000 index 0000000..ade5bc5 --- /dev/null +++ b/mini-lsm-mvcc/src/bin/mini-lsm-cli.rs @@ -0,0 +1 @@ +../../../mini-lsm-starter/src/bin/mini-lsm-cli.rs \ No newline at end of file diff --git a/mini-lsm-mvcc/src/bin/wrapper.rs b/mini-lsm-mvcc/src/bin/wrapper.rs new file mode 100644 index 0000000..0872c5e --- /dev/null +++ b/mini-lsm-mvcc/src/bin/wrapper.rs @@ -0,0 +1,6 @@ +pub mod mini_lsm_wrapper { + pub use mini_lsm_mvcc::*; +} + +#[allow(dead_code)] +fn main() {} diff --git a/mini-lsm-mvcc/src/block.rs b/mini-lsm-mvcc/src/block.rs new file mode 100644 index 0000000..932c4c6 --- /dev/null +++ b/mini-lsm-mvcc/src/block.rs @@ -0,0 +1,43 @@ +mod builder; +mod iterator; + +pub use builder::BlockBuilder; +use bytes::{Buf, BufMut, Bytes}; +pub use iterator::BlockIterator; + +pub(crate) const SIZEOF_U16: usize = std::mem::size_of::(); + +/// A block is the smallest unit of read and caching in LSM tree. It is a collection of sorted +/// key-value pairs. +pub struct Block { + pub(crate) data: Vec, + pub(crate) offsets: Vec, +} + +impl Block { + pub fn encode(&self) -> Bytes { + let mut buf = self.data.clone(); + let offsets_len = self.offsets.len(); + for offset in &self.offsets { + buf.put_u16(*offset); + } + // Adds number of elements at the end of the block + buf.put_u16(offsets_len as u16); + buf.into() + } + + pub fn decode(data: &[u8]) -> Self { + // get number of elements in the block + let entry_offsets_len = (&data[data.len() - SIZEOF_U16..]).get_u16() as usize; + let data_end = data.len() - SIZEOF_U16 - entry_offsets_len * SIZEOF_U16; + let offsets_raw = &data[data_end..data.len() - SIZEOF_U16]; + // get offset array + let offsets = offsets_raw + .chunks(SIZEOF_U16) + .map(|mut x| x.get_u16()) + .collect(); + // retrieve data + let data = data[0..data_end].to_vec(); + Self { data, offsets } + } +} diff --git a/mini-lsm-mvcc/src/block/builder.rs b/mini-lsm-mvcc/src/block/builder.rs new file mode 100644 index 0000000..4d28a75 --- /dev/null +++ b/mini-lsm-mvcc/src/block/builder.rs @@ -0,0 +1,94 @@ +use bytes::BufMut; + +use crate::key::{KeySlice, KeyVec}; + +use super::{Block, SIZEOF_U16}; + +/// Builds a block. +pub struct BlockBuilder { + /// Offsets of each key-value entries. + offsets: Vec, + /// All serialized key-value pairs in the block. + data: Vec, + /// The expected block size. + block_size: usize, + /// The first key in the block + first_key: KeyVec, +} + +fn compute_overlap(first_key: KeySlice, key: KeySlice) -> usize { + let mut i = 0; + loop { + if i >= first_key.len() || i >= key.len() { + break; + } + if first_key.raw_ref()[i] != key.raw_ref()[i] { + break; + } + i += 1; + } + i +} + +impl BlockBuilder { + /// Creates a new block builder. + pub fn new(block_size: usize) -> Self { + Self { + offsets: Vec::new(), + data: Vec::new(), + block_size, + first_key: KeyVec::new(), + } + } + + fn estimated_size(&self) -> usize { + SIZEOF_U16 /* number of key-value pairs in the block */ + self.offsets.len() * SIZEOF_U16 /* offsets */ + self.data.len() + // key-value pairs + } + + /// Adds a key-value pair to the block. Returns false when the block is full. 
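+    ///
+    /// A sketch of how each entry is laid out once added (derived from the encoding
+    /// steps below; the field names are illustrative only):
+    ///
+    /// `key_overlap_len (u16) | rest_key_len (u16) | key suffix | value_len (u16) | value`
+    ///
+    /// `Block::encode` then appends the offset array and a trailing u16 element count.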
+ #[must_use] + pub fn add(&mut self, key: KeySlice, value: &[u8]) -> bool { + assert!(!key.is_empty(), "key must not be empty"); + if self.estimated_size() + key.len() + value.len() + SIZEOF_U16 * 3 /* key_len, value_len and offset */ > self.block_size + && !self.is_empty() + { + return false; + } + // Add the offset of the data into the offset array. + self.offsets.push(self.data.len() as u16); + let overlap = compute_overlap(self.first_key.as_key_slice(), key); + // Encode key overlap. + self.data.put_u16(overlap as u16); + // Encode key length. + self.data.put_u16((key.len() - overlap) as u16); + // Encode key content. + self.data.put(&key.raw_ref()[overlap..]); + // Encode value length. + self.data.put_u16(value.len() as u16); + // Encode value content. + self.data.put(value); + + if self.first_key.is_empty() { + self.first_key = key.to_key_vec(); + } + + true + } + + /// Check if there are no key-value pairs in the block. + pub fn is_empty(&self) -> bool { + self.offsets.is_empty() + } + + /// Finalize the block. + pub fn build(self) -> Block { + if self.is_empty() { + panic!("block should not be empty"); + } + Block { + data: self.data, + offsets: self.offsets, + } + } +} diff --git a/mini-lsm-mvcc/src/block/iterator.rs b/mini-lsm-mvcc/src/block/iterator.rs new file mode 100644 index 0000000..65b06af --- /dev/null +++ b/mini-lsm-mvcc/src/block/iterator.rs @@ -0,0 +1,137 @@ +use std::sync::Arc; + +use bytes::Buf; + +use crate::{ + block::SIZEOF_U16, + key::{KeySlice, KeyVec}, +}; + +use super::Block; + +/// Iterates on a block. +pub struct BlockIterator { + /// reference to the block + block: Arc, + /// the current key at the iterator position + key: KeyVec, + /// the value range from the block + value_range: (usize, usize), + /// the current index at the iterator position + idx: usize, + /// the first key in the block + first_key: KeyVec, +} + +impl Block { + fn get_first_key(&self) -> KeyVec { + let mut buf = &self.data[..]; + buf.get_u16(); + let key_len = buf.get_u16(); + let key = &buf[..key_len as usize]; + KeyVec::from_vec(key.to_vec()) + } +} + +impl BlockIterator { + fn new(block: Arc) -> Self { + Self { + first_key: block.get_first_key(), + block, + key: KeyVec::new(), + value_range: (0, 0), + idx: 0, + } + } + + /// Creates a block iterator and seek to the first entry. + pub fn create_and_seek_to_first(block: Arc) -> Self { + let mut iter = Self::new(block); + iter.seek_to_first(); + iter + } + + /// Creates a block iterator and seek to the first key that >= `key`. + pub fn create_and_seek_to_key(block: Arc, key: KeySlice) -> Self { + let mut iter = Self::new(block); + iter.seek_to_key(key); + iter + } + + /// Returns the key of the current entry. + pub fn key(&self) -> KeySlice { + debug_assert!(!self.key.is_empty(), "invalid iterator"); + self.key.as_key_slice() + } + + /// Returns the value of the current entry. + pub fn value(&self) -> &[u8] { + debug_assert!(!self.key.is_empty(), "invalid iterator"); + &self.block.data[self.value_range.0..self.value_range.1] + } + + /// Returns true if the iterator is valid. + pub fn is_valid(&self) -> bool { + !self.key.is_empty() + } + + /// Seeks to the first key in the block. + pub fn seek_to_first(&mut self) { + self.seek_to(0); + } + + /// Seeks to the idx-th key in the block. 
+ fn seek_to(&mut self, idx: usize) { + if idx >= self.block.offsets.len() { + self.key.clear(); + self.value_range = (0, 0); + return; + } + let offset = self.block.offsets[idx] as usize; + self.seek_to_offset(offset); + self.idx = idx; + } + + /// Move to the next key in the block. + pub fn next(&mut self) { + self.idx += 1; + self.seek_to(self.idx); + } + + /// Seek to the specified position and update the current `key` and `value` + /// Index update will be handled by caller + fn seek_to_offset(&mut self, offset: usize) { + let mut entry = &self.block.data[offset..]; + // Since `get_u16()` will automatically move the ptr 2 bytes ahead here, + // we don't need to manually advance it + let overlap_len = entry.get_u16() as usize; + let key_len = entry.get_u16() as usize; + let key = &entry[..key_len]; + self.key.clear(); + self.key.append(&self.first_key.raw_ref()[..overlap_len]); + self.key.append(key); + entry.advance(key_len); + let value_len = entry.get_u16() as usize; + let value_offset_begin = offset + SIZEOF_U16 + SIZEOF_U16 + key_len + SIZEOF_U16; + let value_offset_end = value_offset_begin + value_len; + self.value_range = (value_offset_begin, value_offset_end); + entry.advance(value_len); + } + + /// Seek to the first key that is >= `key`. + pub fn seek_to_key(&mut self, key: KeySlice) { + let mut low = 0; + let mut high = self.block.offsets.len(); + while low < high { + let mid = low + (high - low) / 2; + self.seek_to(mid); + assert!(self.is_valid()); + match self.key().cmp(&key) { + std::cmp::Ordering::Less => low = mid + 1, + std::cmp::Ordering::Greater => high = mid, + std::cmp::Ordering::Equal => return, + } + } + self.seek_to(low); + } +} diff --git a/mini-lsm-mvcc/src/compact.rs b/mini-lsm-mvcc/src/compact.rs new file mode 100644 index 0000000..2378bb2 --- /dev/null +++ b/mini-lsm-mvcc/src/compact.rs @@ -0,0 +1,409 @@ +mod leveled; +mod simple_leveled; +mod tiered; + +use std::collections::HashSet; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +pub use leveled::{LeveledCompactionController, LeveledCompactionOptions, LeveledCompactionTask}; +use serde::{Deserialize, Serialize}; +pub use simple_leveled::{ + SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, SimpleLeveledCompactionTask, +}; +pub use tiered::{TieredCompactionController, TieredCompactionOptions, TieredCompactionTask}; + +use crate::iterators::concat_iterator::SstConcatIterator; +use crate::iterators::merge_iterator::MergeIterator; +use crate::iterators::two_merge_iterator::TwoMergeIterator; +use crate::iterators::StorageIterator; +use crate::key::KeySlice; +use crate::lsm_storage::{LsmStorageInner, LsmStorageState}; +use crate::manifest::ManifestRecord; +use crate::table::{SsTable, SsTableBuilder, SsTableIterator}; + +#[derive(Debug, Serialize, Deserialize)] +pub enum CompactionTask { + Leveled(LeveledCompactionTask), + Tiered(TieredCompactionTask), + Simple(SimpleLeveledCompactionTask), + ForceFullCompaction { + l0_sstables: Vec, + l1_sstables: Vec, + }, +} + +impl CompactionTask { + fn compact_to_bottom_level(&self) -> bool { + match self { + CompactionTask::ForceFullCompaction { .. 
} => true, + CompactionTask::Leveled(task) => task.is_lower_level_bottom_level, + CompactionTask::Simple(task) => task.is_lower_level_bottom_level, + CompactionTask::Tiered(task) => task.bottom_tier_included, + } + } +} + +pub(crate) enum CompactionController { + Leveled(LeveledCompactionController), + Tiered(TieredCompactionController), + Simple(SimpleLeveledCompactionController), + NoCompaction, +} + +impl CompactionController { + pub fn generate_compaction_task(&self, snapshot: &LsmStorageState) -> Option { + match self { + CompactionController::Leveled(ctrl) => ctrl + .generate_compaction_task(snapshot) + .map(CompactionTask::Leveled), + CompactionController::Simple(ctrl) => ctrl + .generate_compaction_task(snapshot) + .map(CompactionTask::Simple), + CompactionController::Tiered(ctrl) => ctrl + .generate_compaction_task(snapshot) + .map(CompactionTask::Tiered), + CompactionController::NoCompaction => unreachable!(), + } + } + + pub fn apply_compaction_result( + &self, + snapshot: &LsmStorageState, + task: &CompactionTask, + output: &[usize], + ) -> (LsmStorageState, Vec) { + match (self, task) { + (CompactionController::Leveled(ctrl), CompactionTask::Leveled(task)) => { + ctrl.apply_compaction_result(snapshot, task, output) + } + (CompactionController::Simple(ctrl), CompactionTask::Simple(task)) => { + ctrl.apply_compaction_result(snapshot, task, output) + } + (CompactionController::Tiered(ctrl), CompactionTask::Tiered(task)) => { + ctrl.apply_compaction_result(snapshot, task, output) + } + _ => unreachable!(), + } + } +} + +impl CompactionController { + pub fn flush_to_l0(&self) -> bool { + matches!( + self, + Self::Leveled(_) | Self::Simple(_) | Self::NoCompaction + ) + } +} + +pub enum CompactionOptions { + /// Leveled compaction with partial compaction + dynamic level support (= RocksDB's Leveled + /// Compaction) + Leveled(LeveledCompactionOptions), + /// Tiered compaction (= RocksDB's universal compaction) + Tiered(TieredCompactionOptions), + /// Simple leveled compaction + Simple(SimpleLeveledCompactionOptions), + /// In no compaction mode (week 1), always flush to L0 + NoCompaction, +} + +impl LsmStorageInner { + fn compact_generate_sst_from_iter( + &self, + mut iter: impl for<'a> StorageIterator = KeySlice<'a>>, + compact_to_bottom_level: bool, + ) -> Result>> { + let mut builder = None; + let mut new_sst = Vec::new(); + + while iter.is_valid() { + if builder.is_none() { + builder = Some(SsTableBuilder::new(self.options.block_size)); + } + let builder_inner = builder.as_mut().unwrap(); + if compact_to_bottom_level { + if !iter.value().is_empty() { + builder_inner.add(iter.key(), iter.value()); + } + } else { + builder_inner.add(iter.key(), iter.value()); + } + iter.next()?; + + if builder_inner.estimated_size() >= self.options.target_sst_size { + let sst_id = self.next_sst_id(); + let builder = builder.take().unwrap(); + let sst = Arc::new(builder.build( + sst_id, + Some(self.block_cache.clone()), + self.path_of_sst(sst_id), + )?); + new_sst.push(sst); + } + } + if let Some(builder) = builder { + let sst_id = self.next_sst_id(); // lock dropped here + let sst = Arc::new(builder.build( + sst_id, + Some(self.block_cache.clone()), + self.path_of_sst(sst_id), + )?); + new_sst.push(sst); + } + Ok(new_sst) + } + + fn compact(&self, task: &CompactionTask) -> Result>> { + let snapshot = { + let state = self.state.read(); + state.clone() + }; + match task { + CompactionTask::ForceFullCompaction { + l0_sstables, + l1_sstables, + } => { + let mut l0_iters = 
Vec::with_capacity(l0_sstables.len()); + for id in l0_sstables.iter() { + l0_iters.push(Box::new(SsTableIterator::create_and_seek_to_first( + snapshot.sstables.get(id).unwrap().clone(), + )?)); + } + let mut l1_iters = Vec::with_capacity(l1_sstables.len()); + for id in l1_sstables.iter() { + l1_iters.push(snapshot.sstables.get(id).unwrap().clone()); + } + let iter = TwoMergeIterator::create( + MergeIterator::create(l0_iters), + SstConcatIterator::create_and_seek_to_first(l1_iters)?, + )?; + self.compact_generate_sst_from_iter(iter, task.compact_to_bottom_level()) + } + CompactionTask::Simple(SimpleLeveledCompactionTask { + upper_level, + upper_level_sst_ids, + lower_level: _, + lower_level_sst_ids, + .. + }) + | CompactionTask::Leveled(LeveledCompactionTask { + upper_level, + upper_level_sst_ids, + lower_level: _, + lower_level_sst_ids, + .. + }) => match upper_level { + Some(_) => { + let mut upper_ssts = Vec::with_capacity(upper_level_sst_ids.len()); + for id in upper_level_sst_ids.iter() { + upper_ssts.push(snapshot.sstables.get(id).unwrap().clone()); + } + let upper_iter = SstConcatIterator::create_and_seek_to_first(upper_ssts)?; + let mut lower_ssts = Vec::with_capacity(upper_level_sst_ids.len()); + for id in lower_level_sst_ids.iter() { + lower_ssts.push(snapshot.sstables.get(id).unwrap().clone()); + } + let lower_iter = SstConcatIterator::create_and_seek_to_first(lower_ssts)?; + self.compact_generate_sst_from_iter( + TwoMergeIterator::create(upper_iter, lower_iter)?, + task.compact_to_bottom_level(), + ) + } + None => { + let mut upper_iters = Vec::with_capacity(upper_level_sst_ids.len()); + for id in upper_level_sst_ids.iter() { + upper_iters.push(Box::new(SsTableIterator::create_and_seek_to_first( + snapshot.sstables.get(id).unwrap().clone(), + )?)); + } + let upper_iter = MergeIterator::create(upper_iters); + let mut lower_ssts = Vec::with_capacity(upper_level_sst_ids.len()); + for id in lower_level_sst_ids.iter() { + lower_ssts.push(snapshot.sstables.get(id).unwrap().clone()); + } + let lower_iter = SstConcatIterator::create_and_seek_to_first(lower_ssts)?; + self.compact_generate_sst_from_iter( + TwoMergeIterator::create(upper_iter, lower_iter)?, + task.compact_to_bottom_level(), + ) + } + }, + CompactionTask::Tiered(TieredCompactionTask { tiers, .. 
}) => { + let mut iters = Vec::with_capacity(tiers.len()); + for (_, tier_sst_ids) in tiers { + let mut ssts = Vec::with_capacity(tier_sst_ids.len()); + for id in tier_sst_ids.iter() { + ssts.push(snapshot.sstables.get(id).unwrap().clone()); + } + iters.push(Box::new(SstConcatIterator::create_and_seek_to_first(ssts)?)); + } + self.compact_generate_sst_from_iter( + MergeIterator::create(iters), + task.compact_to_bottom_level(), + ) + } + } + } + + pub fn force_full_compaction(&self) -> Result<()> { + let CompactionOptions::NoCompaction = self.options.compaction_options else { + panic!("full compaction can only be called with compaction is not enabled") + }; + let snapshot = { + let state = self.state.read(); + state.clone() + }; + + let l0_sstables = snapshot.l0_sstables.clone(); + let l1_sstables = snapshot.levels[0].1.clone(); + let compaction_task = CompactionTask::ForceFullCompaction { + l0_sstables: l0_sstables.clone(), + l1_sstables: l1_sstables.clone(), + }; + let sstables = self.compact(&compaction_task)?; + + { + let _state_lock = self.state_lock.lock(); + let mut state = self.state.read().as_ref().clone(); + for sst in l0_sstables.iter().chain(l1_sstables.iter()) { + let result = state.sstables.remove(sst); + assert!(result.is_some()); + } + let mut ids = Vec::with_capacity(sstables.len()); + for new_sst in sstables { + ids.push(new_sst.sst_id()); + let result = state.sstables.insert(new_sst.sst_id(), new_sst); + assert!(result.is_none()); + } + assert_eq!(l1_sstables, state.levels[0].1); + state.levels[0].1 = ids; + let mut l0_sstables_map = l0_sstables.iter().copied().collect::>(); + state.l0_sstables = state + .l0_sstables + .iter() + .filter(|x| !l0_sstables_map.remove(x)) + .copied() + .collect::>(); + assert!(l0_sstables_map.is_empty()); + *self.state.write() = Arc::new(state); + } + for sst in l0_sstables.iter().chain(l1_sstables.iter()) { + std::fs::remove_file(self.path_of_sst(*sst))?; + } + Ok(()) + } + + fn trigger_compaction(&self) -> Result<()> { + let snapshot = { + let state = self.state.read(); + state.clone() + }; + let task = self + .compaction_controller + .generate_compaction_task(&snapshot); + let Some(task) = task else { + return Ok(()); + }; + println!("running compaction task: {:?}", task); + let sstables = self.compact(&task)?; + let files_added = sstables.len(); + let output = sstables.iter().map(|x| x.sst_id()).collect::>(); + let ssts_to_remove = { + let state_lock = self.state_lock.lock(); + let (mut snapshot, files_to_remove) = self + .compaction_controller + .apply_compaction_result(&self.state.read(), &task, &output); + let mut ssts_to_remove = Vec::with_capacity(files_to_remove.len()); + for file_to_remove in &files_to_remove { + let result = snapshot.sstables.remove(file_to_remove); + assert!(result.is_some(), "cannot remove {}.sst", file_to_remove); + ssts_to_remove.push(result.unwrap()); + } + let mut new_sst_ids = Vec::new(); + for file_to_add in sstables { + new_sst_ids.push(file_to_add.sst_id()); + let result = snapshot.sstables.insert(file_to_add.sst_id(), file_to_add); + assert!(result.is_none()); + } + let mut state = self.state.write(); + *state = Arc::new(snapshot); + drop(state); + self.sync_dir()?; + self.manifest + .as_ref() + .unwrap() + .add_record(&state_lock, ManifestRecord::Compaction(task, new_sst_ids))?; + ssts_to_remove + }; + println!( + "compaction finished: {} files removed, {} files added", + ssts_to_remove.len(), + files_added + ); + for sst in ssts_to_remove { + std::fs::remove_file(self.path_of_sst(sst.sst_id()))?; + } 
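+        // Sync the directory once more so the SST file removals above are durable:
+        // on POSIX filesystems, deleting a file only survives a crash after the
+        // containing directory has been fsynced.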
+ self.sync_dir()?; + + Ok(()) + } + + pub(crate) fn spawn_compaction_thread( + self: &Arc, + rx: crossbeam_channel::Receiver<()>, + ) -> Result>> { + if let CompactionOptions::Leveled(_) + | CompactionOptions::Simple(_) + | CompactionOptions::Tiered(_) = self.options.compaction_options + { + let this = self.clone(); + let handle = std::thread::spawn(move || { + let ticker = crossbeam_channel::tick(Duration::from_millis(50)); + loop { + crossbeam_channel::select! { + recv(ticker) -> _ => if let Err(e) = this.trigger_compaction() { + eprintln!("compaction failed: {}", e); + }, + recv(rx) -> _ => return + } + } + }); + return Ok(Some(handle)); + } + Ok(None) + } + + fn trigger_flush(&self) -> Result<()> { + let res = { + let state = self.state.read(); + state.imm_memtables.len() >= self.options.num_memtable_limit + }; + if res { + self.force_flush_next_imm_memtable()?; + } + + Ok(()) + } + + pub(crate) fn spawn_flush_thread( + self: &Arc, + rx: crossbeam_channel::Receiver<()>, + ) -> Result>> { + let this = self.clone(); + let handle = std::thread::spawn(move || { + let ticker = crossbeam_channel::tick(Duration::from_millis(50)); + loop { + crossbeam_channel::select! { + recv(ticker) -> _ => if let Err(e) = this.trigger_flush() { + eprintln!("flush failed: {}", e); + }, + recv(rx) -> _ => return + } + } + }); + Ok(Some(handle)) + } +} diff --git a/mini-lsm-mvcc/src/compact/leveled.rs b/mini-lsm-mvcc/src/compact/leveled.rs new file mode 100644 index 0000000..a44b8b1 --- /dev/null +++ b/mini-lsm-mvcc/src/compact/leveled.rs @@ -0,0 +1,229 @@ +use std::collections::HashSet; + +use serde::{Deserialize, Serialize}; + +use crate::lsm_storage::LsmStorageState; + +#[derive(Debug, Serialize, Deserialize)] +pub struct LeveledCompactionTask { + // if upper_level is `None`, then it is L0 compaction + pub upper_level: Option, + pub upper_level_sst_ids: Vec, + pub lower_level: usize, + pub lower_level_sst_ids: Vec, + pub is_lower_level_bottom_level: bool, +} + +#[derive(Debug, Clone)] +pub struct LeveledCompactionOptions { + pub level_size_multiplier: usize, + pub level0_file_num_compaction_trigger: usize, + pub max_levels: usize, + pub base_level_size_mb: usize, +} + +pub struct LeveledCompactionController { + options: LeveledCompactionOptions, +} + +impl LeveledCompactionController { + pub fn new(options: LeveledCompactionOptions) -> Self { + Self { options } + } + + fn find_overlapping_ssts( + &self, + snapshot: &LsmStorageState, + sst_ids: &[usize], + in_level: usize, + ) -> Vec { + let begin_key = sst_ids + .iter() + .map(|id| snapshot.sstables[id].first_key()) + .min() + .cloned() + .unwrap(); + let end_key = sst_ids + .iter() + .map(|id| snapshot.sstables[id].last_key()) + .max() + .cloned() + .unwrap(); + let mut overlap_ssts = Vec::new(); + for sst_id in &snapshot.levels[in_level - 1].1 { + let sst = &snapshot.sstables[sst_id]; + let first_key = sst.first_key(); + let last_key = sst.last_key(); + if !(last_key < &begin_key || first_key > &end_key) { + overlap_ssts.push(*sst_id); + } + } + overlap_ssts + } + + pub fn generate_compaction_task( + &self, + snapshot: &LsmStorageState, + ) -> Option { + // step 1: compute target level size + let mut target_level_size = (0..self.options.max_levels).map(|_| 0).collect::>(); // exclude level 0 + let mut real_level_size = Vec::with_capacity(self.options.max_levels); + let mut base_level = self.options.max_levels; + for i in 0..self.options.max_levels { + real_level_size.push( + snapshot.levels[i] + .1 + .iter() + .map(|x| 
snapshot.sstables.get(x).unwrap().table_size()) + .sum::() as usize, + ); + } + let base_level_size_bytes = self.options.base_level_size_mb * 1024 * 1024; + + // select base level and compute target level size + target_level_size[self.options.max_levels - 1] = + real_level_size[self.options.max_levels - 1].max(base_level_size_bytes); + for i in (0..(self.options.max_levels - 1)).rev() { + let next_level_size = target_level_size[i + 1]; + let this_level_size = next_level_size / self.options.level_size_multiplier; + if next_level_size > base_level_size_bytes { + target_level_size[i] = this_level_size; + } + if target_level_size[i] > 0 { + base_level = i + 1; + } + } + + // Flush L0 SST is the top priority + if snapshot.l0_sstables.len() >= self.options.level0_file_num_compaction_trigger { + println!("flush L0 SST to base level {}", base_level); + return Some(LeveledCompactionTask { + upper_level: None, + upper_level_sst_ids: snapshot.l0_sstables.clone(), + lower_level: base_level, + lower_level_sst_ids: self.find_overlapping_ssts( + snapshot, + &snapshot.l0_sstables, + base_level, + ), + is_lower_level_bottom_level: base_level == self.options.max_levels, + }); + } + + let mut priorities = Vec::with_capacity(self.options.max_levels); + for level in 0..self.options.max_levels { + let prio = real_level_size[level] as f64 / target_level_size[level] as f64; + if prio > 1.0 { + priorities.push((prio, level + 1)); + } + } + priorities.sort_by(|a, b| a.partial_cmp(b).unwrap().reverse()); + let priority = priorities.first(); + if let Some((_, level)) = priority { + println!( + "target level sizes: {:?}, real level sizes: {:?}, base_level: {}", + target_level_size + .iter() + .map(|x| format!("{}MB", x / 1024 / 1024)) + .collect::>(), + real_level_size + .iter() + .map(|x| format!("{}MB", x / 1024 / 1024)) + .collect::>(), + base_level, + ); + + let level = *level; + let selected_sst = snapshot.levels[level - 1].1.iter().min().copied().unwrap(); // select the oldest sst to compact + println!( + "compaction triggered by priority: {level} out of {:?}, select {selected_sst} for compaction", + priorities + ); + return Some(LeveledCompactionTask { + upper_level: Some(level), + upper_level_sst_ids: vec![selected_sst], + lower_level: level + 1, + lower_level_sst_ids: self.find_overlapping_ssts( + snapshot, + &[selected_sst], + level + 1, + ), + is_lower_level_bottom_level: level + 1 == self.options.max_levels, + }); + } + None + } + + pub fn apply_compaction_result( + &self, + snapshot: &LsmStorageState, + task: &LeveledCompactionTask, + output: &[usize], + ) -> (LsmStorageState, Vec) { + let mut snapshot = snapshot.clone(); + let mut files_to_remove = Vec::new(); + let mut upper_level_sst_ids_set = task + .upper_level_sst_ids + .iter() + .copied() + .collect::>(); + let mut lower_level_sst_ids_set = task + .lower_level_sst_ids + .iter() + .copied() + .collect::>(); + if let Some(upper_level) = task.upper_level { + let new_upper_level_ssts = snapshot.levels[upper_level - 1] + .1 + .iter() + .filter_map(|x| { + if upper_level_sst_ids_set.remove(x) { + return None; + } + Some(*x) + }) + .collect::>(); + assert!(upper_level_sst_ids_set.is_empty()); + snapshot.levels[upper_level - 1].1 = new_upper_level_ssts; + } else { + let new_l0_ssts = snapshot + .l0_sstables + .iter() + .filter_map(|x| { + if upper_level_sst_ids_set.remove(x) { + return None; + } + Some(*x) + }) + .collect::>(); + assert!(upper_level_sst_ids_set.is_empty()); + snapshot.l0_sstables = new_l0_ssts; + } + + 
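+        // All input SSTs from both the upper and the lower level are superseded by
+        // `output`, so record them for deletion once the new state is installed.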
files_to_remove.extend(&task.upper_level_sst_ids); + files_to_remove.extend(&task.lower_level_sst_ids); + + let mut new_lower_level_ssts = snapshot.levels[task.lower_level - 1] + .1 + .iter() + .filter_map(|x| { + if lower_level_sst_ids_set.remove(x) { + return None; + } + Some(*x) + }) + .collect::>(); + assert!(lower_level_sst_ids_set.is_empty()); + new_lower_level_ssts.extend(output); + new_lower_level_ssts.sort_by(|x, y| { + snapshot + .sstables + .get(x) + .unwrap() + .first_key() + .cmp(snapshot.sstables.get(y).unwrap().first_key()) + }); + snapshot.levels[task.lower_level - 1].1 = new_lower_level_ssts; + (snapshot, files_to_remove) + } +} diff --git a/mini-lsm-mvcc/src/compact/simple_leveled.rs b/mini-lsm-mvcc/src/compact/simple_leveled.rs new file mode 100644 index 0000000..5f16a4e --- /dev/null +++ b/mini-lsm-mvcc/src/compact/simple_leveled.rs @@ -0,0 +1,114 @@ +use serde::{Deserialize, Serialize}; + +use crate::lsm_storage::LsmStorageState; + +#[derive(Debug, Clone)] +pub struct SimpleLeveledCompactionOptions { + pub size_ratio_percent: usize, + pub level0_file_num_compaction_trigger: usize, + pub max_levels: usize, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct SimpleLeveledCompactionTask { + // if upper_level is `None`, then it is L0 compaction + pub upper_level: Option, + pub upper_level_sst_ids: Vec, + pub lower_level: usize, + pub lower_level_sst_ids: Vec, + pub is_lower_level_bottom_level: bool, +} + +pub struct SimpleLeveledCompactionController { + options: SimpleLeveledCompactionOptions, +} + +impl SimpleLeveledCompactionController { + pub fn new(options: SimpleLeveledCompactionOptions) -> Self { + Self { options } + } + + /// Generates a compaction task. + /// + /// Returns `None` if no compaction needs to be scheduled. The order of SSTs in the compaction task id vector matters. + pub fn generate_compaction_task( + &self, + snapshot: &LsmStorageState, + ) -> Option { + let mut level_sizes = Vec::new(); + level_sizes.push(snapshot.l0_sstables.len()); + for (_, files) in &snapshot.levels { + level_sizes.push(files.len()); + } + + for i in 0..self.options.max_levels { + if i == 0 + && snapshot.l0_sstables.len() < self.options.level0_file_num_compaction_trigger + { + continue; + } + + let lower_level = i + 1; + let size_ratio = level_sizes[lower_level] as f64 / level_sizes[i] as f64; + if size_ratio < self.options.size_ratio_percent as f64 / 100.0 { + println!( + "compaction triggered at level {} and {} with size ratio {}", + i, lower_level, size_ratio + ); + return Some(SimpleLeveledCompactionTask { + upper_level: if i == 0 { None } else { Some(i) }, + upper_level_sst_ids: if i == 0 { + snapshot.l0_sstables.clone() + } else { + snapshot.levels[i - 1].1.clone() + }, + lower_level, + lower_level_sst_ids: snapshot.levels[lower_level - 1].1.clone(), + is_lower_level_bottom_level: lower_level == self.options.max_levels, + }); + } + } + None + } + + /// Apply the compaction result. + /// + /// The compactor will call this function with the compaction task and the list of SST ids generated. This function applies the + /// result and generates a new LSM state. The functions should only change `l0_sstables` and `levels` without changing memtables + /// and `sstables` hash map. Though there should only be one thread running compaction jobs, you should think about the case + /// where an L0 SST gets flushed while the compactor generates new SSTs, and with that in mind, you should do some sanity checks + /// in your implementation. 
+ pub fn apply_compaction_result( + &self, + snapshot: &LsmStorageState, + task: &SimpleLeveledCompactionTask, + output: &[usize], + ) -> (LsmStorageState, Vec) { + let mut snapshot = snapshot.clone(); + let mut files_to_remove = Vec::new(); + if let Some(upper_level) = task.upper_level { + assert_eq!( + task.upper_level_sst_ids, + snapshot.levels[upper_level - 1].1, + "sst mismatched" + ); + files_to_remove.extend(&snapshot.levels[upper_level - 1].1); + snapshot.levels[upper_level - 1].1.clear(); + } else { + assert_eq!( + task.upper_level_sst_ids, snapshot.l0_sstables, + "sst mismatched" + ); + files_to_remove.extend(&snapshot.l0_sstables); + snapshot.l0_sstables.clear(); + } + assert_eq!( + task.lower_level_sst_ids, + snapshot.levels[task.lower_level - 1].1, + "sst mismatched" + ); + files_to_remove.extend(&snapshot.levels[task.lower_level - 1].1); + snapshot.levels[task.lower_level - 1].1 = output.to_vec(); + (snapshot, files_to_remove) + } +} diff --git a/mini-lsm-mvcc/src/compact/tiered.rs b/mini-lsm-mvcc/src/compact/tiered.rs new file mode 100644 index 0000000..57491ce --- /dev/null +++ b/mini-lsm-mvcc/src/compact/tiered.rs @@ -0,0 +1,135 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +use crate::lsm_storage::LsmStorageState; + +#[derive(Debug, Serialize, Deserialize)] +pub struct TieredCompactionTask { + pub tiers: Vec<(usize, Vec)>, + pub bottom_tier_included: bool, +} + +#[derive(Debug, Clone)] +pub struct TieredCompactionOptions { + pub num_tiers: usize, + pub max_size_amplification_percent: usize, + pub size_ratio: usize, + pub min_merge_width: usize, +} + +pub struct TieredCompactionController { + options: TieredCompactionOptions, +} + +impl TieredCompactionController { + pub fn new(options: TieredCompactionOptions) -> Self { + Self { options } + } + + pub fn generate_compaction_task( + &self, + snapshot: &LsmStorageState, + ) -> Option { + assert!( + snapshot.l0_sstables.is_empty(), + "should not add l0 ssts in tiered compaction" + ); + if snapshot.levels.len() < self.options.num_tiers { + return None; + } + // compaction triggered by space amplification ratio + let mut size = 0; + for id in 0..(snapshot.levels.len() - 1) { + size += snapshot.levels[id].1.len(); + } + let space_amp_ratio = + (size as f64) / (snapshot.levels.last().unwrap().1.len() as f64) * 100.0; + if space_amp_ratio >= self.options.max_size_amplification_percent as f64 { + println!( + "compaction triggered by space amplification ratio: {}", + space_amp_ratio + ); + return Some(TieredCompactionTask { + tiers: snapshot.levels.clone(), + bottom_tier_included: true, + }); + } + let size_ratio_trigger = (100.0 + self.options.size_ratio as f64) / 100.0; + // compaction triggered by size ratio + let mut size = 0; + for id in 0..(snapshot.levels.len() - 1) { + size += snapshot.levels[id].1.len(); + let next_level_size = snapshot.levels[id + 1].1.len(); + let current_size_ratio = size as f64 / next_level_size as f64; + if current_size_ratio >= size_ratio_trigger && id + 2 >= self.options.min_merge_width { + println!( + "compaction triggered by size ratio: {}", + current_size_ratio * 100.0 + ); + return Some(TieredCompactionTask { + tiers: snapshot + .levels + .iter() + .take(id + 2) + .cloned() + .collect::>(), + bottom_tier_included: id + 2 >= snapshot.levels.len(), + }); + } + } + // trying to reduce sorted runs without respecting size ratio + let num_tiers_to_take = snapshot.levels.len() - self.options.num_tiers + 2; + println!("compaction triggered by reducing sorted runs"); + 
return Some(TieredCompactionTask { + tiers: snapshot + .levels + .iter() + .take(num_tiers_to_take) + .cloned() + .collect::>(), + bottom_tier_included: snapshot.levels.len() >= num_tiers_to_take, + }); + } + + pub fn apply_compaction_result( + &self, + snapshot: &LsmStorageState, + task: &TieredCompactionTask, + output: &[usize], + ) -> (LsmStorageState, Vec) { + assert!( + snapshot.l0_sstables.is_empty(), + "should not add l0 ssts in tiered compaction" + ); + let mut snapshot = snapshot.clone(); + let mut tier_to_remove = task + .tiers + .iter() + .map(|(x, y)| (*x, y)) + .collect::>(); + let mut levels = Vec::new(); + let mut new_tier_added = false; + let mut files_to_remove = Vec::new(); + for (tier_id, files) in &snapshot.levels { + if let Some(ffiles) = tier_to_remove.remove(tier_id) { + // the tier should be removed + assert_eq!(ffiles, files, "file changed after issuing compaction task"); + files_to_remove.extend(ffiles.iter().copied()); + } else { + // retain the tier + levels.push((*tier_id, files.clone())); + } + if tier_to_remove.is_empty() && !new_tier_added { + // add the compacted tier to the LSM tree + new_tier_added = true; + levels.push((output[0], output.to_vec())); + } + } + if !tier_to_remove.is_empty() { + unreachable!("some tiers not found??"); + } + snapshot.levels = levels; + (snapshot, files_to_remove) + } +} diff --git a/mini-lsm-mvcc/src/debug.rs b/mini-lsm-mvcc/src/debug.rs new file mode 100644 index 0000000..76702de --- /dev/null +++ b/mini-lsm-mvcc/src/debug.rs @@ -0,0 +1,17 @@ +use crate::lsm_storage::MiniLsm; + +impl MiniLsm { + pub fn dump_structure(&self) { + let snapshot = self.inner.state.read(); + if !snapshot.l0_sstables.is_empty() { + println!( + "L0 ({}): {:?}", + snapshot.l0_sstables.len(), + snapshot.l0_sstables, + ); + } + for (level, files) in &snapshot.levels { + println!("L{level} ({}): {:?}", files.len(), files); + } + } +} diff --git a/mini-lsm-mvcc/src/iterators.rs b/mini-lsm-mvcc/src/iterators.rs new file mode 100644 index 0000000..a0da803 --- /dev/null +++ b/mini-lsm-mvcc/src/iterators.rs @@ -0,0 +1,26 @@ +pub mod concat_iterator; +pub mod merge_iterator; +pub mod two_merge_iterator; + +pub trait StorageIterator { + type KeyType<'a>: PartialEq + Eq + PartialOrd + Ord + where + Self: 'a; + + /// Get the current value. + fn value(&self) -> &[u8]; + + /// Get the current key. + fn key(&self) -> Self::KeyType<'_>; + + /// Check if the current iterator is valid. + fn is_valid(&self) -> bool; + + /// Move to the next position. + fn next(&mut self) -> anyhow::Result<()>; + + /// Number of underlying active iterators for this iterator. + fn num_active_iterators(&self) -> usize { + 1 + } +} diff --git a/mini-lsm-mvcc/src/iterators/concat_iterator.rs b/mini-lsm-mvcc/src/iterators/concat_iterator.rs new file mode 100644 index 0000000..e6203f2 --- /dev/null +++ b/mini-lsm-mvcc/src/iterators/concat_iterator.rs @@ -0,0 +1,123 @@ +use std::sync::Arc; + +use anyhow::Result; + +use crate::{ + key::KeySlice, + table::{SsTable, SsTableIterator}, +}; + +use super::StorageIterator; + +/// Concat multiple iterators ordered in key order and their key ranges do not overlap. We do not want to create the +/// iterators when initializing this iterator to reduce the overhead of seeking. 
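+/// Because the tables are sorted and their key ranges are disjoint, a seek can
+/// binary-search over the tables' first keys (`partition_point` below) and only one
+/// `SsTableIterator` needs to be open at any time.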
+pub struct SstConcatIterator { + current: Option, + next_sst_idx: usize, + sstables: Vec>, +} + +impl SstConcatIterator { + fn check_sst_valid(sstables: &[Arc]) { + for sst in sstables { + assert!(sst.first_key() <= sst.last_key()); + } + if !sstables.is_empty() { + for i in 0..(sstables.len() - 1) { + assert!(sstables[i].last_key() < sstables[i + 1].first_key()); + } + } + } + + pub fn create_and_seek_to_first(sstables: Vec>) -> Result { + Self::check_sst_valid(&sstables); + if sstables.is_empty() { + return Ok(Self { + current: None, + next_sst_idx: 0, + sstables, + }); + } + let mut iter = Self { + current: Some(SsTableIterator::create_and_seek_to_first( + sstables[0].clone(), + )?), + next_sst_idx: 1, + sstables, + }; + iter.move_until_valid()?; + Ok(iter) + } + + pub fn create_and_seek_to_key(sstables: Vec>, key: KeySlice) -> Result { + Self::check_sst_valid(&sstables); + let idx: usize = sstables + .partition_point(|table| table.first_key().as_key_slice() <= key) + .saturating_sub(1); + if idx >= sstables.len() { + return Ok(Self { + current: None, + next_sst_idx: sstables.len(), + sstables, + }); + } + let mut iter = Self { + current: Some(SsTableIterator::create_and_seek_to_key( + sstables[idx].clone(), + key, + )?), + next_sst_idx: idx + 1, + sstables, + }; + iter.move_until_valid()?; + Ok(iter) + } + + fn move_until_valid(&mut self) -> Result<()> { + while let Some(iter) = self.current.as_mut() { + if iter.is_valid() { + break; + } + if self.next_sst_idx >= self.sstables.len() { + self.current = None; + } else { + self.current = Some(SsTableIterator::create_and_seek_to_first( + self.sstables[self.next_sst_idx].clone(), + )?); + self.next_sst_idx += 1; + } + } + Ok(()) + } +} + +impl StorageIterator for SstConcatIterator { + type KeyType<'a> = KeySlice<'a>; + + fn key(&self) -> KeySlice { + self.current.as_ref().unwrap().key() + } + + fn value(&self) -> &[u8] { + self.current.as_ref().unwrap().value() + } + + fn is_valid(&self) -> bool { + if let Some(current) = &self.current { + assert!(current.is_valid()); + true + } else { + false + } + } + + fn next(&mut self) -> Result<()> { + self.current.as_mut().unwrap().next()?; + self.move_until_valid()?; + Ok(()) + } + + fn num_active_iterators(&self) -> usize { + 1 + } +} diff --git a/mini-lsm-mvcc/src/iterators/merge_iterator.rs b/mini-lsm-mvcc/src/iterators/merge_iterator.rs new file mode 100644 index 0000000..c4abc8d --- /dev/null +++ b/mini-lsm-mvcc/src/iterators/merge_iterator.rs @@ -0,0 +1,155 @@ +use std::cmp::{self}; +use std::collections::binary_heap::PeekMut; +use std::collections::BinaryHeap; + +use anyhow::Result; + +use crate::key::KeySlice; + +use super::StorageIterator; + +struct HeapWrapper(pub usize, pub Box); + +impl PartialEq for HeapWrapper { + fn eq(&self, other: &Self) -> bool { + self.partial_cmp(other).unwrap() == cmp::Ordering::Equal + } +} + +impl Eq for HeapWrapper {} + +impl PartialOrd for HeapWrapper { + #[allow(clippy::non_canonical_partial_ord_impl)] + fn partial_cmp(&self, other: &Self) -> Option { + match self.1.key().cmp(&other.1.key()) { + cmp::Ordering::Greater => Some(cmp::Ordering::Greater), + cmp::Ordering::Less => Some(cmp::Ordering::Less), + cmp::Ordering::Equal => self.0.partial_cmp(&other.0), + } + .map(|x| x.reverse()) + } +} + +impl Ord for HeapWrapper { + fn cmp(&self, other: &Self) -> cmp::Ordering { + self.partial_cmp(other).unwrap() + } +} + +/// Merge multiple iterators of the same type. If the same key occurs multiple times in some +/// iterators, perfer the one with smaller index. 
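+/// (The index is an iterator's position in the vector passed to `create`; when several
+/// iterators sit on the same key, `next` advances the higher-indexed ones past it, so
+/// only the entry from the lowest index is returned.)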
+pub struct MergeIterator { + iters: BinaryHeap>, + current: Option>, +} + +impl MergeIterator { + pub fn create(iters: Vec>) -> Self { + if iters.is_empty() { + return Self { + iters: BinaryHeap::new(), + current: None, + }; + } + + let mut heap = BinaryHeap::new(); + + if iters.iter().all(|x| !x.is_valid()) { + // All invalid, select the last one as the current. + let mut iters = iters; + return Self { + iters: heap, + current: Some(HeapWrapper(0, iters.pop().unwrap())), + }; + } + + for (idx, iter) in iters.into_iter().enumerate() { + if iter.is_valid() { + heap.push(HeapWrapper(idx, iter)); + } + } + + let current = heap.pop().unwrap(); + Self { + iters: heap, + current: Some(current), + } + } +} + +impl StorageIterator = KeySlice<'a>>> StorageIterator + for MergeIterator +{ + type KeyType<'a> = KeySlice<'a>; + + fn key(&self) -> KeySlice { + self.current.as_ref().unwrap().1.key() + } + + fn value(&self) -> &[u8] { + self.current.as_ref().unwrap().1.value() + } + + fn is_valid(&self) -> bool { + self.current + .as_ref() + .map(|x| x.1.is_valid()) + .unwrap_or(false) + } + + fn next(&mut self) -> Result<()> { + let current = self.current.as_mut().unwrap(); + // Pop the item out of the heap if they have the same value. + while let Some(mut inner_iter) = self.iters.peek_mut() { + debug_assert!( + inner_iter.1.key() >= current.1.key(), + "heap invariant violated" + ); + if inner_iter.1.key() == current.1.key() { + // Case 1: an error occurred when calling `next`. + if let e @ Err(_) = inner_iter.1.next() { + PeekMut::pop(inner_iter); + return e; + } + + // Case 2: iter is no longer valid. + if !inner_iter.1.is_valid() { + PeekMut::pop(inner_iter); + } + } else { + break; + } + } + + current.1.next()?; + + // If the current iterator is invalid, pop it out of the heap and select the next one. + if !current.1.is_valid() { + if let Some(iter) = self.iters.pop() { + *current = iter; + } + return Ok(()); + } + + // Otherwise, compare with heap top and swap if necessary. 
+ if let Some(mut inner_iter) = self.iters.peek_mut() { + if *current < *inner_iter { + std::mem::swap(&mut *inner_iter, current); + } + } + + Ok(()) + } + + fn num_active_iterators(&self) -> usize { + self.iters + .iter() + .map(|x| x.1.num_active_iterators()) + .sum::() + + self + .current + .as_ref() + .map(|x| x.1.num_active_iterators()) + .unwrap_or(0) + } +} diff --git a/mini-lsm-mvcc/src/iterators/tests/merge_iterator_test.rs b/mini-lsm-mvcc/src/iterators/tests/merge_iterator_test.rs new file mode 100644 index 0000000..e063911 --- /dev/null +++ b/mini-lsm-mvcc/src/iterators/tests/merge_iterator_test.rs @@ -0,0 +1,137 @@ +use super::*; +use crate::iterators::merge_iterator::MergeIterator; + +fn as_bytes(x: &[u8]) -> Bytes { + Bytes::copy_from_slice(x) +} + +fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>) { + let mut iter = iter; + for (k, v) in expected { + assert!(iter.is_valid()); + assert_eq!( + k, + iter.key(), + "expected key: {:?}, actual key: {:?}", + k, + as_bytes(iter.key()), + ); + assert_eq!( + v, + iter.value(), + "expected value: {:?}, actual value: {:?}", + v, + as_bytes(iter.value()), + ); + iter.next().unwrap(); + } + assert!(!iter.is_valid()); +} + +#[test] +fn test_merge_1() { + let i1 = MockIterator::new(vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.1")), + (Bytes::from("c"), Bytes::from("3.1")), + ]); + let i2 = MockIterator::new(vec![ + (Bytes::from("a"), Bytes::from("1.2")), + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ]); + let i3 = MockIterator::new(vec![ + (Bytes::from("b"), Bytes::from("2.3")), + (Bytes::from("c"), Bytes::from("3.3")), + (Bytes::from("d"), Bytes::from("4.3")), + ]); + + let iter = MergeIterator::create(vec![ + Box::new(i1.clone()), + Box::new(i2.clone()), + Box::new(i3.clone()), + ]); + + check_iter_result( + iter, + vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.1")), + (Bytes::from("c"), Bytes::from("3.1")), + (Bytes::from("d"), Bytes::from("4.2")), + ], + ); + + let iter = MergeIterator::create(vec![Box::new(i3), Box::new(i1), Box::new(i2)]); + + check_iter_result( + iter, + vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.3")), + (Bytes::from("c"), Bytes::from("3.3")), + (Bytes::from("d"), Bytes::from("4.3")), + ], + ); +} + +#[test] +fn test_merge_2() { + let i1 = MockIterator::new(vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.1")), + (Bytes::from("c"), Bytes::from("3.1")), + ]); + let i2 = MockIterator::new(vec![ + (Bytes::from("d"), Bytes::from("1.2")), + (Bytes::from("e"), Bytes::from("2.2")), + (Bytes::from("f"), Bytes::from("3.2")), + (Bytes::from("g"), Bytes::from("4.2")), + ]); + let i3 = MockIterator::new(vec![ + (Bytes::from("h"), Bytes::from("1.3")), + (Bytes::from("i"), Bytes::from("2.3")), + (Bytes::from("j"), Bytes::from("3.3")), + (Bytes::from("k"), Bytes::from("4.3")), + ]); + let i4 = MockIterator::new(vec![]); + let result = vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.1")), + (Bytes::from("c"), Bytes::from("3.1")), + (Bytes::from("d"), Bytes::from("1.2")), + (Bytes::from("e"), Bytes::from("2.2")), + (Bytes::from("f"), Bytes::from("3.2")), + (Bytes::from("g"), Bytes::from("4.2")), + (Bytes::from("h"), Bytes::from("1.3")), + (Bytes::from("i"), Bytes::from("2.3")), + (Bytes::from("j"), Bytes::from("3.3")), + 
(Bytes::from("k"), Bytes::from("4.3")), + ]; + + let iter = MergeIterator::create(vec![ + Box::new(i1.clone()), + Box::new(i2.clone()), + Box::new(i3.clone()), + Box::new(i4.clone()), + ]); + check_iter_result(iter, result.clone()); + + let iter = MergeIterator::create(vec![ + Box::new(i2.clone()), + Box::new(i4.clone()), + Box::new(i3.clone()), + Box::new(i1.clone()), + ]); + check_iter_result(iter, result.clone()); + + let iter = MergeIterator::create(vec![Box::new(i4), Box::new(i3), Box::new(i2), Box::new(i1)]); + check_iter_result(iter, result); +} + +#[test] +fn test_merge_empty() { + let iter = MergeIterator::::create(vec![]); + check_iter_result(iter, vec![]); +} diff --git a/mini-lsm-mvcc/src/iterators/tests/two_merge_iterator_test.rs b/mini-lsm-mvcc/src/iterators/tests/two_merge_iterator_test.rs new file mode 100644 index 0000000..1719bf3 --- /dev/null +++ b/mini-lsm-mvcc/src/iterators/tests/two_merge_iterator_test.rs @@ -0,0 +1,129 @@ +use super::*; +use crate::iterators::two_merge_iterator::TwoMergeIterator; + +fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>) { + let mut iter = iter; + for (k, v) in expected { + assert!(iter.is_valid()); + assert_eq!(iter.key(), k.as_ref()); + assert_eq!(iter.value(), v.as_ref()); + iter.next().unwrap(); + } + assert!(!iter.is_valid()); +} + +#[test] +fn test_merge_1() { + let i1 = MockIterator::new(vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.1")), + (Bytes::from("c"), Bytes::from("3.1")), + ]); + let i2 = MockIterator::new(vec![ + (Bytes::from("a"), Bytes::from("1.2")), + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ]); + let iter = TwoMergeIterator::create(i1, i2).unwrap(); + check_iter_result( + iter, + vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.1")), + (Bytes::from("c"), Bytes::from("3.1")), + (Bytes::from("d"), Bytes::from("4.2")), + ], + ) +} + +#[test] +fn test_merge_2() { + let i2 = MockIterator::new(vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.1")), + (Bytes::from("c"), Bytes::from("3.1")), + ]); + let i1 = MockIterator::new(vec![ + (Bytes::from("a"), Bytes::from("1.2")), + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ]); + let iter = TwoMergeIterator::create(i1, i2).unwrap(); + check_iter_result( + iter, + vec![ + (Bytes::from("a"), Bytes::from("1.2")), + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ], + ) +} + +#[test] +fn test_merge_3() { + let i2 = MockIterator::new(vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.1")), + (Bytes::from("c"), Bytes::from("3.1")), + ]); + let i1 = MockIterator::new(vec![ + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ]); + let iter = TwoMergeIterator::create(i1, i2).unwrap(); + check_iter_result( + iter, + vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ], + ) +} + +#[test] +fn test_merge_4() { + let i2 = MockIterator::new(vec![]); + let i1 = MockIterator::new(vec![ + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), 
Bytes::from("4.2")), + ]); + let iter = TwoMergeIterator::create(i1, i2).unwrap(); + check_iter_result( + iter, + vec![ + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ], + ); + let i1 = MockIterator::new(vec![]); + let i2 = MockIterator::new(vec![ + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ]); + let iter = TwoMergeIterator::create(i1, i2).unwrap(); + check_iter_result( + iter, + vec![ + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ], + ); +} + +#[test] +fn test_merge_5() { + let i2 = MockIterator::new(vec![]); + let i1 = MockIterator::new(vec![]); + let iter = TwoMergeIterator::create(i1, i2).unwrap(); + check_iter_result(iter, vec![]) +} diff --git a/mini-lsm-mvcc/src/iterators/two_merge_iterator.rs b/mini-lsm-mvcc/src/iterators/two_merge_iterator.rs new file mode 100644 index 0000000..781055a --- /dev/null +++ b/mini-lsm-mvcc/src/iterators/two_merge_iterator.rs @@ -0,0 +1,94 @@ +use anyhow::Result; + +use crate::key::KeySlice; + +use super::StorageIterator; + +/// Merges two iterators of different types into one. If the two iterators have the same key, only +/// produce the key once and prefer the entry from A. +pub struct TwoMergeIterator { + a: A, + b: B, + choose_a: bool, +} + +impl< + A: 'static + for<'a> StorageIterator = KeySlice<'a>>, + B: 'static + for<'a> StorageIterator = KeySlice<'a>>, + > TwoMergeIterator +{ + fn choose_a(a: &A, b: &B) -> bool { + if !a.is_valid() { + return false; + } + if !b.is_valid() { + return true; + } + a.key() < b.key() + } + + fn skip_b(&mut self) -> Result<()> { + if self.a.is_valid() && self.b.is_valid() && self.b.key() == self.a.key() { + self.b.next()?; + } + Ok(()) + } + + pub fn create(a: A, b: B) -> Result { + let mut iter = Self { + choose_a: false, + a, + b, + }; + iter.skip_b()?; + iter.choose_a = Self::choose_a(&iter.a, &iter.b); + Ok(iter) + } +} + +impl< + A: 'static + for<'a> StorageIterator = KeySlice<'a>>, + B: 'static + for<'a> StorageIterator = KeySlice<'a>>, + > StorageIterator for TwoMergeIterator +{ + type KeyType<'a> = KeySlice<'a>; + + fn key(&self) -> KeySlice { + if self.choose_a { + self.a.key() + } else { + self.b.key() + } + } + + fn value(&self) -> &[u8] { + if self.choose_a { + self.a.value() + } else { + self.b.value() + } + } + + fn is_valid(&self) -> bool { + if self.choose_a { + self.a.is_valid() + } else { + self.b.is_valid() + } + } + + fn next(&mut self) -> Result<()> { + if self.choose_a { + self.a.next()?; + } else { + self.b.next()?; + } + self.skip_b()?; + self.choose_a = Self::choose_a(&self.a, &self.b); + Ok(()) + } + + fn num_active_iterators(&self) -> usize { + self.a.num_active_iterators() + self.b.num_active_iterators() + } +} diff --git a/mini-lsm-mvcc/src/key.rs b/mini-lsm-mvcc/src/key.rs new file mode 100644 index 0000000..34922fa --- /dev/null +++ b/mini-lsm-mvcc/src/key.rs @@ -0,0 +1,176 @@ +use std::{cmp::Reverse, fmt::Debug}; + +use bytes::Bytes; + +pub struct Key>(T, u64); + +pub type KeySlice<'a> = Key<&'a [u8]>; +pub type KeyVec = Key>; +pub type KeyBytes = Key; + +/// Temporary, should remove after implementing full week 3 day 1 + 2. 
+pub const TS_DEFAULT: u64 = std::u64::MAX; + +pub const TS_MAX: u64 = std::u64::MAX; +pub const TS_MIN: u64 = std::u64::MIN; +pub const TS_RANGE_BEGIN: u64 = std::u64::MAX; +pub const TS_RANGE_END: u64 = std::u64::MIN; + +impl> Key { + pub fn into_inner(self) -> T { + self.0 + } + + pub fn len(&self) -> usize { + self.0.as_ref().len() + } + + pub fn is_empty(&self) -> bool { + self.0.as_ref().is_empty() + } +} + +impl Key> { + pub fn new() -> Self { + Self(Vec::new(), 0) + } + + pub fn from_vec(key: Vec) -> Self { + unimplemented!() + } + + /// Create a `KeyVec` from a `Vec`. Will be removed in week 3. + pub fn from_vec_with_ts(key: Vec, ts: u64) -> Self { + Self(key, ts) + } + + /// Clears the key and set ts to 0. + pub fn clear(&mut self) { + self.0.clear() + } + + /// Append a slice to the end of the key + pub fn append(&mut self, data: &[u8]) { + self.0.extend(data) + } + + /// Set the key from a slice without re-allocating. The signature will change in week 3. + pub fn set_from_slice(&mut self, key_slice: KeySlice) { + self.0.clear(); + self.0.extend(key_slice.0); + } + + pub fn as_key_slice(&self) -> KeySlice { + Key(self.0.as_slice(), self.1) + } + + pub fn into_key_bytes(self) -> KeyBytes { + Key(self.0.into(), self.1) + } + + /// Always use `raw_ref` to access the key in week 1 + 2. This function will be removed in week 3. + pub fn raw_ref(&self) -> &[u8] { + self.0.as_ref() + } + + pub fn for_testing_key_ref(&self) -> &[u8] { + self.0.as_ref() + } + + pub fn for_testing_from_vec_no_ts(key: Vec) -> Self { + Self(key, 0) + } +} + +impl Key { + pub fn as_key_slice(&self) -> KeySlice { + Key(&self.0, self.1) + } + + /// Create a `KeyBytes` from a `Bytes`. Will be removed in week 3. + pub fn from_bytes(bytes: Bytes) -> KeyBytes { + unimplemented!() + } + + /// Create a `KeyBytes` from a `Bytes`. Will be removed in week 3. + pub fn from_bytes_with_ts(bytes: Bytes, ts: u64) -> KeyBytes { + Key(bytes, ts) + } + + /// Always use `raw_ref` to access the key in week 1 + 2. This function will be removed in week 3. + pub fn raw_ref(&self) -> &[u8] { + self.0.as_ref() + } + + pub fn for_testing_from_bytes_no_ts(bytes: Bytes) -> KeyBytes { + Key(bytes, 0) + } + + pub fn for_testing_key_ref(&self) -> &[u8] { + self.0.as_ref() + } +} + +impl<'a> Key<&'a [u8]> { + pub fn to_key_vec(self) -> KeyVec { + Key(self.0.to_vec(), self.1) + } + + /// Create a key slice from a slice. Will be removed in week 3. + pub fn from_slice(slice: &'a [u8], ts: u64) -> Self { + Self(slice, ts) + } + + /// Always use `raw_ref` to access the key in week 1 + 2. This function will be removed in week 3. 
+ pub fn raw_ref(self) -> &'a [u8] { + self.0 + } + + pub fn for_testing_key_ref(self) -> &'a [u8] { + self.0 + } + + pub fn for_testing_from_slice_no_ts(slice: &'a [u8]) -> Self { + Self(slice, 0) + } +} + +impl + Debug> Debug for Key { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +impl + Default> Default for Key { + fn default() -> Self { + Self(T::default(), 0) + } +} + +impl + PartialEq> PartialEq for Key { + fn eq(&self, other: &Self) -> bool { + (self.0.as_ref(), self.1).eq(&(other.0.as_ref(), other.1)) + } +} + +impl + Eq> Eq for Key {} + +impl + Clone> Clone for Key { + fn clone(&self) -> Self { + Self(self.0.clone(), self.1) + } +} + +impl + Copy> Copy for Key {} + +impl + PartialOrd> PartialOrd for Key { + fn partial_cmp(&self, other: &Self) -> Option { + (self.0.as_ref(), Reverse(self.1)).partial_cmp(&(other.0.as_ref(), Reverse(other.1))) + } +} + +impl + Ord> Ord for Key { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + (self.0.as_ref(), Reverse(self.1)).cmp(&(other.0.as_ref(), Reverse(other.1))) + } +} diff --git a/mini-lsm-mvcc/src/lib.rs b/mini-lsm-mvcc/src/lib.rs new file mode 100644 index 0000000..afdfb65 --- /dev/null +++ b/mini-lsm-mvcc/src/lib.rs @@ -0,0 +1,14 @@ +pub mod block; +pub mod compact; +pub mod debug; +pub mod iterators; +pub mod key; +pub mod lsm_iterator; +pub mod lsm_storage; +pub mod manifest; +pub mod mem_table; +pub mod table; +pub mod wal; + +#[cfg(test)] +mod tests; diff --git a/mini-lsm-mvcc/src/lsm_iterator.rs b/mini-lsm-mvcc/src/lsm_iterator.rs new file mode 100644 index 0000000..044769c --- /dev/null +++ b/mini-lsm-mvcc/src/lsm_iterator.rs @@ -0,0 +1,139 @@ +use std::ops::Bound; + +use anyhow::{bail, Result}; +use bytes::Bytes; + +use crate::iterators::concat_iterator::SstConcatIterator; +use crate::iterators::merge_iterator::MergeIterator; +use crate::iterators::two_merge_iterator::TwoMergeIterator; +use crate::iterators::StorageIterator; +use crate::mem_table::MemTableIterator; +use crate::table::SsTableIterator; + +/// Represents the internal type for an LSM iterator. This type will be changed across the tutorial for multiple times. 
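For illustration only: `Key` orders by user key ascending and, through `Reverse(ts)`, by timestamp descending, so the newest version of a user key sorts first. A standalone sketch of that ordering using std tuples rather than the crate's `Key` type:

use std::cmp::Reverse;

fn main() {
    // Each entry is (user key, Reverse(timestamp)), the shape Key's Ord impl compares.
    let mut versions = vec![
        ("key1", Reverse(1u64)),
        ("key1", Reverse(3u64)),
        ("key2", Reverse(2u64)),
        ("key1", Reverse(2u64)),
    ];
    versions.sort();
    // User keys ascend; within one user key, the newest timestamp comes first,
    // which is why TS_RANGE_BEGIN is u64::MAX and TS_RANGE_END is u64::MIN.
    assert_eq!(
        versions,
        vec![
            ("key1", Reverse(3u64)),
            ("key1", Reverse(2u64)),
            ("key1", Reverse(1u64)),
            ("key2", Reverse(2u64)),
        ]
    );
}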
+type LsmIteratorInner = TwoMergeIterator< + TwoMergeIterator, MergeIterator>, + MergeIterator, +>; + +pub struct LsmIterator { + inner: LsmIteratorInner, + end_bound: Bound, + is_valid: bool, +} + +impl LsmIterator { + pub(crate) fn new(iter: LsmIteratorInner, end_bound: Bound) -> Result { + let mut iter = Self { + is_valid: iter.is_valid(), + inner: iter, + end_bound, + }; + iter.move_to_non_delete()?; + Ok(iter) + } + + fn next_inner(&mut self) -> Result<()> { + self.inner.next()?; + if !self.inner.is_valid() { + self.is_valid = false; + return Ok(()); + } + match self.end_bound.as_ref() { + Bound::Unbounded => {} + Bound::Included(key) => self.is_valid = self.inner.key().raw_ref() <= key.as_ref(), + Bound::Excluded(key) => self.is_valid = self.inner.key().raw_ref() < key.as_ref(), + } + Ok(()) + } + + fn move_to_non_delete(&mut self) -> Result<()> { + while self.is_valid() && self.inner.value().is_empty() { + self.next_inner()?; + } + Ok(()) + } +} + +impl StorageIterator for LsmIterator { + type KeyType<'a> = &'a [u8]; + + fn is_valid(&self) -> bool { + self.is_valid + } + + fn key(&self) -> &[u8] { + self.inner.key().raw_ref() + } + + fn value(&self) -> &[u8] { + self.inner.value() + } + + fn next(&mut self) -> Result<()> { + self.next_inner()?; + self.move_to_non_delete()?; + Ok(()) + } + + fn num_active_iterators(&self) -> usize { + self.inner.num_active_iterators() + } +} + +/// A wrapper around existing iterator, will prevent users from calling `next` when the iterator is +/// invalid. If an iterator is already invalid, `next` does not do anything. If `next` returns an error, +/// `is_valid` should return false, and `next` should always return an error. +pub struct FusedIterator { + iter: I, + has_errored: bool, +} + +impl FusedIterator { + pub fn new(iter: I) -> Self { + Self { + iter, + has_errored: false, + } + } +} + +impl StorageIterator for FusedIterator { + type KeyType<'a> = I::KeyType<'a> where Self: 'a; + + fn is_valid(&self) -> bool { + !self.has_errored && self.iter.is_valid() + } + + fn key(&self) -> Self::KeyType<'_> { + if self.has_errored || !self.iter.is_valid() { + panic!("invalid access to the underlying iterator"); + } + self.iter.key() + } + + fn value(&self) -> &[u8] { + if self.has_errored || !self.iter.is_valid() { + panic!("invalid access to the underlying iterator"); + } + self.iter.value() + } + + fn next(&mut self) -> Result<()> { + // only move when the iterator is valid and not errored + if self.has_errored { + bail!("the iterator is tainted"); + } + if self.iter.is_valid() { + if let Err(e) = self.iter.next() { + self.has_errored = true; + return Err(e); + } + } + Ok(()) + } + + fn num_active_iterators(&self) -> usize { + self.iter.num_active_iterators() + } +} diff --git a/mini-lsm-mvcc/src/lsm_storage.rs b/mini-lsm-mvcc/src/lsm_storage.rs new file mode 100644 index 0000000..10218f2 --- /dev/null +++ b/mini-lsm-mvcc/src/lsm_storage.rs @@ -0,0 +1,737 @@ +use std::collections::{BTreeSet, HashMap}; +use std::fs::File; +use std::ops::Bound; +use std::path::{Path, PathBuf}; +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; + +use anyhow::{Context, Result}; +use bytes::Bytes; +use parking_lot::{Mutex, MutexGuard, RwLock}; + +use crate::block::Block; +use crate::compact::{ + CompactionController, CompactionOptions, LeveledCompactionController, LeveledCompactionOptions, + SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, TieredCompactionController, +}; +use crate::iterators::concat_iterator::SstConcatIterator; +use 
crate::iterators::merge_iterator::MergeIterator; +use crate::iterators::two_merge_iterator::TwoMergeIterator; +use crate::iterators::StorageIterator; +use crate::key::{self, KeySlice}; +use crate::lsm_iterator::{FusedIterator, LsmIterator}; +use crate::manifest::{Manifest, ManifestRecord}; +use crate::mem_table::{map_bound, MemTable}; +use crate::table::{FileObject, SsTable, SsTableBuilder, SsTableIterator}; + +pub type BlockCache = moka::sync::Cache<(usize, usize), Arc>; + +/// Represents the state of the storage engine. +#[derive(Clone)] +pub struct LsmStorageState { + /// The current memtable. + pub memtable: Arc, + /// Immutable memtables, from latest to earliest. + pub imm_memtables: Vec>, + /// L0 SSTs, from latest to earliest. + pub l0_sstables: Vec, + /// SsTables sorted by key range; L1 - L_max for leveled compaction, or tiers for tiered + /// compaction. + pub levels: Vec<(usize, Vec)>, + /// SST objects. + pub sstables: HashMap>, +} + +impl LsmStorageState { + fn create(options: &LsmStorageOptions) -> Self { + let levels = match &options.compaction_options { + CompactionOptions::Leveled(LeveledCompactionOptions { max_levels, .. }) + | CompactionOptions::Simple(SimpleLeveledCompactionOptions { max_levels, .. }) => (1 + ..=*max_levels) + .map(|level| (level, Vec::new())) + .collect::>(), + CompactionOptions::Tiered(_) => Vec::new(), + CompactionOptions::NoCompaction => vec![(1, Vec::new())], + }; + Self { + memtable: Arc::new(MemTable::create(0)), + imm_memtables: Vec::new(), + l0_sstables: Vec::new(), + levels, + sstables: Default::default(), + } + } +} + +pub struct LsmStorageOptions { + // Block size in bytes + pub block_size: usize, + // SST size in bytes, also the approximate memtable capacity limit + pub target_sst_size: usize, + // Maximum number of memtables in memory, flush to L0 when exceeding this limit + pub num_memtable_limit: usize, + pub compaction_options: CompactionOptions, + pub enable_wal: bool, +} + +impl LsmStorageOptions { + pub fn default_for_week1_test() -> Self { + Self { + block_size: 4096, + target_sst_size: 2 << 20, + compaction_options: CompactionOptions::NoCompaction, + enable_wal: false, + num_memtable_limit: 50, + } + } + + pub fn default_for_week1_day6_test() -> Self { + Self { + block_size: 4096, + target_sst_size: 2 << 20, + compaction_options: CompactionOptions::NoCompaction, + enable_wal: false, + num_memtable_limit: 2, + } + } +} + +fn range_overlap( + user_begin: Bound<&[u8]>, + user_end: Bound<&[u8]>, + table_begin: KeySlice, + table_end: KeySlice, +) -> bool { + match user_end { + Bound::Excluded(key) if key <= table_begin.raw_ref() => { + return false; + } + Bound::Included(key) if key < table_begin.raw_ref() => { + return false; + } + _ => {} + } + match user_begin { + Bound::Excluded(key) if key >= table_end.raw_ref() => { + return false; + } + Bound::Included(key) if key > table_end.raw_ref() => { + return false; + } + _ => {} + } + true +} + +fn key_within(user_key: &[u8], table_begin: KeySlice, table_end: KeySlice) -> bool { + table_begin.raw_ref() <= user_key && user_key <= table_end.raw_ref() +} + +/// The storage interface of the LSM tree. +pub(crate) struct LsmStorageInner { + pub(crate) state: Arc>>, + pub(crate) state_lock: Mutex<()>, + path: PathBuf, + pub(crate) block_cache: Arc, + next_sst_id: AtomicUsize, + pub(crate) options: Arc, + pub(crate) compaction_controller: CompactionController, + pub(crate) manifest: Option, +} + +/// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM. 
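For illustration only: `range_overlap` keeps an SST unless the user range ends before the table's first key or starts after its last key. A standalone sketch of the same check over plain byte slices; `overlaps` is a hypothetical helper mirroring the logic above, without the `Key` wrapper:

use std::ops::Bound;

fn overlaps(
    user_begin: Bound<&[u8]>,
    user_end: Bound<&[u8]>,
    table_begin: &[u8],
    table_end: &[u8],
) -> bool {
    // The range ends before the table starts...
    match user_end {
        Bound::Excluded(key) if key <= table_begin => return false,
        Bound::Included(key) if key < table_begin => return false,
        _ => {}
    }
    // ...or starts after the table ends; otherwise they overlap.
    match user_begin {
        Bound::Excluded(key) if key >= table_end => return false,
        Bound::Included(key) if key > table_end => return false,
        _ => {}
    }
    true
}

fn main() {
    // An SST covering keys "b" ..= "d".
    assert!(overlaps(Bound::Included(b"a"), Bound::Included(b"c"), b"b", b"d"));
    assert!(!overlaps(Bound::Included(b"e"), Bound::Unbounded, b"b", b"d"));
    assert!(!overlaps(Bound::Unbounded, Bound::Excluded(b"b"), b"b", b"d"));
}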
+pub struct MiniLsm { + pub(crate) inner: Arc, + /// Notifies the L0 flush thread to stop working. (In week 1 day 6) + flush_notifier: crossbeam_channel::Sender<()>, + /// The handle for the compaction thread. (In week 1 day 6) + flush_thread: Mutex>>, + /// Notifies the compaction thread to stop working. (In week 2) + compaction_notifier: crossbeam_channel::Sender<()>, + /// The handle for the compaction thread. (In week 2) + compaction_thread: Mutex>>, +} + +impl Drop for MiniLsm { + fn drop(&mut self) { + self.compaction_notifier.send(()).ok(); + self.flush_notifier.send(()).ok(); + } +} + +impl MiniLsm { + pub fn close(&self) -> Result<()> { + self.inner.sync_dir()?; + self.compaction_notifier.send(()).ok(); + self.flush_notifier.send(()).ok(); + + if self.inner.options.enable_wal { + self.inner.sync()?; + self.inner.sync_dir()?; + return Ok(()); + } + + let mut compaction_thread = self.compaction_thread.lock(); + if let Some(compaction_thread) = compaction_thread.take() { + compaction_thread + .join() + .map_err(|e| anyhow::anyhow!("{:?}", e))?; + } + let mut flush_thread = self.flush_thread.lock(); + if let Some(flush_thread) = flush_thread.take() { + flush_thread + .join() + .map_err(|e| anyhow::anyhow!("{:?}", e))?; + } + + // create memtable and skip updating manifest + if !self.inner.state.read().memtable.is_empty() { + self.inner + .freeze_memtable_with_memtable(Arc::new(MemTable::create( + self.inner.next_sst_id(), + )))?; + } + + while { + let snapshot = self.inner.state.read(); + !snapshot.imm_memtables.is_empty() + } { + self.inner.force_flush_next_imm_memtable()?; + } + self.inner.sync_dir()?; + + Ok(()) + } + + /// Start the storage engine by either loading an existing directory or creating a new one if the directory does + /// not exist. + pub fn open(path: impl AsRef, options: LsmStorageOptions) -> Result> { + let inner = Arc::new(LsmStorageInner::open(path, options)?); + let (tx1, rx) = crossbeam_channel::unbounded(); + let compaction_thread = inner.spawn_compaction_thread(rx)?; + let (tx2, rx) = crossbeam_channel::unbounded(); + let flush_thread = inner.spawn_flush_thread(rx)?; + Ok(Arc::new(Self { + inner, + flush_notifier: tx2, + flush_thread: Mutex::new(flush_thread), + compaction_notifier: tx1, + compaction_thread: Mutex::new(compaction_thread), + })) + } + + pub fn get(&self, key: &[u8]) -> Result> { + self.inner.get(key) + } + + pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { + self.inner.put(key, value) + } + + pub fn delete(&self, key: &[u8]) -> Result<()> { + self.inner.delete(key) + } + + pub fn sync(&self) -> Result<()> { + self.inner.sync() + } + + pub fn scan( + &self, + lower: Bound<&[u8]>, + upper: Bound<&[u8]>, + ) -> Result> { + self.inner.scan(lower, upper) + } + + /// Only call this in test cases due to race conditions + pub fn force_flush(&self) -> Result<()> { + if !self.inner.state.read().memtable.is_empty() { + self.inner + .force_freeze_memtable(&self.inner.state_lock.lock())?; + } + if !self.inner.state.read().imm_memtables.is_empty() { + self.inner.force_flush_next_imm_memtable()?; + } + Ok(()) + } + + pub fn force_full_compaction(&self) -> Result<()> { + self.inner.force_full_compaction() + } +} + +impl LsmStorageInner { + pub(crate) fn next_sst_id(&self) -> usize { + self.next_sst_id + .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + } + + /// Start the storage engine by either loading an existing directory or creating a new one if the directory does + /// not exist. 
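For illustration only: a minimal sketch of driving the `MiniLsm` facade defined above, assuming this crate builds as `mini_lsm_mvcc` and that `tempfile` is available; the directory and keys are made up.

use std::ops::Bound;

use anyhow::Result;
use mini_lsm_mvcc::iterators::StorageIterator;
use mini_lsm_mvcc::lsm_storage::{LsmStorageOptions, MiniLsm};

fn main() -> Result<()> {
    let dir = tempfile::tempdir()?;
    let storage = MiniLsm::open(dir.path(), LsmStorageOptions::default_for_week1_test())?;

    storage.put(b"key1", b"value1")?;
    storage.put(b"key2", b"value2")?;
    assert_eq!(storage.get(b"key1")?.as_deref(), Some(b"value1".as_slice()));

    // Deletes write tombstones (empty values), which reads filter out.
    storage.delete(b"key1")?;
    assert_eq!(storage.get(b"key1")?, None);

    // Full range scan over what is left.
    let mut iter = storage.scan(Bound::Unbounded, Bound::Unbounded)?;
    while iter.is_valid() {
        println!("{:?} => {:?}", iter.key(), iter.value());
        iter.next()?;
    }

    storage.close()
}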
+ pub(crate) fn open(path: impl AsRef, options: LsmStorageOptions) -> Result { + let mut state = LsmStorageState::create(&options); + let path = path.as_ref(); + let mut next_sst_id = 1; + let block_cache = Arc::new(BlockCache::new(1 << 20)); // 4GB block cache, + let manifest; + + let compaction_controller = match &options.compaction_options { + CompactionOptions::Leveled(options) => { + CompactionController::Leveled(LeveledCompactionController::new(options.clone())) + } + CompactionOptions::Tiered(options) => { + CompactionController::Tiered(TieredCompactionController::new(options.clone())) + } + CompactionOptions::Simple(options) => CompactionController::Simple( + SimpleLeveledCompactionController::new(options.clone()), + ), + CompactionOptions::NoCompaction => CompactionController::NoCompaction, + }; + + if !path.exists() { + std::fs::create_dir_all(path).context("failed to create DB dir")?; + } + let manifest_path = path.join("MANIFEST"); + if !manifest_path.exists() { + if options.enable_wal { + state.memtable = Arc::new(MemTable::create_with_wal( + state.memtable.id(), + Self::path_of_wal_static(path, state.memtable.id()), + )?); + } + manifest = Manifest::create(&manifest_path).context("failed to create manifest")?; + manifest.add_record_when_init(ManifestRecord::NewMemtable(state.memtable.id()))?; + } else { + let (m, records) = Manifest::recover(&manifest_path)?; + let mut memtables = BTreeSet::new(); + for record in records { + match record { + ManifestRecord::Flush(sst_id) => { + let res = memtables.remove(&sst_id); + assert!(res, "memtable not exist?"); + if compaction_controller.flush_to_l0() { + state.l0_sstables.insert(0, sst_id); + } else { + state.levels.insert(0, (sst_id, vec![sst_id])); + } + next_sst_id = next_sst_id.max(sst_id); + } + ManifestRecord::NewMemtable(x) => { + next_sst_id = next_sst_id.max(x); + memtables.insert(x); + } + ManifestRecord::Compaction(task, output) => { + let (new_state, _) = + compaction_controller.apply_compaction_result(&state, &task, &output); + // TODO: apply remove again + state = new_state; + next_sst_id = + next_sst_id.max(output.iter().max().copied().unwrap_or_default()); + } + } + } + + let mut sst_cnt = 0; + // recover SSTs + for table_id in state + .l0_sstables + .iter() + .chain(state.levels.iter().flat_map(|(_, files)| files)) + { + let table_id = *table_id; + let sst = SsTable::open( + table_id, + Some(block_cache.clone()), + FileObject::open(&Self::path_of_sst_static(path, table_id)) + .context("failed to open SST")?, + )?; + state.sstables.insert(table_id, Arc::new(sst)); + sst_cnt += 1; + } + println!("{} SSTs opened", sst_cnt); + + next_sst_id += 1; + + // recover memtables + if options.enable_wal { + let mut wal_cnt = 0; + for id in memtables.iter() { + let memtable = + MemTable::recover_from_wal(*id, Self::path_of_wal_static(path, *id))?; + if !memtable.is_empty() { + state.imm_memtables.insert(0, Arc::new(memtable)); + wal_cnt += 1; + } + } + println!("{} WALs recovered", wal_cnt); + state.memtable = Arc::new(MemTable::create_with_wal( + next_sst_id, + Self::path_of_wal_static(path, next_sst_id), + )?); + } else { + state.memtable = Arc::new(MemTable::create(next_sst_id)); + } + m.add_record_when_init(ManifestRecord::NewMemtable(state.memtable.id()))?; + next_sst_id += 1; + manifest = m; + }; + + let storage = Self { + state: Arc::new(RwLock::new(Arc::new(state))), + state_lock: Mutex::new(()), + path: path.to_path_buf(), + block_cache, + next_sst_id: AtomicUsize::new(next_sst_id), + compaction_controller, + manifest: 
Some(manifest), + options: options.into(), + }; + storage.sync_dir()?; + + Ok(storage) + } + + pub fn sync(&self) -> Result<()> { + self.state.read().memtable.sync_wal() + } + + /// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter. + pub fn get(&self, key: &[u8]) -> Result> { + let snapshot = { + let guard = self.state.read(); + Arc::clone(&guard) + }; // drop global lock here + + // Search on the current memtable. + if let Some(value) = snapshot.memtable.get(key) { + if value.is_empty() { + // found tomestone, return key not exists + return Ok(None); + } + return Ok(Some(value)); + } + + // Search on immutable memtables. + for memtable in snapshot.imm_memtables.iter() { + if let Some(value) = memtable.get(key) { + if value.is_empty() { + // found tomestone, return key not exists + return Ok(None); + } + return Ok(Some(value)); + } + } + + let mut l0_iters = Vec::with_capacity(snapshot.l0_sstables.len()); + + let keep_table = |key: &[u8], table: &SsTable| { + if key_within( + key, + table.first_key().as_key_slice(), + table.last_key().as_key_slice(), + ) { + if let Some(bloom) = &table.bloom { + if bloom.may_contain(farmhash::fingerprint32(key)) { + return true; + } + } else { + return true; + } + } + false + }; + + for table in snapshot.l0_sstables.iter() { + let table = snapshot.sstables[table].clone(); + if keep_table(key, &table) { + l0_iters.push(Box::new(SsTableIterator::create_and_seek_to_key( + table, + KeySlice::from_slice(key, key::TS_DEFAULT), + )?)); + } + } + let l0_iter = MergeIterator::create(l0_iters); + let mut level_iters = Vec::with_capacity(snapshot.levels.len()); + for (_, level_sst_ids) in &snapshot.levels { + let mut level_ssts = Vec::with_capacity(snapshot.levels[0].1.len()); + for table in level_sst_ids { + let table = snapshot.sstables[table].clone(); + if keep_table(key, &table) { + level_ssts.push(table); + } + } + let level_iter = SstConcatIterator::create_and_seek_to_key( + level_ssts, + KeySlice::from_slice(key, key::TS_DEFAULT), + )?; + level_iters.push(Box::new(level_iter)); + } + + let iter = TwoMergeIterator::create(l0_iter, MergeIterator::create(level_iters))?; + + if iter.is_valid() && iter.key().raw_ref() == key && !iter.value().is_empty() { + return Ok(Some(Bytes::copy_from_slice(iter.value()))); + } + Ok(None) + } + + /// Put a key-value pair into the storage by writing into the current memtable. + pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { + assert!(!value.is_empty(), "value cannot be empty"); + assert!(!key.is_empty(), "key cannot be empty"); + + let size; + { + let guard = self.state.read(); + guard.memtable.put(key, value)?; + size = guard.memtable.approximate_size(); + } + + self.try_freeze(size)?; + + Ok(()) + } + + /// Remove a key from the storage by writing an empty value. 
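For illustration only: `get` seeks with `TS_DEFAULT` (`u64::MAX`) because, under the `(user key ascending, timestamp descending)` ordering, the entry at the maximum timestamp is the newest version of that key. A standalone sketch with a `BTreeMap` standing in for the sorted SST/memtable data; it is not the engine's actual storage:

use std::cmp::Reverse;
use std::collections::BTreeMap;
use std::ops::Bound;

fn main() {
    // (user key, Reverse(timestamp)) -> value, mimicking the engine's key order.
    let mut data: BTreeMap<(&str, Reverse<u64>), &str> = BTreeMap::new();
    data.insert(("key1", Reverse(1)), "v1");
    data.insert(("key1", Reverse(5)), "v5");
    data.insert(("key2", Reverse(3)), "w3");

    // Seek to ("key1", ts = u64::MAX): the first entry at or after it is the
    // newest version of "key1".
    let (first, value) = data
        .range((Bound::Included(("key1", Reverse(u64::MAX))), Bound::Unbounded))
        .next()
        .unwrap();
    assert_eq!(*first, ("key1", Reverse(5)));
    assert_eq!(*value, "v5");
}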
+ pub fn delete(&self, key: &[u8]) -> Result<()> { + assert!(!key.is_empty(), "key cannot be empty"); + + let size; + { + let guard = self.state.read(); + guard.memtable.put(key, b"")?; + size = guard.memtable.approximate_size(); + } + + self.try_freeze(size)?; + + Ok(()) + } + + fn try_freeze(&self, estimated_size: usize) -> Result<()> { + if estimated_size >= self.options.target_sst_size { + let state_lock = self.state_lock.lock(); + let guard = self.state.read(); + // the memtable could have already been frozen, check again to ensure we really need to freeze + if guard.memtable.approximate_size() >= self.options.target_sst_size { + drop(guard); + self.force_freeze_memtable(&state_lock)?; + } + } + Ok(()) + } + + pub(crate) fn path_of_sst_static(path: impl AsRef, id: usize) -> PathBuf { + path.as_ref().join(format!("{:05}.sst", id)) + } + + pub(crate) fn path_of_sst(&self, id: usize) -> PathBuf { + Self::path_of_sst_static(&self.path, id) + } + + pub(crate) fn path_of_wal_static(path: impl AsRef, id: usize) -> PathBuf { + path.as_ref().join(format!("{:05}.wal", id)) + } + + pub(crate) fn path_of_wal(&self, id: usize) -> PathBuf { + Self::path_of_wal_static(&self.path, id) + } + + pub(super) fn sync_dir(&self) -> Result<()> { + File::open(&self.path)?.sync_all()?; + Ok(()) + } + + fn freeze_memtable_with_memtable(&self, memtable: Arc) -> Result<()> { + let mut guard = self.state.write(); + // Swap the current memtable with a new one. + let mut snapshot = guard.as_ref().clone(); + let old_memtable = std::mem::replace(&mut snapshot.memtable, memtable); + // Add the memtable to the immutable memtables. + snapshot.imm_memtables.insert(0, old_memtable.clone()); + // Update the snapshot. + *guard = Arc::new(snapshot); + + drop(guard); + old_memtable.sync_wal()?; + + Ok(()) + } + + /// Force freeze the current memtable to an immutable memtable + pub fn force_freeze_memtable(&self, state_lock_observer: &MutexGuard<'_, ()>) -> Result<()> { + let memtable_id = self.next_sst_id(); + let memtable = if self.options.enable_wal { + Arc::new(MemTable::create_with_wal( + memtable_id, + self.path_of_wal(memtable_id), + )?) + } else { + Arc::new(MemTable::create(memtable_id)) + }; + + self.freeze_memtable_with_memtable(memtable)?; + + self.manifest.as_ref().unwrap().add_record( + state_lock_observer, + ManifestRecord::NewMemtable(memtable_id), + )?; + self.sync_dir()?; + + Ok(()) + } + + /// Force flush the earliest-created immutable memtable to disk + pub fn force_flush_next_imm_memtable(&self) -> Result<()> { + let state_lock = self.state_lock.lock(); + + let flush_memtable; + + { + let guard = self.state.read(); + flush_memtable = guard + .imm_memtables + .last() + .expect("no imm memtables!") + .clone(); + } + + let mut builder = SsTableBuilder::new(self.options.block_size); + flush_memtable.flush(&mut builder)?; + let sst_id = flush_memtable.id(); + let sst = Arc::new(builder.build( + sst_id, + Some(self.block_cache.clone()), + self.path_of_sst(sst_id), + )?); + + // Add the flushed L0 table to the list. + { + let mut guard = self.state.write(); + let mut snapshot = guard.as_ref().clone(); + // Remove the memtable from the immutable memtables. 
+ let mem = snapshot.imm_memtables.pop().unwrap(); + assert_eq!(mem.id(), sst_id); + // Add L0 table + if self.compaction_controller.flush_to_l0() { + // In leveled compaction or no compaction, simply flush to L0 + snapshot.l0_sstables.insert(0, sst_id); + } else { + // In tiered compaction, create a new tier + snapshot.levels.insert(0, (sst_id, vec![sst_id])); + } + println!("flushed {}.sst with size={}", sst_id, sst.table_size()); + snapshot.sstables.insert(sst_id, sst); + // Update the snapshot. + *guard = Arc::new(snapshot); + } + + if self.options.enable_wal { + std::fs::remove_file(self.path_of_wal(sst_id))?; + } + + self.manifest + .as_ref() + .unwrap() + .add_record(&state_lock, ManifestRecord::Flush(sst_id))?; + + self.sync_dir()?; + + Ok(()) + } + + /// Create an iterator over a range of keys. + pub fn scan( + &self, + lower: Bound<&[u8]>, + upper: Bound<&[u8]>, + ) -> Result> { + let snapshot = { + let guard = self.state.read(); + Arc::clone(&guard) + }; // drop global lock here + + let mut memtable_iters = Vec::with_capacity(snapshot.imm_memtables.len() + 1); + memtable_iters.push(Box::new(snapshot.memtable.scan(lower, upper))); + for memtable in snapshot.imm_memtables.iter() { + memtable_iters.push(Box::new(memtable.scan(lower, upper))); + } + let memtable_iter = MergeIterator::create(memtable_iters); + + let mut table_iters = Vec::with_capacity(snapshot.l0_sstables.len()); + for table_id in snapshot.l0_sstables.iter() { + let table = snapshot.sstables[table_id].clone(); + if range_overlap( + lower, + upper, + table.first_key().as_key_slice(), + table.last_key().as_key_slice(), + ) { + let iter = match lower { + Bound::Included(key) => SsTableIterator::create_and_seek_to_key( + table, + KeySlice::from_slice(key, key::TS_DEFAULT), + )?, + Bound::Excluded(key) => { + let mut iter = SsTableIterator::create_and_seek_to_key( + table, + KeySlice::from_slice(key, key::TS_DEFAULT), + )?; + if iter.is_valid() && iter.key().raw_ref() == key { + iter.next()?; + } + iter + } + Bound::Unbounded => SsTableIterator::create_and_seek_to_first(table)?, + }; + + table_iters.push(Box::new(iter)); + } + } + + let l0_iter = MergeIterator::create(table_iters); + let mut level_iters = Vec::with_capacity(snapshot.levels.len()); + for (_, level_sst_ids) in &snapshot.levels { + let mut level_ssts = Vec::with_capacity(level_sst_ids.len()); + for table in level_sst_ids { + let table = snapshot.sstables[table].clone(); + if range_overlap( + lower, + upper, + table.first_key().as_key_slice(), + table.last_key().as_key_slice(), + ) { + level_ssts.push(table); + } + } + + let level_iter = match lower { + Bound::Included(key) => SstConcatIterator::create_and_seek_to_key( + level_ssts, + KeySlice::from_slice(key, key::TS_DEFAULT), + )?, + Bound::Excluded(key) => { + let mut iter = SstConcatIterator::create_and_seek_to_key( + level_ssts, + KeySlice::from_slice(key, key::TS_DEFAULT), + )?; + if iter.is_valid() && iter.key().raw_ref() == key { + iter.next()?; + } + iter + } + Bound::Unbounded => SstConcatIterator::create_and_seek_to_first(level_ssts)?, + }; + level_iters.push(Box::new(level_iter)); + } + + let iter = TwoMergeIterator::create(memtable_iter, l0_iter)?; + let iter = TwoMergeIterator::create(iter, MergeIterator::create(level_iters))?; + + Ok(FusedIterator::new(LsmIterator::new( + iter, + map_bound(upper), + )?)) + } +} diff --git a/mini-lsm-mvcc/src/manifest.rs b/mini-lsm-mvcc/src/manifest.rs new file mode 100644 index 0000000..62d968c --- /dev/null +++ b/mini-lsm-mvcc/src/manifest.rs @@ -0,0 +1,74 @@ 
+use std::fs::{File, OpenOptions}; +use std::io::{Read, Write}; +use std::path::Path; +use std::sync::Arc; + +use anyhow::{Context, Result}; +use parking_lot::{Mutex, MutexGuard}; +use serde::{Deserialize, Serialize}; +use serde_json::Deserializer; + +use crate::compact::CompactionTask; + +pub struct Manifest { + file: Arc>, +} + +#[derive(Serialize, Deserialize)] +pub enum ManifestRecord { + Flush(usize), + NewMemtable(usize), + Compaction(CompactionTask, Vec), +} + +impl Manifest { + pub fn create(path: impl AsRef) -> Result { + Ok(Self { + file: Arc::new(Mutex::new( + OpenOptions::new() + .read(true) + .create_new(true) + .write(true) + .open(path) + .context("failed to create manifest")?, + )), + }) + } + + pub fn recover(path: impl AsRef) -> Result<(Self, Vec)> { + let mut file = OpenOptions::new() + .read(true) + .append(true) + .open(path) + .context("failed to recover manifest")?; + let mut buf = Vec::new(); + file.read_to_end(&mut buf)?; + let stream = Deserializer::from_slice(&buf).into_iter::(); + let mut records = Vec::new(); + for x in stream { + records.push(x?); + } + Ok(( + Self { + file: Arc::new(Mutex::new(file)), + }, + records, + )) + } + + pub fn add_record( + &self, + _state_lock_observer: &MutexGuard<()>, + record: ManifestRecord, + ) -> Result<()> { + self.add_record_when_init(record) + } + + pub fn add_record_when_init(&self, record: ManifestRecord) -> Result<()> { + let mut file = self.file.lock(); + let buf = serde_json::to_vec(&record)?; + file.write_all(&buf)?; + file.sync_all()?; + Ok(()) + } +} diff --git a/mini-lsm-mvcc/src/mem_table.rs b/mini-lsm-mvcc/src/mem_table.rs new file mode 100644 index 0000000..73a74e1 --- /dev/null +++ b/mini-lsm-mvcc/src/mem_table.rs @@ -0,0 +1,184 @@ +use std::ops::Bound; +use std::path::Path; +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; + +use anyhow::Result; +use bytes::Bytes; +use crossbeam_skiplist::map::Entry; +use crossbeam_skiplist::SkipMap; +use ouroboros::self_referencing; + +use crate::iterators::StorageIterator; +use crate::key::{self, KeySlice}; +use crate::table::SsTableBuilder; +use crate::wal::Wal; + +/// A basic mem-table based on crossbeam-skiplist. +/// +/// An initial implementation of memtable is part of week 1, day 1. It will be incrementally implemented in other +/// chapters of week 1 and week 2. +pub struct MemTable { + map: Arc>, + wal: Option, + id: usize, + approximate_size: Arc, +} + +/// Create a bound of `Bytes` from a bound of `&[u8]`. +pub(crate) fn map_bound(bound: Bound<&[u8]>) -> Bound { + match bound { + Bound::Included(x) => Bound::Included(Bytes::copy_from_slice(x)), + Bound::Excluded(x) => Bound::Excluded(Bytes::copy_from_slice(x)), + Bound::Unbounded => Bound::Unbounded, + } +} + +impl MemTable { + /// Create a new mem-table. + pub fn create(id: usize) -> Self { + Self { + id, + map: Arc::new(SkipMap::new()), + wal: None, + approximate_size: Arc::new(AtomicUsize::new(0)), + } + } + + /// Create a new mem-table with WAL + pub fn create_with_wal(id: usize, path: impl AsRef) -> Result { + Ok(Self { + id, + map: Arc::new(SkipMap::new()), + wal: Some(Wal::create(path.as_ref())?), + approximate_size: Arc::new(AtomicUsize::new(0)), + }) + } + + /// Create a memtable from WAL + pub fn recover_from_wal(id: usize, path: impl AsRef) -> Result { + let map = Arc::new(SkipMap::new()); + Ok(Self { + id, + wal: Some(Wal::recover(path.as_ref(), &map)?), + map, + approximate_size: Arc::new(AtomicUsize::new(0)), + }) + } + + /// Get a value by key. 
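For illustration only: the manifest above appends each record as a bare JSON value and replays the concatenated stream on recovery via `serde_json::Deserializer::into_iter`. A standalone round-trip sketch with a local `Record` enum standing in for `ManifestRecord`:

use serde::{Deserialize, Serialize};
use serde_json::Deserializer;

#[derive(Serialize, Deserialize, Debug, PartialEq)]
enum Record {
    Flush(usize),
    NewMemtable(usize),
}

fn main() -> anyhow::Result<()> {
    // "Append" two records to an in-memory buffer, the way add_record_when_init
    // writes them to the MANIFEST file.
    let mut buf = Vec::new();
    buf.extend(serde_json::to_vec(&Record::NewMemtable(1))?);
    buf.extend(serde_json::to_vec(&Record::Flush(1))?);

    // "Recover" by replaying the concatenated JSON values in order.
    let records: Vec<Record> = Deserializer::from_slice(&buf)
        .into_iter::<Record>()
        .collect::<Result<_, _>>()?;
    assert_eq!(records, vec![Record::NewMemtable(1), Record::Flush(1)]);
    Ok(())
}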
+ pub fn get(&self, key: &[u8]) -> Option { + self.map.get(key).map(|e| e.value().clone()) + } + + /// Put a key-value pair into the mem-table. + /// + /// In week 1, day 1, simply put the key-value pair into the skipmap. + /// In week 2, day 6, also flush the data to WAL. + pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { + let estimated_size = key.len() + value.len(); + self.map + .insert(Bytes::copy_from_slice(key), Bytes::copy_from_slice(value)); + self.approximate_size + .fetch_add(estimated_size, std::sync::atomic::Ordering::Relaxed); + if let Some(ref wal) = self.wal { + wal.put(key, value)?; + } + Ok(()) + } + + pub fn sync_wal(&self) -> Result<()> { + if let Some(ref wal) = self.wal { + wal.sync()?; + } + Ok(()) + } + + /// Get an iterator over a range of keys. + pub fn scan(&self, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> MemTableIterator { + let (lower, upper) = (map_bound(lower), map_bound(upper)); + let mut iter = MemTableIteratorBuilder { + map: self.map.clone(), + iter_builder: |map| map.range((lower, upper)), + item: (Bytes::new(), Bytes::new()), + } + .build(); + let entry = iter.with_iter_mut(|iter| MemTableIterator::entry_to_item(iter.next())); + iter.with_mut(|x| *x.item = entry); + iter + } + + /// Flush the mem-table to SSTable. Implement in week 1 day 6. + pub fn flush(&self, builder: &mut SsTableBuilder) -> Result<()> { + for entry in self.map.iter() { + builder.add( + KeySlice::from_slice(&entry.key()[..], key::TS_DEFAULT), + &entry.value()[..], + ); + } + Ok(()) + } + + pub fn id(&self) -> usize { + self.id + } + + pub fn approximate_size(&self) -> usize { + self.approximate_size + .load(std::sync::atomic::Ordering::Relaxed) + } + + /// Only use this function when closing the database + pub fn is_empty(&self) -> bool { + self.map.is_empty() + } +} + +type SkipMapRangeIter<'a> = + crossbeam_skiplist::map::Range<'a, Bytes, (Bound, Bound), Bytes, Bytes>; + +/// An iterator over a range of `SkipMap`. This is a self-referential structure and please refer to week 1, day 2 +/// chapter for more information. +/// +/// This is part of week 1, day 2. +#[self_referencing] +pub struct MemTableIterator { + /// Stores a reference to the skipmap. + map: Arc>, + /// Stores a skipmap iterator that refers to the lifetime of `MemTableIterator` itself. + #[borrows(map)] + #[not_covariant] + iter: SkipMapRangeIter<'this>, + /// Stores the current key-value pair. + item: (Bytes, Bytes), +} + +impl MemTableIterator { + fn entry_to_item(entry: Option>) -> (Bytes, Bytes) { + entry + .map(|x| (x.key().clone(), x.value().clone())) + .unwrap_or_else(|| (Bytes::from_static(&[]), Bytes::from_static(&[]))) + } +} + +impl StorageIterator for MemTableIterator { + type KeyType<'a> = KeySlice<'a>; + + fn value(&self) -> &[u8] { + &self.borrow_item().1[..] 
+ } + + fn key(&self) -> KeySlice { + KeySlice::from_slice(&self.borrow_item().0[..], key::TS_DEFAULT) + } + + fn is_valid(&self) -> bool { + !self.borrow_item().0.is_empty() + } + + fn next(&mut self) -> Result<()> { + let entry = self.with_iter_mut(|iter| MemTableIterator::entry_to_item(iter.next())); + self.with_mut(|x| *x.item = entry); + Ok(()) + } +} diff --git a/mini-lsm-mvcc/src/table.rs b/mini-lsm-mvcc/src/table.rs new file mode 100644 index 0000000..99fa818 --- /dev/null +++ b/mini-lsm-mvcc/src/table.rs @@ -0,0 +1,228 @@ +pub(crate) mod bloom; +mod builder; +mod iterator; + +use std::fs::File; +use std::path::Path; +use std::sync::Arc; + +use anyhow::{anyhow, Result}; +pub use builder::SsTableBuilder; +use bytes::{Buf, BufMut}; +pub use iterator::SsTableIterator; + +use crate::block::Block; +use crate::key::{KeyBytes, KeySlice}; +use crate::lsm_storage::BlockCache; + +use self::bloom::Bloom; + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct BlockMeta { + /// Offset of this data block. + pub offset: usize, + /// The first key of the data block. + pub first_key: KeyBytes, + /// The last key of the data block. + pub last_key: KeyBytes, +} + +impl BlockMeta { + /// Encode block meta to a buffer. + pub fn encode_block_meta(block_meta: &[BlockMeta], buf: &mut Vec) { + let mut estimated_size = 0; + for meta in block_meta { + // The size of offset + estimated_size += std::mem::size_of::(); + // The size of key length + estimated_size += std::mem::size_of::(); + // The size of actual key + estimated_size += meta.first_key.len(); + // The size of key length + estimated_size += std::mem::size_of::(); + // The size of actual key + estimated_size += meta.last_key.len(); + } + // Reserve the space to improve performance, especially when the size of incoming data is + // large + buf.reserve(estimated_size); + let original_len = buf.len(); + for meta in block_meta { + buf.put_u32(meta.offset as u32); + buf.put_u16(meta.first_key.len() as u16); + buf.put_slice(meta.first_key.raw_ref()); + buf.put_u16(meta.last_key.len() as u16); + buf.put_slice(meta.last_key.raw_ref()); + } + assert_eq!(estimated_size, buf.len() - original_len); + } + + /// Decode block meta from a buffer. + pub fn decode_block_meta(mut buf: impl Buf) -> Vec { + let mut block_meta = Vec::new(); + while buf.has_remaining() { + let offset = buf.get_u32() as usize; + let first_key_len = buf.get_u16() as usize; + let first_key = KeyBytes::from_bytes(buf.copy_to_bytes(first_key_len)); + let last_key_len: usize = buf.get_u16() as usize; + let last_key = KeyBytes::from_bytes(buf.copy_to_bytes(last_key_len)); + block_meta.push(BlockMeta { + offset, + first_key, + last_key, + }); + } + block_meta + } +} + +/// A file object. +pub struct FileObject(Option, u64); + +impl FileObject { + pub fn read(&self, offset: u64, len: u64) -> Result> { + use std::os::unix::fs::FileExt; + let mut data = vec![0; len as usize]; + self.0 + .as_ref() + .unwrap() + .read_exact_at(&mut data[..], offset)?; + Ok(data) + } + + pub fn size(&self) -> u64 { + self.1 + } + + /// Create a new file object (day 2) and write the file to the disk (day 4). 
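For illustration only: each `BlockMeta` entry above is encoded as a `u32` block offset followed by the first and last key, each prefixed with a `u16` length. A standalone round-trip sketch using plain byte slices in place of `KeyBytes`:

use bytes::{Buf, BufMut};

fn main() {
    let (offset, first_key, last_key) = (0usize, b"apple".as_slice(), b"mango".as_slice());

    // Encode one BlockMeta entry, mirroring encode_block_meta.
    let mut buf: Vec<u8> = Vec::new();
    buf.put_u32(offset as u32);
    buf.put_u16(first_key.len() as u16);
    buf.put_slice(first_key);
    buf.put_u16(last_key.len() as u16);
    buf.put_slice(last_key);
    assert_eq!(buf.len(), 4 + 2 + first_key.len() + 2 + last_key.len());

    // Decode it back, mirroring decode_block_meta.
    let mut rbuf = &buf[..];
    assert_eq!(rbuf.get_u32() as usize, offset);
    let len = rbuf.get_u16() as usize;
    assert_eq!(&rbuf.copy_to_bytes(len)[..], first_key);
    let len = rbuf.get_u16() as usize;
    assert_eq!(&rbuf.copy_to_bytes(len)[..], last_key);
}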
+ pub fn create(path: &Path, data: Vec) -> Result { + std::fs::write(path, &data)?; + File::open(path)?.sync_all()?; + Ok(FileObject( + Some(File::options().read(true).write(false).open(path)?), + data.len() as u64, + )) + } + + pub fn open(path: &Path) -> Result { + let file = File::options().read(true).write(false).open(path)?; + let size = file.metadata()?.len(); + Ok(FileObject(Some(file), size)) + } +} + +/// An SSTable. +pub struct SsTable { + /// The actual storage unit of SsTable, the format is as above. + pub(crate) file: FileObject, + /// The meta blocks that hold info for data blocks. + pub(crate) block_meta: Vec, + /// The offset that indicates the start point of meta blocks in `file`. + pub(crate) block_meta_offset: usize, + id: usize, + block_cache: Option>, + first_key: KeyBytes, + last_key: KeyBytes, + pub(crate) bloom: Option, +} +impl SsTable { + #[cfg(test)] + pub(crate) fn open_for_test(file: FileObject) -> Result { + Self::open(0, None, file) + } + + /// Open SSTable from a file. + pub fn open(id: usize, block_cache: Option>, file: FileObject) -> Result { + let len = file.size(); + let raw_bloom_offset = file.read(len - 4, 4)?; + let bloom_offset = (&raw_bloom_offset[..]).get_u32() as u64; + let raw_bloom = file.read(bloom_offset, len - 4 - bloom_offset)?; + let bloom_filter = Bloom::decode(&raw_bloom); + let raw_meta_offset = file.read(bloom_offset - 4, 4)?; + let block_meta_offset = (&raw_meta_offset[..]).get_u32() as u64; + let raw_meta = file.read(block_meta_offset, bloom_offset - 4 - block_meta_offset)?; + let block_meta = BlockMeta::decode_block_meta(&raw_meta[..]); + Ok(Self { + file, + first_key: block_meta.first().unwrap().first_key.clone(), + last_key: block_meta.last().unwrap().last_key.clone(), + block_meta, + block_meta_offset: block_meta_offset as usize, + id, + block_cache, + bloom: Some(bloom_filter), + }) + } + + /// Create a mock SST with only first key + last key metadata + pub fn create_meta_only( + id: usize, + file_size: u64, + first_key: KeyBytes, + last_key: KeyBytes, + ) -> Self { + Self { + file: FileObject(None, file_size), + block_meta: vec![], + block_meta_offset: 0, + id, + block_cache: None, + first_key, + last_key, + bloom: None, + } + } + + /// Read a block from the disk. + pub fn read_block(&self, block_idx: usize) -> Result> { + let offset = self.block_meta[block_idx].offset; + let offset_end = self + .block_meta + .get(block_idx + 1) + .map_or(self.block_meta_offset, |x| x.offset); + let block_data = self + .file + .read(offset as u64, (offset_end - offset) as u64)?; + Ok(Arc::new(Block::decode(&block_data[..]))) + } + + /// Read a block from disk, with block cache. + pub fn read_block_cached(&self, block_idx: usize) -> Result> { + if let Some(ref block_cache) = self.block_cache { + let blk = block_cache + .try_get_with((self.id, block_idx), || self.read_block(block_idx)) + .map_err(|e| anyhow!("{}", e))?; + Ok(blk) + } else { + self.read_block(block_idx) + } + } + + /// Find the block that may contain `key`. + pub fn find_block_idx(&self, key: KeySlice) -> usize { + self.block_meta + .partition_point(|meta| meta.first_key.as_key_slice() <= key) + .saturating_sub(1) + } + + /// Get number of data blocks. 
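For illustration only: `SsTable::open` above assumes the layout written by `SsTableBuilder::build` later in this patch: | data blocks | block meta | meta offset (u32) | bloom filter | bloom offset (u32) |. A standalone sketch of recovering the two trailing offsets; `footer_offsets` is a hypothetical helper and the toy buffer is made up:

use bytes::Buf;

// Hypothetical helper: recover (meta_offset, bloom_offset) from the file tail.
fn footer_offsets(file: &[u8]) -> (usize, usize) {
    let bloom_offset = (&file[file.len() - 4..]).get_u32() as usize;
    let meta_offset = (&file[bloom_offset - 4..bloom_offset]).get_u32() as usize;
    (meta_offset, bloom_offset)
}

fn main() {
    // A toy "file": 8 bytes of data blocks, 4 bytes of (opaque) block meta,
    // an empty bloom section, and the two trailing offsets.
    let mut file = vec![0u8; 8]; // data blocks
    file.extend_from_slice(&[1, 2, 3, 4]); // block meta
    file.extend_from_slice(&8u32.to_be_bytes()); // meta offset
    file.extend_from_slice(&16u32.to_be_bytes()); // bloom offset (bloom is empty here)
    assert_eq!(footer_offsets(&file), (8, 16));
}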
+ pub fn num_of_blocks(&self) -> usize { + self.block_meta.len() + } + + pub fn first_key(&self) -> &KeyBytes { + &self.first_key + } + + pub fn last_key(&self) -> &KeyBytes { + &self.last_key + } + + pub fn table_size(&self) -> u64 { + self.file.1 + } + + pub fn sst_id(&self) -> usize { + self.id + } +} diff --git a/mini-lsm-mvcc/src/table/bloom.rs b/mini-lsm-mvcc/src/table/bloom.rs new file mode 100644 index 0000000..5b41d8e --- /dev/null +++ b/mini-lsm-mvcc/src/table/bloom.rs @@ -0,0 +1,113 @@ +// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. + +use bytes::{BufMut, Bytes, BytesMut}; + +/// Implements a bloom filter +pub struct Bloom { + /// data of filter in bits + pub(crate) filter: Bytes, + /// number of hash functions + pub(crate) k: u8, +} + +pub trait BitSlice { + fn get_bit(&self, idx: usize) -> bool; + fn bit_len(&self) -> usize; +} + +pub trait BitSliceMut { + fn set_bit(&mut self, idx: usize, val: bool); +} + +impl> BitSlice for T { + fn get_bit(&self, idx: usize) -> bool { + let pos = idx / 8; + let offset = idx % 8; + (self.as_ref()[pos] & (1 << offset)) != 0 + } + + fn bit_len(&self) -> usize { + self.as_ref().len() * 8 + } +} + +impl> BitSliceMut for T { + fn set_bit(&mut self, idx: usize, val: bool) { + let pos = idx / 8; + let offset = idx % 8; + if val { + self.as_mut()[pos] |= 1 << offset; + } else { + self.as_mut()[pos] &= !(1 << offset); + } + } +} + +impl Bloom { + /// Decode a bloom filter + pub fn decode(buf: &[u8]) -> Self { + let filter = &buf[..buf.len() - 1]; + let k = buf[buf.len() - 1]; + Self { + filter: filter.to_vec().into(), + k, + } + } + + /// Encode a bloom filter + pub fn encode(&self, buf: &mut Vec) { + buf.extend(&self.filter); + buf.put_u8(self.k); + } + + /// Get bloom filter bits per key from entries count and FPR + pub fn bloom_bits_per_key(entries: usize, false_positive_rate: f64) -> usize { + let size = + -1.0 * (entries as f64) * false_positive_rate.ln() / std::f64::consts::LN_2.powi(2); + let locs = (size / (entries as f64)).ceil(); + locs as usize + } + + /// Build bloom filter from key hashes + pub fn build_from_key_hashes(keys: &[u32], bits_per_key: usize) -> Self { + let k = (bits_per_key as f64 * 0.69) as u32; + let k = k.min(30).max(1); + let nbits = (keys.len() * bits_per_key).max(64); + let nbytes = (nbits + 7) / 8; + let nbits = nbytes * 8; + let mut filter = BytesMut::with_capacity(nbytes); + filter.resize(nbytes, 0); + for h in keys { + let mut h = *h; + let delta = (h >> 17) | (h << 15); + for _ in 0..k { + let bit_pos = (h as usize) % nbits; + filter.set_bit(bit_pos, true); + h = h.wrapping_add(delta); + } + } + Self { + filter: filter.freeze(), + k: k as u8, + } + } + + /// Check if a bloom filter may contain some data + pub fn may_contain(&self, mut h: u32) -> bool { + if self.k > 30 { + // potential new encoding for short bloom filters + true + } else { + let nbits = self.filter.bit_len(); + let delta = (h >> 17) | (h << 15); + for _ in 0..self.k { + let bit_pos = h % (nbits as u32); + if !self.filter.get_bit(bit_pos as usize) { + return false; + } + h = h.wrapping_add(delta); + } + true + } + } +} diff --git a/mini-lsm-mvcc/src/table/builder.rs b/mini-lsm-mvcc/src/table/builder.rs new file mode 100644 index 0000000..a8753d1 --- /dev/null +++ b/mini-lsm-mvcc/src/table/builder.rs @@ -0,0 +1,112 @@ +use std::path::Path; +use std::sync::Arc; + +use anyhow::Result; +use bytes::BufMut; + +use super::bloom::Bloom; +use super::{BlockMeta, FileObject, SsTable}; +use crate::block::BlockBuilder; +use 
crate::key::{KeySlice, KeyVec}; +use crate::lsm_storage::BlockCache; + +/// Builds an SSTable from key-value pairs. +pub struct SsTableBuilder { + builder: BlockBuilder, + first_key: KeyVec, + last_key: KeyVec, + data: Vec, + pub(crate) meta: Vec, + block_size: usize, + key_hashes: Vec, +} + +impl SsTableBuilder { + /// Create a builder based on target block size. + pub fn new(block_size: usize) -> Self { + Self { + data: Vec::new(), + meta: Vec::new(), + first_key: KeyVec::new(), + last_key: KeyVec::new(), + block_size, + builder: BlockBuilder::new(block_size), + key_hashes: Vec::new(), + } + } + + /// Adds a key-value pair to SSTable + pub fn add(&mut self, key: KeySlice, value: &[u8]) { + if self.first_key.is_empty() { + self.first_key.set_from_slice(key); + } + + self.key_hashes.push(farmhash::fingerprint32(key.raw_ref())); + + if self.builder.add(key, value) { + self.last_key.set_from_slice(key); + return; + } + + // create a new block builder and append block data + self.finish_block(); + + // add the key-value pair to the next block + assert!(self.builder.add(key, value)); + self.first_key.set_from_slice(key); + self.last_key.set_from_slice(key); + } + + /// Get the estimated size of the SSTable. + pub fn estimated_size(&self) -> usize { + self.data.len() + } + + fn finish_block(&mut self) { + let builder = std::mem::replace(&mut self.builder, BlockBuilder::new(self.block_size)); + let encoded_block = builder.build().encode(); + self.meta.push(BlockMeta { + offset: self.data.len(), + first_key: std::mem::take(&mut self.first_key).into_key_bytes(), + last_key: std::mem::take(&mut self.last_key).into_key_bytes(), + }); + self.data.extend(encoded_block); + } + + /// Builds the SSTable and writes it to the given path. Use the `FileObject` structure to manipulate the disk objects. + pub fn build( + mut self, + id: usize, + block_cache: Option>, + path: impl AsRef, + ) -> Result { + self.finish_block(); + let mut buf = self.data; + let meta_offset = buf.len(); + BlockMeta::encode_block_meta(&self.meta, &mut buf); + buf.put_u32(meta_offset as u32); + let bloom = Bloom::build_from_key_hashes( + &self.key_hashes, + Bloom::bloom_bits_per_key(self.key_hashes.len(), 0.01), + ); + let bloom_offset = buf.len(); + bloom.encode(&mut buf); + buf.put_u32(bloom_offset as u32); + let file = FileObject::create(path.as_ref(), buf)?; + Ok(SsTable { + id, + file, + first_key: self.meta.first().unwrap().first_key.clone(), + last_key: self.meta.last().unwrap().last_key.clone(), + block_meta: self.meta, + block_meta_offset: meta_offset, + block_cache, + bloom: Some(bloom), + }) + } + + #[cfg(test)] + pub(crate) fn build_for_test(self, path: impl AsRef) -> Result { + self.build(0, None, path) + } +} diff --git a/mini-lsm-mvcc/src/table/iterator.rs b/mini-lsm-mvcc/src/table/iterator.rs new file mode 100644 index 0000000..522f1a0 --- /dev/null +++ b/mini-lsm-mvcc/src/table/iterator.rs @@ -0,0 +1,105 @@ +use std::sync::Arc; + +use anyhow::Result; + +use super::SsTable; +use crate::block::BlockIterator; +use crate::iterators::StorageIterator; +use crate::key::KeySlice; + +/// An iterator over the contents of an SSTable. +pub struct SsTableIterator { + table: Arc, + blk_iter: BlockIterator, + blk_idx: usize, +} + +impl SsTableIterator { + fn seek_to_first_inner(table: &Arc) -> Result<(usize, BlockIterator)> { + Ok(( + 0, + BlockIterator::create_and_seek_to_first(table.read_block_cached(0)?), + )) + } + + /// Create a new iterator and seek to the first key-value pair. 
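For illustration only: a minimal sketch of driving `SsTableBuilder` the way the memtable flush path does, then reading the file back with `SsTableIterator` (defined just below). Assumes this crate builds as `mini_lsm_mvcc` and that `tempfile` is available; the block size, SST id, and file name are arbitrary.

use std::sync::Arc;

use anyhow::Result;
use mini_lsm_mvcc::iterators::StorageIterator;
use mini_lsm_mvcc::key::{KeySlice, TS_DEFAULT};
use mini_lsm_mvcc::table::{SsTableBuilder, SsTableIterator};

fn main() -> Result<()> {
    let dir = tempfile::tempdir()?;
    let mut builder = SsTableBuilder::new(4096);
    // Keys must be added in sorted order; zero-padding keeps them lexicographic.
    for i in 0..100u32 {
        let key = format!("key_{:05}", i);
        builder.add(KeySlice::from_slice(key.as_bytes(), TS_DEFAULT), b"value");
    }
    let sst = Arc::new(builder.build(0, None, dir.path().join("00000.sst"))?);

    // Read it back from the first key and count the entries.
    let mut iter = SsTableIterator::create_and_seek_to_first(sst)?;
    let mut n = 0;
    while iter.is_valid() {
        n += 1;
        iter.next()?;
    }
    assert_eq!(n, 100);
    Ok(())
}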
+ pub fn create_and_seek_to_first(table: Arc) -> Result { + let (blk_idx, blk_iter) = Self::seek_to_first_inner(&table)?; + let iter = Self { + blk_iter, + table, + blk_idx, + }; + Ok(iter) + } + + /// Seek to the first key-value pair. + pub fn seek_to_first(&mut self) -> Result<()> { + let (blk_idx, blk_iter) = Self::seek_to_first_inner(&self.table)?; + self.blk_idx = blk_idx; + self.blk_iter = blk_iter; + Ok(()) + } + + fn seek_to_key_inner(table: &Arc, key: KeySlice) -> Result<(usize, BlockIterator)> { + let mut blk_idx = table.find_block_idx(key); + let mut blk_iter = + BlockIterator::create_and_seek_to_key(table.read_block_cached(blk_idx)?, key); + if !blk_iter.is_valid() { + blk_idx += 1; + if blk_idx < table.num_of_blocks() { + blk_iter = + BlockIterator::create_and_seek_to_first(table.read_block_cached(blk_idx)?); + } + } + Ok((blk_idx, blk_iter)) + } + + /// Create a new iterator and seek to the first key-value pair which >= `key`. + pub fn create_and_seek_to_key(table: Arc, key: KeySlice) -> Result { + let (blk_idx, blk_iter) = Self::seek_to_key_inner(&table, key)?; + let iter = Self { + blk_iter, + table, + blk_idx, + }; + Ok(iter) + } + + /// Seek to the first key-value pair which >= `key`. + pub fn seek_to_key(&mut self, key: KeySlice) -> Result<()> { + let (blk_idx, blk_iter) = Self::seek_to_key_inner(&self.table, key)?; + self.blk_iter = blk_iter; + self.blk_idx = blk_idx; + Ok(()) + } +} + +impl StorageIterator for SsTableIterator { + type KeyType<'a> = KeySlice<'a>; + + fn value(&self) -> &[u8] { + self.blk_iter.value() + } + + fn key(&self) -> KeySlice { + self.blk_iter.key() + } + + fn is_valid(&self) -> bool { + self.blk_iter.is_valid() + } + + fn next(&mut self) -> Result<()> { + self.blk_iter.next(); + if !self.blk_iter.is_valid() { + self.blk_idx += 1; + if self.blk_idx < self.table.num_of_blocks() { + self.blk_iter = BlockIterator::create_and_seek_to_first( + self.table.read_block_cached(self.blk_idx)?, + ); + } + } + Ok(()) + } +} diff --git a/mini-lsm-mvcc/src/tests.rs b/mini-lsm-mvcc/src/tests.rs new file mode 100644 index 0000000..e6ec3ed --- /dev/null +++ b/mini-lsm-mvcc/src/tests.rs @@ -0,0 +1,9 @@ +mod harness; +mod week1_day1; +mod week1_day2; +mod week1_day3; +mod week1_day4; +mod week1_day5; +mod week1_day6; +mod week1_day7; +mod week2_day1; diff --git a/mini-lsm-mvcc/src/tests/harness.rs b/mini-lsm-mvcc/src/tests/harness.rs new file mode 120000 index 0000000..801d9b1 --- /dev/null +++ b/mini-lsm-mvcc/src/tests/harness.rs @@ -0,0 +1 @@ +../../../mini-lsm/src/tests/harness.rs \ No newline at end of file diff --git a/mini-lsm-mvcc/src/tests/week1_day1.rs b/mini-lsm-mvcc/src/tests/week1_day1.rs new file mode 120000 index 0000000..ab5f4be --- /dev/null +++ b/mini-lsm-mvcc/src/tests/week1_day1.rs @@ -0,0 +1 @@ +../../../mini-lsm/src/tests/week1_day1.rs \ No newline at end of file diff --git a/mini-lsm-mvcc/src/tests/week1_day2.rs b/mini-lsm-mvcc/src/tests/week1_day2.rs new file mode 120000 index 0000000..d2c85eb --- /dev/null +++ b/mini-lsm-mvcc/src/tests/week1_day2.rs @@ -0,0 +1 @@ +../../../mini-lsm/src/tests/week1_day2.rs \ No newline at end of file diff --git a/mini-lsm-mvcc/src/tests/week1_day3.rs b/mini-lsm-mvcc/src/tests/week1_day3.rs new file mode 120000 index 0000000..cc87707 --- /dev/null +++ b/mini-lsm-mvcc/src/tests/week1_day3.rs @@ -0,0 +1 @@ +../../../mini-lsm/src/tests/week1_day3.rs \ No newline at end of file diff --git a/mini-lsm-mvcc/src/tests/week1_day4.rs b/mini-lsm-mvcc/src/tests/week1_day4.rs new file mode 120000 index 0000000..6327153 --- 
/dev/null +++ b/mini-lsm-mvcc/src/tests/week1_day4.rs @@ -0,0 +1 @@ +../../../mini-lsm/src/tests/week1_day4.rs \ No newline at end of file diff --git a/mini-lsm-mvcc/src/tests/week1_day5.rs b/mini-lsm-mvcc/src/tests/week1_day5.rs new file mode 120000 index 0000000..6f3ccab --- /dev/null +++ b/mini-lsm-mvcc/src/tests/week1_day5.rs @@ -0,0 +1 @@ +../../../mini-lsm/src/tests/week1_day5.rs \ No newline at end of file diff --git a/mini-lsm-mvcc/src/tests/week1_day6.rs b/mini-lsm-mvcc/src/tests/week1_day6.rs new file mode 120000 index 0000000..e0922a2 --- /dev/null +++ b/mini-lsm-mvcc/src/tests/week1_day6.rs @@ -0,0 +1 @@ +../../../mini-lsm/src/tests/week1_day6.rs \ No newline at end of file diff --git a/mini-lsm-mvcc/src/tests/week1_day7.rs b/mini-lsm-mvcc/src/tests/week1_day7.rs new file mode 120000 index 0000000..8aa545c --- /dev/null +++ b/mini-lsm-mvcc/src/tests/week1_day7.rs @@ -0,0 +1 @@ +../../../mini-lsm/src/tests/week1_day7.rs \ No newline at end of file diff --git a/mini-lsm-mvcc/src/tests/week2_day1.rs b/mini-lsm-mvcc/src/tests/week2_day1.rs new file mode 120000 index 0000000..6c9fbe3 --- /dev/null +++ b/mini-lsm-mvcc/src/tests/week2_day1.rs @@ -0,0 +1 @@ +../../../mini-lsm/src/tests/week2_day1.rs \ No newline at end of file diff --git a/mini-lsm-mvcc/src/wal.rs b/mini-lsm-mvcc/src/wal.rs new file mode 100644 index 0000000..e14d53c --- /dev/null +++ b/mini-lsm-mvcc/src/wal.rs @@ -0,0 +1,70 @@ +use std::fs::{File, OpenOptions}; +use std::io::{Read, Write}; +use std::path::Path; +use std::sync::Arc; + +use anyhow::{Context, Result}; +use bytes::{Buf, BufMut, Bytes}; +use crossbeam_skiplist::SkipMap; +use parking_lot::Mutex; + +pub struct Wal { + file: Arc>, +} + +impl Wal { + pub fn create(path: impl AsRef) -> Result { + Ok(Self { + file: Arc::new(Mutex::new( + OpenOptions::new() + .read(true) + .create_new(true) + .write(true) + .open(path) + .context("failed to create WAL")?, + )), + }) + } + + pub fn recover(path: impl AsRef, skiplist: &SkipMap) -> Result { + let path = path.as_ref(); + let mut file = OpenOptions::new() + .read(true) + .append(true) + .open(path) + .context("failed to recover from WAL")?; + let mut buf = Vec::new(); + file.read_to_end(&mut buf)?; + let mut rbuf: &[u8] = buf.as_slice(); + while rbuf.has_remaining() { + let key_len = rbuf.get_u16() as usize; + let key = Bytes::copy_from_slice(&rbuf[..key_len]); + rbuf.advance(key_len); + let value_len = rbuf.get_u16() as usize; + let value = Bytes::copy_from_slice(&rbuf[..value_len]); + rbuf.advance(value_len); + skiplist.insert(key, value); + } + Ok(Self { + file: Arc::new(Mutex::new(file)), + }) + } + + pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { + let mut file = self.file.lock(); + let mut buf: Vec = + Vec::with_capacity(key.len() + value.len() + std::mem::size_of::()); + buf.put_u16(key.len() as u16); + buf.put_slice(key); + buf.put_u16(value.len() as u16); + buf.put_slice(value); + file.write_all(&buf)?; + Ok(()) + } + + pub fn sync(&self) -> Result<()> { + let file = self.file.lock(); + file.sync_all()?; + Ok(()) + } +} diff --git a/mini-lsm/README.md b/mini-lsm/README.md index cceb11b..81f71ad 100644 --- a/mini-lsm/README.md +++ b/mini-lsm/README.md @@ -1 +1,3 @@ -# mini-lsm week-2 solution \ No newline at end of file +# Week 2 Solution + +This is the solution of Mini-LSM week 1 + week 2.
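For illustration only: `Wal::put` frames each record as a `u16` key length, the key bytes, a `u16` value length, then the value bytes, and `Wal::recover` simply replays the concatenated records. A standalone round-trip sketch of that framing:

use bytes::{Buf, BufMut};

fn main() {
    // Encode two records the way Wal::put does.
    let mut buf: Vec<u8> = Vec::new();
    for (key, value) in [
        (b"k1".as_slice(), b"v1".as_slice()),
        (b"k2".as_slice(), b"value2".as_slice()),
    ] {
        buf.put_u16(key.len() as u16);
        buf.put_slice(key);
        buf.put_u16(value.len() as u16);
        buf.put_slice(value);
    }

    // Replay them the way Wal::recover does.
    let mut rbuf: &[u8] = &buf;
    let mut recovered = Vec::new();
    while rbuf.has_remaining() {
        let key_len = rbuf.get_u16() as usize;
        let key = rbuf[..key_len].to_vec();
        rbuf.advance(key_len);
        let value_len = rbuf.get_u16() as usize;
        let value = rbuf[..value_len].to_vec();
        rbuf.advance(value_len);
        recovered.push((key, value));
    }
    assert_eq!(recovered.len(), 2);
    assert_eq!(recovered[0], (b"k1".to_vec(), b"v1".to_vec()));
}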