diff --git a/Cargo.lock b/Cargo.lock index cdc0729..c67d121 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -420,6 +420,21 @@ dependencies = [ "tempfile", ] +[[package]] +name = "mini-lsm-week-1" +version = "0.1.0" +dependencies = [ + "anyhow", + "arc-swap", + "bytes", + "crossbeam-epoch", + "crossbeam-skiplist", + "moka", + "ouroboros", + "parking_lot", + "tempfile", +] + [[package]] name = "mini-lsm-xtask" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index b0faf3e..0adfaf5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "mini-lsm", + "mini-lsm-week-1", "xtask", "mini-lsm-starter", ] diff --git a/mini-lsm-week-1/Cargo.toml b/mini-lsm-week-1/Cargo.toml new file mode 100644 index 0000000..1d8c7c8 --- /dev/null +++ b/mini-lsm-week-1/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "mini-lsm-week-1" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +description = "A tutorial for building an LSM tree storage engine in a week." + +[dependencies] +anyhow = "1" +arc-swap = "1" +bytes = "1" +crossbeam-epoch = "0.9" +crossbeam-skiplist = "0.1" +parking_lot = "0.12" +ouroboros = "0.15" +moka = "0.9" + +[dev-dependencies] +tempfile = "3" diff --git a/mini-lsm-week-1/README.md b/mini-lsm-week-1/README.md new file mode 100644 index 0000000..1325529 --- /dev/null +++ b/mini-lsm-week-1/README.md @@ -0,0 +1 @@ +# mini-lsm week-1 solution \ No newline at end of file diff --git a/mini-lsm-week-1/src/block.rs b/mini-lsm-week-1/src/block.rs new file mode 100644 index 0000000..65935ed --- /dev/null +++ b/mini-lsm-week-1/src/block.rs @@ -0,0 +1,46 @@ +mod builder; +mod iterator; + +pub use builder::BlockBuilder; +use bytes::{Buf, BufMut, Bytes}; +pub use iterator::BlockIterator; + +pub const SIZEOF_U16: usize = std::mem::size_of::(); + +/// A block is the smallest unit of read and caching in LSM tree. 
It is a collection of sorted +/// key-value pairs. +pub struct Block { + data: Vec, + offsets: Vec, +} + +impl Block { + pub fn encode(&self) -> Bytes { + let mut buf = self.data.clone(); + let offsets_len = self.offsets.len(); + for offset in &self.offsets { + buf.put_u16(*offset); + } + // Adds number of elements at the end of the block + buf.put_u16(offsets_len as u16); + buf.into() + } + + pub fn decode(data: &[u8]) -> Self { + // get number of elements in the block + let entry_offsets_len = (&data[data.len() - SIZEOF_U16..]).get_u16() as usize; + let data_end = data.len() - SIZEOF_U16 - entry_offsets_len * SIZEOF_U16; + let offsets_raw = &data[data_end..data.len() - SIZEOF_U16]; + // get offset array + let offsets = offsets_raw + .chunks(SIZEOF_U16) + .map(|mut x| x.get_u16()) + .collect(); + // retrieve data + let data = data[0..data_end].to_vec(); + Self { data, offsets } + } +} + +#[cfg(test)] +mod tests; diff --git a/mini-lsm-week-1/src/block/builder.rs b/mini-lsm-week-1/src/block/builder.rs new file mode 100644 index 0000000..1e73188 --- /dev/null +++ b/mini-lsm-week-1/src/block/builder.rs @@ -0,0 +1,67 @@ +use bytes::BufMut; + +use super::{Block, SIZEOF_U16}; + +/// Builds a block. +pub struct BlockBuilder { + /// Offsets of each key-value entries. + offsets: Vec, + /// All serialized key-value pairs in the block. + data: Vec, + /// The expected block size. + block_size: usize, +} + +impl BlockBuilder { + /// Creates a new block builder. + pub fn new(block_size: usize) -> Self { + Self { + offsets: Vec::new(), + data: Vec::new(), + block_size, + } + } + + fn estimated_size(&self) -> usize { + SIZEOF_U16 /* number of key-value pairs in the block */ + self.offsets.len() * SIZEOF_U16 /* offsets */ + self.data.len() + /* key-value pairs */ + } + + /// Adds a key-value pair to the block. Returns false when the block is full. 
+ #[must_use] + pub fn add(&mut self, key: &[u8], value: &[u8]) -> bool { + assert!(!key.is_empty(), "key must not be empty"); + if self.estimated_size() + key.len() + value.len() + SIZEOF_U16 * 3 /* key_len, value_len and offset */ > self.block_size + && !self.is_empty() + { + return false; + } + // Add the offset of the data into the offset array. + self.offsets.push(self.data.len() as u16); + // Encode key length. + self.data.put_u16(key.len() as u16); + // Encode key content. + self.data.put(key); + // Encode value length. + self.data.put_u16(value.len() as u16); + // Encode value content. + self.data.put(value); + true + } + + /// Check if there are no key-value pairs in the block. + pub fn is_empty(&self) -> bool { + self.offsets.is_empty() + } + + /// Finalize the block. + pub fn build(self) -> Block { + if self.is_empty() { + panic!("block should not be empty"); + } + Block { + data: self.data, + offsets: self.offsets, + } + } +} diff --git a/mini-lsm-week-1/src/block/iterator.rs b/mini-lsm-week-1/src/block/iterator.rs new file mode 100644 index 0000000..79d1393 --- /dev/null +++ b/mini-lsm-week-1/src/block/iterator.rs @@ -0,0 +1,117 @@ +use std::sync::Arc; + +use bytes::Buf; + +use super::Block; + +/// Iterates on a block. +pub struct BlockIterator { + /// reference to the block + block: Arc, + /// the current key at the iterator position + key: Vec, + /// the current value at the iterator position + value: Vec, + /// the current index at the iterator position + idx: usize, +} + +impl BlockIterator { + fn new(block: Arc) -> Self { + Self { + block, + key: Vec::new(), + value: Vec::new(), + idx: 0, + } + } + + /// Creates a block iterator and seek to the first entry. + pub fn create_and_seek_to_first(block: Arc) -> Self { + let mut iter = Self::new(block); + iter.seek_to_first(); + iter + } + + /// Creates a block iterator and seek to the first key that >= `key`. 
+ pub fn create_and_seek_to_key(block: Arc, key: &[u8]) -> Self { + let mut iter = Self::new(block); + iter.seek_to_key(key); + iter + } + + /// Returns the key of the current entry. + pub fn key(&self) -> &[u8] { + debug_assert!(!self.key.is_empty(), "invalid iterator"); + &self.key + } + + /// Returns the value of the current entry. + pub fn value(&self) -> &[u8] { + debug_assert!(!self.key.is_empty(), "invalid iterator"); + &self.value + } + + /// Returns true if the iterator is valid. + pub fn is_valid(&self) -> bool { + !self.key.is_empty() + } + + /// Seeks to the first key in the block. + pub fn seek_to_first(&mut self) { + self.seek_to(0); + } + + /// Seeks to the idx-th key in the block. + fn seek_to(&mut self, idx: usize) { + if idx >= self.block.offsets.len() { + self.key.clear(); + self.value.clear(); + return; + } + let offset = self.block.offsets[idx] as usize; + self.seek_to_offset(offset); + self.idx = idx; + } + + /// Move to the next key in the block. + pub fn next(&mut self) { + self.idx += 1; + self.seek_to(self.idx); + } + + /// Seek to the specified position and update the current `key` and `value` + /// Index update will be handled by caller + fn seek_to_offset(&mut self, offset: usize) { + let mut entry = &self.block.data[offset..]; + // Since `get_u16()` will automatically move the ptr 2 bytes ahead here, + // we don't need to manually advance it + let key_len = entry.get_u16() as usize; + let key = entry[..key_len].to_vec(); + entry.advance(key_len); + self.key.clear(); + self.key.extend(key); + let value_len = entry.get_u16() as usize; + let value = entry[..value_len].to_vec(); + entry.advance(value_len); + self.value.clear(); + self.value.extend(value); + } + + /// Seek to the first key that is >= `key`. 
+ pub fn seek_to_key(&mut self, key: &[u8]) { + let mut low = 0; + let mut high = self.block.offsets.len(); + while low < high { + let mid = low + (high - low) / 2; + self.seek_to(mid); + assert!(self.is_valid()); + match self.key().cmp(key) { + std::cmp::Ordering::Less => low = mid + 1, + std::cmp::Ordering::Greater => high = mid, + std::cmp::Ordering::Equal => return, + } + } + self.seek_to(low); + } +} diff --git a/mini-lsm-week-1/src/block/tests.rs b/mini-lsm-week-1/src/block/tests.rs new file mode 100644 index 0000000..52b2b12 --- /dev/null +++ b/mini-lsm-week-1/src/block/tests.rs @@ -0,0 +1,122 @@ +use std::sync::Arc; + +use super::builder::BlockBuilder; +use super::iterator::BlockIterator; +use super::*; + +#[test] +fn test_block_build_single_key() { + let mut builder = BlockBuilder::new(16); + assert!(builder.add(b"233", b"233333")); + builder.build(); +} + +#[test] +fn test_block_build_full() { + let mut builder = BlockBuilder::new(16); + assert!(builder.add(b"11", b"11")); + assert!(!builder.add(b"22", b"22")); + builder.build(); +} + +fn key_of(idx: usize) -> Vec { + format!("key_{:03}", idx * 5).into_bytes() +} + +fn value_of(idx: usize) -> Vec { + format!("value_{:010}", idx).into_bytes() +} + +fn num_of_keys() -> usize { + 100 +} + +fn generate_block() -> Block { + let mut builder = BlockBuilder::new(10000); + for idx in 0..num_of_keys() { + let key = key_of(idx); + let value = value_of(idx); + assert!(builder.add(&key[..], &value[..])); + } + builder.build() +} + +#[test] +fn test_block_build_all() { + generate_block(); +} + +#[test] +fn test_block_encode() { + let block = generate_block(); + block.encode(); +} + +#[test] +fn test_block_decode() { + let block = generate_block(); + let encoded = block.encode(); + let decoded_block = Block::decode(&encoded); + assert_eq!(block.offsets, decoded_block.offsets); + assert_eq!(block.data, decoded_block.data); +} + +fn as_bytes(x: &[u8]) -> Bytes { + Bytes::copy_from_slice(x) +} + +#[test] +fn 
test_block_iterator() { + let block = Arc::new(generate_block()); + let mut iter = BlockIterator::create_and_seek_to_first(block); + for _ in 0..5 { + for i in 0..num_of_keys() { + let key = iter.key(); + let value = iter.value(); + assert_eq!( + key, + key_of(i), + "expected key: {:?}, actual key: {:?}", + as_bytes(&key_of(i)), + as_bytes(key) + ); + assert_eq!( + value, + value_of(i), + "expected value: {:?}, actual value: {:?}", + as_bytes(&value_of(i)), + as_bytes(value) + ); + iter.next(); + } + iter.seek_to_first(); + } +} + +#[test] +fn test_block_seek_key() { + let block = Arc::new(generate_block()); + let mut iter = BlockIterator::create_and_seek_to_key(block, &key_of(0)); + for offset in 1..=5 { + for i in 0..num_of_keys() { + let key = iter.key(); + let value = iter.value(); + assert_eq!( + key, + key_of(i), + "expected key: {:?}, actual key: {:?}", + as_bytes(&key_of(i)), + as_bytes(key) + ); + assert_eq!( + value, + value_of(i), + "expected value: {:?}, actual value: {:?}", + as_bytes(&value_of(i)), + as_bytes(value) + ); + iter.seek_to_key(&format!("key_{:03}", i * 5 + offset).into_bytes()); + } + iter.seek_to_key(b"k"); + } +} diff --git a/mini-lsm-week-1/src/iterators.rs b/mini-lsm-week-1/src/iterators.rs new file mode 100644 index 0000000..bffb820 --- /dev/null +++ b/mini-lsm-week-1/src/iterators.rs @@ -0,0 +1,19 @@ +pub mod merge_iterator; +pub mod two_merge_iterator; + +pub trait StorageIterator { + /// Get the current value. + fn value(&self) -> &[u8]; + + /// Get the current key. + fn key(&self) -> &[u8]; + + /// Check if the current iterator is valid. + fn is_valid(&self) -> bool; + + /// Move to the next position. 
+ fn next(&mut self) -> anyhow::Result<()>; +} + +#[cfg(test)] +mod tests; diff --git a/mini-lsm-week-1/src/iterators/merge_iterator.rs b/mini-lsm-week-1/src/iterators/merge_iterator.rs new file mode 100644 index 0000000..e7ee35b --- /dev/null +++ b/mini-lsm-week-1/src/iterators/merge_iterator.rs @@ -0,0 +1,139 @@ +use std::cmp::{self}; +use std::collections::binary_heap::PeekMut; +use std::collections::BinaryHeap; + +use anyhow::Result; + +use super::StorageIterator; + +struct HeapWrapper(pub usize, pub Box); + +impl PartialEq for HeapWrapper { + fn eq(&self, other: &Self) -> bool { + self.partial_cmp(other).unwrap() == cmp::Ordering::Equal + } +} + +impl Eq for HeapWrapper {} + +impl PartialOrd for HeapWrapper { + #[allow(clippy::non_canonical_partial_ord_impl)] + fn partial_cmp(&self, other: &Self) -> Option { + match self.1.key().cmp(other.1.key()) { + cmp::Ordering::Greater => Some(cmp::Ordering::Greater), + cmp::Ordering::Less => Some(cmp::Ordering::Less), + cmp::Ordering::Equal => self.0.partial_cmp(&other.0), + } + .map(|x| x.reverse()) + } +} + +impl Ord for HeapWrapper { + fn cmp(&self, other: &Self) -> cmp::Ordering { + self.partial_cmp(other).unwrap() + } +} + +/// Merge multiple iterators of the same type. If the same key occurs multiple times in some +/// iterators, perfer the one with smaller index. +pub struct MergeIterator { + iters: BinaryHeap>, + current: Option>, +} + +impl MergeIterator { + pub fn create(iters: Vec>) -> Self { + if iters.is_empty() { + return Self { + iters: BinaryHeap::new(), + current: None, + }; + } + + let mut heap = BinaryHeap::new(); + + if iters.iter().all(|x| !x.is_valid()) { + // All invalid, select the last one as the current. 
+ let mut iters = iters; + return Self { + iters: heap, + current: Some(HeapWrapper(0, iters.pop().unwrap())), + }; + } + + for (idx, iter) in iters.into_iter().enumerate() { + if iter.is_valid() { + heap.push(HeapWrapper(idx, iter)); + } + } + + let current = heap.pop().unwrap(); + Self { + iters: heap, + current: Some(current), + } + } +} + +impl StorageIterator for MergeIterator { + fn key(&self) -> &[u8] { + unsafe { self.current.as_ref().unwrap_unchecked() }.1.key() + } + + fn value(&self) -> &[u8] { + unsafe { self.current.as_ref().unwrap_unchecked() } + .1 + .value() + } + + fn is_valid(&self) -> bool { + self.current + .as_ref() + .map(|x| x.1.is_valid()) + .unwrap_or(false) + } + + fn next(&mut self) -> Result<()> { + let current = unsafe { self.current.as_mut().unwrap_unchecked() }; + // Pop the item out of the heap if they have the same value. + while let Some(mut inner_iter) = self.iters.peek_mut() { + debug_assert!( + inner_iter.1.key() >= current.1.key(), + "heap invariant violated" + ); + if inner_iter.1.key() == current.1.key() { + // Case 1: an error occurred when calling `next`. + if let e @ Err(_) = inner_iter.1.next() { + PeekMut::pop(inner_iter); + return e; + } + + // Case 2: iter is no longer valid. + if !inner_iter.1.is_valid() { + PeekMut::pop(inner_iter); + } + } else { + break; + } + } + + current.1.next()?; + + // If the current iterator is invalid, pop it out of the heap and select the next one. + if !current.1.is_valid() { + if let Some(iter) = self.iters.pop() { + *current = iter; + } + return Ok(()); + } + + // Otherwise, compare with heap top and swap if necessary. 
+ if let Some(mut inner_iter) = self.iters.peek_mut() { + if *current < *inner_iter { + std::mem::swap(&mut *inner_iter, current); + } + } + + Ok(()) + } +} diff --git a/mini-lsm-week-1/src/iterators/tests.rs b/mini-lsm-week-1/src/iterators/tests.rs new file mode 100644 index 0000000..2c2963c --- /dev/null +++ b/mini-lsm-week-1/src/iterators/tests.rs @@ -0,0 +1,40 @@ +use anyhow::Result; +use bytes::Bytes; + +use super::StorageIterator; + +pub mod merge_iterator_test; +pub mod two_merge_iterator_test; + +#[derive(Clone)] +pub struct MockIterator { + pub data: Vec<(Bytes, Bytes)>, + pub index: usize, +} + +impl MockIterator { + pub fn new(data: Vec<(Bytes, Bytes)>) -> Self { + Self { data, index: 0 } + } +} + +impl StorageIterator for MockIterator { + fn next(&mut self) -> Result<()> { + if self.index < self.data.len() { + self.index += 1; + } + Ok(()) + } + + fn key(&self) -> &[u8] { + self.data[self.index].0.as_ref() + } + + fn value(&self) -> &[u8] { + self.data[self.index].1.as_ref() + } + + fn is_valid(&self) -> bool { + self.index < self.data.len() + } +} diff --git a/mini-lsm-week-1/src/iterators/tests/merge_iterator_test.rs b/mini-lsm-week-1/src/iterators/tests/merge_iterator_test.rs new file mode 100644 index 0000000..e063911 --- /dev/null +++ b/mini-lsm-week-1/src/iterators/tests/merge_iterator_test.rs @@ -0,0 +1,137 @@ +use super::*; +use crate::iterators::merge_iterator::MergeIterator; + +fn as_bytes(x: &[u8]) -> Bytes { + Bytes::copy_from_slice(x) +} + +fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>) { + let mut iter = iter; + for (k, v) in expected { + assert!(iter.is_valid()); + assert_eq!( + k, + iter.key(), + "expected key: {:?}, actual key: {:?}", + k, + as_bytes(iter.key()), + ); + assert_eq!( + v, + iter.value(), + "expected value: {:?}, actual value: {:?}", + v, + as_bytes(iter.value()), + ); + iter.next().unwrap(); + } + assert!(!iter.is_valid()); +} + +#[test] +fn test_merge_1() { + let i1 = 
MockIterator::new(vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.1")), + (Bytes::from("c"), Bytes::from("3.1")), + ]); + let i2 = MockIterator::new(vec![ + (Bytes::from("a"), Bytes::from("1.2")), + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ]); + let i3 = MockIterator::new(vec![ + (Bytes::from("b"), Bytes::from("2.3")), + (Bytes::from("c"), Bytes::from("3.3")), + (Bytes::from("d"), Bytes::from("4.3")), + ]); + + let iter = MergeIterator::create(vec![ + Box::new(i1.clone()), + Box::new(i2.clone()), + Box::new(i3.clone()), + ]); + + check_iter_result( + iter, + vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.1")), + (Bytes::from("c"), Bytes::from("3.1")), + (Bytes::from("d"), Bytes::from("4.2")), + ], + ); + + let iter = MergeIterator::create(vec![Box::new(i3), Box::new(i1), Box::new(i2)]); + + check_iter_result( + iter, + vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.3")), + (Bytes::from("c"), Bytes::from("3.3")), + (Bytes::from("d"), Bytes::from("4.3")), + ], + ); +} + +#[test] +fn test_merge_2() { + let i1 = MockIterator::new(vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.1")), + (Bytes::from("c"), Bytes::from("3.1")), + ]); + let i2 = MockIterator::new(vec![ + (Bytes::from("d"), Bytes::from("1.2")), + (Bytes::from("e"), Bytes::from("2.2")), + (Bytes::from("f"), Bytes::from("3.2")), + (Bytes::from("g"), Bytes::from("4.2")), + ]); + let i3 = MockIterator::new(vec![ + (Bytes::from("h"), Bytes::from("1.3")), + (Bytes::from("i"), Bytes::from("2.3")), + (Bytes::from("j"), Bytes::from("3.3")), + (Bytes::from("k"), Bytes::from("4.3")), + ]); + let i4 = MockIterator::new(vec![]); + let result = vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.1")), + (Bytes::from("c"), Bytes::from("3.1")), + (Bytes::from("d"), 
Bytes::from("1.2")), + (Bytes::from("e"), Bytes::from("2.2")), + (Bytes::from("f"), Bytes::from("3.2")), + (Bytes::from("g"), Bytes::from("4.2")), + (Bytes::from("h"), Bytes::from("1.3")), + (Bytes::from("i"), Bytes::from("2.3")), + (Bytes::from("j"), Bytes::from("3.3")), + (Bytes::from("k"), Bytes::from("4.3")), + ]; + + let iter = MergeIterator::create(vec![ + Box::new(i1.clone()), + Box::new(i2.clone()), + Box::new(i3.clone()), + Box::new(i4.clone()), + ]); + check_iter_result(iter, result.clone()); + + let iter = MergeIterator::create(vec![ + Box::new(i2.clone()), + Box::new(i4.clone()), + Box::new(i3.clone()), + Box::new(i1.clone()), + ]); + check_iter_result(iter, result.clone()); + + let iter = MergeIterator::create(vec![Box::new(i4), Box::new(i3), Box::new(i2), Box::new(i1)]); + check_iter_result(iter, result); +} + +#[test] +fn test_merge_empty() { + let iter = MergeIterator::::create(vec![]); + check_iter_result(iter, vec![]); +} diff --git a/mini-lsm-week-1/src/iterators/tests/two_merge_iterator_test.rs b/mini-lsm-week-1/src/iterators/tests/two_merge_iterator_test.rs new file mode 100644 index 0000000..1719bf3 --- /dev/null +++ b/mini-lsm-week-1/src/iterators/tests/two_merge_iterator_test.rs @@ -0,0 +1,129 @@ +use super::*; +use crate::iterators::two_merge_iterator::TwoMergeIterator; + +fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>) { + let mut iter = iter; + for (k, v) in expected { + assert!(iter.is_valid()); + assert_eq!(iter.key(), k.as_ref()); + assert_eq!(iter.value(), v.as_ref()); + iter.next().unwrap(); + } + assert!(!iter.is_valid()); +} + +#[test] +fn test_merge_1() { + let i1 = MockIterator::new(vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.1")), + (Bytes::from("c"), Bytes::from("3.1")), + ]); + let i2 = MockIterator::new(vec![ + (Bytes::from("a"), Bytes::from("1.2")), + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), 
Bytes::from("4.2")), + ]); + let iter = TwoMergeIterator::create(i1, i2).unwrap(); + check_iter_result( + iter, + vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.1")), + (Bytes::from("c"), Bytes::from("3.1")), + (Bytes::from("d"), Bytes::from("4.2")), + ], + ) +} + +#[test] +fn test_merge_2() { + let i2 = MockIterator::new(vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.1")), + (Bytes::from("c"), Bytes::from("3.1")), + ]); + let i1 = MockIterator::new(vec![ + (Bytes::from("a"), Bytes::from("1.2")), + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ]); + let iter = TwoMergeIterator::create(i1, i2).unwrap(); + check_iter_result( + iter, + vec![ + (Bytes::from("a"), Bytes::from("1.2")), + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ], + ) +} + +#[test] +fn test_merge_3() { + let i2 = MockIterator::new(vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.1")), + (Bytes::from("c"), Bytes::from("3.1")), + ]); + let i1 = MockIterator::new(vec![ + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ]); + let iter = TwoMergeIterator::create(i1, i2).unwrap(); + check_iter_result( + iter, + vec![ + (Bytes::from("a"), Bytes::from("1.1")), + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ], + ) +} + +#[test] +fn test_merge_4() { + let i2 = MockIterator::new(vec![]); + let i1 = MockIterator::new(vec![ + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ]); + let iter = TwoMergeIterator::create(i1, i2).unwrap(); + check_iter_result( + iter, + vec![ + (Bytes::from("b"), Bytes::from("2.2")), + 
(Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ], + ); + let i1 = MockIterator::new(vec![]); + let i2 = MockIterator::new(vec![ + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ]); + let iter = TwoMergeIterator::create(i1, i2).unwrap(); + check_iter_result( + iter, + vec![ + (Bytes::from("b"), Bytes::from("2.2")), + (Bytes::from("c"), Bytes::from("3.2")), + (Bytes::from("d"), Bytes::from("4.2")), + ], + ); +} + +#[test] +fn test_merge_5() { + let i2 = MockIterator::new(vec![]); + let i1 = MockIterator::new(vec![]); + let iter = TwoMergeIterator::create(i1, i2).unwrap(); + check_iter_result(iter, vec![]) +} diff --git a/mini-lsm-week-1/src/iterators/two_merge_iterator.rs b/mini-lsm-week-1/src/iterators/two_merge_iterator.rs new file mode 100644 index 0000000..89d7c43 --- /dev/null +++ b/mini-lsm-week-1/src/iterators/two_merge_iterator.rs @@ -0,0 +1,80 @@ +use anyhow::Result; + +use super::StorageIterator; + +/// Merges two iterators of different types into one. If the two iterators have the same key, only +/// produce the key once and prefer the entry from A. 
+pub struct TwoMergeIterator { + a: A, + b: B, + choose_a: bool, +} + +impl TwoMergeIterator { + fn choose_a(a: &A, b: &B) -> bool { + if !a.is_valid() { + return false; + } + if !b.is_valid() { + return true; + } + a.key() < b.key() + } + + fn skip_b(&mut self) -> Result<()> { + if self.a.is_valid() { + while self.b.is_valid() && self.b.key() == self.a.key() { + self.b.next()?; + } + } + Ok(()) + } + + pub fn create(a: A, b: B) -> Result { + let mut iter = Self { + choose_a: false, + a, + b, + }; + iter.skip_b()?; + iter.choose_a = Self::choose_a(&iter.a, &iter.b); + Ok(iter) + } +} + +impl StorageIterator for TwoMergeIterator { + fn key(&self) -> &[u8] { + if self.choose_a { + self.a.key() + } else { + self.b.key() + } + } + + fn value(&self) -> &[u8] { + if self.choose_a { + self.a.value() + } else { + self.b.value() + } + } + + fn is_valid(&self) -> bool { + if self.choose_a { + self.a.is_valid() + } else { + self.b.is_valid() + } + } + + fn next(&mut self) -> Result<()> { + if self.choose_a { + self.a.next()?; + } else { + self.b.next()?; + } + self.skip_b()?; + self.choose_a = Self::choose_a(&self.a, &self.b); + Ok(()) + } +} diff --git a/mini-lsm-week-1/src/lib.rs b/mini-lsm-week-1/src/lib.rs new file mode 100644 index 0000000..40ff84a --- /dev/null +++ b/mini-lsm-week-1/src/lib.rs @@ -0,0 +1,9 @@ +pub mod block; +pub mod iterators; +pub mod lsm_iterator; +pub mod lsm_storage; +pub mod mem_table; +pub mod table; + +#[cfg(test)] +mod tests; diff --git a/mini-lsm-week-1/src/lsm_iterator.rs b/mini-lsm-week-1/src/lsm_iterator.rs new file mode 100644 index 0000000..df49609 --- /dev/null +++ b/mini-lsm-week-1/src/lsm_iterator.rs @@ -0,0 +1,106 @@ +use std::ops::Bound; + +use anyhow::Result; +use bytes::Bytes; + +use crate::iterators::merge_iterator::MergeIterator; +use crate::iterators::two_merge_iterator::TwoMergeIterator; +use crate::iterators::StorageIterator; +use crate::mem_table::MemTableIterator; +use crate::table::SsTableIterator; + +type LsmIteratorInner 
= + TwoMergeIterator, MergeIterator>; + +pub struct LsmIterator { + iter: LsmIteratorInner, + end_bound: Bound, + is_valid: bool, +} + +impl LsmIterator { + pub(crate) fn new(iter: LsmIteratorInner, end_bound: Bound) -> Result { + let mut iter = Self { + is_valid: iter.is_valid(), + iter, + end_bound, + }; + iter.move_to_non_delete()?; + Ok(iter) + } + + fn next_inner(&mut self) -> Result<()> { + self.iter.next()?; + if !self.iter.is_valid() { + self.is_valid = false; + return Ok(()); + } + match self.end_bound.as_ref() { + Bound::Unbounded => {} + Bound::Included(key) => self.is_valid = self.iter.key() <= key.as_ref(), + Bound::Excluded(key) => self.is_valid = self.iter.key() < key.as_ref(), + } + Ok(()) + } + + fn move_to_non_delete(&mut self) -> Result<()> { + while self.is_valid() && self.iter.value().is_empty() { + self.next_inner()?; + } + Ok(()) + } +} + +impl StorageIterator for LsmIterator { + fn is_valid(&self) -> bool { + self.is_valid + } + + fn key(&self) -> &[u8] { + self.iter.key() + } + + fn value(&self) -> &[u8] { + self.iter.value() + } + + fn next(&mut self) -> Result<()> { + self.next_inner()?; + self.move_to_non_delete()?; + Ok(()) + } +} + +/// A wrapper around existing iterator, will prevent users from calling `next` when the iterator is +/// invalid. 
+pub struct FusedIterator { + iter: I, +} + +impl FusedIterator { + pub fn new(iter: I) -> Self { + Self { iter } + } +} + +impl StorageIterator for FusedIterator { + fn is_valid(&self) -> bool { + self.iter.is_valid() + } + + fn key(&self) -> &[u8] { + self.iter.key() + } + + fn value(&self) -> &[u8] { + self.iter.value() + } + + fn next(&mut self) -> Result<()> { + // only move when the iterator is valid + if self.iter.is_valid() { + self.iter.next()?; + } + Ok(()) + } +} diff --git a/mini-lsm-week-1/src/lsm_storage.rs b/mini-lsm-week-1/src/lsm_storage.rs new file mode 100644 index 0000000..538234d --- /dev/null +++ b/mini-lsm-week-1/src/lsm_storage.rs @@ -0,0 +1,229 @@ +use std::ops::Bound; +use std::path::{Path, PathBuf}; +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; + +use anyhow::Result; +use bytes::Bytes; +use parking_lot::{Mutex, RwLock}; + +use crate::block::Block; +use crate::iterators::merge_iterator::MergeIterator; +use crate::iterators::two_merge_iterator::TwoMergeIterator; +use crate::iterators::StorageIterator; +use crate::lsm_iterator::{FusedIterator, LsmIterator}; +use crate::mem_table::{map_bound, MemTable}; +use crate::table::{SsTable, SsTableBuilder, SsTableIterator}; + +pub type BlockCache = moka::sync::Cache<(usize, usize), Arc>; + +#[derive(Clone)] +pub struct LsmStorageInner { + /// The current memtable. + memtable: Arc, + /// Immutable memTables, from earliest to latest. + imm_memtables: Vec>, + /// L0 SsTables, from earliest to latest. + l0_sstables: Vec>, + /// L1 - L6 SsTables, sorted by key range. + #[allow(dead_code)] + levels: Vec>>, +} + +impl LsmStorageInner { + fn create() -> Self { + Self { + memtable: Arc::new(MemTable::create()), + imm_memtables: vec![], + l0_sstables: vec![], + levels: vec![], + } + } +} + +/// The storage interface of the LSM tree. 
+pub struct LsmStorage { + pub(crate) inner: Arc>>, + flush_lock: Mutex<()>, + path: PathBuf, + pub(crate) block_cache: Arc, + next_sst_id: AtomicUsize, +} + +impl LsmStorage { + pub(crate) fn next_sst_id(&self) -> usize { + self.next_sst_id + .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + } + + pub fn open(path: impl AsRef) -> Result { + Ok(Self { + inner: Arc::new(RwLock::new(Arc::new(LsmStorageInner::create()))), + flush_lock: Mutex::new(()), + path: path.as_ref().to_path_buf(), + block_cache: Arc::new(BlockCache::new(1 << 20)), // 4GB block cache, + next_sst_id: AtomicUsize::new(1), + }) + } + + /// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter. + pub fn get(&self, key: &[u8]) -> Result> { + let snapshot = { + let guard = self.inner.read(); + Arc::clone(&guard) + }; // drop global lock here + + // Search on the current memtable. + if let Some(value) = snapshot.memtable.get(key) { + if value.is_empty() { + // found tomestone, return key not exists + return Ok(None); + } + return Ok(Some(value)); + } + // Search on immutable memtables. + for memtable in snapshot.imm_memtables.iter().rev() { + if let Some(value) = memtable.get(key) { + if value.is_empty() { + // found tomestone, return key not exists + return Ok(None); + } + return Ok(Some(value)); + } + } + let mut iters = Vec::with_capacity(snapshot.l0_sstables.len()); + for table in snapshot.l0_sstables.iter().rev() { + iters.push(Box::new(SsTableIterator::create_and_seek_to_key( + table.clone(), + key, + )?)); + } + let iter = MergeIterator::create(iters); + if iter.is_valid() { + return Ok(Some(Bytes::copy_from_slice(iter.value()))); + } + Ok(None) + } + + /// Put a key-value pair into the storage by writing into the current memtable. 
+ pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { + assert!(!value.is_empty(), "value cannot be empty"); + assert!(!key.is_empty(), "key cannot be empty"); + + let guard = self.inner.read(); + guard.memtable.put(key, value); + + Ok(()) + } + + /// Remove a key from the storage by writing an empty value. + pub fn delete(&self, key: &[u8]) -> Result<()> { + assert!(!key.is_empty(), "key cannot be empty"); + + let guard = self.inner.read(); + guard.memtable.put(key, b""); + + Ok(()) + } + + pub(crate) fn path_of_sst(&self, id: usize) -> PathBuf { + self.path.join(format!("{:05}.sst", id)) + } + + /// Persist data to disk. + /// + /// In day 3: flush the current memtable to disk as L0 SST. + /// In day 6: call `fsync` on WAL. + pub fn sync(&self) -> Result<()> { + let _flush_lock = self.flush_lock.lock(); + + let flush_memtable; + let sst_id; + + // Move mutable memtable to immutable memtables. + { + let mut guard = self.inner.write(); + // Swap the current memtable with a new one. + let mut snapshot = guard.as_ref().clone(); + let memtable = std::mem::replace(&mut snapshot.memtable, Arc::new(MemTable::create())); + flush_memtable = memtable.clone(); + sst_id = self.next_sst_id(); + // Add the memtable to the immutable memtables. + snapshot.imm_memtables.push(memtable); + // Update the snapshot. + *guard = Arc::new(snapshot); + } + + // At this point, the old memtable should be disabled for write, and all write threads + // should be operating on the new memtable. We can safely flush the old memtable to + // disk. + + let mut builder = SsTableBuilder::new(4096); + flush_memtable.flush(&mut builder)?; + let sst = Arc::new(builder.build( + sst_id, + Some(self.block_cache.clone()), + self.path_of_sst(sst_id), + )?); + + // Add the flushed L0 table to the list. + { + let mut guard = self.inner.write(); + let mut snapshot = guard.as_ref().clone(); + // Remove the memtable from the immutable memtables. 
+ snapshot.imm_memtables.pop(); + // Add L0 table + snapshot.l0_sstables.push(sst); + // Update the snapshot. + *guard = Arc::new(snapshot); + } + + Ok(()) + } + + /// Create an iterator over a range of keys. + pub fn scan( + &self, + lower: Bound<&[u8]>, + upper: Bound<&[u8]>, + ) -> Result> { + let snapshot = { + let guard = self.inner.read(); + Arc::clone(&guard) + }; // drop global lock here + + let mut memtable_iters = Vec::with_capacity(snapshot.imm_memtables.len() + 1); + memtable_iters.push(Box::new(snapshot.memtable.scan(lower, upper))); + for memtable in snapshot.imm_memtables.iter().rev() { + memtable_iters.push(Box::new(memtable.scan(lower, upper))); + } + let memtable_iter = MergeIterator::create(memtable_iters); + + let mut table_iters = Vec::with_capacity(snapshot.l0_sstables.len()); + for table in snapshot.l0_sstables.iter().rev() { + let iter = match lower { + Bound::Included(key) => { + SsTableIterator::create_and_seek_to_key(table.clone(), key)? + } + Bound::Excluded(key) => { + let mut iter = SsTableIterator::create_and_seek_to_key(table.clone(), key)?; + if iter.is_valid() && iter.key() == key { + iter.next()?; + } + iter + } + Bound::Unbounded => SsTableIterator::create_and_seek_to_first(table.clone())?, + }; + + table_iters.push(Box::new(iter)); + } + let table_iter = MergeIterator::create(table_iters); + + let iter = TwoMergeIterator::create(memtable_iter, table_iter)?; + + Ok(FusedIterator::new(LsmIterator::new( + iter, + map_bound(upper), + )?)) + } +} diff --git a/mini-lsm-week-1/src/mem_table.rs b/mini-lsm-week-1/src/mem_table.rs new file mode 100644 index 0000000..637b762 --- /dev/null +++ b/mini-lsm-week-1/src/mem_table.rs @@ -0,0 +1,110 @@ +use std::ops::Bound; +use std::sync::Arc; + +use anyhow::Result; +use bytes::Bytes; +use crossbeam_skiplist::map::Entry; +use crossbeam_skiplist::SkipMap; +use ouroboros::self_referencing; + +use crate::iterators::StorageIterator; +use crate::table::SsTableBuilder; + +/// A basic mem-table based on 
crossbeam-skiplist +pub struct MemTable { + map: Arc>, +} + +pub(crate) fn map_bound(bound: Bound<&[u8]>) -> Bound { + match bound { + Bound::Included(x) => Bound::Included(Bytes::copy_from_slice(x)), + Bound::Excluded(x) => Bound::Excluded(Bytes::copy_from_slice(x)), + Bound::Unbounded => Bound::Unbounded, + } +} + +impl MemTable { + /// Create a new mem-table. + pub fn create() -> Self { + Self { + map: Arc::new(SkipMap::new()), + } + } + + /// Get a value by key. + pub fn get(&self, key: &[u8]) -> Option { + self.map.get(key).map(|e| e.value().clone()) + } + + /// Put a key-value pair into the mem-table. + pub fn put(&self, key: &[u8], value: &[u8]) { + self.map + .insert(Bytes::copy_from_slice(key), Bytes::copy_from_slice(value)); + } + + /// Get an iterator over a range of keys. + pub fn scan(&self, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> MemTableIterator { + let (lower, upper) = (map_bound(lower), map_bound(upper)); + let mut iter = MemTableIteratorBuilder { + map: self.map.clone(), + iter_builder: |map| map.range((lower, upper)), + item: (Bytes::from_static(&[]), Bytes::from_static(&[])), + } + .build(); + let entry = iter.with_iter_mut(|iter| MemTableIterator::entry_to_item(iter.next())); + iter.with_mut(|x| *x.item = entry); + iter + } + + /// Flush the mem-table to SSTable. + pub fn flush(&self, builder: &mut SsTableBuilder) -> Result<()> { + for entry in self.map.iter() { + builder.add(&entry.key()[..], &entry.value()[..]); + } + Ok(()) + } +} + +type SkipMapRangeIter<'a> = + crossbeam_skiplist::map::Range<'a, Bytes, (Bound, Bound), Bytes, Bytes>; + +/// An iterator over a range of `SkipMap`. 
+#[self_referencing] +pub struct MemTableIterator { + map: Arc>, + #[borrows(map)] + #[not_covariant] + iter: SkipMapRangeIter<'this>, + item: (Bytes, Bytes), +} + +impl MemTableIterator { + fn entry_to_item(entry: Option>) -> (Bytes, Bytes) { + entry + .map(|x| (x.key().clone(), x.value().clone())) + .unwrap_or_else(|| (Bytes::from_static(&[]), Bytes::from_static(&[]))) + } +} + +impl StorageIterator for MemTableIterator { + fn value(&self) -> &[u8] { + &self.borrow_item().1[..] + } + + fn key(&self) -> &[u8] { + &self.borrow_item().0[..] + } + + fn is_valid(&self) -> bool { + !self.borrow_item().0.is_empty() + } + + fn next(&mut self) -> Result<()> { + let entry = self.with_iter_mut(|iter| MemTableIterator::entry_to_item(iter.next())); + self.with_mut(|x| *x.item = entry); + Ok(()) + } +} + +#[cfg(test)] +mod tests; diff --git a/mini-lsm-week-1/src/mem_table/tests.rs b/mini-lsm-week-1/src/mem_table/tests.rs new file mode 100644 index 0000000..8371ecb --- /dev/null +++ b/mini-lsm-week-1/src/mem_table/tests.rs @@ -0,0 +1,95 @@ +use tempfile::tempdir; + +use super::MemTable; +use crate::iterators::StorageIterator; +use crate::table::{SsTableBuilder, SsTableIterator}; + +#[test] +fn test_memtable_get() { + let memtable = MemTable::create(); + memtable.put(b"key1", b"value1"); + memtable.put(b"key2", b"value2"); + memtable.put(b"key3", b"value3"); + assert_eq!(&memtable.get(b"key1").unwrap()[..], b"value1"); + assert_eq!(&memtable.get(b"key2").unwrap()[..], b"value2"); + assert_eq!(&memtable.get(b"key3").unwrap()[..], b"value3"); +} + +#[test] +fn test_memtable_overwrite() { + let memtable = MemTable::create(); + memtable.put(b"key1", b"value1"); + memtable.put(b"key2", b"value2"); + memtable.put(b"key3", b"value3"); + memtable.put(b"key1", b"value11"); + memtable.put(b"key2", b"value22"); + memtable.put(b"key3", b"value33"); + assert_eq!(&memtable.get(b"key1").unwrap()[..], b"value11"); + assert_eq!(&memtable.get(b"key2").unwrap()[..], b"value22"); + 
assert_eq!(&memtable.get(b"key3").unwrap()[..], b"value33"); +} + +#[test] +fn test_memtable_flush() { + let memtable = MemTable::create(); + memtable.put(b"key1", b"value1"); + memtable.put(b"key2", b"value2"); + memtable.put(b"key3", b"value3"); + let mut builder = SsTableBuilder::new(128); + memtable.flush(&mut builder).unwrap(); + let dir = tempdir().unwrap(); + let sst = builder.build_for_test(dir.path().join("1.sst")).unwrap(); + let mut iter = SsTableIterator::create_and_seek_to_first(sst.into()).unwrap(); + assert_eq!(iter.key(), b"key1"); + assert_eq!(iter.value(), b"value1"); + iter.next().unwrap(); + assert_eq!(iter.key(), b"key2"); + assert_eq!(iter.value(), b"value2"); + iter.next().unwrap(); + assert_eq!(iter.key(), b"key3"); + assert_eq!(iter.value(), b"value3"); + iter.next().unwrap(); + assert!(!iter.is_valid()); +} + +#[test] +fn test_memtable_iter() { + use std::ops::Bound; + let memtable = MemTable::create(); + memtable.put(b"key1", b"value1"); + memtable.put(b"key2", b"value2"); + memtable.put(b"key3", b"value3"); + + { + let mut iter = memtable.scan(Bound::Unbounded, Bound::Unbounded); + assert_eq!(iter.key(), b"key1"); + assert_eq!(iter.value(), b"value1"); + iter.next().unwrap(); + assert_eq!(iter.key(), b"key2"); + assert_eq!(iter.value(), b"value2"); + iter.next().unwrap(); + assert_eq!(iter.key(), b"key3"); + assert_eq!(iter.value(), b"value3"); + iter.next().unwrap(); + assert!(!iter.is_valid()); + } + + { + let mut iter = memtable.scan(Bound::Included(b"key1"), Bound::Included(b"key2")); + assert_eq!(iter.key(), b"key1"); + assert_eq!(iter.value(), b"value1"); + iter.next().unwrap(); + assert_eq!(iter.key(), b"key2"); + assert_eq!(iter.value(), b"value2"); + iter.next().unwrap(); + assert!(!iter.is_valid()); + } + + { + let mut iter = memtable.scan(Bound::Excluded(b"key1"), Bound::Excluded(b"key3")); + assert_eq!(iter.key(), b"key2"); + assert_eq!(iter.value(), b"value2"); + iter.next().unwrap(); + assert!(!iter.is_valid()); + } +} diff 
// --- new file: mini-lsm-week-1/src/table.rs ---

mod builder;
mod iterator;

use std::fs::File;
use std::path::Path;
use std::sync::Arc;

use anyhow::{anyhow, Result};
pub use builder::SsTableBuilder;
use bytes::{Buf, BufMut, Bytes};
pub use iterator::SsTableIterator;

use crate::block::Block;
use crate::lsm_storage::BlockCache;

/// Metadata of one data block inside an SST: where it starts and its first key.
/// Serialized layout per entry: offset (u32) | first_key_len (u16) | first_key.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BlockMeta {
    /// Offset of this data block.
    pub offset: usize,
    /// The first key of the data block.
    pub first_key: Bytes,
}

impl BlockMeta {
    /// Encode block meta to a buffer, appending after any existing content.
    pub fn encode_block_meta(block_meta: &[BlockMeta], buf: &mut Vec<u8>) {
        let mut estimated_size = 0;
        for meta in block_meta {
            // The size of offset
            estimated_size += std::mem::size_of::<u32>();
            // The size of key length
            estimated_size += std::mem::size_of::<u16>();
            // The size of actual key
            estimated_size += meta.first_key.len();
        }
        // Reserve the space to improve performance, especially when the size of incoming data is large
        buf.reserve(estimated_size);
        let original_len = buf.len();
        for meta in block_meta {
            buf.put_u32(meta.offset as u32);
            buf.put_u16(meta.first_key.len() as u16);
            buf.put_slice(&meta.first_key);
        }
        // Sanity check: the estimate must match what was actually written.
        assert_eq!(estimated_size, buf.len() - original_len);
    }

    /// Decode block meta from a buffer, consuming it entirely.
    pub fn decode_block_meta(mut buf: impl Buf) -> Vec<BlockMeta> {
        let mut block_meta = Vec::new();
        while buf.has_remaining() {
            let offset = buf.get_u32() as usize;
            let first_key_len = buf.get_u16() as usize;
            let first_key = buf.copy_to_bytes(first_key_len);
            block_meta.push(BlockMeta { offset, first_key });
        }
        block_meta
    }
}

/// A file object.
///
/// Before day 4, it should look like:
///
/// ```ignore
/// pub struct FileObject(Bytes);
///
/// impl FileObject {
///     pub fn read(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
///         Ok(self.0[offset as usize..(offset + len) as usize].to_vec())
///     }
///
///     pub fn size(&self) -> u64 {
///         self.0.len() as u64
///     }
///
///     pub fn create(_path: &Path, data: Vec<u8>) -> Result<Self> {
///         Ok(FileObject(data.into()))
///     }
///
///     pub fn open(_path: &Path) -> Result<Self> {
///         unimplemented!()
///     }
/// }
/// ```
pub struct FileObject(File, u64);

impl FileObject {
    /// Read `len` bytes at `offset` (positioned read; Unix-only via `FileExt`).
    pub fn read(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
        use std::os::unix::fs::FileExt;
        let mut data = vec![0; len as usize];
        self.0.read_exact_at(&mut data[..], offset)?;
        Ok(data)
    }

    pub fn size(&self) -> u64 {
        self.1
    }

    /// Create a new file object (day 2) and write the file to the disk (day 4).
    pub fn create(path: &Path, data: Vec<u8>) -> Result<Self> {
        std::fs::write(path, &data)?;
        Ok(FileObject(
            File::options().read(true).write(false).open(path)?,
            data.len() as u64,
        ))
    }

    /// Open an existing SST file; implemented in a later day (recovery).
    pub fn open(_path: &Path) -> Result<Self> {
        unimplemented!()
    }
}

/// An on-disk sorted string table.
/// File layout: data blocks | block meta | meta offset (u32, last 4 bytes).
pub struct SsTable {
    file: FileObject,
    block_metas: Vec<BlockMeta>,
    block_meta_offset: usize,
    id: usize,
    block_cache: Option<Arc<BlockCache>>,
}

impl SsTable {
    #[cfg(test)]
    pub(crate) fn open_for_test(file: FileObject) -> Result<Self> {
        Self::open(0, None, file)
    }

    /// Open SSTable from a file: read the trailing u32 meta offset, then decode
    /// the block meta section it points at.
    pub fn open(id: usize, block_cache: Option<Arc<BlockCache>>, file: FileObject) -> Result<Self> {
        let len = file.size();
        let raw_meta_offset = file.read(len - 4, 4)?;
        let block_meta_offset = (&raw_meta_offset[..]).get_u32() as u64;
        let raw_meta = file.read(block_meta_offset, len - 4 - block_meta_offset)?;
        Ok(Self {
            file,
            block_metas: BlockMeta::decode_block_meta(&raw_meta[..]),
            block_meta_offset: block_meta_offset as usize,
            id,
            block_cache,
        })
    }

    /// Read a block from the disk.
+ pub fn read_block(&self, block_idx: usize) -> Result> { + let offset = self.block_metas[block_idx].offset; + let offset_end = self + .block_metas + .get(block_idx + 1) + .map_or(self.block_meta_offset, |x| x.offset); + let block_data = self + .file + .read(offset as u64, (offset_end - offset) as u64)?; + Ok(Arc::new(Block::decode(&block_data[..]))) + } + + /// Read a block from disk, with block cache. + pub fn read_block_cached(&self, block_idx: usize) -> Result> { + if let Some(ref block_cache) = self.block_cache { + let blk = block_cache + .try_get_with((self.id, block_idx), || self.read_block(block_idx)) + .map_err(|e| anyhow!("{}", e))?; + Ok(blk) + } else { + self.read_block(block_idx) + } + } + + /// Find the block that may contain `key`. + pub fn find_block_idx(&self, key: &[u8]) -> usize { + self.block_metas + .partition_point(|meta| meta.first_key <= key) + .saturating_sub(1) + } + + /// Get number of data blocks. + pub fn num_of_blocks(&self) -> usize { + self.block_metas.len() + } +} + +#[cfg(test)] +mod tests; diff --git a/mini-lsm-week-1/src/table/builder.rs b/mini-lsm-week-1/src/table/builder.rs new file mode 100644 index 0000000..7a35437 --- /dev/null +++ b/mini-lsm-week-1/src/table/builder.rs @@ -0,0 +1,91 @@ +use std::path::Path; +use std::sync::Arc; + +use anyhow::Result; +use bytes::BufMut; + +use super::{BlockMeta, FileObject, SsTable}; +use crate::block::BlockBuilder; +use crate::lsm_storage::BlockCache; + +/// Builds an SSTable from key-value pairs. +pub struct SsTableBuilder { + builder: BlockBuilder, + first_key: Vec, + data: Vec, + pub(super) meta: Vec, + block_size: usize, +} + +impl SsTableBuilder { + /// Create a builder based on target block size. 
+ pub fn new(block_size: usize) -> Self { + Self { + data: Vec::new(), + meta: Vec::new(), + first_key: Vec::new(), + block_size, + builder: BlockBuilder::new(block_size), + } + } + + /// Adds a key-value pair to SSTable + pub fn add(&mut self, key: &[u8], value: &[u8]) { + if self.first_key.is_empty() { + self.first_key = key.to_vec(); + } + + if self.builder.add(key, value) { + return; + } + // create a new block builder and append block data + self.finish_block(); + + // add the key-value pair to the next block + assert!(self.builder.add(key, value)); + self.first_key = key.to_vec(); + } + + /// Get the estimated size of the SSTable. + pub fn estimated_size(&self) -> usize { + self.data.len() + } + + fn finish_block(&mut self) { + let builder = std::mem::replace(&mut self.builder, BlockBuilder::new(self.block_size)); + let encoded_block = builder.build().encode(); + self.meta.push(BlockMeta { + offset: self.data.len(), + first_key: std::mem::take(&mut self.first_key).into(), + }); + self.data.extend(encoded_block); + } + + /// Builds the SSTable and writes it to the given path. No need to actually write to disk until + /// chapter 4 block cache. 
+ pub fn build( + mut self, + id: usize, + block_cache: Option>, + path: impl AsRef, + ) -> Result { + self.finish_block(); + let mut buf = self.data; + let meta_offset = buf.len(); + BlockMeta::encode_block_meta(&self.meta, &mut buf); + buf.put_u32(meta_offset as u32); + let file = FileObject::create(path.as_ref(), buf)?; + Ok(SsTable { + id, + file, + block_metas: self.meta, + block_meta_offset: meta_offset, + block_cache, + }) + } + + #[cfg(test)] + pub(crate) fn build_for_test(self, path: impl AsRef) -> Result { + self.build(0, None, path) + } +} diff --git a/mini-lsm-week-1/src/table/iterator.rs b/mini-lsm-week-1/src/table/iterator.rs new file mode 100644 index 0000000..08b135f --- /dev/null +++ b/mini-lsm-week-1/src/table/iterator.rs @@ -0,0 +1,102 @@ +use std::sync::Arc; + +use anyhow::Result; + +use super::SsTable; +use crate::block::BlockIterator; +use crate::iterators::StorageIterator; + +/// An iterator over the contents of an SSTable. +pub struct SsTableIterator { + table: Arc, + blk_iter: BlockIterator, + blk_idx: usize, +} + +impl SsTableIterator { + fn seek_to_first_inner(table: &Arc) -> Result<(usize, BlockIterator)> { + Ok(( + 0, + BlockIterator::create_and_seek_to_first(table.read_block_cached(0)?), + )) + } + + /// Create a new iterator and seek to the first key-value pair. + pub fn create_and_seek_to_first(table: Arc) -> Result { + let (blk_idx, blk_iter) = Self::seek_to_first_inner(&table)?; + let iter = Self { + blk_iter, + table, + blk_idx, + }; + Ok(iter) + } + + /// Seek to the first key-value pair. 
+ pub fn seek_to_first(&mut self) -> Result<()> { + let (blk_idx, blk_iter) = Self::seek_to_first_inner(&self.table)?; + self.blk_idx = blk_idx; + self.blk_iter = blk_iter; + Ok(()) + } + + fn seek_to_key_inner(table: &Arc, key: &[u8]) -> Result<(usize, BlockIterator)> { + let mut blk_idx = table.find_block_idx(key); + let mut blk_iter = + BlockIterator::create_and_seek_to_key(table.read_block_cached(blk_idx)?, key); + if !blk_iter.is_valid() { + blk_idx += 1; + if blk_idx < table.num_of_blocks() { + blk_iter = + BlockIterator::create_and_seek_to_first(table.read_block_cached(blk_idx)?); + } + } + Ok((blk_idx, blk_iter)) + } + + /// Create a new iterator and seek to the first key-value pair which >= `key`. + pub fn create_and_seek_to_key(table: Arc, key: &[u8]) -> Result { + let (blk_idx, blk_iter) = Self::seek_to_key_inner(&table, key)?; + let iter = Self { + blk_iter, + table, + blk_idx, + }; + Ok(iter) + } + + /// Seek to the first key-value pair which >= `key`. + pub fn seek_to_key(&mut self, key: &[u8]) -> Result<()> { + let (blk_idx, blk_iter) = Self::seek_to_key_inner(&self.table, key)?; + self.blk_iter = blk_iter; + self.blk_idx = blk_idx; + Ok(()) + } +} + +impl StorageIterator for SsTableIterator { + fn value(&self) -> &[u8] { + self.blk_iter.value() + } + + fn key(&self) -> &[u8] { + self.blk_iter.key() + } + + fn is_valid(&self) -> bool { + self.blk_iter.is_valid() + } + + fn next(&mut self) -> Result<()> { + self.blk_iter.next(); + if !self.blk_iter.is_valid() { + self.blk_idx += 1; + if self.blk_idx < self.table.num_of_blocks() { + self.blk_iter = BlockIterator::create_and_seek_to_first( + self.table.read_block_cached(self.blk_idx)?, + ); + } + } + Ok(()) + } +} diff --git a/mini-lsm-week-1/src/table/tests.rs b/mini-lsm-week-1/src/table/tests.rs new file mode 100644 index 0000000..c393181 --- /dev/null +++ b/mini-lsm-week-1/src/table/tests.rs @@ -0,0 +1,130 @@ +use std::sync::Arc; + +use bytes::Bytes; +use tempfile::{tempdir, TempDir}; + +use 
use std::sync::Arc;

use bytes::Bytes;
use tempfile::{tempdir, TempDir};

use super::*;
use crate::iterators::StorageIterator;
use crate::table::SsTableBuilder;

#[test]
fn test_sst_build_single_key() {
    let mut builder = SsTableBuilder::new(16);
    builder.add(b"233", b"233333");
    let dir = tempdir().unwrap();
    builder.build_for_test(dir.path().join("1.sst")).unwrap();
}

#[test]
fn test_sst_build_two_blocks() {
    let mut builder = SsTableBuilder::new(16);
    builder.add(b"11", b"11");
    builder.add(b"22", b"22");
    builder.add(b"33", b"11");
    builder.add(b"44", b"22");
    builder.add(b"55", b"11");
    builder.add(b"66", b"22");
    // A 16-byte block size must force at least one block split.
    assert!(builder.meta.len() >= 2);
    let dir = tempdir().unwrap();
    builder.build_for_test(dir.path().join("1.sst")).unwrap();
}

// Keys are spaced 5 apart so seek tests can probe between them.
fn key_of(idx: usize) -> Vec<u8> {
    format!("key_{:03}", idx * 5).into_bytes()
}

fn value_of(idx: usize) -> Vec<u8> {
    format!("value_{:010}", idx).into_bytes()
}

fn num_of_keys() -> usize {
    100
}

fn generate_sst() -> (TempDir, SsTable) {
    let mut builder = SsTableBuilder::new(128);
    for idx in 0..num_of_keys() {
        let key = key_of(idx);
        let value = value_of(idx);
        builder.add(&key[..], &value[..]);
    }
    let dir = tempdir().unwrap();
    let path = dir.path().join("1.sst");
    (dir, builder.build_for_test(path).unwrap())
}

#[test]
fn test_sst_build_all() {
    generate_sst();
}

#[test]
fn test_sst_decode() {
    // Re-opening the written file must reproduce the same block metadata.
    let (_dir, sst) = generate_sst();
    let meta = sst.block_metas.clone();
    let new_sst = SsTable::open_for_test(sst.file).unwrap();
    assert_eq!(new_sst.block_metas, meta);
}

fn as_bytes(x: &[u8]) -> Bytes {
    Bytes::copy_from_slice(x)
}

#[test]
fn test_sst_iterator() {
    let (_dir, sst) = generate_sst();
    let sst = Arc::new(sst);
    let mut iter = SsTableIterator::create_and_seek_to_first(sst).unwrap();
    // Iterate the full table several times, rewinding in between.
    for _ in 0..5 {
        for i in 0..num_of_keys() {
            let key = iter.key();
            let value = iter.value();
            assert_eq!(
                key,
                key_of(i),
                "expected key: {:?}, actual key: {:?}",
                as_bytes(&key_of(i)),
                as_bytes(key)
            );
            assert_eq!(
                value,
                value_of(i),
                "expected value: {:?}, actual value: {:?}",
                as_bytes(&value_of(i)),
                as_bytes(value)
            );
            iter.next().unwrap();
        }
        iter.seek_to_first().unwrap();
    }
}

#[test]
fn test_sst_seek_key() {
    let (_dir, sst) = generate_sst();
    let sst = Arc::new(sst);
    let mut iter = SsTableIterator::create_and_seek_to_key(sst, &key_of(0)).unwrap();
    // Seek between stored keys (offsets 1..=5 past each key); the iterator must
    // land on the next stored key each time.
    for offset in 1..=5 {
        for i in 0..num_of_keys() {
            let key = iter.key();
            let value = iter.value();
            assert_eq!(
                key,
                key_of(i),
                "expected key: {:?}, actual key: {:?}",
                as_bytes(&key_of(i)),
                as_bytes(key)
            );
            assert_eq!(
                value,
                value_of(i),
                "expected value: {:?}, actual value: {:?}",
                as_bytes(&value_of(i)),
                as_bytes(value)
            );
            iter.seek_to_key(&format!("key_{:03}", i * 5 + offset).into_bytes())
                .unwrap();
        }
        iter.seek_to_key(b"k").unwrap();
    }
}

// --- new file: mini-lsm-week-1/src/tests.rs ---
pub mod day4_tests;

// --- new file: mini-lsm-week-1/src/tests/day4_tests.rs ---

use std::ops::Bound;

use bytes::Bytes;
use tempfile::tempdir;

use crate::iterators::StorageIterator;

fn as_bytes(x: &[u8]) -> Bytes {
    Bytes::copy_from_slice(x)
}

/// Drain `iter` and assert it yields exactly `expected`, in order.
fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>) {
    let mut iter = iter;
    for (k, v) in expected {
        assert!(iter.is_valid());
        assert_eq!(
            k,
            iter.key(),
            "expected key: {:?}, actual key: {:?}",
            k,
            as_bytes(iter.key()),
        );
        assert_eq!(
            v,
            iter.value(),
            "expected value: {:?}, actual value: {:?}",
            v,
            as_bytes(iter.value()),
        );
        iter.next().unwrap();
    }
    assert!(!iter.is_valid());
}

#[test]
fn test_storage_get() {
    use crate::lsm_storage::LsmStorage;
tempdir().unwrap(); + let storage = LsmStorage::open(&dir).unwrap(); + storage.put(b"1", b"233").unwrap(); + storage.put(b"2", b"2333").unwrap(); + storage.put(b"3", b"23333").unwrap(); + assert_eq!(&storage.get(b"1").unwrap().unwrap()[..], b"233"); + assert_eq!(&storage.get(b"2").unwrap().unwrap()[..], b"2333"); + assert_eq!(&storage.get(b"3").unwrap().unwrap()[..], b"23333"); + storage.delete(b"2").unwrap(); + assert!(storage.get(b"2").unwrap().is_none()); +} + +#[test] +fn test_storage_scan_memtable_1() { + use crate::lsm_storage::LsmStorage; + let dir = tempdir().unwrap(); + let storage = LsmStorage::open(&dir).unwrap(); + storage.put(b"1", b"233").unwrap(); + storage.put(b"2", b"2333").unwrap(); + storage.put(b"3", b"23333").unwrap(); + storage.delete(b"2").unwrap(); + check_iter_result( + storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(), + vec![ + (Bytes::from("1"), Bytes::from("233")), + (Bytes::from("3"), Bytes::from("23333")), + ], + ); + check_iter_result( + storage + .scan(Bound::Included(b"1"), Bound::Included(b"2")) + .unwrap(), + vec![(Bytes::from("1"), Bytes::from("233"))], + ); + check_iter_result( + storage + .scan(Bound::Excluded(b"1"), Bound::Excluded(b"3")) + .unwrap(), + vec![], + ); +} + +#[test] +fn test_storage_scan_memtable_2() { + use crate::lsm_storage::LsmStorage; + let dir = tempdir().unwrap(); + let storage = LsmStorage::open(&dir).unwrap(); + storage.put(b"1", b"233").unwrap(); + storage.put(b"2", b"2333").unwrap(); + storage.put(b"3", b"23333").unwrap(); + storage.delete(b"1").unwrap(); + check_iter_result( + storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(), + vec![ + (Bytes::from("2"), Bytes::from("2333")), + (Bytes::from("3"), Bytes::from("23333")), + ], + ); + check_iter_result( + storage + .scan(Bound::Included(b"1"), Bound::Included(b"2")) + .unwrap(), + vec![(Bytes::from("2"), Bytes::from("2333"))], + ); + check_iter_result( + storage + .scan(Bound::Excluded(b"1"), Bound::Excluded(b"3")) + .unwrap(), + 
vec![(Bytes::from("2"), Bytes::from("2333"))], + ); +} + +#[test] +fn test_storage_get_after_sync() { + use crate::lsm_storage::LsmStorage; + let dir = tempdir().unwrap(); + let storage = LsmStorage::open(&dir).unwrap(); + storage.put(b"1", b"233").unwrap(); + storage.put(b"2", b"2333").unwrap(); + storage.sync().unwrap(); + storage.put(b"3", b"23333").unwrap(); + assert_eq!(&storage.get(b"1").unwrap().unwrap()[..], b"233"); + assert_eq!(&storage.get(b"2").unwrap().unwrap()[..], b"2333"); + assert_eq!(&storage.get(b"3").unwrap().unwrap()[..], b"23333"); + storage.delete(b"2").unwrap(); + assert!(storage.get(b"2").unwrap().is_none()); +} + +#[test] +fn test_storage_scan_memtable_1_after_sync() { + use crate::lsm_storage::LsmStorage; + let dir = tempdir().unwrap(); + let storage = LsmStorage::open(&dir).unwrap(); + storage.put(b"1", b"233").unwrap(); + storage.put(b"2", b"2333").unwrap(); + storage.sync().unwrap(); + storage.put(b"3", b"23333").unwrap(); + storage.delete(b"2").unwrap(); + check_iter_result( + storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(), + vec![ + (Bytes::from("1"), Bytes::from("233")), + (Bytes::from("3"), Bytes::from("23333")), + ], + ); + check_iter_result( + storage + .scan(Bound::Included(b"1"), Bound::Included(b"2")) + .unwrap(), + vec![(Bytes::from("1"), Bytes::from("233"))], + ); + check_iter_result( + storage + .scan(Bound::Excluded(b"1"), Bound::Excluded(b"3")) + .unwrap(), + vec![], + ); +} + +#[test] +fn test_storage_scan_memtable_2_after_sync() { + use crate::lsm_storage::LsmStorage; + let dir = tempdir().unwrap(); + let storage = LsmStorage::open(&dir).unwrap(); + storage.put(b"1", b"233").unwrap(); + storage.put(b"2", b"2333").unwrap(); + storage.sync().unwrap(); + storage.put(b"3", b"23333").unwrap(); + storage.sync().unwrap(); + storage.delete(b"1").unwrap(); + check_iter_result( + storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(), + vec![ + (Bytes::from("2"), Bytes::from("2333")), + (Bytes::from("3"), 
Bytes::from("23333")), + ], + ); + check_iter_result( + storage + .scan(Bound::Included(b"1"), Bound::Included(b"2")) + .unwrap(), + vec![(Bytes::from("2"), Bytes::from("2333"))], + ); + check_iter_result( + storage + .scan(Bound::Excluded(b"1"), Bound::Excluded(b"3")) + .unwrap(), + vec![(Bytes::from("2"), Bytes::from("2333"))], + ); +} diff --git a/mini-lsm/README.md b/mini-lsm/README.md new file mode 100644 index 0000000..cceb11b --- /dev/null +++ b/mini-lsm/README.md @@ -0,0 +1 @@ +# mini-lsm week-2 solution \ No newline at end of file