diff --git a/mini-lsm-mvcc/src/compact.rs b/mini-lsm-mvcc/src/compact.rs index ab09c1d..06df389 100644 --- a/mini-lsm-mvcc/src/compact.rs +++ b/mini-lsm-mvcc/src/compact.rs @@ -144,7 +144,7 @@ impl LsmStorageInner { continue; } - if same_as_last_key && iter.key().ts() < watermark { + if same_as_last_key && iter.key().ts() <= watermark { if !first_key_below_watermark { iter.next()?; continue; @@ -155,7 +155,7 @@ impl LsmStorageInner { let builder_inner = builder.as_mut().unwrap(); if builder_inner.estimated_size() >= self.options.target_sst_size && !same_as_last_key { - let sst_id: usize = self.next_sst_id(); + let sst_id = self.next_sst_id(); let old_builder = builder.take().unwrap(); let sst = Arc::new(old_builder.build( sst_id, @@ -300,22 +300,22 @@ impl LsmStorageInner { println!("force full compaction: {:?}", compaction_task); let sstables = self.compact(&compaction_task)?; + let mut ids = Vec::with_capacity(sstables.len()); { - let _state_lock = self.state_lock.lock(); + let state_lock = self.state_lock.lock(); let mut state = self.state.read().as_ref().clone(); for sst in l0_sstables.iter().chain(l1_sstables.iter()) { let result = state.sstables.remove(sst); assert!(result.is_some()); } - let mut ids = Vec::with_capacity(sstables.len()); for new_sst in sstables { ids.push(new_sst.sst_id()); let result = state.sstables.insert(new_sst.sst_id(), new_sst); assert!(result.is_none()); } assert_eq!(l1_sstables, state.levels[0].1); - state.levels[0].1 = ids; + state.levels[0].1 = ids.clone(); let mut l0_sstables_map = l0_sstables.iter().copied().collect::>(); state.l0_sstables = state .l0_sstables @@ -325,12 +325,17 @@ impl LsmStorageInner { .collect::>(); assert!(l0_sstables_map.is_empty()); *self.state.write() = Arc::new(state); + self.sync_dir()?; + self.manifest.as_ref().unwrap().add_record( + &state_lock, + ManifestRecord::Compaction(compaction_task, ids.clone()), + )?; } for sst in l0_sstables.iter().chain(l1_sstables.iter()) { std::fs::remove_file(self.path_of_sst(*sst))?; } - println!("force full compaction done"); + println!("force full compaction done, new SSTs: {:?}", ids); Ok(()) } diff --git a/mini-lsm-mvcc/src/lsm_storage.rs b/mini-lsm-mvcc/src/lsm_storage.rs index 162e41e..7eda97b 100644 --- a/mini-lsm-mvcc/src/lsm_storage.rs +++ b/mini-lsm-mvcc/src/lsm_storage.rs @@ -773,7 +773,7 @@ impl LsmStorageInner { table, KeySlice::from_slice(key, key::TS_RANGE_BEGIN), )?; - if iter.is_valid() && iter.key().key_ref() == key { + while iter.is_valid() && iter.key().key_ref() == key { iter.next()?; } iter @@ -811,7 +811,7 @@ impl LsmStorageInner { level_ssts, KeySlice::from_slice(key, key::TS_RANGE_BEGIN), )?; - if iter.is_valid() && iter.key().key_ref() == key { + while iter.is_valid() && iter.key().key_ref() == key { iter.next()?; } iter diff --git a/mini-lsm-mvcc/src/mvcc.rs b/mini-lsm-mvcc/src/mvcc.rs index 4c93b11..81793be 100644 --- a/mini-lsm-mvcc/src/mvcc.rs +++ b/mini-lsm-mvcc/src/mvcc.rs @@ -1,5 +1,5 @@ pub mod txn; -mod watermark; +pub mod watermark; use std::{ collections::{BTreeMap, HashSet}, diff --git a/mini-lsm-mvcc/src/tests.rs b/mini-lsm-mvcc/src/tests.rs index 7d4862b..f4851f6 100644 --- a/mini-lsm-mvcc/src/tests.rs +++ b/mini-lsm-mvcc/src/tests.rs @@ -15,3 +15,4 @@ mod week2_day6; mod week3_day1; mod week3_day2; mod week3_day3; +mod week3_day4; diff --git a/mini-lsm-mvcc/src/tests/week3_day3.rs b/mini-lsm-mvcc/src/tests/week3_day3.rs index 391ae17..a0b2532 100644 --- a/mini-lsm-mvcc/src/tests/week3_day3.rs +++ b/mini-lsm-mvcc/src/tests/week3_day3.rs @@ -247,6 +247,18 @@ fn test_task2_lsm_iterator_mvcc() { (Bytes::from("c"), Bytes::from("5")), ], ); + check_lsm_iter_result_by_key( + &mut snapshot6 + .scan(Bound::Included(b"a"), Bound::Included(b"a")) + .unwrap(), + vec![(Bytes::from("a"), Bytes::from("4"))], + ); + check_lsm_iter_result_by_key( + &mut snapshot6 + .scan(Bound::Excluded(b"a"), Bound::Excluded(b"c")) + .unwrap(), + vec![], + ); } #[test] diff --git a/mini-lsm-mvcc/src/tests/week3_day4.rs b/mini-lsm-mvcc/src/tests/week3_day4.rs new file mode 100644 index 0000000..f0e6326 --- /dev/null +++ b/mini-lsm-mvcc/src/tests/week3_day4.rs @@ -0,0 +1,188 @@ +use std::ops::Bound; + +use bytes::Bytes; +use tempfile::tempdir; + +use crate::{ + compact::CompactionOptions, + key::KeySlice, + lsm_storage::{LsmStorageOptions, MiniLsm, WriteBatchRecord}, + mvcc::watermark::Watermark, + table::SsTableBuilder, + tests::harness::check_lsm_iter_result_by_key, +}; + +use super::harness::{check_iter_result_by_key, construct_merge_iterator_over_storage}; + +#[test] +fn test_task1_watermark() { + let mut watermark = Watermark::new(); + watermark.add_reader(0); + for i in 1..=1000 { + watermark.add_reader(i); + assert_eq!(watermark.watermark(), Some(0)); + assert_eq!(watermark.num_retained_snapshots(), i as usize + 1); + } + let mut cnt = 1001; + for i in 0..500 { + watermark.remove_reader(i); + assert_eq!(watermark.watermark(), Some(i + 1)); + cnt -= 1; + assert_eq!(watermark.num_retained_snapshots(), cnt); + } + for i in (501..=1000).rev() { + watermark.remove_reader(i); + assert_eq!(watermark.watermark(), Some(500)); + cnt -= 1; + assert_eq!(watermark.num_retained_snapshots(), cnt); + } + watermark.remove_reader(500); + assert_eq!(watermark.watermark(), None); + assert_eq!(watermark.num_retained_snapshots(), 0); + watermark.add_reader(2000); + watermark.add_reader(2000); + watermark.add_reader(2001); + assert_eq!(watermark.num_retained_snapshots(), 2); + assert_eq!(watermark.watermark(), Some(2000)); + watermark.remove_reader(2000); + assert_eq!(watermark.num_retained_snapshots(), 2); + assert_eq!(watermark.watermark(), Some(2000)); + watermark.remove_reader(2000); + assert_eq!(watermark.num_retained_snapshots(), 1); + assert_eq!(watermark.watermark(), Some(2001)); +} + +#[test] +fn test_task2_snapshot_watermark() { + let dir = tempdir().unwrap(); + let options = LsmStorageOptions::default_for_week2_test(CompactionOptions::NoCompaction); + let storage = MiniLsm::open(&dir, options.clone()).unwrap(); + let txn1 = storage.new_txn().unwrap(); + let txn2 = storage.new_txn().unwrap(); + storage.put(b"233", b"23333").unwrap(); + let txn3 = storage.new_txn().unwrap(); + assert_eq!(storage.inner.mvcc().watermark(), txn1.read_ts); + drop(txn1); + assert_eq!(storage.inner.mvcc().watermark(), txn2.read_ts); + drop(txn2); + assert_eq!(storage.inner.mvcc().watermark(), txn3.read_ts); + drop(txn3); + assert_eq!( + storage.inner.mvcc().watermark(), + storage.inner.mvcc().latest_commit_ts() + ); +} + +#[test] +fn test_task3_mvcc_compaction() { + let dir = tempdir().unwrap(); + let options = LsmStorageOptions::default_for_week2_test(CompactionOptions::NoCompaction); + let storage = MiniLsm::open(&dir, options.clone()).unwrap(); + let snapshot0 = storage.new_txn().unwrap(); + storage + .write_batch(&[ + WriteBatchRecord::Put(b"a", b"1"), + WriteBatchRecord::Put(b"b", b"1"), + ]) + .unwrap(); + let snapshot1 = storage.new_txn().unwrap(); + storage + .write_batch(&[ + WriteBatchRecord::Put(b"a", b"2"), + WriteBatchRecord::Put(b"d", b"2"), + ]) + .unwrap(); + let snapshot2 = storage.new_txn().unwrap(); + storage + .write_batch(&[ + WriteBatchRecord::Put(b"a", b"3"), + WriteBatchRecord::Del(b"d"), + ]) + .unwrap(); + let snapshot3 = storage.new_txn().unwrap(); + storage + .write_batch(&[ + WriteBatchRecord::Put(b"c", b"4"), + WriteBatchRecord::Del(b"a"), + ]) + .unwrap(); + + storage.force_flush().unwrap(); + storage.force_full_compaction().unwrap(); + + let mut iter = construct_merge_iterator_over_storage(&storage.inner.state.read()); + check_iter_result_by_key( + &mut iter, + vec![ + (Bytes::from("a"), Bytes::new()), + (Bytes::from("a"), Bytes::from("3")), + (Bytes::from("a"), Bytes::from("2")), + (Bytes::from("a"), Bytes::from("1")), + (Bytes::from("b"), Bytes::from("1")), + (Bytes::from("c"), Bytes::from("4")), + (Bytes::from("d"), Bytes::new()), + (Bytes::from("d"), Bytes::from("2")), + ], + ); + + drop(snapshot0); + storage.force_full_compaction().unwrap(); + + let mut iter = construct_merge_iterator_over_storage(&storage.inner.state.read()); + check_iter_result_by_key( + &mut iter, + vec![ + (Bytes::from("a"), Bytes::new()), + (Bytes::from("a"), Bytes::from("3")), + (Bytes::from("a"), Bytes::from("2")), + (Bytes::from("a"), Bytes::from("1")), + (Bytes::from("b"), Bytes::from("1")), + (Bytes::from("c"), Bytes::from("4")), + (Bytes::from("d"), Bytes::new()), + (Bytes::from("d"), Bytes::from("2")), + ], + ); + + drop(snapshot1); + storage.force_full_compaction().unwrap(); + + let mut iter = construct_merge_iterator_over_storage(&storage.inner.state.read()); + check_iter_result_by_key( + &mut iter, + vec![ + (Bytes::from("a"), Bytes::new()), + (Bytes::from("a"), Bytes::from("3")), + (Bytes::from("a"), Bytes::from("2")), + (Bytes::from("b"), Bytes::from("1")), + (Bytes::from("c"), Bytes::from("4")), + (Bytes::from("d"), Bytes::new()), + (Bytes::from("d"), Bytes::from("2")), + ], + ); + + drop(snapshot2); + storage.force_full_compaction().unwrap(); + + let mut iter = construct_merge_iterator_over_storage(&storage.inner.state.read()); + check_iter_result_by_key( + &mut iter, + vec![ + (Bytes::from("a"), Bytes::new()), + (Bytes::from("a"), Bytes::from("3")), + (Bytes::from("b"), Bytes::from("1")), + (Bytes::from("c"), Bytes::from("4")), + ], + ); + + drop(snapshot3); + storage.force_full_compaction().unwrap(); + + let mut iter = construct_merge_iterator_over_storage(&storage.inner.state.read()); + check_iter_result_by_key( + &mut iter, + vec![ + (Bytes::from("b"), Bytes::from("1")), + (Bytes::from("c"), Bytes::from("4")), + ], + ); +} diff --git a/mini-lsm/src/tests/harness.rs b/mini-lsm/src/tests/harness.rs index d55b32a..6fbb19f 100644 --- a/mini-lsm/src/tests/harness.rs +++ b/mini-lsm/src/tests/harness.rs @@ -11,10 +11,10 @@ use crate::{ CompactionOptions, LeveledCompactionOptions, SimpleLeveledCompactionOptions, TieredCompactionOptions, }, - iterators::StorageIterator, + iterators::{merge_iterator::MergeIterator, StorageIterator}, key::{KeySlice, TS_ENABLED}, - lsm_storage::{BlockCache, LsmStorageInner, MiniLsm}, - table::{SsTable, SsTableBuilder}, + lsm_storage::{BlockCache, LsmStorageInner, LsmStorageState, MiniLsm}, + table::{SsTable, SsTableBuilder, SsTableIterator}, }; #[derive(Clone)] @@ -411,3 +411,24 @@ pub fn dump_files_in_dir(path: impl AsRef) { ); } } + +pub fn construct_merge_iterator_over_storage( + state: &LsmStorageState, +) -> MergeIterator { + let mut iters = Vec::new(); + for t in &state.l0_sstables { + iters.push(Box::new( + SsTableIterator::create_and_seek_to_first(state.sstables.get(t).cloned().unwrap()) + .unwrap(), + )); + } + for (_, files) in &state.levels { + for f in files { + iters.push(Box::new( + SsTableIterator::create_and_seek_to_first(state.sstables.get(f).cloned().unwrap()) + .unwrap(), + )); + } + } + MergeIterator::create(iters) +} diff --git a/mini-lsm/src/tests/week2_day1.rs b/mini-lsm/src/tests/week2_day1.rs index 74ce57d..0124968 100644 --- a/mini-lsm/src/tests/week2_day1.rs +++ b/mini-lsm/src/tests/week2_day1.rs @@ -3,6 +3,7 @@ use std::{ops::Bound, path::Path, sync::Arc}; use self::harness::{check_iter_result_by_key, check_lsm_iter_result_by_key, sync}; use bytes::Bytes; use tempfile::tempdir; +use week2_day1::harness::construct_merge_iterator_over_storage; use super::*; use crate::{ @@ -14,27 +15,6 @@ use crate::{ table::{SsTable, SsTableBuilder, SsTableIterator}, }; -fn construct_merge_iterator_over_storage( - state: &LsmStorageState, -) -> MergeIterator { - let mut iters = Vec::new(); - for t in &state.l0_sstables { - iters.push(Box::new( - SsTableIterator::create_and_seek_to_first(state.sstables.get(t).cloned().unwrap()) - .unwrap(), - )); - } - for (_, files) in &state.levels { - for f in files { - iters.push(Box::new( - SsTableIterator::create_and_seek_to_first(state.sstables.get(f).cloned().unwrap()) - .unwrap(), - )); - } - } - MergeIterator::create(iters) -} - #[test] fn test_task1_full_compaction() { // We do not use LSM iterator in this test because it's implemented as part of task 3