From 63429b50d53eee19095f65ce2f9076ccf94a5726 Mon Sep 17 00:00:00 2001
From: Alex Chi
Date: Thu, 25 Jan 2024 23:38:26 +0800
Subject: [PATCH] consider merge width for tiered trigger + mvcc compaction

Signed-off-by: Alex Chi
---
 mini-lsm-book/src/week3-overview.md |   7 ++
 mini-lsm-mvcc/src/compact.rs        |  30 ++++---
 mini-lsm-mvcc/src/debug.rs          |  12 ++-
 mini-lsm-mvcc/src/tests.rs          |   8 +-
 mini-lsm/src/debug.rs               |  12 ++-
 mini-lsm/src/tests/harness.rs       |  24 +++---
 mini-lsm/src/tests/week2_day1.rs    | 119 +++++++++++++++++++++-------
 7 files changed, 153 insertions(+), 59 deletions(-)

diff --git a/mini-lsm-book/src/week3-overview.md b/mini-lsm-book/src/week3-overview.md
index 48440ac..1986d87 100644
--- a/mini-lsm-book/src/week3-overview.md
+++ b/mini-lsm-book/src/week3-overview.md
@@ -2,4 +2,11 @@
 
 In this part, you will implement MVCC over the LSM engine that you have built in the previous two weeks. We will add timestamp encoding in the keys to maintain multiple versions of a key, and change some part of the engine to ensure old data are either retained or garbage-collected based on whether there are users reading an old version.
 
