diff --git a/mini-lsm/src/bin/compaction_simulator.rs b/mini-lsm/src/bin/compaction_simulator.rs
index c41bbb3..2e43851 100644
--- a/mini-lsm/src/bin/compaction_simulator.rs
+++ b/mini-lsm/src/bin/compaction_simulator.rs
@@ -7,7 +7,7 @@ use mini_lsm::compact::{
     LeveledCompactionController, LeveledCompactionOptions, SimpleLeveledCompactionController,
     SimpleLeveledCompactionOptions, TieredCompactionController, TieredCompactionOptions,
 };
-use mini_lsm::lsm_storage::LsmStorageInner;
+use mini_lsm::lsm_storage::LsmStorageState;
 use mini_lsm::mem_table::MemTable;
 use mini_lsm::table::SsTable;
 
@@ -59,7 +59,7 @@ enum Args {
 }
 
 pub struct MockStorage {
-    snapshot: LsmStorageInner,
+    snapshot: LsmStorageState,
     next_sst_id: usize,
     /// Maps SST ID to the original flushed SST ID
     file_list: HashMap<usize, usize>,
@@ -69,7 +69,7 @@ pub struct MockStorage {
 
 impl MockStorage {
     pub fn new() -> Self {
-        let snapshot = LsmStorageInner {
+        let snapshot = LsmStorageState {
             memtable: Arc::new(MemTable::create()),
             imm_memtables: Vec::new(),
             l0_sstables: Vec::new(),
diff --git a/mini-lsm/src/compact.rs b/mini-lsm/src/compact.rs
index 1ac9487..6b31546 100644
--- a/mini-lsm/src/compact.rs
+++ b/mini-lsm/src/compact.rs
@@ -13,21 +13,50 @@ pub use tiered::{TieredCompactionController, TieredCompactionOptions, TieredCompactionTask};
 
 use crate::iterators::merge_iterator::MergeIterator;
 use crate::iterators::StorageIterator;
-use crate::lsm_storage::LsmStorage;
+use crate::lsm_storage::LsmStorageInner;
 use crate::table::{SsTable, SsTableBuilder, SsTableIterator};
 
-pub enum CompactionTask {
+pub(crate) enum CompactionTask {
     Leveled(LeveledCompactionTask),
     Tiered(TieredCompactionTask),
+    Simple(SimpleLeveledCompactionTask),
 }
 
 struct CompactOptions {
     block_size: usize,
     target_sst_size: usize,
-    compact_to_bottom_level: bool,
 }
 
-impl LsmStorage {
+pub(crate) enum CompactionController {
+    Leveled(LeveledCompactionController),
+    Tiered(TieredCompactionController),
+    Simple(SimpleLeveledCompactionController),
+    NoCompaction,
+}
+
+impl CompactionController {
+    pub fn flush_to_l0(&self) -> bool {
+        if let Self::Leveled(_) | Self::Simple(_) | Self::NoCompaction = self {
+            true
+        } else {
+            false
+        }
+    }
+}
+
+pub enum CompactionOptions {
+    /// Leveled compaction with partial compaction + dynamic level support (= RocksDB's Leveled
+    /// Compaction)
+    Leveled(LeveledCompactionOptions),
+    /// Tiered compaction (= RocksDB's universal compaction)
+    Tiered(TieredCompactionOptions),
+    /// Simple leveled compaction
+    Simple(SimpleLeveledCompactionOptions),
+    /// In no compaction mode (week 1), always flush to L0
+    NoCompaction,
+}
+
+impl LsmStorageInner {
     #[allow(dead_code)]
     fn compact(
         &self,
@@ -46,12 +75,14 @@ impl LsmStorage {
         let mut builder = None;
         let mut new_sst = vec![];
 
+        let compact_to_bottom_level = false;
+
         while iter.is_valid() {
             if builder.is_none() {
                 builder = Some(SsTableBuilder::new(options.block_size));
             }
             let builder_inner = builder.as_mut().unwrap();
-            if options.compact_to_bottom_level {
+            if compact_to_bottom_level {
                 if !iter.value().is_empty() {
                     builder_inner.add(iter.key(), iter.value());
                 }
@@ -82,4 +113,11 @@ impl LsmStorage {
         }
         Ok(new_sst)
     }
+
+    pub(crate) fn spawn_compaction_thread(
+        self: &Arc<Self>,
+        rx: std::sync::mpsc::Receiver<()>,
+    ) -> Result<Option<std::thread::JoinHandle<()>>> {
+        Ok(None)
+    }
 }
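Note that `spawn_compaction_thread` is only a stub at this point: it ignores the shutdown receiver and returns `Ok(None)`. A minimal sketch of the background loop it is meant to host is shown below; the 50 ms poll interval and the `trigger_compaction` callback are illustrative assumptions, not part of this diff.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

// Hypothetical body for the thread that `spawn_compaction_thread` would create
// with `std::thread::spawn`: wake up periodically, run one round of compaction,
// and exit as soon as the shutdown channel is signalled or dropped.
fn compaction_loop(rx: Receiver<()>, mut trigger_compaction: impl FnMut()) {
    loop {
        match rx.recv_timeout(Duration::from_millis(50)) {
            // A message (or a disconnected sender) means the storage is closing.
            Ok(()) | Err(RecvTimeoutError::Disconnected) => break,
            // Timeout: nobody asked us to stop yet, so do one round of work.
            Err(RecvTimeoutError::Timeout) => trigger_compaction(),
        }
    }
}
```

This shape matches how `MiniLsm::close` later in this diff shuts the thread down: it sends `()` on the notifier and joins the handle returned from `spawn_compaction_thread`.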
diff --git a/mini-lsm/src/compact/leveled.rs b/mini-lsm/src/compact/leveled.rs
index ad854c7..a25b94a 100644
--- a/mini-lsm/src/compact/leveled.rs
+++ b/mini-lsm/src/compact/leveled.rs
@@ -1,6 +1,6 @@
 use std::collections::HashSet;
 
-use crate::lsm_storage::LsmStorageInner;
+use crate::lsm_storage::LsmStorageState;
 
 pub struct LeveledCompactionTask {
     // if upper_level is `None`, then it is L0 compaction
@@ -10,6 +10,7 @@ pub struct LeveledCompactionTask {
     pub lower_level_sst_ids: Vec<usize>,
 }
 
+#[derive(Debug, Clone)]
 pub struct LeveledCompactionOptions {
     pub level_size_multiplier: usize,
     pub level0_file_num_compaction_trigger: usize,
@@ -28,7 +29,7 @@ impl LeveledCompactionController {
 
     fn find_overlapping_ssts(
         &self,
-        snapshot: &LsmStorageInner,
+        snapshot: &LsmStorageState,
         sst_ids: &[usize],
         in_level: usize,
     ) -> Vec<usize> {
@@ -58,7 +59,7 @@ impl LeveledCompactionController {
 
     pub fn generate_compaction_task(
         &self,
-        snapshot: &LsmStorageInner,
+        snapshot: &LsmStorageState,
     ) -> Option<LeveledCompactionTask> {
         // step 1: compute target level size
        let mut target_level_size = (0..self.options.max_levels).map(|_| 0).collect::<Vec<_>>(); // exclude level 0
@@ -149,10 +150,10 @@ impl LeveledCompactionController {
 
     pub fn apply_compaction_result(
         &self,
-        snapshot: &LsmStorageInner,
+        snapshot: &LsmStorageState,
         task: &LeveledCompactionTask,
         output: &[usize],
-    ) -> (LsmStorageInner, Vec<usize>) {
+    ) -> (LsmStorageState, Vec<usize>) {
         let mut snapshot = snapshot.clone();
         let mut files_to_remove = Vec::new();
         let mut upper_level_sst_ids_set = task
diff --git a/mini-lsm/src/compact/simple_leveled.rs b/mini-lsm/src/compact/simple_leveled.rs
index 66f16dd..295a907 100644
--- a/mini-lsm/src/compact/simple_leveled.rs
+++ b/mini-lsm/src/compact/simple_leveled.rs
@@ -1,5 +1,6 @@
-use crate::lsm_storage::LsmStorageInner;
+use crate::lsm_storage::LsmStorageState;
 
+#[derive(Debug, Clone)]
 pub struct SimpleLeveledCompactionOptions {
     pub size_ratio_percent: usize,
     pub level0_file_num_compaction_trigger: usize,
@@ -25,7 +26,7 @@ impl SimpleLeveledCompactionController {
 
     pub fn generate_compaction_task(
         &self,
-        snapshot: &LsmStorageInner,
+        snapshot: &LsmStorageState,
     ) -> Option<SimpleLeveledCompactionTask> {
         let mut level_sizes = Vec::new();
         level_sizes.push(snapshot.l0_sstables.len());
@@ -64,10 +65,10 @@ impl SimpleLeveledCompactionController {
 
     pub fn apply_compaction_result(
         &self,
-        snapshot: &LsmStorageInner,
+        snapshot: &LsmStorageState,
         task: &SimpleLeveledCompactionTask,
         output: &[usize],
-    ) -> (LsmStorageInner, Vec<usize>) {
+    ) -> (LsmStorageState, Vec<usize>) {
         let mut snapshot = snapshot.clone();
         let mut files_to_remove = Vec::new();
         if let Some(upper_level) = task.upper_level {
diff --git a/mini-lsm/src/compact/tiered.rs b/mini-lsm/src/compact/tiered.rs
index a6750c2..492431e 100644
--- a/mini-lsm/src/compact/tiered.rs
+++ b/mini-lsm/src/compact/tiered.rs
@@ -1,11 +1,12 @@
 use std::collections::HashMap;
 
-use crate::lsm_storage::LsmStorageInner;
+use crate::lsm_storage::LsmStorageState;
 
 pub struct TieredCompactionTask {
     pub tiers: Vec<(usize, Vec<usize>)>,
 }
 
+#[derive(Debug, Clone)]
 pub struct TieredCompactionOptions {
     pub level0_file_num_compaction_trigger: usize,
     pub max_size_amplification_percent: usize,
@@ -24,7 +25,7 @@ impl TieredCompactionController {
 
     pub fn generate_compaction_task(
         &self,
-        snapshot: &LsmStorageInner,
+        snapshot: &LsmStorageState,
     ) -> Option<TieredCompactionTask> {
         assert!(
             snapshot.l0_sstables.is_empty(),
             "should not add l0 ssts in tiered compaction"
@@ -87,10 +88,10 @@ impl TieredCompactionController {
 
     pub fn apply_compaction_result(
         &self,
-        snapshot: &LsmStorageInner,
+        snapshot: &LsmStorageState,
         task: &TieredCompactionTask,
         output: &[usize],
-    ) -> (LsmStorageInner, Vec<usize>) {
+    ) -> (LsmStorageState, Vec<usize>) {
         assert!(
             snapshot.l0_sstables.is_empty(),
             "should not add l0 ssts in tiered compaction"
         );
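With this change every controller works on an immutable `LsmStorageState` snapshot: `generate_compaction_task` decides whether anything needs compacting, and `apply_compaction_result` folds the produced SST ids back into a new state plus a list of SSTs that can be deleted. A minimal driver sketch under the signatures above (the `run_one_round` name and the placeholder output id `42` are illustrative, not part of this diff):

```rust
use mini_lsm::compact::SimpleLeveledCompactionController;
use mini_lsm::lsm_storage::LsmStorageState;

// Hypothetical single round of a simulator-style driver: ask the controller
// for a task, pretend the merge produced one SST, then apply the result.
fn run_one_round(
    controller: &SimpleLeveledCompactionController,
    snapshot: &LsmStorageState,
) -> Option<(LsmStorageState, Vec<usize>)> {
    // `None` means neither the L0 file-count trigger nor the size ratio fires.
    let task = controller.generate_compaction_task(snapshot)?;
    // A real run would merge the input SSTs here; fake a single output SST id.
    let output = vec![42];
    // Returns the updated level layout plus the SST ids that are now garbage.
    Some(controller.apply_compaction_result(snapshot, &task, &output))
}
```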
diff --git a/mini-lsm/src/lsm_storage.rs b/mini-lsm/src/lsm_storage.rs
index a6e8595..b1c9736 100644
--- a/mini-lsm/src/lsm_storage.rs
+++ b/mini-lsm/src/lsm_storage.rs
@@ -9,6 +9,10 @@ use bytes::Bytes;
 use parking_lot::{Mutex, RwLock};
 
 use crate::block::Block;
+use crate::compact::{
+    CompactionController, CompactionOptions, LeveledCompactionController, LeveledCompactionOptions,
+    SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, TieredCompactionController,
+};
 use crate::iterators::merge_iterator::MergeIterator;
 use crate::iterators::two_merge_iterator::TwoMergeIterator;
 use crate::iterators::StorageIterator;
@@ -19,62 +23,162 @@ use crate::table::{SsTable, SsTableBuilder, SsTableIterator};
 pub type BlockCache = moka::sync::Cache<(usize, usize), Arc<Block>>;
 
 #[derive(Clone)]
-pub struct LsmStorageInner {
+pub struct LsmStorageState {
     /// The current memtable.
     pub memtable: Arc<MemTable>,
-    /// Immutable memTables, from earliest to latest.
+    /// Immutable memtables, from earliest to latest.
     pub imm_memtables: Vec<Arc<MemTable>>,
-    /// L0 SsTables, from earliest to latest.
+    /// L0 SSTs, from earliest to latest.
     pub l0_sstables: Vec<usize>,
-    /// SsTables sorted by key range; L1 - L6 for leveled compaction, or tiers for tiered
+    /// SsTables sorted by key range; L1 - L_max for leveled compaction, or tiers for tiered
     /// compaction.
-    #[allow(dead_code)]
     pub levels: Vec<(usize, Vec<usize>)>,
-    /// SsTable objects.
+    /// SST objects.
     pub sstables: HashMap<usize, Arc<SsTable>>,
 }
 
-impl LsmStorageInner {
-    fn create() -> Self {
+impl LsmStorageState {
+    fn create(options: &LsmStorageOptions) -> Self {
+        match &options.compaction_options {
+            CompactionOptions::Leveled(LeveledCompactionOptions { max_levels, .. })
+            | CompactionOptions::Simple(SimpleLeveledCompactionOptions { max_levels, .. }) => {
+                Self {
+                    memtable: Arc::new(MemTable::create()),
+                    imm_memtables: Vec::new(),
+                    l0_sstables: Vec::new(),
+                    levels: (1..=*max_levels)
+                        .map(|level| (level, Vec::new()))
+                        .collect::<Vec<_>>(),
+                    sstables: Default::default(),
+                }
+            }
+            CompactionOptions::Tiered(_) | CompactionOptions::NoCompaction => Self {
+                memtable: Arc::new(MemTable::create()),
+                imm_memtables: Vec::new(),
+                l0_sstables: Vec::new(),
+                levels: Vec::new(),
+                sstables: Default::default(),
+            },
+        }
+    }
+}
+
+pub struct LsmStorageOptions {
+    block_size: usize,
+    target_sst_size: usize,
+    compaction_options: CompactionOptions,
+}
+
+impl LsmStorageOptions {
+    pub fn default_for_week1_test() -> Self {
         Self {
-            memtable: Arc::new(MemTable::create()),
-            imm_memtables: vec![],
-            l0_sstables: vec![],
-            levels: vec![],
-            sstables: Default::default(),
+            block_size: 4096,
+            target_sst_size: 2 << 20,
+            compaction_options: CompactionOptions::NoCompaction,
         }
     }
 }
 
 /// The storage interface of the LSM tree.
-pub struct LsmStorage {
-    pub(crate) inner: Arc<RwLock<Arc<LsmStorageInner>>>,
-    flush_lock: Mutex<()>,
+pub(crate) struct LsmStorageInner {
+    pub(crate) state: Arc<RwLock<Arc<LsmStorageState>>>,
+    state_lock: Mutex<()>,
     path: PathBuf,
     pub(crate) block_cache: Arc<BlockCache>,
     next_sst_id: AtomicUsize,
+    options: Arc<LsmStorageOptions>,
+    compaction_controller: CompactionController,
 }
 
-impl LsmStorage {
+pub struct MiniLsm {
+    inner: Arc<LsmStorageInner>,
+    compaction_notifier: std::sync::mpsc::Sender<()>,
+    compaction_thread: Mutex<Option<std::thread::JoinHandle<()>>>,
+}
+
+impl Drop for MiniLsm {
+    fn drop(&mut self) {
+        self.compaction_notifier.send(()).ok();
+    }
+}
+
+impl MiniLsm {
+    pub fn close(&self) -> Result<()> {
+        self.compaction_notifier.send(()).ok();
+        let mut compaction_thread = self.compaction_thread.lock();
+        if let Some(mut compaction_thread) = compaction_thread.take() {
+            compaction_thread
+                .join()
+                .map_err(|e| anyhow::anyhow!("{:?}", e))?;
+        }
+        Ok(())
+    }
+
+    pub fn open(path: impl AsRef<Path>, options: LsmStorageOptions) -> Result<Arc<Self>> {
+        let inner = Arc::new(LsmStorageInner::open(path, options)?);
+        let (tx, rx) = std::sync::mpsc::channel();
+        let compaction_thread = inner.spawn_compaction_thread(rx)?;
+        Ok(Arc::new(Self {
+            inner,
+            compaction_notifier: tx,
+            compaction_thread: Mutex::new(compaction_thread),
+        }))
+    }
+
+    pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
+        self.inner.get(key)
+    }
+
+    pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
+        self.inner.put(key, value)
+    }
+
+    pub fn delete(&self, key: &[u8]) -> Result<()> {
+        self.inner.delete(key)
+    }
+
+    pub fn scan(
+        &self,
+        lower: Bound<&[u8]>,
+        upper: Bound<&[u8]>,
+    ) -> Result<FusedIterator<LsmIterator>> {
+        self.inner.scan(lower, upper)
+    }
+}
+
+impl LsmStorageInner {
     pub(crate) fn next_sst_id(&self) -> usize {
         self.next_sst_id
             .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
     }
 
-    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
+    pub(crate) fn open(path: impl AsRef<Path>, options: LsmStorageOptions) -> Result<Self> {
         Ok(Self {
-            inner: Arc::new(RwLock::new(Arc::new(LsmStorageInner::create()))),
-            flush_lock: Mutex::new(()),
+            state: Arc::new(RwLock::new(Arc::new(LsmStorageState::create(&options)))),
+            state_lock: Mutex::new(()),
             path: path.as_ref().to_path_buf(),
             block_cache: Arc::new(BlockCache::new(1 << 20)), // 4GB block cache,
             next_sst_id: AtomicUsize::new(1),
+            compaction_controller: match &options.compaction_options {
+                CompactionOptions::Leveled(options) => {
+                    CompactionController::Leveled(LeveledCompactionController::new(options.clone()))
+                }
+                CompactionOptions::Tiered(options) => {
+                    CompactionController::Tiered(TieredCompactionController::new(options.clone()))
+                }
+                CompactionOptions::Simple(options) => CompactionController::Simple(
+                    SimpleLeveledCompactionController::new(options.clone()),
+                ),
+                CompactionOptions::NoCompaction => CompactionController::NoCompaction,
+            },
+            options: options.into(),
         })
     }
 
     /// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter.
     pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
         let snapshot = {
-            let guard = self.inner.read();
+            let guard = self.state.read();
             Arc::clone(&guard)
         }; // drop global lock here
 
@@ -115,7 +219,7 @@
         assert!(!value.is_empty(), "value cannot be empty");
         assert!(!key.is_empty(), "key cannot be empty");
 
-        let guard = self.inner.read();
+        let guard = self.state.read();
         guard.memtable.put(key, value);
 
         Ok(())
@@ -125,7 +229,7 @@
     pub fn delete(&self, key: &[u8]) -> Result<()> {
         assert!(!key.is_empty(), "key cannot be empty");
 
-        let guard = self.inner.read();
+        let guard = self.state.read();
         guard.memtable.put(key, b"");
 
         Ok(())
@@ -135,16 +239,21 @@
         self.path.join(format!("{:05}.sst", id))
     }
 
-    /// Persist data to disk.
-    pub fn sync(&self) -> Result<()> {
-        let _flush_lock = self.flush_lock.lock();
+    /// Force freeze the current memtable to an immutable memtable
+    pub fn force_freeze_memtable(&self) -> Result<()> {
+        Ok(())
+    }
+
+    /// Force flush all the immutable memtables to disk
+    pub fn force_flush_imm_memtables(&self) -> Result<()> {
+        let _flush_lock = self.state_lock.lock();
 
         let flush_memtable;
         let sst_id;
 
         // Move mutable memtable to immutable memtables.
         {
-            let mut guard = self.inner.write();
+            let mut guard = self.state.write();
             // Swap the current memtable with a new one.
             let mut snapshot = guard.as_ref().clone();
             let memtable = std::mem::replace(&mut snapshot.memtable, Arc::new(MemTable::create()));
@@ -170,12 +279,18 @@
 
         // Add the flushed L0 table to the list.
         {
-            let mut guard = self.inner.write();
+            let mut guard = self.state.write();
             let mut snapshot = guard.as_ref().clone();
             // Remove the memtable from the immutable memtables.
             snapshot.imm_memtables.pop();
             // Add L0 table
-            snapshot.l0_sstables.push(sst_id);
+            if self.compaction_controller.flush_to_l0() {
+                // In leveled compaction or no compaction, simply flush to L0
+                snapshot.l0_sstables.push(sst_id);
+            } else {
+                // In tiered compaction, create a new tier
+                snapshot.levels.insert(0, (sst_id, vec![sst_id]));
+            }
             snapshot.sstables.insert(sst_id, sst);
             // Update the snapshot.
             *guard = Arc::new(snapshot);
@@ -191,7 +306,7 @@
         upper: Bound<&[u8]>,
     ) -> Result<FusedIterator<LsmIterator>> {
         let snapshot = {
-            let guard = self.inner.read();
+            let guard = self.state.read();
             Arc::clone(&guard)
         }; // drop global lock here
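With the refactor above, `MiniLsm` becomes the public entry point: it owns the `LsmStorageInner`, the shutdown channel, and the compaction thread handle, while reads and writes are simply forwarded. A minimal usage sketch against the API in this diff (the `lsm.db` path and the printing loop are illustrative; it assumes the week-1 `StorageIterator` trait is publicly exported):

```rust
use std::ops::Bound;

use anyhow::Result;
use bytes::Bytes;
use mini_lsm::iterators::StorageIterator;
use mini_lsm::lsm_storage::{LsmStorageOptions, MiniLsm};

fn main() -> Result<()> {
    // Week-1 defaults: 4 KiB blocks, 2 MiB target SST size, no compaction.
    let storage = MiniLsm::open("lsm.db", LsmStorageOptions::default_for_week1_test())?;

    storage.put(b"key1", b"value1")?;
    assert_eq!(storage.get(b"key1")?, Some(Bytes::from_static(b"value1")));

    // Full scan over the memtables and SSTs.
    let mut iter = storage.scan(Bound::Unbounded, Bound::Unbounded)?;
    while iter.is_valid() {
        println!("{:?} = {:?}", iter.key(), iter.value());
        iter.next()?;
    }

    // Stop the background compaction thread before dropping the handle.
    storage.close()?;
    Ok(())
}
```

The tests below switch to the same pattern: instead of the old `sync()`, they call `force_freeze_memtable` followed by `force_flush_imm_memtables` through a small `sync` helper.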
diff --git a/mini-lsm/src/tests/day4_tests.rs b/mini-lsm/src/tests/day4_tests.rs
index 9bfee56..a870ae3 100644
--- a/mini-lsm/src/tests/day4_tests.rs
+++ b/mini-lsm/src/tests/day4_tests.rs
@@ -4,6 +4,7 @@ use bytes::Bytes;
 use tempfile::tempdir;
 
 use crate::iterators::StorageIterator;
+use crate::lsm_storage::{LsmStorageInner, LsmStorageOptions};
 
 fn as_bytes(x: &[u8]) -> Bytes {
     Bytes::copy_from_slice(x)
 }
@@ -32,11 +33,16 @@ fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>)
     assert!(!iter.is_valid());
 }
 
+fn sync(storage: &LsmStorageInner) {
+    storage.force_freeze_memtable().unwrap();
+    storage.force_flush_imm_memtables().unwrap();
+}
+
 #[test]
 fn test_storage_get() {
-    use crate::lsm_storage::LsmStorage;
+    use crate::lsm_storage::LsmStorageInner;
     let dir = tempdir().unwrap();
-    let storage = LsmStorage::open(&dir).unwrap();
+    let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap();
     storage.put(b"1", b"233").unwrap();
     storage.put(b"2", b"2333").unwrap();
     storage.put(b"3", b"23333").unwrap();
@@ -49,9 +55,9 @@ fn test_storage_get() {
 
 #[test]
 fn test_storage_scan_memtable_1() {
-    use crate::lsm_storage::LsmStorage;
+    use crate::lsm_storage::LsmStorageInner;
     let dir = tempdir().unwrap();
-    let storage = LsmStorage::open(&dir).unwrap();
+    let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap();
     storage.put(b"1", b"233").unwrap();
     storage.put(b"2", b"2333").unwrap();
     storage.put(b"3", b"23333").unwrap();
@@ -79,9 +85,9 @@ fn test_storage_scan_memtable_1() {
 
 #[test]
 fn test_storage_scan_memtable_2() {
-    use crate::lsm_storage::LsmStorage;
+    use crate::lsm_storage::LsmStorageInner;
     let dir = tempdir().unwrap();
-    let storage = LsmStorage::open(&dir).unwrap();
+    let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap();
     storage.put(b"1", b"233").unwrap();
     storage.put(b"2", b"2333").unwrap();
     storage.put(b"3", b"23333").unwrap();
@@ -109,12 +115,12 @@ fn test_storage_scan_memtable_2() {
 
 #[test]
 fn test_storage_get_after_sync() {
-    use crate::lsm_storage::LsmStorage;
+    use crate::lsm_storage::LsmStorageInner;
     let dir = tempdir().unwrap();
-    let storage = LsmStorage::open(&dir).unwrap();
+    let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap();
     storage.put(b"1", b"233").unwrap();
     storage.put(b"2", b"2333").unwrap();
-    storage.sync().unwrap();
+    sync(&storage);
     storage.put(b"3", b"23333").unwrap();
     assert_eq!(&storage.get(b"1").unwrap().unwrap()[..], b"233");
     assert_eq!(&storage.get(b"2").unwrap().unwrap()[..], b"2333");
@@ -125,12 +131,12 @@ fn test_storage_get_after_sync() {
 
 #[test]
 fn test_storage_scan_memtable_1_after_sync() {
-    use crate::lsm_storage::LsmStorage;
+    use crate::lsm_storage::LsmStorageInner;
     let dir = tempdir().unwrap();
-    let storage = LsmStorage::open(&dir).unwrap();
+    let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap();
     storage.put(b"1", b"233").unwrap();
     storage.put(b"2", b"2333").unwrap();
-    storage.sync().unwrap();
+    sync(&storage);
     storage.put(b"3", b"23333").unwrap();
     storage.delete(b"2").unwrap();
     check_iter_result(
@@ -156,14 +162,14 @@ fn test_storage_scan_memtable_1_after_sync() {
 
 #[test]
 fn test_storage_scan_memtable_2_after_sync() {
-    use crate::lsm_storage::LsmStorage;
+    use crate::lsm_storage::LsmStorageInner;
     let dir = tempdir().unwrap();
-    let storage = LsmStorage::open(&dir).unwrap();
+    let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap();
     storage.put(b"1", b"233").unwrap();
     storage.put(b"2", b"2333").unwrap();
-    storage.sync().unwrap();
+    sync(&storage);
     storage.put(b"3", b"23333").unwrap();
-    storage.sync().unwrap();
+    sync(&storage);
     storage.delete(b"1").unwrap();
     check_iter_result(
         storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),