@@ -18,6 +18,7 @@ parking_lot = "0.12"
|
||||
ouroboros = "0.15"
|
||||
moka = "0.9"
|
||||
clap = { version = "4.4.17", features = ["derive"] }
|
||||
rand = "0.8.5"
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3"
|
||||
|
||||
@@ -1,13 +1,15 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use clap::Parser;
|
||||
use mini_lsm::compact::{
|
||||
SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, TieredCompactionController,
|
||||
TieredCompactionOptions,
|
||||
LeveledCompactionController, LeveledCompactionOptions, SimpleLeveledCompactionController,
|
||||
SimpleLeveledCompactionOptions, TieredCompactionController, TieredCompactionOptions,
|
||||
};
|
||||
use mini_lsm::lsm_storage::LsmStorageInner;
|
||||
use mini_lsm::mem_table::MemTable;
|
||||
use mini_lsm::table::SsTable;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(author, version, about, long_about = None)]
|
||||
@@ -38,7 +40,22 @@ enum Args {
|
||||
#[clap(long, default_value = "50")]
|
||||
iterations: usize,
|
||||
},
|
||||
Leveled {},
|
||||
Leveled {
|
||||
#[clap(long)]
|
||||
dump_real_id: bool,
|
||||
#[clap(long, default_value = "2")]
|
||||
level0_file_num_compaction_trigger: usize,
|
||||
#[clap(long, default_value = "2")]
|
||||
level_size_multiplier: usize,
|
||||
#[clap(long, default_value = "4")]
|
||||
max_levels: usize,
|
||||
#[clap(long, default_value = "128")]
|
||||
base_level_size_mb: usize,
|
||||
#[clap(long, default_value = "50")]
|
||||
iterations: usize,
|
||||
#[clap(long, default_value = "32")]
|
||||
sst_size_mb: usize,
|
||||
},
|
||||
}
|
||||
|
||||
pub struct MockStorage {
|
||||
@@ -74,12 +91,13 @@ impl MockStorage {
|
||||
id
|
||||
}
|
||||
|
||||
pub fn flush_sst_to_l0(&mut self) {
|
||||
pub fn flush_sst_to_l0(&mut self) -> usize {
|
||||
let id = self.generate_sst_id();
|
||||
self.snapshot.l0_sstables.push(id);
|
||||
self.file_list.insert(id, id);
|
||||
self.total_flushes += 1;
|
||||
self.total_writes += 1;
|
||||
id
|
||||
}
|
||||
|
||||
pub fn flush_sst_to_new_tier(&mut self) {
|
||||
@@ -97,7 +115,30 @@ impl MockStorage {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn dump_original_id(&self, always_show_l0: bool) {
|
||||
fn check_keys(&self) {
|
||||
for (level, files) in &self.snapshot.levels {
|
||||
if files.len() >= 2 {
|
||||
for id in 0..(files.len() - 1) {
|
||||
let this_file = self.snapshot.sstables[&files[id]].clone();
|
||||
let next_file = self.snapshot.sstables[&files[id + 1]].clone();
|
||||
if this_file.last_key() >= next_file.first_key() {
|
||||
panic!(
|
||||
"invalid file arrangement in L{}: id={}, range={:x}..={:x}; id={}, range={:x}..={:x}",
|
||||
level,
|
||||
this_file.sst_id(),
|
||||
this_file.first_key().clone().get_u64(),
|
||||
this_file.last_key().clone().get_u64(),
|
||||
next_file.sst_id(),
|
||||
next_file.first_key().clone().get_u64(),
|
||||
next_file.last_key().clone().get_u64()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn dump_original_id(&self, always_show_l0: bool, with_key: bool) {
|
||||
if !self.snapshot.l0_sstables.is_empty() || always_show_l0 {
|
||||
println!(
|
||||
"L0 ({}): {:?}",
|
||||
@@ -112,9 +153,12 @@ impl MockStorage {
|
||||
files.iter().map(|x| self.file_list[x]).collect::<Vec<_>>()
|
||||
);
|
||||
}
|
||||
if with_key {
|
||||
self.check_keys();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn dump_real_id(&self, always_show_l0: bool) {
|
||||
pub fn dump_real_id(&self, always_show_l0: bool, with_key: bool) {
|
||||
if !self.snapshot.l0_sstables.is_empty() || always_show_l0 {
|
||||
println!(
|
||||
"L0 ({}): {:?}",
|
||||
@@ -125,9 +169,47 @@ impl MockStorage {
|
||||
for (level, files) in &self.snapshot.levels {
|
||||
println!("L{level} ({}): {:?}", files.len(), files);
|
||||
}
|
||||
if with_key {
|
||||
self.check_keys();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn generate_random_key_range() -> (Bytes, Bytes) {
|
||||
use rand::Rng;
|
||||
let mut rng = rand::thread_rng();
|
||||
let begin: usize = rng.gen_range(0..(1 << 31));
|
||||
let end: usize = begin + rng.gen_range((1 << 10)..(1 << 31));
|
||||
let mut begin_bytes = BytesMut::new();
|
||||
let mut end_bytes = BytesMut::new();
|
||||
begin_bytes.put_u64(begin as u64);
|
||||
end_bytes.put_u64(end as u64);
|
||||
(begin_bytes.into(), end_bytes.into())
|
||||
}
|
||||
|
||||
fn generate_random_split(
|
||||
mut begin_bytes: Bytes,
|
||||
mut end_bytes: Bytes,
|
||||
split: usize,
|
||||
) -> Vec<(Bytes, Bytes)> {
|
||||
let begin = begin_bytes.get_u64();
|
||||
let end = end_bytes.get_u64();
|
||||
let len = end - begin + 1;
|
||||
let mut result = Vec::new();
|
||||
let split = split as u64;
|
||||
assert!(len >= split, "well, this is unfortunate... run again!");
|
||||
for i in 0..split {
|
||||
let nb = begin + len * i / split;
|
||||
let ne = begin + len * (i + 1) / split - 1;
|
||||
let mut begin_bytes = BytesMut::new();
|
||||
let mut end_bytes = BytesMut::new();
|
||||
begin_bytes.put_u64(nb as u64);
|
||||
end_bytes.put_u64(ne as u64);
|
||||
result.push((begin_bytes.into(), end_bytes.into()));
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let args = Args::parse();
|
||||
match args {
|
||||
@@ -154,9 +236,9 @@ fn main() {
|
||||
storage.flush_sst_to_l0();
|
||||
println!("--- After Flush ---");
|
||||
if dump_real_id {
|
||||
storage.dump_real_id(true);
|
||||
storage.dump_real_id(true, false);
|
||||
} else {
|
||||
storage.dump_original_id(true);
|
||||
storage.dump_original_id(true, false);
|
||||
}
|
||||
let mut num_compactions = 0;
|
||||
while let Some(task) = controller.generate_compaction_task(&storage.snapshot) {
|
||||
@@ -189,9 +271,9 @@ fn main() {
|
||||
storage.remove(&del);
|
||||
println!("--- After Compaction ---");
|
||||
if dump_real_id {
|
||||
storage.dump_real_id(true);
|
||||
storage.dump_real_id(true, false);
|
||||
} else {
|
||||
storage.dump_original_id(true);
|
||||
storage.dump_original_id(true, false);
|
||||
}
|
||||
num_compactions += 1;
|
||||
if num_compactions >= max_levels * 2 {
|
||||
@@ -212,7 +294,7 @@ fn main() {
|
||||
storage.total_writes as f64 / storage.total_flushes as f64
|
||||
);
|
||||
println!(
|
||||
"Space Amplification: {}/{}={:.3}x",
|
||||
"Maximum Space Usage: {}/{}={:.3}x",
|
||||
max_space,
|
||||
storage.total_flushes,
|
||||
max_space as f64 / storage.total_flushes as f64
|
||||
@@ -251,13 +333,13 @@ fn main() {
|
||||
storage.flush_sst_to_new_tier();
|
||||
println!("--- After Flush ---");
|
||||
if dump_real_id {
|
||||
storage.dump_real_id(false);
|
||||
storage.dump_real_id(false, false);
|
||||
} else {
|
||||
storage.dump_original_id(false);
|
||||
storage.dump_original_id(false, false);
|
||||
}
|
||||
let task = controller.generate_compaction_task(&storage.snapshot);
|
||||
println!("--- Compaction Task ---");
|
||||
if let Some(task) = task {
|
||||
let mut num_compactions = 0;
|
||||
while let Some(task) = controller.generate_compaction_task(&storage.snapshot) {
|
||||
let mut sst_ids = Vec::new();
|
||||
for (tier_id, files) in &task.tiers {
|
||||
for file in files {
|
||||
@@ -276,12 +358,19 @@ fn main() {
|
||||
storage.remove(&del);
|
||||
println!("--- After Compaction ---");
|
||||
if dump_real_id {
|
||||
storage.dump_real_id(false);
|
||||
storage.dump_real_id(false, false);
|
||||
} else {
|
||||
storage.dump_original_id(false);
|
||||
storage.dump_original_id(false, false);
|
||||
}
|
||||
} else {
|
||||
num_compactions += 1;
|
||||
if num_compactions >= level0_file_num_compaction_trigger * 3 {
|
||||
panic!("compaction does not converge?");
|
||||
}
|
||||
}
|
||||
if num_compactions == 0 {
|
||||
println!("no compaction triggered");
|
||||
} else {
|
||||
println!("{num_compactions} compaction triggered in this iteration");
|
||||
}
|
||||
max_space = max_space.max(storage.file_list.len());
|
||||
println!("--- Statistics ---");
|
||||
@@ -292,7 +381,173 @@ fn main() {
|
||||
storage.total_writes as f64 / storage.total_flushes as f64
|
||||
);
|
||||
println!(
|
||||
"Space Amplification: {}/{}={:.3}x",
|
||||
"Maximum Space Usage: {}/{}={:.3}x",
|
||||
max_space,
|
||||
storage.total_flushes,
|
||||
max_space as f64 / storage.total_flushes as f64
|
||||
);
|
||||
println!(
|
||||
"Read Amplification: {}x",
|
||||
storage.snapshot.l0_sstables.len()
|
||||
+ storage
|
||||
.snapshot
|
||||
.levels
|
||||
.iter()
|
||||
.filter(|(_, f)| !f.is_empty())
|
||||
.count()
|
||||
);
|
||||
println!();
|
||||
}
|
||||
}
|
||||
Args::Leveled {
|
||||
dump_real_id,
|
||||
level0_file_num_compaction_trigger,
|
||||
level_size_multiplier,
|
||||
max_levels,
|
||||
base_level_size_mb,
|
||||
iterations,
|
||||
sst_size_mb,
|
||||
} => {
|
||||
let controller = LeveledCompactionController::new(LeveledCompactionOptions {
|
||||
level0_file_num_compaction_trigger,
|
||||
level_size_multiplier,
|
||||
max_levels,
|
||||
base_level_size_mb,
|
||||
});
|
||||
|
||||
let mut storage = MockStorage::new();
|
||||
for i in 0..max_levels {
|
||||
storage.snapshot.levels.push((i + 1, Vec::new()));
|
||||
}
|
||||
let mut max_space = 0;
|
||||
for i in 0..iterations {
|
||||
println!("=== Iteration {i} ===");
|
||||
let id = storage.flush_sst_to_l0();
|
||||
let (first_key, last_key) = generate_random_key_range();
|
||||
storage.snapshot.sstables.insert(
|
||||
id,
|
||||
Arc::new(SsTable::create_meta_only(
|
||||
id,
|
||||
sst_size_mb as u64 * 1024 * 1024,
|
||||
first_key,
|
||||
last_key,
|
||||
)),
|
||||
);
|
||||
println!("--- After Flush ---");
|
||||
if dump_real_id {
|
||||
storage.dump_real_id(false, true);
|
||||
} else {
|
||||
storage.dump_original_id(false, true);
|
||||
}
|
||||
let mut num_compactions = 0;
|
||||
while let Some(task) = controller.generate_compaction_task(&storage.snapshot) {
|
||||
let mut sst_ids = Vec::new();
|
||||
let split_num = task.upper_level_sst_ids.len() + task.lower_level_sst_ids.len();
|
||||
let mut first_keys = Vec::new();
|
||||
let mut last_keys = Vec::new();
|
||||
for file in task
|
||||
.upper_level_sst_ids
|
||||
.iter()
|
||||
.chain(task.lower_level_sst_ids.iter())
|
||||
{
|
||||
first_keys.push(storage.snapshot.sstables[file].first_key().clone());
|
||||
last_keys.push(storage.snapshot.sstables[file].last_key().clone());
|
||||
}
|
||||
let begin = first_keys.into_iter().min().unwrap();
|
||||
let end = last_keys.into_iter().max().unwrap();
|
||||
let splits = generate_random_split(begin, end, split_num);
|
||||
for (id, file) in task
|
||||
.upper_level_sst_ids
|
||||
.iter()
|
||||
.chain(task.lower_level_sst_ids.iter())
|
||||
.enumerate()
|
||||
{
|
||||
let new_sst_id = storage.generate_sst_id();
|
||||
sst_ids.push(new_sst_id);
|
||||
storage.file_list.insert(new_sst_id, *file);
|
||||
storage.total_writes += 1;
|
||||
storage.snapshot.sstables.insert(
|
||||
new_sst_id,
|
||||
Arc::new(SsTable::create_meta_only(
|
||||
new_sst_id,
|
||||
sst_size_mb as u64 * 1024 * 1024,
|
||||
splits[id].0.clone(),
|
||||
splits[id].1.clone(),
|
||||
)),
|
||||
);
|
||||
}
|
||||
print!(
|
||||
"Upper L{} [{}] ",
|
||||
task.upper_level.unwrap_or_default(),
|
||||
task.upper_level_sst_ids
|
||||
.iter()
|
||||
.map(|id| format!(
|
||||
"{}.sst {:x}..={:x}",
|
||||
id,
|
||||
storage.snapshot.sstables[id].first_key().clone().get_u64(),
|
||||
storage.snapshot.sstables[id].last_key().clone().get_u64()
|
||||
))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ")
|
||||
);
|
||||
print!(
|
||||
"Lower L{} [{}] ",
|
||||
task.lower_level,
|
||||
task.lower_level_sst_ids
|
||||
.iter()
|
||||
.map(|id| format!(
|
||||
"{}.sst {:x}..={:x}",
|
||||
id,
|
||||
storage.snapshot.sstables[id].first_key().clone().get_u64(),
|
||||
storage.snapshot.sstables[id].last_key().clone().get_u64()
|
||||
))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ")
|
||||
);
|
||||
println!(
|
||||
"-> [{}]",
|
||||
sst_ids
|
||||
.iter()
|
||||
.map(|id| format!(
|
||||
"{}.sst {:x}..={:x}",
|
||||
id,
|
||||
storage.snapshot.sstables[id].first_key().clone().get_u64(),
|
||||
storage.snapshot.sstables[id].last_key().clone().get_u64()
|
||||
))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ")
|
||||
);
|
||||
max_space = max_space.max(storage.file_list.len());
|
||||
let (snapshot, del) =
|
||||
controller.apply_compaction_result(&storage.snapshot, &task, &sst_ids);
|
||||
storage.snapshot = snapshot;
|
||||
storage.remove(&del);
|
||||
println!("--- After Compaction ---");
|
||||
if dump_real_id {
|
||||
storage.dump_real_id(true, true);
|
||||
} else {
|
||||
storage.dump_original_id(true, true);
|
||||
}
|
||||
num_compactions += 1;
|
||||
if num_compactions >= storage.file_list.len() * max_levels {
|
||||
panic!("compaction does not converge?");
|
||||
}
|
||||
}
|
||||
if num_compactions == 0 {
|
||||
println!("no compaction triggered");
|
||||
} else {
|
||||
println!("{num_compactions} compaction triggered in this iteration");
|
||||
}
|
||||
max_space = max_space.max(storage.file_list.len());
|
||||
println!("--- Statistics ---");
|
||||
println!(
|
||||
"Write Amplification: {}/{}={:.3}x",
|
||||
storage.total_writes,
|
||||
storage.total_flushes,
|
||||
storage.total_writes as f64 / storage.total_flushes as f64
|
||||
);
|
||||
println!(
|
||||
"Maximum Space Usage: {}/{}={:.3}x",
|
||||
max_space,
|
||||
storage.total_flushes,
|
||||
max_space as f64 / storage.total_flushes as f64
|
||||
@@ -310,6 +565,5 @@ fn main() {
|
||||
println!();
|
||||
}
|
||||
}
|
||||
Args::Leveled {} => {}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ mod tiered;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
pub use leveled::{LeveledCompactionController, LeveledCompactionTask};
|
||||
pub use leveled::{LeveledCompactionController, LeveledCompactionOptions, LeveledCompactionTask};
|
||||
pub use simple_leveled::{
|
||||
SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, SimpleLeveledCompactionTask,
|
||||
};
|
||||
@@ -37,9 +37,7 @@ impl LsmStorage {
|
||||
let mut iters = Vec::new();
|
||||
iters.reserve(tables.len());
|
||||
for table in tables.iter() {
|
||||
iters.push(Box::new(SsTableIterator::create_and_seek_to_first(
|
||||
table.clone(),
|
||||
)?));
|
||||
iters.push(Box::new(SsTableIterator::create_and_seek_to_first(table.clone())?));
|
||||
}
|
||||
let mut iter = MergeIterator::create(iters);
|
||||
|
||||
|
||||
@@ -1,18 +1,150 @@
|
||||
use std::collections::HashSet;
|
||||
|
||||
use crate::lsm_storage::LsmStorageInner;
|
||||
|
||||
pub struct LeveledCompactionTask {
|
||||
// if upper_level is `None`, then it is L0 compaction
|
||||
upper_level: Option<usize>,
|
||||
upper_level_sst_ids: Vec<usize>,
|
||||
lower_level: usize,
|
||||
lower_level_sst_ids: Vec<usize>,
|
||||
pub upper_level: Option<usize>,
|
||||
pub upper_level_sst_ids: Vec<usize>,
|
||||
pub lower_level: usize,
|
||||
pub lower_level_sst_ids: Vec<usize>,
|
||||
}
|
||||
|
||||
pub struct LeveledCompactionController {}
|
||||
pub struct LeveledCompactionOptions {
|
||||
pub level_size_multiplier: usize,
|
||||
pub level0_file_num_compaction_trigger: usize,
|
||||
pub max_levels: usize,
|
||||
pub base_level_size_mb: usize,
|
||||
}
|
||||
|
||||
pub struct LeveledCompactionController {
|
||||
options: LeveledCompactionOptions,
|
||||
}
|
||||
|
||||
impl LeveledCompactionController {
|
||||
pub fn generate_compaction_task(&self, snapshot: &LsmStorageInner) -> LeveledCompactionTask {
|
||||
unimplemented!()
|
||||
pub fn new(options: LeveledCompactionOptions) -> Self {
|
||||
Self { options }
|
||||
}
|
||||
|
||||
fn find_overlapping_ssts(
|
||||
&self,
|
||||
snapshot: &LsmStorageInner,
|
||||
sst_ids: &[usize],
|
||||
in_level: usize,
|
||||
) -> Vec<usize> {
|
||||
let begin_key = sst_ids
|
||||
.iter()
|
||||
.map(|id| snapshot.sstables[id].first_key())
|
||||
.min()
|
||||
.cloned()
|
||||
.unwrap();
|
||||
let end_key = sst_ids
|
||||
.iter()
|
||||
.map(|id| snapshot.sstables[id].last_key())
|
||||
.max()
|
||||
.cloned()
|
||||
.unwrap();
|
||||
let mut overlap_ssts = Vec::new();
|
||||
for sst_id in &snapshot.levels[in_level - 1].1 {
|
||||
let sst = &snapshot.sstables[sst_id];
|
||||
let first_key = sst.first_key();
|
||||
let last_key = sst.last_key();
|
||||
if !(last_key < &begin_key || first_key > &end_key) {
|
||||
overlap_ssts.push(*sst_id);
|
||||
}
|
||||
}
|
||||
overlap_ssts
|
||||
}
|
||||
|
||||
pub fn generate_compaction_task(
|
||||
&self,
|
||||
snapshot: &LsmStorageInner,
|
||||
) -> Option<LeveledCompactionTask> {
|
||||
// step 1: compute target level size
|
||||
let mut target_level_size = (0..self.options.max_levels).map(|_| 0).collect::<Vec<_>>(); // exclude level 0
|
||||
let mut real_level_size = Vec::with_capacity(self.options.max_levels);
|
||||
let mut base_level = self.options.max_levels;
|
||||
for i in 0..self.options.max_levels {
|
||||
real_level_size.push(
|
||||
snapshot.levels[i]
|
||||
.1
|
||||
.iter()
|
||||
.map(|x| snapshot.sstables.get(x).unwrap().table_size())
|
||||
.sum::<u64>() as usize,
|
||||
);
|
||||
}
|
||||
let base_level_size_bytes = self.options.base_level_size_mb as usize * 1024 * 1024;
|
||||
|
||||
// select base level and compute target level size
|
||||
target_level_size[self.options.max_levels - 1] =
|
||||
real_level_size[self.options.max_levels - 1].max(base_level_size_bytes);
|
||||
for i in (0..(self.options.max_levels - 1)).rev() {
|
||||
let next_level_size = target_level_size[i + 1];
|
||||
let this_level_size = next_level_size / self.options.level_size_multiplier;
|
||||
if next_level_size > base_level_size_bytes {
|
||||
target_level_size[i] = this_level_size;
|
||||
}
|
||||
if target_level_size[i] > 0 {
|
||||
base_level = i + 1;
|
||||
}
|
||||
}
|
||||
|
||||
println!(
|
||||
"target level sizes: {:?}, real level sizes: {:?}, base_level: {}",
|
||||
target_level_size
|
||||
.iter()
|
||||
.map(|x| format!("{}MB", x / 1024 / 1024))
|
||||
.collect::<Vec<_>>(),
|
||||
real_level_size
|
||||
.iter()
|
||||
.map(|x| format!("{}MB", x / 1024 / 1024))
|
||||
.collect::<Vec<_>>(),
|
||||
base_level,
|
||||
);
|
||||
|
||||
// Flush L0 SST is the top priority
|
||||
if snapshot.l0_sstables.len() >= self.options.level0_file_num_compaction_trigger {
|
||||
println!("flush L0 SST to base level {}", base_level);
|
||||
return Some(LeveledCompactionTask {
|
||||
upper_level: None,
|
||||
upper_level_sst_ids: snapshot.l0_sstables.clone(),
|
||||
lower_level: base_level,
|
||||
lower_level_sst_ids: self.find_overlapping_ssts(
|
||||
snapshot,
|
||||
&snapshot.l0_sstables,
|
||||
base_level,
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
let mut priorities = Vec::with_capacity(self.options.max_levels);
|
||||
for level in 0..self.options.max_levels {
|
||||
let prio = real_level_size[level] as f64 / target_level_size[level] as f64;
|
||||
if prio > 1.0 {
|
||||
priorities.push((prio, level + 1));
|
||||
}
|
||||
}
|
||||
priorities.sort_by(|a, b| a.partial_cmp(b).unwrap().reverse());
|
||||
let priority = priorities.first();
|
||||
if let Some((_, level)) = priority {
|
||||
let level = *level;
|
||||
let selected_sst = snapshot.levels[level - 1].1.iter().min().copied().unwrap(); // select the oldest sst to compact
|
||||
println!(
|
||||
"compaction triggered by priority: {level} out of {:?}, select {selected_sst} for compaction",
|
||||
priorities
|
||||
);
|
||||
return Some(LeveledCompactionTask {
|
||||
upper_level: Some(level),
|
||||
upper_level_sst_ids: vec![selected_sst],
|
||||
lower_level: level + 1,
|
||||
lower_level_sst_ids: self.find_overlapping_ssts(
|
||||
snapshot,
|
||||
&[selected_sst],
|
||||
level + 1,
|
||||
),
|
||||
});
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub fn apply_compaction_result(
|
||||
@@ -21,6 +153,71 @@ impl LeveledCompactionController {
|
||||
task: &LeveledCompactionTask,
|
||||
output: &[usize],
|
||||
) -> (LsmStorageInner, Vec<usize>) {
|
||||
unimplemented!()
|
||||
let mut snapshot = snapshot.clone();
|
||||
let mut files_to_remove = Vec::new();
|
||||
let mut upper_level_sst_ids_set = task
|
||||
.upper_level_sst_ids
|
||||
.iter()
|
||||
.copied()
|
||||
.collect::<HashSet<_>>();
|
||||
let mut lower_level_sst_ids_set = task
|
||||
.lower_level_sst_ids
|
||||
.iter()
|
||||
.copied()
|
||||
.collect::<HashSet<_>>();
|
||||
if let Some(upper_level) = task.upper_level {
|
||||
let new_upper_level_ssts =
|
||||
snapshot.levels[upper_level - 1]
|
||||
.1
|
||||
.iter()
|
||||
.filter_map(|x| {
|
||||
if upper_level_sst_ids_set.remove(x) {
|
||||
return None;
|
||||
}
|
||||
Some(*x)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
assert!(upper_level_sst_ids_set.is_empty());
|
||||
snapshot.levels[upper_level - 1].1 = new_upper_level_ssts;
|
||||
} else {
|
||||
let new_l0_ssts = snapshot
|
||||
.l0_sstables
|
||||
.iter()
|
||||
.filter_map(|x| {
|
||||
if upper_level_sst_ids_set.remove(x) {
|
||||
return None;
|
||||
}
|
||||
Some(*x)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
assert!(upper_level_sst_ids_set.is_empty());
|
||||
snapshot.l0_sstables = new_l0_ssts;
|
||||
}
|
||||
|
||||
files_to_remove.extend(&task.upper_level_sst_ids);
|
||||
files_to_remove.extend(&task.lower_level_sst_ids);
|
||||
|
||||
let mut new_lower_level_ssts = snapshot.levels[task.lower_level - 1]
|
||||
.1
|
||||
.iter()
|
||||
.filter_map(|x| {
|
||||
if lower_level_sst_ids_set.remove(x) {
|
||||
return None;
|
||||
}
|
||||
Some(*x)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
assert!(lower_level_sst_ids_set.is_empty());
|
||||
new_lower_level_ssts.extend(output);
|
||||
new_lower_level_ssts.sort_by(|x, y| {
|
||||
snapshot
|
||||
.sstables
|
||||
.get(x)
|
||||
.unwrap()
|
||||
.first_key()
|
||||
.cmp(snapshot.sstables.get(y).unwrap().first_key())
|
||||
});
|
||||
snapshot.levels[task.lower_level - 1].1 = new_lower_level_ssts;
|
||||
(snapshot, files_to_remove)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::lsm_storage::LsmStorageInner;
|
||||
|
||||
pub struct SimpleLeveledCompactionOptions {
|
||||
|
||||
@@ -73,7 +73,7 @@ impl TieredCompactionController {
|
||||
}
|
||||
// trying to reduce sorted runs without respecting size ratio
|
||||
let num_tiers_to_take =
|
||||
snapshot.levels.len() - self.options.level0_file_num_compaction_trigger + 1;
|
||||
snapshot.levels.len() - self.options.level0_file_num_compaction_trigger + 2;
|
||||
println!("compaction triggered by reducing sorted runs");
|
||||
return Some(TieredCompactionTask {
|
||||
tiers: snapshot
|
||||
|
||||
@@ -136,9 +136,6 @@ impl LsmStorage {
|
||||
}
|
||||
|
||||
/// Persist data to disk.
|
||||
///
|
||||
/// In day 3: flush the current memtable to disk as L0 SST.
|
||||
/// In day 6: call `fsync` on WAL.
|
||||
pub fn sync(&self) -> Result<()> {
|
||||
let _flush_lock = self.flush_lock.lock();
|
||||
|
||||
@@ -226,9 +223,6 @@ impl LsmStorage {
|
||||
|
||||
let iter = TwoMergeIterator::create(memtable_iter, table_iter)?;
|
||||
|
||||
Ok(FusedIterator::new(LsmIterator::new(
|
||||
iter,
|
||||
map_bound(upper),
|
||||
)?))
|
||||
Ok(FusedIterator::new(LsmIterator::new(iter, map_bound(upper))?))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ pub use builder::SsTableBuilder;
|
||||
use bytes::{Buf, BufMut, Bytes};
|
||||
pub use iterator::SsTableIterator;
|
||||
|
||||
use crate::block::{self, Block};
|
||||
use crate::block::Block;
|
||||
use crate::lsm_storage::BlockCache;
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
@@ -75,27 +75,6 @@ impl BlockMeta {
|
||||
/// A file object.
|
||||
///
|
||||
/// Before day 4, it should look like:
|
||||
///
|
||||
/// ```ignore
|
||||
/// pub struct FileObject(Bytes);
|
||||
///
|
||||
/// impl FileObject {
|
||||
/// pub fn read(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
|
||||
/// Ok(self.0[offset as usize..(offset + len) as usize].to_vec())
|
||||
/// }
|
||||
/// pub fn size(&self) -> u64 {
|
||||
/// self.0.len() as u64
|
||||
/// }
|
||||
///
|
||||
/// pub fn create(_path: &Path, data: Vec<u8>) -> Result<Self> {
|
||||
/// Ok(FileObject(data.into()))
|
||||
/// }
|
||||
///
|
||||
/// pub fn open(_path: &Path) -> Result<Self> {
|
||||
/// unimplemented!()
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
pub struct FileObject(Option<File>, u64);
|
||||
|
||||
impl FileObject {
|
||||
@@ -162,13 +141,8 @@ impl SsTable {
|
||||
}
|
||||
|
||||
/// Create a mock SST with only first key + last key metadata
|
||||
pub fn create_meta_only(
|
||||
id: usize,
|
||||
file_size: u64,
|
||||
first_key: Bytes,
|
||||
last_key: Bytes,
|
||||
) -> Result<Self> {
|
||||
Ok(Self {
|
||||
pub fn create_meta_only(id: usize, file_size: u64, first_key: Bytes, last_key: Bytes) -> Self {
|
||||
Self {
|
||||
file: FileObject(None, file_size),
|
||||
block_metas: vec![],
|
||||
block_meta_offset: 0,
|
||||
@@ -176,7 +150,7 @@ impl SsTable {
|
||||
block_cache: None,
|
||||
first_key,
|
||||
last_key,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Read a block from the disk.
|
||||
@@ -215,6 +189,22 @@ impl SsTable {
|
||||
pub fn num_of_blocks(&self) -> usize {
|
||||
self.block_metas.len()
|
||||
}
|
||||
|
||||
pub fn first_key(&self) -> &Bytes {
|
||||
&self.first_key
|
||||
}
|
||||
|
||||
pub fn last_key(&self) -> &Bytes {
|
||||
&self.last_key
|
||||
}
|
||||
|
||||
pub fn table_size(&self) -> u64 {
|
||||
self.file.1
|
||||
}
|
||||
|
||||
pub fn sst_id(&self) -> usize {
|
||||
self.id
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
Reference in New Issue
Block a user