#![allow(dead_code)] // REMOVE THIS LINE after fully implementing this functionality mod leveled; mod simple_leveled; mod tiered; use std::sync::Arc; use std::time::Duration; use anyhow::Result; pub use leveled::{LeveledCompactionController, LeveledCompactionOptions, LeveledCompactionTask}; use serde::{Deserialize, Serialize}; pub use simple_leveled::{ SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, SimpleLeveledCompactionTask, }; pub use tiered::{TieredCompactionController, TieredCompactionOptions, TieredCompactionTask}; use crate::lsm_storage::{LsmStorageInner, LsmStorageState}; use crate::table::SsTable; #[derive(Debug, Serialize, Deserialize)] pub enum CompactionTask { Leveled(LeveledCompactionTask), Tiered(TieredCompactionTask), Simple(SimpleLeveledCompactionTask), ForceFullCompaction(Vec), } impl CompactionTask { fn compact_to_bottom_level(&self) -> bool { match self { CompactionTask::ForceFullCompaction(_) => true, CompactionTask::Leveled(task) => task.is_lower_level_bottom_level, CompactionTask::Simple(task) => task.is_lower_level_bottom_level, CompactionTask::Tiered(task) => task.bottom_tier_included, } } } pub(crate) enum CompactionController { Leveled(LeveledCompactionController), Tiered(TieredCompactionController), Simple(SimpleLeveledCompactionController), NoCompaction, } impl CompactionController { pub fn generate_compaction_task(&self, snapshot: &LsmStorageState) -> Option { match self { CompactionController::Leveled(ctrl) => ctrl .generate_compaction_task(&snapshot) .map(CompactionTask::Leveled), CompactionController::Simple(ctrl) => ctrl .generate_compaction_task(&snapshot) .map(CompactionTask::Simple), CompactionController::Tiered(ctrl) => ctrl .generate_compaction_task(&snapshot) .map(CompactionTask::Tiered), CompactionController::NoCompaction => unreachable!(), } } pub fn apply_compaction_result( &self, snapshot: &LsmStorageState, task: &CompactionTask, output: &[usize], ) -> (LsmStorageState, Vec) { match (self, task) { (CompactionController::Leveled(ctrl), CompactionTask::Leveled(task)) => { ctrl.apply_compaction_result(&snapshot, task, output) } (CompactionController::Simple(ctrl), CompactionTask::Simple(task)) => { ctrl.apply_compaction_result(&snapshot, task, output) } (CompactionController::Tiered(ctrl), CompactionTask::Tiered(task)) => { ctrl.apply_compaction_result(&snapshot, task, output) } _ => unreachable!(), } } } impl CompactionController { pub fn flush_to_l0(&self) -> bool { if let Self::Leveled(_) | Self::Simple(_) | Self::NoCompaction = self { true } else { false } } } pub enum CompactionOptions { /// Leveled compaction with partial compaction + dynamic level support (= RocksDB's Leveled /// Compaction) Leveled(LeveledCompactionOptions), /// Tiered compaction (= RocksDB's universal compaction) Tiered(TieredCompactionOptions), /// Simple leveled compaction Simple(SimpleLeveledCompactionOptions), /// In no compaction mode (week 1), always flush to L0 NoCompaction, } impl LsmStorageInner { fn compact(&self, _task: &CompactionTask) -> Result>> { unimplemented!() } pub fn force_full_compaction(&self) -> Result<()> { unimplemented!() } fn trigger_compaction(&self) -> Result<()> { unimplemented!() } pub(crate) fn spawn_compaction_thread( self: &Arc, rx: crossbeam_channel::Receiver<()>, ) -> Result>> { if let CompactionOptions::Leveled(_) | CompactionOptions::Simple(_) | CompactionOptions::Tiered(_) = self.options.compaction_options { let this = self.clone(); let handle = std::thread::spawn(move || { let ticker = crossbeam_channel::tick(Duration::from_millis(50)); loop { crossbeam_channel::select! { recv(ticker) -> _ => if let Err(e) = this.trigger_compaction() { eprintln!("compaction failed: {}", e); }, recv(rx) -> _ => return } } }); return Ok(Some(handle)); } Ok(None) } }