From 3ed6204400bfb9902e6910b40853833e167c9e60 Mon Sep 17 00:00:00 2001 From: Alex Chi Date: Sat, 24 Dec 2022 14:48:57 -0500 Subject: [PATCH] feat(code): finish part 3 Signed-off-by: Alex Chi --- Cargo.lock | 39 +++++++ README.md | 2 +- mini-lsm-starter/Cargo.toml | 2 + mini-lsm-starter/src/lsm_iterator.rs | 53 +++++++++ mini-lsm-starter/src/lsm_storage.rs | 37 +++--- mini-lsm-starter/src/mem_table.rs | 21 ++-- mini-lsm-starter/src/mem_table/tests.rs | 1 + mini-lsm-starter/src/mem_table_test.rs | 1 - mini-lsm/Cargo.toml | 2 + mini-lsm/src/iterators/merge_iterator.rs | 39 ++++--- .../iterators/tests/merge_iterator_test.rs | 6 + mini-lsm/src/lib.rs | 3 + mini-lsm/src/lsm_iterator.rs | 106 ++++++++++++++++- mini-lsm/src/lsm_storage.rs | 107 +++++++++++++++--- mini-lsm/src/mem_table.rs | 70 ++++++------ .../{mem_table_test.rs => mem_table/tests.rs} | 0 mini-lsm/src/tests.rs | 1 + mini-lsm/src/tests/day3_tests.rs | 107 ++++++++++++++++++ xtask/src/main.rs | 21 ++++ 19 files changed, 517 insertions(+), 101 deletions(-) create mode 100644 mini-lsm-starter/src/mem_table/tests.rs delete mode 100644 mini-lsm-starter/src/mem_table_test.rs rename mini-lsm/src/{mem_table_test.rs => mem_table/tests.rs} (100%) create mode 100644 mini-lsm/src/tests.rs create mode 100644 mini-lsm/src/tests/day3_tests.rs diff --git a/Cargo.lock b/Cargo.lock index 9b7776a..020c72c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,18 @@ # It is not intended for manual editing. 
version = 3 +[[package]] +name = "Inflector" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3" + +[[package]] +name = "aliasable" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd" + [[package]] name = "anyhow" version = "1.0.68" @@ -249,7 +261,9 @@ dependencies = [ "anyhow", "arc-swap", "bytes", + "crossbeam-epoch", "crossbeam-skiplist", + "ouroboros", "parking_lot", ] @@ -260,7 +274,9 @@ dependencies = [ "anyhow", "arc-swap", "bytes", + "crossbeam-epoch", "crossbeam-skiplist", + "ouroboros", "parking_lot", ] @@ -296,6 +312,29 @@ version = "6.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee" +[[package]] +name = "ouroboros" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfbb50b356159620db6ac971c6d5c9ab788c9cc38a6f49619fca2a27acb062ca" +dependencies = [ + "aliasable", + "ouroboros_macro", +] + +[[package]] +name = "ouroboros_macro" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a0d9d1a6191c4f391f87219d1ea42b23f09ee84d64763cd05ee6ea88d9f384d" +dependencies = [ + "Inflector", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "parking_lot" version = "0.12.1" diff --git a/README.md b/README.md index 2f21a57..261f8d1 100644 --- a/README.md +++ b/README.md @@ -34,4 +34,4 @@ The tutorial has 8 parts (which can be finished in 7 days): * Day 6: Recovery. We will implement WAL and manifest so that the engine can recover after restart. * Day 7: Bloom filter and key compression. They are widely-used optimizations in LSM tree structures. -We have reference solution up to day 2 and tutorial up to day 1 for now. 
+We have reference solution up to day 3 and tutorial up to day 1 for now. diff --git a/mini-lsm-starter/Cargo.toml b/mini-lsm-starter/Cargo.toml index 2931aa4..20621fd 100644 --- a/mini-lsm-starter/Cargo.toml +++ b/mini-lsm-starter/Cargo.toml @@ -8,5 +8,7 @@ publish = false anyhow = "1" arc-swap = "1" bytes = "1" +crossbeam-epoch = "0.9" crossbeam-skiplist = "0.1" parking_lot = "0.12" +ouroboros = "0.15" diff --git a/mini-lsm-starter/src/lsm_iterator.rs b/mini-lsm-starter/src/lsm_iterator.rs index 8de188e..2f90a79 100644 --- a/mini-lsm-starter/src/lsm_iterator.rs +++ b/mini-lsm-starter/src/lsm_iterator.rs @@ -1 +1,54 @@ +#![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod +#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod + +use crate::iterators::impls::StorageIterator; +use anyhow::Result; + pub struct LsmIterator {} + +impl StorageIterator for LsmIterator { + fn is_valid(&self) -> bool { + unimplemented!() + } + + fn key(&self) -> &[u8] { + unimplemented!() + } + + fn value(&self) -> &[u8] { + unimplemented!() + } + + fn next(&mut self) -> Result<()> { + unimplemented!() + } +} + +/// A wrapper around existing iterator, will prevent users from calling `next` when the iterator is invalid. 
+pub struct FusedIterator { + iter: I, +} + +impl FusedIterator { + pub fn new(iter: I) -> Self { + Self { iter } + } +} + +impl StorageIterator for FusedIterator { + fn is_valid(&self) -> bool { + unimplemented!() + } + + fn key(&self) -> &[u8] { + unimplemented!() + } + + fn value(&self) -> &[u8] { + unimplemented!() + } + + fn next(&mut self) -> Result<()> { + unimplemented!() + } +} diff --git a/mini-lsm-starter/src/lsm_storage.rs b/mini-lsm-starter/src/lsm_storage.rs index 54a29b2..8ade102 100644 --- a/mini-lsm-starter/src/lsm_storage.rs +++ b/mini-lsm-starter/src/lsm_storage.rs @@ -1,3 +1,6 @@ +#![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod +#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod + use std::ops::Bound; use std::path::Path; use std::sync::Arc; @@ -6,20 +9,23 @@ use anyhow::Result; use arc_swap::ArcSwap; use bytes::Bytes; -use crate::lsm_iterator::LsmIterator; +use crate::lsm_iterator::{FusedIterator, LsmIterator}; use crate::mem_table::MemTable; -use crate::table::{SsTable, SsTableIterator}; +use crate::table::SsTable; +#[derive(Clone)] pub struct LsmStorageInner { + /// MemTables, from oldest to newest. memtables: Vec>, - sstables: Vec>, + /// L0 SsTables, from oldest to newest. + l0_sstables: Vec>, } impl LsmStorageInner { fn create() -> Self { Self { memtables: vec![Arc::new(MemTable::create())], - sstables: vec![], + l0_sstables: vec![], } } } @@ -37,22 +43,7 @@ impl LsmStorage { } pub fn get(&self, key: &[u8]) -> Result> { - let snapshot = self.inner.load(); - for memtable in &snapshot.memtables { - if let Some(value) = memtable.get(key)? 
{ - if value.is_empty() { - // found tomestone, return key not exists - return Ok(None); - } - return Ok(Some(value)); - } - } - let mut iters = Vec::new(); - iters.reserve(snapshot.sstables.len()); - for table in snapshot.sstables.iter().rev() { - iters.push(SsTableIterator::create_and_seek_to_key(table.clone(), key)?); - } - Ok(None) + unimplemented!() } pub fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> { @@ -69,7 +60,11 @@ impl LsmStorage { unimplemented!() } - pub fn scan(&self, _lower: Bound<&[u8]>, _upper: Bound<&[u8]>) -> Result { + pub fn scan( + &self, + _lower: Bound<&[u8]>, + _upper: Bound<&[u8]>, + ) -> Result> { unimplemented!() } } diff --git a/mini-lsm-starter/src/mem_table.rs b/mini-lsm-starter/src/mem_table.rs index aa58e58..5b904bb 100644 --- a/mini-lsm-starter/src/mem_table.rs +++ b/mini-lsm-starter/src/mem_table.rs @@ -2,10 +2,12 @@ #![allow(dead_code)] // TODO(you): remove this lint after implementing this mod use std::ops::Bound; +use std::sync::Arc; use anyhow::Result; use bytes::Bytes; use crossbeam_skiplist::SkipMap; +use ouroboros::self_referencing; use crate::iterators::impls::StorageIterator; use crate::table::SsTableBuilder; @@ -46,17 +48,16 @@ type SkipMapRangeIter<'a> = crossbeam_skiplist::map::Range<'a, Bytes, (Bound, Bound), Bytes, Bytes>; /// An iterator over a range of `SkipMap`. 
-pub struct MemTableIterator<'a> { - _phantom: std::marker::PhantomData<&'a ()>, +#[self_referencing] +pub struct MemTableIterator { + map: Arc>, + #[borrows(map)] + #[not_covariant] + iter: SkipMapRangeIter<'this>, + item: (Bytes, Bytes), } -impl<'a> MemTableIterator<'a> { - fn new(iter: SkipMapRangeIter<'a>) -> Self { - unimplemented!() - } -} - -impl StorageIterator for MemTableIterator<'_> { +impl StorageIterator for MemTableIterator { fn value(&self) -> &[u8] { unimplemented!() } @@ -74,6 +75,4 @@ impl StorageIterator for MemTableIterator<'_> { } } -#[cfg(test)] -#[path = "mem_table_test.rs"] mod tests; diff --git a/mini-lsm-starter/src/mem_table/tests.rs b/mini-lsm-starter/src/mem_table/tests.rs new file mode 100644 index 0000000..da15e42 --- /dev/null +++ b/mini-lsm-starter/src/mem_table/tests.rs @@ -0,0 +1 @@ +//! Please copy `mini-lsm/src/mem_table/tests.rs` here so that you can run tests. diff --git a/mini-lsm-starter/src/mem_table_test.rs b/mini-lsm-starter/src/mem_table_test.rs deleted file mode 100644 index fe2039e..0000000 --- a/mini-lsm-starter/src/mem_table_test.rs +++ /dev/null @@ -1 +0,0 @@ -//! Please copy `mini-lsm/src/mem_table_test.rs` here so that you can run tests. diff --git a/mini-lsm/Cargo.toml b/mini-lsm/Cargo.toml index 3e4e5c1..15d7354 100644 --- a/mini-lsm/Cargo.toml +++ b/mini-lsm/Cargo.toml @@ -14,5 +14,7 @@ description = "A tutorial for building an LSM tree storage engine in a week." anyhow = "1" arc-swap = "1" bytes = "1" +crossbeam-epoch = "0.9" crossbeam-skiplist = "0.1" parking_lot = "0.12" +ouroboros = "0.15" diff --git a/mini-lsm/src/iterators/merge_iterator.rs b/mini-lsm/src/iterators/merge_iterator.rs index f4e3425..b68e461 100644 --- a/mini-lsm/src/iterators/merge_iterator.rs +++ b/mini-lsm/src/iterators/merge_iterator.rs @@ -37,12 +37,17 @@ impl Ord for HeapWrapper { /// iterators, perfer the one with smaller index. 
pub struct MergeIterator { iters: BinaryHeap>, - current: HeapWrapper, + current: Option>, } impl MergeIterator { pub fn create(iters: Vec>) -> Self { - assert!(!iters.is_empty()); + if iters.is_empty() { + return Self { + iters: BinaryHeap::new(), + current: None, + }; + } let mut heap = BinaryHeap::new(); @@ -51,7 +56,7 @@ impl MergeIterator { let mut iters = iters; return Self { iters: heap, - current: HeapWrapper(0, iters.pop().unwrap()), + current: Some(HeapWrapper(0, iters.pop().unwrap())), }; } @@ -64,32 +69,38 @@ impl MergeIterator { let current = heap.pop().unwrap(); Self { iters: heap, - current, + current: Some(current), } } } impl StorageIterator for MergeIterator { fn key(&self) -> &[u8] { - self.current.1.key() + unsafe { self.current.as_ref().unwrap_unchecked() }.1.key() } fn value(&self) -> &[u8] { - self.current.1.value() + unsafe { self.current.as_ref().unwrap_unchecked() } + .1 + .value() } fn is_valid(&self) -> bool { - self.current.1.is_valid() + self.current + .as_ref() + .map(|x| x.1.is_valid()) + .unwrap_or(false) } fn next(&mut self) -> Result<()> { + let current = unsafe { self.current.as_mut().unwrap_unchecked() }; // Pop the item out of the heap if they have the same value. while let Some(mut inner_iter) = self.iters.peek_mut() { debug_assert!( - inner_iter.1.key() >= self.current.1.key(), + inner_iter.1.key() >= current.1.key(), "heap invariant violated" ); - if inner_iter.1.key() == self.current.1.key() { + if inner_iter.1.key() == current.1.key() { // Case 1: an error occurred when calling `next`. if let e @ Err(_) = inner_iter.1.next() { PeekMut::pop(inner_iter); @@ -105,20 +116,20 @@ impl StorageIterator for MergeIterator { } } - self.current.1.next()?; + current.1.next()?; // If the current iterator is invalid, pop it out of the heap and select the next one. 
- if !self.current.1.is_valid() { + if !current.1.is_valid() { if let Some(iter) = self.iters.pop() { - self.current = iter; + *current = iter; } return Ok(()); } // Otherwise, compare with heap top and swap if necessary. if let Some(mut inner_iter) = self.iters.peek_mut() { - if self.current < *inner_iter { - std::mem::swap(&mut *inner_iter, &mut self.current); + if *current < *inner_iter { + std::mem::swap(&mut *inner_iter, current); } } diff --git a/mini-lsm/src/iterators/tests/merge_iterator_test.rs b/mini-lsm/src/iterators/tests/merge_iterator_test.rs index 0a9e4d9..e063911 100644 --- a/mini-lsm/src/iterators/tests/merge_iterator_test.rs +++ b/mini-lsm/src/iterators/tests/merge_iterator_test.rs @@ -129,3 +129,9 @@ fn test_merge_2() { let iter = MergeIterator::create(vec![Box::new(i4), Box::new(i3), Box::new(i2), Box::new(i1)]); check_iter_result(iter, result); } + +#[test] +fn test_merge_empty() { + let iter = MergeIterator::::create(vec![]); + check_iter_result(iter, vec![]); +} diff --git a/mini-lsm/src/lib.rs b/mini-lsm/src/lib.rs index c78ce8a..40ff84a 100644 --- a/mini-lsm/src/lib.rs +++ b/mini-lsm/src/lib.rs @@ -4,3 +4,6 @@ pub mod lsm_iterator; pub mod lsm_storage; pub mod mem_table; pub mod table; + +#[cfg(test)] +mod tests; diff --git a/mini-lsm/src/lsm_iterator.rs b/mini-lsm/src/lsm_iterator.rs index 8de188e..2b74b5f 100644 --- a/mini-lsm/src/lsm_iterator.rs +++ b/mini-lsm/src/lsm_iterator.rs @@ -1 +1,105 @@ -pub struct LsmIterator {} +use std::ops::Bound; + +use anyhow::Result; +use bytes::Bytes; + +use crate::iterators::impls::StorageIterator; +use crate::iterators::merge_iterator::MergeIterator; +use crate::iterators::two_merge_iterator::TwoMergeIterator; +use crate::mem_table::MemTableIterator; +use crate::table::SsTableIterator; + +type LsmIteratorInner = + TwoMergeIterator, MergeIterator>; + +pub struct LsmIterator { + iter: LsmIteratorInner, + end_bound: Bound, + is_valid: bool, +} + +impl LsmIterator { + pub(crate) fn new(iter: 
LsmIteratorInner, end_bound: Bound) -> Result { + let mut iter = Self { + is_valid: iter.is_valid(), + iter, + end_bound, + }; + iter.move_to_non_delete()?; + Ok(iter) + } + + fn next_inner(&mut self) -> Result<()> { + self.iter.next()?; + if !self.iter.is_valid() { + self.is_valid = false; + return Ok(()); + } + match self.end_bound.as_ref() { + Bound::Unbounded => {} + Bound::Included(key) => self.is_valid = self.iter.key() <= key.as_ref(), + Bound::Excluded(key) => self.is_valid = self.iter.key() < key.as_ref(), + } + Ok(()) + } + + fn move_to_non_delete(&mut self) -> Result<()> { + while self.is_valid() && self.iter.value().is_empty() { + self.next_inner()?; + } + Ok(()) + } +} + +impl StorageIterator for LsmIterator { + fn is_valid(&self) -> bool { + self.is_valid + } + + fn key(&self) -> &[u8] { + self.iter.key() + } + + fn value(&self) -> &[u8] { + self.iter.value() + } + + fn next(&mut self) -> Result<()> { + self.next_inner()?; + self.move_to_non_delete()?; + Ok(()) + } +} + +/// A wrapper around existing iterator, will prevent users from calling `next` when the iterator is invalid. 
+pub struct FusedIterator { + iter: I, +} + +impl FusedIterator { + pub fn new(iter: I) -> Self { + Self { iter } + } +} + +impl StorageIterator for FusedIterator { + fn is_valid(&self) -> bool { + self.iter.is_valid() + } + + fn key(&self) -> &[u8] { + self.iter.key() + } + + fn value(&self) -> &[u8] { + self.iter.value() + } + + fn next(&mut self) -> Result<()> { + // only move when the iterator is valid + if self.iter.is_valid() { + self.iter.next()?; + } + Ok(()) + } +} diff --git a/mini-lsm/src/lsm_storage.rs b/mini-lsm/src/lsm_storage.rs index 54a29b2..cf9144c 100644 --- a/mini-lsm/src/lsm_storage.rs +++ b/mini-lsm/src/lsm_storage.rs @@ -5,21 +5,28 @@ use std::sync::Arc; use anyhow::Result; use arc_swap::ArcSwap; use bytes::Bytes; +use parking_lot::Mutex; -use crate::lsm_iterator::LsmIterator; -use crate::mem_table::MemTable; -use crate::table::{SsTable, SsTableIterator}; +use crate::iterators::impls::StorageIterator; +use crate::iterators::merge_iterator::MergeIterator; +use crate::iterators::two_merge_iterator::TwoMergeIterator; +use crate::lsm_iterator::{FusedIterator, LsmIterator}; +use crate::mem_table::{map_bound, MemTable}; +use crate::table::{SsTable, SsTableBuilder, SsTableIterator}; +#[derive(Clone)] pub struct LsmStorageInner { + /// MemTables, from oldest to newest. memtables: Vec>, - sstables: Vec>, + /// L0 SsTables, from oldest to newest. + l0_sstables: Vec>, } impl LsmStorageInner { fn create() -> Self { Self { memtables: vec![Arc::new(MemTable::create())], - sstables: vec![], + l0_sstables: vec![], } } } @@ -27,18 +34,20 @@ impl LsmStorageInner { /// The storage interface of the LSM tree. 
pub struct LsmStorage { inner: ArcSwap, + flush_lock: Mutex<()>, } impl LsmStorage { - pub fn open(_path: &Path) -> Result { + pub fn open(_path: impl AsRef) -> Result { Ok(Self { inner: ArcSwap::from_pointee(LsmStorageInner::create()), + flush_lock: Mutex::new(()), }) } pub fn get(&self, key: &[u8]) -> Result> { let snapshot = self.inner.load(); - for memtable in &snapshot.memtables { + for memtable in snapshot.memtables.iter().rev() { if let Some(value) = memtable.get(key)? { if value.is_empty() { // found tomestone, return key not exists @@ -48,28 +57,90 @@ impl LsmStorage { } } let mut iters = Vec::new(); - iters.reserve(snapshot.sstables.len()); - for table in snapshot.sstables.iter().rev() { - iters.push(SsTableIterator::create_and_seek_to_key(table.clone(), key)?); + iters.reserve(snapshot.l0_sstables.len()); + for table in snapshot.l0_sstables.iter().rev() { + iters.push(Box::new(SsTableIterator::create_and_seek_to_key( + table.clone(), + key, + )?)); + } + let iter = MergeIterator::create(iters); + if iter.is_valid() { + return Ok(Some(Bytes::copy_from_slice(iter.value()))); } Ok(None) } - pub fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> { + pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { assert!(!value.is_empty(), "value cannot be empty"); assert!(!key.is_empty(), "key cannot be empty"); - unimplemented!() + let snapshot = self.inner.load(); + snapshot.memtables[0].put(key, value)?; + Ok(()) } - pub fn delete(&mut self, _key: &[u8]) -> Result<()> { - unimplemented!() + pub fn delete(&self, key: &[u8]) -> Result<()> { + let snapshot = self.inner.load(); + snapshot.memtables[0].put(key, b"")?; + Ok(()) } - pub fn sync(&mut self) -> Result<()> { - unimplemented!() + pub fn sync(&self) -> Result<()> { + let _flush_lock = self.flush_lock.lock(); + let mut snapshot = { + let snapshot = self.inner.load(); + snapshot.as_ref().clone() + }; + + let mut builder = SsTableBuilder::new(4096); + let memtable = snapshot.memtables.pop().unwrap(); 
+ assert!(snapshot.memtables.is_empty()); + memtable.flush(&mut builder)?; + snapshot.l0_sstables.push(Arc::new(builder.build("")?)); + self.inner.store(Arc::new(snapshot)); + Ok(()) } - pub fn scan(&self, _lower: Bound<&[u8]>, _upper: Bound<&[u8]>) -> Result { - unimplemented!() + pub fn scan( + &self, + lower: Bound<&[u8]>, + upper: Bound<&[u8]>, + ) -> Result> { + let snapshot = self.inner.load(); + + let mut memtable_iters = Vec::new(); + memtable_iters.reserve(snapshot.memtables.len()); + for memtable in snapshot.memtables.iter().rev() { + memtable_iters.push(Box::new(memtable.scan(lower, upper)?)); + } + let memtable_iter = MergeIterator::create(memtable_iters); + + let mut table_iters = Vec::new(); + table_iters.reserve(snapshot.l0_sstables.len()); + for table in snapshot.l0_sstables.iter().rev() { + let iter = match lower { + Bound::Included(key) => { + SsTableIterator::create_and_seek_to_key(table.clone(), key)? + } + Bound::Excluded(key) => { + let mut iter = SsTableIterator::create_and_seek_to_key(table.clone(), key)?; + if iter.is_valid() && iter.key() == key { + iter.next()?; + } + iter + } + Bound::Unbounded => SsTableIterator::create_and_seek_to_first(table.clone())?, + }; + + table_iters.push(Box::new(iter)); + } + let table_iter = MergeIterator::create(table_iters); + + let iter = TwoMergeIterator::create(memtable_iter, table_iter)?; + + Ok(FusedIterator::new(LsmIterator::new( + iter, + map_bound(upper), + )?)) } } diff --git a/mini-lsm/src/mem_table.rs b/mini-lsm/src/mem_table.rs index 14041b4..17801d8 100644 --- a/mini-lsm/src/mem_table.rs +++ b/mini-lsm/src/mem_table.rs @@ -1,23 +1,33 @@ use std::ops::Bound; +use std::sync::Arc; use anyhow::Result; use bytes::Bytes; use crossbeam_skiplist::map::Entry; use crossbeam_skiplist::SkipMap; +use ouroboros::self_referencing; use crate::iterators::impls::StorageIterator; use crate::table::SsTableBuilder; /// A basic mem-table based on crossbeam-skiplist pub struct MemTable { - map: SkipMap, + map: Arc>, 
+} + +pub(crate) fn map_bound(bound: Bound<&[u8]>) -> Bound { + match bound { + Bound::Included(x) => Bound::Included(Bytes::copy_from_slice(x)), + Bound::Excluded(x) => Bound::Excluded(Bytes::copy_from_slice(x)), + Bound::Unbounded => Bound::Unbounded, + } } impl MemTable { /// Create a new mem-table. pub fn create() -> Self { Self { - map: SkipMap::new(), + map: Arc::new(SkipMap::new()), } } @@ -34,20 +44,18 @@ impl MemTable { Ok(()) } - fn map_bound(bound: Bound<&[u8]>) -> Bound { - match bound { - Bound::Included(x) => Bound::Included(Bytes::copy_from_slice(x)), - Bound::Excluded(x) => Bound::Excluded(Bytes::copy_from_slice(x)), - Bound::Unbounded => Bound::Unbounded, - } - } - /// Get an iterator over a range of keys. pub fn scan(&self, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> Result { - let iter = self - .map - .range((Self::map_bound(lower), Self::map_bound(upper))); - Ok(MemTableIterator::new(iter)) + let (lower, upper) = (map_bound(lower), map_bound(upper)); + let mut iter = MemTableIteratorBuilder { + map: self.map.clone(), + iter_builder: |map| map.range((lower, upper)), + item: (Bytes::from_static(&[]), Bytes::from_static(&[])), + } + .build(); + let entry = iter.with_iter_mut(|iter| MemTableIterator::entry_to_item(iter.next())); + iter.with_mut(|x| *x.item = entry); + Ok(iter) } /// Flush the mem-table to SSTable. @@ -63,48 +71,42 @@ type SkipMapRangeIter<'a> = crossbeam_skiplist::map::Range<'a, Bytes, (Bound, Bound), Bytes, Bytes>; /// An iterator over a range of `SkipMap`. 
-pub struct MemTableIterator<'a> { - iter: SkipMapRangeIter<'a>, +#[self_referencing] +pub struct MemTableIterator { + map: Arc>, + #[borrows(map)] + #[not_covariant] + iter: SkipMapRangeIter<'this>, item: (Bytes, Bytes), } -impl<'a> MemTableIterator<'a> { - fn entry_to_item(entry: Option>) -> (Bytes, Bytes) { +impl MemTableIterator { + fn entry_to_item(entry: Option>) -> (Bytes, Bytes) { entry .map(|x| (x.key().clone(), x.value().clone())) .unwrap_or_else(|| (Bytes::from_static(&[]), Bytes::from_static(&[]))) } - - fn new(mut iter: SkipMapRangeIter<'a>) -> Self { - let entry = iter.next(); - - Self { - item: Self::entry_to_item(entry), - iter, - } - } } -impl StorageIterator for MemTableIterator<'_> { +impl StorageIterator for MemTableIterator { fn value(&self) -> &[u8] { - &self.item.1[..] + &self.borrow_item().1[..] } fn key(&self) -> &[u8] { - &self.item.0[..] + &self.borrow_item().0[..] } fn is_valid(&self) -> bool { - !self.item.0.is_empty() + !self.borrow_item().0.is_empty() } fn next(&mut self) -> Result<()> { - let entry = self.iter.next(); - self.item = Self::entry_to_item(entry); + let entry = self.with_iter_mut(|iter| MemTableIterator::entry_to_item(iter.next())); + self.with_mut(|x| *x.item = entry); Ok(()) } } #[cfg(test)] -#[path = "mem_table_test.rs"] mod tests; diff --git a/mini-lsm/src/mem_table_test.rs b/mini-lsm/src/mem_table/tests.rs similarity index 100% rename from mini-lsm/src/mem_table_test.rs rename to mini-lsm/src/mem_table/tests.rs diff --git a/mini-lsm/src/tests.rs b/mini-lsm/src/tests.rs new file mode 100644 index 0000000..da46569 --- /dev/null +++ b/mini-lsm/src/tests.rs @@ -0,0 +1 @@ +pub mod day3_tests; diff --git a/mini-lsm/src/tests/day3_tests.rs b/mini-lsm/src/tests/day3_tests.rs new file mode 100644 index 0000000..1189910 --- /dev/null +++ b/mini-lsm/src/tests/day3_tests.rs @@ -0,0 +1,107 @@ +use std::ops::Bound; + +use bytes::Bytes; + +use crate::iterators::impls::StorageIterator; + +fn as_bytes(x: &[u8]) -> Bytes { + 
Bytes::copy_from_slice(x) +} + +fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>) { + let mut iter = iter; + for (k, v) in expected { + assert!(iter.is_valid()); + assert_eq!( + k, + iter.key(), + "expected key: {:?}, actual key: {:?}", + k, + as_bytes(iter.key()), + ); + assert_eq!( + v, + iter.value(), + "expected value: {:?}, actual value: {:?}", + v, + as_bytes(iter.value()), + ); + iter.next().unwrap(); + } + assert!(!iter.is_valid()); +} + +#[test] +fn test_storage_get() { + use crate::lsm_storage::LsmStorage; + + let storage = LsmStorage::open("").unwrap(); + storage.put(b"1", b"233").unwrap(); + storage.put(b"2", b"2333").unwrap(); + storage.put(b"3", b"23333").unwrap(); + assert_eq!(&storage.get(b"1").unwrap().unwrap()[..], b"233"); + assert_eq!(&storage.get(b"2").unwrap().unwrap()[..], b"2333"); + assert_eq!(&storage.get(b"3").unwrap().unwrap()[..], b"23333"); + storage.delete(b"2").unwrap(); + assert!(storage.get(b"2").unwrap().is_none()); +} + +#[test] +fn test_storage_scan_memtable_1() { + use crate::lsm_storage::LsmStorage; + + let storage = LsmStorage::open("").unwrap(); + storage.put(b"1", b"233").unwrap(); + storage.put(b"2", b"2333").unwrap(); + storage.put(b"3", b"23333").unwrap(); + storage.delete(b"2").unwrap(); + check_iter_result( + storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(), + vec![ + (Bytes::from("1"), Bytes::from("233")), + (Bytes::from("3"), Bytes::from("23333")), + ], + ); + check_iter_result( + storage + .scan(Bound::Included(b"1"), Bound::Included(b"2")) + .unwrap(), + vec![(Bytes::from("1"), Bytes::from("233"))], + ); + check_iter_result( + storage + .scan(Bound::Excluded(b"1"), Bound::Excluded(b"3")) + .unwrap(), + vec![], + ); +} + +#[test] +fn test_storage_scan_memtable_2() { + use crate::lsm_storage::LsmStorage; + + let storage = LsmStorage::open("").unwrap(); + storage.put(b"1", b"233").unwrap(); + storage.put(b"2", b"2333").unwrap(); + storage.put(b"3", b"23333").unwrap(); + 
storage.delete(b"1").unwrap(); + check_iter_result( + storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(), + vec![ + (Bytes::from("2"), Bytes::from("2333")), + (Bytes::from("3"), Bytes::from("23333")), + ], + ); + check_iter_result( + storage + .scan(Bound::Included(b"1"), Bound::Included(b"2")) + .unwrap(), + vec![(Bytes::from("2"), Bytes::from("2333"))], + ); + check_iter_result( + storage + .scan(Bound::Excluded(b"1"), Bound::Excluded(b"3")) + .unwrap(), + vec![(Bytes::from("2"), Bytes::from("2333"))], + ); +} diff --git a/xtask/src/main.rs b/xtask/src/main.rs index dd55f20..46e1bbc 100644 --- a/xtask/src/main.rs +++ b/xtask/src/main.rs @@ -9,6 +9,7 @@ use duct::cmd; enum CopyTestAction { Day1, Day2, + Day3, } #[derive(clap::Subcommand, Debug)] @@ -141,6 +142,26 @@ fn copy_test_case(test: CopyTestAction) -> Result<()> { ) .run()?; } + CopyTestAction::Day3 => { + cmd!( + "cp", + "mini-lsm/src/mem_table/tests.rs", + "mini-lsm-starter/src/mem_table/tests.rs" + ) + .run()?; + cmd!( + "cp", + "mini-lsm/src/iterators/tests/merge_iterator_test.rs", + "mini-lsm-starter/src/iterators/tests/merge_iterator_test.rs" + ) + .run()?; + cmd!( + "cp", + "mini-lsm/src/iterators/tests/two_merge_iterator_test.rs", + "mini-lsm-starter/src/iterators/tests/two_merge_iterator_test.rs" + ) + .run()?; + } } Ok(()) }