+1. Use the new key module
+2. Refactor until no compile error
+3. Use correct key ranges, add timestamp for engine
+4. Memtable refactor
+5. LsmIterator use read_ts
+6. Compaction no delete
+
 {{#include copyright.md}}
diff --git a/mini-lsm-mvcc/src/compact.rs b/mini-lsm-mvcc/src/compact.rs
index e21f6f0..9140f9e 100644
--- a/mini-lsm-mvcc/src/compact.rs
+++ b/mini-lsm-mvcc/src/compact.rs
@@ -115,26 +115,22 @@ impl LsmStorageInner {
     fn compact_generate_sst_from_iter(
         &self,
         mut iter: impl for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
-        compact_to_bottom_level: bool,
+        _compact_to_bottom_level: bool,
     ) -> Result<Vec<Arc<SsTable>>> {
         let mut builder = None;
         let mut new_sst = Vec::new();
+        let mut last_key = Vec::<u8>::new();
 
         while iter.is_valid() {
             if builder.is_none() {
                 builder = Some(SsTableBuilder::new(self.options.block_size));
             }
             let builder_inner = builder.as_mut().unwrap();
-            if compact_to_bottom_level {
-                if !iter.value().is_empty() {
-                    builder_inner.add(iter.key(), iter.value());
-                }
-            } else {
-                builder_inner.add(iter.key(), iter.value());
-            }
-            iter.next()?;
+            builder_inner.add(iter.key(), iter.value());
 
-            if builder_inner.estimated_size() >= self.options.target_sst_size {
+            let same_as_last_key = iter.key().key_ref() == &last_key;
+
+            if builder_inner.estimated_size() >= self.options.target_sst_size && !same_as_last_key {
                 let sst_id = self.next_sst_id();
                 let builder = builder.take().unwrap();
                 let sst = Arc::new(builder.build(
@@ -144,6 +140,12 @@ impl LsmStorageInner {
                 )?);
                 new_sst.push(sst);
             }
+
+            if !same_as_last_key {
+                last_key.clear();
+                last_key.extend(iter.key().key_ref());
+            }
+            iter.next()?;
         }
         if let Some(builder) = builder {
             let sst_id = self.next_sst_id(); // lock dropped here
@@ -253,6 +255,7 @@ impl LsmStorageInner {
         let CompactionOptions::NoCompaction = self.options.compaction_options else {
             panic!("full compaction can only be called with compaction is not enabled")
         };
+
         let snapshot = {
             let state = self.state.read();
             state.clone()
@@ -264,6 +267,9 @@ impl LsmStorageInner {
             l0_sstables: l0_sstables.clone(),
             l1_sstables: l1_sstables.clone(),
         };
+
+        println!("force full compaction: {:?}", compaction_task);
+
         let sstables = self.compact(&compaction_task)?;
 
         {
@@ -294,10 +300,14 @@ impl LsmStorageInner {
         for sst in l0_sstables.iter().chain(l1_sstables.iter()) {
             std::fs::remove_file(self.path_of_sst(*sst))?;
         }
+
+        println!("force full compaction done");
+
         Ok(())
     }
 
     fn trigger_compaction(&self) -> Result<()> {
+        self.dump_structure();
         let snapshot = {
             let state = self.state.read();
             state.clone()
diff --git a/mini-lsm-mvcc/src/debug.rs b/mini-lsm-mvcc/src/debug.rs
index 76702de..c9eab3d 100644
--- a/mini-lsm-mvcc/src/debug.rs
+++ b/mini-lsm-mvcc/src/debug.rs
@@ -1,8 +1,8 @@
-use crate::lsm_storage::MiniLsm;
+use crate::lsm_storage::{LsmStorageInner, MiniLsm};
 
-impl MiniLsm {
+impl LsmStorageInner {
     pub fn dump_structure(&self) {
-        let snapshot = self.inner.state.read();
+        let snapshot = self.state.read();
         if !snapshot.l0_sstables.is_empty() {
             println!(
                 "L0 ({}): {:?}",
@@ -15,3 +15,9 @@ impl MiniLsm {
         }
     }
 }
+
+impl MiniLsm {
+    pub fn dump_structure(&self) {
+        self.inner.dump_structure()
+    }
+}
diff --git a/mini-lsm-mvcc/src/tests.rs b/mini-lsm-mvcc/src/tests.rs
index 45f80b8..fb295e9 100644
--- a/mini-lsm-mvcc/src/tests.rs
+++ b/mini-lsm-mvcc/src/tests.rs
@@ -6,7 +6,7 @@ mod week1_day4;
 mod week1_day5;
 mod week1_day6;
 mod week1_day7;
-// mod week2_day1;
-// mod week2_day2;
-// mod week2_day3;
-// mod week2_day4;
+mod week2_day1;
+mod week2_day2;
+mod week2_day3;
+mod week2_day4;
diff --git a/mini-lsm/src/debug.rs b/mini-lsm/src/debug.rs
index 76702de..c9eab3d 100644
--- a/mini-lsm/src/debug.rs
+++ b/mini-lsm/src/debug.rs
@@ -1,8 +1,8 @@
-use crate::lsm_storage::MiniLsm;
+use crate::lsm_storage::{LsmStorageInner, MiniLsm};
 
-impl MiniLsm {
+impl LsmStorageInner {
     pub fn dump_structure(&self) {
-        let snapshot = self.inner.state.read();
+        let snapshot = self.state.read();
         if !snapshot.l0_sstables.is_empty() {
             println!(
                 "L0 ({}): {:?}",
@@ -15,3 +15,9 @@ impl MiniLsm {
         }
     }
 }
+
+impl MiniLsm {
+    pub fn dump_structure(&self) {
+        self.inner.dump_structure()
+    }
+}
diff --git a/mini-lsm/src/tests/harness.rs b/mini-lsm/src/tests/harness.rs
index ebbca69..3986651 100644
--- a/mini-lsm/src/tests/harness.rs
+++ b/mini-lsm/src/tests/harness.rs
@@ -288,7 +288,7 @@ pub fn check_compaction_ratio(storage: Arc<MiniLsm>) {
             num_tiers,
             max_size_amplification_percent,
             size_ratio,
-            ..
+            min_merge_width,
         }) => {
             let size_ratio_trigger = (100.0 + size_ratio as f64) / 100.0;
             assert_eq!(l0_sst_num, 0);
@@ -296,20 +296,22 @@ pub fn check_compaction_ratio(storage: Arc<MiniLsm>) {
             let mut sum_size = level_size[0];
             for idx in 1..level_size.len() {
                 let this_size = level_size[idx];
-                assert!(
-                    sum_size as f64 / this_size as f64 <= size_ratio_trigger,
-                    "sum(⬆️L{})/L{}, {}/{}>{}",
-                    state.levels[idx - 1].0,
-                    state.levels[idx].0,
-                    sum_size,
-                    this_size,
-                    size_ratio_trigger
-                );
+                if level_size.len() > min_merge_width {
+                    assert!(
+                        sum_size as f64 / this_size as f64 <= size_ratio_trigger,
+                        "violation of size ratio: sum(⬆️L{})/L{}, {}/{}>{}",
+                        state.levels[idx - 1].0,
+                        state.levels[idx].0,
+                        sum_size,
+                        this_size,
+                        size_ratio_trigger
+                    );
+                }
                 if idx + 1 == level_size.len() {
                     assert!(
                         sum_size as f64 / this_size as f64
                             <= max_size_amplification_percent as f64 / 100.0,
-                        "sum(⬆️L{})/L{}, {}/{}>{}%",
+                        "violation of space amp: sum(⬆️L{})/L{}, {}/{}>{}%",
                         state.levels[idx - 1].0,
                         state.levels[idx].0,
                         sum_size,
diff --git a/mini-lsm/src/tests/week2_day1.rs b/mini-lsm/src/tests/week2_day1.rs
index 6820569..70fec22 100644
--- a/mini-lsm/src/tests/week2_day1.rs
+++ b/mini-lsm/src/tests/week2_day1.rs
@@ -9,7 +9,7 @@ use crate::{
     iterators::{
         concat_iterator::SstConcatIterator, merge_iterator::MergeIterator, StorageIterator,
     },
-    key::KeySlice,
+    key::{KeySlice, TS_ENABLED},
    lsm_storage::{LsmStorageInner, LsmStorageOptions, LsmStorageState},
     table::{SsTable, SsTableBuilder, SsTableIterator},
 };
@@ -37,6 +37,7 @@ fn construct_merge_iterator_over_storage(
 
 #[test]
 fn test_task1_full_compaction() {
+    // We do not use LSM iterator in this test because it's implemented as part of task 3
     let dir = tempdir().unwrap();
     let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap();
     storage.put(b"0", b"v1").unwrap();
@@ -50,45 +51,107 @@ fn test_task1_full_compaction() {
     sync(&storage);
     assert_eq!(storage.state.read().l0_sstables.len(), 3);
     let mut iter = construct_merge_iterator_over_storage(&storage.state.read());
-    check_iter_result_by_key(
-        &mut iter,
-        vec![
-            (Bytes::from_static(b"0"), Bytes::from_static(b"")),
-            (Bytes::from_static(b"1"), Bytes::from_static(b"v2")),
-            (Bytes::from_static(b"2"), Bytes::from_static(b"")),
-        ],
-    );
+    if TS_ENABLED {
+        check_iter_result_by_key(
+            &mut iter,
+            vec![
+                (Bytes::from_static(b"0"), Bytes::from_static(b"")),
+                (Bytes::from_static(b"0"), Bytes::from_static(b"v2")),
+                (Bytes::from_static(b"0"), Bytes::from_static(b"v1")),
+                (Bytes::from_static(b"1"), Bytes::from_static(b"v2")),
+                (Bytes::from_static(b"2"), Bytes::from_static(b"")),
+                (Bytes::from_static(b"2"), Bytes::from_static(b"v2")),
+            ],
+        );
+    } else {
+        check_iter_result_by_key(
+            &mut iter,
+            vec![
+                (Bytes::from_static(b"0"), Bytes::from_static(b"")),
+                (Bytes::from_static(b"1"), Bytes::from_static(b"v2")),
+                (Bytes::from_static(b"2"), Bytes::from_static(b"")),
+            ],
+        );
+    }
     storage.force_full_compaction().unwrap();
     assert!(storage.state.read().l0_sstables.is_empty());
     let mut iter = construct_merge_iterator_over_storage(&storage.state.read());
-    check_iter_result_by_key(
-        &mut iter,
-        vec![(Bytes::from_static(b"1"), Bytes::from_static(b"v2"))],
-    );
+    if TS_ENABLED {
+        check_iter_result_by_key(
+            &mut iter,
+            vec![
+                (Bytes::from_static(b"0"), Bytes::from_static(b"")),
+                (Bytes::from_static(b"0"), Bytes::from_static(b"v2")),
+                (Bytes::from_static(b"0"), Bytes::from_static(b"v1")),
+                (Bytes::from_static(b"1"), Bytes::from_static(b"v2")),
+                (Bytes::from_static(b"2"), Bytes::from_static(b"")),
+                (Bytes::from_static(b"2"), Bytes::from_static(b"v2")),
+            ],
+        );
+    } else {
+        check_iter_result_by_key(
+            &mut iter,
+            vec![(Bytes::from_static(b"1"), Bytes::from_static(b"v2"))],
+        );
+    }
     storage.put(b"0", b"v3").unwrap();
     storage.put(b"2", b"v3").unwrap();
     sync(&storage);
     storage.delete(b"1").unwrap();
     sync(&storage);
     let mut iter = construct_merge_iterator_over_storage(&storage.state.read());
-    check_iter_result_by_key(
-        &mut iter,
-        vec![
-            (Bytes::from_static(b"0"), Bytes::from_static(b"v3")),
-            (Bytes::from_static(b"1"), Bytes::from_static(b"")),
-            (Bytes::from_static(b"2"), Bytes::from_static(b"v3")),
-        ],
-    );
+    if TS_ENABLED {
+        check_iter_result_by_key(
+            &mut iter,
+            vec![
+                (Bytes::from_static(b"0"), Bytes::from_static(b"v3")),
+                (Bytes::from_static(b"0"), Bytes::from_static(b"")),
+                (Bytes::from_static(b"0"), Bytes::from_static(b"v2")),
+                (Bytes::from_static(b"0"), Bytes::from_static(b"v1")),
+                (Bytes::from_static(b"1"), Bytes::from_static(b"")),
+                (Bytes::from_static(b"1"), Bytes::from_static(b"v2")),
+                (Bytes::from_static(b"2"), Bytes::from_static(b"v3")),
+                (Bytes::from_static(b"2"), Bytes::from_static(b"")),
+                (Bytes::from_static(b"2"), Bytes::from_static(b"v2")),
+            ],
+        );
+    } else {
+        check_iter_result_by_key(
+            &mut iter,
+            vec![
+                (Bytes::from_static(b"0"), Bytes::from_static(b"v3")),
+                (Bytes::from_static(b"1"), Bytes::from_static(b"")),
+                (Bytes::from_static(b"2"), Bytes::from_static(b"v3")),
+            ],
+        );
+    }
     storage.force_full_compaction().unwrap();
     assert!(storage.state.read().l0_sstables.is_empty());
     let mut iter = construct_merge_iterator_over_storage(&storage.state.read());
-    check_iter_result_by_key(
-        &mut iter,
-        vec![
-            (Bytes::from_static(b"0"), Bytes::from_static(b"v3")),
-            (Bytes::from_static(b"2"), Bytes::from_static(b"v3")),
-        ],
-    );
+    if TS_ENABLED {
+        check_iter_result_by_key(
+            &mut iter,
+            vec![
+                (Bytes::from_static(b"0"), Bytes::from_static(b"v3")),
+                (Bytes::from_static(b"0"), Bytes::from_static(b"")),
+                (Bytes::from_static(b"0"), Bytes::from_static(b"v2")),
+                (Bytes::from_static(b"0"), Bytes::from_static(b"v1")),
+                (Bytes::from_static(b"1"), Bytes::from_static(b"")),
+                (Bytes::from_static(b"1"), Bytes::from_static(b"v2")),
+                (Bytes::from_static(b"2"), Bytes::from_static(b"v3")),
+                (Bytes::from_static(b"2"), Bytes::from_static(b"")),
+                (Bytes::from_static(b"2"), Bytes::from_static(b"v2")),
+            ],
+        );
+    } else {
+        check_iter_result_by_key(
+            &mut iter,
+            vec![
+                (Bytes::from_static(b"0"), Bytes::from_static(b"v3")),
+                (Bytes::from_static(b"2"), Bytes::from_static(b"v3")),
+            ],
+        );
+    }
 }
 
 fn generate_concat_sst(