finish week 1 day 5 read path

Signed-off-by: Alex Chi Z <iskyzh@gmail.com>
This commit is contained in:
Alex Chi Z
2024-01-21 15:26:22 +08:00
parent 99da8855b8
commit 8be0a2d475
11 changed files with 307 additions and 53 deletions

View File

@@ -44,7 +44,7 @@ We are working on a new version of the mini-lsm tutorial that is split into 3 we
| 1.2 | Merge Iterators | ✅ | ✅ | ✅ |
| 1.3 | Block Format | ✅ | ✅ | ✅ |
| 1.4 | Table Format | ✅ | ✅ | ✅ |
| 1.5 | Storage Engine - Read Path | ✅ | 🚧 | 🚧 |
| 1.5 | Storage Engine - Read Path | ✅ | | |
| 1.6 | Storage Engine - Write Path | ✅ | 🚧 | 🚧 |
| 1.7 | Bloom Filter and Key Compression | | | |
| 2.1 | Compaction Implementation | ✅ | 🚧 | 🚧 |

View File

@@ -18,9 +18,62 @@ cargo x scheck
## Task 1: Two Merge Iterator
## Task 2: Read Path - Get
## Task 3: Read Path - Scan
In this task, you will need to modify:
```
src/iterators/two_merge_iterator.rs
```
You have already implemented a merge iterator that merges iterators of the same type (i.e., memtable iterators). Now that we have implemented the SST formats, we have both on-disk SST structures and in-memory memtables. When we scan from the storage engine, we will need to merge data from both memtable iterators and SST iterators into a single one. In this case, we need a `TwoMergeIterator<X, Y>` that merges two different types of iterators.
You can implement `TwoMergeIterator` in `two_merge_iter.rs`. As we only have two iterators here, we do not need to maintain a binary heap. Instead, we can simply use a flag to indicate which iterator to read. Similar to `MergeIterator`, if the same key is found in both of the iterator, the first iterator takes the precedence.
## Task 2: Read Path - Scan
In this task, you will need to modify:
```
src/lsm_iterator.rs
src/lsm_storage.rs
```
After implementing `TwoMergeIterator`, we can change the `LsmIteratorInner` to have the following type:
```rust,no_run
type LsmIteratorInner =
TwoMergeIterator<MergeIterator<MemTableIterator>, MergeIterator<SsTableIterator>>;
```
So that our internal iterator of the LSM storage engine will be an iterator combining both data from the memtables and the SSTs.
Note that our SST iterator does not support passing a end bound to it. Therefore, we will need to handle the `end_bound` manually in `LsmIterator`. You will need to modify your `LsmIterator` logic to stop when the key from the inner iterator reaches the end boundary.
Our test cases will generate some memtables and SSTs in `l0_sstables`, and you will need to scan all of these data out correctly in this task. You do not need to flush SSTs until next chapter. Therefore, you can go ahead and modify your `LsmStorageInner::scan` interface to create a merge iterator over all memtables and SSTs, so as to finish the read path of your storage engine.
Because `SsTableIterator::create` involves I/O operations and might be slow, we do not want to do this in the `state` critical region. Therefore, you should firstly take read the `state` and clone the `Arc` of the LSM state snapshot. Then, you should drop the lock. After that, you can go through all L0 SSTs and create iterators for each of them, then create a merge iterator to retrieve the data.
```rust,no_run
fn scan(&self) {
let snapshot = {
let guard = self.state.read();
Arc::clone(&guard)
}
// create iterators and seek them
}
```
In the LSM storage state, we only store the SST ids in the `l0_sstables` vector. You will need to retrieve the actual SST object from the `sstables` hash map.
## Task 3: Read Path - Get
In this task, you will need to modify:
```
src/lsm_storage.rs
```
For get requests, it will be processed as lookups in the memtables, and then scans on the SSTs. You can create a merge iterator over all SSTs after probing all memtables. You can seek to the key that the user wants to lookup. There are two possibilities of the seek: the key is the same as what the user probes, and the key is not the same / does not exist. You should only return the value to the user when the key exists and is the same as probed. You should also reduce the critical region of the state lock as in the previous section.
## Test Your Understanding

View File

