consider merge width for tiered trigger + mvcc compaction
Signed-off-by: Alex Chi <iskyzh@gmail.com>
This commit is contained in:
@@ -2,4 +2,11 @@
|
||||
|
||||
In this part, you will implement MVCC over the LSM engine that you have built in the previous two weeks. We will add timestamp encoding in the keys to maintain multiple versions of a key, and change some part of the engine to ensure old data are either retained or garbage-collected based on whether there are users reading an old version.
|
||||
|
||||
1. Use the new key module
|
||||
2. Refactor until no compile error
|
||||
3. Use correct key ranges, add timestamp for engine
|
||||
4. Memtable refactor
|
||||
5. LsmIterator use read_ts
|
||||
6. Compaction no delete
|
||||
|
||||
{{#include copyright.md}}
|
||||
|
@@ -115,26 +115,22 @@ impl LsmStorageInner {
|
||||
fn compact_generate_sst_from_iter(
|
||||
&self,
|
||||
mut iter: impl for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
|
||||
compact_to_bottom_level: bool,
|
||||
_compact_to_bottom_level: bool,
|
||||
) -> Result<Vec<Arc<SsTable>>> {
|
||||
let mut builder = None;
|
||||
let mut new_sst = Vec::new();
|
||||
|
||||
let mut last_key = Vec::<u8>::new();
|
||||
while iter.is_valid() {
|
||||
if builder.is_none() {
|
||||
builder = Some(SsTableBuilder::new(self.options.block_size));
|
||||
}
|
||||
let builder_inner = builder.as_mut().unwrap();
|
||||
if compact_to_bottom_level {
|
||||
if !iter.value().is_empty() {
|
||||
builder_inner.add(iter.key(), iter.value());
|
||||
}
|
||||
} else {
|
||||
builder_inner.add(iter.key(), iter.value());
|
||||
}
|
||||
iter.next()?;
|
||||
builder_inner.add(iter.key(), iter.value());
|
||||
|
||||
if builder_inner.estimated_size() >= self.options.target_sst_size {
|
||||
let same_as_last_key = iter.key().key_ref() == &last_key;
|
||||
|
||||
if builder_inner.estimated_size() >= self.options.target_sst_size && !same_as_last_key {
|
||||
let sst_id = self.next_sst_id();
|
||||
let builder = builder.take().unwrap();
|
||||
let sst = Arc::new(builder.build(
|
||||
@@ -144,6 +140,12 @@ impl LsmStorageInner {
|
||||
)?);
|
||||
new_sst.push(sst);
|
||||
}
|
||||
|
||||
if !same_as_last_key {
|
||||
last_key.clear();
|
||||
last_key.extend(iter.key().key_ref());
|
||||
}
|
||||
iter.next()?;
|
||||
}
|
||||
if let Some(builder) = builder {
|
||||
let sst_id = self.next_sst_id(); // lock dropped here
|
||||
@@ -253,6 +255,7 @@ impl LsmStorageInner {
|
||||
let CompactionOptions::NoCompaction = self.options.compaction_options else {
|
||||
panic!("full compaction can only be called with compaction is not enabled")
|
||||
};
|
||||
|
||||
let snapshot = {
|
||||
let state = self.state.read();
|
||||
state.clone()
|
||||
@@ -264,6 +267,9 @@ impl LsmStorageInner {
|
||||
l0_sstables: l0_sstables.clone(),
|
||||
l1_sstables: l1_sstables.clone(),
|
||||
};
|
||||
|
||||
println!("force full compaction: {:?}", compaction_task);
|
||||
|
||||
let sstables = self.compact(&compaction_task)?;
|
||||
|
||||
{
|
||||
@@ -294,10 +300,14 @@ impl LsmStorageInner {
|
||||
for sst in l0_sstables.iter().chain(l1_sstables.iter()) {
|
||||
std::fs::remove_file(self.path_of_sst(*sst))?;
|
||||
}
|
||||
|
||||
println!("force full compaction done");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn trigger_compaction(&self) -> Result<()> {
|
||||
self.dump_structure();
|
||||
let snapshot = {
|
||||
let state = self.state.read();
|
||||
state.clone()
|
||||
|
@@ -1,8 +1,8 @@
|
||||
use crate::lsm_storage::MiniLsm;
|
||||
use crate::lsm_storage::{LsmStorageInner, MiniLsm};
|
||||
|
||||
impl MiniLsm {
|
||||
impl LsmStorageInner {
|
||||
pub fn dump_structure(&self) {
|
||||
let snapshot = self.inner.state.read();
|
||||
let snapshot = self.state.read();
|
||||
if !snapshot.l0_sstables.is_empty() {
|
||||
println!(
|
||||
"L0 ({}): {:?}",
|
||||
@@ -15,3 +15,9 @@ impl MiniLsm {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MiniLsm {
|
||||
pub fn dump_structure(&self) {
|
||||
self.inner.dump_structure()
|
||||
}
|
||||
}
|
||||
|
@@ -6,7 +6,7 @@ mod week1_day4;
|
||||
mod week1_day5;
|
||||
mod week1_day6;
|
||||
mod week1_day7;
|
||||
// mod week2_day1;
|
||||
// mod week2_day2;
|
||||
// mod week2_day3;
|
||||
// mod week2_day4;
|
||||
mod week2_day1;
|
||||
mod week2_day2;
|
||||
mod week2_day3;
|
||||
mod week2_day4;
|
||||
|
@@ -1,8 +1,8 @@
|
||||
use crate::lsm_storage::MiniLsm;
|
||||
use crate::lsm_storage::{LsmStorageInner, MiniLsm};
|
||||
|
||||
impl MiniLsm {
|
||||
impl LsmStorageInner {
|
||||
pub fn dump_structure(&self) {
|
||||
let snapshot = self.inner.state.read();
|
||||
let snapshot = self.state.read();
|
||||
if !snapshot.l0_sstables.is_empty() {
|
||||
println!(
|
||||
"L0 ({}): {:?}",
|
||||
@@ -15,3 +15,9 @@ impl MiniLsm {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MiniLsm {
|
||||
pub fn dump_structure(&self) {
|
||||
self.inner.dump_structure()
|
||||
}
|
||||
}
|
||||
|
@@ -288,7 +288,7 @@ pub fn check_compaction_ratio(storage: Arc<MiniLsm>) {
|
||||
num_tiers,
|
||||
max_size_amplification_percent,
|
||||
size_ratio,
|
||||
..
|
||||
min_merge_width,
|
||||
}) => {
|
||||
let size_ratio_trigger = (100.0 + size_ratio as f64) / 100.0;
|
||||
assert_eq!(l0_sst_num, 0);
|
||||
@@ -296,20 +296,22 @@ pub fn check_compaction_ratio(storage: Arc<MiniLsm>) {
|
||||
let mut sum_size = level_size[0];
|
||||
for idx in 1..level_size.len() {
|
||||
let this_size = level_size[idx];
|
||||
assert!(
|
||||
sum_size as f64 / this_size as f64 <= size_ratio_trigger,
|
||||
"sum(⬆️L{})/L{}, {}/{}>{}",
|
||||
state.levels[idx - 1].0,
|
||||
state.levels[idx].0,
|
||||
sum_size,
|
||||
this_size,
|
||||
size_ratio_trigger
|
||||
);
|
||||
if level_size.len() > min_merge_width {
|
||||
assert!(
|
||||
sum_size as f64 / this_size as f64 <= size_ratio_trigger,
|
||||
"violation of size ratio: sum(⬆️L{})/L{}, {}/{}>{}",
|
||||
state.levels[idx - 1].0,
|
||||
state.levels[idx].0,
|
||||
sum_size,
|
||||
this_size,
|
||||
size_ratio_trigger
|
||||
);
|
||||
}
|
||||
if idx + 1 == level_size.len() {
|
||||
assert!(
|
||||
sum_size as f64 / this_size as f64
|
||||
<= max_size_amplification_percent as f64 / 100.0,
|
||||
"sum(⬆️L{})/L{}, {}/{}>{}%",
|
||||
"violation of space amp: sum(⬆️L{})/L{}, {}/{}>{}%",
|
||||
state.levels[idx - 1].0,
|
||||
state.levels[idx].0,
|
||||
sum_size,
|
||||
|
@@ -9,7 +9,7 @@ use crate::{
|
||||
iterators::{
|
||||
concat_iterator::SstConcatIterator, merge_iterator::MergeIterator, StorageIterator,
|
||||
},
|
||||
key::KeySlice,
|
||||
key::{KeySlice, TS_ENABLED},
|
||||
lsm_storage::{LsmStorageInner, LsmStorageOptions, LsmStorageState},
|
||||
table::{SsTable, SsTableBuilder, SsTableIterator},
|
||||
};
|
||||
@@ -37,6 +37,7 @@ fn construct_merge_iterator_over_storage(
|
||||
|
||||
#[test]
|
||||
fn test_task1_full_compaction() {
|
||||
// We do not use LSM iterator in this test because it's implemented as part of task 3
|
||||
let dir = tempdir().unwrap();
|
||||
let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap();
|
||||
storage.put(b"0", b"v1").unwrap();
|
||||
@@ -50,45 +51,107 @@ fn test_task1_full_compaction() {
|
||||
sync(&storage);
|
||||
assert_eq!(storage.state.read().l0_sstables.len(), 3);
|
||||
let mut iter = construct_merge_iterator_over_storage(&storage.state.read());
|
||||
check_iter_result_by_key(
|
||||
&mut iter,
|
||||
vec![
|
||||
(Bytes::from_static(b"0"), Bytes::from_static(b"")),
|
||||
(Bytes::from_static(b"1"), Bytes::from_static(b"v2")),
|
||||
(Bytes::from_static(b"2"), Bytes::from_static(b"")),
|
||||
],
|
||||
);
|
||||
if TS_ENABLED {
|
||||
check_iter_result_by_key(
|
||||
&mut iter,
|
||||
vec![
|
||||
(Bytes::from_static(b"0"), Bytes::from_static(b"")),
|
||||
(Bytes::from_static(b"0"), Bytes::from_static(b"v2")),
|
||||
(Bytes::from_static(b"0"), Bytes::from_static(b"v1")),
|
||||
(Bytes::from_static(b"1"), Bytes::from_static(b"v2")),
|
||||
(Bytes::from_static(b"2"), Bytes::from_static(b"")),
|
||||
(Bytes::from_static(b"2"), Bytes::from_static(b"v2")),
|
||||
],
|
||||
);
|
||||
} else {
|
||||
check_iter_result_by_key(
|
||||
&mut iter,
|
||||
vec![
|
||||
(Bytes::from_static(b"0"), Bytes::from_static(b"")),
|
||||
(Bytes::from_static(b"1"), Bytes::from_static(b"v2")),
|
||||
(Bytes::from_static(b"2"), Bytes::from_static(b"")),
|
||||
],
|
||||
);
|
||||
}
|
||||
storage.force_full_compaction().unwrap();
|
||||
assert!(storage.state.read().l0_sstables.is_empty());
|
||||
let mut iter = construct_merge_iterator_over_storage(&storage.state.read());
|
||||
check_iter_result_by_key(
|
||||
&mut iter,
|
||||
vec![(Bytes::from_static(b"1"), Bytes::from_static(b"v2"))],
|
||||
);
|
||||
if TS_ENABLED {
|
||||
check_iter_result_by_key(
|
||||
&mut iter,
|
||||
vec![
|
||||
(Bytes::from_static(b"0"), Bytes::from_static(b"")),
|
||||
(Bytes::from_static(b"0"), Bytes::from_static(b"v2")),
|
||||
(Bytes::from_static(b"0"), Bytes::from_static(b"v1")),
|
||||
(Bytes::from_static(b"1"), Bytes::from_static(b"v2")),
|
||||
(Bytes::from_static(b"2"), Bytes::from_static(b"")),
|
||||
(Bytes::from_static(b"2"), Bytes::from_static(b"v2")),
|
||||
],
|
||||
);
|
||||
} else {
|
||||
check_iter_result_by_key(
|
||||
&mut iter,
|
||||
vec![(Bytes::from_static(b"1"), Bytes::from_static(b"v2"))],
|
||||
);
|
||||
}
|
||||
storage.put(b"0", b"v3").unwrap();
|
||||
storage.put(b"2", b"v3").unwrap();
|
||||
sync(&storage);
|
||||
storage.delete(b"1").unwrap();
|
||||
sync(&storage);
|
||||
let mut iter = construct_merge_iterator_over_storage(&storage.state.read());
|
||||
check_iter_result_by_key(
|
||||
&mut iter,
|
||||
vec![
|
||||
(Bytes::from_static(b"0"), Bytes::from_static(b"v3")),
|
||||
(Bytes::from_static(b"1"), Bytes::from_static(b"")),
|
||||
(Bytes::from_static(b"2"), Bytes::from_static(b"v3")),
|
||||
],
|
||||
);
|
||||
if TS_ENABLED {
|
||||
check_iter_result_by_key(
|
||||
&mut iter,
|
||||
vec![
|
||||
(Bytes::from_static(b"0"), Bytes::from_static(b"v3")),
|
||||
(Bytes::from_static(b"0"), Bytes::from_static(b"")),
|
||||
(Bytes::from_static(b"0"), Bytes::from_static(b"v2")),
|
||||
(Bytes::from_static(b"0"), Bytes::from_static(b"v1")),
|
||||
(Bytes::from_static(b"1"), Bytes::from_static(b"")),
|
||||
(Bytes::from_static(b"1"), Bytes::from_static(b"v2")),
|
||||
(Bytes::from_static(b"2"), Bytes::from_static(b"v3")),
|
||||
(Bytes::from_static(b"2"), Bytes::from_static(b"")),
|
||||
(Bytes::from_static(b"2"), Bytes::from_static(b"v2")),
|
||||
],
|
||||
);
|
||||
} else {
|
||||
check_iter_result_by_key(
|
||||
&mut iter,
|
||||
vec![
|
||||
(Bytes::from_static(b"0"), Bytes::from_static(b"v3")),
|
||||
(Bytes::from_static(b"1"), Bytes::from_static(b"")),
|
||||
(Bytes::from_static(b"2"), Bytes::from_static(b"v3")),
|
||||
],
|
||||
);
|
||||
}
|
||||
storage.force_full_compaction().unwrap();
|
||||
assert!(storage.state.read().l0_sstables.is_empty());
|
||||
let mut iter = construct_merge_iterator_over_storage(&storage.state.read());
|
||||
check_iter_result_by_key(
|
||||
&mut iter,
|
||||
vec![
|
||||
(Bytes::from_static(b"0"), Bytes::from_static(b"v3")),
|
||||
(Bytes::from_static(b"2"), Bytes::from_static(b"v3")),
|
||||
],
|
||||
);
|
||||
if TS_ENABLED {
|
||||
check_iter_result_by_key(
|
||||
&mut iter,
|
||||
vec![
|
||||
(Bytes::from_static(b"0"), Bytes::from_static(b"v3")),
|
||||
(Bytes::from_static(b"0"), Bytes::from_static(b"")),
|
||||
(Bytes::from_static(b"0"), Bytes::from_static(b"v2")),
|
||||
(Bytes::from_static(b"0"), Bytes::from_static(b"v1")),
|
||||
(Bytes::from_static(b"1"), Bytes::from_static(b"")),
|
||||
(Bytes::from_static(b"1"), Bytes::from_static(b"v2")),
|
||||
(Bytes::from_static(b"2"), Bytes::from_static(b"v3")),
|
||||
(Bytes::from_static(b"2"), Bytes::from_static(b"")),
|
||||
(Bytes::from_static(b"2"), Bytes::from_static(b"v2")),
|
||||
],
|
||||
);
|
||||
} else {
|
||||
check_iter_result_by_key(
|
||||
&mut iter,
|
||||
vec![
|
||||
(Bytes::from_static(b"0"), Bytes::from_static(b"v3")),
|
||||
(Bytes::from_static(b"2"), Bytes::from_static(b"v3")),
|
||||
],
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn generate_concat_sst(
|
||||
|
Reference in New Issue
Block a user