diff --git a/Cargo.lock b/Cargo.lock index 7d3645d..2333e2e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -403,6 +403,8 @@ dependencies = [ "ouroboros", "parking_lot", "rand", + "serde", + "serde_json", "tempfile", ] diff --git a/mini-lsm-week-1/src/lsm_storage.rs b/mini-lsm-week-1/src/lsm_storage.rs index f57e189..79beae4 100644 --- a/mini-lsm-week-1/src/lsm_storage.rs +++ b/mini-lsm-week-1/src/lsm_storage.rs @@ -99,7 +99,7 @@ impl LsmStorage { )?)); } let iter = MergeIterator::create(iters); - if iter.is_valid() { + if iter.is_valid() && iter.key() == key { return Ok(Some(Bytes::copy_from_slice(iter.value()))); } Ok(None) diff --git a/mini-lsm/Cargo.toml b/mini-lsm/Cargo.toml index 2194ab7..60a140d 100644 --- a/mini-lsm/Cargo.toml +++ b/mini-lsm/Cargo.toml @@ -20,6 +20,8 @@ moka = "0.9" clap = { version = "4.4.17", features = ["derive"] } rand = "0.8.5" crossbeam-channel = "0.5.11" +serde_json = { version = "1.0" } +serde = { version = "1.0", features = ["derive"] } [dev-dependencies] tempfile = "3" diff --git a/mini-lsm/src/bin/compaction_simulator.rs b/mini-lsm/src/bin/compaction_simulator.rs index f9e36f5..74a8843 100644 --- a/mini-lsm/src/bin/compaction_simulator.rs +++ b/mini-lsm/src/bin/compaction_simulator.rs @@ -70,7 +70,7 @@ pub struct MockStorage { impl MockStorage { pub fn new() -> Self { let snapshot = LsmStorageState { - memtable: Arc::new(MemTable::create()), + memtable: Arc::new(MemTable::create(0)), imm_memtables: Vec::new(), l0_sstables: Vec::new(), levels: Vec::new(), @@ -78,7 +78,7 @@ impl MockStorage { }; Self { snapshot, - next_sst_id: 0, + next_sst_id: 1, file_list: Default::default(), total_flushes: 0, total_writes: 0, diff --git a/mini-lsm/src/bin/minilsm_cli.rs b/mini-lsm/src/bin/mini_lsm_cli.rs similarity index 97% rename from mini-lsm/src/bin/minilsm_cli.rs rename to mini-lsm/src/bin/mini_lsm_cli.rs index 6bdb037..e26d96b 100644 --- a/mini-lsm/src/bin/minilsm_cli.rs +++ b/mini-lsm/src/bin/mini_lsm_cli.rs @@ -13,6 +13,7 @@ fn main() -> 
Result<()> { level0_file_num_compaction_trigger: 2, max_levels: 4, }), + enable_wal: false, }, )?; let mut epoch = 0; @@ -51,7 +52,7 @@ fn main() -> Result<()> { println!("{} not exist", key); } } else if line == "flush" { - lsm.force_flush_imm_memtables()?; + lsm.force_flush()?; } else if line == "quit" { lsm.close()?; break; diff --git a/mini-lsm/src/compact.rs b/mini-lsm/src/compact.rs index eb38a10..df5e65c 100644 --- a/mini-lsm/src/compact.rs +++ b/mini-lsm/src/compact.rs @@ -7,6 +7,7 @@ use std::time::Duration; use anyhow::Result; pub use leveled::{LeveledCompactionController, LeveledCompactionOptions, LeveledCompactionTask}; +use serde::{Deserialize, Serialize}; pub use simple_leveled::{ SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, SimpleLeveledCompactionTask, }; @@ -15,8 +16,10 @@ pub use tiered::{TieredCompactionController, TieredCompactionOptions, TieredComp use crate::iterators::merge_iterator::MergeIterator; use crate::iterators::StorageIterator; use crate::lsm_storage::{LsmStorageInner, LsmStorageState}; +use crate::manifest::ManifestRecord; use crate::table::{SsTable, SsTableBuilder, SsTableIterator}; +#[derive(Serialize, Deserialize)] pub(crate) enum CompactionTask { Leveled(LeveledCompactionTask), Tiered(TieredCompactionTask), @@ -233,7 +236,7 @@ impl LsmStorageInner { let sstables = self.compact(&task)?; let output = sstables.iter().map(|x| x.sst_id()).collect::<Vec<_>>(); let ssts_to_remove = { - let _state_lock = self.state_lock.lock(); + let state_lock = self.state_lock.lock(); let (mut snapshot, files_to_remove) = self .compaction_controller .apply_compaction_result(&self.state.read(), &task, &output); @@ -249,6 +252,8 @@ impl LsmStorageInner { } let mut state = self.state.write(); *state = Arc::new(snapshot); + self.manifest + .add_record(&state_lock, ManifestRecord::Compaction(task))?; ssts_to_remove }; for sst in ssts_to_remove { diff --git a/mini-lsm/src/compact/leveled.rs b/mini-lsm/src/compact/leveled.rs index 
df0f031..7f5cfcd 100644 --- a/mini-lsm/src/compact/leveled.rs +++ b/mini-lsm/src/compact/leveled.rs @@ -1,7 +1,10 @@ use std::collections::HashSet; +use serde::{Deserialize, Serialize}; + use crate::lsm_storage::LsmStorageState; +#[derive(Serialize, Deserialize)] pub struct LeveledCompactionTask { // if upper_level is `None`, then it is L0 compaction pub upper_level: Option<usize>, diff --git a/mini-lsm/src/compact/simple_leveled.rs b/mini-lsm/src/compact/simple_leveled.rs index f414339..9bc62b2 100644 --- a/mini-lsm/src/compact/simple_leveled.rs +++ b/mini-lsm/src/compact/simple_leveled.rs @@ -1,3 +1,5 @@ +use serde::{Deserialize, Serialize}; + use crate::lsm_storage::LsmStorageState; #[derive(Debug, Clone)] @@ -7,6 +9,7 @@ pub struct SimpleLeveledCompactionOptions { pub max_levels: usize, } +#[derive(Serialize, Deserialize)] pub struct SimpleLeveledCompactionTask { // if upper_level is `None`, then it is L0 compaction pub upper_level: Option<usize>, diff --git a/mini-lsm/src/compact/tiered.rs b/mini-lsm/src/compact/tiered.rs index d7ed52a..2885c51 100644 --- a/mini-lsm/src/compact/tiered.rs +++ b/mini-lsm/src/compact/tiered.rs @@ -1,7 +1,10 @@ use std::collections::HashMap; +use serde::{Deserialize, Serialize}; + use crate::lsm_storage::LsmStorageState; +#[derive(Serialize, Deserialize)] pub struct TieredCompactionTask { pub tiers: Vec<(usize, Vec<usize>)>, pub bottom_tier_included: bool, diff --git a/mini-lsm/src/lib.rs b/mini-lsm/src/lib.rs index b51e14b..ac8e8ce 100644 --- a/mini-lsm/src/lib.rs +++ b/mini-lsm/src/lib.rs @@ -3,8 +3,10 @@ pub mod compact; pub mod iterators; pub mod lsm_iterator; pub mod lsm_storage; +pub mod manifest; pub mod mem_table; pub mod table; +pub mod wal; #[cfg(test)] mod tests; diff --git a/mini-lsm/src/lsm_storage.rs b/mini-lsm/src/lsm_storage.rs index 337a8af..d17ad02 100644 --- a/mini-lsm/src/lsm_storage.rs +++ b/mini-lsm/src/lsm_storage.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::fs::File; use std::ops::Bound; use std::path::{Path, 
PathBuf}; use std::sync::atomic::AtomicUsize; @@ -17,6 +18,7 @@ use crate::iterators::merge_iterator::MergeIterator; use crate::iterators::two_merge_iterator::TwoMergeIterator; use crate::iterators::StorageIterator; use crate::lsm_iterator::{FusedIterator, LsmIterator}; +use crate::manifest::{Manifest, ManifestRecord}; use crate::mem_table::{map_bound, MemTable}; use crate::table::{SsTable, SsTableBuilder, SsTableIterator}; @@ -39,26 +41,20 @@ pub struct LsmStorageState { impl LsmStorageState { fn create(options: &LsmStorageOptions) -> Self { - match &options.compaction_options { + let levels = match &options.compaction_options { CompactionOptions::Leveled(LeveledCompactionOptions { max_levels, .. }) - | CompactionOptions::Simple(SimpleLeveledCompactionOptions { max_levels, .. }) => { - Self { - memtable: Arc::new(MemTable::create()), - imm_memtables: Vec::new(), - l0_sstables: Vec::new(), - levels: (1..=*max_levels) - .map(|level| (level, Vec::new())) - .collect::<Vec<_>>(), - sstables: Default::default(), - } - } - CompactionOptions::Tiered(_) | CompactionOptions::NoCompaction => Self { - memtable: Arc::new(MemTable::create()), - imm_memtables: Vec::new(), - l0_sstables: Vec::new(), - levels: Vec::new(), - sstables: Default::default(), - }, + | CompactionOptions::Simple(SimpleLeveledCompactionOptions { max_levels, .. 
}) => (1 + ..=*max_levels) + .map(|level| (level, Vec::new())) + .collect::<Vec<_>>(), + CompactionOptions::Tiered(_) | CompactionOptions::NoCompaction => Vec::new(), + }; + Self { + memtable: Arc::new(MemTable::create(0)), + imm_memtables: Vec::new(), + l0_sstables: Vec::new(), + levels, + sstables: Default::default(), } } } @@ -67,6 +63,7 @@ pub struct LsmStorageOptions { pub block_size: usize, pub target_sst_size: usize, pub compaction_options: CompactionOptions, + pub enable_wal: bool, } impl LsmStorageOptions { @@ -75,6 +72,7 @@ impl LsmStorageOptions { block_size: 4096, target_sst_size: 2 << 20, compaction_options: CompactionOptions::NoCompaction, + enable_wal: false, } } } @@ -88,6 +86,7 @@ pub(crate) struct LsmStorageInner { next_sst_id: AtomicUsize, pub(crate) options: Arc<LsmStorageOptions>, pub(crate) compaction_controller: CompactionController, + pub(crate) manifest: Manifest, } pub struct MiniLsm { @@ -145,8 +144,9 @@ impl MiniLsm { self.inner.scan(lower, upper) } - pub fn force_flush_imm_memtables(&self) -> Result<()> { - self.inner.force_flush_imm_memtables() + pub fn force_flush(&self) -> Result<()> { + self.inner.force_freeze_memtable()?; + self.inner.force_flush_next_imm_memtable() } } @@ -161,8 +161,15 @@ impl LsmStorageInner { if !path.exists() { std::fs::create_dir_all(path)?; } - Ok(Self { - state: Arc::new(RwLock::new(Arc::new(LsmStorageState::create(&options)))), + let mut state = LsmStorageState::create(&options); + if options.enable_wal { + state.memtable = Arc::new(MemTable::create_with_wal( + state.memtable.id(), + Self::path_of_wal_static(path, state.memtable.id()), + )?); + } + let storage = Self { + state: Arc::new(RwLock::new(Arc::new(state))), state_lock: Mutex::new(()), path: path.to_path_buf(), block_cache: Arc::new(BlockCache::new(1 << 20)), // 4GB block cache, @@ -179,8 +186,12 @@ impl LsmStorageInner { ), CompactionOptions::NoCompaction => CompactionController::NoCompaction, }, + manifest: Manifest::create(path.join("MANIFEST"))?, options: 
options.into(), - }) + }; + storage.sync_dir()?; + + Ok(storage) } /// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter. @@ -198,6 +209,7 @@ impl LsmStorageInner { } return Ok(Some(value)); } + // Search on immutable memtables. for memtable in snapshot.imm_memtables.iter().rev() { if let Some(value) = memtable.get(key) { @@ -216,7 +228,7 @@ impl LsmStorageInner { )?)); } let iter = MergeIterator::create(iters); - if iter.is_valid() { + if iter.is_valid() && iter.key() == key { return Ok(Some(Bytes::copy_from_slice(iter.value()))); } Ok(None) @@ -228,7 +240,7 @@ impl LsmStorageInner { assert!(!key.is_empty(), "key cannot be empty"); let guard = self.state.read(); - guard.memtable.put(key, value); + guard.memtable.put(key, value)?; Ok(()) } @@ -238,7 +250,7 @@ impl LsmStorageInner { assert!(!key.is_empty(), "key cannot be empty"); let guard = self.state.read(); - guard.memtable.put(key, b""); + guard.memtable.put(key, b"")?; Ok(()) } @@ -247,38 +259,69 @@ impl LsmStorageInner { self.path.join(format!("{:05}.sst", id)) } - /// Force freeze the current memetable to an immutable memtable - pub fn force_freeze_memtable(&self) -> Result<()> { + pub(crate) fn path_of_wal_static(path: impl AsRef<Path>, id: usize) -> PathBuf { + path.as_ref().join(format!("{:05}.wal", id)) + } + + pub(crate) fn path_of_wal(&self, id: usize) -> PathBuf { + Self::path_of_wal_static(&self.path, id) + } + + fn sync_dir(&self) -> Result<()> { + File::open(&self.path)?.sync_all()?; Ok(()) } - /// Force flush the all immutable memtables to disk - pub fn force_flush_imm_memtables(&self) -> Result<()> { - let _flush_lock = self.state_lock.lock(); + /// Force freeze the current memetable to an immutable memtable + pub fn force_freeze_memtable(&self) -> Result<()> { + let state_lock = self.state_lock.lock(); - let flush_memtable; - let sst_id; + let memtable_id = self.next_sst_id(); + let memtable = Arc::new(if self.options.enable_wal { + let mt = 
MemTable::create_with_wal(memtable_id, self.path_of_wal(memtable_id))?; + self.sync_dir()?; + mt + } else { + MemTable::create(memtable_id) + }); - // Move mutable memtable to immutable memtables. + let old_memtable; { let mut guard = self.state.write(); // Swap the current memtable with a new one. let mut snapshot = guard.as_ref().clone(); - let memtable = std::mem::replace(&mut snapshot.memtable, Arc::new(MemTable::create())); - flush_memtable = memtable.clone(); - sst_id = self.next_sst_id(); + old_memtable = std::mem::replace(&mut snapshot.memtable, memtable); // Add the memtable to the immutable memtables. - snapshot.imm_memtables.push(memtable); + snapshot.imm_memtables.push(old_memtable.clone()); // Update the snapshot. *guard = Arc::new(snapshot); } + old_memtable.sync_wal()?; - // At this point, the old memtable should be disabled for write, and all write threads - // should be operating on the new memtable. We can safely flush the old memtable to - // disk. + self.manifest + .add_record(&state_lock, ManifestRecord::NewWal(memtable_id))?; - let mut builder = SsTableBuilder::new(4096); + Ok(()) + } + + /// Force flush the earliest-created immutable memtable to disk + pub fn force_flush_next_imm_memtable(&self) -> Result<()> { + let state_lock = self.state_lock.lock(); + + let flush_memtable; + + { + let mut guard = self.state.read(); + flush_memtable = guard + .imm_memtables + .first() + .expect("no imm memtables!") + .clone(); + } + + let mut builder = SsTableBuilder::new(self.options.block_size); flush_memtable.flush(&mut builder)?; + let sst_id = flush_memtable.id(); let sst = Arc::new(builder.build( sst_id, Some(self.block_cache.clone()), @@ -290,7 +333,8 @@ impl LsmStorageInner { let mut guard = self.state.write(); let mut snapshot = guard.as_ref().clone(); // Remove the memtable from the immutable memtables. 
- snapshot.imm_memtables.pop(); + let mem = snapshot.imm_memtables.remove(0); + assert_eq!(mem.id(), sst_id); // Add L0 table if self.compaction_controller.flush_to_l0() { // In leveled compaction or no compaction, simply flush to L0 @@ -305,6 +349,15 @@ impl LsmStorageInner { *guard = Arc::new(snapshot); } + if self.options.enable_wal { + std::fs::remove_file(self.path_of_wal(sst_id))?; + } + + self.manifest + .add_record(&state_lock, ManifestRecord::Flush(sst_id))?; + + self.sync_dir()?; + Ok(()) } diff --git a/mini-lsm/src/manifest.rs b/mini-lsm/src/manifest.rs new file mode 100644 index 0000000..4f9d44a --- /dev/null +++ b/mini-lsm/src/manifest.rs @@ -0,0 +1,36 @@ +use std::fs::File; +use std::path::Path; +use std::sync::Arc; + +use anyhow::Result; +use parking_lot::{Mutex, MutexGuard}; +use serde::{Deserialize, Serialize}; + +use crate::compact::CompactionTask; + +pub struct Manifest {} + +#[derive(Serialize, Deserialize)] +pub enum ManifestRecord { + Flush(usize), + NewWal(usize), + Compaction(CompactionTask), +} + +impl Manifest { + pub fn create(path: impl AsRef<Path>) -> Result<Self> { + Ok(Self {}) + } + + pub fn recover(path: impl AsRef<Path>) -> Result<(Self, Vec<ManifestRecord>)> { + Ok((Self {}, Vec::new())) + } + + pub fn add_record( + &self, + _state_lock_observer: &MutexGuard<()>, + record: ManifestRecord, + ) -> Result<()> { + Ok(()) + } +} diff --git a/mini-lsm/src/mem_table.rs b/mini-lsm/src/mem_table.rs index 637b762..2b2b0e4 100644 --- a/mini-lsm/src/mem_table.rs +++ b/mini-lsm/src/mem_table.rs @@ -1,4 +1,6 @@ use std::ops::Bound; +use std::path::Path; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use anyhow::Result; @@ -9,10 +11,13 @@ use ouroboros::self_referencing; use crate::iterators::StorageIterator; use crate::table::SsTableBuilder; +use crate::wal::Wal; /// A basic mem-table based on crossbeam-skiplist pub struct MemTable { map: Arc<SkipMap<Bytes, Bytes>>, + wal: Option<Wal>, + id: usize, } pub(crate) fn map_bound(bound: Bound<&[u8]>) -> Bound<Bytes> { @@ -25,21 +30,47 @@ pub(crate) fn 
map_bound(bound: Bound<&[u8]>) -> Bound<Bytes> { impl MemTable { /// Create a new mem-table. - pub fn create() -> Self { + pub fn create(id: usize) -> Self { Self { + id, map: Arc::new(SkipMap::new()), + wal: None, } } + /// Create a new mem-table with WAL + pub fn create_with_wal(id: usize, path: impl AsRef<Path>) -> Result<Self> { + Ok(Self { + id, + map: Arc::new(SkipMap::new()), + wal: Some(Wal::create(path.as_ref())?), + }) + } + + pub fn recover_from_wal(id: usize, path: impl AsRef<Path>) -> Result<Self> { + unimplemented!() + } + /// Get a value by key. pub fn get(&self, key: &[u8]) -> Option<Bytes> { self.map.get(key).map(|e| e.value().clone()) } /// Put a key-value pair into the mem-table. - pub fn put(&self, key: &[u8], value: &[u8]) { + pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { self.map .insert(Bytes::copy_from_slice(key), Bytes::copy_from_slice(value)); + if let Some(ref wal) = self.wal { + wal.put(key, value)?; + } + Ok(()) + } + + pub fn sync_wal(&self) -> Result<()> { + if let Some(ref wal) = self.wal { + wal.sync()?; + } + Ok(()) } /// Get an iterator over a range of keys. 
@@ -63,6 +94,10 @@ impl MemTable { } Ok(()) } + + pub fn id(&self) -> usize { + self.id + } } type SkipMapRangeIter<'a> = diff --git a/mini-lsm/src/mem_table/tests.rs b/mini-lsm/src/mem_table/tests.rs index 8371ecb..76a80da 100644 --- a/mini-lsm/src/mem_table/tests.rs +++ b/mini-lsm/src/mem_table/tests.rs @@ -6,10 +6,10 @@ use crate::table::{SsTableBuilder, SsTableIterator}; #[test] fn test_memtable_get() { - let memtable = MemTable::create(); - memtable.put(b"key1", b"value1"); - memtable.put(b"key2", b"value2"); - memtable.put(b"key3", b"value3"); + let memtable = MemTable::create(0); + memtable.put(b"key1", b"value1").unwrap(); + memtable.put(b"key2", b"value2").unwrap(); + memtable.put(b"key3", b"value3").unwrap(); assert_eq!(&memtable.get(b"key1").unwrap()[..], b"value1"); assert_eq!(&memtable.get(b"key2").unwrap()[..], b"value2"); assert_eq!(&memtable.get(b"key3").unwrap()[..], b"value3"); @@ -17,13 +17,13 @@ fn test_memtable_get() { #[test] fn test_memtable_overwrite() { - let memtable = MemTable::create(); - memtable.put(b"key1", b"value1"); - memtable.put(b"key2", b"value2"); - memtable.put(b"key3", b"value3"); - memtable.put(b"key1", b"value11"); - memtable.put(b"key2", b"value22"); - memtable.put(b"key3", b"value33"); + let memtable = MemTable::create(0); + memtable.put(b"key1", b"value1").unwrap(); + memtable.put(b"key2", b"value2").unwrap(); + memtable.put(b"key3", b"value3").unwrap(); + memtable.put(b"key1", b"value11").unwrap(); + memtable.put(b"key2", b"value22").unwrap(); + memtable.put(b"key3", b"value33").unwrap(); assert_eq!(&memtable.get(b"key1").unwrap()[..], b"value11"); assert_eq!(&memtable.get(b"key2").unwrap()[..], b"value22"); assert_eq!(&memtable.get(b"key3").unwrap()[..], b"value33"); @@ -31,10 +31,10 @@ fn test_memtable_overwrite() { #[test] fn test_memtable_flush() { - let memtable = MemTable::create(); - memtable.put(b"key1", b"value1"); - memtable.put(b"key2", b"value2"); - memtable.put(b"key3", b"value3"); + let memtable = 
MemTable::create(0); + memtable.put(b"key1", b"value1").unwrap(); + memtable.put(b"key2", b"value2").unwrap(); + memtable.put(b"key3", b"value3").unwrap(); let mut builder = SsTableBuilder::new(128); memtable.flush(&mut builder).unwrap(); let dir = tempdir().unwrap(); @@ -55,10 +55,10 @@ fn test_memtable_flush() { #[test] fn test_memtable_iter() { use std::ops::Bound; - let memtable = MemTable::create(); - memtable.put(b"key1", b"value1"); - memtable.put(b"key2", b"value2"); - memtable.put(b"key3", b"value3"); + let memtable = MemTable::create(0); + memtable.put(b"key1", b"value1").unwrap(); + memtable.put(b"key2", b"value2").unwrap(); + memtable.put(b"key3", b"value3").unwrap(); { let mut iter = memtable.scan(Bound::Unbounded, Bound::Unbounded); diff --git a/mini-lsm/src/table.rs b/mini-lsm/src/table.rs index eb8301b..a8083a3 100644 --- a/mini-lsm/src/table.rs +++ b/mini-lsm/src/table.rs @@ -95,6 +95,7 @@ impl FileObject { /// Create a new file object (day 2) and write the file to the disk (day 4). 
pub fn create(path: &Path, data: Vec<u8>) -> Result<Self> { std::fs::write(path, &data)?; + File::open(path)?.sync_all()?; Ok(FileObject( Some(File::options().read(true).write(false).open(path)?), data.len() as u64, diff --git a/mini-lsm/src/tests/day4_tests.rs b/mini-lsm/src/tests/day4_tests.rs index a870ae3..4c26eaa 100644 --- a/mini-lsm/src/tests/day4_tests.rs +++ b/mini-lsm/src/tests/day4_tests.rs @@ -35,7 +35,7 @@ fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>) fn sync(storage: &LsmStorageInner) { storage.force_freeze_memtable().unwrap(); - storage.force_flush_imm_memtables().unwrap(); + storage.force_flush_next_imm_memtable().unwrap(); } #[test] diff --git a/mini-lsm/src/wal.rs b/mini-lsm/src/wal.rs new file mode 100644 index 0000000..9d026c2 --- /dev/null +++ b/mini-lsm/src/wal.rs @@ -0,0 +1,19 @@ +use std::path::Path; + +use anyhow::Result; + +pub struct Wal {} + +impl Wal { + pub fn create(path: impl AsRef) -> Result<Self> { + unimplemented!() + } + + pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { + Ok(()) + } + + pub fn sync(&self) -> Result<()> { + Ok(()) + } +}