support force full compaction

Signed-off-by: Alex Chi <iskyzh@gmail.com>
This commit is contained in:
Alex Chi
2024-01-18 19:49:36 +08:00
parent 3aad027e23
commit 41d860e987
2 changed files with 34 additions and 2 deletions

View File

@@ -1,5 +1,3 @@
use std::time::Duration;
use anyhow::Result; use anyhow::Result;
use mini_lsm::compact::{CompactionOptions, SimpleLeveledCompactionOptions}; use mini_lsm::compact::{CompactionOptions, SimpleLeveledCompactionOptions};
use mini_lsm::lsm_storage::{LsmStorageOptions, MiniLsm}; use mini_lsm::lsm_storage::{LsmStorageOptions, MiniLsm};

View File

@@ -184,6 +184,40 @@ impl LsmStorageInner {
Ok(new_sst) Ok(new_sst)
} }
pub fn force_full_compaction(&self) -> Result<()> {
let CompactionOptions::NoCompaction = self.options.compaction_options else {
panic!("full compaction can only be called with compaction is not enabled")
};
let snapshot = {
let state = self.state.read();
state.clone()
};
let original_sstables = snapshot.l0_sstables.clone();
let sstables = self.compact(&CompactionTask::ForceFullCompaction(
original_sstables.clone(),
))?;
{
let _state_lock = self.state_lock.lock();
let mut state = self.state.read().as_ref().clone();
for sst in original_sstables.iter() {
let result = state.sstables.remove(sst);
assert!(result.is_some());
}
let mut ids = Vec::with_capacity(sstables.len());
for new_sst in sstables {
ids.push(new_sst.sst_id());
let result = state.sstables.insert(new_sst.sst_id(), new_sst);
assert!(result.is_none());
}
state.l0_sstables = ids;
*self.state.write() = Arc::new(state);
}
for sst in original_sstables {
std::fs::remove_file(self.path_of_sst(sst))?;
}
Ok(())
}
fn trigger_compaction(&self) -> Result<()> { fn trigger_compaction(&self) -> Result<()> {
let snapshot = { let snapshot = {
let state = self.state.read(); let state = self.state.read();