Compare commits

..

10 Commits

Author SHA1 Message Date
ECROF88
8fb4176e71 First Commit
2025-10-20 20:12:40 +08:00
Li Yuhan
f484bc5e6c adds self solution link to SOLUTIONS.md (#158) 2025-06-29 13:44:47 -07:00
Liu Jinyi
c6b7ff8b07 docs: update week2-03-tiered.md (#154) 2025-06-07 23:51:09 +08:00
Joseph Koshakow
af96807ecc Replace a predicate that was always false with a literal (#151)
* Replace a predicate that was always false with a literal

* comment + fix mvcc version

Signed-off-by: Alex Chi <iskyzh@gmail.com>

---------

Signed-off-by: Alex Chi <iskyzh@gmail.com>
Co-authored-by: Alex Chi <iskyzh@gmail.com>
2025-06-07 19:27:52 +08:00
Liu Jinyi
067fd2e682 docs: update the introduction of StorageIterator (#152) 2025-06-05 15:56:56 +08:00
lxc
fc4765b925 Fix wrong input type of put_batch (#146)
* Fix wrong input type of put_batch

Update wal.rs

* fix

Signed-off-by: Alex Chi <iskyzh@gmail.com>

---------

Signed-off-by: Alex Chi <iskyzh@gmail.com>
Co-authored-by: Alex Chi <iskyzh@gmail.com>
2025-05-31 14:08:31 +08:00
Liu Jinyi
4c8d4ebf23 docs: specify the updated LsmIterator::new constructor signature (#150) 2025-05-31 14:04:13 +08:00
Liu Jinyi
47ad0802a9 docs: clarify MergeIterator heap rationale and ouroboros usage (#149) 2025-05-30 21:06:05 +08:00
Liu Jinyi
05d5d42abd explain copy-on-write strategy (#148)
* explain copy-on-write strategy

* fix a typo
2025-05-30 20:15:16 +08:00
lxc
17b221fb4e remove unnecessary compaction condition check for key below watermark (#145)
remove unnecessary condition check
2025-05-06 21:59:03 +08:00
28 changed files with 1904 additions and 129 deletions

View File

@@ -6,6 +6,7 @@ You can add your solution to this page once you finish any full week of the cour
* [pj/mini-lsm-simple-solution](https://github.com/pjzhong/mini-lsm-solution): A simple solution of Mini-LSM.
## Week 2
* [7143192/mini-lsm](https://github.com/7143192/mini-lsm): A solution of mini-lsm, finishing all tasks except the bonus tasks in week 1 and week 2.
## Week 3

View File

@@ -33,7 +33,7 @@ crossbeam-skiplist provides similar interfaces to the Rust std's `BTreeMap`: ins
You will also notice that the `MemTable` structure does not have a `delete` interface. In the mini-lsm implementation, deletion is represented as a key corresponding to an empty value.
In this task, you will need to implement `MemTable::get` and `MemTable::put` to enable modifications of the memtable. Note that `put` should always overwrite a key if it already exists. You won't have multiple entries of the same key in a single memtable.
We use the `bytes` crate for storing the data in the memtable. `bytes::Bytes` is similar to `Arc<[u8]>`. When you clone the `Bytes` or get a slice of `Bytes`, the underlying data will not be copied, and therefore cloning it is cheap. Instead, it simply creates a new reference to the storage area, and the storage area will be freed when there are no references to it.
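To see why clones are cheap, here is a small standalone illustration (not part of the starter code):

```rust
use bytes::Bytes;

fn main() {
    let a = Bytes::from(vec![1u8, 2, 3]);
    let b = a.clone(); // no copy of the underlying bytes, just a refcount bump
    let s = a.slice(1..3); // also no copy; shares the same allocation
    assert_eq!(b.as_ref(), &[1, 2, 3]);
    assert_eq!(s.as_ref(), &[2, 3]);
}
```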
@@ -72,6 +72,23 @@ A memtable cannot continuously grow in size, and we will need to freeze them (an
In this task, you will need to compute the approximate memtable size when you put/delete a key in the memtable. This can be computed by simply adding the total number of bytes of keys and values when `put` is called. If a key is put twice, though the skiplist only contains the latest value, you may count it twice in the approximate memtable size. Once a memtable reaches the limit, you should call `force_freeze_memtable` to freeze the memtable and create a new one.
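A minimal sketch of this bookkeeping, assuming an `approximate_size: AtomicUsize` field as in the starter code:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// On every put, add the entry's byte size to a relaxed atomic counter.
fn record_put(approximate_size: &AtomicUsize, key: &[u8], value: &[u8]) {
    approximate_size.fetch_add(key.len() + value.len(), Ordering::Relaxed);
}

fn main() {
    let size = AtomicUsize::new(0);
    record_put(&size, b"key1", b"value1");
    assert_eq!(size.load(Ordering::Relaxed), 10);
}
```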
The `state: Arc<RwLock<Arc<LsmStorageState>>>` field in `LsmStorageInner` is structured this way to manage the LSM tree's overall state concurrently and safely, primarily using a Copy-on-Write (CoW) strategy:
1. Inner `Arc<LsmStorageState>`: This holds an **immutable snapshot** of the actual `LsmStorageState` (which contains memtable lists, SST references, etc.). Cloning this `Arc` is very cheap (just an atomic reference count increment) and gives any reader a consistent, unchanging view of the state for the duration of their operation.
2. `RwLock<Arc<LsmStorageState>>`: This read-write lock protects the *pointer* to the current `Arc<LsmStorageState>` (the active snapshot).
* **Readers** acquire a read lock, clone the `Arc<LsmStorageState>` (getting their own reference to the current snapshot), and then quickly release the read lock. They can then work with their snapshot without further locking.
* **Writers** (when modifying the state, e.g., freezing a memtable) will:
* Create a *new* `LsmStorageState` instance, often by cloning the data from the current snapshot and then applying modifications.
* Wrap this new state in a new `Arc<LsmStorageState>`.
* Acquire the write lock on the `RwLock`.
* Replace the old `Arc<LsmStorageState>` with the new one.
* Release the write lock.
3. Outer `Arc<RwLock<...>>`: This allows the `RwLock` itself (and thus the mechanism for accessing and updating the state) to be shared safely across multiple threads or parts of your application that might need to interact with `LsmStorageInner`.
This CoW approach ensures that readers always see a valid, consistent state snapshot and experience minimal blocking. Writers update the state atomically by swapping out the entire state snapshot, reducing the time critical locks are held and thus improving concurrency.
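To make the pattern concrete, here is a minimal sketch of the CoW state swap, with a stand-in `State` type instead of the real `LsmStorageState`:

```rust
use parking_lot::RwLock;
use std::sync::Arc;

#[derive(Clone, Default)]
struct State {
    entries: Vec<u64>, // illustrative payload
}

struct Inner {
    state: Arc<RwLock<Arc<State>>>,
}

impl Inner {
    // Reader: take the read lock briefly, clone the inner Arc, then work lock-free.
    fn snapshot(&self) -> Arc<State> {
        self.state.read().clone()
    }

    // Writer: copy the current state, modify the copy, swap the pointer.
    fn mutate(&self, new_entry: u64) {
        let mut guard = self.state.write();
        let mut next = guard.as_ref().clone(); // copy-on-write
        next.entries.push(new_entry);
        *guard = Arc::new(next); // swap the snapshot under the write lock
    }
}

fn main() {
    let inner = Inner {
        state: Arc::new(RwLock::new(Arc::new(State::default()))),
    };
    inner.mutate(42);
    assert_eq!(inner.snapshot().entries, vec![42]);
}
```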
Because there could be multiple threads getting data into the storage engine, `force_freeze_memtable` might be called concurrently from multiple threads. You will need to think about how to avoid race conditions in this case.
There are multiple places where you may want to modify the LSM state: freeze a mutable memtable, flush memtable to SST, and GC/compaction. During all of these modifications, there could be I/O operations. An intuitive way to structure the locking strategy is to:

View File

@@ -12,7 +12,6 @@ In this chapter, you will:
* Implement merge iterator.
* Implement LSM read path `scan` for memtables.
To copy the test cases into the starter code and run them,
```
@@ -30,9 +29,30 @@ In this task, you will need to modify:
src/mem_table.rs
```
All LSM iterators implement the `StorageIterator` trait. It has 4 functions: `key`, `value`, `next`, and `is_valid`. If you're familiar with Rust's standard library `Iterator` trait, you might find `StorageIterator` a bit different: it employs a cursor-based API, a design pattern common in database systems and notably inspired by RocksDB's iterators (see [`iterator_base.h`](https://github.com/facebook/rocksdb/blob/main/include/rocksdb/iterator_base.h) and [`iterator.h`](https://github.com/facebook/rocksdb/blob/main/include/rocksdb/iterator.h) for reference).
When the iterator is created, its cursor will stop on some element, and `key` / `value` will return the first key in the memtable/block/SST satisfying the start condition (i.e., start key). These two interfaces will return a `&[u8]` to avoid copy.
From the caller's perspective, the typical usage pattern is:
```rust
let mut iter: impl StorageIterator = ...;
while iter.is_valid() {
    let key = iter.key();
    let value = iter.value();
    // Process key and value
    iter.next()?; // Advance to the next item, handling potential errors
}
```
Each of `StorageIterator`'s core methods has distinct semantics:
* `next()`: This method is solely responsible for attempting to move the cursor to the next element. It returns a `Result` to report any errors encountered during this advancement (e.g., I/O issues). It does *not* inherently guarantee that the new position is valid, only that the attempt to move was made.
* `is_valid()`: This method indicates whether the iterator's current cursor points to a valid data element. It does *not* advance the iterator.
Therefore, as an implementer of `StorageIterator`, after each call to `next()` (even if it succeeds without an error from the `next()` operation itself), you are responsible for updating the internal state so that `is_valid()` correctly reflects whether the new cursor position actually points to a valid item.
In summary, `next` moves the cursor to the next position. `is_valid` returns whether the iterator has reached the end or errored. You can assume `next` will only be called when `is_valid` returns true. There will be a `FusedIterator` wrapper for iterators that blocks calls to `next` when the iterator is not valid, to prevent users from misusing the iterators.
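Putting the contract together, the trait has roughly this shape (a simplified sketch; the actual starter-code definition uses a generic associated `KeyType` instead of plain `&[u8]`):

```rust
use anyhow::Result;

pub trait StorageIterator {
    /// Key under the cursor; only call when `is_valid()` returns true.
    fn key(&self) -> &[u8];
    /// Value under the cursor; only call when `is_valid()` returns true.
    fn value(&self) -> &[u8];
    /// Whether the cursor currently points at a real entry.
    fn is_valid(&self) -> bool;
    /// Try to advance the cursor; errors (e.g., I/O) surface here.
    fn next(&mut self) -> Result<()>;
}
```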
Back to the memtable iterator. You should have noticed that the iterator does not have any lifetime associated with it. Imagine that you create a `Vec<u64>` and call `vec.iter()`: the iterator type will be something like `VecIterator<'a>`, where `'a` is the lifetime of the `vec` object. The same applies to `SkipMap`, whose `iter` API returns an iterator with a lifetime. However, in our case, we do not want such lifetimes on our iterators, to avoid making the system overcomplicated (and hard to compile...).
@@ -58,7 +78,7 @@ pub struct MemtableIterator { // <- with lifetime 'this
Then the problem is solved! You can do this with the help of third-party libraries like `ouroboros`, which provides an easy way to define self-referential structures. It is also possible to do this with unsafe Rust (and indeed, `ouroboros` itself uses unsafe Rust internally...).
We have leveraged [`ouroboros`](https://docs.rs/ouroboros/latest/ouroboros/attr.self_referencing.html) to define the self-referential `MemtableIterator` fields for you. You will need to implement the `MemtableIterator` logic and the `Memtable::scan` API based on this provided structure.
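For intuition, here is a minimal self-referential struct built with `ouroboros` (an illustrative toy, not the actual `MemtableIterator` definition):

```rust
use ouroboros::self_referencing;

// Owns a buffer and simultaneously holds an iterator borrowing from it,
// tied together by the generated 'this lifetime.
#[self_referencing]
struct OwnedIter {
    data: Vec<u8>,
    #[borrows(data)]
    #[covariant]
    iter: std::slice::Iter<'this, u8>,
}

fn main() {
    let mut it = OwnedIterBuilder {
        data: vec![1, 2, 3],
        iter_builder: |data| data.iter(),
    }
    .build();
    // Access the borrowed field through the generated accessors.
    let first = it.with_iter_mut(|iter| iter.next().copied());
    assert_eq!(first, Some(1));
}
```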
## Task 2: Merge Iterator
@@ -70,7 +90,7 @@ src/iterators/merge_iterator.rs
Now that you have multiple memtables, you will create multiple memtable iterators. You will need to merge the results from the memtables and return the latest version of each key to the user.
`MergeIterator` maintains a binary heap internally. Consider the challenge of merging `n` sorted sequences (our iterators) into a single sorted output; a binary heap is a natural fit here, as it efficiently helps identify which sequence currently holds the overall smallest element. You'll see that the ordering of the binary heap is such that the iterator with the lowest head key value is first. When multiple iterators have the same head key value, the newest one is first. Note that you will need to handle errors (i.e., when an iterator is not valid) and ensure that the latest version of a key-value pair comes out.
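To build intuition for why a heap fits, here is a toy k-way merge over plain vectors (illustrative only; the course's `MergeIterator` must additionally prefer the newest iterator on key ties and skip the older duplicates):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Pop the sequence with the smallest head, emit it, advance that sequence.
fn kmerge(seqs: Vec<Vec<i32>>) -> Vec<i32> {
    let mut heap = BinaryHeap::new();
    for (i, s) in seqs.iter().enumerate() {
        if !s.is_empty() {
            heap.push(Reverse((s[0], i, 0usize))); // (head value, seq index, pos)
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, i, p))) = heap.pop() {
        out.push(v);
        if p + 1 < seqs[i].len() {
            heap.push(Reverse((seqs[i][p + 1], i, p + 1)));
        }
    }
    out
}

fn main() {
    let merged = kmerge(vec![vec![1, 4, 7], vec![2, 5], vec![3, 6]]);
    assert_eq!(merged, vec![1, 2, 3, 4, 5, 6, 7]);
}
```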
For example, if we have the following data:

View File

@@ -11,7 +11,6 @@ In this chapter, you will:
* Implement SST encoding and metadata encoding.
* Implement SST decoding and iterator.
To copy the test cases into the starter code and run them,
```
@@ -84,7 +83,7 @@ src/table.rs
You can implement a new `read_block_cached` function on `SsTable`.
We use [`moka-rs`](https://docs.rs/moka/latest/moka/) as our block cache implementation. Blocks are cached by `(sst_id, block_id)` as the cache key. You may use `try_get_with` to get the block from cache if it hits the cache / populate the cache if it misses the cache. If there are multiple requests reading the same block and cache misses, `try_get_with` will only issue a single read request to the disk and broadcast the result to all requests.
At this point, you may change your table iterator to use `read_block_cached` instead of `read_block` to leverage the block cache.
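A hedged sketch of what `read_block_cached` could look like with `try_get_with`, assuming `BlockCache` is a `moka::sync::Cache<(usize, usize), Arc<Block>>` as in the starter code:

```rust,no_run
pub fn read_block_cached(&self, block_idx: usize) -> Result<Arc<Block>> {
    if let Some(cache) = &self.block_cache {
        // On a miss, only one caller runs the closure; concurrent callers
        // for the same (sst_id, block_id) wait for and share its result.
        cache
            .try_get_with((self.id, block_idx), || self.read_block(block_idx))
            .map_err(|e| anyhow::anyhow!("cache error: {}", e))
    } else {
        self.read_block(block_idx)
    }
}
```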

View File

@@ -12,7 +12,6 @@ In this chapter, you will:
* Implement LSM read path `get` with SSTs.
* Implement LSM read path `scan` with SSTs.
To copy the test cases into the starter code and run them,
```
@@ -22,7 +21,6 @@ cargo x scheck
## Task 1: Two Merge Iterator
In this task, you will need to modify:
```
@@ -51,7 +49,13 @@ type LsmIteratorInner =
So that our internal iterator of the LSM storage engine will be an iterator combining both data from the memtables and the SSTs.
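The combined type will presumably look like this (a sketch; the exact generics depend on your earlier definitions):

```rust,no_run
type LsmIteratorInner =
    TwoMergeIterator<MergeIterator<MemTableIterator>, MergeIterator<SsTableIterator>>;
```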
Currently, our SST iterator doesn't support an end bound for scans. To address this, you'll need to implement this boundary check within the `LsmIterator` itself. This involves updating the `LsmIterator::new` constructor to accept an `end_bound` parameter:
```rust,no_run
pub(crate) fn new(iter: LsmIteratorInner, end_bound: Bound<Bytes>) -> Result<Self> {}
```
You will then need to modify the `LsmIterator`'s iteration logic to ensure it stops when the keys from the inner iterator reach or exceed this specified `end_bound`.
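One way to express the boundary check itself, assuming the struct stores `end_bound: Bound<Bytes>` (a standalone sketch):

```rust
use bytes::Bytes;
use std::ops::Bound;

// Returns true while `key` is still within the stored end bound.
fn within_end_bound(key: &[u8], end_bound: &Bound<Bytes>) -> bool {
    match end_bound {
        Bound::Unbounded => true,
        Bound::Included(end) => key <= end.as_ref(),
        Bound::Excluded(end) => key < end.as_ref(),
    }
}

fn main() {
    let end = Bound::Excluded(Bytes::from_static(b"c"));
    assert!(within_end_bound(b"b", &end));
    assert!(!within_end_bound(b"c", &end));
}
```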
Our test cases will generate some memtables and SSTs in `l0_sstables`, and you will need to scan all of this data out correctly in this task. You do not need to flush SSTs until the next chapter. Therefore, you can go ahead and modify your `LsmStorageInner::scan` interface to create a merge iterator over all memtables and SSTs, so as to finish the read path of your storage engine.

View File

@@ -40,7 +40,7 @@ src/compact/tiered.rs
In universal compaction, we do not use L0 SSTs in the LSM state. Instead, we directly flush new SSTs to a single sorted run (called tier). In the LSM state, `levels` will now include all tiers, where **the lowest index is the latest SST flushed**. Each element in the `levels` vector stores a tuple: level ID (used as tier ID) and the SSTs in that level. Every time you flush L0 SSTs, you should flush the SST into a tier placed at the front of the vector. The compaction simulator generates tier id based on the first SST id, and you should do the same in your implementation.
Universal compaction will only trigger tasks when the number of tiers (sorted runs) reaches `num_tiers`. Otherwise, it does not trigger any compaction.
### Task 1.1: Triggered by Space Amplification Ratio
@@ -153,7 +153,7 @@ The current trigger only reduces space amplification. We will need to add new tr
The next trigger is the size ratio trigger. The trigger maintains the size ratio between the tiers. From the first tier, we compute the size of `this tier / sum of all previous tiers`. For the first encountered tier where this value `> (100 + size_ratio) * 1%`, we will compact all previous tiers excluding the current tier. We only do this compaction when there are more than `min_merge_width` tiers to be merged.
For example, given the following LSM state, and assume `size_ratio` = 1, and `min_merge_width` = 2. We should compact when the ratio value > 101%:
```
Tier 3: 1
@@ -231,7 +231,7 @@ There will be fewer 1-SST tiers and the compaction algorithm will maintain the t
### Task 1.3: Reduce Sorted Runs
If none of the previous triggers produce compaction tasks, we will do a major compaction that merges SST files from the first up to `max_merge_tiers` tiers into one tier to reduce the number of tiers.
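A hedged sketch of this fallback trigger; the `TieredCompactionTask` fields follow the shape used elsewhere in this diff, and `max_merge_tiers` is the handout's name for the option:

```rust,no_run
fn reduce_sorted_runs(
    levels: &[(usize, Vec<usize>)],
    max_merge_tiers: usize,
) -> TieredCompactionTask {
    // Merge the first `max_merge_tiers` tiers (the most recent ones) into one.
    let num = levels.len().min(max_merge_tiers);
    TieredCompactionTask {
        tiers: levels.iter().take(num).cloned().collect::<Vec<_>>(),
        bottom_tier_included: num >= levels.len(),
    }
}
```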
With this compaction trigger enabled, you will see:
@@ -252,16 +252,19 @@ Read Amplification: 7x
You can also try tiered compaction with a larger number of tiers:
```bash
cargo run --bin compaction-simulator tiered --iterations 200 --size-only --num-tiers 16
```
```
=== Iteration 199 ===
--- After Flush ---
Levels: 0 1 1 1 1 1 1 1 1 1 1 15 175
no compaction triggered
--- Statistics ---
Write Amplification: 607/200=3.035x
Maximum Space Usage: 350/200=1.750x
Read Amplification: 12x
```
**Note: we do not provide fine-grained unit tests for this part. You can run the compaction simulator and compare with the output of the reference solution to see if your implementation is correct.**

View File

@@ -161,7 +161,7 @@ impl LsmStorageInner {
}
if iter.key().ts() <= watermark {
if !first_key_below_watermark {
iter.next()?;
continue;
}

View File

@@ -90,7 +90,8 @@ impl TieredCompactionController {
.take(id + 1)
.cloned()
.collect::<Vec<_>>(),
// Size ratio trigger will never include the bottom level
bottom_tier_included: false,
});
}
}

View File

@@ -32,11 +32,40 @@ impl Block {
/// Encode the internal data to the data layout illustrated in the course
/// Note: You may want to recheck if any of the expected fields are missing from your output
pub fn encode(&self) -> Bytes {
let mut buf = Vec::new();
buf.extend_from_slice(&self.data);
for offset in &self.offsets {
buf.extend_from_slice(&offset.to_le_bytes());
}
let num_of_elements = self.offsets.len() as u16;
buf.extend_from_slice(&num_of_elements.to_le_bytes());
Bytes::from(buf)
}
/// Decode from the data layout, transform the input `data` to a single `Block`
pub fn decode(data: &[u8]) -> Self {
let data_len = data.len();
let element_nums = u16::from_le_bytes([data[data_len - 2], data[data_len - 1]]) as usize;
let offset_start = data_len - element_nums * 2 - 2;
let data_vec = data[..offset_start].to_vec();
let mut offset_vec = Vec::new();
for i in 0..element_nums {
let start_position = offset_start + i * 2;
let offset = u16::from_le_bytes([data[start_position], data[start_position + 1]]);
offset_vec.push(offset);
}
Self {
data: data_vec,
offsets: offset_vec,
}
}
}

View File

@@ -15,6 +15,8 @@
#![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod
#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod
use bytes::BufMut;
use crate::key::{KeySlice, KeyVec};
use super::Block;
@@ -34,22 +36,59 @@ pub struct BlockBuilder {
impl BlockBuilder {
/// Creates a new block builder.
pub fn new(block_size: usize) -> Self {
Self {
block_size,
data: Vec::new(),
offsets: Vec::new(),
first_key: KeyVec::new(),
}
}
/// Adds a key-value pair to the block. Returns false when the block is full.
/// You may find the `bytes::BufMut` trait useful for manipulating binary data.
#[must_use]
pub fn add(&mut self, key: KeySlice, value: &[u8]) -> bool {
// Compute the total block size after adding this entry.
let entry_size = 2 + key.len() + 2 + value.len(); // key_len + key + value_len + value
let new_data_size = self.data.len() + entry_size;
let new_offsets_size = (self.offsets.len() + 1) * 2; // one more u16 offset
let total_size = new_data_size + new_offsets_size + 2; // +2 for num_of_elements
// If this is not the first entry and the total would exceed block_size, reject it.
if !self.is_empty() && total_size > self.block_size {
return false;
}
self.offsets.push(self.data.len() as u16);
// BufMut methods are more concise than extend_from_slice here.
self.data.put_u16_le(key.len() as u16); // key length, little-endian u16
self.data.put_slice(key.into_inner()); // raw key bytes
self.data.put_u16_le(value.len() as u16); // value length, little-endian u16
self.data.put_slice(value); // raw value bytes
if self.first_key.is_empty() {
self.first_key = key.to_key_vec();
}
true
}
/// Check if there is no key-value pair in the block.
pub fn is_empty(&self) -> bool {
self.data.is_empty() && self.offsets.is_empty()
}
/// Finalize the block.
pub fn build(self) -> Block {
Block {
data: self.data,
offsets: self.offsets,
}
}
}

View File

@@ -48,44 +48,112 @@ impl BlockIterator {
/// Creates a block iterator and seek to the first entry.
pub fn create_and_seek_to_first(block: Arc<Block>) -> Self {
let mut iter = Self::new(block);
iter.seek_to_first();
iter
}
/// Creates a block iterator and seek to the first key that >= `key`.
pub fn create_and_seek_to_key(block: Arc<Block>, key: KeySlice) -> Self {
let mut iter = Self::new(block);
iter.seek_to_key(key);
iter
}
/// Returns the key of the current entry.
pub fn key(&self) -> KeySlice<'_> {
self.key.as_key_slice()
}
/// Returns the value of the current entry.
pub fn value(&self) -> &[u8] {
self.get_value_at_index(self.idx)
}
/// Returns true if the iterator is valid.
/// Note: You may want to make use of `key`
pub fn is_valid(&self) -> bool {
!self.key.is_empty()
}
/// Seeks to the first key in the block.
pub fn seek_to_first(&mut self) {
let data = &self.block.data;
if !data.is_empty() {
let first_key_len = u16::from_le_bytes([data[0], data[1]]) as usize;
let first_key = data[2..2 + first_key_len].to_vec();
self.key = KeyVec::from_vec(first_key);
self.first_key = self.key.clone();
self.idx = 0;
}
}
/// Move to the next key in the block.
pub fn next(&mut self) {
if self.idx + 1 < self.block.offsets.len() {
self.idx += 1;
self.key = self.get_key_at_index(self.idx).to_key_vec();
} else {
self.idx = self.block.offsets.len();
self.key.clear();
}
}
/// Seek to the first key that >= `key`.
/// Note: You should assume the key-value pairs in the block are sorted when being added by
/// callers.
pub fn seek_to_key(&mut self, key: KeySlice) {
let offsets = &self.block.offsets;
let mut left = 0;
let mut right = offsets.len();
while left < right {
let mid = left + (right - left) / 2;
let mid_key = self.get_key_at_index(mid);
if mid_key < key {
left = mid + 1;
} else {
right = mid;
}
}
// now left is the index
if left < offsets.len() {
self.idx = left;
self.key = self.get_key_at_index(left).to_key_vec();
} else {
// not found
self.idx = offsets.len();
self.key.clear();
}
}
fn get_key_at_index(&self, index: usize) -> KeySlice<'_> {
let data_pos = self.block.offsets[index] as usize;
let key_len =
u16::from_le_bytes([self.block.data[data_pos], self.block.data[data_pos + 1]]) as usize;
let key = &self.block.data[data_pos + 2..data_pos + 2 + key_len];
KeySlice::from_slice(key)
}
fn get_value_at_index(&self, index: usize) -> &[u8] {
let data = &self.block.data;
let data_pos = self.block.offsets[index] as usize;
let key_len = u16::from_le_bytes([data[data_pos], data[data_pos + 1]]) as usize;
let value_len = u16::from_le_bytes([
data[data_pos + 2 + key_len],
data[data_pos + 2 + key_len + 1],
]) as usize;
&data[data_pos + 2 + key_len + 2..data_pos + 2 + key_len + 2 + value_len]
}
}

View File

@@ -17,6 +17,7 @@
use std::cmp::{self};
use std::collections::BinaryHeap;
use std::collections::binary_heap::PeekMut;
use anyhow::Result;
@@ -46,7 +47,7 @@ impl<I: StorageIterator> Ord for HeapWrapper<I> {
.key()
.cmp(&other.1.key())
.then(self.0.cmp(&other.0))
.reverse() // reverse the ordering so the smallest key sits at the heap top
}
}
@@ -54,12 +55,39 @@ impl<I: StorageIterator> Ord for HeapWrapper<I> {
/// iterators, prefer the one with smaller index.
pub struct MergeIterator<I: StorageIterator> {
// Pending iterators. Each `HeapWrapper` is a `(usize, Box<I>)`: `.1` is the
// iterator of the memtable at index `.0`. Newer memtables get smaller indices,
// so on equal keys the newest data is preferred.
iters: BinaryHeap<HeapWrapper<I>>,
// The iterator currently being consumed.
current: Option<HeapWrapper<I>>,
}
impl<I: StorageIterator> MergeIterator<I> {
pub fn create(iters: Vec<Box<I>>) -> Self {
// Push every valid iterator into the heap, then pop the first valid one
// to serve as `current`.
let mut heap = BinaryHeap::new();
for (idx, iter) in iters.into_iter().enumerate() {
if iter.is_valid() {
heap.push(HeapWrapper(idx, iter));
}
}
let mut current = None;
while let Some(candidate) = heap.pop() {
// If it is still valid, take it as current.
if candidate.1.is_valid() {
current = Some(candidate);
break;
}
// Otherwise try the next one.
}
Self {
iters: heap,
current,
}
}
}
@@ -68,19 +96,87 @@ impl<I: 'static + for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>> StorageIt
{
type KeyType<'a> = KeySlice<'a>;
fn key(&self) -> KeySlice<'_> {
self.current.as_ref().unwrap().1.key()
}
fn value(&self) -> &[u8] {
self.current.as_ref().unwrap().1.value()
}
fn is_valid(&self) -> bool {
self.current
.as_ref()
.map(|x| x.1.is_valid())
.unwrap_or(false)
}
fn next(&mut self) -> Result<()> {
let Some(mut current) = self.current.take() else {
return Ok(());
};
let current_key = current.1.key().to_key_vec(); // clone the key to avoid ownership conflicts below
// Advance the current iterator one step first (current.1.next() mutably borrows through the Box).
if let Err(e) = current.1.next() {
self.current = None;
return Err(e);
} else {
// If the current iterator is still valid, push it back into the heap.
if current.1.is_valid() {
self.iters.push(current);
}
}
while let Some(mut heap_top) = self.iters.peek_mut() {
if !heap_top.1.is_valid() {
// If the top iterator is no longer valid, pop and discard it.
PeekMut::pop(heap_top);
continue;
}
if current_key.as_key_slice() == heap_top.1.key() {
if let Err(e) = heap_top.1.next() {
PeekMut::pop(heap_top);
return Err(e);
} else {
if !heap_top.1.is_valid() {
PeekMut::pop(heap_top);
}
}
} else {
break;
}
}
// Pop the iterator holding the new smallest key to become `current`.
while let Some(candidate) = self.iters.pop() {
if candidate.1.is_valid() {
self.current = Some(candidate);
return Ok(());
}
}
self.current = None;
Ok(())
}
/*
How the merge works:
1. All memtable iterators live in one heap, ordered first by key and then by
memtable index, so the heap top always holds the smallest current key.
2. `current` is the iterator being consumed. On `next`, save its key, advance
it one step, and push it back into the heap so the top stays the minimum.
3. Peek the heap top: if its key equals the saved key, it is an older version
of the same key, so advance that iterator too and re-heap, repeating until
the top key differs.
4. Finally, pop the heap top as the new `current`.
*/
}

View File

@@ -68,7 +68,7 @@ impl Key<Vec<u8>> {
self.0.extend(key_slice.0);
}
pub fn as_key_slice(&'_ self) -> KeySlice<'_> {
Key(self.0.as_slice())
}

View File

@@ -39,19 +39,25 @@ impl StorageIterator for LsmIterator {
type KeyType<'a> = &'a [u8];
fn is_valid(&self) -> bool {
self.inner.is_valid()
}
fn key(&self) -> &[u8] {
self.inner.key().into_inner()
}
fn value(&self) -> &[u8] {
self.inner.value()
}
fn next(&mut self) -> Result<()> {
self.inner.next()?;
// Skip keys that have been deleted (an empty value is a tombstone).
while self.inner.is_valid() && self.inner.value().is_empty() {
self.inner.next()?;
}
Ok(())
}
}
@@ -79,18 +85,40 @@ impl<I: StorageIterator> StorageIterator for FusedIterator<I> {
Self: 'a;
fn is_valid(&self) -> bool {
!self.has_errored && self.iter.is_valid()
}
fn key(&self) -> Self::KeyType<'_> {
self.iter.key()
}
fn value(&self) -> &[u8] {
self.iter.value()
}
fn next(&mut self) -> Result<()> {
// Once an error has occurred, subsequent calls must keep returning an error.
if self.has_errored {
return Err(anyhow::anyhow!("iterator has errored previously"));
}
// An exhausted (but not errored) iterator makes next() a no-op rather than an error.
if !self.iter.is_valid() {
return Ok(());
}
match self.iter.next() {
Ok(()) => Ok(()),
Err(e) => {
self.has_errored = true;
Err(e)
}
}
}
}

View File

@@ -21,7 +21,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use anyhow::{Ok, Result};
use bytes::Bytes;
use parking_lot::{Mutex, MutexGuard, RwLock};
@@ -30,6 +30,8 @@ use crate::compact::{
CompactionController, CompactionOptions, LeveledCompactionController, LeveledCompactionOptions,
SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, TieredCompactionController,
};
use crate::iterators::StorageIterator;
use crate::iterators::merge_iterator::MergeIterator;
use crate::lsm_iterator::{FusedIterator, LsmIterator};
use crate::manifest::Manifest;
use crate::mem_table::MemTable;
@@ -243,6 +245,29 @@ impl MiniLsm {
}
impl LsmStorageInner {
pub fn show_memtable_datas(&self) {
let state = self.state.read();
println!("Current Memtable Data:");
for entry in state.memtable.map.iter() {
println!(
"Key: {}, Value: {}",
String::from_utf8_lossy(entry.key()),
String::from_utf8_lossy(entry.value())
);
}
for (i, imm_memtable) in state.imm_memtables.iter().enumerate() {
println!("Immutable Memtable {} Data:", i);
for entry in imm_memtable.map.iter() {
println!(
"Key: {}, Value: {}",
String::from_utf8_lossy(entry.key()),
String::from_utf8_lossy(entry.value())
);
}
}
}
pub(crate) fn next_sst_id(&self) -> usize {
self.next_sst_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
@@ -297,8 +322,27 @@ impl LsmStorageInner {
}
/// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter.
pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
let state = self.state.read();
if let Some(value) = state.memtable.get(key) {
if value.is_empty() {
return Ok(None);
}
return Ok(Some(value));
}
for imm_memtable in &state.imm_memtables {
if let Some(value) = imm_memtable.get(key) {
if value.is_empty() {
// Empty value means the key was deleted
return Ok(None);
}
return Ok(Some(value));
}
}
Ok(None)
}
/// Write a batch of data into the storage. Implement in week 2 day 7.
@@ -307,13 +351,29 @@ impl LsmStorageInner {
}
/// Put a key-value pair into the storage by writing into the current memtable.
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
let memtable_size = {
let state = self.state.read();
state.memtable.put(key, value)?;
state.memtable.approximate_size()
}; // Release read lock here
// Check if we need to freeze memtable (double-check pattern)
if memtable_size >= self.options.target_sst_size {
let state_lock = self.state_lock.lock(); // state modifications are serialized by this mutex
let state = self.state.read(); // re-acquire the read lock
if state.memtable.approximate_size() >= self.options.target_sst_size {
drop(state); // Release read lock before calling freeze
self.force_freeze_memtable(&state_lock)?;
}
}
Ok(())
}
/// Remove a key from the storage by writing an empty value.
pub fn delete(&self, key: &[u8]) -> Result<()> {
self.put(key, &[])
}
pub(crate) fn path_of_sst_static(path: impl AsRef<Path>, id: usize) -> PathBuf {
@@ -338,7 +398,39 @@ impl LsmStorageInner {
/// Force freeze the current memtable to an immutable memtable
pub fn force_freeze_memtable(&self, _state_lock_observer: &MutexGuard<'_, ()>) -> Result<()> {
// The new state holds a fresh memtable, plus imm_memtables cloned from the
// old state with the frozen memtable pushed to the front.
let new_memtable = if self.options.enable_wal {
MemTable::create_with_wal(self.next_sst_id(), &self.path)?
} else {
MemTable::create(self.next_sst_id())
};
{
let mut state = self.state.write();
let cur_state = state.as_ref();
let mut new_imm_table = cur_state.imm_memtables.clone();
new_imm_table.insert(0, cur_state.memtable.clone());
// Install the externally created memtable and the updated imm_memtables into a new state.
let new_state = Arc::new(LsmStorageState {
memtable: Arc::new(new_memtable),
imm_memtables: new_imm_table,
l0_sstables: cur_state.l0_sstables.clone(),
levels: cur_state.levels.clone(),
sstables: cur_state.sstables.clone(),
});
*state = new_state;
// println!("Memtable frozen.");
// println!(
// "new state immutable memtables: {}",
// state.imm_memtables.len()
// );
}
Ok(())
}
/// Force flush the earliest-created immutable memtable to disk
@@ -354,9 +446,40 @@ impl LsmStorageInner {
/// Create an iterator over a range of keys.
pub fn scan(
&self,
lower: Bound<&[u8]>,
upper: Bound<&[u8]>,
) -> Result<FusedIterator<LsmIterator>> {
let state = self.state.read();
// Collect iterators over all memtables.
let mut mem_iters = Vec::new();
// The current (mutable) memtable first.
mem_iters.push(Box::new(state.memtable.scan(lower, upper)));
// Then all immutable memtables, newest to oldest (freezing inserts at the front of the list).
for imm_memtable in &state.imm_memtables {
mem_iters.push(Box::new(imm_memtable.scan(lower, upper)));
}
// Build the merge iterator over all memtable iterators.
let merge_iter = MergeIterator::create(mem_iters);
let mut lsm_iter = LsmIterator::new(merge_iter)?;
// Skip deleted keys here. Tombstone filtering belongs at this level, not in
// the lower-level structures (MergeIterator, MemTableIterator, ...).
while lsm_iter.is_valid() && lsm_iter.value().is_empty() {
lsm_iter.next()?;
}
// FusedIterator is a thin wrapper guaranteeing that next() on an invalid
// iterator is a no-op (Ok(())) rather than an error.
let fused_iter = FusedIterator::new(lsm_iter);
Ok(fused_iter)
}
}

View File

@@ -6,7 +6,7 @@
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
@@ -15,14 +15,15 @@
#![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod
#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod
use std::collections::BTreeMap;
use std::ops::Bound;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use anyhow::{Result, anyhow};
use bytes::Bytes;
use crossbeam_skiplist::SkipMap;
use crossbeam_skiplist::{SkipList, SkipMap};
use ouroboros::self_referencing;
use crate::iterators::StorageIterator;
@@ -35,7 +36,14 @@ use crate::wal::Wal;
/// An initial implementation of memtable is part of week 1, day 1. It will be incrementally implemented in other
/// chapters of week 1 and week 2.
pub struct MemTable {
pub map: Arc<SkipMap<Bytes, Bytes>>,
wal: Option<Wal>,
id: usize,
approximate_size: Arc<AtomicUsize>,
}
pub struct BTreeMemTable {
map: Arc<BTreeMap<Bytes, Bytes>>,
wal: Option<Wal>,
id: usize,
approximate_size: Arc<AtomicUsize>,
@@ -52,13 +60,25 @@ pub(crate) fn map_bound(bound: Bound<&[u8]>) -> Bound<Bytes> {
impl MemTable {
/// Create a new mem-table.
pub fn create(id: usize) -> Self {
Self {
map: Arc::new(SkipMap::new()),
wal: None,
id,
approximate_size: Arc::new(AtomicUsize::new(0)),
}
}
/// Create a new mem-table with WAL
pub fn create_with_wal(id: usize, path: impl AsRef<Path>) -> Result<Self> {
Ok(Self {
map: Arc::new(SkipMap::new()),
wal: Some(Wal::create(path)?),
id,
approximate_size: Arc::new(AtomicUsize::new(0)),
})
}
/// Create a memtable from WAL
@@ -86,8 +106,17 @@ impl MemTable {
}
/// Get a value by key.
pub fn get(&self, key: &[u8]) -> Option<Bytes> {
if let Some(entry) = self.map.get(key) {
// entry.value() is Bytes. Return Some even for an empty value: deciding
// whether an empty value means "deleted" is up to the caller, which maps
// Some([]) => None and Some(v) => Some(v).
Some(entry.value().clone())
} else {
None
}
}
/// Put a key-value pair into the mem-table.
@@ -95,11 +124,23 @@ impl MemTable {
/// In week 1, day 1, simply put the key-value pair into the skipmap.
/// In week 2, day 6, also flush the data to WAL.
/// In week 3, day 5, modify the function to use the batch API.
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
let key_bytes = Bytes::copy_from_slice(key);
let value_bytes = Bytes::copy_from_slice(value);
// Calculate size increase
let size_increase = key_bytes.len() + value_bytes.len();
self.map.insert(key_bytes, value_bytes);
// Update approximate size
// If a key is put twice, its size is counted twice in the approximation (allowed by the handout).
self.approximate_size
.fetch_add(size_increase, std::sync::atomic::Ordering::Relaxed);
Ok(())
}
/// Implement this in week 3, day 5.
/// Implement this in week 3, day 5; if you want to implement this earlier, use `&[u8]` as the key type.
pub fn put_batch(&self, _data: &[(KeySlice, &[u8])]) -> Result<()> {
unimplemented!()
}
@@ -112,8 +153,17 @@ impl MemTable {
}
/// Get an iterator over a range of keys.
pub fn scan(&self, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> MemTableIterator {
let (lower_bound, upper_bound) = (map_bound(lower), map_bound(upper));
// Build the self-referential MemTableIterator via the ouroboros-generated builder.
let mut iter = MemTableIteratorBuilder {
map: self.map.clone(),
iter_builder: |map| map.range((lower_bound, upper_bound)),
item: (Bytes::new(), Bytes::new()),
}
.build();
iter.next().unwrap(); // position the cursor on the first element
iter
}
/// Flush the mem-table to SSTable. Implement in week 1 day 6.
@@ -159,18 +209,29 @@ impl StorageIterator for MemTableIterator {
type KeyType<'a> = KeySlice<'a>;
fn value(&self) -> &[u8] {
self.borrow_item().1.as_ref()
}
fn key(&'_ self) -> KeySlice<'_> {
KeySlice::from_slice(self.borrow_item().0.as_ref())
}
fn is_valid(&self) -> bool {
// An empty key marks the "end of iteration" sentinel state.
!self.borrow_item().0.is_empty()
}
fn next(&mut self) -> Result<()> {
self.with_mut(|field| {
if let Some(entry) = field.iter.next() {
*field.item = (entry.key().clone(), entry.value().clone());
} else {
*field.item = (Bytes::new(), Bytes::new());
}
});
Ok(())
}
}

View File

@@ -25,7 +25,7 @@ use std::sync::Arc;
use anyhow::Result;
pub use builder::SsTableBuilder;
use bytes::{Buf, BufMut};
pub use iterator::SsTableIterator;
use crate::block::Block;
@@ -50,15 +50,46 @@ impl BlockMeta {
/// in order to help keep track of `first_key` when decoding from the same buffer in the future.
pub fn encode_block_meta(
block_meta: &[BlockMeta],
buf: &mut Vec<u8>,
) {
for meta in block_meta {
let first_key_len = meta.first_key.len() as u16;
let last_key_len = meta.last_key.len() as u16;
buf.put_u32_le(meta.offset as u32);
buf.put_u16_le(first_key_len);
buf.put_slice(meta.first_key.as_key_slice().into_inner());
buf.put_u16_le(last_key_len);
buf.put_slice(meta.last_key.as_key_slice().into_inner());
}
}
/// Decode block meta from a buffer.
pub fn decode_block_meta(mut buf: impl Buf) -> Vec<BlockMeta> {
let mut block_meta = Vec::new();
while buf.has_remaining() {
let offset = buf.get_u32_le() as usize;
let key_len = buf.get_u16_le();
let mut first_key = vec![0u8; key_len as usize];
buf.copy_to_slice(&mut first_key);
let last_key_len = buf.get_u16_le() as usize;
let mut last_key = vec![0u8; last_key_len];
buf.copy_to_slice(&mut last_key);
block_meta.push(BlockMeta {
offset,
first_key: KeyBytes::from_bytes(first_key.into()),
last_key: KeyBytes::from_bytes(last_key.into()),
});
}
block_meta
}
}
@@ -152,7 +183,18 @@ impl SsTable {
/// Read a block from disk, with block cache. (Day 4)
pub fn read_block_cached(&self, block_idx: usize) -> Result<Arc<Block>> {
if let Some(cache) = &self.block_cache {
if let Some(block) = cache.get(&(self.id, block_idx)) {
return Ok(block);
}
// Note: `try_get_with` would deduplicate concurrent misses; this simple
// get-then-insert version may read the same block from disk twice under contention.
let block = self.read_block(block_idx)?;
cache.insert((self.id, block_idx), block.clone());
Ok(block)
} else {
self.read_block(block_idx)
}
}
/// Find the block that may contain `key`.

View File

@@ -15,13 +15,19 @@
#![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod
#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod
use std::path::Path;
use std::sync::Arc;
use anyhow::Result;
use bytes::{BufMut, Bytes};
use super::{BlockMeta, SsTable};
use crate::{
block::BlockBuilder,
key::{self, KeySlice},
lsm_storage::BlockCache,
table::FileObject,
};
/// Builds an SSTable from key-value pairs.
pub struct SsTableBuilder {
@@ -36,7 +42,14 @@ pub struct SsTableBuilder {
impl SsTableBuilder {
/// Create a builder based on target block size.
pub fn new(block_size: usize) -> Self {
Self {
builder: BlockBuilder::new(block_size),
first_key: Vec::new(),
last_key: Vec::new(),
data: Vec::new(),
meta: Vec::new(),
block_size,
}
}
/// Adds a key-value pair to SSTable.
@@ -44,7 +57,38 @@ impl SsTableBuilder {
/// Note: You should split a new block when the current block is full.(`std::mem::replace` may
/// be helpful here)
pub fn add(&mut self, key: KeySlice, value: &[u8]) {
let is_added = self.builder.add(key, value);
if is_added {
if self.first_key.is_empty() {
self.first_key.put_slice(key.into_inner());
}
// Track last_key on every successful add so that a single-entry block
// still records a correct last_key.
self.last_key.clear();
self.last_key.put_slice(key.into_inner());
} else {
// Current block is full, need to finish it and start a new one.
let block_meta = BlockMeta {
first_key: key::Key::from_bytes(Bytes::copy_from_slice(&self.first_key)),
last_key: key::Key::from_bytes(Bytes::copy_from_slice(&self.last_key)),
offset: self.data.len(),
};
self.meta.push(block_meta);
// Finish the current block and get its data
let old_builder =
std::mem::replace(&mut self.builder, BlockBuilder::new(self.block_size));
let block_data = old_builder.build().encode();
self.data.extend_from_slice(&block_data);
// Reset key tracking for the fresh block, then re-add recursively.
self.first_key.clear();
self.last_key.clear();
self.add(key, value);
}
}
/// Get the estimated size of the SSTable.
@@ -52,17 +96,47 @@ impl SsTableBuilder {
/// Since the data blocks contain much more data than meta blocks, just return the size of data
/// blocks here.
pub fn estimated_size(&self) -> usize {
self.data.len()
}
/// Builds the SSTable and writes it to the given path. Use the `FileObject` structure to manipulate the disk objects.
pub fn build(
mut self,
id: usize,
block_cache: Option<Arc<BlockCache>>,
path: impl AsRef<Path>,
) -> Result<SsTable> {
let block = self.builder.build();
self.meta.push(BlockMeta {
first_key: key::Key::from_bytes(Bytes::copy_from_slice(&self.first_key)),
last_key: key::Key::from_bytes(Bytes::copy_from_slice(&self.last_key)),
offset: self.data.len(),
});
self.data.extend_from_slice(&block.encode());
let meta_offset = self.data.len();
BlockMeta::encode_block_meta(&self.meta, &mut self.data);
self.data.extend_from_slice(&(meta_offset as u32).to_le_bytes());
let file = FileObject::create(path.as_ref(), self.data)?;
let first_key = self.meta.first().unwrap().first_key.clone();
let last_key = self.meta.last().unwrap().last_key.clone();
Ok(SsTable {
id,
file,
first_key,
last_key,
block_meta: self.meta,
block_meta_offset: meta_offset,
block_cache,
bloom: None,
max_ts: 0,
})
}
#[cfg(test)]

View File

@@ -32,24 +32,71 @@ pub struct SsTableIterator {
impl SsTableIterator {
/// Create a new iterator and seek to the first key-value pair in the first data block.
pub fn create_and_seek_to_first(table: Arc<SsTable>) -> Result<Self> {
let block = table.read_block_cached(0)?;
let blk_iter = BlockIterator::create_and_seek_to_first(block);
let iter = Self {
table,
blk_iter,
blk_idx: 0,
};
Ok(iter)
}
/// Seek to the first key-value pair in the first data block.
pub fn seek_to_first(&mut self) -> Result<()> {
self.blk_idx = 0;
let block = self.table.read_block_cached(self.blk_idx)?;
self.blk_iter = BlockIterator::create_and_seek_to_first(block);
Ok(())
}
/// Create a new iterator and seek to the first key-value pair which >= `key`.
pub fn create_and_seek_to_key(table: Arc<SsTable>, key: KeySlice) -> Result<Self> {
let mut iter = Self {
table: table.clone(),
blk_iter: BlockIterator::create_and_seek_to_first(table.read_block_cached(0)?),
blk_idx: 0,
};
iter.seek_to_key(key)?;
Ok(iter)
}
/// Seek to the first key-value pair which >= `key`.
/// Note: You probably want to review the handout for detailed explanation when implementing
/// this function.
pub fn seek_to_key(&mut self, key: KeySlice) -> Result<()> {
let metas = &self.table.block_meta;
let mut left = 0;
let mut right = metas.len();
while left < right {
let mid = left + (right - left) / 2;
let mid_first_key = &metas[mid].first_key;
if mid_first_key.as_key_slice() <= key {
left = mid + 1;
} else {
right = mid;
}
}
self.blk_idx = if left == 0 { 0 } else { left - 1 };
let block = self.table.read_block_cached(self.blk_idx)?;
self.blk_iter = BlockIterator::create_and_seek_to_key(block, key);
// If the key is not in this block, it may be in the next one.
if !self.blk_iter.is_valid() && self.blk_idx + 1 < self.table.num_of_blocks() {
self.blk_idx += 1;
let next_block = self.table.read_block_cached(self.blk_idx)?;
self.blk_iter = BlockIterator::create_and_seek_to_first(next_block);
}
Ok(())
}
}
@@ -57,23 +104,32 @@ impl StorageIterator for SsTableIterator {
type KeyType<'a> = KeySlice<'a>;
/// Return the `key` that's held by the underlying block iterator.
fn key(&'_ self) -> KeySlice<'_> {
self.blk_iter.key()
}
/// Return the `value` that's held by the underlying block iterator.
fn value(&self) -> &[u8] {
self.blk_iter.value()
}
/// Return whether the current block iterator is valid or not.
fn is_valid(&self) -> bool {
self.blk_iter.is_valid()
}
/// Move to the next `key` in the block.
/// Note: You may want to check if the current block iterator is valid after the move.
fn next(&mut self) -> Result<()> {
self.blk_iter.next();
if !self.blk_iter.is_valid() && self.blk_idx + 1 < self.table.num_of_blocks() {
self.blk_idx += 1;
let next_block = self.table.read_block_cached(self.blk_idx)?;
self.blk_iter = BlockIterator::create_and_seek_to_first(next_block);
}
Ok(())
}
}

View File

@@ -1,16 +1,7 @@
//! DO NOT MODIFY -- Mini-LSM tests modules
//! This file will be automatically rewritten by the copy-test command.
mod harness;
mod week1_day1;
mod week1_day2;
mod week1_day3;

View File

@@ -0,0 +1,462 @@
#![allow(dead_code)] // REMOVE THIS LINE once all modules are complete
// Copyright (c) 2022-2025 Alex Chi Z
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
collections::BTreeMap, ops::Bound, os::unix::fs::MetadataExt, path::Path, sync::Arc,
time::Duration,
};
use anyhow::{Result, bail};
use bytes::Bytes;
use crate::{
compact::{
CompactionOptions, LeveledCompactionOptions, SimpleLeveledCompactionOptions,
TieredCompactionOptions,
},
iterators::{StorageIterator, merge_iterator::MergeIterator},
key::{KeySlice, TS_ENABLED},
lsm_storage::{BlockCache, LsmStorageInner, LsmStorageState, MiniLsm},
table::{SsTable, SsTableBuilder, SsTableIterator},
};
#[derive(Clone)]
pub struct MockIterator {
pub data: Vec<(Bytes, Bytes)>,
pub error_when: Option<usize>,
pub index: usize,
}
impl MockIterator {
pub fn new(data: Vec<(Bytes, Bytes)>) -> Self {
Self {
data,
index: 0,
error_when: None,
}
}
pub fn new_with_error(data: Vec<(Bytes, Bytes)>, error_when: usize) -> Self {
Self {
data,
index: 0,
error_when: Some(error_when),
}
}
}
impl StorageIterator for MockIterator {
type KeyType<'a> = KeySlice<'a>;
fn next(&mut self) -> Result<()> {
if self.index < self.data.len() {
self.index += 1;
}
if let Some(error_when) = self.error_when {
if self.index == error_when {
bail!("fake error!");
}
}
Ok(())
}
fn key(&self) -> KeySlice {
if let Some(error_when) = self.error_when {
if self.index >= error_when {
panic!("invalid access after next returns an error!");
}
}
KeySlice::for_testing_from_slice_no_ts(self.data[self.index].0.as_ref())
}
fn value(&self) -> &[u8] {
if let Some(error_when) = self.error_when {
if self.index >= error_when {
panic!("invalid access after next returns an error!");
}
}
self.data[self.index].1.as_ref()
}
fn is_valid(&self) -> bool {
if let Some(error_when) = self.error_when {
if self.index >= error_when {
panic!("invalid access after next returns an error!");
}
}
self.index < self.data.len()
}
}
pub fn as_bytes(x: &[u8]) -> Bytes {
Bytes::copy_from_slice(x)
}
pub fn check_iter_result_by_key<I>(iter: &mut I, expected: Vec<(Bytes, Bytes)>)
where
I: for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
{
for (k, v) in expected {
assert!(iter.is_valid());
assert_eq!(
k,
iter.key().for_testing_key_ref(),
"expected key: {:?}, actual key: {:?}",
k,
as_bytes(iter.key().for_testing_key_ref()),
);
assert_eq!(
v,
iter.value(),
"expected value: {:?}, actual value: {:?}",
v,
as_bytes(iter.value()),
);
iter.next().unwrap();
}
assert!(
!iter.is_valid(),
"iterator should not be valid at the end of the check"
);
}
pub fn check_iter_result_by_key_and_ts<I>(iter: &mut I, expected: Vec<((Bytes, u64), Bytes)>)
where
I: for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
{
for ((k, ts), v) in expected {
assert!(iter.is_valid());
assert_eq!(
(&k[..], ts),
(
iter.key().for_testing_key_ref(),
iter.key().for_testing_ts()
),
"expected key: {:?}@{}, actual key: {:?}@{}",
k,
ts,
as_bytes(iter.key().for_testing_key_ref()),
iter.key().for_testing_ts(),
);
assert_eq!(
v,
iter.value(),
"expected value: {:?}, actual value: {:?}",
v,
as_bytes(iter.value()),
);
iter.next().unwrap();
}
assert!(!iter.is_valid());
}
pub fn check_lsm_iter_result_by_key<I>(iter: &mut I, expected: Vec<(Bytes, Bytes)>)
where
I: for<'a> StorageIterator<KeyType<'a> = &'a [u8]>,
{
for (k, v) in expected {
assert!(iter.is_valid());
assert_eq!(
k,
iter.key(),
"expected key: {:?}, actual key: {:?}",
k,
as_bytes(iter.key()),
);
assert_eq!(
v,
iter.value(),
"expected value: {:?}, actual value: {:?}",
v,
as_bytes(iter.value()),
);
iter.next().unwrap();
}
assert!(!iter.is_valid());
}
pub fn expect_iter_error(mut iter: impl StorageIterator) {
loop {
match iter.next() {
Ok(_) if iter.is_valid() => continue,
Ok(_) => panic!("expect an error"),
Err(_) => break,
}
}
}
pub fn generate_sst(
id: usize,
path: impl AsRef<Path>,
data: Vec<(Bytes, Bytes)>,
block_cache: Option<Arc<BlockCache>>,
) -> SsTable {
let mut builder = SsTableBuilder::new(128);
for (key, value) in data {
builder.add(KeySlice::for_testing_from_slice_no_ts(&key[..]), &value[..]);
}
builder.build(id, block_cache, path.as_ref()).unwrap()
}
pub fn generate_sst_with_ts(
id: usize,
path: impl AsRef<Path>,
data: Vec<((Bytes, u64), Bytes)>,
block_cache: Option<Arc<BlockCache>>,
) -> SsTable {
let mut builder = SsTableBuilder::new(128);
for ((key, ts), value) in data {
builder.add(
KeySlice::for_testing_from_slice_with_ts(&key[..], ts),
&value[..],
);
}
builder.build(id, block_cache, path.as_ref()).unwrap()
}
pub fn sync(storage: &LsmStorageInner) {
storage
.force_freeze_memtable(&storage.state_lock.lock())
.unwrap();
storage.force_flush_next_imm_memtable().unwrap();
}
pub fn compaction_bench(storage: Arc<MiniLsm>) {
let mut key_map = BTreeMap::<usize, usize>::new();
let gen_key = |i| format!("{:010}", i); // 10B
let gen_value = |i| format!("{:0110}", i); // 110B
let mut max_key = 0;
let overlaps = if TS_ENABLED { 10000 } else { 20000 };
for iter in 0..10 {
let range_begin = iter * 5000;
for i in range_begin..(range_begin + overlaps) {
// 120B per key, 4MB data populated
let key: String = gen_key(i);
let version = key_map.get(&i).copied().unwrap_or_default() + 1;
let value = gen_value(version);
key_map.insert(i, version);
storage.put(key.as_bytes(), value.as_bytes()).unwrap();
max_key = max_key.max(i);
}
}
std::thread::sleep(Duration::from_secs(1)); // wait until all memtables flush
while {
let snapshot = storage.inner.state.read();
!snapshot.imm_memtables.is_empty()
} {
storage.inner.force_flush_next_imm_memtable().unwrap();
}
let mut prev_snapshot = storage.inner.state.read().clone();
while {
std::thread::sleep(Duration::from_secs(1));
let snapshot = storage.inner.state.read().clone();
let to_cont = prev_snapshot.levels != snapshot.levels
|| prev_snapshot.l0_sstables != snapshot.l0_sstables;
prev_snapshot = snapshot;
to_cont
} {
println!("waiting for compaction to converge");
}
let mut expected_key_value_pairs = Vec::new();
for i in 0..(max_key + 40000) {
let key = gen_key(i);
let value = storage.get(key.as_bytes()).unwrap();
if let Some(val) = key_map.get(&i) {
let expected_value = gen_value(*val);
assert_eq!(value, Some(Bytes::from(expected_value.clone())));
expected_key_value_pairs.push((Bytes::from(key), Bytes::from(expected_value)));
} else {
assert!(value.is_none());
}
}
check_lsm_iter_result_by_key(
&mut storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
expected_key_value_pairs,
);
storage.dump_structure();
println!(
"This test case does not guarantee your compaction algorithm produces a LSM state as expected. It only does minimal checks on the size of the levels. Please use the compaction simulator to check if the compaction is correctly going on."
);
}
pub fn check_compaction_ratio(storage: Arc<MiniLsm>) {
let state = storage.inner.state.read().clone();
let compaction_options = storage.inner.options.compaction_options.clone();
let mut level_size = Vec::new();
let l0_sst_num = state.l0_sstables.len();
for (_, files) in &state.levels {
let size = match &compaction_options {
CompactionOptions::Leveled(_) => files
.iter()
.map(|x| state.sstables.get(x).as_ref().unwrap().table_size())
.sum::<u64>(),
CompactionOptions::Simple(_) | CompactionOptions::Tiered(_) => files.len() as u64,
_ => unreachable!(),
};
level_size.push(size);
}
let extra_iterators = if TS_ENABLED {
1 /* txn local iterator for OCC */
} else {
0
};
let num_iters = storage
.scan(Bound::Unbounded, Bound::Unbounded)
.unwrap()
.num_active_iterators();
let num_memtables = storage.inner.state.read().imm_memtables.len() + 1;
match compaction_options {
CompactionOptions::NoCompaction => unreachable!(),
CompactionOptions::Simple(SimpleLeveledCompactionOptions {
size_ratio_percent,
level0_file_num_compaction_trigger,
max_levels,
}) => {
assert!(l0_sst_num < level0_file_num_compaction_trigger);
assert!(level_size.len() <= max_levels);
for idx in 1..level_size.len() {
let prev_size = level_size[idx - 1];
let this_size = level_size[idx];
if prev_size == 0 && this_size == 0 {
continue;
}
assert!(
this_size as f64 / prev_size as f64 >= size_ratio_percent as f64 / 100.0,
"L{}/L{}, {}/{}<{}%",
state.levels[idx - 1].0,
state.levels[idx].0,
this_size,
prev_size,
size_ratio_percent
);
}
assert!(
num_iters <= l0_sst_num + num_memtables + max_levels + extra_iterators,
"we found {num_iters} iterators in your implementation, (l0_sst_num={l0_sst_num}, num_memtables={num_memtables}, max_levels={max_levels}) did you use concat iterators?"
);
}
CompactionOptions::Leveled(LeveledCompactionOptions {
level_size_multiplier,
level0_file_num_compaction_trigger,
max_levels,
..
}) => {
assert!(l0_sst_num < level0_file_num_compaction_trigger);
assert!(level_size.len() <= max_levels);
let last_level_size = *level_size.last().unwrap();
let mut multiplier = 1.0;
for idx in (1..level_size.len()).rev() {
multiplier *= level_size_multiplier as f64;
let this_size = level_size[idx - 1];
assert!(
                    // no hard requirement on the exact level size multiplier: bloom filters and metadata inflate table sizes, hence the 0.5 slack term
this_size as f64 / last_level_size as f64 <= 1.0 / multiplier + 0.5,
"L{}/L_max, {}/{}>>1.0/{}",
state.levels[idx - 1].0,
this_size,
last_level_size,
multiplier
);
}
assert!(
num_iters <= l0_sst_num + num_memtables + max_levels + extra_iterators,
"we found {num_iters} iterators in your implementation, (l0_sst_num={l0_sst_num}, num_memtables={num_memtables}, max_levels={max_levels}) did you use concat iterators?"
);
}
CompactionOptions::Tiered(TieredCompactionOptions {
num_tiers,
max_size_amplification_percent,
size_ratio,
min_merge_width,
..
}) => {
let size_ratio_trigger = (100.0 + size_ratio as f64) / 100.0;
assert_eq!(l0_sst_num, 0);
assert!(level_size.len() <= num_tiers);
let mut sum_size = level_size[0];
for idx in 1..level_size.len() {
let this_size = level_size[idx];
if level_size.len() > min_merge_width {
assert!(
sum_size as f64 / this_size as f64 <= size_ratio_trigger,
"violation of size ratio: sum(⬆L{})/L{}, {}/{}>{}",
state.levels[idx - 1].0,
state.levels[idx].0,
sum_size,
this_size,
size_ratio_trigger
);
}
if idx + 1 == level_size.len() {
assert!(
sum_size as f64 / this_size as f64
<= max_size_amplification_percent as f64 / 100.0,
"violation of space amp: sum(⬆L{})/L{}, {}/{}>{}%",
state.levels[idx - 1].0,
state.levels[idx].0,
sum_size,
this_size,
max_size_amplification_percent
);
}
sum_size += this_size;
}
assert!(
num_iters <= num_memtables + num_tiers + extra_iterators,
"we found {num_iters} iterators in your implementation, (num_memtables={num_memtables}, num_tiers={num_tiers}) did you use concat iterators?"
);
}
}
}
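// Worked example of the simple-leveled ratio check above: with
// size_ratio_percent = 200, level sizes of 1, 2, and 4 files pass (2/1 and 4/2
// are both >= 200%), while sizes of 2 and 2 fail (2/2 = 100% < 200%).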
pub fn dump_files_in_dir(path: impl AsRef<Path>) {
println!("--- DIR DUMP ---");
for f in path.as_ref().read_dir().unwrap() {
let f = f.unwrap();
print!("{}", f.path().display());
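        // `metadata().size()` below is the Unix-only accessor from
        // `std::os::unix::fs::MetadataExt` (assumed imported earlier in this
        // file); `metadata().len()` is the portable equivalent.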
println!(
", size={:.3}KB",
f.metadata().unwrap().size() as f64 / 1024.0
);
}
}
pub fn construct_merge_iterator_over_storage(
state: &LsmStorageState,
) -> MergeIterator<SsTableIterator> {
let mut iters = Vec::new();
for t in &state.l0_sstables {
iters.push(Box::new(
SsTableIterator::create_and_seek_to_first(state.sstables.get(t).cloned().unwrap())
.unwrap(),
));
}
for (_, files) in &state.levels {
for f in files {
iters.push(Box::new(
SsTableIterator::create_and_seek_to_first(state.sstables.get(f).cloned().unwrap())
.unwrap(),
));
}
}
MergeIterator::create(iters)
}
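// Note: this helper opens one SsTableIterator per table rather than using
// concat iterators; it is only used to enumerate full-storage contents for
// verification, so per-level iterator efficiency does not matter here.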

View File

@@ -0,0 +1,163 @@
// Copyright (c) 2022-2025 Alex Chi Z
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use tempfile::tempdir;
use crate::{
lsm_storage::{LsmStorageInner, LsmStorageOptions},
mem_table::MemTable,
};
#[test]
fn test_task1_memtable_get() {
let memtable = MemTable::create(0);
memtable.for_testing_put_slice(b"key1", b"value1").unwrap();
memtable.for_testing_put_slice(b"key2", b"value2").unwrap();
memtable.for_testing_put_slice(b"key3", b"value3").unwrap();
assert_eq!(
&memtable.for_testing_get_slice(b"key1").unwrap()[..],
b"value1"
);
assert_eq!(
&memtable.for_testing_get_slice(b"key2").unwrap()[..],
b"value2"
);
assert_eq!(
&memtable.for_testing_get_slice(b"key3").unwrap()[..],
b"value3"
);
}
#[test]
fn test_task1_memtable_overwrite() {
let memtable = MemTable::create(0);
memtable.for_testing_put_slice(b"key1", b"value1").unwrap();
memtable.for_testing_put_slice(b"key2", b"value2").unwrap();
memtable.for_testing_put_slice(b"key3", b"value3").unwrap();
memtable.for_testing_put_slice(b"key1", b"value11").unwrap();
memtable.for_testing_put_slice(b"key2", b"value22").unwrap();
memtable.for_testing_put_slice(b"key3", b"value33").unwrap();
assert_eq!(
&memtable.for_testing_get_slice(b"key1").unwrap()[..],
b"value11"
);
assert_eq!(
&memtable.for_testing_get_slice(b"key2").unwrap()[..],
b"value22"
);
assert_eq!(
&memtable.for_testing_get_slice(b"key3").unwrap()[..],
b"value33"
);
}
#[test]
fn test_task2_storage_integration() {
let dir = tempdir().unwrap();
let storage = Arc::new(
LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap(),
);
assert_eq!(&storage.get(b"0").unwrap(), &None);
storage.put(b"1", b"233").unwrap();
storage.put(b"2", b"2333").unwrap();
storage.put(b"3", b"23333").unwrap();
assert_eq!(&storage.get(b"1").unwrap().unwrap()[..], b"233");
assert_eq!(&storage.get(b"2").unwrap().unwrap()[..], b"2333");
assert_eq!(&storage.get(b"3").unwrap().unwrap()[..], b"23333");
storage.delete(b"2").unwrap();
assert!(storage.get(b"2").unwrap().is_none());
storage.delete(b"0").unwrap(); // should NOT report any error
}
#[test]
fn test_task3_storage_integration() {
let dir = tempdir().unwrap();
let storage = Arc::new(
LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap(),
);
storage.put(b"1", b"233").unwrap();
storage.put(b"2", b"2333").unwrap();
storage.put(b"3", b"23333").unwrap();
storage
.force_freeze_memtable(&storage.state_lock.lock())
.unwrap();
assert_eq!(storage.state.read().imm_memtables.len(), 1);
let previous_approximate_size = storage.state.read().imm_memtables[0].approximate_size();
assert!(previous_approximate_size >= 15);
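    // 15 bytes = keys "1" + "2" + "3" (1B each) plus values "233" + "2333" +
    // "23333" (3 + 4 + 5 = 12B), i.e. the sum of key and value lengths.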
storage.put(b"1", b"2333").unwrap();
storage.put(b"2", b"23333").unwrap();
storage.put(b"3", b"233333").unwrap();
storage
.force_freeze_memtable(&storage.state_lock.lock())
.unwrap();
assert_eq!(storage.state.read().imm_memtables.len(), 2);
assert!(
storage.state.read().imm_memtables[1].approximate_size() == previous_approximate_size,
"wrong order of memtables?"
);
assert!(storage.state.read().imm_memtables[0].approximate_size() > previous_approximate_size);
}
#[test]
fn test_task3_freeze_on_capacity() {
let dir = tempdir().unwrap();
let mut options = LsmStorageOptions::default_for_week1_test();
options.target_sst_size = 1024;
options.num_memtable_limit = 1000;
let storage = Arc::new(LsmStorageInner::open(dir.path(), options).unwrap());
for _ in 0..1000 {
storage.put(b"1", b"2333").unwrap();
}
let num_imm_memtables = storage.state.read().imm_memtables.len();
    println!("num_imm_memtables = {}", num_imm_memtables);
assert!(num_imm_memtables >= 1, "no memtable frozen?");
for _ in 0..1000 {
storage.delete(b"1").unwrap();
}
assert!(
storage.state.read().imm_memtables.len() > num_imm_memtables,
"no more memtable frozen?"
);
}
#[test]
fn test_task4_storage_integration() {
let dir = tempdir().unwrap();
let storage = Arc::new(
LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap(),
);
assert_eq!(&storage.get(b"0").unwrap(), &None);
storage.put(b"1", b"233").unwrap();
storage.put(b"2", b"2333").unwrap();
storage.put(b"3", b"23333").unwrap();
storage
.force_freeze_memtable(&storage.state_lock.lock())
.unwrap();
storage.delete(b"1").unwrap();
storage.delete(b"2").unwrap();
storage.put(b"3", b"2333").unwrap();
storage.put(b"4", b"23333").unwrap();
storage
.force_freeze_memtable(&storage.state_lock.lock())
.unwrap();
storage.put(b"1", b"233333").unwrap();
storage.put(b"3", b"233333").unwrap();
assert_eq!(storage.state.read().imm_memtables.len(), 2);
assert_eq!(&storage.get(b"1").unwrap().unwrap()[..], b"233333");
assert_eq!(&storage.get(b"2").unwrap(), &None);
assert_eq!(&storage.get(b"3").unwrap().unwrap()[..], b"233333");
assert_eq!(&storage.get(b"4").unwrap().unwrap()[..], b"23333");
}

View File

@@ -0,0 +1,331 @@
// Copyright (c) 2022-2025 Alex Chi Z
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{ops::Bound, sync::Arc};
use bytes::Bytes;
use tempfile::tempdir;
use crate::{
iterators::{StorageIterator, merge_iterator::MergeIterator},
lsm_iterator::FusedIterator,
lsm_storage::{LsmStorageInner, LsmStorageOptions},
mem_table::MemTable,
tests::harness::check_lsm_iter_result_by_key,
};
use super::harness::{MockIterator, check_iter_result_by_key, expect_iter_error};
#[test]
fn test_task1_memtable_iter() {
use std::ops::Bound;
let memtable = MemTable::create(0);
memtable.for_testing_put_slice(b"key1", b"value1").unwrap();
memtable.for_testing_put_slice(b"key2", b"value2").unwrap();
memtable.for_testing_put_slice(b"key3", b"value3").unwrap();
{
let mut iter = memtable.for_testing_scan_slice(Bound::Unbounded, Bound::Unbounded);
assert_eq!(iter.key().for_testing_key_ref(), b"key1");
assert_eq!(iter.value(), b"value1");
assert!(iter.is_valid());
iter.next().unwrap();
assert_eq!(iter.key().for_testing_key_ref(), b"key2");
assert_eq!(iter.value(), b"value2");
assert!(iter.is_valid());
iter.next().unwrap();
assert_eq!(iter.key().for_testing_key_ref(), b"key3");
assert_eq!(iter.value(), b"value3");
assert!(iter.is_valid());
iter.next().unwrap();
assert!(!iter.is_valid());
}
{
let mut iter =
memtable.for_testing_scan_slice(Bound::Included(b"key1"), Bound::Included(b"key2"));
assert_eq!(iter.key().for_testing_key_ref(), b"key1");
assert_eq!(iter.value(), b"value1");
assert!(iter.is_valid());
iter.next().unwrap();
assert_eq!(iter.key().for_testing_key_ref(), b"key2");
assert_eq!(iter.value(), b"value2");
assert!(iter.is_valid());
iter.next().unwrap();
assert!(!iter.is_valid());
}
{
let mut iter =
memtable.for_testing_scan_slice(Bound::Excluded(b"key1"), Bound::Excluded(b"key3"));
assert_eq!(iter.key().for_testing_key_ref(), b"key2");
assert_eq!(iter.value(), b"value2");
assert!(iter.is_valid());
iter.next().unwrap();
assert!(!iter.is_valid());
}
}
#[test]
fn test_task1_empty_memtable_iter() {
use std::ops::Bound;
let memtable = MemTable::create(0);
{
let iter =
memtable.for_testing_scan_slice(Bound::Excluded(b"key1"), Bound::Excluded(b"key3"));
assert!(!iter.is_valid());
}
{
let iter =
memtable.for_testing_scan_slice(Bound::Included(b"key1"), Bound::Included(b"key2"));
assert!(!iter.is_valid());
}
{
let iter = memtable.for_testing_scan_slice(Bound::Unbounded, Bound::Unbounded);
assert!(!iter.is_valid());
}
}
#[test]
fn test_task2_merge_1() {
let i1 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
(Bytes::from("e"), Bytes::new()),
]);
let i2 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.2")),
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
]);
let i3 = MockIterator::new(vec![
(Bytes::from("b"), Bytes::from("2.3")),
(Bytes::from("c"), Bytes::from("3.3")),
(Bytes::from("d"), Bytes::from("4.3")),
]);
let mut iter = MergeIterator::create(vec![
Box::new(i1.clone()),
Box::new(i2.clone()),
Box::new(i3.clone()),
]);
check_iter_result_by_key(
&mut iter,
vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
(Bytes::from("d"), Bytes::from("4.2")),
(Bytes::from("e"), Bytes::new()),
],
);
let mut iter = MergeIterator::create(vec![Box::new(i3), Box::new(i1), Box::new(i2)]);
check_iter_result_by_key(
&mut iter,
vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.3")),
(Bytes::from("c"), Bytes::from("3.3")),
(Bytes::from("d"), Bytes::from("4.3")),
(Bytes::from("e"), Bytes::new()),
],
);
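    // The two orderings above pin down the tie-breaking rule: when several
    // input iterators contain the same key, MergeIterator must return the
    // value from the iterator that appears earliest in the vector (callers
    // pass memtables newest-first, so the newest value wins).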
}
#[test]
fn test_task2_merge_2() {
let i1 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
]);
let i2 = MockIterator::new(vec![
(Bytes::from("d"), Bytes::from("1.2")),
(Bytes::from("e"), Bytes::from("2.2")),
(Bytes::from("f"), Bytes::from("3.2")),
(Bytes::from("g"), Bytes::from("4.2")),
]);
let i3 = MockIterator::new(vec![
(Bytes::from("h"), Bytes::from("1.3")),
(Bytes::from("i"), Bytes::from("2.3")),
(Bytes::from("j"), Bytes::from("3.3")),
(Bytes::from("k"), Bytes::from("4.3")),
]);
let i4 = MockIterator::new(vec![]);
let result = vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
(Bytes::from("d"), Bytes::from("1.2")),
(Bytes::from("e"), Bytes::from("2.2")),
(Bytes::from("f"), Bytes::from("3.2")),
(Bytes::from("g"), Bytes::from("4.2")),
(Bytes::from("h"), Bytes::from("1.3")),
(Bytes::from("i"), Bytes::from("2.3")),
(Bytes::from("j"), Bytes::from("3.3")),
(Bytes::from("k"), Bytes::from("4.3")),
];
let mut iter = MergeIterator::create(vec![
Box::new(i1.clone()),
Box::new(i2.clone()),
Box::new(i3.clone()),
Box::new(i4.clone()),
]);
check_iter_result_by_key(&mut iter, result.clone());
let mut iter = MergeIterator::create(vec![
Box::new(i2.clone()),
Box::new(i4.clone()),
Box::new(i3.clone()),
Box::new(i1.clone()),
]);
check_iter_result_by_key(&mut iter, result.clone());
let mut iter =
MergeIterator::create(vec![Box::new(i4), Box::new(i3), Box::new(i2), Box::new(i1)]);
check_iter_result_by_key(&mut iter, result);
}
#[test]
fn test_task2_merge_empty() {
let mut iter = MergeIterator::<MockIterator>::create(vec![]);
check_iter_result_by_key(&mut iter, vec![]);
let i1 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
]);
let i2 = MockIterator::new(vec![]);
let mut iter = MergeIterator::<MockIterator>::create(vec![Box::new(i1), Box::new(i2)]);
check_iter_result_by_key(
&mut iter,
vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
],
);
}
#[test]
fn test_task2_merge_error() {
let mut iter = MergeIterator::<MockIterator>::create(vec![]);
check_iter_result_by_key(&mut iter, vec![]);
let i1 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
]);
let i2 = MockIterator::new_with_error(
vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
],
1,
);
let iter = MergeIterator::<MockIterator>::create(vec![
Box::new(i1.clone()),
Box::new(i1),
Box::new(i2),
]);
    // your implementation should correctly return an error instead of panicking
expect_iter_error(iter);
}
#[test]
fn test_task3_fused_iterator() {
let iter = MockIterator::new(vec![]);
let mut fused_iter = FusedIterator::new(iter);
assert!(!fused_iter.is_valid());
fused_iter.next().unwrap();
fused_iter.next().unwrap();
fused_iter.next().unwrap();
assert!(!fused_iter.is_valid());
let iter = MockIterator::new_with_error(
vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("a"), Bytes::from("1.1")),
],
1,
);
let mut fused_iter = FusedIterator::new(iter);
assert!(fused_iter.is_valid());
assert!(fused_iter.next().is_err());
assert!(!fused_iter.is_valid());
assert!(fused_iter.next().is_err());
assert!(fused_iter.next().is_err());
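    // A minimal sketch of the behavior this test pins down (assumed field
    // names; the real type lives in `lsm_iterator`): `next` on an exhausted
    // iterator stays a no-op `Ok(())`, but the first `Err` latches:
    //
    //     fn next(&mut self) -> Result<()> {
    //         if self.has_errored {
    //             bail!("the iterator is already errored");
    //         }
    //         if self.iter.is_valid() {
    //             if let Err(e) = self.iter.next() {
    //                 self.has_errored = true;
    //                 return Err(e);
    //             }
    //         }
    //         Ok(())
    //     }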
}
#[test]
fn test_task4_integration() {
let dir = tempdir().unwrap();
let storage = Arc::new(
LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap(),
);
storage.put(b"1", b"233").unwrap();
storage.put(b"2", b"2333").unwrap();
storage.put(b"3", b"23333").unwrap();
storage
.force_freeze_memtable(&storage.state_lock.lock())
.unwrap();
storage.delete(b"1").unwrap();
storage.delete(b"2").unwrap();
storage.put(b"3", b"2333").unwrap();
storage.put(b"4", b"23333").unwrap();
storage
.force_freeze_memtable(&storage.state_lock.lock())
.unwrap();
storage.put(b"1", b"233333").unwrap();
storage.put(b"3", b"233333").unwrap();
{
let mut iter = storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap();
check_lsm_iter_result_by_key(
&mut iter,
vec![
(Bytes::from_static(b"1"), Bytes::from_static(b"233333")),
(Bytes::from_static(b"3"), Bytes::from_static(b"233333")),
(Bytes::from_static(b"4"), Bytes::from_static(b"23333")),
],
);
assert!(!iter.is_valid());
iter.next().unwrap();
iter.next().unwrap();
iter.next().unwrap();
assert!(!iter.is_valid());
}
{
let mut iter = storage
.scan(Bound::Included(b"2"), Bound::Included(b"3"))
.unwrap();
check_lsm_iter_result_by_key(
&mut iter,
vec![(Bytes::from_static(b"3"), Bytes::from_static(b"233333"))],
);
assert!(!iter.is_valid());
iter.next().unwrap();
iter.next().unwrap();
iter.next().unwrap();
assert!(!iter.is_valid());
}
}

View File

@@ -0,0 +1,161 @@
// Copyright (c) 2022-2025 Alex Chi Z
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use bytes::Bytes;
use crate::{
block::{Block, BlockBuilder, BlockIterator},
key::{KeySlice, KeyVec},
};
#[test]
fn test_block_build_single_key() {
let mut builder = BlockBuilder::new(16);
assert!(builder.add(KeySlice::for_testing_from_slice_no_ts(b"233"), b"233333"));
builder.build();
}
#[test]
fn test_block_build_full() {
let mut builder = BlockBuilder::new(16);
assert!(builder.add(KeySlice::for_testing_from_slice_no_ts(b"11"), b"11"));
assert!(!builder.add(KeySlice::for_testing_from_slice_no_ts(b"22"), b"22"));
builder.build();
}
#[test]
fn test_block_build_large_1() {
let mut builder = BlockBuilder::new(16);
assert!(builder.add(
KeySlice::for_testing_from_slice_no_ts(b"11"),
&b"1".repeat(100)
));
builder.build();
}
#[test]
fn test_block_build_large_2() {
let mut builder = BlockBuilder::new(16);
assert!(builder.add(KeySlice::for_testing_from_slice_no_ts(b"11"), b"1"));
assert!(!builder.add(
KeySlice::for_testing_from_slice_no_ts(b"11"),
&b"1".repeat(100)
));
}
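// Note: the two `large` tests above encode the expected convention: a
// BlockBuilder accepts the first key-value pair even if it alone exceeds the
// block size target (so oversized entries can still be stored), but rejects
// any later pair that would overflow it.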
fn key_of(idx: usize) -> KeyVec {
KeyVec::for_testing_from_vec_no_ts(format!("key_{:03}", idx * 5).into_bytes())
}
fn value_of(idx: usize) -> Vec<u8> {
format!("value_{:010}", idx).into_bytes()
}
fn num_of_keys() -> usize {
100
}
fn generate_block() -> Block {
let mut builder = BlockBuilder::new(10000);
for idx in 0..num_of_keys() {
let key = key_of(idx);
let value = value_of(idx);
assert!(builder.add(key.as_key_slice(), &value[..]));
}
builder.build()
}
#[test]
fn test_block_build_all() {
generate_block();
}
#[test]
fn test_block_encode() {
let block = generate_block();
block.encode();
}
#[test]
fn test_block_decode() {
let block = generate_block();
let encoded = block.encode();
let decoded_block = Block::decode(&encoded);
assert_eq!(block.offsets, decoded_block.offsets);
assert_eq!(block.data, decoded_block.data);
}
fn as_bytes(x: &[u8]) -> Bytes {
Bytes::copy_from_slice(x)
}
#[test]
fn test_block_iterator() {
let block = Arc::new(generate_block());
let mut iter = BlockIterator::create_and_seek_to_first(block);
for _ in 0..5 {
for i in 0..num_of_keys() {
let key = iter.key();
let value = iter.value();
assert_eq!(
key.for_testing_key_ref(),
key_of(i).for_testing_key_ref(),
"expected key: {:?}, actual key: {:?}",
as_bytes(key_of(i).for_testing_key_ref()),
as_bytes(key.for_testing_key_ref())
);
assert_eq!(
value,
value_of(i),
"expected value: {:?}, actual value: {:?}",
as_bytes(&value_of(i)),
as_bytes(value)
);
iter.next();
}
iter.seek_to_first();
}
}
#[test]
fn test_block_seek_key() {
let block = Arc::new(generate_block());
let mut iter = BlockIterator::create_and_seek_to_key(block, key_of(0).as_key_slice());
for offset in 1..=5 {
for i in 0..num_of_keys() {
let key = iter.key();
let value = iter.value();
assert_eq!(
key.for_testing_key_ref(),
key_of(i).for_testing_key_ref(),
"expected key: {:?}, actual key: {:?}",
as_bytes(key_of(i).for_testing_key_ref()),
as_bytes(key.for_testing_key_ref())
);
assert_eq!(
value,
value_of(i),
"expected value: {:?}, actual value: {:?}",
as_bytes(&value_of(i)),
as_bytes(value)
);
iter.seek_to_key(KeySlice::for_testing_from_slice_no_ts(
&format!("key_{:03}", i * 5 + offset).into_bytes(),
));
}
iter.seek_to_key(KeySlice::for_testing_from_slice_no_ts(b"k"));
}
}

View File

@@ -15,15 +15,16 @@
 #![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod
 #![allow(dead_code)] // TODO(you): remove this lint after implementing this mod
 
-use anyhow::Result;
-use bytes::Bytes;
-use crossbeam_skiplist::SkipMap;
-use parking_lot::Mutex;
 use std::fs::File;
 use std::io::BufWriter;
 use std::path::Path;
 use std::sync::Arc;
+use anyhow::Result;
+use bytes::Bytes;
+use crossbeam_skiplist::SkipMap;
+use parking_lot::Mutex;
+
+use crate::key::KeySlice;
 
 pub struct Wal {
     file: Arc<Mutex<BufWriter<File>>>,
@@ -31,7 +32,9 @@ pub struct Wal {
 impl Wal {
     pub fn create(_path: impl AsRef<Path>) -> Result<Self> {
-        unimplemented!()
+        Ok(Self {
+            file: Arc::new(Mutex::new(BufWriter::new(File::create(_path)?))),
+        })
     }
 
     pub fn recover(_path: impl AsRef<Path>, _skiplist: &SkipMap<Bytes, Bytes>) -> Result<Self> {
@@ -42,8 +45,8 @@ impl Wal {
         unimplemented!()
     }
 
-    /// Implement this in week 3, day 5.
-    pub fn put_batch(&self, _data: &[(&[u8], &[u8])]) -> Result<()> {
+    /// Implement this in week 3, day 5; if you want to implement this earlier, use `&[u8]` as the key type.
+    pub fn put_batch(&self, _data: &[(KeySlice, &[u8])]) -> Result<()> {
         unimplemented!()
     }
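// A minimal sketch of a pre-week-3 `put_batch` body, using the `&[u8]` key
// type suggested in the doc comment above and an assumed record layout of
// key_len (u16 BE) | key | value_len (u16 BE) | value (`std::io::Write` must
// be in scope; the real on-disk format is up to your implementation):
//
//     pub fn put_batch(&self, data: &[(&[u8], &[u8])]) -> Result<()> {
//         let mut file = self.file.lock();
//         for (key, value) in data {
//             file.write_all(&(key.len() as u16).to_be_bytes())?;
//             file.write_all(key)?;
//             file.write_all(&(value.len() as u16).to_be_bytes())?;
//             file.write_all(value)?;
//         }
//         Ok(())
//     }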

View File

@@ -90,7 +90,8 @@ impl TieredCompactionController {
                     .take(id + 1)
                     .cloned()
                     .collect::<Vec<_>>(),
-                bottom_tier_included: id + 1 >= snapshot.levels.len(),
+                // Size ratio trigger will never include the bottom level
+                bottom_tier_included: false,
             });
         }
     }

View File

@@ -121,7 +121,7 @@ impl MemTable {
         Ok(())
     }
 
-    /// Implement this in week 3, day 5.
+    /// Implement this in week 3, day 5; if you want to implement this earlier, use `&[u8]` as the key type.
     pub fn put_batch(&self, _data: &[(KeySlice, &[u8])]) -> Result<()> {
         unimplemented!()
     }
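// A minimal sketch of the corresponding memtable body, again using the
// suggested `&[u8]` key type and delegating to the existing `put`:
//
//     pub fn put_batch(&self, data: &[(&[u8], &[u8])]) -> Result<()> {
//         for (key, value) in data {
//             self.put(key, value)?;
//         }
//         Ok(())
//     }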

View File

@@ -23,6 +23,8 @@ use bytes::{Buf, BufMut, Bytes};
 use crossbeam_skiplist::SkipMap;
 use parking_lot::Mutex;
 
+use crate::key::KeySlice;
+
 pub struct Wal {
     file: Arc<Mutex<BufWriter<File>>>,
 }
@@ -93,8 +95,8 @@ impl Wal {
         Ok(())
     }
 
-    /// Implement this in week 3, day 5.
-    pub fn put_batch(&self, _data: &[(&[u8], &[u8])]) -> Result<()> {
+    /// Implement this in week 3, day 5; if you want to implement this earlier, use `&[u8]` as the key type.
+    pub fn put_batch(&self, _data: &[(KeySlice, &[u8])]) -> Result<()> {
         unimplemented!()
     }