add simple leveled compaction

Signed-off-by: Alex Chi Z <iskyzh@gmail.com>
This commit is contained in:
Alex Chi Z
2024-01-17 15:42:33 +08:00
parent f93a8a1bd8
commit 70b1da4553
5 changed files with 238 additions and 18 deletions

View File

@@ -37,7 +37,7 @@ We are working on a new version of the mini-lsm tutorial that is split into 3 we
| 1.4 | Merge Iterators | ✅ | ✅ | ✅ |
| 1.5 | Storage Engine - Read Path | ✅ | ✅ | ✅ |
| 1.6 | Storage Engine - Write Path | ✅ | ✅ | ✅ |
| 2.1 | Compaction Framework | ✅ | 🚧 | 🚧 |
| 2.1 | Simple Compaction Strategy | ✅ | 🚧 | 🚧 |
| 2.2 | Compaction Strategy - Tiered | ✅ | | |
| 2.3 | Compaction Strategy - Leveled | 🚧 | | |
| 2.4 | Manifest | | | |

View File

@@ -2,13 +2,28 @@ use std::collections::HashMap;
use std::sync::Arc;
use clap::Parser;
use mini_lsm::compact::{TieredCompactionController, TieredCompactionOptions};
use mini_lsm::compact::{
SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, TieredCompactionController,
TieredCompactionOptions,
};
use mini_lsm::lsm_storage::LsmStorageInner;
use mini_lsm::mem_table::MemTable;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
enum Args {
Simple {
#[clap(long)]
dump_real_id: bool,
#[clap(long, default_value = "2")]
level0_file_num_compaction_trigger: usize,
#[clap(long, default_value = "3")]
max_levels: usize,
#[clap(long, default_value = "200")]
size_ratio_percent: usize,
#[clap(long, default_value = "50")]
iterations: usize,
},
Tiered {
#[clap(long)]
dump_real_id: bool,
@@ -82,9 +97,13 @@ impl MockStorage {
}
}
pub fn dump_original_id(&self) {
if !self.snapshot.l0_sstables.is_empty() {
println!("L0: {:?}", self.snapshot.l0_sstables);
pub fn dump_original_id(&self, always_show_l0: bool) {
if !self.snapshot.l0_sstables.is_empty() || always_show_l0 {
println!(
"L0 ({}): {:?}",
self.snapshot.l0_sstables.len(),
self.snapshot.l0_sstables,
);
}
for (level, files) in &self.snapshot.levels {
println!(
@@ -95,16 +114,16 @@ impl MockStorage {
}
}
pub fn dump_real_id(&self) {
if !self.snapshot.l0_sstables.is_empty() {
println!("L0: {:?}", self.snapshot.l0_sstables);
pub fn dump_real_id(&self, always_show_l0: bool) {
if !self.snapshot.l0_sstables.is_empty() || always_show_l0 {
println!(
"L0 ({}): {:?}",
self.snapshot.l0_sstables.len(),
self.snapshot.l0_sstables,
);
}
for (level, files) in &self.snapshot.levels {
println!(
"L{level} ({}): {:?}",
files.len(),
files.iter().map(|x| self.file_list[x]).collect::<Vec<_>>()
);
println!("L{level} ({}): {:?}", files.len(), files);
}
}
}
@@ -112,6 +131,102 @@ impl MockStorage {
fn main() {
let args = Args::parse();
match args {
Args::Simple {
dump_real_id,
size_ratio_percent,
iterations,
level0_file_num_compaction_trigger,
max_levels,
} => {
let mut controller =
SimpleLeveledCompactionController::new(SimpleLeveledCompactionOptions {
size_ratio_percent,
level0_file_num_compaction_trigger,
max_levels,
});
let mut storage = MockStorage::new();
for i in 0..max_levels {
storage.snapshot.levels.push((i + 1, Vec::new()));
}
let mut max_space = 0;
for i in 0..iterations {
println!("=== Iteration {i} ===");
storage.flush_sst_to_l0();
println!("--- After Flush ---");
if dump_real_id {
storage.dump_real_id(true);
} else {
storage.dump_original_id(true);
}
let mut num_compactions = 0;
while let Some(task) = controller.generate_compaction_task(&storage.snapshot) {
num_compactions += 1;
println!("--- Compaction Task ---");
let mut sst_ids = Vec::new();
for file in task
.upper_level_sst_ids
.iter()
.chain(task.lower_level_sst_ids.iter())
{
let new_sst_id = storage.generate_sst_id();
sst_ids.push(new_sst_id);
storage.file_list.insert(new_sst_id, *file);
storage.total_writes += 1;
}
print!(
"Upper L{} {:?} ",
task.upper_level.unwrap_or_default(),
task.upper_level_sst_ids
);
print!(
"Lower L{} {:?} ",
task.lower_level, task.lower_level_sst_ids
);
println!("-> {:?}", sst_ids);
max_space = max_space.max(storage.file_list.len());
let (snapshot, del) =
controller.apply_compaction_result(&storage.snapshot, &task, &sst_ids);
storage.snapshot = snapshot;
storage.remove(&del);
println!("--- After Compaction ---");
if dump_real_id {
storage.dump_real_id(true);
} else {
storage.dump_original_id(true);
}
}
if num_compactions == 0 {
println!("no compaction triggered");
} else {
println!("{num_compactions} compaction triggered in this iteration");
}
max_space = max_space.max(storage.file_list.len());
println!("--- Statistics ---");
println!(
"Write Amplification: {}/{}={:.3}x",
storage.total_writes,
storage.total_flushes,
storage.total_writes as f64 / storage.total_flushes as f64
);
println!(
"Space Amplification: {}/{}={:.3}x",
max_space,
storage.total_flushes,
max_space as f64 / storage.total_flushes as f64
);
println!(
"Read Amplification: {}x",
storage.snapshot.l0_sstables.len()
+ storage
.snapshot
.levels
.iter()
.filter(|(_, f)| !f.is_empty())
.count()
);
println!();
}
}
Args::Tiered {
dump_real_id,
level0_file_num_compaction_trigger,
@@ -133,9 +248,9 @@ fn main() {
storage.flush_sst_to_new_tier();
println!("--- After Flush ---");
if dump_real_id {
storage.dump_real_id();
storage.dump_real_id(false);
} else {
storage.dump_original_id();
storage.dump_original_id(false);
}
let task = controller.generate_compaction_task(&storage.snapshot);
println!("--- Compaction Task ---");
@@ -158,9 +273,9 @@ fn main() {
storage.remove(&del);
println!("--- After Compaction ---");
if dump_real_id {
storage.dump_real_id();
storage.dump_real_id(false);
} else {
storage.dump_original_id();
storage.dump_original_id(false);
}
} else {
println!("no compaction triggered");

View File

@@ -1,10 +1,14 @@
mod leveled;
mod simple_leveled;
mod tiered;
use std::sync::Arc;
use anyhow::Result;
pub use leveled::{LeveledCompactionController, LeveledCompactionTask};
pub use simple_leveled::{
SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, SimpleLeveledCompactionTask,
};
pub use tiered::{TieredCompactionController, TieredCompactionOptions, TieredCompactionTask};
use crate::iterators::merge_iterator::MergeIterator;

View File

@@ -1,7 +1,8 @@
use crate::lsm_storage::LsmStorageInner;
pub struct LeveledCompactionTask {
upper_level: usize,
// if upper_level is `None`, then it is L0 compaction
upper_level: Option<usize>,
upper_level_sst_ids: Vec<usize>,
lower_level: usize,
lower_level_sst_ids: Vec<usize>,

View File

@@ -0,0 +1,100 @@
use std::collections::HashMap;
use crate::lsm_storage::LsmStorageInner;
pub struct SimpleLeveledCompactionOptions {
pub size_ratio_percent: usize,
pub level0_file_num_compaction_trigger: usize,
pub max_levels: usize,
}
pub struct SimpleLeveledCompactionTask {
// if upper_level is `None`, then it is L0 compaction
pub upper_level: Option<usize>,
pub upper_level_sst_ids: Vec<usize>,
pub lower_level: usize,
pub lower_level_sst_ids: Vec<usize>,
}
pub struct SimpleLeveledCompactionController {
options: SimpleLeveledCompactionOptions,
}
impl SimpleLeveledCompactionController {
pub fn new(options: SimpleLeveledCompactionOptions) -> Self {
Self { options }
}
pub fn generate_compaction_task(
&mut self,
snapshot: &LsmStorageInner,
) -> Option<SimpleLeveledCompactionTask> {
let mut level_sizes = Vec::new();
level_sizes.push(snapshot.l0_sstables.len());
for (_, files) in &snapshot.levels {
level_sizes.push(files.len());
}
for i in 0..self.options.max_levels {
if i == 0
&& snapshot.l0_sstables.len() < self.options.level0_file_num_compaction_trigger
{
continue;
}
let lower_level = i + 1;
let size_ratio = level_sizes[lower_level] as f64 / level_sizes[i] as f64;
if size_ratio < self.options.size_ratio_percent as f64 / 100.0 {
println!(
"compaction triggered at level {} and {} with size ratio {}",
i, lower_level, size_ratio
);
return Some(SimpleLeveledCompactionTask {
upper_level: if i == 0 { None } else { Some(i) },
upper_level_sst_ids: if i == 0 {
snapshot.l0_sstables.clone()
} else {
snapshot.levels[i - 1].1.clone()
},
lower_level,
lower_level_sst_ids: snapshot.levels[lower_level - 1].1.clone(),
});
}
}
None
}
pub fn apply_compaction_result(
&self,
snapshot: &LsmStorageInner,
task: &SimpleLeveledCompactionTask,
output: &[usize],
) -> (LsmStorageInner, Vec<usize>) {
let mut snapshot = snapshot.clone();
let mut files_to_remove = Vec::new();
if let Some(upper_level) = task.upper_level {
assert_eq!(
task.upper_level_sst_ids,
snapshot.levels[upper_level - 1].1,
"sst mismatched"
);
files_to_remove.extend(&snapshot.levels[upper_level - 1].1);
snapshot.levels[upper_level - 1].1.clear();
} else {
assert_eq!(
task.upper_level_sst_ids, snapshot.l0_sstables,
"sst mismatched"
);
files_to_remove.extend(&snapshot.l0_sstables);
snapshot.l0_sstables.clear();
}
assert_eq!(
task.lower_level_sst_ids,
snapshot.levels[task.lower_level - 1].1,
"sst mismatched"
);
files_to_remove.extend(&snapshot.levels[task.lower_level - 1].1);
snapshot.levels[task.lower_level - 1].1 = output.to_vec();
(snapshot, files_to_remove)
}
}