From 6025bb8dca7f0ed4b5758f3d0ad6e6b9340baa3f Mon Sep 17 00:00:00 2001
From: Alex Chi
Date: Fri, 26 Jan 2024 16:52:37 +0800
Subject: [PATCH] implement mvcc compaction + snapshot

Signed-off-by: Alex Chi
---
 mini-lsm-book/src/week4-overview.md   |   3 +-
 mini-lsm-mvcc/src/compact.rs          |  38 ++++++++--
 mini-lsm-mvcc/src/lib.rs              |   1 +
 mini-lsm-mvcc/src/lsm_storage.rs      |  65 +++++++++++-----
 mini-lsm-mvcc/src/mvcc.rs             | 103 ++++++++++++++++++++++++++
 mini-lsm-mvcc/src/mvcc/watermark.rs   |  29 ++++++++
 mini-lsm-mvcc/src/tests.rs            |   2 +
 mini-lsm-mvcc/src/tests/week2_day5.rs |   1 +
 mini-lsm-mvcc/src/tests/week2_day6.rs |   1 +
 mini-lsm-starter/src/lsm_storage.rs   |   9 +++
 mini-lsm/src/lsm_storage.rs           |   9 +++
 mini-lsm/src/tests/harness.rs         |  55 +++++++++-----
 mini-lsm/src/tests/week1_day1.rs      |  14 ++--
 mini-lsm/src/tests/week1_day2.rs      |   7 +-
 mini-lsm/src/tests/week1_day5.rs      |   7 +-
 mini-lsm/src/tests/week1_day6.rs      |  11 ++-
 mini-lsm/src/tests/week2_day1.rs      |   8 +-
 17 files changed, 300 insertions(+), 63 deletions(-)
 create mode 100644 mini-lsm-mvcc/src/mvcc.rs
 create mode 100644 mini-lsm-mvcc/src/mvcc/watermark.rs
 create mode 120000 mini-lsm-mvcc/src/tests/week2_day5.rs
 create mode 120000 mini-lsm-mvcc/src/tests/week2_day6.rs

diff --git a/mini-lsm-book/src/week4-overview.md b/mini-lsm-book/src/week4-overview.md
index 5b474d2..fb67b2b 100644
--- a/mini-lsm-book/src/week4-overview.md
+++ b/mini-lsm-book/src/week4-overview.md
@@ -17,4 +17,5 @@ This is an advanced part that deep dives into optimizations and applications of
 | 4.11 | Key-Value Separation | | | |
 | 4.12 | Column Families | | | |
 | 4.13 | Sharding | | | |
