checkin initial MVCC codebase
Signed-off-by: Alex Chi <iskyzh@gmail.com>
This commit is contained in:
229
mini-lsm-mvcc/src/compact/leveled.rs
Normal file
229
mini-lsm-mvcc/src/compact/leveled.rs
Normal file
@@ -0,0 +1,229 @@
|
||||
use std::collections::HashSet;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::lsm_storage::LsmStorageState;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct LeveledCompactionTask {
|
||||
// if upper_level is `None`, then it is L0 compaction
|
||||
pub upper_level: Option<usize>,
|
||||
pub upper_level_sst_ids: Vec<usize>,
|
||||
pub lower_level: usize,
|
||||
pub lower_level_sst_ids: Vec<usize>,
|
||||
pub is_lower_level_bottom_level: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct LeveledCompactionOptions {
|
||||
pub level_size_multiplier: usize,
|
||||
pub level0_file_num_compaction_trigger: usize,
|
||||
pub max_levels: usize,
|
||||
pub base_level_size_mb: usize,
|
||||
}
|
||||
|
||||
pub struct LeveledCompactionController {
|
||||
options: LeveledCompactionOptions,
|
||||
}
|
||||
|
||||
impl LeveledCompactionController {
|
||||
pub fn new(options: LeveledCompactionOptions) -> Self {
|
||||
Self { options }
|
||||
}
|
||||
|
||||
fn find_overlapping_ssts(
|
||||
&self,
|
||||
snapshot: &LsmStorageState,
|
||||
sst_ids: &[usize],
|
||||
in_level: usize,
|
||||
) -> Vec<usize> {
|
||||
let begin_key = sst_ids
|
||||
.iter()
|
||||
.map(|id| snapshot.sstables[id].first_key())
|
||||
.min()
|
||||
.cloned()
|
||||
.unwrap();
|
||||
let end_key = sst_ids
|
||||
.iter()
|
||||
.map(|id| snapshot.sstables[id].last_key())
|
||||
.max()
|
||||
.cloned()
|
||||
.unwrap();
|
||||
let mut overlap_ssts = Vec::new();
|
||||
for sst_id in &snapshot.levels[in_level - 1].1 {
|
||||
let sst = &snapshot.sstables[sst_id];
|
||||
let first_key = sst.first_key();
|
||||
let last_key = sst.last_key();
|
||||
if !(last_key < &begin_key || first_key > &end_key) {
|
||||
overlap_ssts.push(*sst_id);
|
||||
}
|
||||
}
|
||||
overlap_ssts
|
||||
}
|
||||
|
||||
pub fn generate_compaction_task(
|
||||
&self,
|
||||
snapshot: &LsmStorageState,
|
||||
) -> Option<LeveledCompactionTask> {
|
||||
// step 1: compute target level size
|
||||
let mut target_level_size = (0..self.options.max_levels).map(|_| 0).collect::<Vec<_>>(); // exclude level 0
|
||||
let mut real_level_size = Vec::with_capacity(self.options.max_levels);
|
||||
let mut base_level = self.options.max_levels;
|
||||
for i in 0..self.options.max_levels {
|
||||
real_level_size.push(
|
||||
snapshot.levels[i]
|
||||
.1
|
||||
.iter()
|
||||
.map(|x| snapshot.sstables.get(x).unwrap().table_size())
|
||||
.sum::<u64>() as usize,
|
||||
);
|
||||
}
|
||||
let base_level_size_bytes = self.options.base_level_size_mb * 1024 * 1024;
|
||||
|
||||
// select base level and compute target level size
|
||||
target_level_size[self.options.max_levels - 1] =
|
||||
real_level_size[self.options.max_levels - 1].max(base_level_size_bytes);
|
||||
for i in (0..(self.options.max_levels - 1)).rev() {
|
||||
let next_level_size = target_level_size[i + 1];
|
||||
let this_level_size = next_level_size / self.options.level_size_multiplier;
|
||||
if next_level_size > base_level_size_bytes {
|
||||
target_level_size[i] = this_level_size;
|
||||
}
|
||||
if target_level_size[i] > 0 {
|
||||
base_level = i + 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Flush L0 SST is the top priority
|
||||
if snapshot.l0_sstables.len() >= self.options.level0_file_num_compaction_trigger {
|
||||
println!("flush L0 SST to base level {}", base_level);
|
||||
return Some(LeveledCompactionTask {
|
||||
upper_level: None,
|
||||
upper_level_sst_ids: snapshot.l0_sstables.clone(),
|
||||
lower_level: base_level,
|
||||
lower_level_sst_ids: self.find_overlapping_ssts(
|
||||
snapshot,
|
||||
&snapshot.l0_sstables,
|
||||
base_level,
|
||||
),
|
||||
is_lower_level_bottom_level: base_level == self.options.max_levels,
|
||||
});
|
||||
}
|
||||
|
||||
let mut priorities = Vec::with_capacity(self.options.max_levels);
|
||||
for level in 0..self.options.max_levels {
|
||||
let prio = real_level_size[level] as f64 / target_level_size[level] as f64;
|
||||
if prio > 1.0 {
|
||||
priorities.push((prio, level + 1));
|
||||
}
|
||||
}
|
||||
priorities.sort_by(|a, b| a.partial_cmp(b).unwrap().reverse());
|
||||
let priority = priorities.first();
|
||||
if let Some((_, level)) = priority {
|
||||
println!(
|
||||
"target level sizes: {:?}, real level sizes: {:?}, base_level: {}",
|
||||
target_level_size
|
||||
.iter()
|
||||
.map(|x| format!("{}MB", x / 1024 / 1024))
|
||||
.collect::<Vec<_>>(),
|
||||
real_level_size
|
||||
.iter()
|
||||
.map(|x| format!("{}MB", x / 1024 / 1024))
|
||||
.collect::<Vec<_>>(),
|
||||
base_level,
|
||||
);
|
||||
|
||||
let level = *level;
|
||||
let selected_sst = snapshot.levels[level - 1].1.iter().min().copied().unwrap(); // select the oldest sst to compact
|
||||
println!(
|
||||
"compaction triggered by priority: {level} out of {:?}, select {selected_sst} for compaction",
|
||||
priorities
|
||||
);
|
||||
return Some(LeveledCompactionTask {
|
||||
upper_level: Some(level),
|
||||
upper_level_sst_ids: vec![selected_sst],
|
||||
lower_level: level + 1,
|
||||
lower_level_sst_ids: self.find_overlapping_ssts(
|
||||
snapshot,
|
||||
&[selected_sst],
|
||||
level + 1,
|
||||
),
|
||||
is_lower_level_bottom_level: level + 1 == self.options.max_levels,
|
||||
});
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub fn apply_compaction_result(
|
||||
&self,
|
||||
snapshot: &LsmStorageState,
|
||||
task: &LeveledCompactionTask,
|
||||
output: &[usize],
|
||||
) -> (LsmStorageState, Vec<usize>) {
|
||||
let mut snapshot = snapshot.clone();
|
||||
let mut files_to_remove = Vec::new();
|
||||
let mut upper_level_sst_ids_set = task
|
||||
.upper_level_sst_ids
|
||||
.iter()
|
||||
.copied()
|
||||
.collect::<HashSet<_>>();
|
||||
let mut lower_level_sst_ids_set = task
|
||||
.lower_level_sst_ids
|
||||
.iter()
|
||||
.copied()
|
||||
.collect::<HashSet<_>>();
|
||||
if let Some(upper_level) = task.upper_level {
|
||||
let new_upper_level_ssts = snapshot.levels[upper_level - 1]
|
||||
.1
|
||||
.iter()
|
||||
.filter_map(|x| {
|
||||
if upper_level_sst_ids_set.remove(x) {
|
||||
return None;
|
||||
}
|
||||
Some(*x)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
assert!(upper_level_sst_ids_set.is_empty());
|
||||
snapshot.levels[upper_level - 1].1 = new_upper_level_ssts;
|
||||
} else {
|
||||
let new_l0_ssts = snapshot
|
||||
.l0_sstables
|
||||
.iter()
|
||||
.filter_map(|x| {
|
||||
if upper_level_sst_ids_set.remove(x) {
|
||||
return None;
|
||||
}
|
||||
Some(*x)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
assert!(upper_level_sst_ids_set.is_empty());
|
||||
snapshot.l0_sstables = new_l0_ssts;
|
||||
}
|
||||
|
||||
files_to_remove.extend(&task.upper_level_sst_ids);
|
||||
files_to_remove.extend(&task.lower_level_sst_ids);
|
||||
|
||||
let mut new_lower_level_ssts = snapshot.levels[task.lower_level - 1]
|
||||
.1
|
||||
.iter()
|
||||
.filter_map(|x| {
|
||||
if lower_level_sst_ids_set.remove(x) {
|
||||
return None;
|
||||
}
|
||||
Some(*x)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
assert!(lower_level_sst_ids_set.is_empty());
|
||||
new_lower_level_ssts.extend(output);
|
||||
new_lower_level_ssts.sort_by(|x, y| {
|
||||
snapshot
|
||||
.sstables
|
||||
.get(x)
|
||||
.unwrap()
|
||||
.first_key()
|
||||
.cmp(snapshot.sstables.get(y).unwrap().first_key())
|
||||
});
|
||||
snapshot.levels[task.lower_level - 1].1 = new_lower_level_ssts;
|
||||
(snapshot, files_to_remove)
|
||||
}
|
||||
}
|
||||
114
mini-lsm-mvcc/src/compact/simple_leveled.rs
Normal file
114
mini-lsm-mvcc/src/compact/simple_leveled.rs
Normal file
@@ -0,0 +1,114 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::lsm_storage::LsmStorageState;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SimpleLeveledCompactionOptions {
|
||||
pub size_ratio_percent: usize,
|
||||
pub level0_file_num_compaction_trigger: usize,
|
||||
pub max_levels: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct SimpleLeveledCompactionTask {
|
||||
// if upper_level is `None`, then it is L0 compaction
|
||||
pub upper_level: Option<usize>,
|
||||
pub upper_level_sst_ids: Vec<usize>,
|
||||
pub lower_level: usize,
|
||||
pub lower_level_sst_ids: Vec<usize>,
|
||||
pub is_lower_level_bottom_level: bool,
|
||||
}
|
||||
|
||||
pub struct SimpleLeveledCompactionController {
|
||||
options: SimpleLeveledCompactionOptions,
|
||||
}
|
||||
|
||||
impl SimpleLeveledCompactionController {
|
||||
pub fn new(options: SimpleLeveledCompactionOptions) -> Self {
|
||||
Self { options }
|
||||
}
|
||||
|
||||
/// Generates a compaction task.
|
||||
///
|
||||
/// Returns `None` if no compaction needs to be scheduled. The order of SSTs in the compaction task id vector matters.
|
||||
pub fn generate_compaction_task(
|
||||
&self,
|
||||
snapshot: &LsmStorageState,
|
||||
) -> Option<SimpleLeveledCompactionTask> {
|
||||
let mut level_sizes = Vec::new();
|
||||
level_sizes.push(snapshot.l0_sstables.len());
|
||||
for (_, files) in &snapshot.levels {
|
||||
level_sizes.push(files.len());
|
||||
}
|
||||
|
||||
for i in 0..self.options.max_levels {
|
||||
if i == 0
|
||||
&& snapshot.l0_sstables.len() < self.options.level0_file_num_compaction_trigger
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
let lower_level = i + 1;
|
||||
let size_ratio = level_sizes[lower_level] as f64 / level_sizes[i] as f64;
|
||||
if size_ratio < self.options.size_ratio_percent as f64 / 100.0 {
|
||||
println!(
|
||||
"compaction triggered at level {} and {} with size ratio {}",
|
||||
i, lower_level, size_ratio
|
||||
);
|
||||
return Some(SimpleLeveledCompactionTask {
|
||||
upper_level: if i == 0 { None } else { Some(i) },
|
||||
upper_level_sst_ids: if i == 0 {
|
||||
snapshot.l0_sstables.clone()
|
||||
} else {
|
||||
snapshot.levels[i - 1].1.clone()
|
||||
},
|
||||
lower_level,
|
||||
lower_level_sst_ids: snapshot.levels[lower_level - 1].1.clone(),
|
||||
is_lower_level_bottom_level: lower_level == self.options.max_levels,
|
||||
});
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Apply the compaction result.
|
||||
///
|
||||
/// The compactor will call this function with the compaction task and the list of SST ids generated. This function applies the
|
||||
/// result and generates a new LSM state. The functions should only change `l0_sstables` and `levels` without changing memtables
|
||||
/// and `sstables` hash map. Though there should only be one thread running compaction jobs, you should think about the case
|
||||
/// where an L0 SST gets flushed while the compactor generates new SSTs, and with that in mind, you should do some sanity checks
|
||||
/// in your implementation.
|
||||
pub fn apply_compaction_result(
|
||||
&self,
|
||||
snapshot: &LsmStorageState,
|
||||
task: &SimpleLeveledCompactionTask,
|
||||
output: &[usize],
|
||||
) -> (LsmStorageState, Vec<usize>) {
|
||||
let mut snapshot = snapshot.clone();
|
||||
let mut files_to_remove = Vec::new();
|
||||
if let Some(upper_level) = task.upper_level {
|
||||
assert_eq!(
|
||||
task.upper_level_sst_ids,
|
||||
snapshot.levels[upper_level - 1].1,
|
||||
"sst mismatched"
|
||||
);
|
||||
files_to_remove.extend(&snapshot.levels[upper_level - 1].1);
|
||||
snapshot.levels[upper_level - 1].1.clear();
|
||||
} else {
|
||||
assert_eq!(
|
||||
task.upper_level_sst_ids, snapshot.l0_sstables,
|
||||
"sst mismatched"
|
||||
);
|
||||
files_to_remove.extend(&snapshot.l0_sstables);
|
||||
snapshot.l0_sstables.clear();
|
||||
}
|
||||
assert_eq!(
|
||||
task.lower_level_sst_ids,
|
||||
snapshot.levels[task.lower_level - 1].1,
|
||||
"sst mismatched"
|
||||
);
|
||||
files_to_remove.extend(&snapshot.levels[task.lower_level - 1].1);
|
||||
snapshot.levels[task.lower_level - 1].1 = output.to_vec();
|
||||
(snapshot, files_to_remove)
|
||||
}
|
||||
}
|
||||
135
mini-lsm-mvcc/src/compact/tiered.rs
Normal file
135
mini-lsm-mvcc/src/compact/tiered.rs
Normal file
@@ -0,0 +1,135 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::lsm_storage::LsmStorageState;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct TieredCompactionTask {
|
||||
pub tiers: Vec<(usize, Vec<usize>)>,
|
||||
pub bottom_tier_included: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TieredCompactionOptions {
|
||||
pub num_tiers: usize,
|
||||
pub max_size_amplification_percent: usize,
|
||||
pub size_ratio: usize,
|
||||
pub min_merge_width: usize,
|
||||
}
|
||||
|
||||
pub struct TieredCompactionController {
|
||||
options: TieredCompactionOptions,
|
||||
}
|
||||
|
||||
impl TieredCompactionController {
|
||||
pub fn new(options: TieredCompactionOptions) -> Self {
|
||||
Self { options }
|
||||
}
|
||||
|
||||
pub fn generate_compaction_task(
|
||||
&self,
|
||||
snapshot: &LsmStorageState,
|
||||
) -> Option<TieredCompactionTask> {
|
||||
assert!(
|
||||
snapshot.l0_sstables.is_empty(),
|
||||
"should not add l0 ssts in tiered compaction"
|
||||
);
|
||||
if snapshot.levels.len() < self.options.num_tiers {
|
||||
return None;
|
||||
}
|
||||
// compaction triggered by space amplification ratio
|
||||
let mut size = 0;
|
||||
for id in 0..(snapshot.levels.len() - 1) {
|
||||
size += snapshot.levels[id].1.len();
|
||||
}
|
||||
let space_amp_ratio =
|
||||
(size as f64) / (snapshot.levels.last().unwrap().1.len() as f64) * 100.0;
|
||||
if space_amp_ratio >= self.options.max_size_amplification_percent as f64 {
|
||||
println!(
|
||||
"compaction triggered by space amplification ratio: {}",
|
||||
space_amp_ratio
|
||||
);
|
||||
return Some(TieredCompactionTask {
|
||||
tiers: snapshot.levels.clone(),
|
||||
bottom_tier_included: true,
|
||||
});
|
||||
}
|
||||
let size_ratio_trigger = (100.0 + self.options.size_ratio as f64) / 100.0;
|
||||
// compaction triggered by size ratio
|
||||
let mut size = 0;
|
||||
for id in 0..(snapshot.levels.len() - 1) {
|
||||
size += snapshot.levels[id].1.len();
|
||||
let next_level_size = snapshot.levels[id + 1].1.len();
|
||||
let current_size_ratio = size as f64 / next_level_size as f64;
|
||||
if current_size_ratio >= size_ratio_trigger && id + 2 >= self.options.min_merge_width {
|
||||
println!(
|
||||
"compaction triggered by size ratio: {}",
|
||||
current_size_ratio * 100.0
|
||||
);
|
||||
return Some(TieredCompactionTask {
|
||||
tiers: snapshot
|
||||
.levels
|
||||
.iter()
|
||||
.take(id + 2)
|
||||
.cloned()
|
||||
.collect::<Vec<_>>(),
|
||||
bottom_tier_included: id + 2 >= snapshot.levels.len(),
|
||||
});
|
||||
}
|
||||
}
|
||||
// trying to reduce sorted runs without respecting size ratio
|
||||
let num_tiers_to_take = snapshot.levels.len() - self.options.num_tiers + 2;
|
||||
println!("compaction triggered by reducing sorted runs");
|
||||
return Some(TieredCompactionTask {
|
||||
tiers: snapshot
|
||||
.levels
|
||||
.iter()
|
||||
.take(num_tiers_to_take)
|
||||
.cloned()
|
||||
.collect::<Vec<_>>(),
|
||||
bottom_tier_included: snapshot.levels.len() >= num_tiers_to_take,
|
||||
});
|
||||
}
|
||||
|
||||
pub fn apply_compaction_result(
|
||||
&self,
|
||||
snapshot: &LsmStorageState,
|
||||
task: &TieredCompactionTask,
|
||||
output: &[usize],
|
||||
) -> (LsmStorageState, Vec<usize>) {
|
||||
assert!(
|
||||
snapshot.l0_sstables.is_empty(),
|
||||
"should not add l0 ssts in tiered compaction"
|
||||
);
|
||||
let mut snapshot = snapshot.clone();
|
||||
let mut tier_to_remove = task
|
||||
.tiers
|
||||
.iter()
|
||||
.map(|(x, y)| (*x, y))
|
||||
.collect::<HashMap<_, _>>();
|
||||
let mut levels = Vec::new();
|
||||
let mut new_tier_added = false;
|
||||
let mut files_to_remove = Vec::new();
|
||||
for (tier_id, files) in &snapshot.levels {
|
||||
if let Some(ffiles) = tier_to_remove.remove(tier_id) {
|
||||
// the tier should be removed
|
||||
assert_eq!(ffiles, files, "file changed after issuing compaction task");
|
||||
files_to_remove.extend(ffiles.iter().copied());
|
||||
} else {
|
||||
// retain the tier
|
||||
levels.push((*tier_id, files.clone()));
|
||||
}
|
||||
if tier_to_remove.is_empty() && !new_tier_added {
|
||||
// add the compacted tier to the LSM tree
|
||||
new_tier_added = true;
|
||||
levels.push((output[0], output.to_vec()));
|
||||
}
|
||||
}
|
||||
if !tier_to_remove.is_empty() {
|
||||
unreachable!("some tiers not found??");
|
||||
}
|
||||
snapshot.levels = levels;
|
||||
(snapshot, files_to_remove)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user