| 
									
										
										
										
											2024-01-20 11:14:19 +08:00
										 |  |  | #![allow(dead_code)] // REMOVE THIS LINE after fully implementing this functionality
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  | mod leveled;
 | 
					
						
							|  |  |  | mod simple_leveled;
 | 
					
						
							|  |  |  | mod tiered;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | use std::sync::Arc;
 | 
					
						
							|  |  |  | use std::time::Duration;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | use anyhow::Result;
 | 
					
						
							|  |  |  | pub use leveled::{LeveledCompactionController, LeveledCompactionOptions, LeveledCompactionTask};
 | 
					
						
							|  |  |  | use serde::{Deserialize, Serialize};
 | 
					
						
							|  |  |  | pub use simple_leveled::{
 | 
					
						
							|  |  |  |     SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, SimpleLeveledCompactionTask,
 | 
					
						
							|  |  |  | };
 | 
					
						
							|  |  |  | pub use tiered::{TieredCompactionController, TieredCompactionOptions, TieredCompactionTask};
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | use crate::lsm_storage::{LsmStorageInner, LsmStorageState};
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:14:19 +08:00
										 |  |  | use crate::table::SsTable;
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | #[derive(Debug, Serialize, Deserialize)]
 | 
					
						
							|  |  |  | pub enum CompactionTask {
 | 
					
						
							|  |  |  |     Leveled(LeveledCompactionTask),
 | 
					
						
							|  |  |  |     Tiered(TieredCompactionTask),
 | 
					
						
							|  |  |  |     Simple(SimpleLeveledCompactionTask),
 | 
					
						
							| 
									
										
										
										
											2024-01-22 22:05:47 +08:00
										 |  |  |     ForceFullCompaction {
 | 
					
						
							|  |  |  |         l0_sstables: Vec<usize>,
 | 
					
						
							|  |  |  |         l1_sstables: Vec<usize>,
 | 
					
						
							|  |  |  |     },
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | impl CompactionTask {
 | 
					
						
							|  |  |  |     fn compact_to_bottom_level(&self) -> bool {
 | 
					
						
							|  |  |  |         match self {
 | 
					
						
							| 
									
										
										
										
											2024-01-22 22:05:47 +08:00
										 |  |  |             CompactionTask::ForceFullCompaction { .. } => true,
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  |             CompactionTask::Leveled(task) => task.is_lower_level_bottom_level,
 | 
					
						
							|  |  |  |             CompactionTask::Simple(task) => task.is_lower_level_bottom_level,
 | 
					
						
							|  |  |  |             CompactionTask::Tiered(task) => task.bottom_tier_included,
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | pub(crate) enum CompactionController {
 | 
					
						
							|  |  |  |     Leveled(LeveledCompactionController),
 | 
					
						
							|  |  |  |     Tiered(TieredCompactionController),
 | 
					
						
							|  |  |  |     Simple(SimpleLeveledCompactionController),
 | 
					
						
							|  |  |  |     NoCompaction,
 | 
					
						
							|  |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | impl CompactionController {
 | 
					
						
							|  |  |  |     pub fn generate_compaction_task(&self, snapshot: &LsmStorageState) -> Option<CompactionTask> {
 | 
					
						
							|  |  |  |         match self {
 | 
					
						
							|  |  |  |             CompactionController::Leveled(ctrl) => ctrl
 | 
					
						
							| 
									
										
										
										
											2024-01-24 14:39:00 +08:00
										 |  |  |                 .generate_compaction_task(snapshot)
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  |                 .map(CompactionTask::Leveled),
 | 
					
						
							|  |  |  |             CompactionController::Simple(ctrl) => ctrl
 | 
					
						
							| 
									
										
										
										
											2024-01-24 14:39:00 +08:00
										 |  |  |                 .generate_compaction_task(snapshot)
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  |                 .map(CompactionTask::Simple),
 | 
					
						
							|  |  |  |             CompactionController::Tiered(ctrl) => ctrl
 | 
					
						
							| 
									
										
										
										
											2024-01-24 14:39:00 +08:00
										 |  |  |                 .generate_compaction_task(snapshot)
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  |                 .map(CompactionTask::Tiered),
 | 
					
						
							|  |  |  |             CompactionController::NoCompaction => unreachable!(),
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub fn apply_compaction_result(
 | 
					
						
							|  |  |  |         &self,
 | 
					
						
							|  |  |  |         snapshot: &LsmStorageState,
 | 
					
						
							|  |  |  |         task: &CompactionTask,
 | 
					
						
							|  |  |  |         output: &[usize],
 | 
					
						
							|  |  |  |     ) -> (LsmStorageState, Vec<usize>) {
 | 
					
						
							|  |  |  |         match (self, task) {
 | 
					
						
							|  |  |  |             (CompactionController::Leveled(ctrl), CompactionTask::Leveled(task)) => {
 | 
					
						
							| 
									
										
										
										
											2024-01-24 14:39:00 +08:00
										 |  |  |                 ctrl.apply_compaction_result(snapshot, task, output)
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  |             }
 | 
					
						
							|  |  |  |             (CompactionController::Simple(ctrl), CompactionTask::Simple(task)) => {
 | 
					
						
							| 
									
										
										
										
											2024-01-24 14:39:00 +08:00
										 |  |  |                 ctrl.apply_compaction_result(snapshot, task, output)
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  |             }
 | 
					
						
							|  |  |  |             (CompactionController::Tiered(ctrl), CompactionTask::Tiered(task)) => {
 | 
					
						
							| 
									
										
										
										
											2024-01-24 14:39:00 +08:00
										 |  |  |                 ctrl.apply_compaction_result(snapshot, task, output)
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  |             }
 | 
					
						
							|  |  |  |             _ => unreachable!(),
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | impl CompactionController {
 | 
					
						
							|  |  |  |     pub fn flush_to_l0(&self) -> bool {
 | 
					
						
							| 
									
										
										
										
											2024-01-24 14:39:00 +08:00
										 |  |  |         matches!(
 | 
					
						
							|  |  |  |             self,
 | 
					
						
							|  |  |  |             Self::Leveled(_) | Self::Simple(_) | Self::NoCompaction
 | 
					
						
							|  |  |  |         )
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  |     }
 | 
					
						
							|  |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-25 21:53:47 +08:00
										 |  |  | #[derive(Debug, Clone)]
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  | pub enum CompactionOptions {
 | 
					
						
							|  |  |  |     /// Leveled compaction with partial compaction + dynamic level support (= RocksDB's Leveled
 | 
					
						
							|  |  |  |     /// Compaction)
 | 
					
						
							|  |  |  |     Leveled(LeveledCompactionOptions),
 | 
					
						
							|  |  |  |     /// Tiered compaction (= RocksDB's universal compaction)
 | 
					
						
							|  |  |  |     Tiered(TieredCompactionOptions),
 | 
					
						
							|  |  |  |     /// Simple leveled compaction
 | 
					
						
							|  |  |  |     Simple(SimpleLeveledCompactionOptions),
 | 
					
						
							|  |  |  |     /// In no compaction mode (week 1), always flush to L0
 | 
					
						
							|  |  |  |     NoCompaction,
 | 
					
						
							|  |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | impl LsmStorageInner {
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:14:19 +08:00
										 |  |  |     fn compact(&self, _task: &CompactionTask) -> Result<Vec<Arc<SsTable>>> {
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  |         unimplemented!()
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub fn force_full_compaction(&self) -> Result<()> {
 | 
					
						
							|  |  |  |         unimplemented!()
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     fn trigger_compaction(&self) -> Result<()> {
 | 
					
						
							|  |  |  |         unimplemented!()
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub(crate) fn spawn_compaction_thread(
 | 
					
						
							|  |  |  |         self: &Arc<Self>,
 | 
					
						
							|  |  |  |         rx: crossbeam_channel::Receiver<()>,
 | 
					
						
							|  |  |  |     ) -> Result<Option<std::thread::JoinHandle<()>>> {
 | 
					
						
							|  |  |  |         if let CompactionOptions::Leveled(_)
 | 
					
						
							|  |  |  |         | CompactionOptions::Simple(_)
 | 
					
						
							|  |  |  |         | CompactionOptions::Tiered(_) = self.options.compaction_options
 | 
					
						
							|  |  |  |         {
 | 
					
						
							|  |  |  |             let this = self.clone();
 | 
					
						
							|  |  |  |             let handle = std::thread::spawn(move || {
 | 
					
						
							|  |  |  |                 let ticker = crossbeam_channel::tick(Duration::from_millis(50));
 | 
					
						
							|  |  |  |                 loop {
 | 
					
						
							|  |  |  |                     crossbeam_channel::select! {
 | 
					
						
							|  |  |  |                         recv(ticker) -> _ => if let Err(e) = this.trigger_compaction() {
 | 
					
						
							|  |  |  |                             eprintln!("compaction failed: {}", e);
 | 
					
						
							|  |  |  |                         },
 | 
					
						
							|  |  |  |                         recv(rx) -> _ => return
 | 
					
						
							|  |  |  |                     }
 | 
					
						
							|  |  |  |                 }
 | 
					
						
							|  |  |  |             });
 | 
					
						
							|  |  |  |             return Ok(Some(handle));
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  |         Ok(None)
 | 
					
						
							|  |  |  |     }
 | 
					
						
							| 
									
										
										
										
											2024-01-21 17:40:47 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  |     fn trigger_flush(&self) -> Result<()> {
 | 
					
						
							|  |  |  |         Ok(())
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub(crate) fn spawn_flush_thread(
 | 
					
						
							|  |  |  |         self: &Arc<Self>,
 | 
					
						
							|  |  |  |         rx: crossbeam_channel::Receiver<()>,
 | 
					
						
							|  |  |  |     ) -> Result<Option<std::thread::JoinHandle<()>>> {
 | 
					
						
							|  |  |  |         let this = self.clone();
 | 
					
						
							|  |  |  |         let handle = std::thread::spawn(move || {
 | 
					
						
							|  |  |  |             let ticker = crossbeam_channel::tick(Duration::from_millis(50));
 | 
					
						
							|  |  |  |             loop {
 | 
					
						
							|  |  |  |                 crossbeam_channel::select! {
 | 
					
						
							|  |  |  |                     recv(ticker) -> _ => if let Err(e) = this.trigger_flush() {
 | 
					
						
							|  |  |  |                         eprintln!("flush failed: {}", e);
 | 
					
						
							|  |  |  |                     },
 | 
					
						
							|  |  |  |                     recv(rx) -> _ => return
 | 
					
						
							|  |  |  |                 }
 | 
					
						
							|  |  |  |             }
 | 
					
						
							|  |  |  |         });
 | 
					
						
							| 
									
										
										
										
											2024-01-24 14:39:00 +08:00
										 |  |  |         Ok(Some(handle))
 | 
					
						
							| 
									
										
										
										
											2024-01-21 17:40:47 +08:00
										 |  |  |     }
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  | }
 |