-| 4.14 | SQL over Mini-LSM | | | |
+| 4.14 | Compaction Optimizations | | | |
+| 4.15 | SQL over Mini-LSM | | | |
diff --git a/mini-lsm-mvcc/src/compact.rs b/mini-lsm-mvcc/src/compact.rs
index 8613bf2..6e729d3 100644
--- a/mini-lsm-mvcc/src/compact.rs
+++ b/mini-lsm-mvcc/src/compact.rs
@@ -115,20 +115,45 @@ impl LsmStorageInner {
     fn compact_generate_sst_from_iter(
         &self,
         mut iter: impl for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
-        _compact_to_bottom_level: bool,
+        compact_to_bottom_level: bool,
     ) -> Result<Vec<Arc<SsTable>>> {
         let mut builder = None;
         let mut new_sst = Vec::new();
-
+        let watermark = self.mvcc().watermark();
         let mut last_key = Vec::<u8>::new();
+        let mut first_key_below_watermark = false;
         while iter.is_valid() {
             if builder.is_none() {
                 builder = Some(SsTableBuilder::new(self.options.block_size));
             }
-            let builder_inner = builder.as_mut().unwrap();
-            builder_inner.add(iter.key(), iter.value());
             let same_as_last_key = iter.key().key_ref() == last_key;
+            if !same_as_last_key {
+                first_key_below_watermark = true;
+            }
+
+            if compact_to_bottom_level
+                && !same_as_last_key
+                && iter.key().ts() <= watermark
+                && iter.value().is_empty()
+            {
+                last_key.clear();
+                last_key.extend(iter.key().key_ref());
+                iter.next()?;
+                first_key_below_watermark = false;
+                continue;
+            }
+
+            if same_as_last_key && iter.key().ts() <= watermark {
+                if !first_key_below_watermark {
+                    iter.next()?;
+                    continue;
+                }
+                first_key_below_watermark = false;
+            }
+
+            let builder_inner = builder.as_mut().unwrap();
+            builder_inner.add(iter.key(), iter.value());
             if builder_inner.estimated_size() >= self.options.target_sst_size && !same_as_last_key
             {
                 let sst_id = self.next_sst_id();
@@ -145,6 +170,7 @@ impl LsmStorageInner {
                 last_key.clear();
                 last_key.extend(iter.key().key_ref());
             }
+
             iter.next()?;
         }
         if let Some(builder) = builder {
@@ -343,9 +369,7 @@ impl LsmStorageInner {
             *state = Arc::new(snapshot);
             drop(state);
             self.sync_dir()?;
-            self.manifest
-                .as_ref()
-                .unwrap()
+            self.manifest()
                 .add_record(&state_lock, ManifestRecord::Compaction(task, new_sst_ids))?;
             ssts_to_remove
         };
diff --git a/mini-lsm-mvcc/src/lib.rs b/mini-lsm-mvcc/src/lib.rs
index afdfb65..79341ab 100644
--- a/mini-lsm-mvcc/src/lib.rs
+++ b/mini-lsm-mvcc/src/lib.rs
@@ -7,6 +7,7 @@ pub mod lsm_iterator;
 pub mod lsm_storage;
 pub mod manifest;
 pub mod mem_table;
+pub mod mvcc;
 pub mod table;
 pub mod wal;
 
diff --git a/mini-lsm-mvcc/src/lsm_storage.rs b/mini-lsm-mvcc/src/lsm_storage.rs
index 89d8ca2..8bcc411 100644
--- a/mini-lsm-mvcc/src/lsm_storage.rs
+++ b/mini-lsm-mvcc/src/lsm_storage.rs
@@ -2,7 +2,7 @@ use std::collections::{BTreeSet, HashMap};
 use std::fs::File;
 use std::ops::Bound;
 use std::path::{Path, PathBuf};
-use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
+use std::sync::atomic::AtomicUsize;
 use std::sync::Arc;
 
 use anyhow::{Context, Result};
@@ -22,6 +22,7 @@ use crate::key::{self, KeySlice};
 use crate::lsm_iterator::{FusedIterator, LsmIterator};
 use crate::manifest::{Manifest, ManifestRecord};
 use crate::mem_table::{map_bound, map_key_bound_plus_ts, MemTable};
+use crate::mvcc::{LsmMvccInner, Transaction, TxnIterator};
 use crate::table::{FileObject, SsTable, SsTableBuilder, SsTableIterator};
 
 pub type BlockCache = moka::sync::Cache<(usize, usize), Arc<Block>>;
@@ -153,8 +154,7 @@ pub(crate) struct LsmStorageInner {
     pub(crate) options: Arc<LsmStorageOptions>,
     pub(crate) compaction_controller: CompactionController,
     pub(crate) manifest: Option<Manifest>,
-    pub(crate) ts: Arc<AtomicU64>,
-    pub(crate) write_lock: Mutex<()>,
+    pub(crate) mvcc: Option<LsmMvccInner>,
 }
 
 /// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM.
@@ -238,6 +238,10 @@ impl MiniLsm {
         }))
     }
 
+    pub fn new_txn(&self) -> Result<Arc<Transaction>> {
+        self.inner.new_txn()
+    }
+
     pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
         self.inner.get(key)
     }
