From 37f2c5aff0bba5731c9f97aea923255413ee5a85 Mon Sep 17 00:00:00 2001 From: Alex Chi Date: Sun, 28 Jan 2024 16:01:40 +0800 Subject: [PATCH] fix compaction split bug and add 3.1 test Signed-off-by: Alex Chi --- .../src/week3-02-snapshot-read-part-1.md | 16 ++++- mini-lsm-mvcc/src/compact.rs | 11 ++-- mini-lsm-mvcc/src/key.rs | 8 +++ mini-lsm-mvcc/src/tests.rs | 2 + mini-lsm-mvcc/src/tests/week3_day1.rs | 54 ++++++++++++++++ mini-lsm-mvcc/src/tests/week3_day2.rs | 61 +++++++++++++++++++ mini-lsm-starter/src/key.rs | 8 +++ mini-lsm/src/key.rs | 8 +++ mini-lsm/src/tests/harness.rs | 59 +++++++++++++++++- 9 files changed, 219 insertions(+), 8 deletions(-) create mode 100644 mini-lsm-mvcc/src/tests/week3_day1.rs create mode 100644 mini-lsm-mvcc/src/tests/week3_day2.rs diff --git a/mini-lsm-book/src/week3-02-snapshot-read-part-1.md b/mini-lsm-book/src/week3-02-snapshot-read-part-1.md index f8ce35a..bca7c7e 100644 --- a/mini-lsm-book/src/week3-02-snapshot-read-part-1.md +++ b/mini-lsm-book/src/week3-02-snapshot-read-part-1.md @@ -2,6 +2,18 @@ During the refactor, you might need to change the signature of some functions from `&self` to `self: &Arc` as necessary. -## MemTable +## Task 1: MemTable, Write-Ahead Log, and Read Path -## WAL +Memtable store timestamp, change to scan, encode ts in wal + +## Task 2: Write Path + +assign mvcc object, take write lock, increase ts by 1 + +## Task 3: MVCC Compaction + +keep all versions + +## Task 4: LSM Iterator + +return the latest version diff --git a/mini-lsm-mvcc/src/compact.rs b/mini-lsm-mvcc/src/compact.rs index 6e729d3..ab09c1d 100644 --- a/mini-lsm-mvcc/src/compact.rs +++ b/mini-lsm-mvcc/src/compact.rs @@ -153,19 +153,22 @@ impl LsmStorageInner { } let builder_inner = builder.as_mut().unwrap(); - builder_inner.add(iter.key(), iter.value()); if builder_inner.estimated_size() >= self.options.target_sst_size && !same_as_last_key { - let sst_id = self.next_sst_id(); - let builder = builder.take().unwrap(); - let sst = Arc::new(builder.build( + let sst_id: usize = self.next_sst_id(); + let old_builder = builder.take().unwrap(); + let sst = Arc::new(old_builder.build( sst_id, Some(self.block_cache.clone()), self.path_of_sst(sst_id), )?); new_sst.push(sst); + builder = Some(SsTableBuilder::new(self.options.block_size)); } + let builder_inner = builder.as_mut().unwrap(); + builder_inner.add(iter.key(), iter.value()); + if !same_as_last_key { last_key.clear(); last_key.extend(iter.key().key_ref()); diff --git a/mini-lsm-mvcc/src/key.rs b/mini-lsm-mvcc/src/key.rs index 9886842..b383282 100644 --- a/mini-lsm-mvcc/src/key.rs +++ b/mini-lsm-mvcc/src/key.rs @@ -35,6 +35,10 @@ impl> Key { pub fn is_empty(&self) -> bool { self.0.as_ref().is_empty() } + + pub fn for_testing_ts(self) -> u64 { + self.1 + } } impl Key> { @@ -149,6 +153,10 @@ impl<'a> Key<&'a [u8]> { pub fn for_testing_from_slice_no_ts(slice: &'a [u8]) -> Self { Self(slice, TS_DEFAULT) } + + pub fn for_testing_from_slice_with_ts(slice: &'a [u8], ts: u64) -> Self { + Self(slice, ts) + } } impl + Debug> Debug for Key { diff --git a/mini-lsm-mvcc/src/tests.rs b/mini-lsm-mvcc/src/tests.rs index ae2d28a..010a21f 100644 --- a/mini-lsm-mvcc/src/tests.rs +++ b/mini-lsm-mvcc/src/tests.rs @@ -12,3 +12,5 @@ mod week2_day3; mod week2_day4; // mod week2_day5; // mod week2_day6; +mod week3_day1; +mod week3_day2; diff --git a/mini-lsm-mvcc/src/tests/week3_day1.rs b/mini-lsm-mvcc/src/tests/week3_day1.rs new file mode 100644 index 0000000..df55979 --- /dev/null +++ b/mini-lsm-mvcc/src/tests/week3_day1.rs @@ -0,0 +1,54 @@ +use std::sync::Arc; + +use bytes::Bytes; +use tempfile::tempdir; + +use crate::key::KeySlice; +use crate::table::{FileObject, SsTable, SsTableBuilder, SsTableIterator}; + +use super::harness::{check_iter_result_by_key_and_ts, generate_sst_with_ts}; + +#[test] +fn test_sst_build_multi_version_simple() { + let mut builder = SsTableBuilder::new(16); + builder.add( + KeySlice::for_testing_from_slice_with_ts(b"233", 233), + b"233333", + ); + builder.add( + KeySlice::for_testing_from_slice_with_ts(b"233", 0), + b"2333333", + ); + let dir = tempdir().unwrap(); + builder.build_for_test(dir.path().join("1.sst")).unwrap(); +} + +fn generate_test_data() -> Vec<((Bytes, u64), Bytes)> { + (0..100) + .map(|id| { + ( + (Bytes::from(format!("key{:05}", id / 5)), 5 - (id % 5)), + Bytes::from(format!("value{:05}", id)), + ) + }) + .collect() +} + +#[test] +fn test_sst_build_multi_version_hard() { + let dir = tempdir().unwrap(); + let data = generate_test_data(); + generate_sst_with_ts(1, dir.path().join("1.sst"), data.clone(), None); + let sst = Arc::new( + SsTable::open( + 1, + None, + FileObject::open(&dir.path().join("1.sst")).unwrap(), + ) + .unwrap(), + ); + check_iter_result_by_key_and_ts( + &mut SsTableIterator::create_and_seek_to_first(sst).unwrap(), + data, + ); +} diff --git a/mini-lsm-mvcc/src/tests/week3_day2.rs b/mini-lsm-mvcc/src/tests/week3_day2.rs new file mode 100644 index 0000000..8acca57 --- /dev/null +++ b/mini-lsm-mvcc/src/tests/week3_day2.rs @@ -0,0 +1,61 @@ +use std::time::Duration; + +use tempfile::tempdir; + +use crate::{ + compact::CompactionOptions, + lsm_storage::{LsmStorageOptions, MiniLsm}, + tests::harness::dump_files_in_dir, +}; + +#[test] +fn test_task_1_2_integration() { + let dir = tempdir().unwrap(); + let mut options = LsmStorageOptions::default_for_week2_test(CompactionOptions::NoCompaction); + options.enable_wal = true; + let storage = MiniLsm::open(&dir, options.clone()).unwrap(); + let _txn = storage.new_txn().unwrap(); + for i in 0..=20000 { + storage + .put(b"0", format!("{:02000}", i).as_bytes()) + .unwrap(); + } + std::thread::sleep(Duration::from_secs(1)); // wait until all memtables flush + while { + let snapshot = storage.inner.state.read(); + !snapshot.imm_memtables.is_empty() + } { + storage.inner.force_flush_next_imm_memtable().unwrap(); + } + assert!(storage.inner.state.read().l0_sstables.len() > 1); + storage.force_full_compaction().unwrap(); + storage.dump_structure(); + dump_files_in_dir(&dir); + assert!(storage.inner.state.read().l0_sstables.is_empty()); + assert_eq!(storage.inner.state.read().levels.len(), 1); + // same key in the same SST + assert_eq!(storage.inner.state.read().levels[0].1.len(), 1); + for i in 0..=100 { + storage + .put(b"1", format!("{:02000}", i).as_bytes()) + .unwrap(); + } + storage + .inner + .force_freeze_memtable(&storage.inner.state_lock.lock()) + .unwrap(); + std::thread::sleep(Duration::from_secs(1)); // wait until all memtables flush + while { + let snapshot = storage.inner.state.read(); + !snapshot.imm_memtables.is_empty() + } { + storage.inner.force_flush_next_imm_memtable().unwrap(); + } + storage.force_full_compaction().unwrap(); + storage.dump_structure(); + dump_files_in_dir(&dir); + assert!(storage.inner.state.read().l0_sstables.is_empty()); + assert_eq!(storage.inner.state.read().levels.len(), 1); + // same key in the same SST, now we should split two + assert_eq!(storage.inner.state.read().levels[0].1.len(), 2); +} diff --git a/mini-lsm-starter/src/key.rs b/mini-lsm-starter/src/key.rs index 00898b1..f459ee7 100644 --- a/mini-lsm-starter/src/key.rs +++ b/mini-lsm-starter/src/key.rs @@ -22,6 +22,10 @@ impl> Key { pub fn is_empty(&self) -> bool { self.0.as_ref().is_empty() } + + pub fn for_testing_ts(self) -> u64 { + 0 + } } impl Key> { @@ -118,6 +122,10 @@ impl<'a> Key<&'a [u8]> { pub fn for_testing_from_slice_no_ts(slice: &'a [u8]) -> Self { Self(slice) } + + pub fn for_testing_from_slice_with_ts(slice: &'a [u8], _ts: u64) -> Self { + Self(slice) + } } impl + Debug> Debug for Key { diff --git a/mini-lsm/src/key.rs b/mini-lsm/src/key.rs index 00898b1..f459ee7 100644 --- a/mini-lsm/src/key.rs +++ b/mini-lsm/src/key.rs @@ -22,6 +22,10 @@ impl> Key { pub fn is_empty(&self) -> bool { self.0.as_ref().is_empty() } + + pub fn for_testing_ts(self) -> u64 { + 0 + } } impl Key> { @@ -118,6 +122,10 @@ impl<'a> Key<&'a [u8]> { pub fn for_testing_from_slice_no_ts(slice: &'a [u8]) -> Self { Self(slice) } + + pub fn for_testing_from_slice_with_ts(slice: &'a [u8], _ts: u64) -> Self { + Self(slice) + } } impl + Debug> Debug for Key { diff --git a/mini-lsm/src/tests/harness.rs b/mini-lsm/src/tests/harness.rs index 2339a80..30afa80 100644 --- a/mini-lsm/src/tests/harness.rs +++ b/mini-lsm/src/tests/harness.rs @@ -1,4 +1,7 @@ -use std::{collections::BTreeMap, ops::Bound, path::Path, sync::Arc, time::Duration}; +use std::{ + collections::BTreeMap, ops::Bound, os::unix::fs::MetadataExt, path::Path, sync::Arc, + time::Duration, +}; use anyhow::{bail, Result}; use bytes::Bytes; @@ -111,6 +114,36 @@ where assert!(!iter.is_valid()); } +pub fn check_iter_result_by_key_and_ts(iter: &mut I, expected: Vec<((Bytes, u64), Bytes)>) +where + I: for<'a> StorageIterator = KeySlice<'a>>, +{ + for ((k, ts), v) in expected { + assert!(iter.is_valid()); + assert_eq!( + (&k[..], ts), + ( + iter.key().for_testing_key_ref(), + iter.key().for_testing_ts() + ), + "expected key: {:?}@{}, actual key: {:?}@{}", + k, + ts, + as_bytes(iter.key().for_testing_key_ref()), + iter.key().for_testing_ts(), + ); + assert_eq!( + v, + iter.value(), + "expected value: {:?}, actual value: {:?}", + v, + as_bytes(iter.value()), + ); + iter.next().unwrap(); + } + assert!(!iter.is_valid()); +} + pub fn check_lsm_iter_result_by_key(iter: &mut I, expected: Vec<(Bytes, Bytes)>) where I: for<'a> StorageIterator = &'a [u8]>, @@ -159,6 +192,22 @@ pub fn generate_sst( builder.build(id, block_cache, path.as_ref()).unwrap() } +pub fn generate_sst_with_ts( + id: usize, + path: impl AsRef, + data: Vec<((Bytes, u64), Bytes)>, + block_cache: Option>, +) -> SsTable { + let mut builder = SsTableBuilder::new(128); + for ((key, ts), value) in data { + builder.add( + KeySlice::for_testing_from_slice_with_ts(&key[..], ts), + &value[..], + ); + } + builder.build(id, block_cache, path.as_ref()).unwrap() +} + pub fn sync(storage: &LsmStorageInner) { storage .force_freeze_memtable(&storage.state_lock.lock()) @@ -184,6 +233,7 @@ pub fn compaction_bench(storage: Arc) { max_key = max_key.max(i); } } + std::thread::sleep(Duration::from_secs(1)); // wait until all memtables flush while { let snapshot = storage.inner.state.read(); @@ -352,6 +402,11 @@ pub fn check_compaction_ratio(storage: Arc) { pub fn dump_files_in_dir(path: impl AsRef) { for f in path.as_ref().read_dir().unwrap() { - println!("{}", f.unwrap().path().display()) + let f = f.unwrap(); + println!( + "{}, size={:.3}KB", + f.path().display(), + f.metadata().unwrap().size() as f64 / 1024.0 + ) } }