diff --git a/Cargo.lock b/Cargo.lock index 2333e2e..e252670 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -415,11 +415,16 @@ dependencies = [ "anyhow", "arc-swap", "bytes", + "clap", + "crossbeam-channel", "crossbeam-epoch", "crossbeam-skiplist", "moka", "ouroboros", "parking_lot", + "rand", + "serde", + "serde_json", "tempfile", ] diff --git a/README.md b/README.md index 76a96bf..9dc10ff 100644 --- a/README.md +++ b/README.md @@ -62,7 +62,8 @@ We are working on a new version of the mini-lsm tutorial that is split into 3 we | 4.4 | Alternative Block Encodings | | | | | 4.5 | Rate Limiter and I/O Optimizations | | | | | 4.6 | Build Your Own Block Cache | | | | -| 4.7 | Async Engine | | | | -| 4.8 | Key-Value Separation | | | | -| 4.9 | Column Families | | | | -| 4.10 | SQL over Mini-LSM | | | | +| 4.7 | Build Your Own SkipList | | | | +| 4.8 | Async Engine | | | | +| 4.9 | Key-Value Separation | | | | +| 4.10 | Column Families | | | | +| 4.11 | SQL over Mini-LSM | | | | diff --git a/mini-lsm-starter/Cargo.toml b/mini-lsm-starter/Cargo.toml index 987c440..3d4b8fc 100644 --- a/mini-lsm-starter/Cargo.toml +++ b/mini-lsm-starter/Cargo.toml @@ -13,6 +13,11 @@ crossbeam-skiplist = "0.1" parking_lot = "0.12" ouroboros = "0.15" moka = "0.9" +clap = { version = "4.4.17", features = ["derive"] } +rand = "0.8.5" +crossbeam-channel = "0.5.11" +serde_json = { version = "1.0" } +serde = { version = "1.0", features = ["derive"] } [dev-dependencies] tempfile = "3" diff --git a/mini-lsm-starter/src/compact.rs b/mini-lsm-starter/src/compact.rs new file mode 100644 index 0000000..1e70994 --- /dev/null +++ b/mini-lsm-starter/src/compact.rs @@ -0,0 +1,144 @@ +mod leveled; +mod simple_leveled; +mod tiered; + +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +pub use leveled::{LeveledCompactionController, LeveledCompactionOptions, LeveledCompactionTask}; +use serde::{Deserialize, Serialize}; +pub use simple_leveled::{ + SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, SimpleLeveledCompactionTask, +}; +pub use tiered::{TieredCompactionController, TieredCompactionOptions, TieredCompactionTask}; + +use crate::iterators::merge_iterator::MergeIterator; +use crate::iterators::StorageIterator; +use crate::lsm_storage::{LsmStorageInner, LsmStorageState}; +use crate::manifest::ManifestRecord; +use crate::table::{SsTable, SsTableBuilder, SsTableIterator}; + +#[derive(Debug, Serialize, Deserialize)] +pub enum CompactionTask { + Leveled(LeveledCompactionTask), + Tiered(TieredCompactionTask), + Simple(SimpleLeveledCompactionTask), + ForceFullCompaction(Vec), +} + +impl CompactionTask { + fn compact_to_bottom_level(&self) -> bool { + match self { + CompactionTask::ForceFullCompaction(_) => true, + CompactionTask::Leveled(task) => task.is_lower_level_bottom_level, + CompactionTask::Simple(task) => task.is_lower_level_bottom_level, + CompactionTask::Tiered(task) => task.bottom_tier_included, + } + } +} + +pub(crate) enum CompactionController { + Leveled(LeveledCompactionController), + Tiered(TieredCompactionController), + Simple(SimpleLeveledCompactionController), + NoCompaction, +} + +impl CompactionController { + pub fn generate_compaction_task(&self, snapshot: &LsmStorageState) -> Option { + match self { + CompactionController::Leveled(ctrl) => ctrl + .generate_compaction_task(&snapshot) + .map(CompactionTask::Leveled), + CompactionController::Simple(ctrl) => ctrl + .generate_compaction_task(&snapshot) + .map(CompactionTask::Simple), + CompactionController::Tiered(ctrl) => ctrl + .generate_compaction_task(&snapshot) + .map(CompactionTask::Tiered), + CompactionController::NoCompaction => unreachable!(), + } + } + + pub fn apply_compaction_result( + &self, + snapshot: &LsmStorageState, + task: &CompactionTask, + output: &[usize], + ) -> (LsmStorageState, Vec) { + match (self, task) { + (CompactionController::Leveled(ctrl), CompactionTask::Leveled(task)) => { + ctrl.apply_compaction_result(&snapshot, task, output) + } + (CompactionController::Simple(ctrl), CompactionTask::Simple(task)) => { + ctrl.apply_compaction_result(&snapshot, task, output) + } + (CompactionController::Tiered(ctrl), CompactionTask::Tiered(task)) => { + ctrl.apply_compaction_result(&snapshot, task, output) + } + _ => unreachable!(), + } + } +} + +impl CompactionController { + pub fn flush_to_l0(&self) -> bool { + if let Self::Leveled(_) | Self::Simple(_) | Self::NoCompaction = self { + true + } else { + false + } + } +} + +pub enum CompactionOptions { + /// Leveled compaction with partial compaction + dynamic level support (= RocksDB's Leveled + /// Compaction) + Leveled(LeveledCompactionOptions), + /// Tiered compaction (= RocksDB's universal compaction) + Tiered(TieredCompactionOptions), + /// Simple leveled compaction + Simple(SimpleLeveledCompactionOptions), + /// In no compaction mode (week 1), always flush to L0 + NoCompaction, +} + +impl LsmStorageInner { + fn compact(&self, task: &CompactionTask) -> Result>> { + unimplemented!() + } + + pub fn force_full_compaction(&self) -> Result<()> { + unimplemented!() + } + + fn trigger_compaction(&self) -> Result<()> { + unimplemented!() + } + + pub(crate) fn spawn_compaction_thread( + self: &Arc, + rx: crossbeam_channel::Receiver<()>, + ) -> Result>> { + if let CompactionOptions::Leveled(_) + | CompactionOptions::Simple(_) + | CompactionOptions::Tiered(_) = self.options.compaction_options + { + let this = self.clone(); + let handle = std::thread::spawn(move || { + let ticker = crossbeam_channel::tick(Duration::from_millis(50)); + loop { + crossbeam_channel::select! { + recv(ticker) -> _ => if let Err(e) = this.trigger_compaction() { + eprintln!("compaction failed: {}", e); + }, + recv(rx) -> _ => return + } + } + }); + return Ok(Some(handle)); + } + Ok(None) + } +} diff --git a/mini-lsm-starter/src/compact/leveled.rs b/mini-lsm-starter/src/compact/leveled.rs new file mode 100644 index 0000000..ee133fb --- /dev/null +++ b/mini-lsm-starter/src/compact/leveled.rs @@ -0,0 +1,58 @@ +use std::collections::HashSet; + +use serde::{Deserialize, Serialize}; + +use crate::lsm_storage::LsmStorageState; + +#[derive(Debug, Serialize, Deserialize)] +pub struct LeveledCompactionTask { + // if upper_level is `None`, then it is L0 compaction + pub upper_level: Option, + pub upper_level_sst_ids: Vec, + pub lower_level: usize, + pub lower_level_sst_ids: Vec, + pub is_lower_level_bottom_level: bool, +} + +#[derive(Debug, Clone)] +pub struct LeveledCompactionOptions { + pub level_size_multiplier: usize, + pub level0_file_num_compaction_trigger: usize, + pub max_levels: usize, + pub base_level_size_mb: usize, +} + +pub struct LeveledCompactionController { + options: LeveledCompactionOptions, +} + +impl LeveledCompactionController { + pub fn new(options: LeveledCompactionOptions) -> Self { + Self { options } + } + + fn find_overlapping_ssts( + &self, + snapshot: &LsmStorageState, + sst_ids: &[usize], + in_level: usize, + ) -> Vec { + unimplemented!() + } + + pub fn generate_compaction_task( + &self, + snapshot: &LsmStorageState, + ) -> Option { + unimplemented!() + } + + pub fn apply_compaction_result( + &self, + snapshot: &LsmStorageState, + task: &LeveledCompactionTask, + output: &[usize], + ) -> (LsmStorageState, Vec) { + unimplemented!() + } +} diff --git a/mini-lsm-starter/src/compact/simple_leveled.rs b/mini-lsm-starter/src/compact/simple_leveled.rs new file mode 100644 index 0000000..036d2ea --- /dev/null +++ b/mini-lsm-starter/src/compact/simple_leveled.rs @@ -0,0 +1,46 @@ +use serde::{Deserialize, Serialize}; + +use crate::lsm_storage::LsmStorageState; + +#[derive(Debug, Clone)] +pub struct SimpleLeveledCompactionOptions { + pub size_ratio_percent: usize, + pub level0_file_num_compaction_trigger: usize, + pub max_levels: usize, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct SimpleLeveledCompactionTask { + // if upper_level is `None`, then it is L0 compaction + pub upper_level: Option, + pub upper_level_sst_ids: Vec, + pub lower_level: usize, + pub lower_level_sst_ids: Vec, + pub is_lower_level_bottom_level: bool, +} + +pub struct SimpleLeveledCompactionController { + options: SimpleLeveledCompactionOptions, +} + +impl SimpleLeveledCompactionController { + pub fn new(options: SimpleLeveledCompactionOptions) -> Self { + Self { options } + } + + pub fn generate_compaction_task( + &self, + snapshot: &LsmStorageState, + ) -> Option { + unimplemented!() + } + + pub fn apply_compaction_result( + &self, + snapshot: &LsmStorageState, + task: &SimpleLeveledCompactionTask, + output: &[usize], + ) -> (LsmStorageState, Vec) { + unimplemented!() + } +} diff --git a/mini-lsm-starter/src/compact/tiered.rs b/mini-lsm-starter/src/compact/tiered.rs new file mode 100644 index 0000000..5f92324 --- /dev/null +++ b/mini-lsm-starter/src/compact/tiered.rs @@ -0,0 +1,45 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +use crate::lsm_storage::LsmStorageState; + +#[derive(Debug, Serialize, Deserialize)] +pub struct TieredCompactionTask { + pub tiers: Vec<(usize, Vec)>, + pub bottom_tier_included: bool, +} + +#[derive(Debug, Clone)] +pub struct TieredCompactionOptions { + pub num_tiers: usize, + pub max_size_amplification_percent: usize, + pub size_ratio: usize, + pub min_merge_width: usize, +} + +pub struct TieredCompactionController { + options: TieredCompactionOptions, +} + +impl TieredCompactionController { + pub fn new(options: TieredCompactionOptions) -> Self { + Self { options } + } + + pub fn generate_compaction_task( + &self, + snapshot: &LsmStorageState, + ) -> Option { + unimplemented!() + } + + pub fn apply_compaction_result( + &self, + snapshot: &LsmStorageState, + task: &TieredCompactionTask, + output: &[usize], + ) -> (LsmStorageState, Vec) { + unimplemented!() + } +} diff --git a/mini-lsm-starter/src/lib.rs b/mini-lsm-starter/src/lib.rs index 40ff84a..ac8e8ce 100644 --- a/mini-lsm-starter/src/lib.rs +++ b/mini-lsm-starter/src/lib.rs @@ -1,9 +1,12 @@ pub mod block; +pub mod compact; pub mod iterators; pub mod lsm_iterator; pub mod lsm_storage; +pub mod manifest; pub mod mem_table; pub mod table; +pub mod wal; #[cfg(test)] mod tests; diff --git a/mini-lsm-starter/src/lsm_storage.rs b/mini-lsm-starter/src/lsm_storage.rs index 7a4de8d..25a44d8 100644 --- a/mini-lsm-starter/src/lsm_storage.rs +++ b/mini-lsm-starter/src/lsm_storage.rs @@ -1,58 +1,151 @@ -#![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod -#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod - +use std::collections::{BTreeSet, HashMap}; +use std::fs::File; use std::ops::Bound; -use std::path::Path; +use std::path::{Path, PathBuf}; +use std::sync::atomic::AtomicUsize; use std::sync::Arc; -use anyhow::Result; +use anyhow::{Context, Result}; use bytes::Bytes; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; use crate::block::Block; +use crate::compact::{ + CompactionController, CompactionOptions, LeveledCompactionController, LeveledCompactionOptions, + SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, TieredCompactionController, +}; +use crate::iterators::merge_iterator::MergeIterator; +use crate::iterators::two_merge_iterator::TwoMergeIterator; +use crate::iterators::StorageIterator; use crate::lsm_iterator::{FusedIterator, LsmIterator}; -use crate::mem_table::MemTable; -use crate::table::SsTable; +use crate::manifest::{Manifest, ManifestRecord}; +use crate::mem_table::{map_bound, MemTable}; +use crate::table::{FileObject, SsTable, SsTableBuilder, SsTableIterator}; pub type BlockCache = moka::sync::Cache<(usize, usize), Arc>; #[derive(Clone)] -pub struct LsmStorageInner { +pub struct LsmStorageState { /// The current memtable. - memtable: Arc, - /// Immutable memTables, from earliest to latest. - imm_memtables: Vec>, - /// L0 SsTables, from earliest to latest. - l0_sstables: Vec>, - /// L1 - L6 SsTables, sorted by key range. - #[allow(dead_code)] - levels: Vec>>, - /// The next SSTable ID. - next_sst_id: usize, + pub memtable: Arc, + /// Immutable memtables, from earliest to latest. + pub imm_memtables: Vec>, + /// L0 SSTs, from earliest to latest. + pub l0_sstables: Vec, + /// SsTables sorted by key range; L1 - L_max for leveled compaction, or tiers for tiered + /// compaction. + pub levels: Vec<(usize, Vec)>, + /// SST objects. + pub sstables: HashMap>, } -impl LsmStorageInner { - fn create() -> Self { +impl LsmStorageState { + fn create(options: &LsmStorageOptions) -> Self { + let levels = match &options.compaction_options { + CompactionOptions::Leveled(LeveledCompactionOptions { max_levels, .. }) + | CompactionOptions::Simple(SimpleLeveledCompactionOptions { max_levels, .. }) => (1 + ..=*max_levels) + .map(|level| (level, Vec::new())) + .collect::>(), + CompactionOptions::Tiered(_) | CompactionOptions::NoCompaction => Vec::new(), + }; Self { - memtable: Arc::new(MemTable::create()), - imm_memtables: vec![], - l0_sstables: vec![], - levels: vec![], - next_sst_id: 1, + memtable: Arc::new(MemTable::create(0)), + imm_memtables: Vec::new(), + l0_sstables: Vec::new(), + levels, + sstables: Default::default(), + } + } +} + +pub struct LsmStorageOptions { + pub block_size: usize, + pub target_sst_size: usize, + pub num_memtable_limit: usize, + pub compaction_options: CompactionOptions, + pub enable_wal: bool, +} + +impl LsmStorageOptions { + pub fn default_for_week1_test() -> Self { + Self { + block_size: 4096, + target_sst_size: 2 << 20, + compaction_options: CompactionOptions::NoCompaction, + enable_wal: false, + num_memtable_limit: 3, } } } /// The storage interface of the LSM tree. -pub struct LsmStorage { - inner: Arc>>, +pub(crate) struct LsmStorageInner { + pub(crate) state: Arc>>, + pub(crate) state_lock: Mutex<()>, + path: PathBuf, + pub(crate) block_cache: Arc, + next_sst_id: AtomicUsize, + pub(crate) options: Arc, + pub(crate) compaction_controller: CompactionController, + pub(crate) manifest: Manifest, } -impl LsmStorage { - pub fn open(path: impl AsRef) -> Result { - Ok(Self { - inner: Arc::new(RwLock::new(Arc::new(LsmStorageInner::create()))), - }) +pub struct MiniLsm { + pub(crate) inner: Arc, + compaction_notifier: crossbeam_channel::Sender<()>, + compaction_thread: Mutex>>, +} + +impl Drop for MiniLsm { + fn drop(&mut self) { + self.compaction_notifier.send(()).ok(); + } +} + +impl MiniLsm { + pub fn close(&self) -> Result<()> { + unimplemented!() + } + + pub fn open(path: impl AsRef, options: LsmStorageOptions) -> Result> { + unimplemented!() + } + + pub fn get(&self, key: &[u8]) -> Result> { + self.inner.get(key) + } + + pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { + self.inner.put(key, value) + } + + pub fn delete(&self, key: &[u8]) -> Result<()> { + self.inner.delete(key) + } + + pub fn scan( + &self, + lower: Bound<&[u8]>, + upper: Bound<&[u8]>, + ) -> Result> { + self.inner.scan(lower, upper) + } + + pub fn force_flush(&self) -> Result<()> { + self.inner.force_freeze_memtable()?; + self.inner.force_flush_next_imm_memtable() + } +} + +impl LsmStorageInner { + pub(crate) fn next_sst_id(&self) -> usize { + self.next_sst_id + .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + } + + pub(crate) fn open(path: impl AsRef, options: LsmStorageOptions) -> Result { + unimplemented!() } /// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter. @@ -62,26 +155,49 @@ impl LsmStorage { /// Put a key-value pair into the storage by writing into the current memtable. pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { - assert!(!value.is_empty(), "value cannot be empty"); - assert!(!key.is_empty(), "key cannot be empty"); unimplemented!() } /// Remove a key from the storage by writing an empty value. - pub fn delete(&self, _key: &[u8]) -> Result<()> { + pub fn delete(&self, key: &[u8]) -> Result<()> { unimplemented!() } - /// Persist data to disk. - pub fn sync(&self) -> Result<()> { + pub(crate) fn path_of_sst_static(path: impl AsRef, id: usize) -> PathBuf { + path.as_ref().join(format!("{:05}.sst", id)) + } + + pub(crate) fn path_of_sst(&self, id: usize) -> PathBuf { + Self::path_of_sst_static(&self.path, id) + } + + pub(crate) fn path_of_wal_static(path: impl AsRef, id: usize) -> PathBuf { + path.as_ref().join(format!("{:05}.wal", id)) + } + + pub(crate) fn path_of_wal(&self, id: usize) -> PathBuf { + Self::path_of_wal_static(&self.path, id) + } + + fn sync_dir(&self) -> Result<()> { + unimplemented!() + } + + /// Force freeze the current memetable to an immutable memtable + pub fn force_freeze_memtable(&self) -> Result<()> { + unimplemented!() + } + + /// Force flush the earliest-created immutable memtable to disk + pub fn force_flush_next_imm_memtable(&self) -> Result<()> { unimplemented!() } /// Create an iterator over a range of keys. pub fn scan( &self, - _lower: Bound<&[u8]>, - _upper: Bound<&[u8]>, + lower: Bound<&[u8]>, + upper: Bound<&[u8]>, ) -> Result> { unimplemented!() } diff --git a/mini-lsm-starter/src/manifest.rs b/mini-lsm-starter/src/manifest.rs new file mode 100644 index 0000000..c638ee9 --- /dev/null +++ b/mini-lsm-starter/src/manifest.rs @@ -0,0 +1,42 @@ +use std::fs::File; +use std::path::Path; +use std::sync::Arc; + +use anyhow::Result; +use parking_lot::{Mutex, MutexGuard}; +use serde::{Deserialize, Serialize}; + +use crate::compact::CompactionTask; + +pub struct Manifest { + file: Arc>, +} + +#[derive(Serialize, Deserialize)] +pub enum ManifestRecord { + Flush(usize), + NewMemtable(usize), + Compaction(CompactionTask, Vec), +} + +impl Manifest { + pub fn create(path: impl AsRef) -> Result { + unimplemented!() + } + + pub fn recover(path: impl AsRef) -> Result<(Self, Vec)> { + unimplemented!() + } + + pub fn add_record( + &self, + _state_lock_observer: &MutexGuard<()>, + record: ManifestRecord, + ) -> Result<()> { + self.add_record_when_init(record) + } + + pub fn add_record_when_init(&self, record: ManifestRecord) -> Result<()> { + unimplemented!() + } +} diff --git a/mini-lsm-starter/src/mem_table.rs b/mini-lsm-starter/src/mem_table.rs index bfdc475..c677a94 100644 --- a/mini-lsm-starter/src/mem_table.rs +++ b/mini-lsm-starter/src/mem_table.rs @@ -1,25 +1,45 @@ -#![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod -#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod - use std::ops::Bound; +use std::path::Path; use std::sync::Arc; use anyhow::Result; use bytes::Bytes; +use crossbeam_skiplist::map::Entry; use crossbeam_skiplist::SkipMap; use ouroboros::self_referencing; use crate::iterators::StorageIterator; use crate::table::SsTableBuilder; +use crate::wal::Wal; /// A basic mem-table based on crossbeam-skiplist pub struct MemTable { - map: SkipMap, + map: Arc>, + wal: Option, + id: usize, +} + +pub(crate) fn map_bound(bound: Bound<&[u8]>) -> Bound { + match bound { + Bound::Included(x) => Bound::Included(Bytes::copy_from_slice(x)), + Bound::Excluded(x) => Bound::Excluded(Bytes::copy_from_slice(x)), + Bound::Unbounded => Bound::Unbounded, + } } impl MemTable { /// Create a new mem-table. - pub fn create() -> Self { + pub fn create(id: usize) -> Self { + unimplemented!() + } + + /// Create a new mem-table with WAL + pub fn create_with_wal(id: usize, path: impl AsRef) -> Result { + unimplemented!() + } + + /// Create a memtable from WAL + pub fn recover_from_wal(id: usize, path: impl AsRef) -> Result { unimplemented!() } @@ -29,10 +49,17 @@ impl MemTable { } /// Put a key-value pair into the mem-table. - pub fn put(&self, key: &[u8], value: &[u8]) { + pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { unimplemented!() } + pub fn sync_wal(&self) -> Result<()> { + if let Some(ref wal) = self.wal { + wal.sync()?; + } + Ok(()) + } + /// Get an iterator over a range of keys. pub fn scan(&self, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> MemTableIterator { unimplemented!() @@ -42,6 +69,10 @@ impl MemTable { pub fn flush(&self, builder: &mut SsTableBuilder) -> Result<()> { unimplemented!() } + + pub fn id(&self) -> usize { + self.id + } } type SkipMapRangeIter<'a> = diff --git a/mini-lsm-starter/src/table.rs b/mini-lsm-starter/src/table.rs index ee4d9a8..9dcaa3a 100644 --- a/mini-lsm-starter/src/table.rs +++ b/mini-lsm-starter/src/table.rs @@ -4,6 +4,7 @@ mod builder; mod iterator; +use std::fs::File; use std::path::Path; use std::sync::Arc; @@ -42,24 +43,37 @@ impl BlockMeta { } /// A file object. -pub struct FileObject(Bytes); +pub struct FileObject(Option, u64); impl FileObject { pub fn read(&self, offset: u64, len: u64) -> Result> { - Ok(self.0[offset as usize..(offset + len) as usize].to_vec()) + use std::os::unix::fs::FileExt; + let mut data = vec![0; len as usize]; + self.0 + .as_ref() + .unwrap() + .read_exact_at(&mut data[..], offset)?; + Ok(data) } pub fn size(&self) -> u64 { - self.0.len() as u64 + self.1 } /// Create a new file object (day 2) and write the file to the disk (day 4). pub fn create(path: &Path, data: Vec) -> Result { - unimplemented!() + std::fs::write(path, &data)?; + File::open(path)?.sync_all()?; + Ok(FileObject( + Some(File::options().read(true).write(false).open(path)?), + data.len() as u64, + )) } pub fn open(path: &Path) -> Result { - unimplemented!() + let file = File::options().read(true).write(false).open(path)?; + let size = file.metadata()?.len(); + Ok(FileObject(Some(file), size)) } } @@ -75,6 +89,10 @@ pub struct SsTable { block_metas: Vec, /// The offset that indicates the start point of meta blocks in `file`. block_meta_offset: usize, + id: usize, + block_cache: Option>, + first_key: Bytes, + last_key: Bytes, } impl SsTable { @@ -88,6 +106,19 @@ impl SsTable { unimplemented!() } + /// Create a mock SST with only first key + last key metadata + pub fn create_meta_only(id: usize, file_size: u64, first_key: Bytes, last_key: Bytes) -> Self { + Self { + file: FileObject(None, file_size), + block_metas: vec![], + block_meta_offset: 0, + id, + block_cache: None, + first_key, + last_key, + } + } + /// Read a block from the disk. pub fn read_block(&self, block_idx: usize) -> Result> { unimplemented!() diff --git a/mini-lsm-starter/src/wal.rs b/mini-lsm-starter/src/wal.rs new file mode 100644 index 0000000..23f73e7 --- /dev/null +++ b/mini-lsm-starter/src/wal.rs @@ -0,0 +1,30 @@ +use std::fs::File; +use std::path::Path; +use std::sync::Arc; + +use anyhow::Result; +use bytes::Bytes; +use crossbeam_skiplist::SkipMap; +use parking_lot::Mutex; + +pub struct Wal { + file: Arc>, +} + +impl Wal { + pub fn create(path: impl AsRef) -> Result { + unimplemented!() + } + + pub fn recover(path: impl AsRef, skiplist: &SkipMap) -> Result { + unimplemented!() + } + + pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { + unimplemented!() + } + + pub fn sync(&self) -> Result<()> { + unimplemented!() + } +} diff --git a/mini-lsm/src/bin/mini_lsm_cli.rs b/mini-lsm/src/bin/mini_lsm_cli.rs index 256744b..6cfd8dd 100644 --- a/mini-lsm/src/bin/mini_lsm_cli.rs +++ b/mini-lsm/src/bin/mini_lsm_cli.rs @@ -35,6 +35,7 @@ fn main() -> Result<()> { LsmStorageOptions { block_size: 4096, target_sst_size: 2 << 20, // 2MB + num_memtable_limit: 3, compaction_options: match args.compaction { CompactionStrategy::Simple => { CompactionOptions::Simple(SimpleLeveledCompactionOptions { diff --git a/mini-lsm/src/lsm_storage.rs b/mini-lsm/src/lsm_storage.rs index 83162b4..9355e9b 100644 --- a/mini-lsm/src/lsm_storage.rs +++ b/mini-lsm/src/lsm_storage.rs @@ -62,6 +62,7 @@ impl LsmStorageState { pub struct LsmStorageOptions { pub block_size: usize, pub target_sst_size: usize, + pub num_memtable_limit: usize, pub compaction_options: CompactionOptions, pub enable_wal: bool, } @@ -73,6 +74,7 @@ impl LsmStorageOptions { target_sst_size: 2 << 20, compaction_options: CompactionOptions::NoCompaction, enable_wal: false, + num_memtable_limit: 3, } } }