@@ -21,4 +21,10 @@ cargo x scheck
## Task 3: Filter the SSTs
## Test Your Understanding
* What happens if a user requests to delete a key twice?
We do not provide reference answers to the questions, and feel free to discuss about them in the Discord community.
{{#include copyright.md}}

View File

@@ -14,6 +14,3 @@ pub trait StorageIterator {
/// Move to the next position.
fn next(&mut self) -> anyhow::Result<()>;
}
#[cfg(test)]
mod tests;

View File

@@ -1,4 +0,0 @@
//! Please copy `mini-lsm/src/iterators/tests.rs` here so that you can run tests.
pub mod merge_iterator_test;
pub mod two_merge_iterator_test;

View File

@@ -1,2 +0,0 @@
//! Please copy `mini-lsm/src/iterators/tests/merge_iterator_test.rs` here so that you can run
//! tests.

View File

@@ -1,2 +0,0 @@
//! Please copy `mini-lsm/src/iterators/tests/two_merge_iterator_test.rs` here so that you can run
//! tests.

View File

@@ -23,7 +23,7 @@ impl<A: StorageIterator, B: StorageIterator> TwoMergeIterator<A, B> {
fn skip_b(&mut self) -> Result<()> {
if self.a.is_valid() {
while self.b.is_valid() && self.b.key() == self.a.key() {
if self.b.is_valid() && self.b.key() == self.a.key() {
self.b.next()?;
}
}

View File

@@ -1,7 +1,13 @@
use std::{path::Path, sync::Arc};
use anyhow::{bail, Result};
use bytes::Bytes;
use crate::iterators::StorageIterator;
use crate::{
iterators::StorageIterator,
lsm_storage::BlockCache,
table::{SsTable, SsTableBuilder},
};
#[derive(Clone)]
pub struct MockIterator {
@@ -68,3 +74,52 @@ impl StorageIterator for MockIterator {
self.index < self.data.len()
}
}
pub fn as_bytes(x: &[u8]) -> Bytes {
Bytes::copy_from_slice(x)
}
pub fn check_iter_result(iter: &mut impl StorageIterator, expected: Vec<(Bytes, Bytes)>) {
for (k, v) in expected {
assert!(iter.is_valid());
assert_eq!(
k,
iter.key(),
"expected key: {:?}, actual key: {:?}",
k,
as_bytes(iter.key()),
);
assert_eq!(
v,
iter.value(),
"expected value: {:?}, actual value: {:?}",
v,
as_bytes(iter.value()),
);
iter.next().unwrap();
}
assert!(!iter.is_valid());
}
pub fn expect_iter_error(mut iter: impl StorageIterator) {
loop {
match iter.next() {
Ok(_) if iter.is_valid() => continue,
Ok(_) => panic!("expect an error"),
Err(_) => break,
}
}
}
pub fn generate_sst(
id: usize,
path: impl AsRef<Path>,
data: Vec<(Bytes, Bytes)>,
block_cache: Option<Arc<BlockCache>>,
) -> SsTable {
let mut builder = SsTableBuilder::new(128);
for (key, value) in data {
builder.add(&key[..], &value[..]);
}
builder.build(id, block_cache, path.as_ref()).unwrap()
}

View File

@@ -10,7 +10,7 @@ use crate::{
mem_table::MemTable,
};
use super::harness::MockIterator;
use super::harness::{check_iter_result, expect_iter_error, MockIterator};
#[test]
fn test_task1_memtable_iter() {
@@ -78,42 +78,6 @@ fn test_task1_empty_memtable_iter() {
}
}
fn as_bytes(x: &[u8]) -> Bytes {
Bytes::copy_from_slice(x)
}
fn check_iter_result(iter: &mut impl StorageIterator, expected: Vec<(Bytes, Bytes)>) {
for (k, v) in expected {
assert!(iter.is_valid());
assert_eq!(
k,
iter.key(),
"expected key: {:?}, actual key: {:?}",
k,
as_bytes(iter.key()),
);
assert_eq!(
v,
iter.value(),
"expected value: {:?}, actual value: {:?}",
v,
as_bytes(iter.value()),
);
iter.next().unwrap();
}
assert!(!iter.is_valid());
}
fn expect_iter_error(mut iter: impl StorageIterator) {
loop {
match iter.next() {
Ok(_) if iter.is_valid() => continue,
Ok(_) => panic!("expect an error"),
Err(_) => break,
}
}
}
#[test]
fn test_task2_merge_1() {
let i1 = MockIterator::new(vec![

View File

@@ -0,0 +1,187 @@
use std::ops::Bound;
use bytes::Bytes;
use tempfile::tempdir;
use week1_day5::harness::generate_sst;
use self::harness::{check_iter_result, MockIterator};
use super::*;
use crate::{iterators::two_merge_iterator::TwoMergeIterator, lsm_storage::LsmStorageOptions};
#[test]
fn test_task1_merge_1() {
let i1 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
]);
let i2 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.2")),
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
]);
let mut iter = TwoMergeIterator::create(i1, i2).unwrap();
check_iter_result(
&mut iter,
vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
(Bytes::from("d"), Bytes::from("4.2")),
],
)
}
#[test]
fn test_task1_merge_2() {
let i2 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
]);
let i1 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.2")),
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
]);
let mut iter = TwoMergeIterator::create(i1, i2).unwrap();
check_iter_result(
&mut iter,
vec![
(Bytes::from("a"), Bytes::from("1.2")),
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
],
)
}
#[test]
fn test_task1_merge_3() {
let i2 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
]);
let i1 = MockIterator::new(vec![
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
]);
let mut iter = TwoMergeIterator::create(i1, i2).unwrap();
check_iter_result(
&mut iter,
vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
],
)
}
#[test]
fn test_task1_merge_4() {
let i2 = MockIterator::new(vec![]);
let i1 = MockIterator::new(vec![
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
]);
let mut iter = TwoMergeIterator::create(i1, i2).unwrap();
check_iter_result(
&mut iter,
vec![
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
],
);
let i1 = MockIterator::new(vec![]);
let i2 = MockIterator::new(vec![
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
]);
let mut iter = TwoMergeIterator::create(i1, i2).unwrap();
check_iter_result(
&mut iter,
vec![
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
],
);
}
#[test]
fn test_task1_merge_5() {
let i2 = MockIterator::new(vec![]);
let i1 = MockIterator::new(vec![]);
let mut iter = TwoMergeIterator::create(i1, i2).unwrap();
check_iter_result(&mut iter, vec![])
}
#[test]
fn test_task2_storage_scan() {
use crate::lsm_storage::LsmStorageInner;
let dir = tempdir().unwrap();
let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap();
storage.put(b"1", b"233").unwrap();
storage.put(b"2", b"2333").unwrap();
storage.put(b"00", b"2333").unwrap();
storage
.force_freeze_memtable(&storage.state_lock.lock())
.unwrap();
storage.put(b"3", b"23333").unwrap();
storage.delete(b"1").unwrap();
let sst1 = generate_sst(
10,
dir.path().join("10.sst"),
vec![
(Bytes::from_static(b"0"), Bytes::from_static(b"2333333")),
(Bytes::from_static(b"00"), Bytes::from_static(b"2333333")),
(Bytes::from_static(b"4"), Bytes::from_static(b"23")),
],
Some(storage.block_cache.clone()),
);
let sst2 = generate_sst(
11,
dir.path().join("11.sst"),
vec![(Bytes::from_static(b"4"), Bytes::from_static(b""))],
Some(storage.block_cache.clone()),
);
{
let mut state = storage.state.write();
let mut snapshot = state.as_ref().clone();
snapshot.l0_sstables.push(sst2.sst_id()); // this is the latest SST
snapshot.l0_sstables.push(sst1.sst_id());
snapshot.sstables.insert(sst2.sst_id(), sst2.into());
snapshot.sstables.insert(sst1.sst_id(), sst1.into());
*state = snapshot.into();
}
check_iter_result(
&mut storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
vec![
(Bytes::from("0"), Bytes::from("2333333")),
(Bytes::from("00"), Bytes::from("2333")),
(Bytes::from("2"), Bytes::from("2333")),
(Bytes::from("3"), Bytes::from("23333")),
],
);
check_iter_result(
&mut storage
.scan(Bound::Included(b"1"), Bound::Included(b"2"))
.unwrap(),
vec![(Bytes::from("2"), Bytes::from("2333"))],
);
check_iter_result(
&mut storage
.scan(Bound::Excluded(b"1"), Bound::Excluded(b"3"))
.unwrap(),
vec![(Bytes::from("2"), Bytes::from("2333"))],
);
}