diff --git a/Cargo.lock b/Cargo.lock index 54ba756..9b7776a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,18 @@ version = "1.0.68" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61" +[[package]] +name = "arc-swap" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "983cd8b9d4b02a6dc6ffa557262eb5858a27a0038ffffe21a0f133eaa819a164" + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + [[package]] name = "bitflags" version = "1.3.2" @@ -26,6 +38,12 @@ version = "1.0.78" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a20104e2335ce8a659d6dd92a51a767a0c062599c73b343fd152cb401e828c3d" +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + [[package]] name = "clap" version = "4.0.32" @@ -77,6 +95,40 @@ dependencies = [ "winapi", ] +[[package]] +name = "crossbeam-epoch" +version = "0.9.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01a9af1f4c2ef74bb8aa1f7e19706bc72d03598c8a570bb5de72243c7a9d9d5a" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "memoffset", + "scopeguard", +] + +[[package]] +name = "crossbeam-skiplist" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af28cff91e276c27eb739e14b0dda13dfd4cbab8364ebdb8d517a4c5454a227a" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", + "scopeguard", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fb766fa798726286dbbb842f174001dab8abc7b627a1dd86e0b7222a95d929f" +dependencies = [ + "cfg-if", +] + [[package]] name = "duct" version = "0.13.6" @@ -171,12 +223,34 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f051f77a7c8e6957c0696eac88f26b0117e54f52d3fc682ab19397a8812846a4" +[[package]] +name = "lock_api" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "memoffset" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" +dependencies = [ + "autocfg", +] + [[package]] name = "mini-lsm" version = "0.1.0" dependencies = [ "anyhow", + "arc-swap", "bytes", + "crossbeam-skiplist", + "parking_lot", ] [[package]] @@ -184,7 +258,10 @@ name = "mini-lsm-starter" version = "0.1.0" dependencies = [ "anyhow", + "arc-swap", "bytes", + "crossbeam-skiplist", + "parking_lot", ] [[package]] @@ -219,6 +296,29 @@ version = "6.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee" +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ff9f3fef3968a3ec5945535ed654cb38ff72d7495a25619e2247fb15a2ed9ba" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-sys", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -261,6 +361,15 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags", +] + [[package]] name = "rustix" version = "0.36.5" @@ -275,6 +384,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + [[package]] name = "shared_child" version = "1.0.0" @@ -285,6 +400,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "smallvec" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" + [[package]] name = "strsim" version = "0.10.0" diff --git a/mini-lsm-book/src/00-overview.md b/mini-lsm-book/src/00-overview.md index 1701831..2738ab8 100644 --- a/mini-lsm-book/src/00-overview.md +++ b/mini-lsm-book/src/00-overview.md @@ -42,6 +42,7 @@ The storage engine generally provides the following interfaces: * `Put(key, value)`: store a key-value pair in the LSM tree. * `Delete(key)`: remove a key and its corresponding value. * `Get(key)`: get the value corresponding to a key. +* `Scan(range)`: get a range of key-value pairs. To ensure persistence, diff --git a/mini-lsm-starter/Cargo.toml b/mini-lsm-starter/Cargo.toml index 7846954..2931aa4 100644 --- a/mini-lsm-starter/Cargo.toml +++ b/mini-lsm-starter/Cargo.toml @@ -6,4 +6,7 @@ publish = false [dependencies] anyhow = "1" +arc-swap = "1" bytes = "1" +crossbeam-skiplist = "0.1" +parking_lot = "0.12" diff --git a/mini-lsm-starter/src/block.rs b/mini-lsm-starter/src/block.rs index d672036..98aedfd 100644 --- a/mini-lsm-starter/src/block.rs +++ b/mini-lsm-starter/src/block.rs @@ -4,12 +4,12 @@ mod builder; mod iterator; -use bytes::Bytes; - pub use builder::BlockBuilder; +use bytes::Bytes; pub use iterator::BlockIterator; -/// A block is the smallest unit of read and caching in LSM tree. It is a collection of sorted key-value pairs. +/// A block is the smallest unit of read and caching in LSM tree. It is a collection of sorted +/// key-value pairs. pub struct Block { data: Vec, offsets: Vec, diff --git a/mini-lsm-starter/src/iterators.rs b/mini-lsm-starter/src/iterators.rs new file mode 100644 index 0000000..19f6c65 --- /dev/null +++ b/mini-lsm-starter/src/iterators.rs @@ -0,0 +1,6 @@ +pub mod impls; +pub mod merge_iterator; +pub mod two_merge_iterator; + +#[cfg(test)] +mod tests; diff --git a/mini-lsm-starter/src/iterators/impls.rs b/mini-lsm-starter/src/iterators/impls.rs new file mode 100644 index 0000000..a05c75c --- /dev/null +++ b/mini-lsm-starter/src/iterators/impls.rs @@ -0,0 +1,15 @@ +use anyhow::Result; + +pub trait StorageIterator { + /// Get the current value. + fn value(&self) -> &[u8]; + + /// Get the current key. + fn key(&self) -> &[u8]; + + /// Check if the current iterator is valid. + fn is_valid(&self) -> bool; + + /// Move to the next position. + fn next(&mut self) -> Result<()>; +} diff --git a/mini-lsm-starter/src/iterators/merge_iterator.rs b/mini-lsm-starter/src/iterators/merge_iterator.rs new file mode 100644 index 0000000..fa798b9 --- /dev/null +++ b/mini-lsm-starter/src/iterators/merge_iterator.rs @@ -0,0 +1,67 @@ +#![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod +#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod + +use std::cmp::{self}; +use std::collections::BinaryHeap; + +use anyhow::Result; + +use super::impls::StorageIterator; + +struct HeapWrapper(pub usize, pub Box); + +impl PartialEq for HeapWrapper { + fn eq(&self, other: &Self) -> bool { + self.partial_cmp(other).unwrap() == cmp::Ordering::Equal + } +} + +impl Eq for HeapWrapper {} + +impl PartialOrd for HeapWrapper { + fn partial_cmp(&self, other: &Self) -> Option { + match self.1.key().cmp(other.1.key()) { + cmp::Ordering::Greater => Some(cmp::Ordering::Greater), + cmp::Ordering::Less => Some(cmp::Ordering::Less), + cmp::Ordering::Equal => self.0.partial_cmp(&other.0), + } + .map(|x| x.reverse()) + } +} + +impl Ord for HeapWrapper { + fn cmp(&self, other: &Self) -> cmp::Ordering { + self.partial_cmp(other).unwrap() + } +} + +/// Merge multiple iterators of the same type. If the same key occurs multiple times in some +/// iterators, perfer the one with smaller index. +pub struct MergeIterator { + iters: BinaryHeap>, + current: HeapWrapper, +} + +impl MergeIterator { + pub fn create(iters: Vec>) -> Self { + unimplemented!() + } +} + +impl StorageIterator for MergeIterator { + fn key(&self) -> &[u8] { + unimplemented!() + } + + fn value(&self) -> &[u8] { + unimplemented!() + } + + fn is_valid(&self) -> bool { + unimplemented!() + } + + fn next(&mut self) -> Result<()> { + unimplemented!() + } +} diff --git a/mini-lsm-starter/src/iterators/tests.rs b/mini-lsm-starter/src/iterators/tests.rs new file mode 100644 index 0000000..6bcf0ef --- /dev/null +++ b/mini-lsm-starter/src/iterators/tests.rs @@ -0,0 +1,4 @@ +//! Please copy `mini-lsm/src/iterators/tests.rs` here so that you can run tests. + +pub mod merge_iterator_test; +pub mod two_merge_iterator_test; diff --git a/mini-lsm-starter/src/iterators/tests/merge_iterator_test.rs b/mini-lsm-starter/src/iterators/tests/merge_iterator_test.rs new file mode 100644 index 0000000..ea5b8cf --- /dev/null +++ b/mini-lsm-starter/src/iterators/tests/merge_iterator_test.rs @@ -0,0 +1,2 @@ +//! Please copy `mini-lsm/src/iterators/tests/merge_iterator_test.rs` here so that you can run +//! tests. diff --git a/mini-lsm-starter/src/iterators/tests/two_merge_iterator_test.rs b/mini-lsm-starter/src/iterators/tests/two_merge_iterator_test.rs new file mode 100644 index 0000000..78d324b --- /dev/null +++ b/mini-lsm-starter/src/iterators/tests/two_merge_iterator_test.rs @@ -0,0 +1,2 @@ +//! Please copy `mini-lsm/src/iterators/tests/two_merge_iterator_test.rs` here so that you can run +//! tests. diff --git a/mini-lsm-starter/src/iterators/two_merge_iterator.rs b/mini-lsm-starter/src/iterators/two_merge_iterator.rs new file mode 100644 index 0000000..3b9d682 --- /dev/null +++ b/mini-lsm-starter/src/iterators/two_merge_iterator.rs @@ -0,0 +1,38 @@ +#![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod +#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod + +use anyhow::Result; + +use super::impls::StorageIterator; + +/// Merges two iterators of different types into one. If the two iterators have the same key, only +/// produce the key once and prefer the entry from A. +pub struct TwoMergeIterator { + a: A, + b: B, + // Add fields as need +} + +impl TwoMergeIterator { + pub fn create(a: A, b: B) -> Result { + unimplemented!() + } +} + +impl StorageIterator for TwoMergeIterator { + fn key(&self) -> &[u8] { + unimplemented!() + } + + fn value(&self) -> &[u8] { + unimplemented!() + } + + fn is_valid(&self) -> bool { + unimplemented!() + } + + fn next(&mut self) -> Result<()> { + unimplemented!() + } +} diff --git a/mini-lsm-starter/src/lib.rs b/mini-lsm-starter/src/lib.rs index 3ed909f..c78ce8a 100644 --- a/mini-lsm-starter/src/lib.rs +++ b/mini-lsm-starter/src/lib.rs @@ -1,3 +1,6 @@ pub mod block; -pub mod storage; +pub mod iterators; +pub mod lsm_iterator; +pub mod lsm_storage; +pub mod mem_table; pub mod table; diff --git a/mini-lsm-starter/src/lsm_iterator.rs b/mini-lsm-starter/src/lsm_iterator.rs new file mode 100644 index 0000000..8de188e --- /dev/null +++ b/mini-lsm-starter/src/lsm_iterator.rs @@ -0,0 +1 @@ +pub struct LsmIterator {} diff --git a/mini-lsm-starter/src/lsm_storage.rs b/mini-lsm-starter/src/lsm_storage.rs new file mode 100644 index 0000000..54a29b2 --- /dev/null +++ b/mini-lsm-starter/src/lsm_storage.rs @@ -0,0 +1,75 @@ +use std::ops::Bound; +use std::path::Path; +use std::sync::Arc; + +use anyhow::Result; +use arc_swap::ArcSwap; +use bytes::Bytes; + +use crate::lsm_iterator::LsmIterator; +use crate::mem_table::MemTable; +use crate::table::{SsTable, SsTableIterator}; + +pub struct LsmStorageInner { + memtables: Vec>, + sstables: Vec>, +} + +impl LsmStorageInner { + fn create() -> Self { + Self { + memtables: vec![Arc::new(MemTable::create())], + sstables: vec![], + } + } +} + +/// The storage interface of the LSM tree. +pub struct LsmStorage { + inner: ArcSwap, +} + +impl LsmStorage { + pub fn open(_path: &Path) -> Result { + Ok(Self { + inner: ArcSwap::from_pointee(LsmStorageInner::create()), + }) + } + + pub fn get(&self, key: &[u8]) -> Result> { + let snapshot = self.inner.load(); + for memtable in &snapshot.memtables { + if let Some(value) = memtable.get(key)? { + if value.is_empty() { + // found tomestone, return key not exists + return Ok(None); + } + return Ok(Some(value)); + } + } + let mut iters = Vec::new(); + iters.reserve(snapshot.sstables.len()); + for table in snapshot.sstables.iter().rev() { + iters.push(SsTableIterator::create_and_seek_to_key(table.clone(), key)?); + } + Ok(None) + } + + pub fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> { + assert!(!value.is_empty(), "value cannot be empty"); + assert!(!key.is_empty(), "key cannot be empty"); + unimplemented!() + } + + pub fn delete(&mut self, _key: &[u8]) -> Result<()> { + unimplemented!() + } + + pub fn sync(&mut self) -> Result<()> { + unimplemented!() + } + + pub fn scan(&self, _lower: Bound<&[u8]>, _upper: Bound<&[u8]>) -> Result { + unimplemented!() + } +} diff --git a/mini-lsm-starter/src/mem_table.rs b/mini-lsm-starter/src/mem_table.rs new file mode 100644 index 0000000..aa58e58 --- /dev/null +++ b/mini-lsm-starter/src/mem_table.rs @@ -0,0 +1,79 @@ +#![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod +#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod + +use std::ops::Bound; + +use anyhow::Result; +use bytes::Bytes; +use crossbeam_skiplist::SkipMap; + +use crate::iterators::impls::StorageIterator; +use crate::table::SsTableBuilder; + +/// A basic mem-table based on crossbeam-skiplist +pub struct MemTable { + map: SkipMap, +} + +impl MemTable { + /// Create a new mem-table. + pub fn create() -> Self { + unimplemented!() + } + + /// Get a value by key. + pub fn get(&self, key: &[u8]) -> Result> { + unimplemented!() + } + + /// Put a key-value pair into the mem-table. + pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { + unimplemented!() + } + + /// Get an iterator over a range of keys. + pub fn scan(&self, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> Result { + unimplemented!() + } + + /// Flush the mem-table to SSTable. + pub fn flush(&self, builder: &mut SsTableBuilder) -> Result<()> { + unimplemented!() + } +} + +type SkipMapRangeIter<'a> = + crossbeam_skiplist::map::Range<'a, Bytes, (Bound, Bound), Bytes, Bytes>; + +/// An iterator over a range of `SkipMap`. +pub struct MemTableIterator<'a> { + _phantom: std::marker::PhantomData<&'a ()>, +} + +impl<'a> MemTableIterator<'a> { + fn new(iter: SkipMapRangeIter<'a>) -> Self { + unimplemented!() + } +} + +impl StorageIterator for MemTableIterator<'_> { + fn value(&self) -> &[u8] { + unimplemented!() + } + + fn key(&self) -> &[u8] { + unimplemented!() + } + + fn is_valid(&self) -> bool { + unimplemented!() + } + + fn next(&mut self) -> Result<()> { + unimplemented!() + } +} + +#[cfg(test)] +#[path = "mem_table_test.rs"] +mod tests; diff --git a/mini-lsm-starter/src/mem_table_test.rs b/mini-lsm-starter/src/mem_table_test.rs new file mode 100644 index 0000000..fe2039e --- /dev/null +++ b/mini-lsm-starter/src/mem_table_test.rs @@ -0,0 +1 @@ +//! Please copy `mini-lsm/src/mem_table_test.rs` here so that you can run tests. diff --git a/mini-lsm-starter/src/table.rs b/mini-lsm-starter/src/table.rs index 1a55831..0e1ee80 100644 --- a/mini-lsm-starter/src/table.rs +++ b/mini-lsm-starter/src/table.rs @@ -4,14 +4,15 @@ mod builder; mod iterator; -use std::{path::Path, sync::Arc}; +use std::path::Path; +use std::sync::Arc; +use anyhow::Result; pub use builder::SsTableBuilder; use bytes::{Buf, Bytes}; pub use iterator::SsTableIterator; use crate::block::Block; -use anyhow::Result; #[derive(Clone, Debug, PartialEq, Eq)] pub struct BlockMeta { @@ -25,7 +26,8 @@ impl BlockMeta { /// Encode block meta to a buffer. pub fn encode_block_meta( block_meta: &[BlockMeta], - #[allow(clippy::ptr_arg)] /* remove this allow after you finish */ buf: &mut Vec, + #[allow(clippy::ptr_arg)] // remove this allow after you finish + buf: &mut Vec, ) { unimplemented!() } diff --git a/mini-lsm-starter/src/table/builder.rs b/mini-lsm-starter/src/table/builder.rs index ff4561d..19d3fe2 100644 --- a/mini-lsm-starter/src/table/builder.rs +++ b/mini-lsm-starter/src/table/builder.rs @@ -1,9 +1,10 @@ #![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod #![allow(dead_code)] // TODO(you): remove this lint after implementing this mod -use anyhow::Result; use std::path::Path; +use anyhow::Result; + use super::{BlockMeta, SsTable}; /// Builds an SSTable from key-value pairs. @@ -14,17 +15,22 @@ pub struct SsTableBuilder { impl SsTableBuilder { /// Create a builder based on target SST size and target block size. - pub fn new(target_size: usize, block_size: usize) -> Self { + pub fn new(block_size: usize) -> Self { unimplemented!() } - /// Adds a key-value pair to SSTable, return false when SST full. - #[must_use] - pub fn add(&mut self, key: &[u8], value: &[u8]) -> bool { + /// Adds a key-value pair to SSTable + pub fn add(&mut self, key: &[u8], value: &[u8]) { unimplemented!() } - /// Builds the SSTable and writes it to the given path. No need to actually write to disk until chapter 4 block cache. + /// Get the estimated size of the SSTable. + pub fn estimated_size(&self) -> usize { + unimplemented!() + } + + /// Builds the SSTable and writes it to the given path. No need to actually write to disk until + /// chapter 4 block cache. pub fn build(self, path: impl AsRef) -> Result { unimplemented!() } diff --git a/mini-lsm-starter/src/table/iterator.rs b/mini-lsm-starter/src/table/iterator.rs index a14654d..302c2fd 100644 --- a/mini-lsm-starter/src/table/iterator.rs +++ b/mini-lsm-starter/src/table/iterator.rs @@ -1,9 +1,10 @@ #![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod #![allow(dead_code)] // TODO(you): remove this lint after implementing this mod -use anyhow::Result; use std::sync::Arc; +use anyhow::Result; + use super::SsTable; /// An iterator over the contents of an SSTable. diff --git a/mini-lsm/Cargo.toml b/mini-lsm/Cargo.toml index dbe801e..3e4e5c1 100644 --- a/mini-lsm/Cargo.toml +++ b/mini-lsm/Cargo.toml @@ -12,4 +12,7 @@ description = "A tutorial for building an LSM tree storage engine in a week." [dependencies] anyhow = "1" +arc-swap = "1" bytes = "1" +crossbeam-skiplist = "0.1" +parking_lot = "0.12" diff --git a/mini-lsm/src/block.rs b/mini-lsm/src/block.rs index 97e072a..e60415a 100644 --- a/mini-lsm/src/block.rs +++ b/mini-lsm/src/block.rs @@ -1,14 +1,14 @@ mod builder; mod iterator; -use bytes::{Buf, BufMut, Bytes}; - pub use builder::BlockBuilder; +use bytes::{Buf, BufMut, Bytes}; pub use iterator::BlockIterator; pub const SIZEOF_U16: usize = std::mem::size_of::(); -/// A block is the smallest unit of read and caching in LSM tree. It is a collection of sorted key-value pairs. +/// A block is the smallest unit of read and caching in LSM tree. It is a collection of sorted +/// key-value pairs. pub struct Block { data: Vec, offsets: Vec, diff --git a/mini-lsm/src/block/tests.rs b/mini-lsm/src/block/tests.rs index ae52135..52b2b12 100644 --- a/mini-lsm/src/block/tests.rs +++ b/mini-lsm/src/block/tests.rs @@ -1,6 +1,8 @@ use std::sync::Arc; -use super::{builder::BlockBuilder, iterator::BlockIterator, *}; +use super::builder::BlockBuilder; +use super::iterator::BlockIterator; +use super::*; #[test] fn test_block_build_single_key() { diff --git a/mini-lsm/src/iterators.rs b/mini-lsm/src/iterators.rs new file mode 100644 index 0000000..19f6c65 --- /dev/null +++ b/mini-lsm/src/iterators.rs @@ -0,0 +1,6 @@ +pub mod impls; +pub mod merge_iterator; +pub mod two_merge_iterator; + +#[cfg(test)] +mod tests; diff --git a/mini-lsm/src/iterators/impls.rs b/mini-lsm/src/iterators/impls.rs new file mode 100644 index 0000000..a05c75c --- /dev/null +++ b/mini-lsm/src/iterators/impls.rs @@ -0,0 +1,15 @@ +use anyhow::Result; + +pub trait StorageIterator { + /// Get the current value. + fn value(&self) -> &[u8]; + + /// Get the current key. + fn key(&self) -> &[u8]; + + /// Check if the current iterator is valid. + fn is_valid(&self) -> bool; + + /// Move to the next position. + fn next(&mut self) -> Result<()>; +} diff --git a/mini-lsm/src/iterators/merge_iterator.rs b/mini-lsm/src/iterators/merge_iterator.rs new file mode 100644 index 0000000..f4e3425 --- /dev/null +++ b/mini-lsm/src/iterators/merge_iterator.rs @@ -0,0 +1,127 @@ +use std::cmp::{self}; +use std::collections::binary_heap::PeekMut; +use std::collections::BinaryHeap; + +use anyhow::Result; + +use super::impls::StorageIterator; + +struct HeapWrapper(pub usize, pub Box); + +impl PartialEq for HeapWrapper { + fn eq(&self, other: &Self) -> bool { + self.partial_cmp(other).unwrap() == cmp::Ordering::Equal + } +} + +impl Eq for HeapWrapper {} + +impl PartialOrd for HeapWrapper { + fn partial_cmp(&self, other: &Self) -> Option { + match self.1.key().cmp(other.1.key()) { + cmp::Ordering::Greater => Some(cmp::Ordering::Greater), + cmp::Ordering::Less => Some(cmp::Ordering::Less), + cmp::Ordering::Equal => self.0.partial_cmp(&other.0), + } + .map(|x| x.reverse()) + } +} + +impl Ord for HeapWrapper { + fn cmp(&self, other: &Self) -> cmp::Ordering { + self.partial_cmp(other).unwrap() + } +} + +/// Merge multiple iterators of the same type. If the same key occurs multiple times in some +/// iterators, perfer the one with smaller index. +pub struct MergeIterator { + iters: BinaryHeap>, + current: HeapWrapper, +} + +impl MergeIterator { + pub fn create(iters: Vec>) -> Self { + assert!(!iters.is_empty()); + + let mut heap = BinaryHeap::new(); + + if iters.iter().all(|x| !x.is_valid()) { + // All invalid, select the last one as the current. + let mut iters = iters; + return Self { + iters: heap, + current: HeapWrapper(0, iters.pop().unwrap()), + }; + } + + for (idx, iter) in iters.into_iter().enumerate() { + if iter.is_valid() { + heap.push(HeapWrapper(idx, iter)); + } + } + + let current = heap.pop().unwrap(); + Self { + iters: heap, + current, + } + } +} + +impl StorageIterator for MergeIterator { + fn key(&self) -> &[u8] { + self.current.1.key() + } + + fn value(&self) -> &[u8] { + self.current.1.value() + } + + fn is_valid(&self) -> bool { + self.current.1.is_valid() + } + + fn next(&mut self) -> Result<()> { + // Pop the item out of the heap if they have the same value. + while let Some(mut inner_iter) = self.iters.peek_mut() { + debug_assert!( + inner_iter.1.key() >= self.current.1.key(), + "heap invariant violated" + ); + if inner_iter.1.key() == self.current.1.key() { + // Case 1: an error occurred when calling `next`. + if let e @ Err(_) = inner_iter.1.next() { + PeekMut::pop(inner_iter); + return e; + } + + // Case 2: iter is no longer valid. + if !inner_iter.1.is_valid() { + PeekMut::pop(inner_iter); + } + } else { + break; + } + } + + self.current.1.next()?; + + // If the current iterator is invalid, pop it out of the heap and select the next one. + if !self.current.1.is_valid() { + if let Some(iter) = self.iters.pop() { + self.current = iter; + } + return Ok(()); + } + + // Otherwise, compare with heap top and swap if necessary. + if let Some(mut inner_iter) = self.iters.peek_mut() { + if self.current < *inner_iter { + std::mem::swap(&mut *inner_iter, &mut self.current); + } + } + + Ok(()) + } +} diff --git a/mini-lsm/src/iterators/tests.rs b/mini-lsm/src/iterators/tests.rs new file mode 100644 index 0000000..e0c9f90 --- /dev/null +++ b/mini-lsm/src/iterators/tests.rs @@ -0,0 +1,40 @@ +use anyhow::Result; +use bytes::Bytes; + +use super::impls::StorageIterator; + +pub mod merge_iterator_test; +pub mod two_merge_iterator_test; + +#[derive(Clone)] +pub struct MockIterator { + pub data: Vec<(Bytes, Bytes)>, + pub index: usize, +} + +impl MockIterator { + pub fn new(data: Vec<(Bytes, Bytes)>) -> Self { + Self { data, index: 0 } + } +} + +impl StorageIterator for MockIterator { + fn next(&mut self) -> Result<()> { + if self.index < self.data.len() { + self.index += 1; + } + Ok(()) + } + + fn key(&self) -> &[u8] { + self.data[self.index].0.as_ref() + } + + fn value(&self) -> &[u8] { + self.data[self.index].1.as_ref() + } + + fn is_valid(&self) -> bool { + self.index < self.data.len() + } +} diff --git a/mini-lsm/src/iterators/tests/merge_iterator_test.rs b/mini-lsm/src/iterators/tests/merge_iterator_test.rs new file mode 100644 index 0000000..0a9e4d9 --- /dev/null +++ b/mini-lsm/src/iterators/tests/merge_iterator_test.rs @@ -0,0 +1,131 @@ +use super::*; +use crate::iterators::merge_iterator::MergeIterator; + +fn as_bytes(x: &[u8]) -> Bytes { + Bytes::copy_from_slice(x) +} + +fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>) { + let mut iter = iter; + for (k, v) in expected { + assert!(iter.is_valid()); + assert_eq!( + k, + iter.key(), + "expected key: {:?}, actual key: {:?}", + k, + as_bytes(iter.key()), + ); + assert_eq!( + v, + iter.value(), + "expected value: {:?}, actual value: {:?}", + v, + as_bytes(iter.value()), + ); + iter.next().unwrap(); + } + assert!(!iter.is_valid()); +} + +#[test] +fn test_merge_1() { + let i1 = MockIterator::new(vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.1")), + (Bytes::from("c"), Bytes::from("3.1")), + ]); + let i2 = MockIterator::new(vec![ + (Bytes::from("a"), Bytes::from("1.2")), + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ]); + let i3 = MockIterator::new(vec![ + (Bytes::from("b"), Bytes::from("2.3")), + (Bytes::from("c"), Bytes::from("3.3")), + (Bytes::from("d"), Bytes::from("4.3")), + ]); + + let iter = MergeIterator::create(vec![ + Box::new(i1.clone()), + Box::new(i2.clone()), + Box::new(i3.clone()), + ]); + + check_iter_result( + iter, + vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.1")), + (Bytes::from("c"), Bytes::from("3.1")), + (Bytes::from("d"), Bytes::from("4.2")), + ], + ); + + let iter = MergeIterator::create(vec![Box::new(i3), Box::new(i1), Box::new(i2)]); + + check_iter_result( + iter, + vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.3")), + (Bytes::from("c"), Bytes::from("3.3")), + (Bytes::from("d"), Bytes::from("4.3")), + ], + ); +} + +#[test] +fn test_merge_2() { + let i1 = MockIterator::new(vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.1")), + (Bytes::from("c"), Bytes::from("3.1")), + ]); + let i2 = MockIterator::new(vec![ + (Bytes::from("d"), Bytes::from("1.2")), + (Bytes::from("e"), Bytes::from("2.2")), + (Bytes::from("f"), Bytes::from("3.2")), + (Bytes::from("g"), Bytes::from("4.2")), + ]); + let i3 = MockIterator::new(vec![ + (Bytes::from("h"), Bytes::from("1.3")), + (Bytes::from("i"), Bytes::from("2.3")), + (Bytes::from("j"), Bytes::from("3.3")), + (Bytes::from("k"), Bytes::from("4.3")), + ]); + let i4 = MockIterator::new(vec![]); + let result = vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.1")), + (Bytes::from("c"), Bytes::from("3.1")), + (Bytes::from("d"), Bytes::from("1.2")), + (Bytes::from("e"), Bytes::from("2.2")), + (Bytes::from("f"), Bytes::from("3.2")), + (Bytes::from("g"), Bytes::from("4.2")), + (Bytes::from("h"), Bytes::from("1.3")), + (Bytes::from("i"), Bytes::from("2.3")), + (Bytes::from("j"), Bytes::from("3.3")), + (Bytes::from("k"), Bytes::from("4.3")), + ]; + + let iter = MergeIterator::create(vec![ + Box::new(i1.clone()), + Box::new(i2.clone()), + Box::new(i3.clone()), + Box::new(i4.clone()), + ]); + check_iter_result(iter, result.clone()); + + let iter = MergeIterator::create(vec![ + Box::new(i2.clone()), + Box::new(i4.clone()), + Box::new(i3.clone()), + Box::new(i1.clone()), + ]); + check_iter_result(iter, result.clone()); + + let iter = MergeIterator::create(vec![Box::new(i4), Box::new(i3), Box::new(i2), Box::new(i1)]); + check_iter_result(iter, result); +} diff --git a/mini-lsm/src/iterators/tests/two_merge_iterator_test.rs b/mini-lsm/src/iterators/tests/two_merge_iterator_test.rs new file mode 100644 index 0000000..1719bf3 --- /dev/null +++ b/mini-lsm/src/iterators/tests/two_merge_iterator_test.rs @@ -0,0 +1,129 @@ +use super::*; +use crate::iterators::two_merge_iterator::TwoMergeIterator; + +fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>) { + let mut iter = iter; + for (k, v) in expected { + assert!(iter.is_valid()); + assert_eq!(iter.key(), k.as_ref()); + assert_eq!(iter.value(), v.as_ref()); + iter.next().unwrap(); + } + assert!(!iter.is_valid()); +} + +#[test] +fn test_merge_1() { + let i1 = MockIterator::new(vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.1")), + (Bytes::from("c"), Bytes::from("3.1")), + ]); + let i2 = MockIterator::new(vec![ + (Bytes::from("a"), Bytes::from("1.2")), + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ]); + let iter = TwoMergeIterator::create(i1, i2).unwrap(); + check_iter_result( + iter, + vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.1")), + (Bytes::from("c"), Bytes::from("3.1")), + (Bytes::from("d"), Bytes::from("4.2")), + ], + ) +} + +#[test] +fn test_merge_2() { + let i2 = MockIterator::new(vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.1")), + (Bytes::from("c"), Bytes::from("3.1")), + ]); + let i1 = MockIterator::new(vec![ + (Bytes::from("a"), Bytes::from("1.2")), + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ]); + let iter = TwoMergeIterator::create(i1, i2).unwrap(); + check_iter_result( + iter, + vec![ + (Bytes::from("a"), Bytes::from("1.2")), + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ], + ) +} + +#[test] +fn test_merge_3() { + let i2 = MockIterator::new(vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.1")), + (Bytes::from("c"), Bytes::from("3.1")), + ]); + let i1 = MockIterator::new(vec![ + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ]); + let iter = TwoMergeIterator::create(i1, i2).unwrap(); + check_iter_result( + iter, + vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ], + ) +} + +#[test] +fn test_merge_4() { + let i2 = MockIterator::new(vec![]); + let i1 = MockIterator::new(vec![ + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ]); + let iter = TwoMergeIterator::create(i1, i2).unwrap(); + check_iter_result( + iter, + vec![ + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ], + ); + let i1 = MockIterator::new(vec![]); + let i2 = MockIterator::new(vec![ + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ]); + let iter = TwoMergeIterator::create(i1, i2).unwrap(); + check_iter_result( + iter, + vec![ + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ], + ); +} + +#[test] +fn test_merge_5() { + let i2 = MockIterator::new(vec![]); + let i1 = MockIterator::new(vec![]); + let iter = TwoMergeIterator::create(i1, i2).unwrap(); + check_iter_result(iter, vec![]) +} diff --git a/mini-lsm/src/iterators/two_merge_iterator.rs b/mini-lsm/src/iterators/two_merge_iterator.rs new file mode 100644 index 0000000..7789ace --- /dev/null +++ b/mini-lsm/src/iterators/two_merge_iterator.rs @@ -0,0 +1,80 @@ +use anyhow::Result; + +use super::impls::StorageIterator; + +/// Merges two iterators of different types into one. If the two iterators have the same key, only +/// produce the key once and prefer the entry from A. +pub struct TwoMergeIterator { + a: A, + b: B, + choose_a: bool, +} + +impl TwoMergeIterator { + fn choose_a(a: &A, b: &B) -> bool { + if !a.is_valid() { + return false; + } + if !b.is_valid() { + return true; + } + a.key() < b.key() + } + + fn skip_b(&mut self) -> Result<()> { + if self.a.is_valid() { + while self.b.is_valid() && self.b.key() == self.a.key() { + self.b.next()?; + } + } + Ok(()) + } + + pub fn create(a: A, b: B) -> Result { + let mut iter = Self { + choose_a: false, + a, + b, + }; + iter.skip_b()?; + iter.choose_a = Self::choose_a(&iter.a, &iter.b); + Ok(iter) + } +} + +impl StorageIterator for TwoMergeIterator { + fn key(&self) -> &[u8] { + if self.choose_a { + self.a.key() + } else { + self.b.key() + } + } + + fn value(&self) -> &[u8] { + if self.choose_a { + self.a.value() + } else { + self.b.value() + } + } + + fn is_valid(&self) -> bool { + if self.choose_a { + self.a.is_valid() + } else { + self.b.is_valid() + } + } + + fn next(&mut self) -> Result<()> { + if self.choose_a { + self.a.next()?; + } else { + self.b.next()?; + } + self.skip_b()?; + self.choose_a = Self::choose_a(&self.a, &self.b); + Ok(()) + } +} diff --git a/mini-lsm/src/lib.rs b/mini-lsm/src/lib.rs index 3ed909f..c78ce8a 100644 --- a/mini-lsm/src/lib.rs +++ b/mini-lsm/src/lib.rs @@ -1,3 +1,6 @@ pub mod block; -pub mod storage; +pub mod iterators; +pub mod lsm_iterator; +pub mod lsm_storage; +pub mod mem_table; pub mod table; diff --git a/mini-lsm/src/lsm_iterator.rs b/mini-lsm/src/lsm_iterator.rs new file mode 100644 index 0000000..8de188e --- /dev/null +++ b/mini-lsm/src/lsm_iterator.rs @@ -0,0 +1 @@ +pub struct LsmIterator {} diff --git a/mini-lsm/src/lsm_storage.rs b/mini-lsm/src/lsm_storage.rs new file mode 100644 index 0000000..54a29b2 --- /dev/null +++ b/mini-lsm/src/lsm_storage.rs @@ -0,0 +1,75 @@ +use std::ops::Bound; +use std::path::Path; +use std::sync::Arc; + +use anyhow::Result; +use arc_swap::ArcSwap; +use bytes::Bytes; + +use crate::lsm_iterator::LsmIterator; +use crate::mem_table::MemTable; +use crate::table::{SsTable, SsTableIterator}; + +pub struct LsmStorageInner { + memtables: Vec>, + sstables: Vec>, +} + +impl LsmStorageInner { + fn create() -> Self { + Self { + memtables: vec![Arc::new(MemTable::create())], + sstables: vec![], + } + } +} + +/// The storage interface of the LSM tree. +pub struct LsmStorage { + inner: ArcSwap, +} + +impl LsmStorage { + pub fn open(_path: &Path) -> Result { + Ok(Self { + inner: ArcSwap::from_pointee(LsmStorageInner::create()), + }) + } + + pub fn get(&self, key: &[u8]) -> Result> { + let snapshot = self.inner.load(); + for memtable in &snapshot.memtables { + if let Some(value) = memtable.get(key)? { + if value.is_empty() { + // found tomestone, return key not exists + return Ok(None); + } + return Ok(Some(value)); + } + } + let mut iters = Vec::new(); + iters.reserve(snapshot.sstables.len()); + for table in snapshot.sstables.iter().rev() { + iters.push(SsTableIterator::create_and_seek_to_key(table.clone(), key)?); + } + Ok(None) + } + + pub fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> { + assert!(!value.is_empty(), "value cannot be empty"); + assert!(!key.is_empty(), "key cannot be empty"); + unimplemented!() + } + + pub fn delete(&mut self, _key: &[u8]) -> Result<()> { + unimplemented!() + } + + pub fn sync(&mut self) -> Result<()> { + unimplemented!() + } + + pub fn scan(&self, _lower: Bound<&[u8]>, _upper: Bound<&[u8]>) -> Result { + unimplemented!() + } +} diff --git a/mini-lsm/src/mem_table.rs b/mini-lsm/src/mem_table.rs new file mode 100644 index 0000000..14041b4 --- /dev/null +++ b/mini-lsm/src/mem_table.rs @@ -0,0 +1,110 @@ +use std::ops::Bound; + +use anyhow::Result; +use bytes::Bytes; +use crossbeam_skiplist::map::Entry; +use crossbeam_skiplist::SkipMap; + +use crate::iterators::impls::StorageIterator; +use crate::table::SsTableBuilder; + +/// A basic mem-table based on crossbeam-skiplist +pub struct MemTable { + map: SkipMap, +} + +impl MemTable { + /// Create a new mem-table. + pub fn create() -> Self { + Self { + map: SkipMap::new(), + } + } + + /// Get a value by key. + pub fn get(&self, key: &[u8]) -> Result> { + let entry = self.map.get(key).map(|e| e.value().clone()); + Ok(entry) + } + + /// Put a key-value pair into the mem-table. + pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { + self.map + .insert(Bytes::copy_from_slice(key), Bytes::copy_from_slice(value)); + Ok(()) + } + + fn map_bound(bound: Bound<&[u8]>) -> Bound { + match bound { + Bound::Included(x) => Bound::Included(Bytes::copy_from_slice(x)), + Bound::Excluded(x) => Bound::Excluded(Bytes::copy_from_slice(x)), + Bound::Unbounded => Bound::Unbounded, + } + } + + /// Get an iterator over a range of keys. + pub fn scan(&self, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> Result { + let iter = self + .map + .range((Self::map_bound(lower), Self::map_bound(upper))); + Ok(MemTableIterator::new(iter)) + } + + /// Flush the mem-table to SSTable. + pub fn flush(&self, builder: &mut SsTableBuilder) -> Result<()> { + for entry in self.map.iter() { + builder.add(&entry.key()[..], &entry.value()[..]); + } + Ok(()) + } +} + +type SkipMapRangeIter<'a> = + crossbeam_skiplist::map::Range<'a, Bytes, (Bound, Bound), Bytes, Bytes>; + +/// An iterator over a range of `SkipMap`. +pub struct MemTableIterator<'a> { + iter: SkipMapRangeIter<'a>, + item: (Bytes, Bytes), +} + +impl<'a> MemTableIterator<'a> { + fn entry_to_item(entry: Option>) -> (Bytes, Bytes) { + entry + .map(|x| (x.key().clone(), x.value().clone())) + .unwrap_or_else(|| (Bytes::from_static(&[]), Bytes::from_static(&[]))) + } + + fn new(mut iter: SkipMapRangeIter<'a>) -> Self { + let entry = iter.next(); + + Self { + item: Self::entry_to_item(entry), + iter, + } + } +} + +impl StorageIterator for MemTableIterator<'_> { + fn value(&self) -> &[u8] { + &self.item.1[..] + } + + fn key(&self) -> &[u8] { + &self.item.0[..] + } + + fn is_valid(&self) -> bool { + !self.item.0.is_empty() + } + + fn next(&mut self) -> Result<()> { + let entry = self.iter.next(); + self.item = Self::entry_to_item(entry); + Ok(()) + } +} + +#[cfg(test)] +#[path = "mem_table_test.rs"] +mod tests; diff --git a/mini-lsm/src/mem_table_test.rs b/mini-lsm/src/mem_table_test.rs new file mode 100644 index 0000000..4b1f7d0 --- /dev/null +++ b/mini-lsm/src/mem_table_test.rs @@ -0,0 +1,96 @@ +use super::MemTable; +use crate::iterators::impls::StorageIterator; +use crate::table::{SsTableBuilder, SsTableIterator}; + +#[test] +fn test_memtable_get() { + let memtable = MemTable::create(); + memtable.put(b"key1", b"value1").unwrap(); + memtable.put(b"key2", b"value2").unwrap(); + memtable.put(b"key3", b"value3").unwrap(); + assert_eq!(&memtable.get(b"key1").unwrap().unwrap()[..], b"value1"); + assert_eq!(&memtable.get(b"key2").unwrap().unwrap()[..], b"value2"); + assert_eq!(&memtable.get(b"key3").unwrap().unwrap()[..], b"value3"); +} + +#[test] +fn test_memtable_overwrite() { + let memtable = MemTable::create(); + memtable.put(b"key1", b"value1").unwrap(); + memtable.put(b"key2", b"value2").unwrap(); + memtable.put(b"key3", b"value3").unwrap(); + memtable.put(b"key1", b"value11").unwrap(); + memtable.put(b"key2", b"value22").unwrap(); + memtable.put(b"key3", b"value33").unwrap(); + assert_eq!(&memtable.get(b"key1").unwrap().unwrap()[..], b"value11"); + assert_eq!(&memtable.get(b"key2").unwrap().unwrap()[..], b"value22"); + assert_eq!(&memtable.get(b"key3").unwrap().unwrap()[..], b"value33"); +} + +#[test] +fn test_memtable_flush() { + let memtable = MemTable::create(); + memtable.put(b"key1", b"value1").unwrap(); + memtable.put(b"key2", b"value2").unwrap(); + memtable.put(b"key3", b"value3").unwrap(); + let mut builder = SsTableBuilder::new(128); + memtable.flush(&mut builder).unwrap(); + let sst = builder.build("").unwrap(); + let mut iter = SsTableIterator::create_and_seek_to_first(sst.into()).unwrap(); + assert_eq!(iter.key(), b"key1"); + assert_eq!(iter.value(), b"value1"); + iter.next().unwrap(); + assert_eq!(iter.key(), b"key2"); + assert_eq!(iter.value(), b"value2"); + iter.next().unwrap(); + assert_eq!(iter.key(), b"key3"); + assert_eq!(iter.value(), b"value3"); + iter.next().unwrap(); + assert!(!iter.is_valid()); +} + +#[test] +fn test_memtable_iter() { + use std::ops::Bound; + let memtable = MemTable::create(); + memtable.put(b"key1", b"value1").unwrap(); + memtable.put(b"key2", b"value2").unwrap(); + memtable.put(b"key3", b"value3").unwrap(); + + { + let mut iter = memtable.scan(Bound::Unbounded, Bound::Unbounded).unwrap(); + assert_eq!(iter.key(), b"key1"); + assert_eq!(iter.value(), b"value1"); + iter.next().unwrap(); + assert_eq!(iter.key(), b"key2"); + assert_eq!(iter.value(), b"value2"); + iter.next().unwrap(); + assert_eq!(iter.key(), b"key3"); + assert_eq!(iter.value(), b"value3"); + iter.next().unwrap(); + assert!(!iter.is_valid()); + } + + { + let mut iter = memtable + .scan(Bound::Included(b"key1"), Bound::Included(b"key2")) + .unwrap(); + assert_eq!(iter.key(), b"key1"); + assert_eq!(iter.value(), b"value1"); + iter.next().unwrap(); + assert_eq!(iter.key(), b"key2"); + assert_eq!(iter.value(), b"value2"); + iter.next().unwrap(); + assert!(!iter.is_valid()); + } + + { + let mut iter = memtable + .scan(Bound::Excluded(b"key1"), Bound::Excluded(b"key3")) + .unwrap(); + assert_eq!(iter.key(), b"key2"); + assert_eq!(iter.value(), b"value2"); + iter.next().unwrap(); + assert!(!iter.is_valid()); + } +} diff --git a/mini-lsm/src/storage.rs b/mini-lsm/src/storage.rs deleted file mode 100644 index 8a0cef5..0000000 --- a/mini-lsm/src/storage.rs +++ /dev/null @@ -1 +0,0 @@ -pub struct Storage {} diff --git a/mini-lsm/src/table.rs b/mini-lsm/src/table.rs index ebc4b4b..fab64a3 100644 --- a/mini-lsm/src/table.rs +++ b/mini-lsm/src/table.rs @@ -1,14 +1,15 @@ mod builder; mod iterator; -use std::{path::Path, sync::Arc}; +use std::path::Path; +use std::sync::Arc; +use anyhow::Result; pub use builder::SsTableBuilder; use bytes::{Buf, BufMut, Bytes}; pub use iterator::SsTableIterator; use crate::block::Block; -use anyhow::Result; #[derive(Clone, Debug, PartialEq, Eq)] pub struct BlockMeta { diff --git a/mini-lsm/src/table/builder.rs b/mini-lsm/src/table/builder.rs index c128ed1..9fb33e6 100644 --- a/mini-lsm/src/table/builder.rs +++ b/mini-lsm/src/table/builder.rs @@ -1,6 +1,7 @@ +use std::path::Path; + use anyhow::Result; use bytes::BufMut; -use std::path::Path; use super::{BlockMeta, FileObject, SsTable}; use crate::block::BlockBuilder; @@ -11,36 +12,29 @@ pub struct SsTableBuilder { first_key: Vec, data: Vec, pub(super) meta: Vec, - target_size: usize, block_size: usize, } impl SsTableBuilder { /// Create a builder based on target SST size and target block size. - pub fn new(target_size: usize, block_size: usize) -> Self { + pub fn new(block_size: usize) -> Self { Self { data: Vec::new(), meta: Vec::new(), first_key: Vec::new(), - target_size, block_size, builder: BlockBuilder::new(block_size), } } - /// Adds a key-value pair to SSTable, return false when SST full. - #[must_use] - pub fn add(&mut self, key: &[u8], value: &[u8]) -> bool { - if self.data.len() > self.target_size { - return false; - } - + /// Adds a key-value pair to SSTable + pub fn add(&mut self, key: &[u8], value: &[u8]) { if self.first_key.is_empty() { self.first_key = key.to_vec(); } if self.builder.add(key, value) { - return true; + return; } // create a new block builder and append block data self.finish_block(); @@ -48,8 +42,11 @@ impl SsTableBuilder { // add the key-value pair to the next block assert!(self.builder.add(key, value)); self.first_key = key.to_vec(); + } - true + /// Get the estimated size of the SSTable. + pub fn estimated_size(&self) -> usize { + self.data.len() } fn finish_block(&mut self) { @@ -62,7 +59,8 @@ impl SsTableBuilder { self.data.extend(encoded_block); } - /// Builds the SSTable and writes it to the given path. No need to actually write to disk until chapter 4 block cache. + /// Builds the SSTable and writes it to the given path. No need to actually write to disk until + /// chapter 4 block cache. pub fn build(mut self, path: impl AsRef) -> Result { self.finish_block(); let mut buf = self.data; diff --git a/mini-lsm/src/table/iterator.rs b/mini-lsm/src/table/iterator.rs index 4392335..a6bf1f4 100644 --- a/mini-lsm/src/table/iterator.rs +++ b/mini-lsm/src/table/iterator.rs @@ -1,8 +1,10 @@ -use anyhow::Result; use std::sync::Arc; +use anyhow::Result; + use super::SsTable; use crate::block::BlockIterator; +use crate::iterators::impls::StorageIterator; /// An iterator over the contents of an SSTable. pub struct SsTableIterator { @@ -68,25 +70,22 @@ impl SsTableIterator { self.blk_idx = blk_idx; Ok(()) } +} - /// Get the current key. - pub fn key(&self) -> &[u8] { - self.blk_iter.key() - } - - /// Get the current value. - pub fn value(&self) -> &[u8] { +impl StorageIterator for SsTableIterator { + fn value(&self) -> &[u8] { self.blk_iter.value() } - /// Check if the iterator is valid. - pub fn is_valid(&self) -> bool { + fn key(&self) -> &[u8] { + self.blk_iter.key() + } + + fn is_valid(&self) -> bool { self.blk_iter.is_valid() } - /// Move to the next key-value pair. - #[allow(clippy::should_implement_trait)] - pub fn next(&mut self) -> Result<()> { + fn next(&mut self) -> Result<()> { self.blk_iter.next(); if !self.blk_iter.is_valid() { self.blk_idx += 1; diff --git a/mini-lsm/src/table/tests.rs b/mini-lsm/src/table/tests.rs index 7c7dc67..177346c 100644 --- a/mini-lsm/src/table/tests.rs +++ b/mini-lsm/src/table/tests.rs @@ -2,41 +2,30 @@ use std::sync::Arc; use bytes::Bytes; +use super::*; +use crate::iterators::impls::StorageIterator; use crate::table::SsTableBuilder; -use super::{SsTable, SsTableIterator}; - #[test] fn test_sst_build_single_key() { - let mut builder = SsTableBuilder::new(16, 16); - assert!(builder.add(b"233", b"233333")); + let mut builder = SsTableBuilder::new(16); + builder.add(b"233", b"233333"); builder.build("").unwrap(); } #[test] fn test_sst_build_two_blocks() { - let mut builder = SsTableBuilder::new(1024, 16); - assert!(builder.add(b"11", b"11")); - assert!(builder.add(b"22", b"22")); - assert!(builder.add(b"33", b"11")); - assert!(builder.add(b"44", b"22")); - assert!(builder.add(b"55", b"11")); - assert!(builder.add(b"66", b"22")); + let mut builder = SsTableBuilder::new(16); + builder.add(b"11", b"11"); + builder.add(b"22", b"22"); + builder.add(b"33", b"11"); + builder.add(b"44", b"22"); + builder.add(b"55", b"11"); + builder.add(b"66", b"22"); assert!(builder.meta.len() >= 2); builder.build("").unwrap(); } -#[test] -fn test_sst_build_full() { - let mut builder = SsTableBuilder::new(32, 16); - assert!(builder.add(b"11", b"11")); - assert!(builder.add(b"22", b"22")); - assert!(builder.add(b"33", b"11")); - assert!(builder.add(b"44", b"22")); - assert!(!builder.add(b"55", b"11")); - builder.build("").unwrap(); -} - fn key_of(idx: usize) -> Vec { format!("key_{:03}", idx * 5).into_bytes() } @@ -50,11 +39,11 @@ fn num_of_keys() -> usize { } fn generate_sst() -> SsTable { - let mut builder = SsTableBuilder::new(65536, 128); + let mut builder = SsTableBuilder::new(128); for idx in 0..num_of_keys() { let key = key_of(idx); let value = value_of(idx); - assert!(builder.add(&key[..], &value[..])); + builder.add(&key[..], &value[..]); } builder.build("").unwrap() }