Files
mini_lsm/mini-lsm-starter/src/compact.rs

167 lines
5.4 KiB
Rust
Raw Normal View History

#![allow(dead_code)] // REMOVE THIS LINE after fully implementing this functionality
mod leveled;
mod simple_leveled;
mod tiered;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
pub use leveled::{LeveledCompactionController, LeveledCompactionOptions, LeveledCompactionTask};
use serde::{Deserialize, Serialize};
pub use simple_leveled::{
SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, SimpleLeveledCompactionTask,
};
pub use tiered::{TieredCompactionController, TieredCompactionOptions, TieredCompactionTask};
use crate::lsm_storage::{LsmStorageInner, LsmStorageState};
use crate::table::SsTable;
#[derive(Debug, Serialize, Deserialize)]
pub enum CompactionTask {
Leveled(LeveledCompactionTask),
Tiered(TieredCompactionTask),
Simple(SimpleLeveledCompactionTask),
ForceFullCompaction(Vec<usize>),
}
impl CompactionTask {
fn compact_to_bottom_level(&self) -> bool {
match self {
CompactionTask::ForceFullCompaction(_) => true,
CompactionTask::Leveled(task) => task.is_lower_level_bottom_level,
CompactionTask::Simple(task) => task.is_lower_level_bottom_level,
CompactionTask::Tiered(task) => task.bottom_tier_included,
}
}
}
pub(crate) enum CompactionController {
Leveled(LeveledCompactionController),
Tiered(TieredCompactionController),
Simple(SimpleLeveledCompactionController),
NoCompaction,
}
impl CompactionController {
pub fn generate_compaction_task(&self, snapshot: &LsmStorageState) -> Option<CompactionTask> {
match self {
CompactionController::Leveled(ctrl) => ctrl
.generate_compaction_task(&snapshot)
.map(CompactionTask::Leveled),
CompactionController::Simple(ctrl) => ctrl
.generate_compaction_task(&snapshot)
.map(CompactionTask::Simple),
CompactionController::Tiered(ctrl) => ctrl
.generate_compaction_task(&snapshot)
.map(CompactionTask::Tiered),
CompactionController::NoCompaction => unreachable!(),
}
}
pub fn apply_compaction_result(
&self,
snapshot: &LsmStorageState,
task: &CompactionTask,
output: &[usize],
) -> (LsmStorageState, Vec<usize>) {
match (self, task) {
(CompactionController::Leveled(ctrl), CompactionTask::Leveled(task)) => {
ctrl.apply_compaction_result(&snapshot, task, output)
}
(CompactionController::Simple(ctrl), CompactionTask::Simple(task)) => {
ctrl.apply_compaction_result(&snapshot, task, output)
}
(CompactionController::Tiered(ctrl), CompactionTask::Tiered(task)) => {
ctrl.apply_compaction_result(&snapshot, task, output)
}
_ => unreachable!(),
}
}
}
impl CompactionController {
pub fn flush_to_l0(&self) -> bool {
if let Self::Leveled(_) | Self::Simple(_) | Self::NoCompaction = self {
true
} else {
false
}
}
}
pub enum CompactionOptions {
/// Leveled compaction with partial compaction + dynamic level support (= RocksDB's Leveled
/// Compaction)
Leveled(LeveledCompactionOptions),
/// Tiered compaction (= RocksDB's universal compaction)
Tiered(TieredCompactionOptions),
/// Simple leveled compaction
Simple(SimpleLeveledCompactionOptions),
/// In no compaction mode (week 1), always flush to L0
NoCompaction,
}
impl LsmStorageInner {
fn compact(&self, _task: &CompactionTask) -> Result<Vec<Arc<SsTable>>> {
unimplemented!()
}
pub fn force_full_compaction(&self) -> Result<()> {
unimplemented!()
}
fn trigger_compaction(&self) -> Result<()> {
unimplemented!()
}
pub(crate) fn spawn_compaction_thread(
self: &Arc<Self>,
rx: crossbeam_channel::Receiver<()>,
) -> Result<Option<std::thread::JoinHandle<()>>> {
if let CompactionOptions::Leveled(_)
| CompactionOptions::Simple(_)
| CompactionOptions::Tiered(_) = self.options.compaction_options
{
let this = self.clone();
let handle = std::thread::spawn(move || {
let ticker = crossbeam_channel::tick(Duration::from_millis(50));
loop {
crossbeam_channel::select! {
recv(ticker) -> _ => if let Err(e) = this.trigger_compaction() {
eprintln!("compaction failed: {}", e);
},
recv(rx) -> _ => return
}
}
});
return Ok(Some(handle));
}
Ok(None)
}
fn trigger_flush(&self) -> Result<()> {
Ok(())
}
pub(crate) fn spawn_flush_thread(
self: &Arc<Self>,
rx: crossbeam_channel::Receiver<()>,
) -> Result<Option<std::thread::JoinHandle<()>>> {
let this = self.clone();
let handle = std::thread::spawn(move || {
let ticker = crossbeam_channel::tick(Duration::from_millis(50));
loop {
crossbeam_channel::select! {
recv(ticker) -> _ => if let Err(e) = this.trigger_flush() {
eprintln!("flush failed: {}", e);
},
recv(rx) -> _ => return
}
}
});
return Ok(Some(handle));
}
}