fix: universal compaction condition (#97)
Signed-off-by: Alex Chi <iskyzh@gmail.com>
This commit is contained in:
@@ -16,6 +16,7 @@ pub struct TieredCompactionOptions {
|
||||
pub max_size_amplification_percent: usize,
|
||||
pub size_ratio: usize,
|
||||
pub min_merge_width: usize,
|
||||
pub max_merge_width: Option<usize>,
|
||||
}
|
||||
|
||||
pub struct TieredCompactionController {
|
||||
@@ -61,25 +62,29 @@ impl TieredCompactionController {
|
||||
for id in 0..(snapshot.levels.len() - 1) {
|
||||
size += snapshot.levels[id].1.len();
|
||||
let next_level_size = snapshot.levels[id + 1].1.len();
|
||||
let current_size_ratio = size as f64 / next_level_size as f64;
|
||||
if current_size_ratio >= size_ratio_trigger && id + 2 >= self.options.min_merge_width {
|
||||
let current_size_ratio = next_level_size as f64 / size as f64;
|
||||
if current_size_ratio > size_ratio_trigger && id + 1 >= self.options.min_merge_width {
|
||||
println!(
|
||||
"compaction triggered by size ratio: {}",
|
||||
current_size_ratio * 100.0
|
||||
"compaction triggered by size ratio: {} > {}",
|
||||
current_size_ratio * 100.0,
|
||||
size_ratio_trigger * 100.0
|
||||
);
|
||||
return Some(TieredCompactionTask {
|
||||
tiers: snapshot
|
||||
.levels
|
||||
.iter()
|
||||
.take(id + 2)
|
||||
.take(id + 1)
|
||||
.cloned()
|
||||
.collect::<Vec<_>>(),
|
||||
bottom_tier_included: id + 2 >= snapshot.levels.len(),
|
||||
bottom_tier_included: id + 1 >= snapshot.levels.len(),
|
||||
});
|
||||
}
|
||||
}
|
||||
// trying to reduce sorted runs without respecting size ratio
|
||||
let num_tiers_to_take = snapshot.levels.len() - self.options.num_tiers + 2;
|
||||
let num_tiers_to_take = snapshot
|
||||
.levels
|
||||
.len()
|
||||
.min(self.options.max_merge_width.unwrap_or(usize::MAX));
|
||||
println!("compaction triggered by reducing sorted runs");
|
||||
return Some(TieredCompactionTask {
|
||||
tiers: snapshot
|
||||
|
||||
@@ -87,7 +87,7 @@ impl Bloom {
|
||||
filter.resize(nbytes, 0);
|
||||
for h in keys {
|
||||
let mut h = *h;
|
||||
let delta = (h >> 17) | (h << 15);
|
||||
let delta = h.rotate_left(15);
|
||||
for _ in 0..k {
|
||||
let bit_pos = (h as usize) % nbits;
|
||||
filter.set_bit(bit_pos, true);
|
||||
@@ -107,7 +107,7 @@ impl Bloom {
|
||||
true
|
||||
} else {
|
||||
let nbits = self.filter.bit_len();
|
||||
let delta = (h >> 17) | (h << 15);
|
||||
let delta = h.rotate_left(15);
|
||||
for _ in 0..self.k {
|
||||
let bit_pos = h % (nbits as u32);
|
||||
if !self.filter.get_bit(bit_pos as usize) {
|
||||
|
||||
@@ -19,8 +19,17 @@ use mini_lsm_wrapper::table::SsTable;
|
||||
#[command(author, version, about, long_about = None)]
|
||||
enum Args {
|
||||
Simple {
|
||||
/// Dump the generated ID instead of where the original data comes from.
|
||||
/// For example, if SST 1, 2, 3 is compacted to another level, it should have
|
||||
/// a new SST ID 4, 5, 6 as SSTs are immutable and write-once. With this flag
|
||||
/// enabled, you will see the new level has SST 1, 2, 3 because the data of
|
||||
/// 4, 5, 6 are originated from 1, 2, 3.
|
||||
#[clap(long)]
|
||||
dump_real_id: bool,
|
||||
/// Only dump size information instead of the layer files. if this is enabled,
|
||||
/// it will print one row per compaction iteration.
|
||||
#[clap(long)]
|
||||
size_only: bool,
|
||||
#[clap(long, default_value = "2")]
|
||||
level0_file_num_compaction_trigger: usize,
|
||||
#[clap(long, default_value = "3")]
|
||||
@@ -31,8 +40,17 @@ enum Args {
|
||||
iterations: usize,
|
||||
},
|
||||
Tiered {
|
||||
/// Dump the generated ID instead of where the original data comes from.
|
||||
/// For example, if SST 1, 2, 3 is compacted to another level, it should have
|
||||
/// a new SST ID 4, 5, 6 as SSTs are immutable and write-once. With this flag
|
||||
/// enabled, you will see the new level has SST 1, 2, 3 because the data of
|
||||
/// 4, 5, 6 are originated from 1, 2, 3.
|
||||
#[clap(long)]
|
||||
dump_real_id: bool,
|
||||
/// Only dump size information instead of the layer files. if this is enabled,
|
||||
/// it will print one row per compaction iteration.
|
||||
#[clap(long)]
|
||||
size_only: bool,
|
||||
#[clap(long, default_value = "3")]
|
||||
num_tiers: usize,
|
||||
#[clap(long, default_value = "200")]
|
||||
@@ -41,12 +59,23 @@ enum Args {
|
||||
size_ratio: usize,
|
||||
#[clap(long, default_value = "2")]
|
||||
min_merge_width: usize,
|
||||
#[clap(long)]
|
||||
max_merge_width: Option<usize>,
|
||||
#[clap(long, default_value = "50")]
|
||||
iterations: usize,
|
||||
},
|
||||
Leveled {
|
||||
/// Dump the generated ID instead of where the original data comes from.
|
||||
/// For example, if SST 1, 2, 3 is compacted to another level, it should have
|
||||
/// a new SST ID 4, 5, 6 as SSTs are immutable and write-once. With this flag
|
||||
/// enabled, you will see the new level has SST 1, 2, 3 because the data of
|
||||
/// 4, 5, 6 are originated from 1, 2, 3.
|
||||
#[clap(long)]
|
||||
dump_real_id: bool,
|
||||
/// Only dump size information instead of the layer files. if this is enabled,
|
||||
/// it will print one row per compaction iteration.
|
||||
#[clap(long)]
|
||||
size_only: bool,
|
||||
#[clap(long, default_value = "2")]
|
||||
level0_file_num_compaction_trigger: usize,
|
||||
#[clap(long, default_value = "2")]
|
||||
@@ -148,6 +177,14 @@ impl MockStorage {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn dump_size_only(&self) {
|
||||
print!("Levels: {}", self.snapshot.l0_sstables.len());
|
||||
for (_, files) in &self.snapshot.levels {
|
||||
print!(" {}", files.len());
|
||||
}
|
||||
println!();
|
||||
}
|
||||
|
||||
pub fn dump_original_id(&self, always_show_l0: bool, with_key: bool) {
|
||||
if !self.snapshot.l0_sstables.is_empty() || always_show_l0 {
|
||||
println!(
|
||||
@@ -231,6 +268,7 @@ fn main() {
|
||||
match args {
|
||||
Args::Simple {
|
||||
dump_real_id,
|
||||
size_only,
|
||||
size_ratio_percent,
|
||||
iterations,
|
||||
level0_file_num_compaction_trigger,
|
||||
@@ -252,14 +290,18 @@ fn main() {
|
||||
println!("=== Iteration {i} ===");
|
||||
storage.flush_sst_to_l0();
|
||||
println!("--- After Flush ---");
|
||||
if dump_real_id {
|
||||
if size_only {
|
||||
storage.dump_size_only();
|
||||
} else if dump_real_id {
|
||||
storage.dump_real_id(true, false);
|
||||
} else {
|
||||
storage.dump_original_id(true, false);
|
||||
}
|
||||
let mut num_compactions = 0;
|
||||
while let Some(task) = {
|
||||
println!("--- Compaction Task ---");
|
||||
if !size_only {
|
||||
println!("--- Compaction Task ---");
|
||||
}
|
||||
controller.generate_compaction_task(&storage.snapshot)
|
||||
} {
|
||||
let mut sst_ids = Vec::new();
|
||||
@@ -289,7 +331,9 @@ fn main() {
|
||||
storage.snapshot = snapshot;
|
||||
storage.remove(&del);
|
||||
println!("--- After Compaction ---");
|
||||
if dump_real_id {
|
||||
if size_only {
|
||||
storage.dump_size_only();
|
||||
} else if dump_real_id {
|
||||
storage.dump_real_id(true, false);
|
||||
} else {
|
||||
storage.dump_original_id(true, false);
|
||||
@@ -333,10 +377,12 @@ fn main() {
|
||||
}
|
||||
Args::Tiered {
|
||||
dump_real_id,
|
||||
size_only,
|
||||
num_tiers: level0_file_num_compaction_trigger,
|
||||
max_size_amplification_percent,
|
||||
size_ratio,
|
||||
min_merge_width,
|
||||
max_merge_width,
|
||||
iterations,
|
||||
} => {
|
||||
let controller = TieredCompactionController::new(TieredCompactionOptions {
|
||||
@@ -344,6 +390,7 @@ fn main() {
|
||||
max_size_amplification_percent,
|
||||
size_ratio,
|
||||
min_merge_width,
|
||||
max_merge_width,
|
||||
});
|
||||
let mut storage = MockStorage::new();
|
||||
let mut max_space = 0;
|
||||
@@ -351,15 +398,21 @@ fn main() {
|
||||
println!("=== Iteration {i} ===");
|
||||
storage.flush_sst_to_new_tier();
|
||||
println!("--- After Flush ---");
|
||||
if dump_real_id {
|
||||
if size_only {
|
||||
storage.dump_size_only();
|
||||
} else if dump_real_id {
|
||||
storage.dump_real_id(false, false);
|
||||
} else {
|
||||
storage.dump_original_id(false, false);
|
||||
}
|
||||
println!("--- Compaction Task ---");
|
||||
if !size_only {
|
||||
println!("--- Compaction Task ---");
|
||||
}
|
||||
let mut num_compactions = 0;
|
||||
while let Some(task) = {
|
||||
println!("--- Compaction Task ---");
|
||||
if !size_only {
|
||||
println!("--- Compaction Task ---");
|
||||
}
|
||||
controller.generate_compaction_task(&storage.snapshot)
|
||||
} {
|
||||
let mut sst_ids = Vec::new();
|
||||
@@ -379,7 +432,9 @@ fn main() {
|
||||
storage.snapshot = snapshot;
|
||||
storage.remove(&del);
|
||||
println!("--- After Compaction ---");
|
||||
if dump_real_id {
|
||||
if size_only {
|
||||
storage.dump_size_only();
|
||||
} else if dump_real_id {
|
||||
storage.dump_real_id(false, false);
|
||||
} else {
|
||||
storage.dump_original_id(false, false);
|
||||
@@ -423,6 +478,7 @@ fn main() {
|
||||
}
|
||||
Args::Leveled {
|
||||
dump_real_id,
|
||||
size_only,
|
||||
level0_file_num_compaction_trigger,
|
||||
level_size_multiplier,
|
||||
max_levels,
|
||||
@@ -456,14 +512,18 @@ fn main() {
|
||||
)),
|
||||
);
|
||||
println!("--- After Flush ---");
|
||||
if dump_real_id {
|
||||
if size_only {
|
||||
storage.dump_size_only();
|
||||
} else if dump_real_id {
|
||||
storage.dump_real_id(false, true);
|
||||
} else {
|
||||
storage.dump_original_id(false, true);
|
||||
}
|
||||
let mut num_compactions = 0;
|
||||
while let Some(task) = {
|
||||
println!("--- Compaction Task ---");
|
||||
if !size_only {
|
||||
println!("--- Compaction Task ---");
|
||||
}
|
||||
controller.generate_compaction_task(&storage.snapshot)
|
||||
} {
|
||||
let mut sst_ids = Vec::new();
|
||||
@@ -570,7 +630,9 @@ fn main() {
|
||||
storage.snapshot = snapshot;
|
||||
storage.remove(&del);
|
||||
println!("--- After Compaction ---");
|
||||
if dump_real_id {
|
||||
if size_only {
|
||||
storage.dump_size_only();
|
||||
} else if dump_real_id {
|
||||
storage.dump_real_id(true, true);
|
||||
} else {
|
||||
storage.dump_original_id(true, true);
|
||||
|
||||
@@ -334,6 +334,7 @@ fn main() -> Result<()> {
|
||||
max_size_amplification_percent: 200,
|
||||
size_ratio: 1,
|
||||
min_merge_width: 2,
|
||||
max_merge_width: None,
|
||||
}),
|
||||
CompactionStrategy::Leveled => {
|
||||
CompactionOptions::Leveled(LeveledCompactionOptions {
|
||||
|
||||
@@ -14,6 +14,7 @@ pub struct TieredCompactionOptions {
|
||||
pub max_size_amplification_percent: usize,
|
||||
pub size_ratio: usize,
|
||||
pub min_merge_width: usize,
|
||||
pub max_merge_width: Option<usize>,
|
||||
}
|
||||
|
||||
pub struct TieredCompactionController {
|
||||
|
||||
@@ -94,7 +94,7 @@ impl Bloom {
|
||||
true
|
||||
} else {
|
||||
let nbits = self.filter.bit_len();
|
||||
let delta = (h >> 17) | (h << 15);
|
||||
let delta = h.rotate_left(15);
|
||||
|
||||
// TODO: probe the bloom filter
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@ pub struct TieredCompactionOptions {
|
||||
pub max_size_amplification_percent: usize,
|
||||
pub size_ratio: usize,
|
||||
pub min_merge_width: usize,
|
||||
pub max_merge_width: Option<usize>,
|
||||
}
|
||||
|
||||
pub struct TieredCompactionController {
|
||||
@@ -61,25 +62,29 @@ impl TieredCompactionController {
|
||||
for id in 0..(snapshot.levels.len() - 1) {
|
||||
size += snapshot.levels[id].1.len();
|
||||
let next_level_size = snapshot.levels[id + 1].1.len();
|
||||
let current_size_ratio = size as f64 / next_level_size as f64;
|
||||
if current_size_ratio >= size_ratio_trigger && id + 2 >= self.options.min_merge_width {
|
||||
let current_size_ratio = next_level_size as f64 / size as f64;
|
||||
if current_size_ratio > size_ratio_trigger && id + 1 >= self.options.min_merge_width {
|
||||
println!(
|
||||
"compaction triggered by size ratio: {}",
|
||||
current_size_ratio * 100.0
|
||||
"compaction triggered by size ratio: {} > {}",
|
||||
current_size_ratio * 100.0,
|
||||
size_ratio_trigger * 100.0
|
||||
);
|
||||
return Some(TieredCompactionTask {
|
||||
tiers: snapshot
|
||||
.levels
|
||||
.iter()
|
||||
.take(id + 2)
|
||||
.take(id + 1)
|
||||
.cloned()
|
||||
.collect::<Vec<_>>(),
|
||||
bottom_tier_included: id + 2 >= snapshot.levels.len(),
|
||||
bottom_tier_included: id + 1 >= snapshot.levels.len(),
|
||||
});
|
||||
}
|
||||
}
|
||||
// trying to reduce sorted runs without respecting size ratio
|
||||
let num_tiers_to_take = snapshot.levels.len() - self.options.num_tiers + 2;
|
||||
let num_tiers_to_take = snapshot
|
||||
.levels
|
||||
.len()
|
||||
.min(self.options.max_merge_width.unwrap_or(usize::MAX));
|
||||
println!("compaction triggered by reducing sorted runs");
|
||||
return Some(TieredCompactionTask {
|
||||
tiers: snapshot
|
||||
|
||||
@@ -87,7 +87,7 @@ impl Bloom {
|
||||
filter.resize(nbytes, 0);
|
||||
for h in keys {
|
||||
let mut h = *h;
|
||||
let delta = (h >> 17) | (h << 15);
|
||||
let delta = h.rotate_left(15);
|
||||
for _ in 0..k {
|
||||
let bit_pos = (h as usize) % nbits;
|
||||
filter.set_bit(bit_pos, true);
|
||||
@@ -107,7 +107,7 @@ impl Bloom {
|
||||
true
|
||||
} else {
|
||||
let nbits = self.filter.bit_len();
|
||||
let delta = (h >> 17) | (h << 15);
|
||||
let delta = h.rotate_left(15);
|
||||
for _ in 0..self.k {
|
||||
let bit_pos = h % (nbits as u32);
|
||||
if !self.filter.get_bit(bit_pos as usize) {
|
||||
|
||||
@@ -368,6 +368,7 @@ pub fn check_compaction_ratio(storage: Arc<MiniLsm>) {
|
||||
max_size_amplification_percent,
|
||||
size_ratio,
|
||||
min_merge_width,
|
||||
..
|
||||
}) => {
|
||||
let size_ratio_trigger = (100.0 + size_ratio as f64) / 100.0;
|
||||
assert_eq!(l0_sst_num, 0);
|
||||
|
||||
@@ -18,6 +18,7 @@ fn test_integration() {
|
||||
max_size_amplification_percent: 200,
|
||||
size_ratio: 1,
|
||||
min_merge_width: 2,
|
||||
max_merge_width: None,
|
||||
},
|
||||
)),
|
||||
)
|
||||
|
||||
@@ -29,6 +29,7 @@ fn test_integration_tiered() {
|
||||
max_size_amplification_percent: 200,
|
||||
size_ratio: 1,
|
||||
min_merge_width: 3,
|
||||
max_merge_width: None,
|
||||
}))
|
||||
}
|
||||
|
||||
|
||||
@@ -26,6 +26,7 @@ fn test_integration_tiered() {
|
||||
max_size_amplification_percent: 200,
|
||||
size_ratio: 1,
|
||||
min_merge_width: 3,
|
||||
max_merge_width: None,
|
||||
}))
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user