separate week 1 solution

Signed-off-by: Alex Chi Z <iskyzh@gmail.com>
Author: Alex Chi Z
Date: 2024-01-16 16:00:51 +08:00
parent a5c8a0687b
commit 327f6badef
26 changed files with 2176 additions and 0 deletions

Cargo.lock (generated)

@@ -420,6 +420,21 @@ dependencies = [
"tempfile", "tempfile",
] ]
[[package]]
name = "mini-lsm-week-1"
version = "0.1.0"
dependencies = [
"anyhow",
"arc-swap",
"bytes",
"crossbeam-epoch",
"crossbeam-skiplist",
"moka",
"ouroboros",
"parking_lot",
"tempfile",
]
[[package]]
name = "mini-lsm-xtask"
version = "0.1.0"


@@ -1,6 +1,7 @@
[workspace]
members = [
"mini-lsm",
"mini-lsm-week-1",
"xtask",
"mini-lsm-starter",
]


@@ -0,0 +1,22 @@
[package]
name = "mini-lsm-week-1"
version = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
keywords = { workspace = true }
license = { workspace = true }
repository = { workspace = true }
description = "A tutorial for building an LSM tree storage engine in a week."
[dependencies]
anyhow = "1"
arc-swap = "1"
bytes = "1"
crossbeam-epoch = "0.9"
crossbeam-skiplist = "0.1"
parking_lot = "0.12"
ouroboros = "0.15"
moka = "0.9"
[dev-dependencies]
tempfile = "3"


@@ -0,0 +1 @@
# mini-lsm week-1 solution


@@ -0,0 +1,46 @@
mod builder;
mod iterator;
pub use builder::BlockBuilder;
use bytes::{Buf, BufMut, Bytes};
pub use iterator::BlockIterator;
pub const SIZEOF_U16: usize = std::mem::size_of::<u16>();
/// A block is the smallest unit of read and caching in the LSM tree. It is a collection of
/// sorted key-value pairs.
pub struct Block {
data: Vec<u8>,
offsets: Vec<u16>,
}
impl Block {
pub fn encode(&self) -> Bytes {
let mut buf = self.data.clone();
let offsets_len = self.offsets.len();
for offset in &self.offsets {
buf.put_u16(*offset);
}
// Adds number of elements at the end of the block
buf.put_u16(offsets_len as u16);
buf.into()
}
pub fn decode(data: &[u8]) -> Self {
// get number of elements in the block
let entry_offsets_len = (&data[data.len() - SIZEOF_U16..]).get_u16() as usize;
let data_end = data.len() - SIZEOF_U16 - entry_offsets_len * SIZEOF_U16;
let offsets_raw = &data[data_end..data.len() - SIZEOF_U16];
// get offset array
let offsets = offsets_raw
.chunks(SIZEOF_U16)
.map(|mut x| x.get_u16())
.collect();
// retrieve data
let data = data[0..data_end].to_vec();
Self { data, offsets }
}
}
#[cfg(test)]
mod tests;
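A minimal usage sketch (not part of the commit), assuming the `BlockBuilder` and `BlockIterator` defined in the following files. The encoded form is the entry data, followed by one `u16` offset per entry, followed by a `u16` entry count.

use std::sync::Arc;

fn block_roundtrip_example() {
    let mut builder = BlockBuilder::new(4096);
    assert!(builder.add(b"key1", b"value1"));
    assert!(builder.add(b"key2", b"value2"));
    let block = builder.build();

    // Layout: | entries | offsets (u16 each) | number of entries (u16) |
    let encoded = block.encode();
    let decoded = Block::decode(&encoded);

    let mut iter = BlockIterator::create_and_seek_to_first(Arc::new(decoded));
    assert_eq!(iter.key(), b"key1");
    iter.next();
    assert_eq!(iter.key(), b"key2");
    assert_eq!(iter.value(), b"value2");
}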


@@ -0,0 +1,67 @@
use bytes::BufMut;
use super::{Block, SIZEOF_U16};
/// Builds a block.
pub struct BlockBuilder {
/// Offsets of each key-value entry.
offsets: Vec<u16>,
/// All serialized key-value pairs in the block.
data: Vec<u8>,
/// The expected block size.
block_size: usize,
}
impl BlockBuilder {
/// Creates a new block builder.
pub fn new(block_size: usize) -> Self {
Self {
offsets: Vec::new(),
data: Vec::new(),
block_size,
}
}
fn estimated_size(&self) -> usize {
SIZEOF_U16 /* number of key-value pairs in the block */ + self.offsets.len() * SIZEOF_U16 /* offsets */ + self.data.len()
/* key-value pairs */
}
/// Adds a key-value pair to the block. Returns false when the block is full.
#[must_use]
pub fn add(&mut self, key: &[u8], value: &[u8]) -> bool {
assert!(!key.is_empty(), "key must not be empty");
if self.estimated_size() + key.len() + value.len() + SIZEOF_U16 * 3 /* key_len, value_len and offset */ > self.block_size
&& !self.is_empty()
{
return false;
}
// Add the offset of the data into the offset array.
self.offsets.push(self.data.len() as u16);
// Encode key length.
self.data.put_u16(key.len() as u16);
// Encode key content.
self.data.put(key);
// Encode value length.
self.data.put_u16(value.len() as u16);
// Encode value content.
self.data.put(value);
true
}
/// Check if there are no key-value pairs in the block.
pub fn is_empty(&self) -> bool {
self.offsets.is_empty()
}
/// Finalize the block.
pub fn build(self) -> Block {
if self.is_empty() {
panic!("block should not be empty");
}
Block {
data: self.data,
offsets: self.offsets,
}
}
}
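A worked example of the size check in `add` (it mirrors `test_block_build_full` later in this commit): with a 16-byte target, an empty block already accounts for 2 bytes of entry count; the first 2-byte key / 2-byte value pair needs 2 + 2 + 2 + 3 * 2 = 12 <= 16 bytes and is accepted; a second identical pair would need 12 + 2 + 2 + 3 * 2 = 22 > 16 bytes, so `add` returns false.

fn capacity_example() {
    let mut builder = BlockBuilder::new(16);
    assert!(builder.add(b"11", b"11")); // estimated size grows from 2 to 12
    assert!(!builder.add(b"22", b"22")); // would grow to 22 > 16, rejected
}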


@@ -0,0 +1,117 @@
use std::sync::Arc;
use bytes::Buf;
use super::Block;
/// Iterates on a block.
pub struct BlockIterator {
/// reference to the block
block: Arc<Block>,
/// the current key at the iterator position
key: Vec<u8>,
/// the current value at the iterator position
value: Vec<u8>,
/// the current index at the iterator position
idx: usize,
}
impl BlockIterator {
fn new(block: Arc<Block>) -> Self {
Self {
block,
key: Vec::new(),
value: Vec::new(),
idx: 0,
}
}
/// Creates a block iterator and seeks to the first entry.
pub fn create_and_seek_to_first(block: Arc<Block>) -> Self {
let mut iter = Self::new(block);
iter.seek_to_first();
iter
}
/// Creates a block iterator and seeks to the first key that is >= `key`.
pub fn create_and_seek_to_key(block: Arc<Block>, key: &[u8]) -> Self {
let mut iter = Self::new(block);
iter.seek_to_key(key);
iter
}
/// Returns the key of the current entry.
pub fn key(&self) -> &[u8] {
debug_assert!(!self.key.is_empty(), "invalid iterator");
&self.key
}
/// Returns the value of the current entry.
pub fn value(&self) -> &[u8] {
debug_assert!(!self.key.is_empty(), "invalid iterator");
&self.value
}
/// Returns true if the iterator is valid.
pub fn is_valid(&self) -> bool {
!self.key.is_empty()
}
/// Seeks to the first key in the block.
pub fn seek_to_first(&mut self) {
self.seek_to(0);
}
/// Seeks to the idx-th key in the block.
fn seek_to(&mut self, idx: usize) {
if idx >= self.block.offsets.len() {
self.key.clear();
self.value.clear();
return;
}
let offset = self.block.offsets[idx] as usize;
self.seek_to_offset(offset);
self.idx = idx;
}
/// Move to the next key in the block.
pub fn next(&mut self) {
self.idx += 1;
self.seek_to(self.idx);
}
/// Seeks to the specified offset and updates the current `key` and `value`.
/// The index update is handled by the caller.
fn seek_to_offset(&mut self, offset: usize) {
let mut entry = &self.block.data[offset..];
// Since `get_u16()` automatically advances the slice by 2 bytes,
// we don't need to advance it manually here.
let key_len = entry.get_u16() as usize;
let key = entry[..key_len].to_vec();
entry.advance(key_len);
self.key.clear();
self.key.extend(key);
let value_len = entry.get_u16() as usize;
let value = entry[..value_len].to_vec();
entry.advance(value_len);
self.value.clear();
self.value.extend(value);
}
/// Seek to the first key that is >= `key`.
pub fn seek_to_key(&mut self, key: &[u8]) {
let mut low = 0;
let mut high = self.block.offsets.len();
while low < high {
let mid = low + (high - low) / 2;
self.seek_to(mid);
assert!(self.is_valid());
match self.key().cmp(key) {
std::cmp::Ordering::Less => low = mid + 1,
std::cmp::Ordering::Greater => high = mid,
std::cmp::Ordering::Equal => return,
}
}
self.seek_to(low);
}
}


@@ -0,0 +1,122 @@
use std::sync::Arc;
use super::builder::BlockBuilder;
use super::iterator::BlockIterator;
use super::*;
#[test]
fn test_block_build_single_key() {
let mut builder = BlockBuilder::new(16);
assert!(builder.add(b"233", b"233333"));
builder.build();
}
#[test]
fn test_block_build_full() {
let mut builder = BlockBuilder::new(16);
assert!(builder.add(b"11", b"11"));
assert!(!builder.add(b"22", b"22"));
builder.build();
}
fn key_of(idx: usize) -> Vec<u8> {
format!("key_{:03}", idx * 5).into_bytes()
}
fn value_of(idx: usize) -> Vec<u8> {
format!("value_{:010}", idx).into_bytes()
}
fn num_of_keys() -> usize {
100
}
fn generate_block() -> Block {
let mut builder = BlockBuilder::new(10000);
for idx in 0..num_of_keys() {
let key = key_of(idx);
let value = value_of(idx);
assert!(builder.add(&key[..], &value[..]));
}
builder.build()
}
#[test]
fn test_block_build_all() {
generate_block();
}
#[test]
fn test_block_encode() {
let block = generate_block();
block.encode();
}
#[test]
fn test_block_decode() {
let block = generate_block();
let encoded = block.encode();
let decoded_block = Block::decode(&encoded);
assert_eq!(block.offsets, decoded_block.offsets);
assert_eq!(block.data, decoded_block.data);
}
fn as_bytes(x: &[u8]) -> Bytes {
Bytes::copy_from_slice(x)
}
#[test]
fn test_block_iterator() {
let block = Arc::new(generate_block());
let mut iter = BlockIterator::create_and_seek_to_first(block);
for _ in 0..5 {
for i in 0..num_of_keys() {
let key = iter.key();
let value = iter.value();
assert_eq!(
key,
key_of(i),
"expected key: {:?}, actual key: {:?}",
as_bytes(&key_of(i)),
as_bytes(key)
);
assert_eq!(
value,
value_of(i),
"expected value: {:?}, actual value: {:?}",
as_bytes(&value_of(i)),
as_bytes(value)
);
iter.next();
}
iter.seek_to_first();
}
}
#[test]
fn test_block_seek_key() {
let block = Arc::new(generate_block());
let mut iter = BlockIterator::create_and_seek_to_key(block, &key_of(0));
for offset in 1..=5 {
for i in 0..num_of_keys() {
let key = iter.key();
let value = iter.value();
assert_eq!(
key,
key_of(i),
"expected key: {:?}, actual key: {:?}",
as_bytes(&key_of(i)),
as_bytes(key)
);
assert_eq!(
value,
value_of(i),
"expected value: {:?}, actual value: {:?}",
as_bytes(&value_of(i)),
as_bytes(value)
);
iter.seek_to_key(&format!("key_{:03}", i * 5 + offset).into_bytes());
}
iter.seek_to_key(b"k");
}
}


@@ -0,0 +1,19 @@
pub mod merge_iterator;
pub mod two_merge_iterator;
pub trait StorageIterator {
/// Get the current value.
fn value(&self) -> &[u8];
/// Get the current key.
fn key(&self) -> &[u8];
/// Check if the current iterator is valid.
fn is_valid(&self) -> bool;
/// Move to the next position.
fn next(&mut self) -> anyhow::Result<()>;
}
#[cfg(test)]
mod tests;
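A hypothetical generic consumer (not part of the commit) illustrating how the trait is used: drain any `StorageIterator` into owned key-value pairs.

fn collect_entries<I: StorageIterator>(mut iter: I) -> anyhow::Result<Vec<(Vec<u8>, Vec<u8>)>> {
    let mut entries = Vec::new();
    while iter.is_valid() {
        // Copy the borrowed key/value out before advancing the iterator.
        entries.push((iter.key().to_vec(), iter.value().to_vec()));
        iter.next()?;
    }
    Ok(entries)
}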


@@ -0,0 +1,139 @@
use std::cmp::{self};
use std::collections::binary_heap::PeekMut;
use std::collections::BinaryHeap;
use anyhow::Result;
use super::StorageIterator;
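/// An iterator tagged with its index in the input list. Ordering is by (current key, index),
/// then reversed, so that the max-heap `BinaryHeap` pops the iterator with the smallest key
/// first and breaks ties in favor of the smaller index.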
struct HeapWrapper<I: StorageIterator>(pub usize, pub Box<I>);
impl<I: StorageIterator> PartialEq for HeapWrapper<I> {
fn eq(&self, other: &Self) -> bool {
self.partial_cmp(other).unwrap() == cmp::Ordering::Equal
}
}
impl<I: StorageIterator> Eq for HeapWrapper<I> {}
impl<I: StorageIterator> PartialOrd for HeapWrapper<I> {
#[allow(clippy::non_canonical_partial_ord_impl)]
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
match self.1.key().cmp(other.1.key()) {
cmp::Ordering::Greater => Some(cmp::Ordering::Greater),
cmp::Ordering::Less => Some(cmp::Ordering::Less),
cmp::Ordering::Equal => self.0.partial_cmp(&other.0),
}
.map(|x| x.reverse())
}
}
impl<I: StorageIterator> Ord for HeapWrapper<I> {
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.partial_cmp(other).unwrap()
}
}
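The reversed ordering above is the standard trick for turning Rust's max-heap `BinaryHeap` into a min-heap. A standalone sketch (not part of the commit) of the same idea using `std::cmp::Reverse` on (key, index) pairs:

use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn main() {
    // BinaryHeap pops the largest element; wrapping each element in `Reverse`
    // (the counterpart of the `.reverse()` in `HeapWrapper::partial_cmp`)
    // makes it pop the smallest (key, index) pair instead.
    let mut heap = BinaryHeap::new();
    for (key, idx) in [("b", 0usize), ("a", 1), ("a", 0)] {
        heap.push(Reverse((key, idx)));
    }
    assert_eq!(heap.pop(), Some(Reverse(("a", 0)))); // smallest key; smaller index wins the tie
    assert_eq!(heap.pop(), Some(Reverse(("a", 1))));
    assert_eq!(heap.pop(), Some(Reverse(("b", 0))));
}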
/// Merge multiple iterators of the same type. If the same key occurs in multiple
/// iterators, prefer the one with the smaller index.
pub struct MergeIterator<I: StorageIterator> {
iters: BinaryHeap<HeapWrapper<I>>,
current: Option<HeapWrapper<I>>,
}
impl<I: StorageIterator> MergeIterator<I> {
pub fn create(iters: Vec<Box<I>>) -> Self {
if iters.is_empty() {
return Self {
iters: BinaryHeap::new(),
current: None,
};
}
let mut heap = BinaryHeap::new();
if iters.iter().all(|x| !x.is_valid()) {
// All invalid, select the last one as the current.
let mut iters = iters;
return Self {
iters: heap,
current: Some(HeapWrapper(0, iters.pop().unwrap())),
};
}
for (idx, iter) in iters.into_iter().enumerate() {
if iter.is_valid() {
heap.push(HeapWrapper(idx, iter));
}
}
let current = heap.pop().unwrap();
Self {
iters: heap,
current: Some(current),
}
}
}
impl<I: StorageIterator> StorageIterator for MergeIterator<I> {
fn key(&self) -> &[u8] {
unsafe { self.current.as_ref().unwrap_unchecked() }.1.key()
}
fn value(&self) -> &[u8] {
unsafe { self.current.as_ref().unwrap_unchecked() }
.1
.value()
}
fn is_valid(&self) -> bool {
self.current
.as_ref()
.map(|x| x.1.is_valid())
.unwrap_or(false)
}
fn next(&mut self) -> Result<()> {
let current = unsafe { self.current.as_mut().unwrap_unchecked() };
// Advance (and possibly pop) heap iterators whose current key equals the current iterator's key.
while let Some(mut inner_iter) = self.iters.peek_mut() {
debug_assert!(
inner_iter.1.key() >= current.1.key(),
"heap invariant violated"
);
if inner_iter.1.key() == current.1.key() {
// Case 1: an error occurred when calling `next`.
if let e @ Err(_) = inner_iter.1.next() {
PeekMut::pop(inner_iter);
return e;
}
// Case 2: iter is no longer valid.
if !inner_iter.1.is_valid() {
PeekMut::pop(inner_iter);
}
} else {
break;
}
}
current.1.next()?;
// If the current iterator is invalid, pop it out of the heap and select the next one.
if !current.1.is_valid() {
if let Some(iter) = self.iters.pop() {
*current = iter;
}
return Ok(());
}
// Otherwise, compare with heap top and swap if necessary.
if let Some(mut inner_iter) = self.iters.peek_mut() {
if *current < *inner_iter {
std::mem::swap(&mut *inner_iter, current);
}
}
Ok(())
}
}


@@ -0,0 +1,40 @@
use anyhow::Result;
use bytes::Bytes;
use super::StorageIterator;
pub mod merge_iterator_test;
pub mod two_merge_iterator_test;
#[derive(Clone)]
pub struct MockIterator {
pub data: Vec<(Bytes, Bytes)>,
pub index: usize,
}
impl MockIterator {
pub fn new(data: Vec<(Bytes, Bytes)>) -> Self {
Self { data, index: 0 }
}
}
impl StorageIterator for MockIterator {
fn next(&mut self) -> Result<()> {
if self.index < self.data.len() {
self.index += 1;
}
Ok(())
}
fn key(&self) -> &[u8] {
self.data[self.index].0.as_ref()
}
fn value(&self) -> &[u8] {
self.data[self.index].1.as_ref()
}
fn is_valid(&self) -> bool {
self.index < self.data.len()
}
}


@@ -0,0 +1,137 @@
use super::*;
use crate::iterators::merge_iterator::MergeIterator;
fn as_bytes(x: &[u8]) -> Bytes {
Bytes::copy_from_slice(x)
}
fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>) {
let mut iter = iter;
for (k, v) in expected {
assert!(iter.is_valid());
assert_eq!(
k,
iter.key(),
"expected key: {:?}, actual key: {:?}",
k,
as_bytes(iter.key()),
);
assert_eq!(
v,
iter.value(),
"expected value: {:?}, actual value: {:?}",
v,
as_bytes(iter.value()),
);
iter.next().unwrap();
}
assert!(!iter.is_valid());
}
#[test]
fn test_merge_1() {
let i1 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
]);
let i2 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.2")),
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
]);
let i3 = MockIterator::new(vec![
(Bytes::from("b"), Bytes::from("2.3")),
(Bytes::from("c"), Bytes::from("3.3")),
(Bytes::from("d"), Bytes::from("4.3")),
]);
let iter = MergeIterator::create(vec![
Box::new(i1.clone()),
Box::new(i2.clone()),
Box::new(i3.clone()),
]);
check_iter_result(
iter,
vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
(Bytes::from("d"), Bytes::from("4.2")),
],
);
let iter = MergeIterator::create(vec![Box::new(i3), Box::new(i1), Box::new(i2)]);
check_iter_result(
iter,
vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.3")),
(Bytes::from("c"), Bytes::from("3.3")),
(Bytes::from("d"), Bytes::from("4.3")),
],
);
}
#[test]
fn test_merge_2() {
let i1 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
]);
let i2 = MockIterator::new(vec![
(Bytes::from("d"), Bytes::from("1.2")),
(Bytes::from("e"), Bytes::from("2.2")),
(Bytes::from("f"), Bytes::from("3.2")),
(Bytes::from("g"), Bytes::from("4.2")),
]);
let i3 = MockIterator::new(vec![
(Bytes::from("h"), Bytes::from("1.3")),
(Bytes::from("i"), Bytes::from("2.3")),
(Bytes::from("j"), Bytes::from("3.3")),
(Bytes::from("k"), Bytes::from("4.3")),
]);
let i4 = MockIterator::new(vec![]);
let result = vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
(Bytes::from("d"), Bytes::from("1.2")),
(Bytes::from("e"), Bytes::from("2.2")),
(Bytes::from("f"), Bytes::from("3.2")),
(Bytes::from("g"), Bytes::from("4.2")),
(Bytes::from("h"), Bytes::from("1.3")),
(Bytes::from("i"), Bytes::from("2.3")),
(Bytes::from("j"), Bytes::from("3.3")),
(Bytes::from("k"), Bytes::from("4.3")),
];
let iter = MergeIterator::create(vec![
Box::new(i1.clone()),
Box::new(i2.clone()),
Box::new(i3.clone()),
Box::new(i4.clone()),
]);
check_iter_result(iter, result.clone());
let iter = MergeIterator::create(vec![
Box::new(i2.clone()),
Box::new(i4.clone()),
Box::new(i3.clone()),
Box::new(i1.clone()),
]);
check_iter_result(iter, result.clone());
let iter = MergeIterator::create(vec![Box::new(i4), Box::new(i3), Box::new(i2), Box::new(i1)]);
check_iter_result(iter, result);
}
#[test]
fn test_merge_empty() {
let iter = MergeIterator::<MockIterator>::create(vec![]);
check_iter_result(iter, vec![]);
}


@@ -0,0 +1,129 @@
use super::*;
use crate::iterators::two_merge_iterator::TwoMergeIterator;
fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>) {
let mut iter = iter;
for (k, v) in expected {
assert!(iter.is_valid());
assert_eq!(iter.key(), k.as_ref());
assert_eq!(iter.value(), v.as_ref());
iter.next().unwrap();
}
assert!(!iter.is_valid());
}
#[test]
fn test_merge_1() {
let i1 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
]);
let i2 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.2")),
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
]);
let iter = TwoMergeIterator::create(i1, i2).unwrap();
check_iter_result(
iter,
vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
(Bytes::from("d"), Bytes::from("4.2")),
],
)
}
#[test]
fn test_merge_2() {
let i2 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
]);
let i1 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.2")),
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
]);
let iter = TwoMergeIterator::create(i1, i2).unwrap();
check_iter_result(
iter,
vec![
(Bytes::from("a"), Bytes::from("1.2")),
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
],
)
}
#[test]
fn test_merge_3() {
let i2 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
]);
let i1 = MockIterator::new(vec![
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
]);
let iter = TwoMergeIterator::create(i1, i2).unwrap();
check_iter_result(
iter,
vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
],
)
}
#[test]
fn test_merge_4() {
let i2 = MockIterator::new(vec![]);
let i1 = MockIterator::new(vec![
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
]);
let iter = TwoMergeIterator::create(i1, i2).unwrap();
check_iter_result(
iter,
vec![
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
],
);
let i1 = MockIterator::new(vec![]);
let i2 = MockIterator::new(vec![
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
]);
let iter = TwoMergeIterator::create(i1, i2).unwrap();
check_iter_result(
iter,
vec![
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
],
);
}
#[test]
fn test_merge_5() {
let i2 = MockIterator::new(vec![]);
let i1 = MockIterator::new(vec![]);
let iter = TwoMergeIterator::create(i1, i2).unwrap();
check_iter_result(iter, vec![])
}


@@ -0,0 +1,80 @@
use anyhow::Result;
use super::StorageIterator;
/// Merges two iterators of different types into one. If the two iterators have the same key, only
/// produce the key once and prefer the entry from A.
pub struct TwoMergeIterator<A: StorageIterator, B: StorageIterator> {
a: A,
b: B,
choose_a: bool,
}
impl<A: StorageIterator, B: StorageIterator> TwoMergeIterator<A, B> {
fn choose_a(a: &A, b: &B) -> bool {
if !a.is_valid() {
return false;
}
if !b.is_valid() {
return true;
}
a.key() < b.key()
}
fn skip_b(&mut self) -> Result<()> {
if self.a.is_valid() {
while self.b.is_valid() && self.b.key() == self.a.key() {
self.b.next()?;
}
}
Ok(())
}
pub fn create(a: A, b: B) -> Result<Self> {
let mut iter = Self {
choose_a: false,
a,
b,
};
iter.skip_b()?;
iter.choose_a = Self::choose_a(&iter.a, &iter.b);
Ok(iter)
}
}
impl<A: StorageIterator, B: StorageIterator> StorageIterator for TwoMergeIterator<A, B> {
fn key(&self) -> &[u8] {
if self.choose_a {
self.a.key()
} else {
self.b.key()
}
}
fn value(&self) -> &[u8] {
if self.choose_a {
self.a.value()
} else {
self.b.value()
}
}
fn is_valid(&self) -> bool {
if self.choose_a {
self.a.is_valid()
} else {
self.b.is_valid()
}
}
fn next(&mut self) -> Result<()> {
if self.choose_a {
self.a.next()?;
} else {
self.b.next()?;
}
self.skip_b()?;
self.choose_a = Self::choose_a(&self.a, &self.b);
Ok(())
}
}


@@ -0,0 +1,9 @@
pub mod block;
pub mod iterators;
pub mod lsm_iterator;
pub mod lsm_storage;
pub mod mem_table;
pub mod table;
#[cfg(test)]
mod tests;


@@ -0,0 +1,106 @@
use std::ops::Bound;
use anyhow::Result;
use bytes::Bytes;
use crate::iterators::merge_iterator::MergeIterator;
use crate::iterators::two_merge_iterator::TwoMergeIterator;
use crate::iterators::StorageIterator;
use crate::mem_table::MemTableIterator;
use crate::table::SsTableIterator;
type LsmIteratorInner =
TwoMergeIterator<MergeIterator<MemTableIterator>, MergeIterator<SsTableIterator>>;
pub struct LsmIterator {
iter: LsmIteratorInner,
end_bound: Bound<Bytes>,
is_valid: bool,
}
impl LsmIterator {
pub(crate) fn new(iter: LsmIteratorInner, end_bound: Bound<Bytes>) -> Result<Self> {
let mut iter = Self {
is_valid: iter.is_valid(),
iter,
end_bound,
};
iter.move_to_non_delete()?;
Ok(iter)
}
fn next_inner(&mut self) -> Result<()> {
self.iter.next()?;
if !self.iter.is_valid() {
self.is_valid = false;
return Ok(());
}
match self.end_bound.as_ref() {
Bound::Unbounded => {}
Bound::Included(key) => self.is_valid = self.iter.key() <= key.as_ref(),
Bound::Excluded(key) => self.is_valid = self.iter.key() < key.as_ref(),
}
Ok(())
}
fn move_to_non_delete(&mut self) -> Result<()> {
while self.is_valid() && self.iter.value().is_empty() {
self.next_inner()?;
}
Ok(())
}
}
impl StorageIterator for LsmIterator {
fn is_valid(&self) -> bool {
self.is_valid
}
fn key(&self) -> &[u8] {
self.iter.key()
}
fn value(&self) -> &[u8] {
self.iter.value()
}
fn next(&mut self) -> Result<()> {
self.next_inner()?;
self.move_to_non_delete()?;
Ok(())
}
}
/// A wrapper around an existing iterator that prevents users from calling `next` when the
/// iterator is invalid.
pub struct FusedIterator<I: StorageIterator> {
iter: I,
}
impl<I: StorageIterator> FusedIterator<I> {
pub fn new(iter: I) -> Self {
Self { iter }
}
}
impl<I: StorageIterator> StorageIterator for FusedIterator<I> {
fn is_valid(&self) -> bool {
self.iter.is_valid()
}
fn key(&self) -> &[u8] {
self.iter.key()
}
fn value(&self) -> &[u8] {
self.iter.value()
}
fn next(&mut self) -> Result<()> {
// only move when the iterator is valid
if self.iter.is_valid() {
self.iter.next()?;
}
Ok(())
}
}
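A hypothetical consumer (not part of the commit) showing what the fuse buys: once the iterator becomes invalid, additional `next` calls are silently ignored instead of reaching the inner iterator.

fn drain<I: StorageIterator>(iter: I) -> Result<usize> {
    let mut iter = FusedIterator::new(iter);
    let mut count = 0;
    while iter.is_valid() {
        count += 1;
        iter.next()?;
    }
    // Extra calls after exhaustion are harmless with the fuse in place.
    iter.next()?;
    Ok(count)
}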


@@ -0,0 +1,229 @@
use std::ops::Bound;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use anyhow::Result;
use bytes::Bytes;
use parking_lot::{Mutex, RwLock};
use crate::block::Block;
use crate::iterators::merge_iterator::MergeIterator;
use crate::iterators::two_merge_iterator::TwoMergeIterator;
use crate::iterators::StorageIterator;
use crate::lsm_iterator::{FusedIterator, LsmIterator};
use crate::mem_table::{map_bound, MemTable};
use crate::table::{SsTable, SsTableBuilder, SsTableIterator};
pub type BlockCache = moka::sync::Cache<(usize, usize), Arc<Block>>;
#[derive(Clone)]
pub struct LsmStorageInner {
/// The current memtable.
memtable: Arc<MemTable>,
/// Immutable memtables, from earliest to latest.
imm_memtables: Vec<Arc<MemTable>>,
/// L0 SsTables, from earliest to latest.
l0_sstables: Vec<Arc<SsTable>>,
/// L1 - L6 SsTables, sorted by key range.
#[allow(dead_code)]
levels: Vec<Vec<Arc<SsTable>>>,
}
impl LsmStorageInner {
fn create() -> Self {
Self {
memtable: Arc::new(MemTable::create()),
imm_memtables: vec![],
l0_sstables: vec![],
levels: vec![],
}
}
}
/// The storage interface of the LSM tree.
pub struct LsmStorage {
pub(crate) inner: Arc<RwLock<Arc<LsmStorageInner>>>,
flush_lock: Mutex<()>,
path: PathBuf,
pub(crate) block_cache: Arc<BlockCache>,
next_sst_id: AtomicUsize,
}
impl LsmStorage {
pub(crate) fn next_sst_id(&self) -> usize {
self.next_sst_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
}
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
Ok(Self {
inner: Arc::new(RwLock::new(Arc::new(LsmStorageInner::create()))),
flush_lock: Mutex::new(()),
path: path.as_ref().to_path_buf(),
block_cache: Arc::new(BlockCache::new(1 << 20)), // 1M cached blocks, ~4GB with 4KB blocks
next_sst_id: AtomicUsize::new(1),
})
}
/// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter.
pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
let snapshot = {
let guard = self.inner.read();
Arc::clone(&guard)
}; // drop global lock here
// Search on the current memtable.
if let Some(value) = snapshot.memtable.get(key) {
if value.is_empty() {
// found a tombstone, the key does not exist
return Ok(None);
}
return Ok(Some(value));
}
// Search on immutable memtables.
for memtable in snapshot.imm_memtables.iter().rev() {
if let Some(value) = memtable.get(key) {
if value.is_empty() {
// found a tombstone, the key does not exist
return Ok(None);
}
return Ok(Some(value));
}
}
let mut iters = Vec::with_capacity(snapshot.l0_sstables.len());
for table in snapshot.l0_sstables.iter().rev() {
iters.push(Box::new(SsTableIterator::create_and_seek_to_key(
table.clone(),
key,
)?));
}
let iter = MergeIterator::create(iters);
if iter.is_valid() {
return Ok(Some(Bytes::copy_from_slice(iter.value())));
}
Ok(None)
}
/// Put a key-value pair into the storage by writing into the current memtable.
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
assert!(!value.is_empty(), "value cannot be empty");
assert!(!key.is_empty(), "key cannot be empty");
let guard = self.inner.read();
guard.memtable.put(key, value);
Ok(())
}
/// Remove a key from the storage by writing an empty value.
pub fn delete(&self, key: &[u8]) -> Result<()> {
assert!(!key.is_empty(), "key cannot be empty");
let guard = self.inner.read();
guard.memtable.put(key, b"");
Ok(())
}
pub(crate) fn path_of_sst(&self, id: usize) -> PathBuf {
self.path.join(format!("{:05}.sst", id))
}
/// Persist data to disk.
///
/// In day 3: flush the current memtable to disk as L0 SST.
/// In day 6: call `fsync` on WAL.
pub fn sync(&self) -> Result<()> {
let _flush_lock = self.flush_lock.lock();
let flush_memtable;
let sst_id;
// Move mutable memtable to immutable memtables.
{
let mut guard = self.inner.write();
// Swap the current memtable with a new one.
let mut snapshot = guard.as_ref().clone();
let memtable = std::mem::replace(&mut snapshot.memtable, Arc::new(MemTable::create()));
flush_memtable = memtable.clone();
sst_id = self.next_sst_id();
// Add the memtable to the immutable memtables.
snapshot.imm_memtables.push(memtable);
// Update the snapshot.
*guard = Arc::new(snapshot);
}
// At this point, the old memtable should be disabled for write, and all write threads
// should be operating on the new memtable. We can safely flush the old memtable to
// disk.
let mut builder = SsTableBuilder::new(4096);
flush_memtable.flush(&mut builder)?;
let sst = Arc::new(builder.build(
sst_id,
Some(self.block_cache.clone()),
self.path_of_sst(sst_id),
)?);
// Add the flushed L0 table to the list.
{
let mut guard = self.inner.write();
let mut snapshot = guard.as_ref().clone();
// Remove the memtable from the immutable memtables.
snapshot.imm_memtables.pop();
// Add L0 table
snapshot.l0_sstables.push(sst);
// Update the snapshot.
*guard = Arc::new(snapshot);
}
Ok(())
}
/// Create an iterator over a range of keys.
pub fn scan(
&self,
lower: Bound<&[u8]>,
upper: Bound<&[u8]>,
) -> Result<FusedIterator<LsmIterator>> {
let snapshot = {
let guard = self.inner.read();
Arc::clone(&guard)
}; // drop global lock here
let mut memtable_iters = Vec::with_capacity(snapshot.imm_memtables.len() + 1);
memtable_iters.push(Box::new(snapshot.memtable.scan(lower, upper)));
for memtable in snapshot.imm_memtables.iter().rev() {
memtable_iters.push(Box::new(memtable.scan(lower, upper)));
}
let memtable_iter = MergeIterator::create(memtable_iters);
let mut table_iters = Vec::with_capacity(snapshot.l0_sstables.len());
for table in snapshot.l0_sstables.iter().rev() {
let iter = match lower {
Bound::Included(key) => {
SsTableIterator::create_and_seek_to_key(table.clone(), key)?
}
Bound::Excluded(key) => {
let mut iter = SsTableIterator::create_and_seek_to_key(table.clone(), key)?;
if iter.is_valid() && iter.key() == key {
iter.next()?;
}
iter
}
Bound::Unbounded => SsTableIterator::create_and_seek_to_first(table.clone())?,
};
table_iters.push(Box::new(iter));
}
let table_iter = MergeIterator::create(table_iters);
let iter = TwoMergeIterator::create(memtable_iter, table_iter)?;
Ok(FusedIterator::new(LsmIterator::new(
iter,
map_bound(upper),
)?))
}
}
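The `get`, `sync`, and `scan` paths above all rely on the same copy-on-write snapshot pattern: readers clone the inner `Arc` and drop the lock immediately, while writers clone the whole `LsmStorageInner`, mutate the clone, and publish it by swapping the `Arc`. A standalone sketch of that pattern with a simplified inner type (an illustration, not part of the commit):

use std::sync::Arc;
use parking_lot::RwLock;

#[derive(Clone)]
struct Inner {
    l0_sstables: Vec<String>,
}

fn demo(state: &Arc<RwLock<Arc<Inner>>>) {
    // Reader path: clone the inner Arc under the read lock, then drop the lock.
    let snapshot = {
        let guard = state.read();
        Arc::clone(&guard)
    };
    let _num_l0 = snapshot.l0_sstables.len();

    // Writer path: clone the current state, mutate the clone, publish it.
    let mut guard = state.write();
    let mut next = guard.as_ref().clone();
    next.l0_sstables.push("00001.sst".to_string());
    *guard = Arc::new(next);
}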


@@ -0,0 +1,110 @@
use std::ops::Bound;
use std::sync::Arc;
use anyhow::Result;
use bytes::Bytes;
use crossbeam_skiplist::map::Entry;
use crossbeam_skiplist::SkipMap;
use ouroboros::self_referencing;
use crate::iterators::StorageIterator;
use crate::table::SsTableBuilder;
/// A basic mem-table based on crossbeam-skiplist
pub struct MemTable {
map: Arc<SkipMap<Bytes, Bytes>>,
}
pub(crate) fn map_bound(bound: Bound<&[u8]>) -> Bound<Bytes> {
match bound {
Bound::Included(x) => Bound::Included(Bytes::copy_from_slice(x)),
Bound::Excluded(x) => Bound::Excluded(Bytes::copy_from_slice(x)),
Bound::Unbounded => Bound::Unbounded,
}
}
impl MemTable {
/// Create a new mem-table.
pub fn create() -> Self {
Self {
map: Arc::new(SkipMap::new()),
}
}
/// Get a value by key.
pub fn get(&self, key: &[u8]) -> Option<Bytes> {
self.map.get(key).map(|e| e.value().clone())
}
/// Put a key-value pair into the mem-table.
pub fn put(&self, key: &[u8], value: &[u8]) {
self.map
.insert(Bytes::copy_from_slice(key), Bytes::copy_from_slice(value));
}
/// Get an iterator over a range of keys.
pub fn scan(&self, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> MemTableIterator {
let (lower, upper) = (map_bound(lower), map_bound(upper));
let mut iter = MemTableIteratorBuilder {
map: self.map.clone(),
iter_builder: |map| map.range((lower, upper)),
item: (Bytes::from_static(&[]), Bytes::from_static(&[])),
}
.build();
let entry = iter.with_iter_mut(|iter| MemTableIterator::entry_to_item(iter.next()));
iter.with_mut(|x| *x.item = entry);
iter
}
/// Flush the mem-table to SSTable.
pub fn flush(&self, builder: &mut SsTableBuilder) -> Result<()> {
for entry in self.map.iter() {
builder.add(&entry.key()[..], &entry.value()[..]);
}
Ok(())
}
}
type SkipMapRangeIter<'a> =
crossbeam_skiplist::map::Range<'a, Bytes, (Bound<Bytes>, Bound<Bytes>), Bytes, Bytes>;
/// An iterator over a range of `SkipMap`.
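///
/// The iterator borrows from the `SkipMap` that it also owns, which is why it is generated
/// with `ouroboros::self_referencing`; the current entry is copied into `item` so that
/// `key()` and `value()` can return plain slices without exposing the borrowed `Entry`.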
#[self_referencing]
pub struct MemTableIterator {
map: Arc<SkipMap<Bytes, Bytes>>,
#[borrows(map)]
#[not_covariant]
iter: SkipMapRangeIter<'this>,
item: (Bytes, Bytes),
}
impl MemTableIterator {
fn entry_to_item(entry: Option<Entry<'_, Bytes, Bytes>>) -> (Bytes, Bytes) {
entry
.map(|x| (x.key().clone(), x.value().clone()))
.unwrap_or_else(|| (Bytes::from_static(&[]), Bytes::from_static(&[])))
}
}
impl StorageIterator for MemTableIterator {
fn value(&self) -> &[u8] {
&self.borrow_item().1[..]
}
fn key(&self) -> &[u8] {
&self.borrow_item().0[..]
}
fn is_valid(&self) -> bool {
!self.borrow_item().0.is_empty()
}
fn next(&mut self) -> Result<()> {
let entry = self.with_iter_mut(|iter| MemTableIterator::entry_to_item(iter.next()));
self.with_mut(|x| *x.item = entry);
Ok(())
}
}
#[cfg(test)]
mod tests;


@@ -0,0 +1,95 @@
use tempfile::tempdir;
use super::MemTable;
use crate::iterators::StorageIterator;
use crate::table::{SsTableBuilder, SsTableIterator};
#[test]
fn test_memtable_get() {
let memtable = MemTable::create();
memtable.put(b"key1", b"value1");
memtable.put(b"key2", b"value2");
memtable.put(b"key3", b"value3");
assert_eq!(&memtable.get(b"key1").unwrap()[..], b"value1");
assert_eq!(&memtable.get(b"key2").unwrap()[..], b"value2");
assert_eq!(&memtable.get(b"key3").unwrap()[..], b"value3");
}
#[test]
fn test_memtable_overwrite() {
let memtable = MemTable::create();
memtable.put(b"key1", b"value1");
memtable.put(b"key2", b"value2");
memtable.put(b"key3", b"value3");
memtable.put(b"key1", b"value11");
memtable.put(b"key2", b"value22");
memtable.put(b"key3", b"value33");
assert_eq!(&memtable.get(b"key1").unwrap()[..], b"value11");
assert_eq!(&memtable.get(b"key2").unwrap()[..], b"value22");
assert_eq!(&memtable.get(b"key3").unwrap()[..], b"value33");
}
#[test]
fn test_memtable_flush() {
let memtable = MemTable::create();
memtable.put(b"key1", b"value1");
memtable.put(b"key2", b"value2");
memtable.put(b"key3", b"value3");
let mut builder = SsTableBuilder::new(128);
memtable.flush(&mut builder).unwrap();
let dir = tempdir().unwrap();
let sst = builder.build_for_test(dir.path().join("1.sst")).unwrap();
let mut iter = SsTableIterator::create_and_seek_to_first(sst.into()).unwrap();
assert_eq!(iter.key(), b"key1");
assert_eq!(iter.value(), b"value1");
iter.next().unwrap();
assert_eq!(iter.key(), b"key2");
assert_eq!(iter.value(), b"value2");
iter.next().unwrap();
assert_eq!(iter.key(), b"key3");
assert_eq!(iter.value(), b"value3");
iter.next().unwrap();
assert!(!iter.is_valid());
}
#[test]
fn test_memtable_iter() {
use std::ops::Bound;
let memtable = MemTable::create();
memtable.put(b"key1", b"value1");
memtable.put(b"key2", b"value2");
memtable.put(b"key3", b"value3");
{
let mut iter = memtable.scan(Bound::Unbounded, Bound::Unbounded);
assert_eq!(iter.key(), b"key1");
assert_eq!(iter.value(), b"value1");
iter.next().unwrap();
assert_eq!(iter.key(), b"key2");
assert_eq!(iter.value(), b"value2");
iter.next().unwrap();
assert_eq!(iter.key(), b"key3");
assert_eq!(iter.value(), b"value3");
iter.next().unwrap();
assert!(!iter.is_valid());
}
{
let mut iter = memtable.scan(Bound::Included(b"key1"), Bound::Included(b"key2"));
assert_eq!(iter.key(), b"key1");
assert_eq!(iter.value(), b"value1");
iter.next().unwrap();
assert_eq!(iter.key(), b"key2");
assert_eq!(iter.value(), b"value2");
iter.next().unwrap();
assert!(!iter.is_valid());
}
{
let mut iter = memtable.scan(Bound::Excluded(b"key1"), Bound::Excluded(b"key3"));
assert_eq!(iter.key(), b"key2");
assert_eq!(iter.value(), b"value2");
iter.next().unwrap();
assert!(!iter.is_valid());
}
}


@@ -0,0 +1,180 @@
mod builder;
mod iterator;
use std::fs::File;
use std::path::Path;
use std::sync::Arc;
use anyhow::{anyhow, Result};
pub use builder::SsTableBuilder;
use bytes::{Buf, BufMut, Bytes};
pub use iterator::SsTableIterator;
use crate::block::Block;
use crate::lsm_storage::BlockCache;
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BlockMeta {
/// Offset of this data block.
pub offset: usize,
/// The first key of the data block.
pub first_key: Bytes,
}
impl BlockMeta {
/// Encode block meta to a buffer.
pub fn encode_block_meta(block_meta: &[BlockMeta], buf: &mut Vec<u8>) {
let mut estimated_size = 0;
for meta in block_meta {
// The size of offset
estimated_size += std::mem::size_of::<u32>();
// The size of key length
estimated_size += std::mem::size_of::<u16>();
// The size of actual key
estimated_size += meta.first_key.len();
}
// Reserve the space to improve performance, especially when the size of incoming data is large
buf.reserve(estimated_size);
let original_len = buf.len();
for meta in block_meta {
buf.put_u32(meta.offset as u32);
buf.put_u16(meta.first_key.len() as u16);
buf.put_slice(&meta.first_key);
}
assert_eq!(estimated_size, buf.len() - original_len);
}
/// Decode block meta from a buffer.
pub fn decode_block_meta(mut buf: impl Buf) -> Vec<BlockMeta> {
let mut block_meta = Vec::new();
while buf.has_remaining() {
let offset = buf.get_u32() as usize;
let first_key_len = buf.get_u16() as usize;
let first_key = buf.copy_to_bytes(first_key_len);
block_meta.push(BlockMeta { offset, first_key });
}
block_meta
}
}
/// A file object.
///
/// Before day 4, it should look like:
///
/// ```ignore
/// pub struct FileObject(Bytes);
///
/// impl FileObject {
/// pub fn read(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
/// Ok(self.0[offset as usize..(offset + len) as usize].to_vec())
/// }
/// pub fn size(&self) -> u64 {
/// self.0.len() as u64
/// }
///
/// pub fn create(_path: &Path, data: Vec<u8>) -> Result<Self> {
/// Ok(FileObject(data.into()))
/// }
///
/// pub fn open(_path: &Path) -> Result<Self> {
/// unimplemented!()
/// }
/// }
/// ```
pub struct FileObject(File, u64);
impl FileObject {
pub fn read(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
use std::os::unix::fs::FileExt;
let mut data = vec![0; len as usize];
self.0.read_exact_at(&mut data[..], offset)?;
Ok(data)
}
pub fn size(&self) -> u64 {
self.1
}
/// Create a new file object (day 2) and write the file to the disk (day 4).
pub fn create(path: &Path, data: Vec<u8>) -> Result<Self> {
std::fs::write(path, &data)?;
Ok(FileObject(
File::options().read(true).write(false).open(path)?,
data.len() as u64,
))
}
pub fn open(_path: &Path) -> Result<Self> {
unimplemented!()
}
}
pub struct SsTable {
file: FileObject,
block_metas: Vec<BlockMeta>,
block_meta_offset: usize,
id: usize,
block_cache: Option<Arc<BlockCache>>,
}
impl SsTable {
#[cfg(test)]
pub(crate) fn open_for_test(file: FileObject) -> Result<Self> {
Self::open(0, None, file)
}
/// Open SSTable from a file.
pub fn open(id: usize, block_cache: Option<Arc<BlockCache>>, file: FileObject) -> Result<Self> {
let len = file.size();
let raw_meta_offset = file.read(len - 4, 4)?;
let block_meta_offset = (&raw_meta_offset[..]).get_u32() as u64;
let raw_meta = file.read(block_meta_offset, len - 4 - block_meta_offset)?;
Ok(Self {
file,
block_metas: BlockMeta::decode_block_meta(&raw_meta[..]),
block_meta_offset: block_meta_offset as usize,
id,
block_cache,
})
}
/// Read a block from the disk.
pub fn read_block(&self, block_idx: usize) -> Result<Arc<Block>> {
let offset = self.block_metas[block_idx].offset;
let offset_end = self
.block_metas
.get(block_idx + 1)
.map_or(self.block_meta_offset, |x| x.offset);
let block_data = self
.file
.read(offset as u64, (offset_end - offset) as u64)?;
Ok(Arc::new(Block::decode(&block_data[..])))
}
/// Read a block from disk, with block cache.
pub fn read_block_cached(&self, block_idx: usize) -> Result<Arc<Block>> {
if let Some(ref block_cache) = self.block_cache {
let blk = block_cache
.try_get_with((self.id, block_idx), || self.read_block(block_idx))
.map_err(|e| anyhow!("{}", e))?;
Ok(blk)
} else {
self.read_block(block_idx)
}
}
/// Find the block that may contain `key`.
pub fn find_block_idx(&self, key: &[u8]) -> usize {
self.block_metas
.partition_point(|meta| meta.first_key <= key)
.saturating_sub(1)
}
/// Get number of data blocks.
pub fn num_of_blocks(&self) -> usize {
self.block_metas.len()
}
}
#[cfg(test)]
mod tests;
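A small sketch (not part of the commit) of the file layout that `SsTableBuilder::build` produces and `SsTable::open` expects: the data blocks, then the encoded `BlockMeta` entries, then a trailing `u32` holding the metadata offset. The example round-trips the metadata section the same way `open` does, with a dummy 128-byte data section standing in for the data blocks.

use bytes::{Buf, BufMut, Bytes};

fn meta_roundtrip_example() {
    let metas = vec![
        BlockMeta { offset: 0, first_key: Bytes::from_static(b"key_000") },
        BlockMeta { offset: 64, first_key: Bytes::from_static(b"key_040") },
    ];

    // Pretend the first 128 bytes are the encoded data blocks.
    let mut buf = vec![0u8; 128];
    let meta_offset = buf.len();
    BlockMeta::encode_block_meta(&metas, &mut buf);
    buf.put_u32(meta_offset as u32);

    // `SsTable::open` reads the trailing u32 to find the metadata, then decodes it.
    let recovered = (&buf[buf.len() - 4..]).get_u32() as usize;
    assert_eq!(recovered, meta_offset);
    let decoded = BlockMeta::decode_block_meta(&buf[recovered..buf.len() - 4]);
    assert_eq!(decoded, metas);
}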


@@ -0,0 +1,91 @@
use std::path::Path;
use std::sync::Arc;
use anyhow::Result;
use bytes::BufMut;
use super::{BlockMeta, FileObject, SsTable};
use crate::block::BlockBuilder;
use crate::lsm_storage::BlockCache;
/// Builds an SSTable from key-value pairs.
pub struct SsTableBuilder {
builder: BlockBuilder,
first_key: Vec<u8>,
data: Vec<u8>,
pub(super) meta: Vec<BlockMeta>,
block_size: usize,
}
impl SsTableBuilder {
/// Create a builder based on target block size.
pub fn new(block_size: usize) -> Self {
Self {
data: Vec::new(),
meta: Vec::new(),
first_key: Vec::new(),
block_size,
builder: BlockBuilder::new(block_size),
}
}
/// Adds a key-value pair to the SSTable.
pub fn add(&mut self, key: &[u8], value: &[u8]) {
if self.first_key.is_empty() {
self.first_key = key.to_vec();
}
if self.builder.add(key, value) {
return;
}
// create a new block builder and append block data
self.finish_block();
// add the key-value pair to the next block
assert!(self.builder.add(key, value));
self.first_key = key.to_vec();
}
/// Get the estimated size of the SSTable.
pub fn estimated_size(&self) -> usize {
self.data.len()
}
fn finish_block(&mut self) {
let builder = std::mem::replace(&mut self.builder, BlockBuilder::new(self.block_size));
let encoded_block = builder.build().encode();
self.meta.push(BlockMeta {
offset: self.data.len(),
first_key: std::mem::take(&mut self.first_key).into(),
});
self.data.extend(encoded_block);
}
/// Builds the SSTable and writes it to the given path. There is no need to actually write to
/// disk until Day 4 (block cache).
pub fn build(
mut self,
id: usize,
block_cache: Option<Arc<BlockCache>>,
path: impl AsRef<Path>,
) -> Result<SsTable> {
self.finish_block();
let mut buf = self.data;
let meta_offset = buf.len();
BlockMeta::encode_block_meta(&self.meta, &mut buf);
buf.put_u32(meta_offset as u32);
let file = FileObject::create(path.as_ref(), buf)?;
Ok(SsTable {
id,
file,
block_metas: self.meta,
block_meta_offset: meta_offset,
block_cache,
})
}
#[cfg(test)]
pub(crate) fn build_for_test(self, path: impl AsRef<Path>) -> Result<SsTable> {
self.build(0, None, path)
}
}


@@ -0,0 +1,102 @@
use std::sync::Arc;
use anyhow::Result;
use super::SsTable;
use crate::block::BlockIterator;
use crate::iterators::StorageIterator;
/// An iterator over the contents of an SSTable.
pub struct SsTableIterator {
table: Arc<SsTable>,
blk_iter: BlockIterator,
blk_idx: usize,
}
impl SsTableIterator {
fn seek_to_first_inner(table: &Arc<SsTable>) -> Result<(usize, BlockIterator)> {
Ok((
0,
BlockIterator::create_and_seek_to_first(table.read_block_cached(0)?),
))
}
/// Create a new iterator and seek to the first key-value pair.
pub fn create_and_seek_to_first(table: Arc<SsTable>) -> Result<Self> {
let (blk_idx, blk_iter) = Self::seek_to_first_inner(&table)?;
let iter = Self {
blk_iter,
table,
blk_idx,
};
Ok(iter)
}
/// Seek to the first key-value pair.
pub fn seek_to_first(&mut self) -> Result<()> {
let (blk_idx, blk_iter) = Self::seek_to_first_inner(&self.table)?;
self.blk_idx = blk_idx;
self.blk_iter = blk_iter;
Ok(())
}
fn seek_to_key_inner(table: &Arc<SsTable>, key: &[u8]) -> Result<(usize, BlockIterator)> {
let mut blk_idx = table.find_block_idx(key);
let mut blk_iter =
BlockIterator::create_and_seek_to_key(table.read_block_cached(blk_idx)?, key);
if !blk_iter.is_valid() {
blk_idx += 1;
if blk_idx < table.num_of_blocks() {
blk_iter =
BlockIterator::create_and_seek_to_first(table.read_block_cached(blk_idx)?);
}
}
Ok((blk_idx, blk_iter))
}
/// Create a new iterator and seek to the first key-value pair whose key is >= `key`.
pub fn create_and_seek_to_key(table: Arc<SsTable>, key: &[u8]) -> Result<Self> {
let (blk_idx, blk_iter) = Self::seek_to_key_inner(&table, key)?;
let iter = Self {
blk_iter,
table,
blk_idx,
};
Ok(iter)
}
/// Seek to the first key-value pair whose key is >= `key`.
pub fn seek_to_key(&mut self, key: &[u8]) -> Result<()> {
let (blk_idx, blk_iter) = Self::seek_to_key_inner(&self.table, key)?;
self.blk_iter = blk_iter;
self.blk_idx = blk_idx;
Ok(())
}
}
impl StorageIterator for SsTableIterator {
fn value(&self) -> &[u8] {
self.blk_iter.value()
}
fn key(&self) -> &[u8] {
self.blk_iter.key()
}
fn is_valid(&self) -> bool {
self.blk_iter.is_valid()
}
fn next(&mut self) -> Result<()> {
self.blk_iter.next();
if !self.blk_iter.is_valid() {
self.blk_idx += 1;
if self.blk_idx < self.table.num_of_blocks() {
self.blk_iter = BlockIterator::create_and_seek_to_first(
self.table.read_block_cached(self.blk_idx)?,
);
}
}
Ok(())
}
}
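A hypothetical sketch (not part of the commit) of the block-boundary case handled by `seek_to_key_inner`: `find_block_idx` can return a block whose keys are all smaller than the target, in which case the iterator falls through to the first key of the next block. With a 16-byte block size each pair below ends up in its own block.

fn seek_crosses_block_boundary() -> Result<()> {
    use crate::table::SsTableBuilder;
    use tempfile::tempdir; // dev-dependency, used here purely for illustration

    let mut builder = SsTableBuilder::new(16); // tiny blocks: one pair per block
    builder.add(b"aa", b"11");
    builder.add(b"cc", b"22");
    let dir = tempdir()?;
    let sst = Arc::new(builder.build(0, None, dir.path().join("1.sst"))?);

    // "bb" is larger than every key in the first block, so the iterator
    // lands on the first key of the second block.
    let iter = SsTableIterator::create_and_seek_to_key(sst, b"bb")?;
    assert!(iter.is_valid());
    assert_eq!(iter.key(), b"cc");
    Ok(())
}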


@@ -0,0 +1,130 @@
use std::sync::Arc;
use bytes::Bytes;
use tempfile::{tempdir, TempDir};
use super::*;
use crate::iterators::StorageIterator;
use crate::table::SsTableBuilder;
#[test]
fn test_sst_build_single_key() {
let mut builder = SsTableBuilder::new(16);
builder.add(b"233", b"233333");
let dir = tempdir().unwrap();
builder.build_for_test(dir.path().join("1.sst")).unwrap();
}
#[test]
fn test_sst_build_two_blocks() {
let mut builder = SsTableBuilder::new(16);
builder.add(b"11", b"11");
builder.add(b"22", b"22");
builder.add(b"33", b"11");
builder.add(b"44", b"22");
builder.add(b"55", b"11");
builder.add(b"66", b"22");
assert!(builder.meta.len() >= 2);
let dir = tempdir().unwrap();
builder.build_for_test(dir.path().join("1.sst")).unwrap();
}
fn key_of(idx: usize) -> Vec<u8> {
format!("key_{:03}", idx * 5).into_bytes()
}
fn value_of(idx: usize) -> Vec<u8> {
format!("value_{:010}", idx).into_bytes()
}
fn num_of_keys() -> usize {
100
}
fn generate_sst() -> (TempDir, SsTable) {
let mut builder = SsTableBuilder::new(128);
for idx in 0..num_of_keys() {
let key = key_of(idx);
let value = value_of(idx);
builder.add(&key[..], &value[..]);
}
let dir = tempdir().unwrap();
let path = dir.path().join("1.sst");
(dir, builder.build_for_test(path).unwrap())
}
#[test]
fn test_sst_build_all() {
generate_sst();
}
#[test]
fn test_sst_decode() {
let (_dir, sst) = generate_sst();
let meta = sst.block_metas.clone();
let new_sst = SsTable::open_for_test(sst.file).unwrap();
assert_eq!(new_sst.block_metas, meta);
}
fn as_bytes(x: &[u8]) -> Bytes {
Bytes::copy_from_slice(x)
}
#[test]
fn test_sst_iterator() {
let (_dir, sst) = generate_sst();
let sst = Arc::new(sst);
let mut iter = SsTableIterator::create_and_seek_to_first(sst).unwrap();
for _ in 0..5 {
for i in 0..num_of_keys() {
let key = iter.key();
let value = iter.value();
assert_eq!(
key,
key_of(i),
"expected key: {:?}, actual key: {:?}",
as_bytes(&key_of(i)),
as_bytes(key)
);
assert_eq!(
value,
value_of(i),
"expected value: {:?}, actual value: {:?}",
as_bytes(&value_of(i)),
as_bytes(value)
);
iter.next().unwrap();
}
iter.seek_to_first().unwrap();
}
}
#[test]
fn test_sst_seek_key() {
let (_dir, sst) = generate_sst();
let sst = Arc::new(sst);
let mut iter = SsTableIterator::create_and_seek_to_key(sst, &key_of(0)).unwrap();
for offset in 1..=5 {
for i in 0..num_of_keys() {
let key = iter.key();
let value = iter.value();
assert_eq!(
key,
key_of(i),
"expected key: {:?}, actual key: {:?}",
as_bytes(&key_of(i)),
as_bytes(key)
);
assert_eq!(
value,
value_of(i),
"expected value: {:?}, actual value: {:?}",
as_bytes(&value_of(i)),
as_bytes(value)
);
iter.seek_to_key(&format!("key_{:03}", i * 5 + offset).into_bytes())
.unwrap();
}
iter.seek_to_key(b"k").unwrap();
}
}


@@ -0,0 +1 @@
pub mod day4_tests;


@@ -0,0 +1,187 @@
use std::ops::Bound;
use bytes::Bytes;
use tempfile::tempdir;
use crate::iterators::StorageIterator;
fn as_bytes(x: &[u8]) -> Bytes {
Bytes::copy_from_slice(x)
}
fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>) {
let mut iter = iter;
for (k, v) in expected {
assert!(iter.is_valid());
assert_eq!(
k,
iter.key(),
"expected key: {:?}, actual key: {:?}",
k,
as_bytes(iter.key()),
);
assert_eq!(
v,
iter.value(),
"expected value: {:?}, actual value: {:?}",
v,
as_bytes(iter.value()),
);
iter.next().unwrap();
}
assert!(!iter.is_valid());
}
#[test]
fn test_storage_get() {
use crate::lsm_storage::LsmStorage;
let dir = tempdir().unwrap();
let storage = LsmStorage::open(&dir).unwrap();
storage.put(b"1", b"233").unwrap();
storage.put(b"2", b"2333").unwrap();
storage.put(b"3", b"23333").unwrap();
assert_eq!(&storage.get(b"1").unwrap().unwrap()[..], b"233");
assert_eq!(&storage.get(b"2").unwrap().unwrap()[..], b"2333");
assert_eq!(&storage.get(b"3").unwrap().unwrap()[..], b"23333");
storage.delete(b"2").unwrap();
assert!(storage.get(b"2").unwrap().is_none());
}
#[test]
fn test_storage_scan_memtable_1() {
use crate::lsm_storage::LsmStorage;
let dir = tempdir().unwrap();
let storage = LsmStorage::open(&dir).unwrap();
storage.put(b"1", b"233").unwrap();
storage.put(b"2", b"2333").unwrap();
storage.put(b"3", b"23333").unwrap();
storage.delete(b"2").unwrap();
check_iter_result(
storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
vec![
(Bytes::from("1"), Bytes::from("233")),
(Bytes::from("3"), Bytes::from("23333")),
],
);
check_iter_result(
storage
.scan(Bound::Included(b"1"), Bound::Included(b"2"))
.unwrap(),
vec![(Bytes::from("1"), Bytes::from("233"))],
);
check_iter_result(
storage
.scan(Bound::Excluded(b"1"), Bound::Excluded(b"3"))
.unwrap(),
vec![],
);
}
#[test]
fn test_storage_scan_memtable_2() {
use crate::lsm_storage::LsmStorage;
let dir = tempdir().unwrap();
let storage = LsmStorage::open(&dir).unwrap();
storage.put(b"1", b"233").unwrap();
storage.put(b"2", b"2333").unwrap();
storage.put(b"3", b"23333").unwrap();
storage.delete(b"1").unwrap();
check_iter_result(
storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
vec![
(Bytes::from("2"), Bytes::from("2333")),
(Bytes::from("3"), Bytes::from("23333")),
],
);
check_iter_result(
storage
.scan(Bound::Included(b"1"), Bound::Included(b"2"))
.unwrap(),
vec![(Bytes::from("2"), Bytes::from("2333"))],
);
check_iter_result(
storage
.scan(Bound::Excluded(b"1"), Bound::Excluded(b"3"))
.unwrap(),
vec![(Bytes::from("2"), Bytes::from("2333"))],
);
}
#[test]
fn test_storage_get_after_sync() {
use crate::lsm_storage::LsmStorage;
let dir = tempdir().unwrap();
let storage = LsmStorage::open(&dir).unwrap();
storage.put(b"1", b"233").unwrap();
storage.put(b"2", b"2333").unwrap();
storage.sync().unwrap();
storage.put(b"3", b"23333").unwrap();
assert_eq!(&storage.get(b"1").unwrap().unwrap()[..], b"233");
assert_eq!(&storage.get(b"2").unwrap().unwrap()[..], b"2333");
assert_eq!(&storage.get(b"3").unwrap().unwrap()[..], b"23333");
storage.delete(b"2").unwrap();
assert!(storage.get(b"2").unwrap().is_none());
}
#[test]
fn test_storage_scan_memtable_1_after_sync() {
use crate::lsm_storage::LsmStorage;
let dir = tempdir().unwrap();
let storage = LsmStorage::open(&dir).unwrap();
storage.put(b"1", b"233").unwrap();
storage.put(b"2", b"2333").unwrap();
storage.sync().unwrap();
storage.put(b"3", b"23333").unwrap();
storage.delete(b"2").unwrap();
check_iter_result(
storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
vec![
(Bytes::from("1"), Bytes::from("233")),
(Bytes::from("3"), Bytes::from("23333")),
],
);
check_iter_result(
storage
.scan(Bound::Included(b"1"), Bound::Included(b"2"))
.unwrap(),
vec![(Bytes::from("1"), Bytes::from("233"))],
);
check_iter_result(
storage
.scan(Bound::Excluded(b"1"), Bound::Excluded(b"3"))
.unwrap(),
vec![],
);
}
#[test]
fn test_storage_scan_memtable_2_after_sync() {
use crate::lsm_storage::LsmStorage;
let dir = tempdir().unwrap();
let storage = LsmStorage::open(&dir).unwrap();
storage.put(b"1", b"233").unwrap();
storage.put(b"2", b"2333").unwrap();
storage.sync().unwrap();
storage.put(b"3", b"23333").unwrap();
storage.sync().unwrap();
storage.delete(b"1").unwrap();
check_iter_result(
storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
vec![
(Bytes::from("2"), Bytes::from("2333")),
(Bytes::from("3"), Bytes::from("23333")),
],
);
check_iter_result(
storage
.scan(Bound::Included(b"1"), Bound::Included(b"2"))
.unwrap(),
vec![(Bytes::from("2"), Bytes::from("2333"))],
);
check_iter_result(
storage
.scan(Bound::Excluded(b"1"), Bound::Excluded(b"3"))
.unwrap(),
vec![(Bytes::from("2"), Bytes::from("2333"))],
);
}

mini-lsm/README.md

@@ -0,0 +1 @@
# mini-lsm week-2 solution