First Commit
Some checks failed
CI (main) / build (push) Has been cancelled
CI (main) / deploy (push) Has been cancelled

This commit is contained in:
ECROF88
2025-10-20 20:12:40 +08:00
parent f484bc5e6c
commit 8fb4176e71
17 changed files with 1822 additions and 97 deletions

View File

@@ -32,11 +32,40 @@ impl Block {
/// Encode the internal data to the data layout illustrated in the course
/// Note: You may want to recheck if any of the expected field is missing from your output
pub fn encode(&self) -> Bytes {
    // Layout: | entry data | offsets (u16 LE each) | num_of_elements (u16 LE) |
    let mut buf = Vec::with_capacity(self.data.len() + self.offsets.len() * 2 + 2);
    buf.extend_from_slice(&self.data);
    for offset in &self.offsets {
        buf.extend_from_slice(&offset.to_le_bytes());
    }
    let num_of_elements = self.offsets.len() as u16;
    buf.extend_from_slice(&num_of_elements.to_le_bytes());
    Bytes::from(buf)
}
/// Decode from the data layout, transform the input `data` to a single `Block`
pub fn decode(data: &[u8]) -> Self {
    let data_len = data.len();
    // The trailing u16 stores the number of entries.
    let element_nums = u16::from_le_bytes([data[data_len - 2], data[data_len - 1]]) as usize;
    // The offset section (2 bytes per entry) sits right before that count.
    let offset_start = data_len - element_nums * 2 - 2;
    let data_vec = data[..offset_start].to_vec();
    let mut offset_vec = Vec::with_capacity(element_nums);
    for i in 0..element_nums {
        let pos = offset_start + i * 2;
        offset_vec.push(u16::from_le_bytes([data[pos], data[pos + 1]]));
    }
    Self {
        data: data_vec,
        offsets: offset_vec,
    }
}
}

View File

@@ -15,6 +15,8 @@
#![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod
#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod
use bytes::BufMut;
use crate::key::{KeySlice, KeyVec};
use super::Block;
@@ -34,23 +36,59 @@ pub struct BlockBuilder {
impl BlockBuilder {
/// Creates a new block builder.
pub fn new(block_size: usize) -> Self {
    Self {
        block_size,
        data: Vec::new(),
        offsets: Vec::new(),
        first_key: KeyVec::new(),
    }
}
/// Adds a key-value pair to the block. Returns false when the block is full.
/// You may find the `bytes::BufMut` trait useful for manipulating binary data.
#[must_use]
pub fn add(&mut self, key: KeySlice, value: &[u8]) -> bool {
    // Encoded entry size: key_len (u16) + key + value_len (u16) + value.
    let entry_size = 2 + key.len() + 2 + value.len();
    // Block size after insertion: data + offset array + num_of_elements footer.
    let total_size = self.data.len() + entry_size + (self.offsets.len() + 1) * 2 + 2;
    // Reject when full — but always accept the first entry, so an oversized
    // key-value pair can still occupy a block of its own.
    if !self.is_empty() && total_size > self.block_size {
        return false;
    }
    self.offsets.push(self.data.len() as u16);
    // BufMut keeps the encoding terse (little-endian u16 lengths).
    self.data.put_u16_le(key.len() as u16);
    self.data.put_slice(key.into_inner());
    self.data.put_u16_le(value.len() as u16);
    self.data.put_slice(value);
    if self.first_key.is_empty() {
        self.first_key = key.to_key_vec();
    }
    true
}
/// Check if there is no key-value pair in the block.
pub fn is_empty(&self) -> bool {
unimplemented!()
if self.data.is_empty() && self.offsets.is_empty() {
return true;
}
false
}
/// Finalize the block.
pub fn build(self) -> Block {
    Block {
        data: self.data,
        offsets: self.offsets,
    }
}
}

View File

@@ -48,44 +48,112 @@ impl BlockIterator {
/// Creates a block iterator and seek to the first entry.
pub fn create_and_seek_to_first(block: Arc<Block>) -> Self {
    let mut iter = Self::new(block);
    iter.seek_to_first();
    iter
}
/// Creates a block iterator and seek to the first key that >= `key`.
pub fn create_and_seek_to_key(block: Arc<Block>, key: KeySlice) -> Self {
    let mut iter = Self::new(block);
    iter.seek_to_key(key);
    iter
}
/// Returns the key of the current entry.
pub fn key(&self) -> KeySlice {
unimplemented!()
pub fn key(&self) -> KeySlice<'_> {
self.key.as_key_slice()
}
/// Returns the value of the current entry.
pub fn value(&self) -> &[u8] {
unimplemented!()
self.get_value_at_index(self.idx)
}
/// Returns true if the iterator is valid.
/// Note: You may want to make use of `key`
pub fn is_valid(&self) -> bool {
unimplemented!()
!self.key.is_empty()
}
/// Seeks to the first key in the block.
pub fn seek_to_first(&mut self) {
unimplemented!()
let data = &self.block.data;
if !data.is_empty() {
let first_key_len = u16::from_le_bytes([data[0], data[1]]) as usize;
let first_key = data[2..2 + first_key_len].to_vec();
self.key = KeyVec::from_vec(first_key);
self.first_key = self.key.clone();
self.idx = 0;
}
}
/// Move to the next key in the block.
pub fn next(&mut self) {
unimplemented!()
if self.idx + 1 < self.block.offsets.len() {
self.idx += 1;
self.key = self.get_key_at_index(self.idx).to_key_vec();
} else {
self.idx = self.block.offsets.len();
self.key.clear();
}
}
/// Seek to the first key that >= `key`.
/// Note: You should assume the key-value pairs in the block are sorted when being added by
/// callers.
pub fn seek_to_key(&mut self, key: KeySlice) {
    // Binary search for the lower bound: first index whose key >= `key`.
    let offsets = &self.block.offsets;
    let mut left = 0;
    let mut right = offsets.len();
    while left < right {
        let mid = left + (right - left) / 2;
        if self.get_key_at_index(mid) < key {
            left = mid + 1;
        } else {
            right = mid;
        }
    }
    if left < offsets.len() {
        self.idx = left;
        self.key = self.get_key_at_index(left).to_key_vec();
    } else {
        // All keys are smaller than `key`: iterator becomes invalid.
        self.idx = offsets.len();
        self.key.clear();
    }
}
/// Decode the key of entry `index` straight out of the raw block data.
fn get_key_at_index(&self, index: usize) -> KeySlice<'_> {
    let raw = &self.block.data;
    let entry_start = self.block.offsets[index] as usize;
    let len = u16::from_le_bytes([raw[entry_start], raw[entry_start + 1]]) as usize;
    KeySlice::from_slice(&raw[entry_start + 2..entry_start + 2 + len])
}
/// Decode the value of entry `index`; the value follows the key inside the entry.
fn get_value_at_index(&self, index: usize) -> &[u8] {
    let raw = &self.block.data;
    let entry_start = self.block.offsets[index] as usize;
    let key_len = u16::from_le_bytes([raw[entry_start], raw[entry_start + 1]]) as usize;
    let value_len_pos = entry_start + 2 + key_len;
    let value_len = u16::from_le_bytes([raw[value_len_pos], raw[value_len_pos + 1]]) as usize;
    let value_start = value_len_pos + 2;
    &raw[value_start..value_start + value_len]
}
}

View File

