@@ -1,7 +1,10 @@
|
||||
#![allow(dead_code)] // REMOVE THIS LINE after fully implementing this functionality
|
||||
|
||||
mod leveled;
|
||||
mod simple_leveled;
|
||||
mod tiered;
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -13,7 +16,9 @@ pub use simple_leveled::{
|
||||
};
|
||||
pub use tiered::{TieredCompactionController, TieredCompactionOptions, TieredCompactionTask};
|
||||
|
||||
use crate::iterators::concat_iterator::SstConcatIterator;
|
||||
use crate::iterators::merge_iterator::MergeIterator;
|
||||
use crate::iterators::two_merge_iterator::TwoMergeIterator;
|
||||
use crate::iterators::StorageIterator;
|
||||
use crate::lsm_storage::{LsmStorageInner, LsmStorageState};
|
||||
use crate::manifest::ManifestRecord;
|
||||
@@ -24,13 +29,16 @@ pub enum CompactionTask {
|
||||
Leveled(LeveledCompactionTask),
|
||||
Tiered(TieredCompactionTask),
|
||||
Simple(SimpleLeveledCompactionTask),
|
||||
ForceFullCompaction(Vec<usize>),
|
||||
ForceFullCompaction {
|
||||
l0_sstables: Vec<usize>,
|
||||
l1_sstables: Vec<usize>,
|
||||
},
|
||||
}
|
||||
|
||||
impl CompactionTask {
|
||||
fn compact_to_bottom_level(&self) -> bool {
|
||||
match self {
|
||||
CompactionTask::ForceFullCompaction(_) => true,
|
||||
CompactionTask::ForceFullCompaction { .. } => true,
|
||||
CompactionTask::Leveled(task) => task.is_lower_level_bottom_level,
|
||||
CompactionTask::Simple(task) => task.is_lower_level_bottom_level,
|
||||
CompactionTask::Tiered(task) => task.bottom_tier_included,
|
||||
@@ -105,50 +113,13 @@ pub enum CompactionOptions {
|
||||
}
|
||||
|
||||
impl LsmStorageInner {
|
||||
fn compact(&self, task: &CompactionTask) -> Result<Vec<Arc<SsTable>>> {
|
||||
let table_ids = match task {
|
||||
CompactionTask::Leveled(task) => task
|
||||
.lower_level_sst_ids
|
||||
.iter()
|
||||
.copied()
|
||||
.chain(task.upper_level_sst_ids.iter().copied())
|
||||
.collect::<Vec<_>>(),
|
||||
CompactionTask::Simple(task) => task
|
||||
.lower_level_sst_ids
|
||||
.iter()
|
||||
.copied()
|
||||
.chain(task.upper_level_sst_ids.iter().copied())
|
||||
.collect::<Vec<_>>(),
|
||||
CompactionTask::Tiered(task) => task
|
||||
.tiers
|
||||
.iter()
|
||||
.map(|(_, files)| files)
|
||||
.flatten()
|
||||
.copied()
|
||||
.collect::<Vec<_>>(),
|
||||
CompactionTask::ForceFullCompaction(l0_ssts) => l0_ssts.clone(),
|
||||
};
|
||||
let tables: Vec<Arc<SsTable>> = {
|
||||
let state = self.state.read();
|
||||
table_ids
|
||||
.iter()
|
||||
.map(|id| state.sstables.get(id).unwrap().clone())
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
let mut iters = Vec::new();
|
||||
iters.reserve(tables.len());
|
||||
for table in tables.iter() {
|
||||
iters.push(Box::new(SsTableIterator::create_and_seek_to_first(
|
||||
table.clone(),
|
||||
)?));
|
||||
}
|
||||
let mut iter = MergeIterator::create(iters);
|
||||
|
||||
fn compact_generate_sst_from_iter(
|
||||
&self,
|
||||
mut iter: impl StorageIterator,
|
||||
compact_to_bottom_level: bool,
|
||||
) -> Result<Vec<Arc<SsTable>>> {
|
||||
let mut builder = None;
|
||||
let mut new_sst = vec![];
|
||||
|
||||
let compact_to_bottom_level = task.compact_to_bottom_level();
|
||||
let mut new_sst = Vec::new();
|
||||
|
||||
while iter.is_valid() {
|
||||
if builder.is_none() {
|
||||
@@ -165,7 +136,7 @@ impl LsmStorageInner {
|
||||
iter.next()?;
|
||||
|
||||
if builder_inner.estimated_size() >= self.options.target_sst_size {
|
||||
let sst_id = self.next_sst_id(); // lock dropped here
|
||||
let sst_id = self.next_sst_id();
|
||||
let builder = builder.take().unwrap();
|
||||
let sst = Arc::new(builder.build(
|
||||
sst_id,
|
||||
@@ -187,6 +158,98 @@ impl LsmStorageInner {
|
||||
Ok(new_sst)
|
||||
}
|
||||
|
||||
fn compact(&self, task: &CompactionTask) -> Result<Vec<Arc<SsTable>>> {
|
||||
let snapshot = {
|
||||
let state = self.state.read();
|
||||
state.clone()
|
||||
};
|
||||
match task {
|
||||
CompactionTask::ForceFullCompaction {
|
||||
l0_sstables,
|
||||
l1_sstables,
|
||||
} => {
|
||||
let mut l0_iters = Vec::with_capacity(l0_sstables.len());
|
||||
for id in l0_sstables.iter() {
|
||||
l0_iters.push(Box::new(SsTableIterator::create_and_seek_to_first(
|
||||
snapshot.sstables.get(id).unwrap().clone(),
|
||||
)?));
|
||||
}
|
||||
let mut l1_iters = Vec::with_capacity(l1_sstables.len());
|
||||
for id in l1_sstables.iter() {
|
||||
l1_iters.push(snapshot.sstables.get(id).unwrap().clone());
|
||||
}
|
||||
let iter = TwoMergeIterator::create(
|
||||
MergeIterator::create(l0_iters),
|
||||
SstConcatIterator::create_and_seek_to_first(l1_iters)?,
|
||||
)?;
|
||||
self.compact_generate_sst_from_iter(iter, task.compact_to_bottom_level())
|
||||
}
|
||||
CompactionTask::Simple(SimpleLeveledCompactionTask {
|
||||
upper_level,
|
||||
upper_level_sst_ids,
|
||||
lower_level: _,
|
||||
lower_level_sst_ids,
|
||||
..
|
||||
})
|
||||
| CompactionTask::Leveled(LeveledCompactionTask {
|
||||
upper_level,
|
||||
upper_level_sst_ids,
|
||||
lower_level: _,
|
||||
lower_level_sst_ids,
|
||||
..
|
||||
}) => match upper_level {
|
||||
Some(_) => {
|
||||
let mut upper_ssts = Vec::with_capacity(upper_level_sst_ids.len());
|
||||
for id in upper_level_sst_ids.iter() {
|
||||
upper_ssts.push(snapshot.sstables.get(id).unwrap().clone());
|
||||
}
|
||||
let upper_iter = SstConcatIterator::create_and_seek_to_first(upper_ssts)?;
|
||||
let mut lower_ssts = Vec::with_capacity(upper_level_sst_ids.len());
|
||||
for id in lower_level_sst_ids.iter() {
|
||||
lower_ssts.push(snapshot.sstables.get(id).unwrap().clone());
|
||||
}
|
||||
let lower_iter = SstConcatIterator::create_and_seek_to_first(lower_ssts)?;
|
||||
self.compact_generate_sst_from_iter(
|
||||
TwoMergeIterator::create(upper_iter, lower_iter)?,
|
||||
task.compact_to_bottom_level(),
|
||||
)
|
||||
}
|
||||
None => {
|
||||
let mut upper_iters = Vec::with_capacity(upper_level_sst_ids.len());
|
||||
for id in upper_level_sst_ids.iter() {
|
||||
upper_iters.push(Box::new(SsTableIterator::create_and_seek_to_first(
|
||||
snapshot.sstables.get(id).unwrap().clone(),
|
||||
)?));
|
||||
}
|
||||
let upper_iter = MergeIterator::create(upper_iters);
|
||||
let mut lower_ssts = Vec::with_capacity(upper_level_sst_ids.len());
|
||||
for id in lower_level_sst_ids.iter() {
|
||||
lower_ssts.push(snapshot.sstables.get(id).unwrap().clone());
|
||||
}
|
||||
let lower_iter = SstConcatIterator::create_and_seek_to_first(lower_ssts)?;
|
||||
self.compact_generate_sst_from_iter(
|
||||
TwoMergeIterator::create(upper_iter, lower_iter)?,
|
||||
task.compact_to_bottom_level(),
|
||||
)
|
||||
}
|
||||
},
|
||||
CompactionTask::Tiered(TieredCompactionTask { tiers, .. }) => {
|
||||
let mut iters = Vec::with_capacity(tiers.len());
|
||||
for (_, tier_sst_ids) in tiers {
|
||||
let mut ssts = Vec::with_capacity(tier_sst_ids.len());
|
||||
for id in tier_sst_ids.iter() {
|
||||
ssts.push(snapshot.sstables.get(id).unwrap().clone());
|
||||
}
|
||||
iters.push(Box::new(SstConcatIterator::create_and_seek_to_first(ssts)?));
|
||||
}
|
||||
self.compact_generate_sst_from_iter(
|
||||
MergeIterator::create(iters),
|
||||
task.compact_to_bottom_level(),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn force_full_compaction(&self) -> Result<()> {
|
||||
let CompactionOptions::NoCompaction = self.options.compaction_options else {
|
||||
panic!("full compaction can only be called with compaction is not enabled")
|
||||
@@ -195,15 +258,19 @@ impl LsmStorageInner {
|
||||
let state = self.state.read();
|
||||
state.clone()
|
||||
};
|
||||
let mut original_sstables = snapshot.l0_sstables.clone();
|
||||
original_sstables.reverse(); // is this correct?
|
||||
let sstables = self.compact(&CompactionTask::ForceFullCompaction(
|
||||
original_sstables.clone(),
|
||||
))?;
|
||||
|
||||
let l0_sstables = snapshot.l0_sstables.clone();
|
||||
let l1_sstables = snapshot.levels[0].1.clone();
|
||||
let compaction_task = CompactionTask::ForceFullCompaction {
|
||||
l0_sstables: l0_sstables.clone(),
|
||||
l1_sstables: l1_sstables.clone(),
|
||||
};
|
||||
let sstables = self.compact(&compaction_task)?;
|
||||
|
||||
{
|
||||
let _state_lock = self.state_lock.lock();
|
||||
let mut state = self.state.read().as_ref().clone();
|
||||
for sst in original_sstables.iter() {
|
||||
for sst in l0_sstables.iter().chain(l1_sstables.iter()) {
|
||||
let result = state.sstables.remove(sst);
|
||||
assert!(result.is_some());
|
||||
}
|
||||
@@ -213,11 +280,20 @@ impl LsmStorageInner {
|
||||
let result = state.sstables.insert(new_sst.sst_id(), new_sst);
|
||||
assert!(result.is_none());
|
||||
}
|
||||
state.l0_sstables = ids;
|
||||
assert_eq!(l1_sstables, state.levels[0].1);
|
||||
state.levels[0].1 = ids;
|
||||
let mut l0_sstables_map = l0_sstables.iter().copied().collect::<HashSet<_>>();
|
||||
state.l0_sstables = state
|
||||
.l0_sstables
|
||||
.iter()
|
||||
.filter(|x| !l0_sstables_map.remove(x))
|
||||
.copied()
|
||||
.collect::<Vec<_>>();
|
||||
assert!(l0_sstables_map.is_empty());
|
||||
*self.state.write() = Arc::new(state);
|
||||
}
|
||||
for sst in original_sstables {
|
||||
std::fs::remove_file(self.path_of_sst(sst))?;
|
||||
for sst in l0_sstables.iter().chain(l1_sstables.iter()) {
|
||||
std::fs::remove_file(self.path_of_sst(*sst))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -235,6 +311,7 @@ impl LsmStorageInner {
|
||||
};
|
||||
println!("running compaction task: {:?}", task);
|
||||
let sstables = self.compact(&task)?;
|
||||
let files_added = sstables.len();
|
||||
let output = sstables.iter().map(|x| x.sst_id()).collect::<Vec<_>>();
|
||||
let ssts_to_remove = {
|
||||
let state_lock = self.state_lock.lock();
|
||||
@@ -244,7 +321,7 @@ impl LsmStorageInner {
|
||||
let mut ssts_to_remove = Vec::with_capacity(files_to_remove.len());
|
||||
for file_to_remove in &files_to_remove {
|
||||
let result = snapshot.sstables.remove(file_to_remove);
|
||||
assert!(result.is_some());
|
||||
assert!(result.is_some(), "cannot remove {}.sst", file_to_remove);
|
||||
ssts_to_remove.push(result.unwrap());
|
||||
}
|
||||
let mut new_sst_ids = Vec::new();
|
||||
@@ -255,13 +332,24 @@ impl LsmStorageInner {
|
||||
}
|
||||
let mut state = self.state.write();
|
||||
*state = Arc::new(snapshot);
|
||||
drop(state);
|
||||
self.sync_dir()?;
|
||||
self.manifest
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.add_record(&state_lock, ManifestRecord::Compaction(task, new_sst_ids))?;
|
||||
ssts_to_remove
|
||||
};
|
||||
println!(
|
||||
"compaction finished: {} files removed, {} files added",
|
||||
ssts_to_remove.len(),
|
||||
files_added
|
||||
);
|
||||
for sst in ssts_to_remove {
|
||||
std::fs::remove_file(self.path_of_sst(sst.sst_id()))?;
|
||||
}
|
||||
self.sync_dir()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -289,4 +377,34 @@ impl LsmStorageInner {
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn trigger_flush(&self) -> Result<()> {
|
||||
if {
|
||||
let state = self.state.read();
|
||||
state.imm_memtables.len() >= self.options.num_memtable_limit
|
||||
} {
|
||||
self.force_flush_next_imm_memtable()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn spawn_flush_thread(
|
||||
self: &Arc<Self>,
|
||||
rx: crossbeam_channel::Receiver<()>,
|
||||
) -> Result<Option<std::thread::JoinHandle<()>>> {
|
||||
let this = self.clone();
|
||||
let handle = std::thread::spawn(move || {
|
||||
let ticker = crossbeam_channel::tick(Duration::from_millis(50));
|
||||
loop {
|
||||
crossbeam_channel::select! {
|
||||
recv(ticker) -> _ => if let Err(e) = this.trigger_flush() {
|
||||
eprintln!("flush failed: {}", e);
|
||||
},
|
||||
recv(rx) -> _ => return
|
||||
}
|
||||
}
|
||||
});
|
||||
return Ok(Some(handle));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user