mod leveled; mod simple_leveled; mod tiered; use std::sync::Arc; use std::time::Duration; use anyhow::Result; pub use leveled::{LeveledCompactionController, LeveledCompactionOptions, LeveledCompactionTask}; use serde::{Deserialize, Serialize}; pub use simple_leveled::{ SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, SimpleLeveledCompactionTask, }; pub use tiered::{TieredCompactionController, TieredCompactionOptions, TieredCompactionTask}; use crate::iterators::merge_iterator::MergeIterator; use crate::iterators::StorageIterator; use crate::lsm_storage::{LsmStorageInner, LsmStorageState}; use crate::manifest::ManifestRecord; use crate::table::{SsTable, SsTableBuilder, SsTableIterator}; #[derive(Debug, Serialize, Deserialize)] pub(crate) enum CompactionTask { Leveled(LeveledCompactionTask), Tiered(TieredCompactionTask), Simple(SimpleLeveledCompactionTask), ForceFullCompaction(Vec), } impl CompactionTask { fn compact_to_bottom_level(&self) -> bool { match self { CompactionTask::ForceFullCompaction(_) => true, CompactionTask::Leveled(task) => task.is_lower_level_bottom_level, CompactionTask::Simple(task) => task.is_lower_level_bottom_level, CompactionTask::Tiered(task) => task.bottom_tier_included, } } } pub(crate) enum CompactionController { Leveled(LeveledCompactionController), Tiered(TieredCompactionController), Simple(SimpleLeveledCompactionController), NoCompaction, } impl CompactionController { pub fn generate_compaction_task(&self, snapshot: &LsmStorageState) -> Option { match self { CompactionController::Leveled(ctrl) => ctrl .generate_compaction_task(&snapshot) .map(CompactionTask::Leveled), CompactionController::Simple(ctrl) => ctrl .generate_compaction_task(&snapshot) .map(CompactionTask::Simple), CompactionController::Tiered(ctrl) => ctrl .generate_compaction_task(&snapshot) .map(CompactionTask::Tiered), CompactionController::NoCompaction => unreachable!(), } } pub fn apply_compaction_result( &self, snapshot: &LsmStorageState, task: &CompactionTask, output: &[usize], ) -> (LsmStorageState, Vec) { match (self, task) { (CompactionController::Leveled(ctrl), CompactionTask::Leveled(task)) => { ctrl.apply_compaction_result(&snapshot, task, output) } (CompactionController::Simple(ctrl), CompactionTask::Simple(task)) => { ctrl.apply_compaction_result(&snapshot, task, output) } (CompactionController::Tiered(ctrl), CompactionTask::Tiered(task)) => { ctrl.apply_compaction_result(&snapshot, task, output) } _ => unreachable!(), } } } impl CompactionController { pub fn flush_to_l0(&self) -> bool { if let Self::Leveled(_) | Self::Simple(_) | Self::NoCompaction = self { true } else { false } } } pub enum CompactionOptions { /// Leveled compaction with partial compaction + dynamic level support (= RocksDB's Leveled /// Compaction) Leveled(LeveledCompactionOptions), /// Tiered compaction (= RocksDB's universal compaction) Tiered(TieredCompactionOptions), /// Simple leveled compaction Simple(SimpleLeveledCompactionOptions), /// In no compaction mode (week 1), always flush to L0 NoCompaction, } impl LsmStorageInner { fn compact(&self, task: &CompactionTask) -> Result>> { let table_ids = match task { CompactionTask::Leveled(task) => task .lower_level_sst_ids .iter() .copied() .chain(task.upper_level_sst_ids.iter().copied()) .collect::>(), CompactionTask::Simple(task) => task .lower_level_sst_ids .iter() .copied() .chain(task.upper_level_sst_ids.iter().copied()) .collect::>(), CompactionTask::Tiered(task) => task .tiers .iter() .map(|(_, files)| files) .flatten() .copied() .collect::>(), CompactionTask::ForceFullCompaction(l0_ssts) => l0_ssts.clone(), }; let tables: Vec> = { let state = self.state.read(); table_ids .iter() .map(|id| state.sstables.get(id).unwrap().clone()) .collect::>() }; let mut iters = Vec::new(); iters.reserve(tables.len()); for table in tables.iter() { iters.push(Box::new(SsTableIterator::create_and_seek_to_first( table.clone(), )?)); } let mut iter = MergeIterator::create(iters); let mut builder = None; let mut new_sst = vec![]; let compact_to_bottom_level = task.compact_to_bottom_level(); while iter.is_valid() { if builder.is_none() { builder = Some(SsTableBuilder::new(self.options.block_size)); } let builder_inner = builder.as_mut().unwrap(); if compact_to_bottom_level { if !iter.value().is_empty() { builder_inner.add(iter.key(), iter.value()); } } else { builder_inner.add(iter.key(), iter.value()); } iter.next()?; if builder_inner.estimated_size() >= self.options.target_sst_size { let sst_id = self.next_sst_id(); // lock dropped here let builder = builder.take().unwrap(); let sst = Arc::new(builder.build( sst_id, Some(self.block_cache.clone()), self.path_of_sst(sst_id), )?); new_sst.push(sst); } } if let Some(builder) = builder { let sst_id = self.next_sst_id(); // lock dropped here let sst = Arc::new(builder.build( sst_id, Some(self.block_cache.clone()), self.path_of_sst(sst_id), )?); new_sst.push(sst); } Ok(new_sst) } pub fn force_full_compaction(&self) -> Result<()> { let CompactionOptions::NoCompaction = self.options.compaction_options else { panic!("full compaction can only be called with compaction is not enabled") }; let snapshot = { let state = self.state.read(); state.clone() }; let mut original_sstables = snapshot.l0_sstables.clone(); original_sstables.reverse(); let sstables = self.compact(&CompactionTask::ForceFullCompaction( original_sstables.clone(), ))?; { let _state_lock = self.state_lock.lock(); let mut state = self.state.read().as_ref().clone(); for sst in original_sstables.iter() { let result = state.sstables.remove(sst); assert!(result.is_some()); } let mut ids = Vec::with_capacity(sstables.len()); for new_sst in sstables { ids.push(new_sst.sst_id()); let result = state.sstables.insert(new_sst.sst_id(), new_sst); assert!(result.is_none()); } state.l0_sstables = ids; *self.state.write() = Arc::new(state); } for sst in original_sstables { std::fs::remove_file(self.path_of_sst(sst))?; } Ok(()) } fn trigger_compaction(&self) -> Result<()> { let snapshot = { let state = self.state.read(); state.clone() }; let task = self .compaction_controller .generate_compaction_task(&snapshot); let Some(task) = task else { return Ok(()); }; println!("running compaction task: {:?}", task); let sstables = self.compact(&task)?; let output = sstables.iter().map(|x| x.sst_id()).collect::>(); let ssts_to_remove = { let state_lock = self.state_lock.lock(); let (mut snapshot, files_to_remove) = self .compaction_controller .apply_compaction_result(&self.state.read(), &task, &output); let mut ssts_to_remove = Vec::with_capacity(files_to_remove.len()); for file_to_remove in &files_to_remove { let result = snapshot.sstables.remove(file_to_remove); assert!(result.is_some()); ssts_to_remove.push(result.unwrap()); } let mut new_sst_ids = Vec::new(); for file_to_add in sstables { new_sst_ids.push(file_to_add.sst_id()); let result = snapshot.sstables.insert(file_to_add.sst_id(), file_to_add); assert!(result.is_none()); } let mut state = self.state.write(); *state = Arc::new(snapshot); self.manifest .add_record(&state_lock, ManifestRecord::Compaction(task, new_sst_ids))?; ssts_to_remove }; for sst in ssts_to_remove { std::fs::remove_file(self.path_of_sst(sst.sst_id()))?; } Ok(()) } pub(crate) fn spawn_compaction_thread( self: &Arc, rx: crossbeam_channel::Receiver<()>, ) -> Result>> { if let CompactionOptions::Leveled(_) | CompactionOptions::Simple(_) | CompactionOptions::Tiered(_) = self.options.compaction_options { let this = self.clone(); let handle = std::thread::spawn(move || { let ticker = crossbeam_channel::tick(Duration::from_millis(50)); loop { crossbeam_channel::select! { recv(ticker) -> _ => if let Err(e) = this.trigger_compaction() { eprintln!("compaction failed: {}", e); }, recv(rx) -> _ => return } } }); return Ok(Some(handle)); } Ok(None) } }