@@ -258,11 +262,7 @@ impl MiniLsm {
         self.inner.sync()
     }
 
-    pub fn scan(
-        &self,
-        lower: Bound<&[u8]>,
-        upper: Bound<&[u8]>,
-    ) -> Result<FusedIterator<LsmIterator>> {
+    pub fn scan(&self, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> Result<TxnIterator> {
         self.inner.scan(lower, upper)
     }
 
@@ -289,6 +289,14 @@ impl LsmStorageInner {
             .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
     }
 
+    pub(crate) fn mvcc(&self) -> &LsmMvccInner {
+        self.mvcc.as_ref().unwrap()
+    }
+
+    pub(crate) fn manifest(&self) -> &Manifest {
+        self.manifest.as_ref().unwrap()
+    }
+
     /// Start the storage engine by either loading an existing directory or creating a new one if the directory does
     /// not exist.
     pub(crate) fn open(path: impl AsRef<Path>, options: LsmStorageOptions) -> Result<Self> {
@@ -408,8 +416,7 @@ impl LsmStorageInner {
             compaction_controller,
             manifest: Some(manifest),
             options: options.into(),
-            ts: Arc::new(AtomicU64::new(0)),
-            write_lock: Mutex::new(()),
+            mvcc: Some(LsmMvccInner::new(0)),
         };
         storage.sync_dir()?;
 
@@ -420,8 +427,17 @@ impl LsmStorageInner {
         self.state.read().memtable.sync_wal()
     }
 
+    pub fn new_txn(self: &Arc<Self>) -> Result<Arc<Transaction>> {
+        Ok(self.mvcc().new_txn(self.clone()))
+    }
+
     /// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter.
-    pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
+    pub fn get(self: &Arc<Self>, key: &[u8]) -> Result<Option<Bytes>> {
+        let txn = self.mvcc().new_txn(self.clone());
+        txn.get(key)
+    }
+
+    pub(crate) fn get_with_ts(&self, key: &[u8], read_ts: u64) -> Result<Option<Bytes>> {
         let snapshot = {
             let guard = self.state.read();
             Arc::clone(&guard)
@@ -491,7 +507,7 @@ impl LsmStorageInner {
                 MergeIterator::create(level_iters),
             )?,
             Bound::Unbounded,
-            self.ts.load(Ordering::SeqCst),
+            read_ts,
         )?;
 
         if iter.is_valid() && iter.key() == key && !iter.value().is_empty() {
@@ -501,8 +517,8 @@ impl LsmStorageInner {
     }
 
     pub fn write_batch<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<()> {
-        let _lck = self.write_lock.lock();
-        let ts = self.ts.fetch_add(1, Ordering::Relaxed);
+        let _lck = self.mvcc().write_lock.lock();
+        let ts = self.mvcc().latest_commit_ts() + 1;
         for record in batch {
             match record {
                 WriteBatchRecord::Del(key) => {
@@ -531,6 +547,7 @@ impl LsmStorageInner {
                 }
             }
         }
+        self.mvcc().update_commit_ts(ts);
         Ok(())
     }
 
@@ -608,7 +625,7 @@ impl LsmStorageInner {
 
         self.freeze_memtable_with_memtable(memtable)?;
 
-        self.manifest.as_ref().unwrap().add_record(
+        self.manifest().add_record(
             state_lock_observer,
             ManifestRecord::NewMemtable(memtable_id),
         )?;
@@ -666,9 +683,7 @@ impl LsmStorageInner {
             std::fs::remove_file(self.path_of_wal(sst_id))?;
         }
 
-        self.manifest
-            .as_ref()
-            .unwrap()
+        self.manifest()
             .add_record(&state_lock, ManifestRecord::Flush(sst_id))?;
 
         self.sync_dir()?;
@@ -677,10 +692,20 @@ impl LsmStorageInner {
     }
 
     /// Create an iterator over a range of keys.
-    pub fn scan(
+    pub fn scan<'a>(
+        self: &'a Arc<Self>,
+        lower: Bound<&[u8]>,
+        upper: Bound<&[u8]>,
+    ) -> Result<TxnIterator> {
+        let txn = self.mvcc().new_txn(self.clone());
+        txn.scan(lower, upper)
+    }
+
+    pub(crate) fn scan_with_ts(
         &self,
         lower: Bound<&[u8]>,
         upper: Bound<&[u8]>,
+        read_ts: u64,
     ) -> Result<FusedIterator<LsmIterator>> {
         let snapshot = {
             let guard = self.state.read();
@@ -773,7 +798,7 @@ impl LsmStorageInner {
         Ok(FusedIterator::new(LsmIterator::new(
             iter,
             map_bound(upper),
-            self.ts.load(Ordering::SeqCst),
+            read_ts,
         )?))
     }
 }
diff --git a/mini-lsm-mvcc/src/mvcc.rs b/mini-lsm-mvcc/src/mvcc.rs
new file mode 100644
index 0000000..526c5ff
--- /dev/null
+++ b/mini-lsm-mvcc/src/mvcc.rs
@@ -0,0 +1,103 @@
+mod watermark;
+
+use std::{ops::Bound, sync::Arc};
+
+use anyhow::Result;
+use bytes::Bytes;
+use parking_lot::Mutex;
+
+use crate::{
+    iterators::StorageIterator,
+    lsm_iterator::{FusedIterator, LsmIterator},
+    lsm_storage::LsmStorageInner,
+};
+
+use self::watermark::Watermark;
+
+pub(crate) struct LsmMvccInner {
+    pub(crate) write_lock: Mutex<()>,
+    pub(crate) ts: Arc<Mutex<(u64, Watermark)>>,
+}
+
+impl LsmMvccInner {
+    pub fn new(initial_ts: u64) -> Self {
+        Self {
+            write_lock: Mutex::new(()),
+            ts: Arc::new(Mutex::new((initial_ts, Watermark::new()))),
+        }
+    }
+
+    pub fn latest_commit_ts(&self) -> u64 {
+        self.ts.lock().0
+    }
+
+    pub fn update_commit_ts(&self, ts: u64) {
+        self.ts.lock().0 = ts;
+    }
+
+    /// All ts (strictly) below this ts can be garbage collected.
+    pub fn watermark(&self) -> u64 {
+        let ts = self.ts.lock();
+        ts.1.watermark().unwrap_or(ts.0)
+    }
+
+    pub fn new_txn(&self, inner: Arc<LsmStorageInner>) -> Arc<Transaction> {
+        let mut ts = self.ts.lock();
+        let read_ts = ts.0;
+        ts.1.add_reader(read_ts);
+        Arc::new(Transaction { inner, read_ts })
+    }
+}
+
+pub struct Transaction {
+    read_ts: u64,
+    inner: Arc<LsmStorageInner>,
+}
+
+impl Transaction {
+    pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
+        self.inner.get_with_ts(key, self.read_ts)
+    }
+
+    pub fn scan(self: &Arc<Self>, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> Result<TxnIterator> {
+        Ok(TxnIterator {
+            _txn: self.clone(),
+            iter: self.inner.scan_with_ts(lower, upper, self.read_ts)?,
+        })
+    }
+}
+
+impl Drop for Transaction {
+    fn drop(&mut self) {
+        self.inner.mvcc().ts.lock().1.remove_reader(self.read_ts)
+    }
+}
+
+pub struct TxnIterator {
+    _txn: Arc<Transaction>,
+    iter: FusedIterator<LsmIterator>,
+}
+
+impl StorageIterator for TxnIterator {
+    type KeyType<'a> = &'a [u8] where Self: 'a;
+
+    fn value(&self) -> &[u8] {
+        self.iter.value()
+    }
+
+    fn key(&self) -> Self::KeyType<'_> {
+        self.iter.key()
+    }
+
+    fn is_valid(&self) -> bool {
+        self.iter.is_valid()
+    }
+
+    fn next(&mut self) -> Result<()> {
+        self.iter.next()
+    }
+
+    fn num_active_iterators(&self) -> usize {
+        self.iter.num_active_iterators()
+    }
+}
diff --git a/mini-lsm-mvcc/src/mvcc/watermark.rs b/mini-lsm-mvcc/src/mvcc/watermark.rs
new file mode 100644
index 0000000..dc7fe07
--- /dev/null
+++ b/mini-lsm-mvcc/src/mvcc/watermark.rs
@@ -0,0 +1,29 @@
+use std::collections::BTreeMap;
+
+pub struct Watermark {
+    readers: BTreeMap<u64, usize>,
+}
+
+impl Watermark {
+    pub fn new() -> Self {
+        Self {
+            readers: BTreeMap::new(),
+        }
+    }
+
+    pub fn add_reader(&mut self, ts: u64) {
+        *self.readers.entry(ts).or_default() += 1;
+    }
+
+    pub fn remove_reader(&mut self, ts: u64) {
+        let cnt = self.readers.get_mut(&ts).unwrap();
+        *cnt -= 1;
+        if *cnt == 0 {
+            self.readers.remove(&ts);
+        }
+    }
+
+    pub fn watermark(&self) -> Option<u64> {
+        self.readers.first_key_value().map(|(ts, _)| *ts)
+    }
+}
diff --git a/mini-lsm-mvcc/src/tests.rs b/mini-lsm-mvcc/src/tests.rs
index fb295e9..ae2d28a 100644
--- a/mini-lsm-mvcc/src/tests.rs
+++ b/mini-lsm-mvcc/src/tests.rs
@@ -10,3 +10,5 @@ mod week2_day1;
 mod week2_day2;
 mod week2_day3;
 mod week2_day4;
+// mod week2_day5;
+// mod week2_day6;
diff --git a/mini-lsm-mvcc/src/tests/week2_day5.rs b/mini-lsm-mvcc/src/tests/week2_day5.rs
new file mode 120000
index 0000000..468fec0
--- /dev/null
+++ b/mini-lsm-mvcc/src/tests/week2_day5.rs
@@ -0,0 +1 @@
+../../../mini-lsm/src/tests/week2_day5.rs
\ No newline at end of file
diff --git a/mini-lsm-mvcc/src/tests/week2_day6.rs b/mini-lsm-mvcc/src/tests/week2_day6.rs
new file mode 120000
index 0000000..bf78fc5
--- /dev/null
+++ b/mini-lsm-mvcc/src/tests/week2_day6.rs
@@ -0,0 +1 @@
+../../../mini-lsm/src/tests/week2_day6.rs
\ No newline at end of file
diff --git a/mini-lsm-starter/src/lsm_storage.rs b/mini-lsm-starter/src/lsm_storage.rs
index e77dc7b..e2d2702 100644
--- a/mini-lsm-starter/src/lsm_storage.rs
+++ b/mini-lsm-starter/src/lsm_storage.rs
@@ -162,6 +162,10 @@ impl MiniLsm {
         }))
     }
 
+    pub fn new_txn(&self) -> Result<()> {
+        self.inner.new_txn()
+    }
+
     pub fn write_batch<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<()> {
         self.inner.write_batch(batch)
     }
@@ -300,6 +304,11 @@ impl LsmStorageInner {
         unimplemented!()
     }
 
+    pub fn new_txn(&self) -> Result<()> {
+        // no-op
+        Ok(())
+    }
+
     /// Create an iterator over a range of keys.
     pub fn scan(
         &self,
diff --git a/mini-lsm/src/lsm_storage.rs b/mini-lsm/src/lsm_storage.rs
index 8c499a1..67aa386 100644
--- a/mini-lsm/src/lsm_storage.rs
+++ b/mini-lsm/src/lsm_storage.rs
@@ -256,6 +256,10 @@ impl MiniLsm {
         self.inner.sync()
     }
 
+    pub fn new_txn(&self) -> Result<()> {
+        self.inner.new_txn()
+    }
+
     pub fn scan(
         &self,
         lower: Bound<&[u8]>,
@@ -668,6 +672,11 @@ impl LsmStorageInner {
         Ok(())
     }
 
+    pub fn new_txn(&self) -> Result<()> {
+        // no-op
+        Ok(())
+    }
+
     /// Create an iterator over a range of keys.
     pub fn scan(
         &self,
diff --git a/mini-lsm/src/tests/harness.rs b/mini-lsm/src/tests/harness.rs
index 183fbc6..2339a80 100644
--- a/mini-lsm/src/tests/harness.rs
+++ b/mini-lsm/src/tests/harness.rs
@@ -184,25 +184,7 @@ pub fn compaction_bench(storage: Arc<MiniLsm>) {
             max_key = max_key.max(i);
         }
     }
-
-    let mut expected_key_value_pairs = Vec::new();
-    for i in 0..(max_key + 40000) {
-        let key = gen_key(i);
-        let value = storage.get(key.as_bytes()).unwrap();
-        if let Some(val) = key_map.get(&i) {
-            let expected_value = gen_value(*val);
-            assert_eq!(value, Some(Bytes::from(expected_value.clone())));
-            expected_key_value_pairs.push((Bytes::from(key), Bytes::from(expected_value)));
-        } else {
-            assert!(value.is_none());
-        }
-    }
-
-    check_lsm_iter_result_by_key(
-        &mut storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
-        expected_key_value_pairs,
-    );
-
+    std::thread::sleep(Duration::from_secs(1)); // wait until all memtables flush
     while {
         let snapshot = storage.inner.state.read();
         !snapshot.imm_memtables.is_empty()
@@ -222,6 +204,24 @@ pub fn compaction_bench(storage: Arc<MiniLsm>) {
         println!("waiting for compaction to converge");
     }
 
+    let mut expected_key_value_pairs = Vec::new();
+    for i in 0..(max_key + 40000) {
+        let key = gen_key(i);
+        let value = storage.get(key.as_bytes()).unwrap();
+        if let Some(val) = key_map.get(&i) {
+            let expected_value = gen_value(*val);
+            assert_eq!(value, Some(Bytes::from(expected_value.clone())));
+            expected_key_value_pairs.push((Bytes::from(key), Bytes::from(expected_value)));
+        } else {
+            assert!(value.is_none());
+        }
+    }
+
+    check_lsm_iter_result_by_key(
+        &mut storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
+        expected_key_value_pairs,
+    );
+
     storage.dump_structure();
     println!("This test case does not guarantee your compaction algorithm produces a LSM state as expected. It only does minimal checks on the size of the levels. Please use the compaction simulator to check if the compaction is correctly going on.");
 }
@@ -243,6 +243,11 @@ pub fn check_compaction_ratio(storage: Arc<MiniLsm>) {
         };
         level_size.push(size);
     }
+    let num_iters = storage
+        .scan(Bound::Unbounded, Bound::Unbounded)
+        .unwrap()
+        .num_active_iterators();
+    let num_memtables = storage.inner.state.read().imm_memtables.len() + 1;
     match compaction_options {
         CompactionOptions::NoCompaction => unreachable!(),
         CompactionOptions::Simple(SimpleLeveledCompactionOptions {
@@ -268,6 +273,10 @@ pub fn check_compaction_ratio(storage: Arc<MiniLsm>) {
                     size_ratio_percent
                 );
             }
+            assert!(
+                num_iters <= l0_sst_num + num_memtables + max_levels,
+                "did you use concat iterators?"
+            );
         }
         CompactionOptions::Leveled(LeveledCompactionOptions {
             level_size_multiplier,
@@ -291,6 +300,10 @@ pub fn check_compaction_ratio(storage: Arc<MiniLsm>) {
                     level_size_multiplier
                 );
             }
+            assert!(
+                num_iters <= l0_sst_num + num_memtables + max_levels,
+                "did you use concat iterators?"
+            );
         }
         CompactionOptions::Tiered(TieredCompactionOptions {
             num_tiers,
@@ -329,6 +342,10 @@ pub fn check_compaction_ratio(storage: Arc<MiniLsm>) {
                 }
                 sum_size += this_size;
             }
+            assert!(
+                num_iters <= num_memtables + num_tiers,
+                "did you use concat iterators?"
+            );
         }
     }
 }
diff --git a/mini-lsm/src/tests/week1_day1.rs b/mini-lsm/src/tests/week1_day1.rs
index e7b1f5d..bce740f 100644
--- a/mini-lsm/src/tests/week1_day1.rs
+++ b/mini-lsm/src/tests/week1_day1.rs
@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
 use tempfile::tempdir;
 
 use crate::{
@@ -51,8 +53,9 @@ fn test_task1_memtable_overwrite() {
 #[test]
 fn test_task2_storage_integration() {
     let dir = tempdir().unwrap();
-    let storage =
-        LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap();
+    let storage = Arc::new(
+        LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap(),
+    );
     assert_eq!(&storage.get(b"0").unwrap(), &None);
     storage.put(b"1", b"233").unwrap();
     storage.put(b"2", b"2333").unwrap();
@@ -99,7 +102,7 @@ fn test_task3_freeze_on_capacity() {
     let mut options = LsmStorageOptions::default_for_week1_test();
     options.target_sst_size = 1024;
     options.num_memtable_limit = 1000;
-    let storage = LsmStorageInner::open(dir.path(), options).unwrap();
+    let storage = Arc::new(LsmStorageInner::open(dir.path(), options).unwrap());
     for _ in 0..1000 {
         storage.put(b"1", b"2333").unwrap();
     }
@@ -117,8 +120,9 @@ fn test_task3_freeze_on_capacity() {
 #[test]
 fn test_task4_storage_integration() {
     let dir = tempdir().unwrap();
-    let storage =
-        LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap();
+    let storage = Arc::new(
+        LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap(),
+    );
     assert_eq!(&storage.get(b"0").unwrap(), &None);
     storage.put(b"1", b"233").unwrap();
     storage.put(b"2", b"2333").unwrap();
diff --git a/mini-lsm/src/tests/week1_day2.rs b/mini-lsm/src/tests/week1_day2.rs
index 0c0aca1..6c1b002 100644
--- a/mini-lsm/src/tests/week1_day2.rs
+++ b/mini-lsm/src/tests/week1_day2.rs
@@ -1,4 +1,4 @@
-use std::ops::Bound;
+use std::{ops::Bound, sync::Arc};
 
 use bytes::Bytes;
 use tempfile::tempdir;
@@ -262,8 +262,9 @@ fn test_task3_fused_iterator() {
 #[test]
 fn test_task4_integration() {
     let dir = tempdir().unwrap();
-    let storage =
-        LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap();
+    let storage = Arc::new(
+        LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap(),
+    );
     storage.put(b"1", b"233").unwrap();
     storage.put(b"2", b"2333").unwrap();
     storage.put(b"3", b"23333").unwrap();
diff --git a/mini-lsm/src/tests/week1_day5.rs b/mini-lsm/src/tests/week1_day5.rs
index 6e7dc40..93706c2 100644
--- a/mini-lsm/src/tests/week1_day5.rs
+++ b/mini-lsm/src/tests/week1_day5.rs
@@ -1,4 +1,5 @@
 use std::ops::Bound;
+use std::sync::Arc;
 
 use self::harness::{check_iter_result_by_key, MockIterator};
 use self::harness::{check_lsm_iter_result_by_key, generate_sst};
@@ -130,7 +131,8 @@ fn test_task1_merge_5() {
 #[test]
 fn test_task2_storage_scan() {
     let dir = tempdir().unwrap();
-    let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap();
+    let storage =
+        Arc::new(LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap());
     storage.put(b"1", b"233").unwrap();
     storage.put(b"2", b"2333").unwrap();
     storage.put(b"00", b"2333").unwrap();
@@ -190,7 +192,8 @@ fn test_task2_storage_scan() {
 #[test]
 fn test_task3_storage_get() {
     let dir = tempdir().unwrap();
-    let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap();
+    let storage =
+        Arc::new(LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap());
     storage.put(b"1", b"233").unwrap();
     storage.put(b"2", b"2333").unwrap();
     storage.put(b"00", b"2333").unwrap();
diff --git a/mini-lsm/src/tests/week1_day6.rs b/mini-lsm/src/tests/week1_day6.rs
index 969cd2f..a435d75 100644
--- a/mini-lsm/src/tests/week1_day6.rs
+++ b/mini-lsm/src/tests/week1_day6.rs
@@ -1,4 +1,4 @@
-use std::{ops::Bound, time::Duration};
+use std::{ops::Bound, sync::Arc, time::Duration};
 
 use bytes::Bytes;
 use tempfile::tempdir;
@@ -14,7 +14,8 @@ use crate::{
 #[test]
 fn test_task1_storage_scan() {
     let dir = tempdir().unwrap();
-    let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap();
+    let storage =
+        Arc::new(LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap());
     storage.put(b"0", b"2333333").unwrap();
     storage.put(b"00", b"2333333").unwrap();
     storage.put(b"4", b"23").unwrap();
@@ -67,7 +68,8 @@ fn test_task1_storage_scan() {
 #[test]
 fn test_task1_storage_get() {
     let dir = tempdir().unwrap();
-    let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap();
+    let storage =
+        Arc::new(LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap());
     storage.put(b"0", b"2333333").unwrap();
     storage.put(b"00", b"2333333").unwrap();
     storage.put(b"4", b"23").unwrap();
@@ -137,7 +139,8 @@ fn test_task2_auto_flush() {
 #[test]
 fn test_task3_sst_filter() {
     let dir = tempdir().unwrap();
-    let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap();
+    let storage =
+        Arc::new(LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap());
 
     for i in 1..=10000 {
         if i % 1000 == 0 {
diff --git a/mini-lsm/src/tests/week2_day1.rs b/mini-lsm/src/tests/week2_day1.rs
index 70fec22..74ce57d 100644
--- a/mini-lsm/src/tests/week2_day1.rs
+++ b/mini-lsm/src/tests/week2_day1.rs
@@ -39,7 +39,10 @@ fn construct_merge_iterator_over_storage(
 fn test_task1_full_compaction() {
     // We do not use LSM iterator in this test because it's implemented as part of task 3
     let dir = tempdir().unwrap();
-    let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap();
+    let storage =
+        Arc::new(LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap());
+    #[allow(clippy::let_unit_value)]
+    let _txn = storage.new_txn().unwrap();
     storage.put(b"0", b"v1").unwrap();
     sync(&storage);
     storage.put(b"0", b"v2").unwrap();
@@ -211,7 +214,8 @@ fn test_task2_concat_iterator() {
 #[test]
 fn test_task3_integration() {
     let dir = tempdir().unwrap();
-    let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap();
+    let storage =
+        Arc::new(LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap());
     storage.put(b"0", b"2333333").unwrap();
     storage.put(b"00", b"2333333").unwrap();
     storage.put(b"4", b"23").unwrap();