add tiered compaction + compaction simulator
Signed-off-by: Alex Chi Z <iskyzh@gmail.com>
This commit is contained in:
@@ -38,7 +38,7 @@ We are working on a new version of the mini-lsm tutorial that is split into 3 we
|
|||||||
| 1.5 | Storage Engine - Read Path | ✅ | ✅ | ✅ |
|
| 1.5 | Storage Engine - Read Path | ✅ | ✅ | ✅ |
|
||||||
| 1.6 | Storage Engine - Write Path | ✅ | ✅ | ✅ |
|
| 1.6 | Storage Engine - Write Path | ✅ | ✅ | ✅ |
|
||||||
| 2.1 | Compaction Framework | ✅ | 🚧 | 🚧 |
|
| 2.1 | Compaction Framework | ✅ | 🚧 | 🚧 |
|
||||||
| 2.2 | Compaction Strategy - Tiered | 🚧 | | |
|
| 2.2 | Compaction Strategy - Tiered | ✅ | | |
|
||||||
| 2.3 | Compaction Strategy - Leveled | 🚧 | | |
|
| 2.3 | Compaction Strategy - Leveled | 🚧 | | |
|
||||||
| 2.4 | Manifest | | | |
|
| 2.4 | Manifest | | | |
|
||||||
| 2.5 | Write-Ahead Log | | | |
|
| 2.5 | Write-Ahead Log | | | |
|
||||||
|
|||||||
@@ -1,22 +1,38 @@
|
|||||||
use std::collections::HashSet;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use mini_lsm::compact::TieredCompactionController;
|
use mini_lsm::compact::{TieredCompactionController, TieredCompactionOptions};
|
||||||
use mini_lsm::lsm_storage::LsmStorageInner;
|
use mini_lsm::lsm_storage::LsmStorageInner;
|
||||||
use mini_lsm::mem_table::MemTable;
|
use mini_lsm::mem_table::MemTable;
|
||||||
|
|
||||||
#[derive(Parser, Debug)]
|
#[derive(Parser, Debug)]
|
||||||
#[command(author, version, about, long_about = None)]
|
#[command(author, version, about, long_about = None)]
|
||||||
enum Args {
|
enum Args {
|
||||||
Tiered {},
|
Tiered {
|
||||||
|
#[clap(long)]
|
||||||
|
dump_real_id: bool,
|
||||||
|
#[clap(long, default_value = "3")]
|
||||||
|
level0_file_num_compaction_trigger: usize,
|
||||||
|
#[clap(long, default_value = "200")]
|
||||||
|
max_size_amplification_percent: usize,
|
||||||
|
#[clap(long, default_value = "1")]
|
||||||
|
size_ratio: usize,
|
||||||
|
#[clap(long, default_value = "2")]
|
||||||
|
min_merge_width: usize,
|
||||||
|
#[clap(long, default_value = "50")]
|
||||||
|
iterations: usize,
|
||||||
|
},
|
||||||
Leveled {},
|
Leveled {},
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct MockStorage {
|
pub struct MockStorage {
|
||||||
snapshot: LsmStorageInner,
|
snapshot: LsmStorageInner,
|
||||||
next_sst_id: usize,
|
next_sst_id: usize,
|
||||||
file_list: HashSet<usize>,
|
/// Maps SST ID to the original flushed SST ID
|
||||||
|
file_list: HashMap<usize, usize>,
|
||||||
|
total_flushes: usize,
|
||||||
|
total_writes: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MockStorage {
|
impl MockStorage {
|
||||||
@@ -32,6 +48,8 @@ impl MockStorage {
|
|||||||
snapshot,
|
snapshot,
|
||||||
next_sst_id: 0,
|
next_sst_id: 0,
|
||||||
file_list: Default::default(),
|
file_list: Default::default(),
|
||||||
|
total_flushes: 0,
|
||||||
|
total_writes: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -41,22 +59,52 @@ impl MockStorage {
|
|||||||
id
|
id
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn flush_sst(&mut self) {
|
pub fn flush_sst_to_l0(&mut self) {
|
||||||
let id = self.generate_sst_id();
|
let id = self.generate_sst_id();
|
||||||
self.snapshot.l0_sstables.push(id);
|
self.snapshot.l0_sstables.push(id);
|
||||||
self.file_list.insert(id);
|
self.file_list.insert(id, id);
|
||||||
|
self.total_flushes += 1;
|
||||||
|
self.total_writes += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn flush_sst_to_new_tier(&mut self) {
|
||||||
|
let id = self.generate_sst_id();
|
||||||
|
self.snapshot.levels.insert(0, (id, vec![id]));
|
||||||
|
self.file_list.insert(id, id);
|
||||||
|
self.total_flushes += 1;
|
||||||
|
self.total_writes += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn remove(&mut self, files_to_remove: &[usize]) {
|
pub fn remove(&mut self, files_to_remove: &[usize]) {
|
||||||
for file_id in files_to_remove {
|
for file_id in files_to_remove {
|
||||||
self.file_list.remove(file_id);
|
let ret = self.file_list.remove(file_id);
|
||||||
|
assert!(ret.is_some(), "failed to remove file {}", file_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn dump(&self) {
|
pub fn dump_original_id(&self) {
|
||||||
print!("L0: {:?}", self.snapshot.l0_sstables);
|
if !self.snapshot.l0_sstables.is_empty() {
|
||||||
|
println!("L0: {:?}", self.snapshot.l0_sstables);
|
||||||
|
}
|
||||||
for (level, files) in &self.snapshot.levels {
|
for (level, files) in &self.snapshot.levels {
|
||||||
print!("L{level}: {:?}", files);
|
println!(
|
||||||
|
"L{level} ({}): {:?}",
|
||||||
|
files.len(),
|
||||||
|
files.iter().map(|x| self.file_list[x]).collect::<Vec<_>>()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn dump_real_id(&self) {
|
||||||
|
if !self.snapshot.l0_sstables.is_empty() {
|
||||||
|
println!("L0: {:?}", self.snapshot.l0_sstables);
|
||||||
|
}
|
||||||
|
for (level, files) in &self.snapshot.levels {
|
||||||
|
println!(
|
||||||
|
"L{level} ({}): {:?}",
|
||||||
|
files.len(),
|
||||||
|
files.iter().map(|x| self.file_list[x]).collect::<Vec<_>>()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -64,19 +112,74 @@ impl MockStorage {
|
|||||||
fn main() {
|
fn main() {
|
||||||
let args = Args::parse();
|
let args = Args::parse();
|
||||||
match args {
|
match args {
|
||||||
Args::Tiered {} => {
|
Args::Tiered {
|
||||||
let controller = TieredCompactionController {};
|
dump_real_id,
|
||||||
|
level0_file_num_compaction_trigger,
|
||||||
|
max_size_amplification_percent,
|
||||||
|
size_ratio,
|
||||||
|
min_merge_width,
|
||||||
|
iterations,
|
||||||
|
} => {
|
||||||
|
let controller = TieredCompactionController::new(TieredCompactionOptions {
|
||||||
|
level0_file_num_compaction_trigger,
|
||||||
|
max_size_amplification_percent,
|
||||||
|
size_ratio,
|
||||||
|
min_merge_width,
|
||||||
|
});
|
||||||
let mut storage = MockStorage::new();
|
let mut storage = MockStorage::new();
|
||||||
for i in 0..500 {
|
let mut max_space = 0;
|
||||||
println!("Iteration {i}");
|
for i in 0..iterations {
|
||||||
storage.flush_sst();
|
println!("=== Iteration {i} ===");
|
||||||
|
storage.flush_sst_to_new_tier();
|
||||||
|
println!("--- After Flush ---");
|
||||||
|
if dump_real_id {
|
||||||
|
storage.dump_real_id();
|
||||||
|
} else {
|
||||||
|
storage.dump_original_id();
|
||||||
|
}
|
||||||
let task = controller.generate_compaction_task(&storage.snapshot);
|
let task = controller.generate_compaction_task(&storage.snapshot);
|
||||||
let sst_id = storage.generate_sst_id();
|
println!("--- Compaction Task ---");
|
||||||
let (snapshot, del) =
|
if let Some(task) = task {
|
||||||
controller.apply_compaction_result(&storage.snapshot, &task, &[sst_id]);
|
let mut sst_ids = Vec::new();
|
||||||
storage.snapshot = snapshot;
|
for (tier_id, files) in &task.tiers {
|
||||||
storage.remove(&del);
|
for file in files {
|
||||||
storage.dump();
|
let new_sst_id = storage.generate_sst_id();
|
||||||
|
sst_ids.push(new_sst_id);
|
||||||
|
storage.file_list.insert(new_sst_id, *file);
|
||||||
|
storage.total_writes += 1;
|
||||||
|
}
|
||||||
|
print!("L{} {:?} ", tier_id, files);
|
||||||
|
}
|
||||||
|
println!("-> {:?}", sst_ids);
|
||||||
|
max_space = max_space.max(storage.file_list.len());
|
||||||
|
let (snapshot, del) =
|
||||||
|
controller.apply_compaction_result(&storage.snapshot, &task, &sst_ids);
|
||||||
|
storage.snapshot = snapshot;
|
||||||
|
storage.remove(&del);
|
||||||
|
println!("--- After Compaction ---");
|
||||||
|
if dump_real_id {
|
||||||
|
storage.dump_real_id();
|
||||||
|
} else {
|
||||||
|
storage.dump_original_id();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
println!("no compaction triggered");
|
||||||
|
}
|
||||||
|
max_space = max_space.max(storage.file_list.len());
|
||||||
|
println!("--- Statistics ---");
|
||||||
|
println!(
|
||||||
|
"Write Amplification: {}/{}={:.3}x",
|
||||||
|
storage.total_writes,
|
||||||
|
storage.total_flushes,
|
||||||
|
storage.total_writes as f64 / storage.total_flushes as f64
|
||||||
|
);
|
||||||
|
println!(
|
||||||
|
"Space Amplification: {}/{}={:.3}x",
|
||||||
|
max_space,
|
||||||
|
storage.total_flushes,
|
||||||
|
max_space as f64 / storage.total_flushes as f64
|
||||||
|
);
|
||||||
|
println!();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Args::Leveled {} => {}
|
Args::Leveled {} => {}
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ use std::sync::Arc;
|
|||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
pub use leveled::{LeveledCompactionController, LeveledCompactionTask};
|
pub use leveled::{LeveledCompactionController, LeveledCompactionTask};
|
||||||
pub use tiered::{TieredCompactionController, TieredCompactionTask};
|
pub use tiered::{TieredCompactionController, TieredCompactionOptions, TieredCompactionTask};
|
||||||
|
|
||||||
use crate::iterators::merge_iterator::MergeIterator;
|
use crate::iterators::merge_iterator::MergeIterator;
|
||||||
use crate::iterators::StorageIterator;
|
use crate::iterators::StorageIterator;
|
||||||
|
|||||||
@@ -1,15 +1,88 @@
|
|||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use crate::lsm_storage::LsmStorageInner;
|
use crate::lsm_storage::LsmStorageInner;
|
||||||
use crate::table::SsTable;
|
|
||||||
|
|
||||||
pub struct TieredCompactionTask {
|
pub struct TieredCompactionTask {
|
||||||
tiers: Vec<usize>,
|
pub tiers: Vec<(usize, Vec<usize>)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct TieredCompactionController {}
|
pub struct TieredCompactionOptions {
|
||||||
|
pub level0_file_num_compaction_trigger: usize,
|
||||||
|
pub max_size_amplification_percent: usize,
|
||||||
|
pub size_ratio: usize,
|
||||||
|
pub min_merge_width: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct TieredCompactionController {
|
||||||
|
options: TieredCompactionOptions,
|
||||||
|
}
|
||||||
|
|
||||||
impl TieredCompactionController {
|
impl TieredCompactionController {
|
||||||
pub fn generate_compaction_task(&self, snapshot: &LsmStorageInner) -> TieredCompactionTask {
|
pub fn new(options: TieredCompactionOptions) -> Self {
|
||||||
return TieredCompactionTask { tiers: Vec::new() };
|
Self { options }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn generate_compaction_task(
|
||||||
|
&self,
|
||||||
|
snapshot: &LsmStorageInner,
|
||||||
|
) -> Option<TieredCompactionTask> {
|
||||||
|
assert!(
|
||||||
|
snapshot.l0_sstables.is_empty(),
|
||||||
|
"should not add l0 ssts in tiered compaction"
|
||||||
|
);
|
||||||
|
if snapshot.levels.len() < self.options.level0_file_num_compaction_trigger {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
// compaction triggered by space amplification ratio
|
||||||
|
let mut size = 0;
|
||||||
|
for id in 0..(snapshot.levels.len() - 1) {
|
||||||
|
size += snapshot.levels[id].1.len();
|
||||||
|
}
|
||||||
|
let space_amp_ratio =
|
||||||
|
(size as f64) / (snapshot.levels.last().unwrap().1.len() as f64) * 100.0;
|
||||||
|
if space_amp_ratio >= self.options.max_size_amplification_percent as f64 {
|
||||||
|
println!(
|
||||||
|
"compaction triggered by space amplification ratio: {}",
|
||||||
|
space_amp_ratio
|
||||||
|
);
|
||||||
|
return Some(TieredCompactionTask {
|
||||||
|
tiers: snapshot.levels.clone(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
let size_ratio_trigger = (100.0 + self.options.size_ratio as f64) / 100.0;
|
||||||
|
// compaction triggered by size ratio
|
||||||
|
let mut size = 0;
|
||||||
|
for id in 0..(snapshot.levels.len() - 1) {
|
||||||
|
size += snapshot.levels[id].1.len();
|
||||||
|
let next_level_size = snapshot.levels[id + 1].1.len();
|
||||||
|
let current_size_ratio = size as f64 / next_level_size as f64;
|
||||||
|
if current_size_ratio >= size_ratio_trigger && id + 2 >= self.options.min_merge_width {
|
||||||
|
println!(
|
||||||
|
"compaction triggered by size ratio: {}",
|
||||||
|
current_size_ratio * 100.0
|
||||||
|
);
|
||||||
|
return Some(TieredCompactionTask {
|
||||||
|
tiers: snapshot
|
||||||
|
.levels
|
||||||
|
.iter()
|
||||||
|
.take(id + 2)
|
||||||
|
.cloned()
|
||||||
|
.collect::<Vec<_>>(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// trying to reduce sorted runs without respecting size ratio
|
||||||
|
let num_tiers_to_take =
|
||||||
|
snapshot.levels.len() - self.options.level0_file_num_compaction_trigger + 1;
|
||||||
|
println!("compaction triggered by reducing sorted runs");
|
||||||
|
return Some(TieredCompactionTask {
|
||||||
|
tiers: snapshot
|
||||||
|
.levels
|
||||||
|
.iter()
|
||||||
|
.take(num_tiers_to_take)
|
||||||
|
.cloned()
|
||||||
|
.collect::<Vec<_>>(),
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn apply_compaction_result(
|
pub fn apply_compaction_result(
|
||||||
@@ -18,6 +91,35 @@ impl TieredCompactionController {
|
|||||||
task: &TieredCompactionTask,
|
task: &TieredCompactionTask,
|
||||||
output: &[usize],
|
output: &[usize],
|
||||||
) -> (LsmStorageInner, Vec<usize>) {
|
) -> (LsmStorageInner, Vec<usize>) {
|
||||||
(snapshot.clone(), Vec::new())
|
assert!(
|
||||||
|
snapshot.l0_sstables.is_empty(),
|
||||||
|
"should not add l0 ssts in tiered compaction"
|
||||||
|
);
|
||||||
|
let mut snapshot = snapshot.clone();
|
||||||
|
let mut tier_to_remove = task
|
||||||
|
.tiers
|
||||||
|
.iter()
|
||||||
|
.map(|(x, y)| (*x, y))
|
||||||
|
.collect::<HashMap<_, _>>();
|
||||||
|
let mut levels = Vec::new();
|
||||||
|
let mut new_tier_added = false;
|
||||||
|
let mut files_to_remove = Vec::new();
|
||||||
|
for (tier_id, files) in &snapshot.levels {
|
||||||
|
if let Some(ffiles) = tier_to_remove.remove(tier_id) {
|
||||||
|
// the tier should be removed
|
||||||
|
assert_eq!(ffiles, files, "file changed after issuing compaction task");
|
||||||
|
files_to_remove.extend(ffiles.iter().copied());
|
||||||
|
} else {
|
||||||
|
// retain the tier
|
||||||
|
levels.push((*tier_id, files.clone()));
|
||||||
|
}
|
||||||
|
if tier_to_remove.is_empty() && !new_tier_added {
|
||||||
|
// add the compacted tier to the LSM tree
|
||||||
|
new_tier_added = true;
|
||||||
|
levels.push((output[0], output.to_vec()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
snapshot.levels = levels;
|
||||||
|
(snapshot, files_to_remove)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user