@@ -17,6 +17,7 @@
use std::cmp::{self};
use std::collections::BinaryHeap;
use std::collections::binary_heap::PeekMut;
use anyhow::Result;
@@ -46,7 +47,7 @@ impl<I: StorageIterator> Ord for HeapWrapper<I> {
.key()
.cmp(&other.1.key())
.then(self.0.cmp(&other.0))
.reverse()
.reverse() // 反转,实现让小的在堆顶部
}
}
@@ -54,12 +55,39 @@ impl<I: StorageIterator> Ord for HeapWrapper<I> {
/// iterators, prefer the one with smaller index.
pub struct MergeIterator<I: StorageIterator> {
iters: BinaryHeap<HeapWrapper<I>>,
// 待处理的堆,里面也是迭代器,每个都是(usize, Box<I>),iter.1就是真正的那个usize对应的memtable的迭代器
// 由于最新的iter.0最小,因此拿到的就是最新的数据
current: Option<HeapWrapper<I>>,
// 当前的迭代器,这个是真正要使用的
}
impl<I: StorageIterator> MergeIterator<I> {
    /// Build a merge iterator from child iterators. Only valid children go
    /// into the heap; the smallest (by key, then by index) becomes `current`.
    pub fn create(iters: Vec<Box<I>>) -> Self {
        let mut heap = BinaryHeap::new();
        for (idx, iter) in iters.into_iter().enumerate() {
            if iter.is_valid() {
                heap.push(HeapWrapper(idx, iter));
            }
        }
        // Every iterator in the heap is valid by construction, so the heap
        // top can be taken directly — no re-validation loop needed.
        let current = heap.pop();
        Self {
            iters: heap,
            current,
        }
    }
}
@@ -68,19 +96,87 @@ impl<I: 'static + for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>> StorageIt
{
type KeyType<'a> = KeySlice<'a>;
fn key(&self) -> KeySlice {
unimplemented!()
fn key(&self) -> KeySlice<'_> {
self.current.as_ref().unwrap().1.key()
}
fn value(&self) -> &[u8] {
unimplemented!()
self.current.as_ref().unwrap().1.value()
}
fn is_valid(&self) -> bool {
unimplemented!()
self.current
.as_ref()
.map(|x| x.1.is_valid())
.unwrap_or(false)
// unimplemented!()
}
fn next(&mut self) -> Result<()> {
    // Nothing to do if the merge iterator is already exhausted.
    let Some(mut current) = self.current.take() else {
        return Ok(());
    };
    // Remember the key being left behind; clone to avoid borrowing `current`.
    let current_key = current.1.key().to_key_vec();
    // Advance the current child one step; on error the whole merge
    // iterator becomes invalid.
    if let Err(e) = current.1.next() {
        self.current = None;
        return Err(e);
    }
    // Push it back into the heap if it still has entries.
    if current.1.is_valid() {
        self.iters.push(current);
    }
    // Skip duplicates of `current_key` at the heap top: they come from
    // older memtables and are shadowed by the value just returned.
    while let Some(mut heap_top) = self.iters.peek_mut() {
        if !heap_top.1.is_valid() {
            PeekMut::pop(heap_top);
            continue;
        }
        if current_key.as_key_slice() == heap_top.1.key() {
            if let Err(e) = heap_top.1.next() {
                PeekMut::pop(heap_top);
                return Err(e);
            }
            if !heap_top.1.is_valid() {
                PeekMut::pop(heap_top);
            }
        } else {
            break;
        }
    }
    // Pop the next valid child with the smallest key as the new current.
    while let Some(candidate) = self.iters.pop() {
        if candidate.1.is_valid() {
            self.current = Some(candidate);
            return Ok(());
        }
    }
    self.current = None;
    Ok(())
}
/*
How the merge works:
1. Every memtable iterator lives in one heap, ordered first by key and
   then by memtable index, so the heap top always holds the smallest key
   from the newest memtable.
2. `current` is the iterator being consumed. Before advancing, its key is
   saved; `next` is called on it once and, if still valid, it is pushed
   back into the heap so the heap top stays the global minimum.
3. While the heap top carries the same key as the saved one, that entry is
   an older (shadowed) value: advance that iterator too and push it back,
   repeating until the top key differs.
4. Finally the heap top is popped to become the new `current`.
*/
}

View File

@@ -68,7 +68,7 @@ impl Key<Vec<u8>> {
self.0.extend(key_slice.0);
}
pub fn as_key_slice(&self) -> KeySlice {
pub fn as_key_slice(&'_ self) -> KeySlice<'_> {
Key(self.0.as_slice())
}

View File

@@ -39,19 +39,25 @@ impl StorageIterator for LsmIterator {
type KeyType<'a> = &'a [u8];
fn is_valid(&self) -> bool {
unimplemented!()
self.inner.is_valid()
}
fn key(&self) -> &[u8] {
unimplemented!()
self.inner.key().into_inner()
}
fn value(&self) -> &[u8] {
unimplemented!()
self.inner.value()
}
fn next(&mut self) -> Result<()> {
    // `?` propagates any error from the inner merge iterator.
    self.inner.next()?;
    // Skip tombstones: an empty value marks a deleted key.
    while self.inner.is_valid() && self.inner.value().is_empty() {
        self.inner.next()?;
    }
    Ok(())
}
}
@@ -79,18 +85,40 @@ impl<I: StorageIterator> StorageIterator for FusedIterator<I> {
Self: 'a;
fn is_valid(&self) -> bool {
unimplemented!()
// self.iter.is_valid() && !self.has_errored
if self.has_errored {
false
} else {
self.iter.is_valid()
}
}
fn key(&self) -> Self::KeyType<'_> {
    self.iter.key()
}
fn value(&self) -> &[u8] {
unimplemented!()
self.iter.value()
}
fn next(&mut self) -> Result<()> {
    // Once the iterator has errored, every further `next` must error too.
    if self.has_errored {
        return Err(anyhow::anyhow!("iterator has errored previously"));
    }
    // Advancing an exhausted (but healthy) iterator is a no-op, not an error.
    if !self.iter.is_valid() {
        return Ok(());
    }
    match self.iter.next() {
        Ok(()) => Ok(()),
        Err(e) => {
            // Latch the error state so later accesses are rejected.
            self.has_errored = true;
            Err(e)
        }
    }
}
}

View File

@@ -21,7 +21,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use anyhow::Result;
use anyhow::{Ok, Result};
use bytes::Bytes;
use parking_lot::{Mutex, MutexGuard, RwLock};
@@ -30,6 +30,8 @@ use crate::compact::{
CompactionController, CompactionOptions, LeveledCompactionController, LeveledCompactionOptions,
SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, TieredCompactionController,
};
use crate::iterators::StorageIterator;
use crate::iterators::merge_iterator::MergeIterator;
use crate::lsm_iterator::{FusedIterator, LsmIterator};
use crate::manifest::Manifest;
use crate::mem_table::MemTable;
@@ -243,6 +245,29 @@ impl MiniLsm {
}
impl LsmStorageInner {
pub fn show_memtable_datas(&self) {
let state = self.state.read();
println!("Current Memtable Data:");
for entry in state.memtable.map.iter() {
println!(
"Key: {}, Value: {}",
String::from_utf8_lossy(entry.key()),
String::from_utf8_lossy(entry.value())
);
}
for (i, imm_memtable) in state.imm_memtables.iter().enumerate() {
println!("Immutable Memtable {} Data:", i);
for entry in imm_memtable.map.iter() {
println!(
"Key: {}, Value: {}",
String::from_utf8_lossy(entry.key()),
String::from_utf8_lossy(entry.value())
);
}
}
}
pub(crate) fn next_sst_id(&self) -> usize {
self.next_sst_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
@@ -297,8 +322,27 @@ impl LsmStorageInner {
}
/// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter.
pub fn get(&self, _key: &[u8]) -> Result<Option<Bytes>> {
unimplemented!()
pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
let state = self.state.read();
if let Some(value) = state.memtable.get(key) {
if value.is_empty() {
return Ok(None);
}
return Ok(Some(value));
}
for imm_memtable in &state.imm_memtables {
if let Some(value) = imm_memtable.get(key) {
if value.is_empty() {
// Empty value means the key was deleted
return Ok(None);
}
return Ok(Some(value));
}
}
Ok(None)
}
/// Write a batch of data into the storage. Implement in week 2 day 7.
@@ -307,13 +351,29 @@ impl LsmStorageInner {
}
/// Put a key-value pair into the storage by writing into the current memtable.
pub fn put(&self, _key: &[u8], _value: &[u8]) -> Result<()> {
unimplemented!()
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
let memtable_size = {
let state = self.state.read();
state.memtable.put(key, value)?;
state.memtable.approximate_size()
}; // Release read lock here
// Check if we need to freeze memtable (double-check pattern)
if memtable_size >= self.options.target_sst_size {
let state_lock = self.state_lock.lock(); // 状态修改需要使用 Mutex 进行同步
let state = self.state.read(); // 重新获取读锁
if state.memtable.approximate_size() >= self.options.target_sst_size {
drop(state); // Release read lock before calling freeze
self.force_freeze_memtable(&state_lock)?;
}
}
Ok(())
}
/// Remove a key from the storage by writing an empty value.
pub fn delete(&self, _key: &[u8]) -> Result<()> {
unimplemented!()
pub fn delete(&self, key: &[u8]) -> Result<()> {
self.put(key, &[])
}
pub(crate) fn path_of_sst_static(path: impl AsRef<Path>, id: usize) -> PathBuf {
@@ -338,7 +398,39 @@ impl LsmStorageInner {
/// Force freeze the current memtable to an immutable memtable
pub fn force_freeze_memtable(&self, _state_lock_observer: &MutexGuard<'_, ()>) -> Result<()> {
    // Create the replacement memtable outside the write lock to keep the
    // critical section short (WAL creation may touch the filesystem).
    let new_memtable = if self.options.enable_wal {
        MemTable::create_with_wal(self.next_sst_id(), &self.path)?
    } else {
        MemTable::create(self.next_sst_id())
    };
    {
        let mut state = self.state.write();
        let cur_state = state.as_ref();
        // Frozen memtable goes to the front: imm_memtables is newest-first.
        let mut new_imm_tables = cur_state.imm_memtables.clone();
        new_imm_tables.insert(0, cur_state.memtable.clone());
        // Build a fresh immutable snapshot and swap it in atomically.
        *state = Arc::new(LsmStorageState {
            memtable: Arc::new(new_memtable),
            imm_memtables: new_imm_tables,
            l0_sstables: cur_state.l0_sstables.clone(),
            levels: cur_state.levels.clone(),
            sstables: cur_state.sstables.clone(),
        });
    }
    Ok(())
}
/// Force flush the earliest-created immutable memtable to disk
@@ -354,9 +446,40 @@ impl LsmStorageInner {
/// Create an iterator over a range of keys.
pub fn scan(
    &self,
    lower: Bound<&[u8]>,
    upper: Bound<&[u8]>,
) -> Result<FusedIterator<LsmIterator>> {
    let state = self.state.read();
    // Collect iterators newest-first so MergeIterator prefers the most
    // recent value for duplicate keys (smaller index wins).
    let mut mem_iters = Vec::with_capacity(1 + state.imm_memtables.len());
    mem_iters.push(Box::new(state.memtable.scan(lower, upper)));
    for imm_memtable in &state.imm_memtables {
        mem_iters.push(Box::new(imm_memtable.scan(lower, upper)));
    }
    let merge_iter = MergeIterator::create(mem_iters);
    let mut lsm_iter = LsmIterator::new(merge_iter)?;
    // Deletion filtering belongs to this layer (not MergeIterator or the
    // memtable iterators): skip leading tombstones so the first entry the
    // caller sees is a live key.
    while lsm_iter.is_valid() && lsm_iter.value().is_empty() {
        lsm_iter.next()?;
    }
    // FusedIterator turns `next` on an exhausted iterator into a no-op
    // instead of an error.
    Ok(FusedIterator::new(lsm_iter))
}
}

View File

@@ -6,7 +6,7 @@
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
@@ -15,14 +15,15 @@
#![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod
#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod
use std::collections::BTreeMap;
use std::ops::Bound;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use anyhow::Result;
use anyhow::{Result, anyhow};
use bytes::Bytes;
use crossbeam_skiplist::SkipMap;
use crossbeam_skiplist::{SkipList, SkipMap};
use ouroboros::self_referencing;
use crate::iterators::StorageIterator;
@@ -35,7 +36,14 @@ use crate::wal::Wal;
/// An initial implementation of memtable is part of week 1, day 1. It will be incrementally implemented in other
/// chapters of week 1 and week 2.
pub struct MemTable {
map: Arc<SkipMap<Bytes, Bytes>>,
pub map: Arc<SkipMap<Bytes, Bytes>>,
wal: Option<Wal>,
id: usize,
approximate_size: Arc<AtomicUsize>,
}
pub struct BTreeMemTable {
map: Arc<BTreeMap<Bytes, Bytes>>,
wal: Option<Wal>,
id: usize,
approximate_size: Arc<AtomicUsize>,
@@ -52,13 +60,25 @@ pub(crate) fn map_bound(bound: Bound<&[u8]>) -> Bound<Bytes> {
impl MemTable {
/// Create a new mem-table.
pub fn create(_id: usize) -> Self {
unimplemented!()
pub fn create(id: usize) -> Self {
Self {
map: Arc::new(SkipMap::new()),
wal: None,
id,
approximate_size: Arc::new(AtomicUsize::new(0)),
}
// unimplemented!()
}
/// Create a new mem-table with WAL
pub fn create_with_wal(_id: usize, _path: impl AsRef<Path>) -> Result<Self> {
unimplemented!()
pub fn create_with_wal(id: usize, path: impl AsRef<Path>) -> Result<Self> {
Ok(Self {
map: Arc::new(SkipMap::new()),
wal: Some(Wal::create(path)?),
id,
approximate_size: Arc::new(AtomicUsize::new(0)),
})
// unimplemented!()
}
/// Create a memtable from WAL
@@ -86,8 +106,17 @@ impl MemTable {
}
/// Get a value by key.
pub fn get(&self, _key: &[u8]) -> Option<Bytes> {
unimplemented!()
pub fn get(&self, key: &[u8]) -> Option<Bytes> {
if let Some(entry) = self.map.get(key) {
// entry.value() is Bytes
// 就算是 [] 也要返回Some([])
// 因为这个检测是被delete还是没有这个key是上层决定的。
// 上层逻辑是 Some([]) => None
// Some(v) => Some(v)
Some(entry.value().clone())
} else {
None
}
}
/// Put a key-value pair into the mem-table.
@@ -95,8 +124,20 @@ impl MemTable {
/// In week 1, day 1, simply put the key-value pair into the skipmap.
/// In week 2, day 6, also flush the data to WAL.
/// In week 3, day 5, modify the function to use the batch API.
pub fn put(&self, _key: &[u8], _value: &[u8]) -> Result<()> {
unimplemented!()
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
let key_bytes = Bytes::copy_from_slice(key);
let value_bytes = Bytes::copy_from_slice(value);
// Calculate size increase
let size_increase = key_bytes.len() + value_bytes.len();
self.map.insert(key_bytes, value_bytes);
// Update approximate size
// 如果一个key被插入两次导致近似大小计算两次
self.approximate_size
.fetch_add(size_increase, std::sync::atomic::Ordering::Relaxed);
Ok(())
// unimplemented!()
}
/// Implement this in week 3, day 5; if you want to implement this earlier, use `&[u8]` as the key type.
@@ -112,8 +153,17 @@ impl MemTable {
}
/// Get an iterator over a range of keys.
pub fn scan(&self, _lower: Bound<&[u8]>, _upper: Bound<&[u8]>) -> MemTableIterator {
unimplemented!()
pub fn scan(&self, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> MemTableIterator {
let (lower_bound, upper_bound) = (map_bound(lower), map_bound(upper));
// 创建MemtableIterator
let mut iter = MemTableIteratorBuilder {
map: self.map.clone(),
iter_builder: |map| map.range((lower_bound, upper_bound)),
item: (Bytes::new(), Bytes::new()),
}
.build();
iter.next().unwrap();
iter
}
/// Flush the mem-table to SSTable. Implement in week 1 day 6.
@@ -159,18 +209,29 @@ impl StorageIterator for MemTableIterator {
type KeyType<'a> = KeySlice<'a>;
fn value(&self) -> &[u8] {
unimplemented!()
self.borrow_item().1.as_ref()
}
fn key(&self) -> KeySlice {
unimplemented!()
fn key(&'_ self) -> KeySlice<'_> {
KeySlice::from_slice(self.borrow_item().0.as_ref())
}
fn is_valid(&self) -> bool {
unimplemented!()
// self.borrow_map()
// .get(self.borrow_item().0.as_ref())
// .is_some()
!self.borrow_item().0.is_empty()
}
fn next(&mut self) -> Result<()> {
    self.with_mut(|fields| {
        // Copy the next entry out of the skipmap range, or store an empty
        // pair to mark exhaustion.
        *fields.item = match fields.iter.next() {
            Some(entry) => (entry.key().clone(), entry.value().clone()),
            None => (Bytes::new(), Bytes::new()),
        };
    });
    Ok(())
}
}

View File

@@ -25,7 +25,7 @@ use std::sync::Arc;
use anyhow::Result;
pub use builder::SsTableBuilder;
use bytes::Buf;
use bytes::{Buf, BufMut};
pub use iterator::SsTableIterator;
use crate::block::Block;
@@ -50,15 +50,46 @@ impl BlockMeta {
/// in order to help keep track of `first_key` when decoding from the same buffer in the future.
///
/// Per entry: offset (u32 LE) | first_key_len (u16 LE) | first_key
///          | last_key_len (u16 LE) | last_key
pub fn encode_block_meta(block_meta: &[BlockMeta], buf: &mut Vec<u8>) {
    for meta in block_meta {
        buf.put_u32_le(meta.offset as u32);
        buf.put_u16_le(meta.first_key.len() as u16);
        buf.put_slice(meta.first_key.as_key_slice().into_inner());
        buf.put_u16_le(meta.last_key.len() as u16);
        buf.put_slice(meta.last_key.as_key_slice().into_inner());
    }
}
/// Decode block meta from a buffer.
pub fn decode_block_meta(buf: impl Buf) -> Vec<BlockMeta> {
unimplemented!()
pub fn decode_block_meta(mut buf: impl Buf) -> Vec<BlockMeta> {
let mut block_meta = Vec::new();
while buf.has_remaining() {
let offset = buf.get_u32_le() as usize;
let key_len = buf.get_u16_le();
let mut first_key = vec![0u8; key_len as usize];
buf.copy_to_slice(&mut first_key);
let last_key_len = buf.get_u16_le() as usize;
let mut last_key = vec![0u8; last_key_len];
buf.copy_to_slice(&mut last_key);
block_meta.push(BlockMeta {
offset,
first_key: KeyBytes::from_bytes(first_key.into()),
last_key: KeyBytes::from_bytes(last_key.into()),
});
}
block_meta
}
}
@@ -152,7 +183,18 @@ impl SsTable {
/// Read a block from disk, with block cache. (Day 4)
pub fn read_block_cached(&self, block_idx: usize) -> Result<Arc<Block>> {
    let Some(cache) = &self.block_cache else {
        // No cache configured: always hit the disk.
        return self.read_block(block_idx);
    };
    if let Some(block) = cache.get(&(self.id, block_idx)) {
        return Ok(block);
    }
    // Cache miss: load from disk and populate the cache.
    let block = self.read_block(block_idx)?;
    cache.insert((self.id, block_idx), block.clone());
    Ok(block)
}
/// Find the block that may contain `key`.

View File

@@ -15,13 +15,19 @@
#![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod
#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod
use std::path::Path;
use std::sync::Arc;
use std::{arch::x86_64::_SIDD_LEAST_SIGNIFICANT, path::Path};
use anyhow::Result;
use bytes::{BufMut, Bytes};
use super::{BlockMeta, SsTable};
use crate::{block::BlockBuilder, key::KeySlice, lsm_storage::BlockCache};
use crate::{
block::{self, BlockBuilder},
key::{self, KeySlice},
lsm_storage::BlockCache,
table::FileObject,
};
/// Builds an SSTable from key-value pairs.
pub struct SsTableBuilder {
@@ -36,7 +42,14 @@ pub struct SsTableBuilder {
impl SsTableBuilder {
/// Create a builder based on target block size.
pub fn new(block_size: usize) -> Self {
    Self {
        builder: BlockBuilder::new(block_size),
        first_key: Vec::new(),
        last_key: Vec::new(),
        data: Vec::new(),
        meta: Vec::new(),
        block_size,
    }
}
/// Adds a key-value pair to SSTable.
@@ -44,7 +57,38 @@ impl SsTableBuilder {
/// Note: You should split a new block when the current block is full.(`std::mem::replace` may
/// be helpful here)
pub fn add(&mut self, key: KeySlice, value: &[u8]) {
    if self.builder.add(key, value) {
        if self.first_key.is_empty() {
            self.first_key.put_slice(key.into_inner());
        }
        // Always track the last key added — including on the very first
        // entry of a block. (Originally `last_key` was only updated from
        // the second entry on, leaving it empty for single-entry blocks.)
        self.last_key.clear();
        self.last_key.put_slice(key.into_inner());
    } else {
        // Current block is full: record its metadata, seal it, then retry
        // the insert into a fresh block.
        self.meta.push(BlockMeta {
            first_key: key::Key::from_bytes(Bytes::copy_from_slice(&self.first_key)),
            last_key: key::Key::from_bytes(Bytes::copy_from_slice(&self.last_key)),
            offset: self.data.len(),
        });
        let old_builder =
            std::mem::replace(&mut self.builder, BlockBuilder::new(self.block_size));
        self.data.extend_from_slice(&old_builder.build().encode());
        // Reset per-block first key so the new block's meta starts fresh.
        self.first_key.clear();
        self.add(key, value);
    }
}
/// Get the estimated size of the SSTable.
@@ -52,7 +96,7 @@ impl SsTableBuilder {
/// Since the data blocks contain much more data than meta blocks, just return the size of data
/// blocks here.
pub fn estimated_size(&self) -> usize {
unimplemented!()
self.data.len()
}
/// Builds the SSTable and writes it to the given path. Use the `FileObject` structure to manipulate the disk objects.
@@ -62,7 +106,37 @@ impl SsTableBuilder {
block_cache: Option<Arc<BlockCache>>,
path: impl AsRef<Path>,
) -> Result<SsTable> {
unimplemented!()
let block = self.builder.build();
self.meta.push(BlockMeta {
first_key: key::Key::from_bytes(Bytes::copy_from_slice(&self.first_key)),
last_key: key::Key::from_bytes(Bytes::copy_from_slice(&self.last_key)),
offset: self.data.len(),
});
self.data.extend_from_slice(&block.encode());
let meta_offset = self.data.len();
BlockMeta::encode_block_meta(&self.meta, &mut self.data);
self.data.extend_from_slice(&(meta_offset as u32).to_le_bytes());
let file = FileObject::create(path.as_ref(), self.data)?;
let first_key = self.meta.first().unwrap().first_key.clone();
let last_key = self.meta.last().unwrap().last_key.clone();
Ok(SsTable {
id,
file,
first_key,
last_key,
block_meta: self.meta,
block_meta_offset: meta_offset,
block_cache,
bloom: None,
max_ts: 0,
})
}
#[cfg(test)]

View File

@@ -32,24 +32,71 @@ pub struct SsTableIterator {
impl SsTableIterator {
/// Create a new iterator and seek to the first key-value pair in the first data block.
pub fn create_and_seek_to_first(table: Arc<SsTable>) -> Result<Self> {
    let block = table.read_block_cached(0)?;
    let blk_iter = BlockIterator::create_and_seek_to_first(block);
    Ok(Self {
        table,
        blk_iter,
        blk_idx: 0,
    })
}
/// Seek to the first key-value pair in the first data block.
pub fn seek_to_first(&mut self) -> Result<()> {
    self.blk_idx = 0;
    let block = self.table.read_block_cached(0)?;
    self.blk_iter = BlockIterator::create_and_seek_to_first(block);
    Ok(())
}
/// Create a new iterator and seek to the first key-value pair which >= `key`.
pub fn create_and_seek_to_key(table: Arc<SsTable>, key: KeySlice) -> Result<Self> {
    // Read the first block before moving `table` into the struct — this
    // avoids the needless `Arc` clone the original made.
    let first_block = table.read_block_cached(0)?;
    let mut iter = Self {
        blk_iter: BlockIterator::create_and_seek_to_first(first_block),
        table,
        blk_idx: 0,
    };
    iter.seek_to_key(key)?;
    Ok(iter)
}
/// Seek to the first key-value pair which >= `key`.
/// Note: You probably want to review the handout for detailed explanation when implementing
/// this function.
pub fn seek_to_key(&mut self, key: KeySlice) -> Result<()> {
unimplemented!()
let metas = &self.table.block_meta;
let mut left = 0;
let mut right = metas.len();
while left < right {
let mid = left + (right - left) / 2;
let mid_first_key = &metas[mid].first_key;
if mid_first_key.as_key_slice() <= key {
left = mid + 1;
} else {
right = mid;
}
}
self.blk_idx = if left == 0 { 0 } else { left - 1 };
let block = self.table.read_block_cached(self.blk_idx)?;
self.blk_iter = BlockIterator::create_and_seek_to_key(block, key);
// 如果在当前的block没有这个key说明可能在下一个block里面
if !self.blk_iter.is_valid() && self.blk_idx + 1 < self.table.num_of_blocks() {
self.blk_idx += 1;
let next_block = self.table.read_block_cached(self.blk_idx)?;
self.blk_iter = BlockIterator::create_and_seek_to_first(next_block);
}
Ok(())
}
}
@@ -57,23 +104,32 @@ impl StorageIterator for SsTableIterator {
type KeyType<'a> = KeySlice<'a>;
/// Return the `key` that's held by the underlying block iterator.
fn key(&self) -> KeySlice {
unimplemented!()
fn key(&'_ self) -> KeySlice<'_> {
self.blk_iter.key()
}
/// Return the `value` that's held by the underlying block iterator.
fn value(&self) -> &[u8] {
unimplemented!()
self.blk_iter.value()
}
/// Return whether the current block iterator is valid or not.
fn is_valid(&self) -> bool {
unimplemented!()
self.blk_iter.is_valid()
}
/// Move to the next `key` in the block.
/// Note: You may want to check if the current block iterator is valid after the move.
fn next(&mut self) -> Result<()> {
    self.blk_iter.next();
    // If the current block is exhausted, advance to the next one.
    if !self.blk_iter.is_valid() && self.blk_idx + 1 < self.table.num_of_blocks() {
        self.blk_idx += 1;
        let next_block = self.table.read_block_cached(self.blk_idx)?;
        self.blk_iter = BlockIterator::create_and_seek_to_first(next_block);
    }
    Ok(())
}
}

View File

@@ -1,16 +1,7 @@
// Copyright (c) 2022-2025 Alex Chi Z
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! DO NOT MODIFY -- Mini-LSM tests modules
//! This file will be automatically rewritten by the copy-test command.
mod harness;
mod week1_day1;
mod week1_day2;
mod week1_day3;

View File

@@ -0,0 +1,462 @@
#![allow(dead_code)] // REMOVE THIS LINE once all modules are complete
// Copyright (c) 2022-2025 Alex Chi Z
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
collections::BTreeMap, ops::Bound, os::unix::fs::MetadataExt, path::Path, sync::Arc,
time::Duration,
};
use anyhow::{Result, bail};
use bytes::Bytes;
use crate::{
compact::{
CompactionOptions, LeveledCompactionOptions, SimpleLeveledCompactionOptions,
TieredCompactionOptions,
},
iterators::{StorageIterator, merge_iterator::MergeIterator},
key::{KeySlice, TS_ENABLED},
lsm_storage::{BlockCache, LsmStorageInner, LsmStorageState, MiniLsm},
table::{SsTable, SsTableBuilder, SsTableIterator},
};
#[derive(Clone)]
pub struct MockIterator {
    // Key-value pairs the iterator yields, in order.
    pub data: Vec<(Bytes, Bytes)>,
    // When Some(i): `next` bails once the cursor reaches i, and any access
    // at or past i panics — used to test error propagation in wrappers.
    pub error_when: Option<usize>,
    // Current cursor position into `data`.
    pub index: usize,
}
impl MockIterator {
    /// Build a mock iterator over `data` that never fails.
    pub fn new(data: Vec<(Bytes, Bytes)>) -> Self {
        Self::with_error_config(data, None)
    }

    /// Build a mock iterator that errors once `next` reaches `error_when`.
    pub fn new_with_error(data: Vec<(Bytes, Bytes)>, error_when: usize) -> Self {
        Self::with_error_config(data, Some(error_when))
    }

    // Shared constructor backing both public entry points.
    fn with_error_config(data: Vec<(Bytes, Bytes)>, error_when: Option<usize>) -> Self {
        Self {
            data,
            index: 0,
            error_when,
        }
    }
}
// Test-harness iterator: the repeated panic guards are intentional — they
// make any access after an injected error a hard failure in tests.
impl StorageIterator for MockIterator {
    type KeyType<'a> = KeySlice<'a>;

    // Advance the cursor; bail with a fake error exactly when the cursor
    // lands on `error_when`.
    fn next(&mut self) -> Result<()> {
        if self.index < self.data.len() {
            self.index += 1;
        }
        if let Some(error_when) = self.error_when {
            if self.index == error_when {
                bail!("fake error!");
            }
        }
        Ok(())
    }

    // Current key; panics if accessed at or past the error point.
    fn key(&self) -> KeySlice {
        if let Some(error_when) = self.error_when {
            if self.index >= error_when {
                panic!("invalid access after next returns an error!");
            }
        }
        KeySlice::for_testing_from_slice_no_ts(self.data[self.index].0.as_ref())
    }

    // Current value; same panic contract as `key`.
    fn value(&self) -> &[u8] {
        if let Some(error_when) = self.error_when {
            if self.index >= error_when {
                panic!("invalid access after next returns an error!");
            }
        }
        self.data[self.index].1.as_ref()
    }

    // Valid while the cursor is inside `data`; same panic contract as `key`.
    fn is_valid(&self) -> bool {
        if let Some(error_when) = self.error_when {
            if self.index >= error_when {
                panic!("invalid access after next returns an error!");
            }
        }
        self.index < self.data.len()
    }
}
/// Copies a byte slice into an owned `Bytes` for readable assertion output.
pub fn as_bytes(x: &[u8]) -> Bytes {
    Bytes::from(x.to_vec())
}
/// Drains `iter` and asserts that it yields exactly the `expected`
/// key-value pairs (keys compared without timestamps), in order, and is
/// invalid afterwards.
pub fn check_iter_result_by_key<I>(iter: &mut I, expected: Vec<(Bytes, Bytes)>)
where
    I: for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
{
    for (expected_key, expected_value) in expected {
        assert!(iter.is_valid());
        assert_eq!(
            expected_key,
            iter.key().for_testing_key_ref(),
            "expected key: {:?}, actual key: {:?}",
            expected_key,
            as_bytes(iter.key().for_testing_key_ref()),
        );
        assert_eq!(
            expected_value,
            iter.value(),
            "expected value: {:?}, actual value: {:?}",
            expected_value,
            as_bytes(iter.value()),
        );
        iter.next().unwrap();
    }
    assert!(
        !iter.is_valid(),
        "iterator should not be valid at the end of the check"
    );
}
/// Drains `iter` and asserts that it yields exactly the `expected`
/// `((key, timestamp), value)` entries, in order, and is invalid afterwards.
pub fn check_iter_result_by_key_and_ts<I>(iter: &mut I, expected: Vec<((Bytes, u64), Bytes)>)
where
    I: for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
{
    for ((k, ts), v) in expected {
        assert!(iter.is_valid());
        // Compare key and timestamp as one tuple so a mismatch reports both.
        assert_eq!(
            (&k[..], ts),
            (
                iter.key().for_testing_key_ref(),
                iter.key().for_testing_ts()
            ),
            "expected key: {:?}@{}, actual key: {:?}@{}",
            k,
            ts,
            as_bytes(iter.key().for_testing_key_ref()),
            iter.key().for_testing_ts(),
        );
        assert_eq!(
            v,
            iter.value(),
            "expected value: {:?}, actual value: {:?}",
            v,
            as_bytes(iter.value()),
        );
        iter.next().unwrap();
    }
    // The iterator must be exhausted once all expected entries are consumed.
    assert!(!iter.is_valid());
}
/// Drains an engine-level iterator (plain `&[u8]` keys) and asserts it
/// yields exactly the `expected` pairs, in order, and is invalid afterwards.
pub fn check_lsm_iter_result_by_key<I>(iter: &mut I, expected: Vec<(Bytes, Bytes)>)
where
    I: for<'a> StorageIterator<KeyType<'a> = &'a [u8]>,
{
    for (k, v) in expected {
        assert!(iter.is_valid());
        assert_eq!(
            k,
            iter.key(),
            "expected key: {:?}, actual key: {:?}",
            k,
            as_bytes(iter.key()),
        );
        assert_eq!(
            v,
            iter.value(),
            "expected value: {:?}, actual value: {:?}",
            v,
            as_bytes(iter.value()),
        );
        iter.next().unwrap();
    }
    // The iterator must be exhausted once all expected entries are consumed.
    assert!(!iter.is_valid());
}
/// Keeps advancing `iter` until `next` returns an error; panics if the
/// iterator instead runs out of elements without ever failing.
pub fn expect_iter_error(mut iter: impl StorageIterator) {
    loop {
        let step = iter.next();
        if step.is_err() {
            break;
        }
        if !iter.is_valid() {
            panic!("expect an error");
        }
    }
}
/// Builds an SST with id `id` at `path` from `data` (keys without
/// timestamps), using a small builder size (128) so tables span blocks.
pub fn generate_sst(
    id: usize,
    path: impl AsRef<Path>,
    data: Vec<(Bytes, Bytes)>,
    block_cache: Option<Arc<BlockCache>>,
) -> SsTable {
    let mut builder = SsTableBuilder::new(128);
    data.into_iter().for_each(|(key, value)| {
        builder.add(KeySlice::for_testing_from_slice_no_ts(&key[..]), &value[..]);
    });
    builder.build(id, block_cache, path.as_ref()).unwrap()
}
/// Builds an SST with id `id` at `path` from timestamped `data`, using a
/// small builder size (128) so tables span blocks.
pub fn generate_sst_with_ts(
    id: usize,
    path: impl AsRef<Path>,
    data: Vec<((Bytes, u64), Bytes)>,
    block_cache: Option<Arc<BlockCache>>,
) -> SsTable {
    let mut builder = SsTableBuilder::new(128);
    for ((key, ts), value) in data {
        builder.add(
            KeySlice::for_testing_from_slice_with_ts(&key[..], ts),
            &value[..],
        );
    }
    builder.build(id, block_cache, path.as_ref()).unwrap()
}
/// Freezes the active memtable and flushes the resulting immutable
/// memtable, simulating one full flush cycle.
pub fn sync(storage: &LsmStorageInner) {
    let guard = storage.state_lock.lock();
    storage.force_freeze_memtable(&guard).unwrap();
    drop(guard);
    storage.force_flush_next_imm_memtable().unwrap();
}
/// Workload driver: writes 10 overlapping key ranges, waits for flushes and
/// for compaction to stop changing the LSM structure, then verifies every
/// written key (and a tail of absent keys) against an in-memory model, both
/// by point lookup and by a full scan.
pub fn compaction_bench(storage: Arc<MiniLsm>) {
    // In-memory model: key index -> latest version number written.
    let mut key_map = BTreeMap::<usize, usize>::new();
    let gen_key = |i| format!("{:010}", i); // 10B
    let gen_value = |i| format!("{:0110}", i); // 110B
    let mut max_key = 0;
    let overlaps = if TS_ENABLED { 10000 } else { 20000 };
    for iter in 0..10 {
        let range_begin = iter * 5000;
        // Ranges start 5000 apart but span `overlaps` keys, so later rounds
        // rewrite earlier keys with bumped versions.
        for i in range_begin..(range_begin + overlaps) {
            // 120B per key, 4MB data populated
            let key: String = gen_key(i);
            let version = key_map.get(&i).copied().unwrap_or_default() + 1;
            let value = gen_value(version);
            key_map.insert(i, version);
            storage.put(key.as_bytes(), value.as_bytes()).unwrap();
            max_key = max_key.max(i);
        }
    }
    std::thread::sleep(Duration::from_secs(1)); // wait until all memtables flush
    // Drain any remaining immutable memtables.
    while {
        let snapshot = storage.inner.state.read();
        !snapshot.imm_memtables.is_empty()
    } {
        storage.inner.force_flush_next_imm_memtable().unwrap();
    }
    // Poll once per second until two consecutive snapshots agree, i.e.
    // background compaction has converged.
    let mut prev_snapshot = storage.inner.state.read().clone();
    while {
        std::thread::sleep(Duration::from_secs(1));
        let snapshot = storage.inner.state.read().clone();
        let to_cont = prev_snapshot.levels != snapshot.levels
            || prev_snapshot.l0_sstables != snapshot.l0_sstables;
        prev_snapshot = snapshot;
        to_cont
    } {
        println!("waiting for compaction to converge");
    }
    let mut expected_key_value_pairs = Vec::new();
    // Point-lookup every possibly-written key plus 40000 keys past the end,
    // which must read as `None`.
    for i in 0..(max_key + 40000) {
        let key = gen_key(i);
        let value = storage.get(key.as_bytes()).unwrap();
        if let Some(val) = key_map.get(&i) {
            let expected_value = gen_value(*val);
            assert_eq!(value, Some(Bytes::from(expected_value.clone())));
            expected_key_value_pairs.push((Bytes::from(key), Bytes::from(expected_value)));
        } else {
            assert!(value.is_none());
        }
    }
    // The full scan must agree with the point lookups.
    check_lsm_iter_result_by_key(
        &mut storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
        expected_key_value_pairs,
    );
    storage.dump_structure();
    println!(
        "This test case does not guarantee your compaction algorithm produces a LSM state as expected. It only does minimal checks on the size of the levels. Please use the compaction simulator to check if the compaction is correctly going on."
    );
}
/// Sanity-checks the converged LSM shape against the configured compaction
/// policy: L0 trigger not exceeded, per-level/tier size invariants hold,
/// and a full scan does not hold more active iterators than expected
/// (one per memtable plus one per level/tier, i.e. concat iterators).
pub fn check_compaction_ratio(storage: Arc<MiniLsm>) {
    let state = storage.inner.state.read().clone();
    let compaction_options = storage.inner.options.compaction_options.clone();
    let mut level_size = Vec::new();
    let l0_sst_num = state.l0_sstables.len();
    // Leveled compaction is measured in bytes; simple/tiered in file counts.
    for (_, files) in &state.levels {
        let size = match &compaction_options {
            CompactionOptions::Leveled(_) => files
                .iter()
                .map(|x| state.sstables.get(x).as_ref().unwrap().table_size())
                .sum::<u64>(),
            CompactionOptions::Simple(_) | CompactionOptions::Tiered(_) => files.len() as u64,
            _ => unreachable!(),
        };
        level_size.push(size);
    }
    let extra_iterators = if TS_ENABLED {
        1 /* txn local iterator for OCC */
    } else {
        0
    };
    let num_iters = storage
        .scan(Bound::Unbounded, Bound::Unbounded)
        .unwrap()
        .num_active_iterators();
    // +1 for the active (mutable) memtable.
    let num_memtables = storage.inner.state.read().imm_memtables.len() + 1;
    match compaction_options {
        CompactionOptions::NoCompaction => unreachable!(),
        CompactionOptions::Simple(SimpleLeveledCompactionOptions {
            size_ratio_percent,
            level0_file_num_compaction_trigger,
            max_levels,
        }) => {
            assert!(l0_sst_num < level0_file_num_compaction_trigger);
            assert!(level_size.len() <= max_levels);
            // Each level must be at least `size_ratio_percent`% of the level
            // below it (pairs of empty levels are skipped).
            for idx in 1..level_size.len() {
                let prev_size = level_size[idx - 1];
                let this_size = level_size[idx];
                if prev_size == 0 && this_size == 0 {
                    continue;
                }
                assert!(
                    this_size as f64 / prev_size as f64 >= size_ratio_percent as f64 / 100.0,
                    "L{}/L{}, {}/{}<{}%",
                    state.levels[idx - 1].0,
                    state.levels[idx].0,
                    this_size,
                    prev_size,
                    size_ratio_percent
                );
            }
            assert!(
                num_iters <= l0_sst_num + num_memtables + max_levels + extra_iterators,
                "we found {num_iters} iterators in your implementation, (l0_sst_num={l0_sst_num}, num_memtables={num_memtables}, max_levels={max_levels}) did you use concat iterators?"
            );
        }
        CompactionOptions::Leveled(LeveledCompactionOptions {
            level_size_multiplier,
            level0_file_num_compaction_trigger,
            max_levels,
            ..
        }) => {
            assert!(l0_sst_num < level0_file_num_compaction_trigger);
            assert!(level_size.len() <= max_levels);
            // Walk from the bottom level upwards: each upper level must be
            // (roughly) `level_size_multiplier` times smaller than the one
            // below, with generous slack.
            let last_level_size = *level_size.last().unwrap();
            let mut multiplier = 1.0;
            for idx in (1..level_size.len()).rev() {
                multiplier *= level_size_multiplier as f64;
                let this_size = level_size[idx - 1];
                assert!(
                    // do not add hard requirement on level size multiplier considering bloom filters...
                    this_size as f64 / last_level_size as f64 <= 1.0 / multiplier + 0.5,
                    "L{}/L_max, {}/{}>>1.0/{}",
                    state.levels[idx - 1].0,
                    this_size,
                    last_level_size,
                    multiplier
                );
            }
            assert!(
                num_iters <= l0_sst_num + num_memtables + max_levels + extra_iterators,
                "we found {num_iters} iterators in your implementation, (l0_sst_num={l0_sst_num}, num_memtables={num_memtables}, max_levels={max_levels}) did you use concat iterators?"
            );
        }
        CompactionOptions::Tiered(TieredCompactionOptions {
            num_tiers,
            max_size_amplification_percent,
            size_ratio,
            min_merge_width,
            ..
        }) => {
            let size_ratio_trigger = (100.0 + size_ratio as f64) / 100.0;
            // Tiered compaction keeps no SSTs in L0.
            assert_eq!(l0_sst_num, 0);
            assert!(level_size.len() <= num_tiers);
            // `sum_size` accumulates the sizes of all tiers above `idx`.
            let mut sum_size = level_size[0];
            for idx in 1..level_size.len() {
                let this_size = level_size[idx];
                // Size-ratio rule: upper tiers must not dwarf this tier.
                if level_size.len() > min_merge_width {
                    assert!(
                        sum_size as f64 / this_size as f64 <= size_ratio_trigger,
                        "violation of size ratio: sum(⬆L{})/L{}, {}/{}>{}",
                        state.levels[idx - 1].0,
                        state.levels[idx].0,
                        sum_size,
                        this_size,
                        size_ratio_trigger
                    );
                }
                // Space-amplification rule: checked against the last tier.
                if idx + 1 == level_size.len() {
                    assert!(
                        sum_size as f64 / this_size as f64
                            <= max_size_amplification_percent as f64 / 100.0,
                        "violation of space amp: sum(⬆L{})/L{}, {}/{}>{}%",
                        state.levels[idx - 1].0,
                        state.levels[idx].0,
                        sum_size,
                        this_size,
                        max_size_amplification_percent
                    );
                }
                sum_size += this_size;
            }
            assert!(
                num_iters <= num_memtables + num_tiers + extra_iterators,
                "we found {num_iters} iterators in your implementation, (num_memtables={num_memtables}, num_tiers={num_tiers}) did you use concat iterators?"
            );
        }
    }
}
/// Prints every directory entry in `path` together with its size in KB.
pub fn dump_files_in_dir(path: impl AsRef<Path>) {
    println!("--- DIR DUMP ---");
    for entry in path.as_ref().read_dir().unwrap() {
        let entry = entry.unwrap();
        let size_kb = entry.metadata().unwrap().size() as f64 / 1024.0;
        println!("{}, size={:.3}KB", entry.path().display(), size_kb);
    }
}
/// Creates one `SsTableIterator` per SST in the snapshot — L0 tables first,
/// then every leveled file in order — and merges them into one iterator.
pub fn construct_merge_iterator_over_storage(
    state: &LsmStorageState,
) -> MergeIterator<SsTableIterator> {
    let mut iters = Vec::new();
    let leveled_ids = state.levels.iter().flat_map(|(_, files)| files.iter());
    for sst_id in state.l0_sstables.iter().chain(leveled_ids) {
        let table = state.sstables.get(sst_id).cloned().unwrap();
        iters.push(Box::new(
            SsTableIterator::create_and_seek_to_first(table).unwrap(),
        ));
    }
    MergeIterator::create(iters)
}

View File

@@ -0,0 +1,163 @@
// Copyright (c) 2022-2025 Alex Chi Z
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use tempfile::tempdir;
use crate::{
lsm_storage::{LsmStorageInner, LsmStorageOptions},
mem_table::MemTable,
};
/// Three distinct keys written to a fresh memtable must each read back
/// with their original value.
#[test]
fn test_task1_memtable_get() {
    let memtable = MemTable::create(0);
    let entries: [(&[u8], &[u8]); 3] = [
        (b"key1", b"value1"),
        (b"key2", b"value2"),
        (b"key3", b"value3"),
    ];
    for (key, value) in entries {
        memtable.for_testing_put_slice(key, value).unwrap();
    }
    for (key, value) in entries {
        assert_eq!(&memtable.for_testing_get_slice(key).unwrap()[..], value);
    }
}
/// Re-putting an existing key must replace its value: reads observe only
/// the most recent write.
#[test]
fn test_task1_memtable_overwrite() {
    let memtable = MemTable::create(0);
    let first: [(&[u8], &[u8]); 3] = [
        (b"key1", b"value1"),
        (b"key2", b"value2"),
        (b"key3", b"value3"),
    ];
    let second: [(&[u8], &[u8]); 3] = [
        (b"key1", b"value11"),
        (b"key2", b"value22"),
        (b"key3", b"value33"),
    ];
    for (key, value) in first.into_iter().chain(second) {
        memtable.for_testing_put_slice(key, value).unwrap();
    }
    for (key, value) in second {
        assert_eq!(&memtable.for_testing_get_slice(key).unwrap()[..], value);
    }
}
/// End-to-end get/put/delete through the storage engine: missing keys read
/// as `None`, writes read back, and deletes (including of keys that were
/// never written) succeed.
#[test]
fn test_task2_storage_integration() {
    let dir = tempdir().unwrap();
    let storage = Arc::new(
        LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap(),
    );
    // A key that was never written reads as `None`.
    assert_eq!(&storage.get(b"0").unwrap(), &None);
    storage.put(b"1", b"233").unwrap();
    storage.put(b"2", b"2333").unwrap();
    storage.put(b"3", b"23333").unwrap();
    assert_eq!(&storage.get(b"1").unwrap().unwrap()[..], b"233");
    assert_eq!(&storage.get(b"2").unwrap().unwrap()[..], b"2333");
    assert_eq!(&storage.get(b"3").unwrap().unwrap()[..], b"23333");
    // A deleted key reads as `None` afterwards.
    storage.delete(b"2").unwrap();
    assert!(storage.get(b"2").unwrap().is_none());
    storage.delete(b"0").unwrap(); // should NOT report any error
}
/// Freezing twice must stack immutable memtables newest-first: index 0 is
/// the most recently frozen memtable, and the sizes reflect the payloads
/// written into each generation.
#[test]
fn test_task3_storage_integration() {
    let dir = tempdir().unwrap();
    let storage = Arc::new(
        LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap(),
    );
    storage.put(b"1", b"233").unwrap();
    storage.put(b"2", b"2333").unwrap();
    storage.put(b"3", b"23333").unwrap();
    storage
        .force_freeze_memtable(&storage.state_lock.lock())
        .unwrap();
    assert_eq!(storage.state.read().imm_memtables.len(), 1);
    let previous_approximate_size = storage.state.read().imm_memtables[0].approximate_size();
    // 3 keys + 3 values total at least 15 bytes of payload.
    assert!(previous_approximate_size >= 15);
    // Second generation uses strictly larger values than the first.
    storage.put(b"1", b"2333").unwrap();
    storage.put(b"2", b"23333").unwrap();
    storage.put(b"3", b"233333").unwrap();
    storage
        .force_freeze_memtable(&storage.state_lock.lock())
        .unwrap();
    assert_eq!(storage.state.read().imm_memtables.len(), 2);
    // The earlier freeze must now sit at index 1 (newest-first ordering).
    assert!(
        storage.state.read().imm_memtables[1].approximate_size() == previous_approximate_size,
        "wrong order of memtables?"
    );
    assert!(storage.state.read().imm_memtables[0].approximate_size() > previous_approximate_size);
}
/// Writing past `target_sst_size` must automatically freeze the active
/// memtable — both for puts and for deletes (tombstones also take space).
#[test]
fn test_task3_freeze_on_capacity() {
    let dir = tempdir().unwrap();
    let mut options = LsmStorageOptions::default_for_week1_test();
    // Small target size so a thousand writes trigger several freezes; a high
    // memtable limit keeps flushing out of the picture.
    options.target_sst_size = 1024;
    options.num_memtable_limit = 1000;
    let storage = Arc::new(LsmStorageInner::open(dir.path(), options).unwrap());
    for _ in 0..1000 {
        storage.put(b"1", b"2333").unwrap();
    }
    let num_imm_memtables = storage.state.read().imm_memtables.len();
    // println! instead of print!("...\n") — same output, idiomatic form.
    println!("num_imm_memtables = {}", num_imm_memtables);
    assert!(num_imm_memtables >= 1, "no memtable frozen?");
    for _ in 0..1000 {
        storage.delete(b"1").unwrap();
    }
    assert!(
        storage.state.read().imm_memtables.len() > num_imm_memtables,
        "no more memtable frozen?"
    );
}
/// Reads must consult the active memtable first, then immutable memtables
/// newest-to-oldest, so overwrites and deletes shadow older entries.
#[test]
fn test_task4_storage_integration() {
    let dir = tempdir().unwrap();
    let storage = Arc::new(
        LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap(),
    );
    assert_eq!(&storage.get(b"0").unwrap(), &None);
    // Oldest generation, then frozen.
    storage.put(b"1", b"233").unwrap();
    storage.put(b"2", b"2333").unwrap();
    storage.put(b"3", b"23333").unwrap();
    storage
        .force_freeze_memtable(&storage.state_lock.lock())
        .unwrap();
    // Middle generation: delete 1 and 2, overwrite 3, add 4, then freeze.
    storage.delete(b"1").unwrap();
    storage.delete(b"2").unwrap();
    storage.put(b"3", b"2333").unwrap();
    storage.put(b"4", b"23333").unwrap();
    storage
        .force_freeze_memtable(&storage.state_lock.lock())
        .unwrap();
    // Newest writes remain in the active memtable.
    storage.put(b"1", b"233333").unwrap();
    storage.put(b"3", b"233333").unwrap();
    assert_eq!(storage.state.read().imm_memtables.len(), 2);
    assert_eq!(&storage.get(b"1").unwrap().unwrap()[..], b"233333");
    // Deleted in the middle generation and never rewritten.
    assert_eq!(&storage.get(b"2").unwrap(), &None);
    assert_eq!(&storage.get(b"3").unwrap().unwrap()[..], b"233333");
    assert_eq!(&storage.get(b"4").unwrap().unwrap()[..], b"23333");
}

View File

@@ -0,0 +1,331 @@
// Copyright (c) 2022-2025 Alex Chi Z
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{ops::Bound, sync::Arc};
use bytes::Bytes;
use tempfile::tempdir;
use crate::{
iterators::{StorageIterator, merge_iterator::MergeIterator},
lsm_iterator::FusedIterator,
lsm_storage::{LsmStorageInner, LsmStorageOptions},
mem_table::MemTable,
tests::harness::check_lsm_iter_result_by_key,
};
use super::harness::{MockIterator, check_iter_result_by_key, expect_iter_error};
/// Memtable range scans must honor unbounded, inclusive, and exclusive
/// bounds and yield entries in key order.
#[test]
fn test_task1_memtable_iter() {
    use std::ops::Bound;
    let memtable = MemTable::create(0);
    memtable.for_testing_put_slice(b"key1", b"value1").unwrap();
    memtable.for_testing_put_slice(b"key2", b"value2").unwrap();
    memtable.for_testing_put_slice(b"key3", b"value3").unwrap();
    {
        // Unbounded scan: all three entries, in order.
        let mut iter = memtable.for_testing_scan_slice(Bound::Unbounded, Bound::Unbounded);
        assert_eq!(iter.key().for_testing_key_ref(), b"key1");
        assert_eq!(iter.value(), b"value1");
        assert!(iter.is_valid());
        iter.next().unwrap();
        assert_eq!(iter.key().for_testing_key_ref(), b"key2");
        assert_eq!(iter.value(), b"value2");
        assert!(iter.is_valid());
        iter.next().unwrap();
        assert_eq!(iter.key().for_testing_key_ref(), b"key3");
        assert_eq!(iter.value(), b"value3");
        assert!(iter.is_valid());
        iter.next().unwrap();
        assert!(!iter.is_valid());
    }
    {
        // Inclusive bounds keep both endpoints (key1 and key2).
        let mut iter =
            memtable.for_testing_scan_slice(Bound::Included(b"key1"), Bound::Included(b"key2"));
        assert_eq!(iter.key().for_testing_key_ref(), b"key1");
        assert_eq!(iter.value(), b"value1");
        assert!(iter.is_valid());
        iter.next().unwrap();
        assert_eq!(iter.key().for_testing_key_ref(), b"key2");
        assert_eq!(iter.value(), b"value2");
        assert!(iter.is_valid());
        iter.next().unwrap();
        assert!(!iter.is_valid());
    }
    {
        // Exclusive bounds drop both endpoints, leaving only key2.
        let mut iter =
            memtable.for_testing_scan_slice(Bound::Excluded(b"key1"), Bound::Excluded(b"key3"));
        assert_eq!(iter.key().for_testing_key_ref(), b"key2");
        assert_eq!(iter.value(), b"value2");
        assert!(iter.is_valid());
        iter.next().unwrap();
        assert!(!iter.is_valid());
    }
}
/// Scanning an empty memtable must produce an invalid iterator no matter
/// which bound types are used.
#[test]
fn test_task1_empty_memtable_iter() {
    use std::ops::Bound;
    let memtable = MemTable::create(0);
    assert!(
        !memtable
            .for_testing_scan_slice(Bound::Excluded(b"key1"), Bound::Excluded(b"key3"))
            .is_valid()
    );
    assert!(
        !memtable
            .for_testing_scan_slice(Bound::Included(b"key1"), Bound::Included(b"key2"))
            .is_valid()
    );
    assert!(
        !memtable
            .for_testing_scan_slice(Bound::Unbounded, Bound::Unbounded)
            .is_valid()
    );
}
/// Overlapping iterators: for a key present in several inputs, the merge
/// iterator must return the value from the iterator listed first (the
/// highest-priority one).
#[test]
fn test_task2_merge_1() {
    let i1 = MockIterator::new(vec![
        (Bytes::from("a"), Bytes::from("1.1")),
        (Bytes::from("b"), Bytes::from("2.1")),
        (Bytes::from("c"), Bytes::from("3.1")),
        (Bytes::from("e"), Bytes::new()),
    ]);
    let i2 = MockIterator::new(vec![
        (Bytes::from("a"), Bytes::from("1.2")),
        (Bytes::from("b"), Bytes::from("2.2")),
        (Bytes::from("c"), Bytes::from("3.2")),
        (Bytes::from("d"), Bytes::from("4.2")),
    ]);
    let i3 = MockIterator::new(vec![
        (Bytes::from("b"), Bytes::from("2.3")),
        (Bytes::from("c"), Bytes::from("3.3")),
        (Bytes::from("d"), Bytes::from("4.3")),
    ]);
    // i1 first: its values win for the shared keys a/b/c.
    let mut iter = MergeIterator::create(vec![
        Box::new(i1.clone()),
        Box::new(i2.clone()),
        Box::new(i3.clone()),
    ]);
    check_iter_result_by_key(
        &mut iter,
        vec![
            (Bytes::from("a"), Bytes::from("1.1")),
            (Bytes::from("b"), Bytes::from("2.1")),
            (Bytes::from("c"), Bytes::from("3.1")),
            (Bytes::from("d"), Bytes::from("4.2")),
            (Bytes::from("e"), Bytes::new()),
        ],
    );
    // i3 first: its values win for b/c/d instead.
    let mut iter = MergeIterator::create(vec![Box::new(i3), Box::new(i1), Box::new(i2)]);
    check_iter_result_by_key(
        &mut iter,
        vec![
            (Bytes::from("a"), Bytes::from("1.1")),
            (Bytes::from("b"), Bytes::from("2.3")),
            (Bytes::from("c"), Bytes::from("3.3")),
            (Bytes::from("d"), Bytes::from("4.3")),
            (Bytes::from("e"), Bytes::new()),
        ],
    );
}
/// Disjoint key ranges: the merge result is the ordered union of all
/// inputs, regardless of the order the iterators are supplied and with an
/// empty iterator mixed in at any position.
#[test]
fn test_task2_merge_2() {
    let i1 = MockIterator::new(vec![
        (Bytes::from("a"), Bytes::from("1.1")),
        (Bytes::from("b"), Bytes::from("2.1")),
        (Bytes::from("c"), Bytes::from("3.1")),
    ]);
    let i2 = MockIterator::new(vec![
        (Bytes::from("d"), Bytes::from("1.2")),
        (Bytes::from("e"), Bytes::from("2.2")),
        (Bytes::from("f"), Bytes::from("3.2")),
        (Bytes::from("g"), Bytes::from("4.2")),
    ]);
    let i3 = MockIterator::new(vec![
        (Bytes::from("h"), Bytes::from("1.3")),
        (Bytes::from("i"), Bytes::from("2.3")),
        (Bytes::from("j"), Bytes::from("3.3")),
        (Bytes::from("k"), Bytes::from("4.3")),
    ]);
    let i4 = MockIterator::new(vec![]);
    // Expected output is the same for every input ordering below.
    let result = vec![
        (Bytes::from("a"), Bytes::from("1.1")),
        (Bytes::from("b"), Bytes::from("2.1")),
        (Bytes::from("c"), Bytes::from("3.1")),
        (Bytes::from("d"), Bytes::from("1.2")),
        (Bytes::from("e"), Bytes::from("2.2")),
        (Bytes::from("f"), Bytes::from("3.2")),
        (Bytes::from("g"), Bytes::from("4.2")),
        (Bytes::from("h"), Bytes::from("1.3")),
        (Bytes::from("i"), Bytes::from("2.3")),
        (Bytes::from("j"), Bytes::from("3.3")),
        (Bytes::from("k"), Bytes::from("4.3")),
    ];
    let mut iter = MergeIterator::create(vec![
        Box::new(i1.clone()),
        Box::new(i2.clone()),
        Box::new(i3.clone()),
        Box::new(i4.clone()),
    ]);
    check_iter_result_by_key(&mut iter, result.clone());
    let mut iter = MergeIterator::create(vec![
        Box::new(i2.clone()),
        Box::new(i4.clone()),
        Box::new(i3.clone()),
        Box::new(i1.clone()),
    ]);
    check_iter_result_by_key(&mut iter, result.clone());
    let mut iter =
        MergeIterator::create(vec![Box::new(i4), Box::new(i3), Box::new(i2), Box::new(i1)]);
    check_iter_result_by_key(&mut iter, result);
}
/// Merging zero iterators yields an empty result, and an empty input mixed
/// with a non-empty one contributes nothing.
#[test]
fn test_task2_merge_empty() {
    let mut empty_merge = MergeIterator::<MockIterator>::create(vec![]);
    check_iter_result_by_key(&mut empty_merge, vec![]);
    let expected = vec![
        (Bytes::from("a"), Bytes::from("1.1")),
        (Bytes::from("b"), Bytes::from("2.1")),
        (Bytes::from("c"), Bytes::from("3.1")),
    ];
    let non_empty = MockIterator::new(expected.clone());
    let empty = MockIterator::new(vec![]);
    let mut merged =
        MergeIterator::<MockIterator>::create(vec![Box::new(non_empty), Box::new(empty)]);
    check_iter_result_by_key(&mut merged, expected);
}
/// When one input iterator fails partway through, the merge iterator must
/// surface the error through `next` instead of panicking.
#[test]
fn test_task2_merge_error() {
    let mut iter = MergeIterator::<MockIterator>::create(vec![]);
    check_iter_result_by_key(&mut iter, vec![]);
    let i1 = MockIterator::new(vec![
        (Bytes::from("a"), Bytes::from("1.1")),
        (Bytes::from("b"), Bytes::from("2.1")),
        (Bytes::from("c"), Bytes::from("3.1")),
    ]);
    // i2 fails once its cursor reaches index 1.
    let i2 = MockIterator::new_with_error(
        vec![
            (Bytes::from("a"), Bytes::from("1.1")),
            (Bytes::from("b"), Bytes::from("2.1")),
            (Bytes::from("c"), Bytes::from("3.1")),
        ],
        1,
    );
    let iter = MergeIterator::<MockIterator>::create(vec![
        Box::new(i1.clone()),
        Box::new(i1),
        Box::new(i2),
    ]);
    // your implementation should correctly throw an error instead of panic
    expect_iter_error(iter);
}
/// A fused iterator must tolerate `next` calls past the end as no-ops, and
/// after the first error must keep returning errors without panicking.
#[test]
fn test_task3_fused_iterator() {
    let iter = MockIterator::new(vec![]);
    let mut fused_iter = FusedIterator::new(iter);
    assert!(!fused_iter.is_valid());
    // Calling next on an exhausted iterator must succeed harmlessly.
    fused_iter.next().unwrap();
    fused_iter.next().unwrap();
    fused_iter.next().unwrap();
    assert!(!fused_iter.is_valid());
    // Underlying iterator fails once its cursor reaches index 1.
    let iter = MockIterator::new_with_error(
        vec![
            (Bytes::from("a"), Bytes::from("1.1")),
            (Bytes::from("a"), Bytes::from("1.1")),
        ],
        1,
    );
    let mut fused_iter = FusedIterator::new(iter);
    assert!(fused_iter.is_valid());
    // First failure invalidates the iterator...
    assert!(fused_iter.next().is_err());
    assert!(!fused_iter.is_valid());
    // ...and every later next keeps erroring instead of panicking.
    assert!(fused_iter.next().is_err());
    assert!(fused_iter.next().is_err());
}
/// Full-engine scans after two freezes plus fresh writes: results merge all
/// memtables, hide deleted keys, honor scan bounds, and the iterator stays
/// invalid even if `next` is called repeatedly past the end.
#[test]
fn test_task4_integration() {
    let dir = tempdir().unwrap();
    let storage = Arc::new(
        LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap(),
    );
    // Oldest generation, then frozen.
    storage.put(b"1", b"233").unwrap();
    storage.put(b"2", b"2333").unwrap();
    storage.put(b"3", b"23333").unwrap();
    storage
        .force_freeze_memtable(&storage.state_lock.lock())
        .unwrap();
    // Middle generation: delete 1 and 2, overwrite 3, add 4, then freeze.
    storage.delete(b"1").unwrap();
    storage.delete(b"2").unwrap();
    storage.put(b"3", b"2333").unwrap();
    storage.put(b"4", b"23333").unwrap();
    storage
        .force_freeze_memtable(&storage.state_lock.lock())
        .unwrap();
    // Newest writes stay in the active memtable.
    storage.put(b"1", b"233333").unwrap();
    storage.put(b"3", b"233333").unwrap();
    {
        // Unbounded scan: deleted key 2 is hidden; latest values win.
        let mut iter = storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap();
        check_lsm_iter_result_by_key(
            &mut iter,
            vec![
                (Bytes::from_static(b"1"), Bytes::from_static(b"233333")),
                (Bytes::from_static(b"3"), Bytes::from_static(b"233333")),
                (Bytes::from_static(b"4"), Bytes::from_static(b"23333")),
            ],
        );
        // Extra next calls past the end must be harmless no-ops.
        assert!(!iter.is_valid());
        iter.next().unwrap();
        iter.next().unwrap();
        iter.next().unwrap();
        assert!(!iter.is_valid());
    }
    {
        // Bounded scan [2, 3]: key 2 is deleted, so only key 3 remains.
        let mut iter = storage
            .scan(Bound::Included(b"2"), Bound::Included(b"3"))
            .unwrap();
        check_lsm_iter_result_by_key(
            &mut iter,
            vec![(Bytes::from_static(b"3"), Bytes::from_static(b"233333"))],
        );
        assert!(!iter.is_valid());
        iter.next().unwrap();
        iter.next().unwrap();
        iter.next().unwrap();
        assert!(!iter.is_valid());
    }
}

View File

@@ -0,0 +1,161 @@
// Copyright (c) 2022-2025 Alex Chi Z
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use bytes::Bytes;
use crate::{
block::{Block, BlockBuilder, BlockIterator},
key::{KeySlice, KeyVec},
};
/// A single small key-value pair must be accepted by a 16-byte block and
/// the block must build without panicking.
#[test]
fn test_block_build_single_key() {
    let mut builder = BlockBuilder::new(16);
    let added = builder.add(KeySlice::for_testing_from_slice_no_ts(b"233"), b"233333");
    assert!(added);
    builder.build();
}
/// Once the 16-byte budget is exhausted the builder must refuse further
/// entries, while still building with what it already holds.
#[test]
fn test_block_build_full() {
    let mut builder = BlockBuilder::new(16);
    // The first entry is accepted.
    assert!(builder.add(KeySlice::for_testing_from_slice_no_ts(b"11"), b"11"));
    // The second entry would overflow the block size and must be refused.
    assert!(!builder.add(KeySlice::for_testing_from_slice_no_ts(b"22"), b"22"));
    builder.build();
}
/// An entry far larger than the block size must still be accepted when the
/// block is empty, so every entry has a home.
#[test]
fn test_block_build_large_1() {
    let mut builder = BlockBuilder::new(16);
    // 100-byte value into a 16-byte block: allowed as the first entry.
    assert!(builder.add(
        KeySlice::for_testing_from_slice_no_ts(b"11"),
        &b"1".repeat(100)
    ));
    builder.build();
}
/// An oversized entry must be rejected once the block already holds data.
#[test]
fn test_block_build_large_2() {
    let mut builder = BlockBuilder::new(16);
    assert!(builder.add(KeySlice::for_testing_from_slice_no_ts(b"11"), b"1"));
    // The 100-byte value no longer fits because the block is not empty.
    assert!(!builder.add(
        KeySlice::for_testing_from_slice_no_ts(b"11"),
        &b"1".repeat(100)
    ));
}
/// Test key for index `idx`: "key_000", "key_005", ... — the stride of 5
/// leaves gaps between stored keys for the seek tests.
fn key_of(idx: usize) -> KeyVec {
    let raw = format!("key_{:03}", idx * 5).into_bytes();
    KeyVec::for_testing_from_vec_no_ts(raw)
}
/// Test value for index `idx`: "value_" followed by the zero-padded index.
fn value_of(idx: usize) -> Vec<u8> {
    let mut value = String::from("value_");
    value.push_str(&format!("{:010}", idx));
    value.into_bytes()
}
/// Number of entries used to populate the shared test block.
fn num_of_keys() -> usize {
    100
}
/// Builds a block holding all `num_of_keys()` generated entries; the
/// 10000-byte budget is large enough that every add must succeed.
fn generate_block() -> Block {
    let mut builder = BlockBuilder::new(10000);
    for idx in 0..num_of_keys() {
        let (key, value) = (key_of(idx), value_of(idx));
        assert!(builder.add(key.as_key_slice(), &value[..]));
    }
    builder.build()
}
/// Building a fully populated block must not panic.
#[test]
fn test_block_build_all() {
    let _block = generate_block();
}
/// Encoding a fully populated block must not panic.
#[test]
fn test_block_encode() {
    let _encoded = generate_block().encode();
}
/// Encode followed by decode must round-trip both the entry data and the
/// offset table exactly.
#[test]
fn test_block_decode() {
    let block = generate_block();
    let encoded = block.encode();
    let decoded_block = Block::decode(&encoded);
    assert_eq!(block.offsets, decoded_block.offsets);
    assert_eq!(block.data, decoded_block.data);
}
/// Copies a byte slice into an owned `Bytes` for readable assertion output.
fn as_bytes(x: &[u8]) -> Bytes {
    Bytes::from(x.to_vec())
}
/// Iterating from the start must visit every generated entry in order, and
/// `seek_to_first` must fully rewind so the pass can be repeated.
#[test]
fn test_block_iterator() {
    let block = Arc::new(generate_block());
    let mut iter = BlockIterator::create_and_seek_to_first(block);
    // Five passes verify that rewinding resets the iterator completely.
    for _ in 0..5 {
        for i in 0..num_of_keys() {
            let key = iter.key();
            let value = iter.value();
            assert_eq!(
                key.for_testing_key_ref(),
                key_of(i).for_testing_key_ref(),
                "expected key: {:?}, actual key: {:?}",
                as_bytes(key_of(i).for_testing_key_ref()),
                as_bytes(key.for_testing_key_ref())
            );
            assert_eq!(
                value,
                value_of(i),
                "expected value: {:?}, actual value: {:?}",
                as_bytes(&value_of(i)),
                as_bytes(value)
            );
            iter.next();
        }
        iter.seek_to_first();
    }
}
/// Seeking to a key that falls in the gap between stored keys (keys stride
/// by 5; offsets 1..=5 land in or at the edge of each gap) must position
/// the iterator on the first stored key >= the seek target.
#[test]
fn test_block_seek_key() {
    let block = Arc::new(generate_block());
    let mut iter = BlockIterator::create_and_seek_to_key(block, key_of(0).as_key_slice());
    for offset in 1..=5 {
        for i in 0..num_of_keys() {
            let key = iter.key();
            let value = iter.value();
            assert_eq!(
                key.for_testing_key_ref(),
                key_of(i).for_testing_key_ref(),
                "expected key: {:?}, actual key: {:?}",
                as_bytes(key_of(i).for_testing_key_ref()),
                as_bytes(key.for_testing_key_ref())
            );
            assert_eq!(
                value,
                value_of(i),
                "expected value: {:?}, actual value: {:?}",
                as_bytes(&value_of(i)),
                as_bytes(value)
            );
            // Seek into the gap after entry i; the next round of the loop
            // expects the iterator to have landed on entry i + 1.
            iter.seek_to_key(KeySlice::for_testing_from_slice_no_ts(
                &format!("key_{:03}", i * 5 + offset).into_bytes(),
            ));
        }
        // "k" sorts before every stored key, so this rewinds to the start.
        iter.seek_to_key(KeySlice::for_testing_from_slice_no_ts(b"k"));
    }
}

View File

@@ -32,7 +32,9 @@ pub struct Wal {
impl Wal {
/// Creates a new WAL file at `_path` (truncating any existing file) and
/// wraps it in a buffered writer behind a mutex so appends are serialized.
///
/// # Errors
/// Returns an error if the file cannot be created.
pub fn create(_path: impl AsRef<Path>) -> Result<Self> {
    // The leftover `unimplemented!()` stub above the real body was removed:
    // it made the constructed value unreachable.
    Ok(Self {
        file: Arc::new(Mutex::new(BufWriter::new(File::create(_path)?))),
    })
}
pub fn recover(_path: impl AsRef<Path>, _skiplist: &SkipMap<Bytes, Bytes>) -> Result<Self> {