add compaction tests and fix bugs in compaction

Signed-off-by: Alex Chi <iskyzh@gmail.com>
Alex Chi
2024-01-25 15:25:23 +08:00
parent 971d0b1c81
commit 8dbaf54e38
23 changed files with 379 additions and 42 deletions

.config/nextest.toml Normal file
View File

@@ -0,0 +1,2 @@
[profile.default]
slow-timeout = { period = "10s", terminate-after = 3 }
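With this profile, cargo-nextest flags a test as slow after a 10-second period and terminates it after 3 such periods (30 seconds in total), so a compaction loop that never converges fails the new week-2 tests instead of hanging the suite.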

View File

@@ -8,6 +8,13 @@ In this chapter, you will:
* Implement the logic to update the LSM states and manage SST files on the filesystem.
* Update LSM read path to incorporate the LSM levels.
To copy the test cases into the starter code and run them,
```
cargo x copy-test --week 2 --day 1
cargo x scheck
```
## Task 1: Compaction Implementation
In this task, you will implement the core logic of compaction -- merge-sorting a set of SST files into a sorted run. You will need to modify:
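Concretely, the merge-sort step can be sketched as follows. This is a minimal sketch, not the reference solution: it assumes the week-1 iterator and builder APIs (`SsTableIterator`, `MergeIterator`, `SsTableBuilder`) and the `LsmStorageInner` helpers (`next_sst_id`, `path_of_sst`, `block_cache`, `options`) that also show up in this commit's diffs.
```rust
use std::sync::Arc;

use anyhow::Result;

use crate::{
    iterators::{merge_iterator::MergeIterator, StorageIterator},
    lsm_storage::LsmStorageInner,
    table::{SsTable, SsTableBuilder, SsTableIterator},
};

/// Merge-sort `tables` into a single sorted run of target-sized SSTs.
fn compact_to_sorted_run(
    inner: &LsmStorageInner,
    tables: Vec<Arc<SsTable>>,
) -> Result<Vec<Arc<SsTable>>> {
    // Seek every input SST to its first key; MergeIterator yields keys in
    // order and, for duplicate keys, only the newest version.
    let mut iters = Vec::with_capacity(tables.len());
    for table in tables {
        iters.push(Box::new(SsTableIterator::create_and_seek_to_first(table)?));
    }
    let mut iter = MergeIterator::create(iters);

    let mut builder: Option<SsTableBuilder> = None;
    let mut output = Vec::new();
    while iter.is_valid() {
        let current =
            builder.get_or_insert_with(|| SsTableBuilder::new(inner.options.block_size));
        // (A full implementation would also drop deletion tombstones, i.e.
        // empty values, when compacting to the bottom level.)
        current.add(iter.key(), iter.value());
        // Roll over to a new SST once the current one reaches the target size.
        if current.estimated_size() >= inner.options.target_sst_size {
            let sst_id = inner.next_sst_id();
            let sst = builder.take().unwrap().build(
                sst_id,
                Some(inner.block_cache.clone()),
                inner.path_of_sst(sst_id),
            )?;
            output.push(Arc::new(sst));
        }
        iter.next()?;
    }
    // Flush the last partially filled builder, if any.
    if let Some(last) = builder.take() {
        let sst_id = inner.next_sst_id();
        output.push(Arc::new(last.build(
            sst_id,
            Some(inner.block_cache.clone()),
            inner.path_of_sst(sst_id),
        )?));
    }
    Ok(output)
}
```
Splitting on `estimated_size` after each `add` keeps every output SST close to `target_sst_size` without ever cutting a key in half.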

View File

@@ -7,6 +7,13 @@ In this chapter, you will:
* Implement a simple leveled compaction strategy and simulate it on the compaction simulator.
* Start compaction as a background task and implement a compaction trigger in the system.
To copy the test cases into the starter code and run them,
```
cargo x copy-test --week 2 --day 2
cargo x scheck
```
## Task 1: Simple Leveled Compaction
In this chapter, we are going to implement our first compaction strategy -- simple leveled compaction. In this task, you will need to modify:
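As a sketch of the trigger semantics (an illustration under assumed names, not the reference solution), simple leveled compaction is driven purely by file counts:
```rust
/// Decide which pair of adjacent levels to compact next, if any.
/// `level_files[i]` is the number of SSTs in level i + 1; L0 is passed
/// separately. Returns (upper, lower), with 0 standing for L0.
fn pick_simple_leveled_task(
    l0_files: usize,
    level_files: &[usize],
    level0_file_num_compaction_trigger: usize,
    size_ratio_percent: usize,
) -> Option<(usize, usize)> {
    // Trigger an L0 -> L1 compaction once L0 accumulates enough files.
    if l0_files >= level0_file_num_compaction_trigger {
        return Some((0, 1));
    }
    // For adjacent levels, a lower/upper ratio (in percent) below the
    // threshold means the upper level is too large relative to the lower.
    for upper_level in 1..level_files.len() {
        let upper = level_files[upper_level - 1];
        let lower = level_files[upper_level];
        if upper > 0 && lower * 100 < size_ratio_percent * upper {
            return Some((upper_level, upper_level + 1));
        }
    }
    None
}
```
With the new test's `size_ratio_percent: 200`, each level should end up holding at least twice as many files as the level above it, which is exactly what `check_compaction_ratio` asserts below.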

View File

@@ -9,6 +9,13 @@ In this chapter, you will:
The tiered compaction we talk about in this chapter is the same as RocksDB's universal compaction. We will use these two terms interchangeably.
To copy the test cases into the starter code and run them,
```
cargo x copy-test --week 2 --day 3
cargo x scheck
```
## Task 1: Universal Compaction
In this chapter, you will implement RocksDB's universal compaction, which belongs to the tiered family of compaction strategies. As in the simple leveled compaction strategy, we use only the number of files as the trigger in this compaction strategy. And whenever we trigger a compaction job, we always include a full sorted run (tier) in it.
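The triggers can be sketched as follows (assumed shapes, mirroring the invariants that `check_compaction_ratio` verifies later in this commit; the real strategy also honors `min_merge_width`, omitted here):
```rust
/// Whether any of the universal-compaction triggers fires.
/// `tiers[i]` is the size of the i-th tier, newest first.
fn tiered_compaction_triggered(
    tiers: &[u64],
    num_tiers: usize,
    max_size_amplification_percent: u64,
    size_ratio: u64,
) -> bool {
    if tiers.is_empty() {
        return false;
    }
    // Trigger 1: space amplification -- everything above the last tier,
    // divided by the last tier, reaches the configured percentage.
    let last = *tiers.last().unwrap();
    let rest: u64 = tiers[..tiers.len() - 1].iter().sum();
    if rest * 100 >= max_size_amplification_percent * last {
        return true;
    }
    // Trigger 2: size ratio -- some tier is dwarfed by the sum of all
    // newer tiers: sum(newer) / this > (100 + size_ratio) / 100.
    let mut sum_newer: u64 = 0;
    for (idx, &tier) in tiers.iter().enumerate().skip(1) {
        sum_newer += tiers[idx - 1];
        if sum_newer * 100 > (100 + size_ratio) * tier {
            return true;
        }
    }
    // Trigger 3: too many sorted runs overall.
    tiers.len() > num_tiers
}
```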

View File

@@ -7,6 +7,13 @@ In this chapter, you will:
* Implement a leveled compaction strategy and simulate it on the compaction simulator.
* Incorporate leveled compaction strategy into the system.
To copy the test cases into the starter code and run them,
```
cargo x copy-test --week 2 --day 4
cargo x scheck
```
## Task 1: Leveled Compaction
In chapter 2 day 2, you implemented the simple leveled compaction strategy. However, that implementation has a few problems:
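One of the fixes this chapter introduces, dynamic level targets, can be sketched as below (an illustrative sketch built on the `LeveledCompactionOptions` fields used in the new tests; not the reference solution):
```rust
/// Compute per-level target sizes, RocksDB-style: the bottom level's target
/// is its actual size (at least the configured base), each level above is
/// smaller by `level_size_multiplier`, and levels whose parent target has
/// not yet exceeded the base size get no target at all.
fn target_level_sizes(
    bottom_level_actual_size: u64,
    base_level_size_bytes: u64,
    level_size_multiplier: u64,
    max_levels: usize,
) -> Vec<u64> {
    assert!(max_levels >= 1);
    let mut targets = vec![0u64; max_levels];
    let mut size = bottom_level_actual_size.max(base_level_size_bytes);
    targets[max_levels - 1] = size;
    for level in (0..max_levels - 1).rev() {
        let next = size / level_size_multiplier;
        if size > base_level_size_bytes {
            targets[level] = next;
        }
        size = next;
    }
    targets
}
```
The `target_level_size` and `real_level_size` vectors printed by the `LeveledCompactionController` hunk further down are exactly these quantities.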

View File

@@ -98,6 +98,7 @@ impl CompactionController {
}
}
#[derive(Debug, Clone)]
pub enum CompactionOptions {
/// Leveled compaction with partial compaction + dynamic level support (= RocksDB's Leveled
/// Compaction)
@@ -309,25 +310,25 @@ impl LsmStorageInner {
};
println!("running compaction task: {:?}", task);
let sstables = self.compact(&task)?;
let files_added = sstables.len();
let output = sstables.iter().map(|x| x.sst_id()).collect::<Vec<_>>();
let ssts_to_remove = {
let state_lock = self.state_lock.lock();
let (mut snapshot, files_to_remove) = self
.compaction_controller
.apply_compaction_result(&self.state.read(), &task, &output);
let mut ssts_to_remove = Vec::with_capacity(files_to_remove.len());
for file_to_remove in &files_to_remove {
let result = snapshot.sstables.remove(file_to_remove);
assert!(result.is_some(), "cannot remove {}.sst", file_to_remove);
ssts_to_remove.push(result.unwrap());
}
let mut snapshot = self.state.read().as_ref().clone();
let mut new_sst_ids = Vec::new();
for file_to_add in sstables {
new_sst_ids.push(file_to_add.sst_id());
let result = snapshot.sstables.insert(file_to_add.sst_id(), file_to_add);
assert!(result.is_none());
}
let (mut snapshot, files_to_remove) = self
.compaction_controller
.apply_compaction_result(&snapshot, &task, &output);
let mut ssts_to_remove = Vec::with_capacity(files_to_remove.len());
for file_to_remove in &files_to_remove {
let result = snapshot.sstables.remove(file_to_remove);
assert!(result.is_some(), "cannot remove {}.sst", file_to_remove);
ssts_to_remove.push(result.unwrap());
}
let mut state = self.state.write();
*state = Arc::new(snapshot);
drop(state);
@@ -339,9 +340,10 @@ impl LsmStorageInner {
ssts_to_remove
};
println!(
"compaction finished: {} files removed, {} files added",
"compaction finished: {} files removed, {} files added, output={:?}",
ssts_to_remove.len(),
files_added
output.len(),
output
);
for sst in ssts_to_remove {
std::fs::remove_file(self.path_of_sst(sst.sst_id()))?;
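The rework in this hunk changes the order of state updates under the state lock: the current state is cloned first, the freshly built SSTs are inserted into `snapshot.sstables` (with their ids recorded in `new_sst_ids`), and only then is `apply_compaction_result` applied to that snapshot, so a controller that needs to look up the output tables (for example, leveled compaction ordering a level by first key) can find them. The summary log line now also prints the output SST ids rather than just a count.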

View File

@@ -124,11 +124,11 @@ impl LeveledCompactionController {
"target level sizes: {:?}, real level sizes: {:?}, base_level: {}",
target_level_size
.iter()
.map(|x| format!("{}MB", x / 1024 / 1024))
.map(|x| format!("{:.3}MB", *x as f64 / 1024.0 / 1024.0))
.collect::<Vec<_>>(),
real_level_size
.iter()
.map(|x| format!("{}MB", x / 1024 / 1024))
.map(|x| format!("{:.3}MB", *x as f64 / 1024.0 / 1024.0))
.collect::<Vec<_>>(),
base_level,
);
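The logging change here replaces integer division, which truncated any level smaller than a megabyte to `0MB`, with floating-point formatting at three decimal places.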

View File

@@ -1,3 +1,5 @@
use std::collections::HashSet;
use serde::{Deserialize, Serialize};
use crate::lsm_storage::LsmStorageState;
@@ -95,12 +97,20 @@ impl SimpleLeveledCompactionController {
files_to_remove.extend(&snapshot.levels[upper_level - 1].1);
snapshot.levels[upper_level - 1].1.clear();
} else {
assert_eq!(
task.upper_level_sst_ids, snapshot.l0_sstables,
"sst mismatched"
);
files_to_remove.extend(&snapshot.l0_sstables);
snapshot.l0_sstables.clear();
files_to_remove.extend(&task.upper_level_sst_ids);
let mut l0_ssts_compacted = task
.upper_level_sst_ids
.iter()
.copied()
.collect::<HashSet<_>>();
let new_l0_sstables = snapshot
.l0_sstables
.iter()
.copied()
.filter(|x| !l0_ssts_compacted.remove(x))
.collect::<Vec<_>>();
assert!(l0_ssts_compacted.is_empty());
snapshot.l0_sstables = new_l0_sstables;
}
assert_eq!(
task.lower_level_sst_ids,
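This hunk fixes a race: the `else` branch used to assert that the task's upper-level SSTs matched `snapshot.l0_sstables` exactly and then cleared L0 wholesale. Because memtable flushes can add new L0 SSTs while the compaction is running, the code now removes only the SSTs that were actually part of the task, preserves any L0 SSTs flushed in the meantime, and asserts that every compacted SST was found in the list.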

View File

@@ -94,6 +94,16 @@ impl LsmStorageOptions {
num_memtable_limit: 2,
}
}
pub fn default_for_week2_test(compaction_options: CompactionOptions) -> Self {
Self {
block_size: 4096,
target_sst_size: 1 << 20, // 1MB
compaction_options,
enable_wal: false,
num_memtable_limit: 2,
}
}
}
fn range_overlap(

View File

@@ -7,3 +7,6 @@ mod week1_day5;
mod week1_day6;
mod week1_day7;
mod week2_day1;
mod week2_day2;
mod week2_day3;
mod week2_day4;

View File

@@ -0,0 +1 @@
../../../mini-lsm/src/tests/week2_day2.rs

View File

@@ -0,0 +1 @@
../../../mini-lsm/src/tests/week2_day3.rs

View File

@@ -0,0 +1 @@
../../../mini-lsm/src/tests/week2_day4.rs
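These one-line entries are symlinks: the week-2 test modules live once under mini-lsm/src/tests, and the sibling crates link to them so the test code has a single source of truth.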

View File

@@ -90,6 +90,16 @@ impl LsmStorageOptions {
num_memtable_limit: 2,
}
}
pub fn default_for_week2_test(compaction_options: CompactionOptions) -> Self {
Self {
block_size: 4096,
target_sst_size: 1 << 20, // 1MB
compaction_options,
enable_wal: false,
num_memtable_limit: 2,
}
}
}
/// The storage interface of the LSM tree.

View File

@@ -98,6 +98,7 @@ impl CompactionController {
}
}
#[derive(Debug, Clone)]
pub enum CompactionOptions {
/// Leveled compaction with partial compaction + dynamic level support (= RocksDB's Leveled
/// Compaction)
@@ -309,25 +310,25 @@ impl LsmStorageInner {
};
println!("running compaction task: {:?}", task);
let sstables = self.compact(&task)?;
let files_added = sstables.len();
let output = sstables.iter().map(|x| x.sst_id()).collect::<Vec<_>>();
let ssts_to_remove = {
let state_lock = self.state_lock.lock();
let (mut snapshot, files_to_remove) = self
.compaction_controller
.apply_compaction_result(&self.state.read(), &task, &output);
let mut ssts_to_remove = Vec::with_capacity(files_to_remove.len());
for file_to_remove in &files_to_remove {
let result = snapshot.sstables.remove(file_to_remove);
assert!(result.is_some(), "cannot remove {}.sst", file_to_remove);
ssts_to_remove.push(result.unwrap());
}
let mut snapshot = self.state.read().as_ref().clone();
let mut new_sst_ids = Vec::new();
for file_to_add in sstables {
new_sst_ids.push(file_to_add.sst_id());
let result = snapshot.sstables.insert(file_to_add.sst_id(), file_to_add);
assert!(result.is_none());
}
let (mut snapshot, files_to_remove) = self
.compaction_controller
.apply_compaction_result(&snapshot, &task, &output);
let mut ssts_to_remove = Vec::with_capacity(files_to_remove.len());
for file_to_remove in &files_to_remove {
let result = snapshot.sstables.remove(file_to_remove);
assert!(result.is_some(), "cannot remove {}.sst", file_to_remove);
ssts_to_remove.push(result.unwrap());
}
let mut state = self.state.write();
*state = Arc::new(snapshot);
drop(state);
@@ -339,9 +340,10 @@ impl LsmStorageInner {
ssts_to_remove
};
println!(
"compaction finished: {} files removed, {} files added",
"compaction finished: {} files removed, {} files added, output={:?}",
ssts_to_remove.len(),
files_added
output.len(),
output
);
for sst in ssts_to_remove {
std::fs::remove_file(self.path_of_sst(sst.sst_id()))?;

View File

@@ -124,11 +124,11 @@ impl LeveledCompactionController {
"target level sizes: {:?}, real level sizes: {:?}, base_level: {}",
target_level_size
.iter()
.map(|x| format!("{}MB", x / 1024 / 1024))
.map(|x| format!("{:.3}MB", *x as f64 / 1024.0 / 1024.0))
.collect::<Vec<_>>(),
real_level_size
.iter()
.map(|x| format!("{}MB", x / 1024 / 1024))
.map(|x| format!("{:.3}MB", *x as f64 / 1024.0 / 1024.0))
.collect::<Vec<_>>(),
base_level,
);

View File

@@ -1,3 +1,5 @@
use std::collections::HashSet;
use serde::{Deserialize, Serialize};
use crate::lsm_storage::LsmStorageState;
@@ -95,12 +97,20 @@ impl SimpleLeveledCompactionController {
files_to_remove.extend(&snapshot.levels[upper_level - 1].1);
snapshot.levels[upper_level - 1].1.clear();
} else {
assert_eq!(
task.upper_level_sst_ids, snapshot.l0_sstables,
"sst mismatched"
);
files_to_remove.extend(&snapshot.l0_sstables);
snapshot.l0_sstables.clear();
files_to_remove.extend(&task.upper_level_sst_ids);
let mut l0_ssts_compacted = task
.upper_level_sst_ids
.iter()
.copied()
.collect::<HashSet<_>>();
let new_l0_sstables = snapshot
.l0_sstables
.iter()
.copied()
.filter(|x| !l0_ssts_compacted.remove(x))
.collect::<Vec<_>>();
assert!(l0_ssts_compacted.is_empty());
snapshot.l0_sstables = new_l0_sstables;
}
assert_eq!(
task.lower_level_sst_ids,

View File

@@ -94,6 +94,16 @@ impl LsmStorageOptions {
num_memtable_limit: 2,
}
}
pub fn default_for_week2_test(compaction_options: CompactionOptions) -> Self {
Self {
block_size: 4096,
target_sst_size: 1 << 20, // 1MB
compaction_options,
enable_wal: false,
num_memtable_limit: 2,
}
}
}
fn range_overlap(

View File

@@ -7,3 +7,6 @@ mod week1_day5;
mod week1_day6;
mod week1_day7;
mod week2_day1;
mod week2_day2;
mod week2_day3;
mod week2_day4;

View File

@@ -1,12 +1,16 @@
use std::{path::Path, sync::Arc};
use std::{collections::BTreeMap, path::Path, sync::Arc, time::Duration};
use anyhow::{bail, Result};
use bytes::Bytes;
use crate::{
compact::{
CompactionOptions, LeveledCompactionOptions, SimpleLeveledCompactionOptions,
TieredCompactionOptions,
},
iterators::StorageIterator,
key::KeySlice,
lsm_storage::{BlockCache, LsmStorageInner},
lsm_storage::{BlockCache, LsmStorageInner, MiniLsm},
table::{SsTable, SsTableBuilder},
};
@@ -161,3 +165,160 @@ pub fn sync(storage: &LsmStorageInner) {
.unwrap();
storage.force_flush_next_imm_memtable().unwrap();
}
pub fn compaction_bench(storage: Arc<MiniLsm>) {
let mut key_map = BTreeMap::<usize, usize>::new();
let gen_key = |i| format!("{:010}", i); // 10B
let gen_value = |i| format!("{:0110}", i); // 110B
let mut max_key = 0;
for iter in 0..10 {
let range_begin = iter * 5000;
for i in range_begin..(range_begin + 40000) {
// 120B per key, 4MB data populated
let key = gen_key(i);
let version = key_map.get(&i).copied().unwrap_or_default() + 1;
let value = gen_value(version);
key_map.insert(i, version);
storage.put(key.as_bytes(), value.as_bytes()).unwrap();
max_key = max_key.max(i);
}
}
for i in 0..(max_key + 40000) {
let key = gen_key(i);
let value = storage.get(key.as_bytes()).unwrap();
if let Some(val) = key_map.get(&i) {
let expected_value = gen_value(*val);
assert_eq!(value, Some(Bytes::from(expected_value)));
} else {
assert!(value.is_none());
}
}
while {
let snapshot = storage.inner.state.read();
!snapshot.imm_memtables.is_empty()
} {
storage.inner.force_flush_next_imm_memtable().unwrap();
}
let mut prev_snapshot = storage.inner.state.read().clone();
while {
std::thread::sleep(Duration::from_secs(1));
let snapshot = storage.inner.state.read().clone();
let to_cont = prev_snapshot.levels != snapshot.levels
|| prev_snapshot.l0_sstables != snapshot.l0_sstables;
prev_snapshot = snapshot;
to_cont
} {
println!("waiting for compaction to converge");
}
storage.dump_structure();
println!("This test case does not guarantee your compaction algorithm produces a LSM state as expected. It only does minimal checks on the size of the levels. Please use the compaction simulator to check if the compaction is correctly going on.");
}
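Note the `while { side_effects; condition } {}` shape above: Rust has no do-while loop, so the flush and convergence checks run their side effects inside the condition block, and the loop body only prints a progress message. The convergence loop re-reads the LSM state every second and stops once two consecutive snapshots have identical `l0_sstables` and `levels`.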
pub fn check_compaction_ratio(storage: Arc<MiniLsm>) {
let state = storage.inner.state.read().clone();
let compaction_options = storage.inner.options.compaction_options.clone();
let mut level_size = Vec::new();
let l0_sst_num = state.l0_sstables.len();
for (_, files) in &state.levels {
let size = match &compaction_options {
CompactionOptions::Leveled(_) => files
.iter()
.map(|x| state.sstables.get(x).as_ref().unwrap().table_size())
.sum::<u64>(),
CompactionOptions::Simple(_) | CompactionOptions::Tiered(_) => files.len() as u64,
_ => unreachable!(),
};
level_size.push(size);
}
match compaction_options {
CompactionOptions::NoCompaction => unreachable!(),
CompactionOptions::Simple(SimpleLeveledCompactionOptions {
size_ratio_percent,
level0_file_num_compaction_trigger,
max_levels,
}) => {
assert!(l0_sst_num < level0_file_num_compaction_trigger);
assert!(level_size.len() <= max_levels);
for idx in 1..level_size.len() {
let prev_size = level_size[idx - 1];
let this_size = level_size[idx];
if prev_size == 0 && this_size == 0 {
continue;
}
assert!(
this_size as f64 / prev_size as f64 >= size_ratio_percent as f64 / 100.0,
"L{}/L{}, {}/{}<{}%",
state.levels[idx - 1].0,
state.levels[idx].0,
this_size,
prev_size,
size_ratio_percent
);
}
}
CompactionOptions::Leveled(LeveledCompactionOptions {
level_size_multiplier,
level0_file_num_compaction_trigger,
max_levels,
..
}) => {
assert!(l0_sst_num < level0_file_num_compaction_trigger);
assert!(level_size.len() <= max_levels);
for idx in 1..level_size.len() {
let prev_size = level_size[idx - 1];
let this_size = level_size[idx];
assert!(
// do not add hard requirement on level size multiplier considering bloom filters...
this_size as f64 / prev_size as f64 >= (level_size_multiplier as f64 - 0.5),
"L{}/L{}, {}/{}<<{}",
state.levels[idx].0,
state.levels[idx - 1].0,
this_size,
prev_size,
level_size_multiplier
);
}
}
CompactionOptions::Tiered(TieredCompactionOptions {
num_tiers,
max_size_amplification_percent,
size_ratio,
..
}) => {
let size_ratio_trigger = (100.0 + size_ratio as f64) / 100.0;
assert_eq!(l0_sst_num, 0);
assert!(level_size.len() <= num_tiers);
let mut sum_size = level_size[0];
for idx in 1..level_size.len() {
let this_size = level_size[idx];
assert!(
sum_size as f64 / this_size as f64 <= size_ratio_trigger,
"sum(⬆L{})/L{}, {}/{}>{}",
state.levels[idx - 1].0,
state.levels[idx].0,
sum_size,
this_size,
size_ratio_trigger
);
if idx + 1 == level_size.len() {
assert!(
sum_size as f64 / this_size as f64
<= max_size_amplification_percent as f64 / 100.0,
"sum(⬆L{})/L{}, {}/{}>{}%",
state.levels[idx - 1].0,
state.levels[idx].0,
sum_size,
this_size,
max_size_amplification_percent
);
}
sum_size += this_size;
}
}
}
}

View File

@@ -0,0 +1,27 @@
use tempfile::tempdir;
use crate::{
compact::{CompactionOptions, SimpleLeveledCompactionOptions},
lsm_storage::{LsmStorageOptions, MiniLsm},
};
use super::harness::{check_compaction_ratio, compaction_bench};
#[test]
fn test_integration() {
let dir = tempdir().unwrap();
let storage = MiniLsm::open(
&dir,
LsmStorageOptions::default_for_week2_test(CompactionOptions::Simple(
SimpleLeveledCompactionOptions {
level0_file_num_compaction_trigger: 2,
max_levels: 3,
size_ratio_percent: 200,
},
)),
)
.unwrap();
compaction_bench(storage.clone());
check_compaction_ratio(storage.clone());
}

View File

@@ -0,0 +1,28 @@
use tempfile::tempdir;
use crate::{
compact::{CompactionOptions, TieredCompactionOptions},
lsm_storage::{LsmStorageOptions, MiniLsm},
};
use super::harness::{check_compaction_ratio, compaction_bench};
#[test]
fn test_integration() {
let dir = tempdir().unwrap();
let storage = MiniLsm::open(
&dir,
LsmStorageOptions::default_for_week2_test(CompactionOptions::Tiered(
TieredCompactionOptions {
num_tiers: 3,
max_size_amplification_percent: 200,
size_ratio: 1,
min_merge_width: 2,
},
)),
)
.unwrap();
compaction_bench(storage.clone());
check_compaction_ratio(storage.clone());
}

View File

@@ -0,0 +1,28 @@
use tempfile::tempdir;
use crate::{
compact::{CompactionOptions, LeveledCompactionOptions},
lsm_storage::{LsmStorageOptions, MiniLsm},
};
use super::harness::{check_compaction_ratio, compaction_bench};
#[test]
fn test_integration() {
let dir = tempdir().unwrap();
let storage = MiniLsm::open(
&dir,
LsmStorageOptions::default_for_week2_test(CompactionOptions::Leveled(
LeveledCompactionOptions {
level0_file_num_compaction_trigger: 2,
level_size_multiplier: 2,
base_level_size_mb: 1,
max_levels: 4,
},
)),
)
.unwrap();
compaction_bench(storage.clone());
check_compaction_ratio(storage.clone());
}