From 77e15efad4b7d8a3946020a35c3a239a929d7321 Mon Sep 17 00:00:00 2001 From: Eikasia30 Date: Tue, 2 Jul 2024 20:25:43 -0400 Subject: [PATCH] fix: avoid leveled compaction crash when recovering from manifest (#63) * Fix: Avoid leveled copaction crash when recovering from manifest * Also sort SSTs in manifest recovery * Add `in_recovery` flag to `apply_compaction_result` - Don't sort the SSTs inside `apply_compaction_result` if in recovery --- mini-lsm-mvcc/src/compact.rs | 6 +- mini-lsm-mvcc/src/compact/leveled.rs | 20 +++--- mini-lsm-mvcc/src/lsm_storage.rs | 16 ++++- .../src/bin/compaction-simulator.rs | 8 ++- mini-lsm-starter/src/compact.rs | 3 +- mini-lsm-starter/src/compact/leveled.rs | 1 + mini-lsm/src/compact.rs | 6 +- mini-lsm/src/compact/leveled.rs | 20 +++--- mini-lsm/src/lsm_storage.rs | 16 ++++- mini-lsm/src/tests/week2_day5.rs | 72 +++++++++++++++++++ 10 files changed, 141 insertions(+), 27 deletions(-) diff --git a/mini-lsm-mvcc/src/compact.rs b/mini-lsm-mvcc/src/compact.rs index bc0593a..3b5bd1e 100644 --- a/mini-lsm-mvcc/src/compact.rs +++ b/mini-lsm-mvcc/src/compact.rs @@ -73,10 +73,11 @@ impl CompactionController { snapshot: &LsmStorageState, task: &CompactionTask, output: &[usize], + in_recovery: bool, ) -> (LsmStorageState, Vec) { match (self, task) { (CompactionController::Leveled(ctrl), CompactionTask::Leveled(task)) => { - ctrl.apply_compaction_result(snapshot, task, output) + ctrl.apply_compaction_result(snapshot, task, output, in_recovery) } (CompactionController::Simple(ctrl), CompactionTask::Simple(task)) => { ctrl.apply_compaction_result(snapshot, task, output) @@ -381,7 +382,8 @@ impl LsmStorageInner { } let (mut snapshot, files_to_remove) = self .compaction_controller - .apply_compaction_result(&snapshot, &task, &output); + .apply_compaction_result(&snapshot, &task, &output, false); + let mut ssts_to_remove = Vec::with_capacity(files_to_remove.len()); for file_to_remove in &files_to_remove { let result = snapshot.sstables.remove(file_to_remove); diff --git a/mini-lsm-mvcc/src/compact/leveled.rs b/mini-lsm-mvcc/src/compact/leveled.rs index 043bc92..b4c28e2 100644 --- a/mini-lsm-mvcc/src/compact/leveled.rs +++ b/mini-lsm-mvcc/src/compact/leveled.rs @@ -160,6 +160,7 @@ impl LeveledCompactionController { snapshot: &LsmStorageState, task: &LeveledCompactionTask, output: &[usize], + in_recovery: bool, ) -> (LsmStorageState, Vec) { let mut snapshot = snapshot.clone(); let mut files_to_remove = Vec::new(); @@ -216,14 +217,17 @@ impl LeveledCompactionController { .collect::>(); assert!(lower_level_sst_ids_set.is_empty()); new_lower_level_ssts.extend(output); - new_lower_level_ssts.sort_by(|x, y| { - snapshot - .sstables - .get(x) - .unwrap() - .first_key() - .cmp(snapshot.sstables.get(y).unwrap().first_key()) - }); + // Don't sort the SST IDs during recovery because actual SSTs are not loaded at that point + if !in_recovery { + new_lower_level_ssts.sort_by(|x, y| { + snapshot + .sstables + .get(x) + .unwrap() + .first_key() + .cmp(snapshot.sstables.get(y).unwrap().first_key()) + }); + } snapshot.levels[task.lower_level - 1].1 = new_lower_level_ssts; (snapshot, files_to_remove) } diff --git a/mini-lsm-mvcc/src/lsm_storage.rs b/mini-lsm-mvcc/src/lsm_storage.rs index 10f918b..ab32e7b 100644 --- a/mini-lsm-mvcc/src/lsm_storage.rs +++ b/mini-lsm-mvcc/src/lsm_storage.rs @@ -368,8 +368,8 @@ impl LsmStorageInner { memtables.insert(x); } ManifestRecord::Compaction(task, output) => { - let (new_state, _) = - compaction_controller.apply_compaction_result(&state, &task, &output); + let (new_state, _) = compaction_controller + .apply_compaction_result(&state, &task, &output, true); // TODO: apply remove again state = new_state; next_sst_id = @@ -400,6 +400,18 @@ impl LsmStorageInner { next_sst_id += 1; + // Sort SSTs on each level + for (_id, ssts) in &mut state.levels { + ssts.sort_by(|x, y| { + state + .sstables + .get(x) + .unwrap() + .first_key() + .cmp(state.sstables.get(y).unwrap().first_key()) + }) + } + // recover memtables if options.enable_wal { let mut wal_cnt = 0; diff --git a/mini-lsm-starter/src/bin/compaction-simulator.rs b/mini-lsm-starter/src/bin/compaction-simulator.rs index 18eba53..3d8047b 100644 --- a/mini-lsm-starter/src/bin/compaction-simulator.rs +++ b/mini-lsm-starter/src/bin/compaction-simulator.rs @@ -561,8 +561,12 @@ fn main() { .join(", ") ); max_space = max_space.max(storage.file_list.len()); - let (snapshot, del) = - controller.apply_compaction_result(&storage.snapshot, &task, &sst_ids); + let (snapshot, del) = controller.apply_compaction_result( + &storage.snapshot, + &task, + &sst_ids, + false, + ); storage.snapshot = snapshot; storage.remove(&del); println!("--- After Compaction ---"); diff --git a/mini-lsm-starter/src/compact.rs b/mini-lsm-starter/src/compact.rs index d52cc13..f7646c7 100644 --- a/mini-lsm-starter/src/compact.rs +++ b/mini-lsm-starter/src/compact.rs @@ -68,10 +68,11 @@ impl CompactionController { snapshot: &LsmStorageState, task: &CompactionTask, output: &[usize], + in_recovery: bool, ) -> (LsmStorageState, Vec) { match (self, task) { (CompactionController::Leveled(ctrl), CompactionTask::Leveled(task)) => { - ctrl.apply_compaction_result(snapshot, task, output) + ctrl.apply_compaction_result(snapshot, task, output, in_recovery) } (CompactionController::Simple(ctrl), CompactionTask::Simple(task)) => { ctrl.apply_compaction_result(snapshot, task, output) diff --git a/mini-lsm-starter/src/compact/leveled.rs b/mini-lsm-starter/src/compact/leveled.rs index 50db7b0..6cd8bc7 100644 --- a/mini-lsm-starter/src/compact/leveled.rs +++ b/mini-lsm-starter/src/compact/leveled.rs @@ -50,6 +50,7 @@ impl LeveledCompactionController { _snapshot: &LsmStorageState, _task: &LeveledCompactionTask, _output: &[usize], + _in_recovery: bool, ) -> (LsmStorageState, Vec) { unimplemented!() } diff --git a/mini-lsm/src/compact.rs b/mini-lsm/src/compact.rs index 695ab3b..22fb932 100644 --- a/mini-lsm/src/compact.rs +++ b/mini-lsm/src/compact.rs @@ -73,10 +73,11 @@ impl CompactionController { snapshot: &LsmStorageState, task: &CompactionTask, output: &[usize], + in_recovery: bool, ) -> (LsmStorageState, Vec) { match (self, task) { (CompactionController::Leveled(ctrl), CompactionTask::Leveled(task)) => { - ctrl.apply_compaction_result(snapshot, task, output) + ctrl.apply_compaction_result(snapshot, task, output, in_recovery) } (CompactionController::Simple(ctrl), CompactionTask::Simple(task)) => { ctrl.apply_compaction_result(snapshot, task, output) @@ -335,7 +336,8 @@ impl LsmStorageInner { } let (mut snapshot, files_to_remove) = self .compaction_controller - .apply_compaction_result(&snapshot, &task, &output); + .apply_compaction_result(&snapshot, &task, &output, false); + let mut ssts_to_remove = Vec::with_capacity(files_to_remove.len()); for file_to_remove in &files_to_remove { let result = snapshot.sstables.remove(file_to_remove); diff --git a/mini-lsm/src/compact/leveled.rs b/mini-lsm/src/compact/leveled.rs index 2136618..5cc1609 100644 --- a/mini-lsm/src/compact/leveled.rs +++ b/mini-lsm/src/compact/leveled.rs @@ -159,6 +159,7 @@ impl LeveledCompactionController { snapshot: &LsmStorageState, task: &LeveledCompactionTask, output: &[usize], + in_recovery: bool, ) -> (LsmStorageState, Vec) { let mut snapshot = snapshot.clone(); let mut files_to_remove = Vec::new(); @@ -215,14 +216,17 @@ impl LeveledCompactionController { .collect::>(); assert!(lower_level_sst_ids_set.is_empty()); new_lower_level_ssts.extend(output); - new_lower_level_ssts.sort_by(|x, y| { - snapshot - .sstables - .get(x) - .unwrap() - .first_key() - .cmp(snapshot.sstables.get(y).unwrap().first_key()) - }); + // Don't sort the SST IDs during recovery because actual SSTs are not loaded at that point + if !in_recovery { + new_lower_level_ssts.sort_by(|x, y| { + snapshot + .sstables + .get(x) + .unwrap() + .first_key() + .cmp(snapshot.sstables.get(y).unwrap().first_key()) + }); + } snapshot.levels[task.lower_level - 1].1 = new_lower_level_ssts; (snapshot, files_to_remove) } diff --git a/mini-lsm/src/lsm_storage.rs b/mini-lsm/src/lsm_storage.rs index 19d3c64..6b69e69 100644 --- a/mini-lsm/src/lsm_storage.rs +++ b/mini-lsm/src/lsm_storage.rs @@ -364,8 +364,8 @@ impl LsmStorageInner { memtables.insert(x); } ManifestRecord::Compaction(task, output) => { - let (new_state, _) = - compaction_controller.apply_compaction_result(&state, &task, &output); + let (new_state, _) = compaction_controller + .apply_compaction_result(&state, &task, &output, true); // TODO: apply remove again state = new_state; next_sst_id = @@ -395,6 +395,18 @@ impl LsmStorageInner { next_sst_id += 1; + // Sort SSTs on each level + for (_id, ssts) in &mut state.levels { + ssts.sort_by(|x, y| { + state + .sstables + .get(x) + .unwrap() + .first_key() + .cmp(state.sstables.get(y).unwrap().first_key()) + }) + } + // recover memtables if options.enable_wal { let mut wal_cnt = 0; diff --git a/mini-lsm/src/tests/week2_day5.rs b/mini-lsm/src/tests/week2_day5.rs index 9ceedb3..5199c76 100644 --- a/mini-lsm/src/tests/week2_day5.rs +++ b/mini-lsm/src/tests/week2_day5.rs @@ -1,3 +1,6 @@ +use std::time::Duration; + +use bytes::BufMut; use tempfile::tempdir; use crate::{ @@ -38,6 +41,64 @@ fn test_integration_simple() { })); } +/// Provision the storage such that base_level contains 2 SST files (target size is 2MB and each SST is 1MB). +/// This configuration has the effect that compaction will generate a new lower-level containing more than 1 SST files, +/// and leveled compaction should handle this situation correctly: These files might not be sorted by first-key and +/// should NOT be sorted inside the `apply_compaction_result` function, because we don't have any actual SST loaded at the +/// point where this function is called during manifest recovery. +#[test] +fn test_multiple_compacted_ssts_leveled() { + let compaction_options = CompactionOptions::Leveled(LeveledCompactionOptions { + level_size_multiplier: 4, + level0_file_num_compaction_trigger: 2, + max_levels: 2, + base_level_size_mb: 2, + }); + + let lsm_storage_options = LsmStorageOptions::default_for_week2_test(compaction_options.clone()); + + let dir = tempdir().unwrap(); + let storage = MiniLsm::open(&dir, lsm_storage_options).unwrap(); + + // Insert approximately 10MB of data to ensure that at least one compaction is triggered by priority. + // Insert 500 key-value pairs where each pair is 2KB + for i in 0..500 { + let (key, val) = key_value_pair_with_target_size(i, 20 * 1024); + storage.put(&key, &val).unwrap(); + } + + let mut prev_snapshot = storage.inner.state.read().clone(); + while { + std::thread::sleep(Duration::from_secs(1)); + let snapshot = storage.inner.state.read().clone(); + let to_cont = prev_snapshot.levels != snapshot.levels + || prev_snapshot.l0_sstables != snapshot.l0_sstables; + prev_snapshot = snapshot; + to_cont + } { + println!("waiting for compaction to converge"); + } + + storage.close().unwrap(); + assert!(storage.inner.state.read().memtable.is_empty()); + assert!(storage.inner.state.read().imm_memtables.is_empty()); + + storage.dump_structure(); + drop(storage); + dump_files_in_dir(&dir); + + let storage = MiniLsm::open( + &dir, + LsmStorageOptions::default_for_week2_test(compaction_options.clone()), + ) + .unwrap(); + + for i in 0..500 { + let (key, val) = key_value_pair_with_target_size(i, 20 * 1024); + assert_eq!(&storage.get(&key).unwrap().unwrap()[..], &val); + } +} + fn test_integration(compaction_options: CompactionOptions) { let dir = tempdir().unwrap(); let storage = MiniLsm::open( @@ -79,3 +140,14 @@ fn test_integration(compaction_options: CompactionOptions) { assert_eq!(&storage.get(b"1").unwrap().unwrap()[..], b"v20".as_slice()); assert_eq!(storage.get(b"2").unwrap(), None); } + +/// Create a key value pair where key and value are of target size in bytes +fn key_value_pair_with_target_size(seed: i32, target_size_byte: usize) -> (Vec, Vec) { + let mut key = vec![0; target_size_byte - 4]; + key.put_i32(seed); + + let mut val = vec![0; target_size_byte - 4]; + val.put_i32(seed); + + (key, val) +}