use std::collections::HashSet; use serde::{Deserialize, Serialize}; use crate::lsm_storage::LsmStorageState; #[derive(Debug, Serialize, Deserialize)] pub struct LeveledCompactionTask { // if upper_level is `None`, then it is L0 compaction pub upper_level: Option, pub upper_level_sst_ids: Vec, pub lower_level: usize, pub lower_level_sst_ids: Vec, pub is_lower_level_bottom_level: bool, } #[derive(Debug, Clone)] pub struct LeveledCompactionOptions { pub level_size_multiplier: usize, pub level0_file_num_compaction_trigger: usize, pub max_levels: usize, pub base_level_size_mb: usize, } pub struct LeveledCompactionController { options: LeveledCompactionOptions, } impl LeveledCompactionController { pub fn new(options: LeveledCompactionOptions) -> Self { Self { options } } fn find_overlapping_ssts( &self, snapshot: &LsmStorageState, sst_ids: &[usize], in_level: usize, ) -> Vec { let begin_key = sst_ids .iter() .map(|id| snapshot.sstables[id].first_key()) .min() .cloned() .unwrap(); let end_key = sst_ids .iter() .map(|id| snapshot.sstables[id].last_key()) .max() .cloned() .unwrap(); let mut overlap_ssts = Vec::new(); for sst_id in &snapshot.levels[in_level - 1].1 { let sst = &snapshot.sstables[sst_id]; let first_key = sst.first_key(); let last_key = sst.last_key(); if !(last_key < &begin_key || first_key > &end_key) { overlap_ssts.push(*sst_id); } } overlap_ssts } pub fn generate_compaction_task( &self, snapshot: &LsmStorageState, ) -> Option { // step 1: compute target level size let mut target_level_size = (0..self.options.max_levels).map(|_| 0).collect::>(); // exclude level 0 let mut real_level_size = Vec::with_capacity(self.options.max_levels); let mut base_level = self.options.max_levels; for i in 0..self.options.max_levels { real_level_size.push( snapshot.levels[i] .1 .iter() .map(|x| snapshot.sstables.get(x).unwrap().table_size()) .sum::() as usize, ); } let base_level_size_bytes = self.options.base_level_size_mb * 1024 * 1024; // select base level and compute target level size target_level_size[self.options.max_levels - 1] = real_level_size[self.options.max_levels - 1].max(base_level_size_bytes); for i in (0..(self.options.max_levels - 1)).rev() { let next_level_size = target_level_size[i + 1]; let this_level_size = next_level_size / self.options.level_size_multiplier; if next_level_size > base_level_size_bytes { target_level_size[i] = this_level_size; } if target_level_size[i] > 0 { base_level = i + 1; } } // Flush L0 SST is the top priority if snapshot.l0_sstables.len() >= self.options.level0_file_num_compaction_trigger { println!("flush L0 SST to base level {}", base_level); return Some(LeveledCompactionTask { upper_level: None, upper_level_sst_ids: snapshot.l0_sstables.clone(), lower_level: base_level, lower_level_sst_ids: self.find_overlapping_ssts( snapshot, &snapshot.l0_sstables, base_level, ), is_lower_level_bottom_level: base_level == self.options.max_levels, }); } let mut priorities = Vec::with_capacity(self.options.max_levels); for level in 0..self.options.max_levels { let prio = real_level_size[level] as f64 / target_level_size[level] as f64; if prio > 1.0 { priorities.push((prio, level + 1)); } } priorities.sort_by(|a, b| a.partial_cmp(b).unwrap().reverse()); let priority = priorities.first(); if let Some((_, level)) = priority { println!( "target level sizes: {:?}, real level sizes: {:?}, base_level: {}", target_level_size .iter() .map(|x| format!("{}MB", x / 1024 / 1024)) .collect::>(), real_level_size .iter() .map(|x| format!("{}MB", x / 1024 / 1024)) .collect::>(), base_level, ); let level = *level; let selected_sst = snapshot.levels[level - 1].1.iter().min().copied().unwrap(); // select the oldest sst to compact println!( "compaction triggered by priority: {level} out of {:?}, select {selected_sst} for compaction", priorities ); return Some(LeveledCompactionTask { upper_level: Some(level), upper_level_sst_ids: vec![selected_sst], lower_level: level + 1, lower_level_sst_ids: self.find_overlapping_ssts( snapshot, &[selected_sst], level + 1, ), is_lower_level_bottom_level: level + 1 == self.options.max_levels, }); } None } pub fn apply_compaction_result( &self, snapshot: &LsmStorageState, task: &LeveledCompactionTask, output: &[usize], ) -> (LsmStorageState, Vec) { let mut snapshot = snapshot.clone(); let mut files_to_remove = Vec::new(); let mut upper_level_sst_ids_set = task .upper_level_sst_ids .iter() .copied() .collect::>(); let mut lower_level_sst_ids_set = task .lower_level_sst_ids .iter() .copied() .collect::>(); if let Some(upper_level) = task.upper_level { let new_upper_level_ssts = snapshot.levels[upper_level - 1] .1 .iter() .filter_map(|x| { if upper_level_sst_ids_set.remove(x) { return None; } Some(*x) }) .collect::>(); assert!(upper_level_sst_ids_set.is_empty()); snapshot.levels[upper_level - 1].1 = new_upper_level_ssts; } else { let new_l0_ssts = snapshot .l0_sstables .iter() .filter_map(|x| { if upper_level_sst_ids_set.remove(x) { return None; } Some(*x) }) .collect::>(); assert!(upper_level_sst_ids_set.is_empty()); snapshot.l0_sstables = new_l0_ssts; } files_to_remove.extend(&task.upper_level_sst_ids); files_to_remove.extend(&task.lower_level_sst_ids); let mut new_lower_level_ssts = snapshot.levels[task.lower_level - 1] .1 .iter() .filter_map(|x| { if lower_level_sst_ids_set.remove(x) { return None; } Some(*x) }) .collect::>(); assert!(lower_level_sst_ids_set.is_empty()); new_lower_level_ssts.extend(output); new_lower_level_ssts.sort_by(|x, y| { snapshot .sstables .get(x) .unwrap() .first_key() .cmp(snapshot.sstables.get(y).unwrap().first_key()) }); snapshot.levels[task.lower_level - 1].1 = new_lower_level_ssts; (snapshot, files_to_remove) } }