#![allow(dead_code)] // REMOVE THIS LINE after fully implementing this functionality mod leveled; mod simple_leveled; mod tiered; use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; use anyhow::Result; pub use leveled::{LeveledCompactionController, LeveledCompactionOptions, LeveledCompactionTask}; use serde::{Deserialize, Serialize}; pub use simple_leveled::{ SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, SimpleLeveledCompactionTask, }; pub use tiered::{TieredCompactionController, TieredCompactionOptions, TieredCompactionTask}; use crate::iterators::concat_iterator::SstConcatIterator; use crate::iterators::merge_iterator::MergeIterator; use crate::iterators::two_merge_iterator::TwoMergeIterator; use crate::iterators::StorageIterator; use crate::lsm_storage::{LsmStorageInner, LsmStorageState}; use crate::manifest::ManifestRecord; use crate::table::{SsTable, SsTableBuilder, SsTableIterator}; #[derive(Debug, Serialize, Deserialize)] pub enum CompactionTask { Leveled(LeveledCompactionTask), Tiered(TieredCompactionTask), Simple(SimpleLeveledCompactionTask), ForceFullCompaction { l0_sstables: Vec, l1_sstables: Vec, }, } impl CompactionTask { fn compact_to_bottom_level(&self) -> bool { match self { CompactionTask::ForceFullCompaction { .. } => true, CompactionTask::Leveled(task) => task.is_lower_level_bottom_level, CompactionTask::Simple(task) => task.is_lower_level_bottom_level, CompactionTask::Tiered(task) => task.bottom_tier_included, } } } pub(crate) enum CompactionController { Leveled(LeveledCompactionController), Tiered(TieredCompactionController), Simple(SimpleLeveledCompactionController), NoCompaction, } impl CompactionController { pub fn generate_compaction_task(&self, snapshot: &LsmStorageState) -> Option { match self { CompactionController::Leveled(ctrl) => ctrl .generate_compaction_task(&snapshot) .map(CompactionTask::Leveled), CompactionController::Simple(ctrl) => ctrl .generate_compaction_task(&snapshot) .map(CompactionTask::Simple), CompactionController::Tiered(ctrl) => ctrl .generate_compaction_task(&snapshot) .map(CompactionTask::Tiered), CompactionController::NoCompaction => unreachable!(), } } pub fn apply_compaction_result( &self, snapshot: &LsmStorageState, task: &CompactionTask, output: &[usize], ) -> (LsmStorageState, Vec) { match (self, task) { (CompactionController::Leveled(ctrl), CompactionTask::Leveled(task)) => { ctrl.apply_compaction_result(&snapshot, task, output) } (CompactionController::Simple(ctrl), CompactionTask::Simple(task)) => { ctrl.apply_compaction_result(&snapshot, task, output) } (CompactionController::Tiered(ctrl), CompactionTask::Tiered(task)) => { ctrl.apply_compaction_result(&snapshot, task, output) } _ => unreachable!(), } } } impl CompactionController { pub fn flush_to_l0(&self) -> bool { if let Self::Leveled(_) | Self::Simple(_) | Self::NoCompaction = self { true } else { false } } } pub enum CompactionOptions { /// Leveled compaction with partial compaction + dynamic level support (= RocksDB's Leveled /// Compaction) Leveled(LeveledCompactionOptions), /// Tiered compaction (= RocksDB's universal compaction) Tiered(TieredCompactionOptions), /// Simple leveled compaction Simple(SimpleLeveledCompactionOptions), /// In no compaction mode (week 1), always flush to L0 NoCompaction, } impl LsmStorageInner { fn compact_generate_sst_from_iter( &self, mut iter: impl StorageIterator, compact_to_bottom_level: bool, ) -> Result>> { let mut builder = None; let mut new_sst = Vec::new(); while iter.is_valid() { if builder.is_none() { builder = Some(SsTableBuilder::new(self.options.block_size)); } let builder_inner = builder.as_mut().unwrap(); if compact_to_bottom_level { if !iter.value().is_empty() { builder_inner.add(iter.key(), iter.value()); } } else { builder_inner.add(iter.key(), iter.value()); } iter.next()?; if builder_inner.estimated_size() >= self.options.target_sst_size { let sst_id = self.next_sst_id(); let builder = builder.take().unwrap(); let sst = Arc::new(builder.build( sst_id, Some(self.block_cache.clone()), self.path_of_sst(sst_id), )?); new_sst.push(sst); } } if let Some(builder) = builder { let sst_id = self.next_sst_id(); // lock dropped here let sst = Arc::new(builder.build( sst_id, Some(self.block_cache.clone()), self.path_of_sst(sst_id), )?); new_sst.push(sst); } Ok(new_sst) } fn compact(&self, task: &CompactionTask) -> Result>> { let snapshot = { let state = self.state.read(); state.clone() }; match task { CompactionTask::ForceFullCompaction { l0_sstables, l1_sstables, } => { let mut l0_iters = Vec::with_capacity(l0_sstables.len()); for id in l0_sstables.iter() { l0_iters.push(Box::new(SsTableIterator::create_and_seek_to_first( snapshot.sstables.get(id).unwrap().clone(), )?)); } let mut l1_iters = Vec::with_capacity(l1_sstables.len()); for id in l1_sstables.iter() { l1_iters.push(snapshot.sstables.get(id).unwrap().clone()); } let iter = TwoMergeIterator::create( MergeIterator::create(l0_iters), SstConcatIterator::create_and_seek_to_first(l1_iters)?, )?; self.compact_generate_sst_from_iter(iter, task.compact_to_bottom_level()) } CompactionTask::Simple(SimpleLeveledCompactionTask { upper_level, upper_level_sst_ids, lower_level: _, lower_level_sst_ids, .. }) | CompactionTask::Leveled(LeveledCompactionTask { upper_level, upper_level_sst_ids, lower_level: _, lower_level_sst_ids, .. }) => match upper_level { Some(_) => { let mut upper_ssts = Vec::with_capacity(upper_level_sst_ids.len()); for id in upper_level_sst_ids.iter() { upper_ssts.push(snapshot.sstables.get(id).unwrap().clone()); } let upper_iter = SstConcatIterator::create_and_seek_to_first(upper_ssts)?; let mut lower_ssts = Vec::with_capacity(upper_level_sst_ids.len()); for id in lower_level_sst_ids.iter() { lower_ssts.push(snapshot.sstables.get(id).unwrap().clone()); } let lower_iter = SstConcatIterator::create_and_seek_to_first(lower_ssts)?; self.compact_generate_sst_from_iter( TwoMergeIterator::create(upper_iter, lower_iter)?, task.compact_to_bottom_level(), ) } None => { let mut upper_iters = Vec::with_capacity(upper_level_sst_ids.len()); for id in upper_level_sst_ids.iter() { upper_iters.push(Box::new(SsTableIterator::create_and_seek_to_first( snapshot.sstables.get(id).unwrap().clone(), )?)); } let upper_iter = MergeIterator::create(upper_iters); let mut lower_ssts = Vec::with_capacity(upper_level_sst_ids.len()); for id in lower_level_sst_ids.iter() { lower_ssts.push(snapshot.sstables.get(id).unwrap().clone()); } let lower_iter = SstConcatIterator::create_and_seek_to_first(lower_ssts)?; self.compact_generate_sst_from_iter( TwoMergeIterator::create(upper_iter, lower_iter)?, task.compact_to_bottom_level(), ) } }, CompactionTask::Tiered(TieredCompactionTask { tiers, .. }) => { let mut iters = Vec::with_capacity(tiers.len()); for (_, tier_sst_ids) in tiers { let mut ssts = Vec::with_capacity(tier_sst_ids.len()); for id in tier_sst_ids.iter() { ssts.push(snapshot.sstables.get(id).unwrap().clone()); } iters.push(Box::new(SstConcatIterator::create_and_seek_to_first(ssts)?)); } self.compact_generate_sst_from_iter( MergeIterator::create(iters), task.compact_to_bottom_level(), ) } } } pub fn force_full_compaction(&self) -> Result<()> { let CompactionOptions::NoCompaction = self.options.compaction_options else { panic!("full compaction can only be called with compaction is not enabled") }; let snapshot = { let state = self.state.read(); state.clone() }; let l0_sstables = snapshot.l0_sstables.clone(); let l1_sstables = snapshot.levels[0].1.clone(); let compaction_task = CompactionTask::ForceFullCompaction { l0_sstables: l0_sstables.clone(), l1_sstables: l1_sstables.clone(), }; let sstables = self.compact(&compaction_task)?; { let _state_lock = self.state_lock.lock(); let mut state = self.state.read().as_ref().clone(); for sst in l0_sstables.iter().chain(l1_sstables.iter()) { let result = state.sstables.remove(sst); assert!(result.is_some()); } let mut ids = Vec::with_capacity(sstables.len()); for new_sst in sstables { ids.push(new_sst.sst_id()); let result = state.sstables.insert(new_sst.sst_id(), new_sst); assert!(result.is_none()); } assert_eq!(l1_sstables, state.levels[0].1); state.levels[0].1 = ids; let mut l0_sstables_map = l0_sstables.iter().copied().collect::>(); state.l0_sstables = state .l0_sstables .iter() .filter(|x| !l0_sstables_map.remove(x)) .copied() .collect::>(); assert!(l0_sstables_map.is_empty()); *self.state.write() = Arc::new(state); } for sst in l0_sstables.iter().chain(l1_sstables.iter()) { std::fs::remove_file(self.path_of_sst(*sst))?; } Ok(()) } fn trigger_compaction(&self) -> Result<()> { let snapshot = { let state = self.state.read(); state.clone() }; let task = self .compaction_controller .generate_compaction_task(&snapshot); let Some(task) = task else { return Ok(()); }; println!("running compaction task: {:?}", task); let sstables = self.compact(&task)?; let files_added = sstables.len(); let output = sstables.iter().map(|x| x.sst_id()).collect::>(); let ssts_to_remove = { let state_lock = self.state_lock.lock(); let (mut snapshot, files_to_remove) = self .compaction_controller .apply_compaction_result(&self.state.read(), &task, &output); let mut ssts_to_remove = Vec::with_capacity(files_to_remove.len()); for file_to_remove in &files_to_remove { let result = snapshot.sstables.remove(file_to_remove); assert!(result.is_some(), "cannot remove {}.sst", file_to_remove); ssts_to_remove.push(result.unwrap()); } let mut new_sst_ids = Vec::new(); for file_to_add in sstables { new_sst_ids.push(file_to_add.sst_id()); let result = snapshot.sstables.insert(file_to_add.sst_id(), file_to_add); assert!(result.is_none()); } let mut state = self.state.write(); *state = Arc::new(snapshot); drop(state); self.sync_dir()?; self.manifest .as_ref() .unwrap() .add_record(&state_lock, ManifestRecord::Compaction(task, new_sst_ids))?; ssts_to_remove }; println!( "compaction finished: {} files removed, {} files added", ssts_to_remove.len(), files_added ); for sst in ssts_to_remove { std::fs::remove_file(self.path_of_sst(sst.sst_id()))?; } self.sync_dir()?; Ok(()) } pub(crate) fn spawn_compaction_thread( self: &Arc, rx: crossbeam_channel::Receiver<()>, ) -> Result>> { if let CompactionOptions::Leveled(_) | CompactionOptions::Simple(_) | CompactionOptions::Tiered(_) = self.options.compaction_options { let this = self.clone(); let handle = std::thread::spawn(move || { let ticker = crossbeam_channel::tick(Duration::from_millis(50)); loop { crossbeam_channel::select! { recv(ticker) -> _ => if let Err(e) = this.trigger_compaction() { eprintln!("compaction failed: {}", e); }, recv(rx) -> _ => return } } }); return Ok(Some(handle)); } Ok(None) } fn trigger_flush(&self) -> Result<()> { if { let state = self.state.read(); state.imm_memtables.len() >= self.options.num_memtable_limit } { self.force_flush_next_imm_memtable()?; } Ok(()) } pub(crate) fn spawn_flush_thread( self: &Arc, rx: crossbeam_channel::Receiver<()>, ) -> Result>> { let this = self.clone(); let handle = std::thread::spawn(move || { let ticker = crossbeam_channel::tick(Duration::from_millis(50)); loop { crossbeam_channel::select! { recv(ticker) -> _ => if let Err(e) = this.trigger_flush() { eprintln!("flush failed: {}", e); }, recv(rx) -> _ => return } } }); return Ok(Some(handle)); } }