fix compaction split bug and add 3.1 test
Signed-off-by: Alex Chi <iskyzh@gmail.com>
This commit is contained in:
@@ -2,6 +2,18 @@
|
||||
|
||||
During the refactor, you might need to change the signature of some functions from `&self` to `self: &Arc<Self>` as necessary.
|
||||
|
||||
## MemTable
|
||||
## Task 1: MemTable, Write-Ahead Log, and Read Path
|
||||
|
||||
## WAL
|
||||
Memtable store timestamp, change to scan, encode ts in wal
|
||||
|
||||
## Task 2: Write Path
|
||||
|
||||
assign mvcc object, take write lock, increase ts by 1
|
||||
|
||||
## Task 3: MVCC Compaction
|
||||
|
||||
keep all versions
|
||||
|
||||
## Task 4: LSM Iterator
|
||||
|
||||
return the latest version
|
||||
|
@@ -153,19 +153,22 @@ impl LsmStorageInner {
|
||||
}
|
||||
|
||||
let builder_inner = builder.as_mut().unwrap();
|
||||
builder_inner.add(iter.key(), iter.value());
|
||||
|
||||
if builder_inner.estimated_size() >= self.options.target_sst_size && !same_as_last_key {
|
||||
let sst_id = self.next_sst_id();
|
||||
let builder = builder.take().unwrap();
|
||||
let sst = Arc::new(builder.build(
|
||||
let sst_id: usize = self.next_sst_id();
|
||||
let old_builder = builder.take().unwrap();
|
||||
let sst = Arc::new(old_builder.build(
|
||||
sst_id,
|
||||
Some(self.block_cache.clone()),
|
||||
self.path_of_sst(sst_id),
|
||||
)?);
|
||||
new_sst.push(sst);
|
||||
builder = Some(SsTableBuilder::new(self.options.block_size));
|
||||
}
|
||||
|
||||
let builder_inner = builder.as_mut().unwrap();
|
||||
builder_inner.add(iter.key(), iter.value());
|
||||
|
||||
if !same_as_last_key {
|
||||
last_key.clear();
|
||||
last_key.extend(iter.key().key_ref());
|
||||
|
@@ -35,6 +35,10 @@ impl<T: AsRef<[u8]>> Key<T> {
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.0.as_ref().is_empty()
|
||||
}
|
||||
|
||||
pub fn for_testing_ts(self) -> u64 {
|
||||
self.1
|
||||
}
|
||||
}
|
||||
|
||||
impl Key<Vec<u8>> {
|
||||
@@ -149,6 +153,10 @@ impl<'a> Key<&'a [u8]> {
|
||||
pub fn for_testing_from_slice_no_ts(slice: &'a [u8]) -> Self {
|
||||
Self(slice, TS_DEFAULT)
|
||||
}
|
||||
|
||||
pub fn for_testing_from_slice_with_ts(slice: &'a [u8], ts: u64) -> Self {
|
||||
Self(slice, ts)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsRef<[u8]> + Debug> Debug for Key<T> {
|
||||
|
@@ -12,3 +12,5 @@ mod week2_day3;
|
||||
mod week2_day4;
|
||||
// mod week2_day5;
|
||||
// mod week2_day6;
|
||||
mod week3_day1;
|
||||
mod week3_day2;
|
||||
|
54
mini-lsm-mvcc/src/tests/week3_day1.rs
Normal file
54
mini-lsm-mvcc/src/tests/week3_day1.rs
Normal file
@@ -0,0 +1,54 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use bytes::Bytes;
|
||||
use tempfile::tempdir;
|
||||
|
||||
use crate::key::KeySlice;
|
||||
use crate::table::{FileObject, SsTable, SsTableBuilder, SsTableIterator};
|
||||
|
||||
use super::harness::{check_iter_result_by_key_and_ts, generate_sst_with_ts};
|
||||
|
||||
#[test]
|
||||
fn test_sst_build_multi_version_simple() {
|
||||
let mut builder = SsTableBuilder::new(16);
|
||||
builder.add(
|
||||
KeySlice::for_testing_from_slice_with_ts(b"233", 233),
|
||||
b"233333",
|
||||
);
|
||||
builder.add(
|
||||
KeySlice::for_testing_from_slice_with_ts(b"233", 0),
|
||||
b"2333333",
|
||||
);
|
||||
let dir = tempdir().unwrap();
|
||||
builder.build_for_test(dir.path().join("1.sst")).unwrap();
|
||||
}
|
||||
|
||||
fn generate_test_data() -> Vec<((Bytes, u64), Bytes)> {
|
||||
(0..100)
|
||||
.map(|id| {
|
||||
(
|
||||
(Bytes::from(format!("key{:05}", id / 5)), 5 - (id % 5)),
|
||||
Bytes::from(format!("value{:05}", id)),
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sst_build_multi_version_hard() {
|
||||
let dir = tempdir().unwrap();
|
||||
let data = generate_test_data();
|
||||
generate_sst_with_ts(1, dir.path().join("1.sst"), data.clone(), None);
|
||||
let sst = Arc::new(
|
||||
SsTable::open(
|
||||
1,
|
||||
None,
|
||||
FileObject::open(&dir.path().join("1.sst")).unwrap(),
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
check_iter_result_by_key_and_ts(
|
||||
&mut SsTableIterator::create_and_seek_to_first(sst).unwrap(),
|
||||
data,
|
||||
);
|
||||
}
|
61
mini-lsm-mvcc/src/tests/week3_day2.rs
Normal file
61
mini-lsm-mvcc/src/tests/week3_day2.rs
Normal file
@@ -0,0 +1,61 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use tempfile::tempdir;
|
||||
|
||||
use crate::{
|
||||
compact::CompactionOptions,
|
||||
lsm_storage::{LsmStorageOptions, MiniLsm},
|
||||
tests::harness::dump_files_in_dir,
|
||||
};
|
||||
|
||||
#[test]
|
||||
fn test_task_1_2_integration() {
|
||||
let dir = tempdir().unwrap();
|
||||
let mut options = LsmStorageOptions::default_for_week2_test(CompactionOptions::NoCompaction);
|
||||
options.enable_wal = true;
|
||||
let storage = MiniLsm::open(&dir, options.clone()).unwrap();
|
||||
let _txn = storage.new_txn().unwrap();
|
||||
for i in 0..=20000 {
|
||||
storage
|
||||
.put(b"0", format!("{:02000}", i).as_bytes())
|
||||
.unwrap();
|
||||
}
|
||||
std::thread::sleep(Duration::from_secs(1)); // wait until all memtables flush
|
||||
while {
|
||||
let snapshot = storage.inner.state.read();
|
||||
!snapshot.imm_memtables.is_empty()
|
||||
} {
|
||||
storage.inner.force_flush_next_imm_memtable().unwrap();
|
||||
}
|
||||
assert!(storage.inner.state.read().l0_sstables.len() > 1);
|
||||
storage.force_full_compaction().unwrap();
|
||||
storage.dump_structure();
|
||||
dump_files_in_dir(&dir);
|
||||
assert!(storage.inner.state.read().l0_sstables.is_empty());
|
||||
assert_eq!(storage.inner.state.read().levels.len(), 1);
|
||||
// same key in the same SST
|
||||
assert_eq!(storage.inner.state.read().levels[0].1.len(), 1);
|
||||
for i in 0..=100 {
|
||||
storage
|
||||
.put(b"1", format!("{:02000}", i).as_bytes())
|
||||
.unwrap();
|
||||
}
|
||||
storage
|
||||
.inner
|
||||
.force_freeze_memtable(&storage.inner.state_lock.lock())
|
||||
.unwrap();
|
||||
std::thread::sleep(Duration::from_secs(1)); // wait until all memtables flush
|
||||
while {
|
||||
let snapshot = storage.inner.state.read();
|
||||
!snapshot.imm_memtables.is_empty()
|
||||
} {
|
||||
storage.inner.force_flush_next_imm_memtable().unwrap();
|
||||
}
|
||||
storage.force_full_compaction().unwrap();
|
||||
storage.dump_structure();
|
||||
dump_files_in_dir(&dir);
|
||||
assert!(storage.inner.state.read().l0_sstables.is_empty());
|
||||
assert_eq!(storage.inner.state.read().levels.len(), 1);
|
||||
// same key in the same SST, now we should split two
|
||||
assert_eq!(storage.inner.state.read().levels[0].1.len(), 2);
|
||||
}
|
@@ -22,6 +22,10 @@ impl<T: AsRef<[u8]>> Key<T> {
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.0.as_ref().is_empty()
|
||||
}
|
||||
|
||||
pub fn for_testing_ts(self) -> u64 {
|
||||
0
|
||||
}
|
||||
}
|
||||
|
||||
impl Key<Vec<u8>> {
|
||||
@@ -118,6 +122,10 @@ impl<'a> Key<&'a [u8]> {
|
||||
pub fn for_testing_from_slice_no_ts(slice: &'a [u8]) -> Self {
|
||||
Self(slice)
|
||||
}
|
||||
|
||||
pub fn for_testing_from_slice_with_ts(slice: &'a [u8], _ts: u64) -> Self {
|
||||
Self(slice)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsRef<[u8]> + Debug> Debug for Key<T> {
|
||||
|
@@ -22,6 +22,10 @@ impl<T: AsRef<[u8]>> Key<T> {
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.0.as_ref().is_empty()
|
||||
}
|
||||
|
||||
pub fn for_testing_ts(self) -> u64 {
|
||||
0
|
||||
}
|
||||
}
|
||||
|
||||
impl Key<Vec<u8>> {
|
||||
@@ -118,6 +122,10 @@ impl<'a> Key<&'a [u8]> {
|
||||
pub fn for_testing_from_slice_no_ts(slice: &'a [u8]) -> Self {
|
||||
Self(slice)
|
||||
}
|
||||
|
||||
pub fn for_testing_from_slice_with_ts(slice: &'a [u8], _ts: u64) -> Self {
|
||||
Self(slice)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsRef<[u8]> + Debug> Debug for Key<T> {
|
||||
|
@@ -1,4 +1,7 @@
|
||||
use std::{collections::BTreeMap, ops::Bound, path::Path, sync::Arc, time::Duration};
|
||||
use std::{
|
||||
collections::BTreeMap, ops::Bound, os::unix::fs::MetadataExt, path::Path, sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use anyhow::{bail, Result};
|
||||
use bytes::Bytes;
|
||||
@@ -111,6 +114,36 @@ where
|
||||
assert!(!iter.is_valid());
|
||||
}
|
||||
|
||||
pub fn check_iter_result_by_key_and_ts<I>(iter: &mut I, expected: Vec<((Bytes, u64), Bytes)>)
|
||||
where
|
||||
I: for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
|
||||
{
|
||||
for ((k, ts), v) in expected {
|
||||
assert!(iter.is_valid());
|
||||
assert_eq!(
|
||||
(&k[..], ts),
|
||||
(
|
||||
iter.key().for_testing_key_ref(),
|
||||
iter.key().for_testing_ts()
|
||||
),
|
||||
"expected key: {:?}@{}, actual key: {:?}@{}",
|
||||
k,
|
||||
ts,
|
||||
as_bytes(iter.key().for_testing_key_ref()),
|
||||
iter.key().for_testing_ts(),
|
||||
);
|
||||
assert_eq!(
|
||||
v,
|
||||
iter.value(),
|
||||
"expected value: {:?}, actual value: {:?}",
|
||||
v,
|
||||
as_bytes(iter.value()),
|
||||
);
|
||||
iter.next().unwrap();
|
||||
}
|
||||
assert!(!iter.is_valid());
|
||||
}
|
||||
|
||||
pub fn check_lsm_iter_result_by_key<I>(iter: &mut I, expected: Vec<(Bytes, Bytes)>)
|
||||
where
|
||||
I: for<'a> StorageIterator<KeyType<'a> = &'a [u8]>,
|
||||
@@ -159,6 +192,22 @@ pub fn generate_sst(
|
||||
builder.build(id, block_cache, path.as_ref()).unwrap()
|
||||
}
|
||||
|
||||
pub fn generate_sst_with_ts(
|
||||
id: usize,
|
||||
path: impl AsRef<Path>,
|
||||
data: Vec<((Bytes, u64), Bytes)>,
|
||||
block_cache: Option<Arc<BlockCache>>,
|
||||
) -> SsTable {
|
||||
let mut builder = SsTableBuilder::new(128);
|
||||
for ((key, ts), value) in data {
|
||||
builder.add(
|
||||
KeySlice::for_testing_from_slice_with_ts(&key[..], ts),
|
||||
&value[..],
|
||||
);
|
||||
}
|
||||
builder.build(id, block_cache, path.as_ref()).unwrap()
|
||||
}
|
||||
|
||||
pub fn sync(storage: &LsmStorageInner) {
|
||||
storage
|
||||
.force_freeze_memtable(&storage.state_lock.lock())
|
||||
@@ -184,6 +233,7 @@ pub fn compaction_bench(storage: Arc<MiniLsm>) {
|
||||
max_key = max_key.max(i);
|
||||
}
|
||||
}
|
||||
|
||||
std::thread::sleep(Duration::from_secs(1)); // wait until all memtables flush
|
||||
while {
|
||||
let snapshot = storage.inner.state.read();
|
||||
@@ -352,6 +402,11 @@ pub fn check_compaction_ratio(storage: Arc<MiniLsm>) {
|
||||
|
||||
pub fn dump_files_in_dir(path: impl AsRef<Path>) {
|
||||
for f in path.as_ref().read_dir().unwrap() {
|
||||
println!("{}", f.unwrap().path().display())
|
||||
let f = f.unwrap();
|
||||
println!(
|
||||
"{}, size={:.3}KB",
|
||||
f.path().display(),
|
||||
f.metadata().unwrap().size() as f64 / 1024.0
|
||||
)
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user