fix: avoid leveled compaction crash when recovering from manifest (#63)
* Fix: Avoid leveled copaction crash when recovering from manifest * Also sort SSTs in manifest recovery * Add `in_recovery` flag to `apply_compaction_result` - Don't sort the SSTs inside `apply_compaction_result` if in recovery
This commit is contained in:
@@ -73,10 +73,11 @@ impl CompactionController {
|
||||
snapshot: &LsmStorageState,
|
||||
task: &CompactionTask,
|
||||
output: &[usize],
|
||||
in_recovery: bool,
|
||||
) -> (LsmStorageState, Vec<usize>) {
|
||||
match (self, task) {
|
||||
(CompactionController::Leveled(ctrl), CompactionTask::Leveled(task)) => {
|
||||
ctrl.apply_compaction_result(snapshot, task, output)
|
||||
ctrl.apply_compaction_result(snapshot, task, output, in_recovery)
|
||||
}
|
||||
(CompactionController::Simple(ctrl), CompactionTask::Simple(task)) => {
|
||||
ctrl.apply_compaction_result(snapshot, task, output)
|
||||
@@ -335,7 +336,8 @@ impl LsmStorageInner {
|
||||
}
|
||||
let (mut snapshot, files_to_remove) = self
|
||||
.compaction_controller
|
||||
.apply_compaction_result(&snapshot, &task, &output);
|
||||
.apply_compaction_result(&snapshot, &task, &output, false);
|
||||
|
||||
let mut ssts_to_remove = Vec::with_capacity(files_to_remove.len());
|
||||
for file_to_remove in &files_to_remove {
|
||||
let result = snapshot.sstables.remove(file_to_remove);
|
||||
|
@@ -159,6 +159,7 @@ impl LeveledCompactionController {
|
||||
snapshot: &LsmStorageState,
|
||||
task: &LeveledCompactionTask,
|
||||
output: &[usize],
|
||||
in_recovery: bool,
|
||||
) -> (LsmStorageState, Vec<usize>) {
|
||||
let mut snapshot = snapshot.clone();
|
||||
let mut files_to_remove = Vec::new();
|
||||
@@ -215,14 +216,17 @@ impl LeveledCompactionController {
|
||||
.collect::<Vec<_>>();
|
||||
assert!(lower_level_sst_ids_set.is_empty());
|
||||
new_lower_level_ssts.extend(output);
|
||||
new_lower_level_ssts.sort_by(|x, y| {
|
||||
snapshot
|
||||
.sstables
|
||||
.get(x)
|
||||
.unwrap()
|
||||
.first_key()
|
||||
.cmp(snapshot.sstables.get(y).unwrap().first_key())
|
||||
});
|
||||
// Don't sort the SST IDs during recovery because actual SSTs are not loaded at that point
|
||||
if !in_recovery {
|
||||
new_lower_level_ssts.sort_by(|x, y| {
|
||||
snapshot
|
||||
.sstables
|
||||
.get(x)
|
||||
.unwrap()
|
||||
.first_key()
|
||||
.cmp(snapshot.sstables.get(y).unwrap().first_key())
|
||||
});
|
||||
}
|
||||
snapshot.levels[task.lower_level - 1].1 = new_lower_level_ssts;
|
||||
(snapshot, files_to_remove)
|
||||
}
|
||||
|
@@ -364,8 +364,8 @@ impl LsmStorageInner {
|
||||
memtables.insert(x);
|
||||
}
|
||||
ManifestRecord::Compaction(task, output) => {
|
||||
let (new_state, _) =
|
||||
compaction_controller.apply_compaction_result(&state, &task, &output);
|
||||
let (new_state, _) = compaction_controller
|
||||
.apply_compaction_result(&state, &task, &output, true);
|
||||
// TODO: apply remove again
|
||||
state = new_state;
|
||||
next_sst_id =
|
||||
@@ -395,6 +395,18 @@ impl LsmStorageInner {
|
||||
|
||||
next_sst_id += 1;
|
||||
|
||||
// Sort SSTs on each level
|
||||
for (_id, ssts) in &mut state.levels {
|
||||
ssts.sort_by(|x, y| {
|
||||
state
|
||||
.sstables
|
||||
.get(x)
|
||||
.unwrap()
|
||||
.first_key()
|
||||
.cmp(state.sstables.get(y).unwrap().first_key())
|
||||
})
|
||||
}
|
||||
|
||||
// recover memtables
|
||||
if options.enable_wal {
|
||||
let mut wal_cnt = 0;
|
||||
|
@@ -1,3 +1,6 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use bytes::BufMut;
|
||||
use tempfile::tempdir;
|
||||
|
||||
use crate::{
|
||||
@@ -38,6 +41,64 @@ fn test_integration_simple() {
|
||||
}));
|
||||
}
|
||||
|
||||
/// Provision the storage such that base_level contains 2 SST files (target size is 2MB and each SST is 1MB).
|
||||
/// This configuration has the effect that compaction will generate a new lower-level containing more than 1 SST files,
|
||||
/// and leveled compaction should handle this situation correctly: These files might not be sorted by first-key and
|
||||
/// should NOT be sorted inside the `apply_compaction_result` function, because we don't have any actual SST loaded at the
|
||||
/// point where this function is called during manifest recovery.
|
||||
#[test]
|
||||
fn test_multiple_compacted_ssts_leveled() {
|
||||
let compaction_options = CompactionOptions::Leveled(LeveledCompactionOptions {
|
||||
level_size_multiplier: 4,
|
||||
level0_file_num_compaction_trigger: 2,
|
||||
max_levels: 2,
|
||||
base_level_size_mb: 2,
|
||||
});
|
||||
|
||||
let lsm_storage_options = LsmStorageOptions::default_for_week2_test(compaction_options.clone());
|
||||
|
||||
let dir = tempdir().unwrap();
|
||||
let storage = MiniLsm::open(&dir, lsm_storage_options).unwrap();
|
||||
|
||||
// Insert approximately 10MB of data to ensure that at least one compaction is triggered by priority.
|
||||
// Insert 500 key-value pairs where each pair is 2KB
|
||||
for i in 0..500 {
|
||||
let (key, val) = key_value_pair_with_target_size(i, 20 * 1024);
|
||||
storage.put(&key, &val).unwrap();
|
||||
}
|
||||
|
||||
let mut prev_snapshot = storage.inner.state.read().clone();
|
||||
while {
|
||||
std::thread::sleep(Duration::from_secs(1));
|
||||
let snapshot = storage.inner.state.read().clone();
|
||||
let to_cont = prev_snapshot.levels != snapshot.levels
|
||||
|| prev_snapshot.l0_sstables != snapshot.l0_sstables;
|
||||
prev_snapshot = snapshot;
|
||||
to_cont
|
||||
} {
|
||||
println!("waiting for compaction to converge");
|
||||
}
|
||||
|
||||
storage.close().unwrap();
|
||||
assert!(storage.inner.state.read().memtable.is_empty());
|
||||
assert!(storage.inner.state.read().imm_memtables.is_empty());
|
||||
|
||||
storage.dump_structure();
|
||||
drop(storage);
|
||||
dump_files_in_dir(&dir);
|
||||
|
||||
let storage = MiniLsm::open(
|
||||
&dir,
|
||||
LsmStorageOptions::default_for_week2_test(compaction_options.clone()),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
for i in 0..500 {
|
||||
let (key, val) = key_value_pair_with_target_size(i, 20 * 1024);
|
||||
assert_eq!(&storage.get(&key).unwrap().unwrap()[..], &val);
|
||||
}
|
||||
}
|
||||
|
||||
fn test_integration(compaction_options: CompactionOptions) {
|
||||
let dir = tempdir().unwrap();
|
||||
let storage = MiniLsm::open(
|
||||
@@ -79,3 +140,14 @@ fn test_integration(compaction_options: CompactionOptions) {
|
||||
assert_eq!(&storage.get(b"1").unwrap().unwrap()[..], b"v20".as_slice());
|
||||
assert_eq!(storage.get(b"2").unwrap(), None);
|
||||
}
|
||||
|
||||
/// Create a key value pair where key and value are of target size in bytes
|
||||
fn key_value_pair_with_target_size(seed: i32, target_size_byte: usize) -> (Vec<u8>, Vec<u8>) {
|
||||
let mut key = vec![0; target_size_byte - 4];
|
||||
key.put_i32(seed);
|
||||
|
||||
let mut val = vec![0; target_size_byte - 4];
|
||||
val.put_i32(seed);
|
||||
|
||||
(key, val)
|
||||
}
|
||||
|
Reference in New Issue
Block a user