finish compaction

Signed-off-by: Alex Chi <iskyzh@gmail.com>
This commit is contained in:
Alex Chi
2024-01-18 19:40:05 +08:00
parent ce33f62be6
commit 53cb1fe4a4
12 changed files with 249 additions and 43 deletions

View File

@@ -3,6 +3,7 @@ mod simple_leveled;
mod tiered;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
pub use leveled::{LeveledCompactionController, LeveledCompactionOptions, LeveledCompactionTask};
@@ -13,18 +14,25 @@ pub use tiered::{TieredCompactionController, TieredCompactionOptions, TieredComp
use crate::iterators::merge_iterator::MergeIterator;
use crate::iterators::StorageIterator;
use crate::lsm_storage::LsmStorageInner;
use crate::lsm_storage::{LsmStorageInner, LsmStorageState};
use crate::table::{SsTable, SsTableBuilder, SsTableIterator};
pub(crate) enum CompactionTask {
Leveled(LeveledCompactionTask),
Tiered(TieredCompactionTask),
Simple(SimpleLeveledCompactionTask),
ForceFullCompaction(Vec<usize>),
}
struct CompactOptions {
block_size: usize,
target_sst_size: usize,
impl CompactionTask {
fn compact_to_bottom_level(&self) -> bool {
match self {
CompactionTask::ForceFullCompaction(_) => true,
CompactionTask::Leveled(task) => task.is_lower_level_bottom_level,
CompactionTask::Simple(task) => task.is_lower_level_bottom_level,
CompactionTask::Tiered(task) => task.bottom_tier_included,
}
}
}
pub(crate) enum CompactionController {
@@ -34,6 +42,43 @@ pub(crate) enum CompactionController {
NoCompaction,
}
impl CompactionController {
fn generate_compaction_task(&self, snapshot: &LsmStorageState) -> Option<CompactionTask> {
match self {
CompactionController::Leveled(ctrl) => ctrl
.generate_compaction_task(&snapshot)
.map(CompactionTask::Leveled),
CompactionController::Simple(ctrl) => ctrl
.generate_compaction_task(&snapshot)
.map(CompactionTask::Simple),
CompactionController::Tiered(ctrl) => ctrl
.generate_compaction_task(&snapshot)
.map(CompactionTask::Tiered),
CompactionController::NoCompaction => unreachable!(),
}
}
fn apply_compaction_result(
&self,
snapshot: &LsmStorageState,
task: &CompactionTask,
output: &[usize],
) -> (LsmStorageState, Vec<usize>) {
match (self, task) {
(CompactionController::Leveled(ctrl), CompactionTask::Leveled(task)) => {
ctrl.apply_compaction_result(&snapshot, task, output)
}
(CompactionController::Simple(ctrl), CompactionTask::Simple(task)) => {
ctrl.apply_compaction_result(&snapshot, task, output)
}
(CompactionController::Tiered(ctrl), CompactionTask::Tiered(task)) => {
ctrl.apply_compaction_result(&snapshot, task, output)
}
_ => unreachable!(),
}
}
}
impl CompactionController {
pub fn flush_to_l0(&self) -> bool {
if let Self::Leveled(_) | Self::Simple(_) | Self::NoCompaction = self {
@@ -57,12 +102,37 @@ pub enum CompactionOptions {
}
impl LsmStorageInner {
#[allow(dead_code)]
fn compact(
&self,
tables: Vec<Arc<SsTable>>,
options: CompactOptions,
) -> Result<Vec<Arc<SsTable>>> {
fn compact(&self, task: &CompactionTask) -> Result<Vec<Arc<SsTable>>> {
let table_ids = match task {
CompactionTask::Leveled(task) => task
.lower_level_sst_ids
.iter()
.copied()
.chain(task.upper_level_sst_ids.iter().copied())
.collect::<Vec<_>>(),
CompactionTask::Simple(task) => task
.lower_level_sst_ids
.iter()
.copied()
.chain(task.upper_level_sst_ids.iter().copied())
.collect::<Vec<_>>(),
CompactionTask::Tiered(task) => task
.tiers
.iter()
.map(|(_, files)| files)
.flatten()
.copied()
.collect::<Vec<_>>(),
CompactionTask::ForceFullCompaction(l0_ssts) => l0_ssts.clone(),
};
let tables: Vec<Arc<SsTable>> = {
let state = self.state.read();
table_ids
.iter()
.map(|id| state.sstables.get(id).unwrap().clone())
.collect::<Vec<_>>()
};
let mut iters = Vec::new();
iters.reserve(tables.len());
for table in tables.iter() {
@@ -75,11 +145,11 @@ impl LsmStorageInner {
let mut builder = None;
let mut new_sst = vec![];
let compact_to_bottom_level = false;
let compact_to_bottom_level = task.compact_to_bottom_level();
while iter.is_valid() {
if builder.is_none() {
builder = Some(SsTableBuilder::new(options.block_size));
builder = Some(SsTableBuilder::new(self.options.block_size));
}
let builder_inner = builder.as_mut().unwrap();
if compact_to_bottom_level {
@@ -91,7 +161,7 @@ impl LsmStorageInner {
}
iter.next()?;
if builder_inner.estimated_size() >= options.target_sst_size {
if builder_inner.estimated_size() >= self.options.target_sst_size {
let sst_id = self.next_sst_id(); // lock dropped here
let builder = builder.take().unwrap();
let sst = Arc::new(builder.build(
@@ -114,10 +184,67 @@ impl LsmStorageInner {
Ok(new_sst)
}
fn trigger_compaction(&self) -> Result<()> {
let snapshot = {
let state = self.state.read();
state.clone()
};
let task = self
.compaction_controller
.generate_compaction_task(&snapshot);
let Some(task) = task else {
return Ok(());
};
println!("running compaction task");
let sstables = self.compact(&task)?;
let output = sstables.iter().map(|x| x.sst_id()).collect::<Vec<_>>();
let ssts_to_remove = {
let _state_lock = self.state_lock.lock();
let (mut snapshot, files_to_remove) = self
.compaction_controller
.apply_compaction_result(&self.state.read(), &task, &output);
let mut ssts_to_remove = Vec::with_capacity(files_to_remove.len());
for file_to_remove in &files_to_remove {
let result = snapshot.sstables.remove(file_to_remove);
assert!(result.is_some());
ssts_to_remove.push(result.unwrap());
}
for file_to_add in sstables {
let result = snapshot.sstables.insert(file_to_add.sst_id(), file_to_add);
assert!(result.is_none());
}
let mut state = self.state.write();
*state = Arc::new(snapshot);
ssts_to_remove
};
for sst in ssts_to_remove {
std::fs::remove_file(self.path_of_sst(sst.sst_id()))?;
}
Ok(())
}
pub(crate) fn spawn_compaction_thread(
self: &Arc<Self>,
rx: std::sync::mpsc::Receiver<()>,
rx: crossbeam_channel::Receiver<()>,
) -> Result<Option<std::thread::JoinHandle<()>>> {
if let CompactionOptions::Leveled(_)
| CompactionOptions::Simple(_)
| CompactionOptions::Tiered(_) = self.options.compaction_options
{
let this = self.clone();
let handle = std::thread::spawn(move || {
let ticker = crossbeam_channel::tick(Duration::from_millis(50));
loop {
crossbeam_channel::select! {
recv(ticker) -> _ => if let Err(e) = this.trigger_compaction() {
eprintln!("compaction failed: {}", e);
},
recv(rx) -> _ => return
}
}
});
return Ok(Some(handle));
}
Ok(None)
}
}