diff --git a/mini-lsm-book/src/week3-02-snapshot-read-part-1.md b/mini-lsm-book/src/week3-02-snapshot-read-part-1.md index 6dc1f1c..f8ce35a 100644 --- a/mini-lsm-book/src/week3-02-snapshot-read-part-1.md +++ b/mini-lsm-book/src/week3-02-snapshot-read-part-1.md @@ -1,3 +1,7 @@ # Snapshot Read - Memtables and SSTs During the refactor, you might need to change the signature of some functions from `&self` to `self: &Arc` as necessary. + +## MemTable + +## WAL diff --git a/mini-lsm-book/src/week3-overview.md b/mini-lsm-book/src/week3-overview.md index 7703ab7..b16aa05 100644 --- a/mini-lsm-book/src/week3-overview.md +++ b/mini-lsm-book/src/week3-overview.md @@ -33,7 +33,7 @@ We have 7 chapters (days) in this part: * [Day 1: Timestamp Key Refactor](./week3-01-ts-key-refactor.md). You will change the `key` module to the MVCC one and refactor your system to use key with timestamp. * [Day 2: Snapshot Read - Memtables and SSTs](./week3-02-snapshot-read-part-1.md). You will refactor the memtable and the SST format to support multiple version. -* [Day 3: Snapshot Read - Engine Read Path](./week3-03-snapshot-read-part-2.md). You will implement a timestamp oracle to assign the timestamps and add multi-version support to the LSM engine by using the transaction API. +* [Day 3: Snapshot Read - Transaction API](./week3-03-snapshot-read-part-2.md). You will implement a timestamp oracle to assign the timestamps and add multi-version support to the LSM engine by using the transaction API. * [Day 4: Watermark and Garbage Collection](./week3-04-watermark.md). You will implement the watermark computation algorithm and implement garbage collection at compaction time to remove old versions. * [Day 5: Transaction and Optimistic Concurrency Control](./week3-05-txn-occ.md). You will create a private workspace for all transactions and commit them in batch so that the modifications of a transaction will not be visible to other transactions. * [Day 6: Serializable Snapshot Isolation](./week3-06-serializable.md). You will implement the OCC serializable checks to ensure the modifications to the database is serializable and abort transactions that violates serializability. diff --git a/mini-lsm-mvcc/src/debug.rs b/mini-lsm-mvcc/src/debug.rs deleted file mode 100644 index c9eab3d..0000000 --- a/mini-lsm-mvcc/src/debug.rs +++ /dev/null @@ -1,23 +0,0 @@ -use crate::lsm_storage::{LsmStorageInner, MiniLsm}; - -impl LsmStorageInner { - pub fn dump_structure(&self) { - let snapshot = self.state.read(); - if !snapshot.l0_sstables.is_empty() { - println!( - "L0 ({}): {:?}", - snapshot.l0_sstables.len(), - snapshot.l0_sstables, - ); - } - for (level, files) in &snapshot.levels { - println!("L{level} ({}): {:?}", files.len(), files); - } - } -} - -impl MiniLsm { - pub fn dump_structure(&self) { - self.inner.dump_structure() - } -} diff --git a/mini-lsm-mvcc/src/debug.rs b/mini-lsm-mvcc/src/debug.rs new file mode 120000 index 0000000..4fc02d6 --- /dev/null +++ b/mini-lsm-mvcc/src/debug.rs @@ -0,0 +1 @@ +../../mini-lsm-starter/src/debug.rs \ No newline at end of file diff --git a/mini-lsm-mvcc/src/iterators/tests/merge_iterator_test.rs b/mini-lsm-mvcc/src/iterators/tests/merge_iterator_test.rs deleted file mode 100644 index e063911..0000000 --- a/mini-lsm-mvcc/src/iterators/tests/merge_iterator_test.rs +++ /dev/null @@ -1,137 +0,0 @@ -use super::*; -use crate::iterators::merge_iterator::MergeIterator; - -fn as_bytes(x: &[u8]) -> Bytes { - Bytes::copy_from_slice(x) -} - -fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>) { - let mut iter = iter; - for (k, v) in expected { - assert!(iter.is_valid()); - assert_eq!( - k, - iter.key(), - "expected key: {:?}, actual key: {:?}", - k, - as_bytes(iter.key()), - ); - assert_eq!( - v, - iter.value(), - "expected value: {:?}, actual value: {:?}", - v, - as_bytes(iter.value()), - ); - iter.next().unwrap(); - } - assert!(!iter.is_valid()); -} - -#[test] -fn test_merge_1() { - let i1 = MockIterator::new(vec![ - (Bytes::from("a"), Bytes::from("1.1")), - (Bytes::from("b"), Bytes::from("2.1")), - (Bytes::from("c"), Bytes::from("3.1")), - ]); - let i2 = MockIterator::new(vec![ - (Bytes::from("a"), Bytes::from("1.2")), - (Bytes::from("b"), Bytes::from("2.2")), - (Bytes::from("c"), Bytes::from("3.2")), - (Bytes::from("d"), Bytes::from("4.2")), - ]); - let i3 = MockIterator::new(vec![ - (Bytes::from("b"), Bytes::from("2.3")), - (Bytes::from("c"), Bytes::from("3.3")), - (Bytes::from("d"), Bytes::from("4.3")), - ]); - - let iter = MergeIterator::create(vec![ - Box::new(i1.clone()), - Box::new(i2.clone()), - Box::new(i3.clone()), - ]); - - check_iter_result( - iter, - vec![ - (Bytes::from("a"), Bytes::from("1.1")), - (Bytes::from("b"), Bytes::from("2.1")), - (Bytes::from("c"), Bytes::from("3.1")), - (Bytes::from("d"), Bytes::from("4.2")), - ], - ); - - let iter = MergeIterator::create(vec![Box::new(i3), Box::new(i1), Box::new(i2)]); - - check_iter_result( - iter, - vec![ - (Bytes::from("a"), Bytes::from("1.1")), - (Bytes::from("b"), Bytes::from("2.3")), - (Bytes::from("c"), Bytes::from("3.3")), - (Bytes::from("d"), Bytes::from("4.3")), - ], - ); -} - -#[test] -fn test_merge_2() { - let i1 = MockIterator::new(vec![ - (Bytes::from("a"), Bytes::from("1.1")), - (Bytes::from("b"), Bytes::from("2.1")), - (Bytes::from("c"), Bytes::from("3.1")), - ]); - let i2 = MockIterator::new(vec![ - (Bytes::from("d"), Bytes::from("1.2")), - (Bytes::from("e"), Bytes::from("2.2")), - (Bytes::from("f"), Bytes::from("3.2")), - (Bytes::from("g"), Bytes::from("4.2")), - ]); - let i3 = MockIterator::new(vec![ - (Bytes::from("h"), Bytes::from("1.3")), - (Bytes::from("i"), Bytes::from("2.3")), - (Bytes::from("j"), Bytes::from("3.3")), - (Bytes::from("k"), Bytes::from("4.3")), - ]); - let i4 = MockIterator::new(vec![]); - let result = vec![ - (Bytes::from("a"), Bytes::from("1.1")), - (Bytes::from("b"), Bytes::from("2.1")), - (Bytes::from("c"), Bytes::from("3.1")), - (Bytes::from("d"), Bytes::from("1.2")), - (Bytes::from("e"), Bytes::from("2.2")), - (Bytes::from("f"), Bytes::from("3.2")), - (Bytes::from("g"), Bytes::from("4.2")), - (Bytes::from("h"), Bytes::from("1.3")), - (Bytes::from("i"), Bytes::from("2.3")), - (Bytes::from("j"), Bytes::from("3.3")), - (Bytes::from("k"), Bytes::from("4.3")), - ]; - - let iter = MergeIterator::create(vec![ - Box::new(i1.clone()), - Box::new(i2.clone()), - Box::new(i3.clone()), - Box::new(i4.clone()), - ]); - check_iter_result(iter, result.clone()); - - let iter = MergeIterator::create(vec![ - Box::new(i2.clone()), - Box::new(i4.clone()), - Box::new(i3.clone()), - Box::new(i1.clone()), - ]); - check_iter_result(iter, result.clone()); - - let iter = MergeIterator::create(vec![Box::new(i4), Box::new(i3), Box::new(i2), Box::new(i1)]); - check_iter_result(iter, result); -} - -#[test] -fn test_merge_empty() { - let iter = MergeIterator::::create(vec![]); - check_iter_result(iter, vec![]); -} diff --git a/mini-lsm-mvcc/src/iterators/tests/two_merge_iterator_test.rs b/mini-lsm-mvcc/src/iterators/tests/two_merge_iterator_test.rs deleted file mode 100644 index 1719bf3..0000000 --- a/mini-lsm-mvcc/src/iterators/tests/two_merge_iterator_test.rs +++ /dev/null @@ -1,129 +0,0 @@ -use super::*; -use crate::iterators::two_merge_iterator::TwoMergeIterator; - -fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>) { - let mut iter = iter; - for (k, v) in expected { - assert!(iter.is_valid()); - assert_eq!(iter.key(), k.as_ref()); - assert_eq!(iter.value(), v.as_ref()); - iter.next().unwrap(); - } - assert!(!iter.is_valid()); -} - -#[test] -fn test_merge_1() { - let i1 = MockIterator::new(vec![ - (Bytes::from("a"), Bytes::from("1.1")), - (Bytes::from("b"), Bytes::from("2.1")), - (Bytes::from("c"), Bytes::from("3.1")), - ]); - let i2 = MockIterator::new(vec![ - (Bytes::from("a"), Bytes::from("1.2")), - (Bytes::from("b"), Bytes::from("2.2")), - (Bytes::from("c"), Bytes::from("3.2")), - (Bytes::from("d"), Bytes::from("4.2")), - ]); - let iter = TwoMergeIterator::create(i1, i2).unwrap(); - check_iter_result( - iter, - vec![ - (Bytes::from("a"), Bytes::from("1.1")), - (Bytes::from("b"), Bytes::from("2.1")), - (Bytes::from("c"), Bytes::from("3.1")), - (Bytes::from("d"), Bytes::from("4.2")), - ], - ) -} - -#[test] -fn test_merge_2() { - let i2 = MockIterator::new(vec![ - (Bytes::from("a"), Bytes::from("1.1")), - (Bytes::from("b"), Bytes::from("2.1")), - (Bytes::from("c"), Bytes::from("3.1")), - ]); - let i1 = MockIterator::new(vec![ - (Bytes::from("a"), Bytes::from("1.2")), - (Bytes::from("b"), Bytes::from("2.2")), - (Bytes::from("c"), Bytes::from("3.2")), - (Bytes::from("d"), Bytes::from("4.2")), - ]); - let iter = TwoMergeIterator::create(i1, i2).unwrap(); - check_iter_result( - iter, - vec![ - (Bytes::from("a"), Bytes::from("1.2")), - (Bytes::from("b"), Bytes::from("2.2")), - (Bytes::from("c"), Bytes::from("3.2")), - (Bytes::from("d"), Bytes::from("4.2")), - ], - ) -} - -#[test] -fn test_merge_3() { - let i2 = MockIterator::new(vec![ - (Bytes::from("a"), Bytes::from("1.1")), - (Bytes::from("b"), Bytes::from("2.1")), - (Bytes::from("c"), Bytes::from("3.1")), - ]); - let i1 = MockIterator::new(vec![ - (Bytes::from("b"), Bytes::from("2.2")), - (Bytes::from("c"), Bytes::from("3.2")), - (Bytes::from("d"), Bytes::from("4.2")), - ]); - let iter = TwoMergeIterator::create(i1, i2).unwrap(); - check_iter_result( - iter, - vec![ - (Bytes::from("a"), Bytes::from("1.1")), - (Bytes::from("b"), Bytes::from("2.2")), - (Bytes::from("c"), Bytes::from("3.2")), - (Bytes::from("d"), Bytes::from("4.2")), - ], - ) -} - -#[test] -fn test_merge_4() { - let i2 = MockIterator::new(vec![]); - let i1 = MockIterator::new(vec![ - (Bytes::from("b"), Bytes::from("2.2")), - (Bytes::from("c"), Bytes::from("3.2")), - (Bytes::from("d"), Bytes::from("4.2")), - ]); - let iter = TwoMergeIterator::create(i1, i2).unwrap(); - check_iter_result( - iter, - vec![ - (Bytes::from("b"), Bytes::from("2.2")), - (Bytes::from("c"), Bytes::from("3.2")), - (Bytes::from("d"), Bytes::from("4.2")), - ], - ); - let i1 = MockIterator::new(vec![]); - let i2 = MockIterator::new(vec![ - (Bytes::from("b"), Bytes::from("2.2")), - (Bytes::from("c"), Bytes::from("3.2")), - (Bytes::from("d"), Bytes::from("4.2")), - ]); - let iter = TwoMergeIterator::create(i1, i2).unwrap(); - check_iter_result( - iter, - vec![ - (Bytes::from("b"), Bytes::from("2.2")), - (Bytes::from("c"), Bytes::from("3.2")), - (Bytes::from("d"), Bytes::from("4.2")), - ], - ); -} - -#[test] -fn test_merge_5() { - let i2 = MockIterator::new(vec![]); - let i1 = MockIterator::new(vec![]); - let iter = TwoMergeIterator::create(i1, i2).unwrap(); - check_iter_result(iter, vec![]) -} diff --git a/mini-lsm-starter/src/debug.rs b/mini-lsm-starter/src/debug.rs index 76702de..c9eab3d 100644 --- a/mini-lsm-starter/src/debug.rs +++ b/mini-lsm-starter/src/debug.rs @@ -1,8 +1,8 @@ -use crate::lsm_storage::MiniLsm; +use crate::lsm_storage::{LsmStorageInner, MiniLsm}; -impl MiniLsm { +impl LsmStorageInner { pub fn dump_structure(&self) { - let snapshot = self.inner.state.read(); + let snapshot = self.state.read(); if !snapshot.l0_sstables.is_empty() { println!( "L0 ({}): {:?}", @@ -15,3 +15,9 @@ impl MiniLsm { } } } + +impl MiniLsm { + pub fn dump_structure(&self) { + self.inner.dump_structure() + } +} diff --git a/mini-lsm-starter/src/iterators/two_merge_iterator.rs b/mini-lsm-starter/src/iterators/two_merge_iterator.rs index c51dc72..bb7b4a8 100644 --- a/mini-lsm-starter/src/iterators/two_merge_iterator.rs +++ b/mini-lsm-starter/src/iterators/two_merge_iterator.rs @@ -3,8 +3,6 @@ use anyhow::Result; -use crate::key::KeySlice; - use super::StorageIterator; /// Merges two iterators of different types into one. If the two iterators have the same key, only @@ -16,8 +14,8 @@ pub struct TwoMergeIterator { } impl< - A: 'static + for<'a> StorageIterator = KeySlice<'a>>, - B: 'static + for<'a> StorageIterator = KeySlice<'a>>, + A: 'static + StorageIterator, + B: 'static + for<'a> StorageIterator = A::KeyType<'a>>, > TwoMergeIterator { pub fn create(a: A, b: B) -> Result { @@ -26,13 +24,13 @@ impl< } impl< - A: 'static + for<'a> StorageIterator = KeySlice<'a>>, - B: 'static + for<'a> StorageIterator = KeySlice<'a>>, + A: 'static + StorageIterator, + B: 'static + for<'a> StorageIterator = A::KeyType<'a>>, > StorageIterator for TwoMergeIterator { - type KeyType<'a> = KeySlice<'a>; + type KeyType<'a> = A::KeyType<'a>; - fn key(&self) -> KeySlice { + fn key(&self) -> Self::KeyType<'_> { unimplemented!() } diff --git a/mini-lsm-starter/src/lib.rs b/mini-lsm-starter/src/lib.rs index afdfb65..79341ab 100644 --- a/mini-lsm-starter/src/lib.rs +++ b/mini-lsm-starter/src/lib.rs @@ -7,6 +7,7 @@ pub mod lsm_iterator; pub mod lsm_storage; pub mod manifest; pub mod mem_table; +pub mod mvcc; pub mod table; pub mod wal; diff --git a/mini-lsm-starter/src/lsm_storage.rs b/mini-lsm-starter/src/lsm_storage.rs index 338fde5..428ef25 100644 --- a/mini-lsm-starter/src/lsm_storage.rs +++ b/mini-lsm-starter/src/lsm_storage.rs @@ -18,6 +18,7 @@ use crate::compact::{ use crate::lsm_iterator::{FusedIterator, LsmIterator}; use crate::manifest::Manifest; use crate::mem_table::MemTable; +use crate::mvcc::LsmMvccInner; use crate::table::SsTable; pub type BlockCache = moka::sync::Cache<(usize, usize), Arc>; @@ -122,6 +123,7 @@ pub(crate) struct LsmStorageInner { pub(crate) options: Arc, pub(crate) compaction_controller: CompactionController, pub(crate) manifest: Option, + pub(crate) mvcc: Option, } /// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM. @@ -249,6 +251,7 @@ impl LsmStorageInner { compaction_controller, manifest: None, options: options.into(), + mvcc: None, }; Ok(storage) diff --git a/mini-lsm-starter/src/mvcc.rs b/mini-lsm-starter/src/mvcc.rs new file mode 100644 index 0000000..63e34ae --- /dev/null +++ b/mini-lsm-starter/src/mvcc.rs @@ -0,0 +1,58 @@ +#![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod +#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod + +pub mod txn; +mod watermark; + +use std::{ + collections::{BTreeMap, HashSet}, + sync::Arc, +}; + +use parking_lot::Mutex; + +use crate::lsm_storage::LsmStorageInner; + +use self::{txn::Transaction, watermark::Watermark}; + +pub(crate) struct CommittedTxnData { + pub(crate) key_hashes: HashSet, + #[allow(dead_code)] + pub(crate) read_ts: u64, + #[allow(dead_code)] + pub(crate) commit_ts: u64, +} + +pub(crate) struct LsmMvccInner { + pub(crate) write_lock: Mutex<()>, + pub(crate) ts: Arc>, + pub(crate) committed_txns: Arc>>, +} + +impl LsmMvccInner { + pub fn new(initial_ts: u64) -> Self { + Self { + write_lock: Mutex::new(()), + ts: Arc::new(Mutex::new((initial_ts, Watermark::new()))), + committed_txns: Arc::new(Mutex::new(BTreeMap::new())), + } + } + + pub fn latest_commit_ts(&self) -> u64 { + self.ts.lock().0 + } + + pub fn update_commit_ts(&self, ts: u64) { + self.ts.lock().0 = ts; + } + + /// All ts (strictly) below this ts can be garbage collected. + pub fn watermark(&self) -> u64 { + let ts = self.ts.lock(); + ts.1.watermark().unwrap_or(ts.0) + } + + pub fn new_txn(&self, inner: Arc, serializable: bool) -> Arc { + unimplemented!() + } +} diff --git a/mini-lsm-starter/src/mvcc/txn.rs b/mini-lsm-starter/src/mvcc/txn.rs new file mode 100644 index 0000000..33ec6a0 --- /dev/null +++ b/mini-lsm-starter/src/mvcc/txn.rs @@ -0,0 +1,128 @@ +#![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod +#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod + +use std::{ + collections::HashSet, + ops::Bound, + sync::{atomic::AtomicBool, Arc}, +}; + +use anyhow::Result; +use bytes::Bytes; +use crossbeam_skiplist::SkipMap; +use ouroboros::self_referencing; +use parking_lot::Mutex; + +use crate::{ + iterators::{two_merge_iterator::TwoMergeIterator, StorageIterator}, + lsm_iterator::{FusedIterator, LsmIterator}, + lsm_storage::LsmStorageInner, +}; + +pub struct Transaction { + pub(crate) read_ts: u64, + pub(crate) inner: Arc, + pub(crate) local_storage: Arc>, + pub(crate) committed: Arc, + /// Write set and read set + pub(crate) key_hashes: Option, HashSet)>>, +} + +impl Transaction { + pub fn get(&self, key: &[u8]) -> Result> { + unimplemented!() + } + + pub fn scan(self: &Arc, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> Result { + unimplemented!() + } + + pub fn put(&self, key: &[u8], value: &[u8]) { + unimplemented!() + } + + pub fn delete(&self, key: &[u8]) { + unimplemented!() + } + + pub fn commit(&self) -> Result<()> { + unimplemented!() + } +} + +impl Drop for Transaction { + fn drop(&mut self) {} +} + +type SkipMapRangeIter<'a> = + crossbeam_skiplist::map::Range<'a, Bytes, (Bound, Bound), Bytes, Bytes>; + +#[self_referencing] +pub struct TxnLocalIterator { + /// Stores a reference to the skipmap. + map: Arc>, + /// Stores a skipmap iterator that refers to the lifetime of `MemTableIterator` itself. + #[borrows(map)] + #[not_covariant] + iter: SkipMapRangeIter<'this>, + /// Stores the current key-value pair. + item: (Bytes, Bytes), +} + +impl StorageIterator for TxnLocalIterator { + type KeyType<'a> = &'a [u8]; + + fn value(&self) -> &[u8] { + unimplemented!() + } + + fn key(&self) -> &[u8] { + unimplemented!() + } + + fn is_valid(&self) -> bool { + unimplemented!() + } + + fn next(&mut self) -> Result<()> { + unimplemented!() + } +} + +pub struct TxnIterator { + _txn: Arc, + iter: TwoMergeIterator>, +} + +impl TxnIterator { + pub fn create( + txn: Arc, + iter: TwoMergeIterator>, + ) -> Result { + unimplemented!() + } +} + +impl StorageIterator for TxnIterator { + type KeyType<'a> = &'a [u8] where Self: 'a; + + fn value(&self) -> &[u8] { + self.iter.value() + } + + fn key(&self) -> Self::KeyType<'_> { + self.iter.key() + } + + fn is_valid(&self) -> bool { + self.iter.is_valid() + } + + fn next(&mut self) -> Result<()> { + unimplemented!() + } + + fn num_active_iterators(&self) -> usize { + self.iter.num_active_iterators() + } +} diff --git a/mini-lsm-starter/src/mvcc/watermark.rs b/mini-lsm-starter/src/mvcc/watermark.rs new file mode 100644 index 0000000..4bbb4fa --- /dev/null +++ b/mini-lsm-starter/src/mvcc/watermark.rs @@ -0,0 +1,24 @@ +#![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod +#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod + +use std::collections::BTreeMap; + +pub struct Watermark { + readers: BTreeMap, +} + +impl Watermark { + pub fn new() -> Self { + Self { + readers: BTreeMap::new(), + } + } + + pub fn add_reader(&mut self, ts: u64) {} + + pub fn remove_reader(&mut self, ts: u64) {} + + pub fn watermark(&self) -> Option { + Some(0) + } +} diff --git a/mini-lsm/src/debug.rs b/mini-lsm/src/debug.rs deleted file mode 100644 index c9eab3d..0000000 --- a/mini-lsm/src/debug.rs +++ /dev/null @@ -1,23 +0,0 @@ -use crate::lsm_storage::{LsmStorageInner, MiniLsm}; - -impl LsmStorageInner { - pub fn dump_structure(&self) { - let snapshot = self.state.read(); - if !snapshot.l0_sstables.is_empty() { - println!( - "L0 ({}): {:?}", - snapshot.l0_sstables.len(), - snapshot.l0_sstables, - ); - } - for (level, files) in &snapshot.levels { - println!("L{level} ({}): {:?}", files.len(), files); - } - } -} - -impl MiniLsm { - pub fn dump_structure(&self) { - self.inner.dump_structure() - } -} diff --git a/mini-lsm/src/debug.rs b/mini-lsm/src/debug.rs new file mode 120000 index 0000000..4fc02d6 --- /dev/null +++ b/mini-lsm/src/debug.rs @@ -0,0 +1 @@ +../../mini-lsm-starter/src/debug.rs \ No newline at end of file diff --git a/mini-lsm/src/iterators/tests/merge_iterator_test.rs b/mini-lsm/src/iterators/tests/merge_iterator_test.rs deleted file mode 100644 index e063911..0000000 --- a/mini-lsm/src/iterators/tests/merge_iterator_test.rs +++ /dev/null @@ -1,137 +0,0 @@ -use super::*; -use crate::iterators::merge_iterator::MergeIterator; - -fn as_bytes(x: &[u8]) -> Bytes { - Bytes::copy_from_slice(x) -} - -fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>) { - let mut iter = iter; - for (k, v) in expected { - assert!(iter.is_valid()); - assert_eq!( - k, - iter.key(), - "expected key: {:?}, actual key: {:?}", - k, - as_bytes(iter.key()), - ); - assert_eq!( - v, - iter.value(), - "expected value: {:?}, actual value: {:?}", - v, - as_bytes(iter.value()), - ); - iter.next().unwrap(); - } - assert!(!iter.is_valid()); -} - -#[test] -fn test_merge_1() { - let i1 = MockIterator::new(vec![ - (Bytes::from("a"), Bytes::from("1.1")), - (Bytes::from("b"), Bytes::from("2.1")), - (Bytes::from("c"), Bytes::from("3.1")), - ]); - let i2 = MockIterator::new(vec![ - (Bytes::from("a"), Bytes::from("1.2")), - (Bytes::from("b"), Bytes::from("2.2")), - (Bytes::from("c"), Bytes::from("3.2")), - (Bytes::from("d"), Bytes::from("4.2")), - ]); - let i3 = MockIterator::new(vec![ - (Bytes::from("b"), Bytes::from("2.3")), - (Bytes::from("c"), Bytes::from("3.3")), - (Bytes::from("d"), Bytes::from("4.3")), - ]); - - let iter = MergeIterator::create(vec![ - Box::new(i1.clone()), - Box::new(i2.clone()), - Box::new(i3.clone()), - ]); - - check_iter_result( - iter, - vec![ - (Bytes::from("a"), Bytes::from("1.1")), - (Bytes::from("b"), Bytes::from("2.1")), - (Bytes::from("c"), Bytes::from("3.1")), - (Bytes::from("d"), Bytes::from("4.2")), - ], - ); - - let iter = MergeIterator::create(vec![Box::new(i3), Box::new(i1), Box::new(i2)]); - - check_iter_result( - iter, - vec![ - (Bytes::from("a"), Bytes::from("1.1")), - (Bytes::from("b"), Bytes::from("2.3")), - (Bytes::from("c"), Bytes::from("3.3")), - (Bytes::from("d"), Bytes::from("4.3")), - ], - ); -} - -#[test] -fn test_merge_2() { - let i1 = MockIterator::new(vec![ - (Bytes::from("a"), Bytes::from("1.1")), - (Bytes::from("b"), Bytes::from("2.1")), - (Bytes::from("c"), Bytes::from("3.1")), - ]); - let i2 = MockIterator::new(vec![ - (Bytes::from("d"), Bytes::from("1.2")), - (Bytes::from("e"), Bytes::from("2.2")), - (Bytes::from("f"), Bytes::from("3.2")), - (Bytes::from("g"), Bytes::from("4.2")), - ]); - let i3 = MockIterator::new(vec![ - (Bytes::from("h"), Bytes::from("1.3")), - (Bytes::from("i"), Bytes::from("2.3")), - (Bytes::from("j"), Bytes::from("3.3")), - (Bytes::from("k"), Bytes::from("4.3")), - ]); - let i4 = MockIterator::new(vec![]); - let result = vec![ - (Bytes::from("a"), Bytes::from("1.1")), - (Bytes::from("b"), Bytes::from("2.1")), - (Bytes::from("c"), Bytes::from("3.1")), - (Bytes::from("d"), Bytes::from("1.2")), - (Bytes::from("e"), Bytes::from("2.2")), - (Bytes::from("f"), Bytes::from("3.2")), - (Bytes::from("g"), Bytes::from("4.2")), - (Bytes::from("h"), Bytes::from("1.3")), - (Bytes::from("i"), Bytes::from("2.3")), - (Bytes::from("j"), Bytes::from("3.3")), - (Bytes::from("k"), Bytes::from("4.3")), - ]; - - let iter = MergeIterator::create(vec![ - Box::new(i1.clone()), - Box::new(i2.clone()), - Box::new(i3.clone()), - Box::new(i4.clone()), - ]); - check_iter_result(iter, result.clone()); - - let iter = MergeIterator::create(vec![ - Box::new(i2.clone()), - Box::new(i4.clone()), - Box::new(i3.clone()), - Box::new(i1.clone()), - ]); - check_iter_result(iter, result.clone()); - - let iter = MergeIterator::create(vec![Box::new(i4), Box::new(i3), Box::new(i2), Box::new(i1)]); - check_iter_result(iter, result); -} - -#[test] -fn test_merge_empty() { - let iter = MergeIterator::::create(vec![]); - check_iter_result(iter, vec![]); -} diff --git a/mini-lsm/src/iterators/tests/two_merge_iterator_test.rs b/mini-lsm/src/iterators/tests/two_merge_iterator_test.rs deleted file mode 100644 index 1719bf3..0000000 --- a/mini-lsm/src/iterators/tests/two_merge_iterator_test.rs +++ /dev/null @@ -1,129 +0,0 @@ -use super::*; -use crate::iterators::two_merge_iterator::TwoMergeIterator; - -fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>) { - let mut iter = iter; - for (k, v) in expected { - assert!(iter.is_valid()); - assert_eq!(iter.key(), k.as_ref()); - assert_eq!(iter.value(), v.as_ref()); - iter.next().unwrap(); - } - assert!(!iter.is_valid()); -} - -#[test] -fn test_merge_1() { - let i1 = MockIterator::new(vec![ - (Bytes::from("a"), Bytes::from("1.1")), - (Bytes::from("b"), Bytes::from("2.1")), - (Bytes::from("c"), Bytes::from("3.1")), - ]); - let i2 = MockIterator::new(vec![ - (Bytes::from("a"), Bytes::from("1.2")), - (Bytes::from("b"), Bytes::from("2.2")), - (Bytes::from("c"), Bytes::from("3.2")), - (Bytes::from("d"), Bytes::from("4.2")), - ]); - let iter = TwoMergeIterator::create(i1, i2).unwrap(); - check_iter_result( - iter, - vec![ - (Bytes::from("a"), Bytes::from("1.1")), - (Bytes::from("b"), Bytes::from("2.1")), - (Bytes::from("c"), Bytes::from("3.1")), - (Bytes::from("d"), Bytes::from("4.2")), - ], - ) -} - -#[test] -fn test_merge_2() { - let i2 = MockIterator::new(vec![ - (Bytes::from("a"), Bytes::from("1.1")), - (Bytes::from("b"), Bytes::from("2.1")), - (Bytes::from("c"), Bytes::from("3.1")), - ]); - let i1 = MockIterator::new(vec![ - (Bytes::from("a"), Bytes::from("1.2")), - (Bytes::from("b"), Bytes::from("2.2")), - (Bytes::from("c"), Bytes::from("3.2")), - (Bytes::from("d"), Bytes::from("4.2")), - ]); - let iter = TwoMergeIterator::create(i1, i2).unwrap(); - check_iter_result( - iter, - vec![ - (Bytes::from("a"), Bytes::from("1.2")), - (Bytes::from("b"), Bytes::from("2.2")), - (Bytes::from("c"), Bytes::from("3.2")), - (Bytes::from("d"), Bytes::from("4.2")), - ], - ) -} - -#[test] -fn test_merge_3() { - let i2 = MockIterator::new(vec![ - (Bytes::from("a"), Bytes::from("1.1")), - (Bytes::from("b"), Bytes::from("2.1")), - (Bytes::from("c"), Bytes::from("3.1")), - ]); - let i1 = MockIterator::new(vec![ - (Bytes::from("b"), Bytes::from("2.2")), - (Bytes::from("c"), Bytes::from("3.2")), - (Bytes::from("d"), Bytes::from("4.2")), - ]); - let iter = TwoMergeIterator::create(i1, i2).unwrap(); - check_iter_result( - iter, - vec![ - (Bytes::from("a"), Bytes::from("1.1")), - (Bytes::from("b"), Bytes::from("2.2")), - (Bytes::from("c"), Bytes::from("3.2")), - (Bytes::from("d"), Bytes::from("4.2")), - ], - ) -} - -#[test] -fn test_merge_4() { - let i2 = MockIterator::new(vec![]); - let i1 = MockIterator::new(vec![ - (Bytes::from("b"), Bytes::from("2.2")), - (Bytes::from("c"), Bytes::from("3.2")), - (Bytes::from("d"), Bytes::from("4.2")), - ]); - let iter = TwoMergeIterator::create(i1, i2).unwrap(); - check_iter_result( - iter, - vec![ - (Bytes::from("b"), Bytes::from("2.2")), - (Bytes::from("c"), Bytes::from("3.2")), - (Bytes::from("d"), Bytes::from("4.2")), - ], - ); - let i1 = MockIterator::new(vec![]); - let i2 = MockIterator::new(vec![ - (Bytes::from("b"), Bytes::from("2.2")), - (Bytes::from("c"), Bytes::from("3.2")), - (Bytes::from("d"), Bytes::from("4.2")), - ]); - let iter = TwoMergeIterator::create(i1, i2).unwrap(); - check_iter_result( - iter, - vec![ - (Bytes::from("b"), Bytes::from("2.2")), - (Bytes::from("c"), Bytes::from("3.2")), - (Bytes::from("d"), Bytes::from("4.2")), - ], - ); -} - -#[test] -fn test_merge_5() { - let i2 = MockIterator::new(vec![]); - let i1 = MockIterator::new(vec![]); - let iter = TwoMergeIterator::create(i1, i2).unwrap(); - check_iter_result(iter, vec![]) -} diff --git a/mini-lsm/src/iterators/two_merge_iterator.rs b/mini-lsm/src/iterators/two_merge_iterator.rs index 781055a..57bf77b 100644 --- a/mini-lsm/src/iterators/two_merge_iterator.rs +++ b/mini-lsm/src/iterators/two_merge_iterator.rs @@ -1,7 +1,5 @@ use anyhow::Result; -use crate::key::KeySlice; - use super::StorageIterator; /// Merges two iterators of different types into one. If the two iterators have the same key, only @@ -13,8 +11,8 @@ pub struct TwoMergeIterator { } impl< - A: 'static + for<'a> StorageIterator = KeySlice<'a>>, - B: 'static + for<'a> StorageIterator = KeySlice<'a>>, + A: 'static + StorageIterator, + B: 'static + for<'a> StorageIterator = A::KeyType<'a>>, > TwoMergeIterator { fn choose_a(a: &A, b: &B) -> bool { @@ -47,13 +45,13 @@ impl< } impl< - A: 'static + for<'a> StorageIterator = KeySlice<'a>>, - B: 'static + for<'a> StorageIterator = KeySlice<'a>>, + A: 'static + StorageIterator, + B: 'static + for<'a> StorageIterator = A::KeyType<'a>>, > StorageIterator for TwoMergeIterator { - type KeyType<'a> = KeySlice<'a>; + type KeyType<'a> = A::KeyType<'a>; - fn key(&self) -> KeySlice { + fn key(&self) -> Self::KeyType<'_> { if self.choose_a { self.a.key() } else { diff --git a/mini-lsm/src/lib.rs b/mini-lsm/src/lib.rs index afdfb65..79341ab 100644 --- a/mini-lsm/src/lib.rs +++ b/mini-lsm/src/lib.rs @@ -7,6 +7,7 @@ pub mod lsm_iterator; pub mod lsm_storage; pub mod manifest; pub mod mem_table; +pub mod mvcc; pub mod table; pub mod wal; diff --git a/mini-lsm/src/lsm_storage.rs b/mini-lsm/src/lsm_storage.rs index a5a4c2a..d16223f 100644 --- a/mini-lsm/src/lsm_storage.rs +++ b/mini-lsm/src/lsm_storage.rs @@ -22,6 +22,7 @@ use crate::key::KeySlice; use crate::lsm_iterator::{FusedIterator, LsmIterator}; use crate::manifest::{Manifest, ManifestRecord}; use crate::mem_table::{map_bound, MemTable}; +use crate::mvcc::LsmMvccInner; use crate::table::{FileObject, SsTable, SsTableBuilder, SsTableIterator}; pub type BlockCache = moka::sync::Cache<(usize, usize), Arc>; @@ -157,6 +158,8 @@ pub(crate) struct LsmStorageInner { pub(crate) options: Arc, pub(crate) compaction_controller: CompactionController, pub(crate) manifest: Option, + #[allow(dead_code)] + pub(crate) mvcc: Option, } /// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM. @@ -414,6 +417,7 @@ impl LsmStorageInner { compaction_controller, manifest: Some(manifest), options: options.into(), + mvcc: None, }; storage.sync_dir()?; diff --git a/mini-lsm/src/mvcc.rs b/mini-lsm/src/mvcc.rs new file mode 120000 index 0000000..b718940 --- /dev/null +++ b/mini-lsm/src/mvcc.rs @@ -0,0 +1 @@ +../../mini-lsm-starter/src/mvcc.rs \ No newline at end of file diff --git a/mini-lsm/src/mvcc/txn.rs b/mini-lsm/src/mvcc/txn.rs new file mode 120000 index 0000000..b0618ce --- /dev/null +++ b/mini-lsm/src/mvcc/txn.rs @@ -0,0 +1 @@ +../../../mini-lsm-starter/src/mvcc/txn.rs \ No newline at end of file diff --git a/mini-lsm/src/mvcc/watermark.rs b/mini-lsm/src/mvcc/watermark.rs new file mode 120000 index 0000000..a32ab04 --- /dev/null +++ b/mini-lsm/src/mvcc/watermark.rs @@ -0,0 +1 @@ +../../../mini-lsm-starter/src/mvcc/watermark.rs \ No newline at end of file