#![allow(dead_code)] // REMOVE THIS LINE after fully implementing this functionality

use std::collections::HashMap;
use std::ops::Bound;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

use anyhow::Result;
use bytes::Bytes;
use parking_lot::{Mutex, MutexGuard, RwLock};

use crate::block::Block;
use crate::compact::{
    CompactionController, CompactionOptions, LeveledCompactionController, LeveledCompactionOptions,
    SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, TieredCompactionController,
};
use crate::lsm_iterator::{FusedIterator, LsmIterator};
use crate::manifest::Manifest;
use crate::mem_table::MemTable;
use crate::table::SsTable;

pub type BlockCache = moka::sync::Cache<(usize, usize), Arc<Block>>;

/// Represents the state of the storage engine.
#[derive(Clone)]
pub struct LsmStorageState {
    /// The current memtable.
    pub memtable: Arc<MemTable>,
    /// Immutable memtables, from latest to earliest.
    pub imm_memtables: Vec<Arc<MemTable>>,
    /// L0 SSTs, from latest to earliest.
    pub l0_sstables: Vec<usize>,
    /// SsTables sorted by key range; L1 - L_max for leveled compaction, or tiers for tiered
    /// compaction.
    pub levels: Vec<(usize, Vec<usize>)>,
    /// SST objects.
    pub sstables: HashMap<usize, Arc<SsTable>>,
}

impl LsmStorageState {
    fn create(options: &LsmStorageOptions) -> Self {
        let levels = match &options.compaction_options {
            CompactionOptions::Leveled(LeveledCompactionOptions { max_levels, .. })
            | CompactionOptions::Simple(SimpleLeveledCompactionOptions { max_levels, .. }) => {
                (1..=*max_levels)
                    .map(|level| (level, Vec::new()))
                    .collect::<Vec<_>>()
            }
            CompactionOptions::Tiered(_) | CompactionOptions::NoCompaction => Vec::new(),
        };
        Self {
            memtable: Arc::new(MemTable::create(0)),
            imm_memtables: Vec::new(),
            l0_sstables: Vec::new(),
            levels,
            sstables: Default::default(),
        }
    }
}

pub struct LsmStorageOptions {
    // Block size in bytes
    pub block_size: usize,
    // SST size in bytes, also the approximate memtable capacity limit
    pub target_sst_size: usize,
    // Maximum number of memtables in memory, flush to L0 when exceeding this limit
    pub num_memtable_limit: usize,
    pub compaction_options: CompactionOptions,
    pub enable_wal: bool,
}

impl LsmStorageOptions {
    pub fn default_for_week1_test() -> Self {
        Self {
            block_size: 4096,
            target_sst_size: 2 << 20,
            compaction_options: CompactionOptions::NoCompaction,
            enable_wal: false,
            num_memtable_limit: 50,
        }
    }

    pub fn default_for_week1_day6_test() -> Self {
        Self {
            block_size: 4096,
            target_sst_size: 2 << 20,
            compaction_options: CompactionOptions::NoCompaction,
            enable_wal: false,
            num_memtable_limit: 2,
        }
    }
}

/// The storage interface of the LSM tree.
pub(crate) struct LsmStorageInner {
    pub(crate) state: Arc<RwLock<Arc<LsmStorageState>>>,
    pub(crate) state_lock: Mutex<()>,
    path: PathBuf,
    pub(crate) block_cache: Arc<BlockCache>,
    next_sst_id: AtomicUsize,
    pub(crate) options: Arc<LsmStorageOptions>,
    pub(crate) compaction_controller: CompactionController,
    pub(crate) manifest: Option<Manifest>,
}

/// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM.
pub struct MiniLsm {
    pub(crate) inner: Arc<LsmStorageInner>,
    /// Notifies the L0 flush thread to stop working. (In week 1 day 6)
    flush_notifier: crossbeam_channel::Sender<()>,
    /// The handle for the flush thread. (In week 1 day 6)
    flush_thread: Mutex<Option<std::thread::JoinHandle<()>>>,
    /// Notifies the compaction thread to stop working. (In week 2)
    compaction_notifier: crossbeam_channel::Sender<()>,
    /// The handle for the compaction thread. (In week 2)
    compaction_thread: Mutex<Option<std::thread::JoinHandle<()>>>,
}

impl Drop for MiniLsm {
    fn drop(&mut self) {
        self.compaction_notifier.send(()).ok();
        self.flush_notifier.send(()).ok();
    }
}
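// A minimal sketch (not part of the original skeleton) of the read pattern that the
// `Arc<RwLock<Arc<LsmStorageState>>>` layout above enables: hold the read lock only long
// enough to clone the inner `Arc`, then serve the whole read from that immutable snapshot.
// Writers publish a fresh `Arc<LsmStorageState>` instead of mutating in place, so a long
// scan never blocks a freeze or flush.
fn state_snapshot_sketch(inner: &LsmStorageInner) -> Arc<LsmStorageState> {
    let guard = inner.state.read(); // short critical section
    Arc::clone(&guard) // cheap pointer copy; the snapshot outlives the lock
}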
impl MiniLsm {
    pub fn close(&self) -> Result<()> {
        unimplemented!()
    }

    /// Start the storage engine by either loading an existing directory or creating a new one if the directory does
    /// not exist.
    pub fn open(path: impl AsRef<Path>, options: LsmStorageOptions) -> Result<Arc<Self>> {
        let inner = Arc::new(LsmStorageInner::open(path, options)?);
        let (tx1, rx) = crossbeam_channel::unbounded();
        let compaction_thread = inner.spawn_compaction_thread(rx)?;
        let (tx2, rx) = crossbeam_channel::unbounded();
        let flush_thread = inner.spawn_flush_thread(rx)?;
        Ok(Arc::new(Self {
            inner,
            flush_notifier: tx2,
            flush_thread: Mutex::new(flush_thread),
            compaction_notifier: tx1,
            compaction_thread: Mutex::new(compaction_thread),
        }))
    }

    pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
        self.inner.get(key)
    }

    pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
        self.inner.put(key, value)
    }

    pub fn delete(&self, key: &[u8]) -> Result<()> {
        self.inner.delete(key)
    }

    pub fn scan(
        &self,
        lower: Bound<&[u8]>,
        upper: Bound<&[u8]>,
    ) -> Result<FusedIterator<LsmIterator>> {
        self.inner.scan(lower, upper)
    }

    pub fn force_flush(&self) -> Result<()> {
        self.inner
            .force_freeze_memtable(&self.inner.state_lock.lock())?;
        self.inner.force_flush_next_imm_memtable()
    }

    pub fn force_full_compaction(&self) -> Result<()> {
        self.inner.force_full_compaction()
    }
}

impl LsmStorageInner {
    pub(crate) fn next_sst_id(&self) -> usize {
        self.next_sst_id
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
    }

    /// Start the storage engine by either loading an existing directory or creating a new one if the directory does
    /// not exist.
    pub(crate) fn open(path: impl AsRef<Path>, options: LsmStorageOptions) -> Result<Self> {
        let path = path.as_ref();
        let state = LsmStorageState::create(&options);

        let compaction_controller = match &options.compaction_options {
            CompactionOptions::Leveled(options) => {
                CompactionController::Leveled(LeveledCompactionController::new(options.clone()))
            }
            CompactionOptions::Tiered(options) => {
                CompactionController::Tiered(TieredCompactionController::new(options.clone()))
            }
            CompactionOptions::Simple(options) => CompactionController::Simple(
                SimpleLeveledCompactionController::new(options.clone()),
            ),
            CompactionOptions::NoCompaction => CompactionController::NoCompaction,
        };

        let storage = Self {
            state: Arc::new(RwLock::new(Arc::new(state))),
            state_lock: Mutex::new(()),
            path: path.to_path_buf(),
            block_cache: Arc::new(BlockCache::new(1024)),
            next_sst_id: AtomicUsize::new(1),
            compaction_controller,
            manifest: None,
            options: options.into(),
        };

        Ok(storage)
    }

    /// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter.
    pub fn get(&self, _key: &[u8]) -> Result<Option<Bytes>> {
        unimplemented!()
    }

    /// Put a key-value pair into the storage by writing into the current memtable.
    pub fn put(&self, _key: &[u8], _value: &[u8]) -> Result<()> {
        unimplemented!()
    }
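    // A minimal sketch (not part of the original skeleton) of the write path `put` above is
    // meant to follow, assuming `MemTable::put(&self, key: &[u8], value: &[u8]) -> Result<()>`
    // and `MemTable::approximate_size(&self) -> usize` exist on `crate::mem_table::MemTable`
    // (hypothetical names if the skeleton differs). `delete` below would take the same path,
    // writing an empty value as the tombstone.
    fn put_sketch(&self, key: &[u8], value: &[u8]) -> Result<()> {
        let snapshot = {
            let guard = self.state.read(); // short critical section
            Arc::clone(&guard)
        };
        snapshot.memtable.put(key, value)?; // concurrent insert; no write lock needed
        // Freeze the memtable once it outgrows the configured SST size.
        if snapshot.memtable.approximate_size() >= self.options.target_sst_size {
            let state_lock = self.state_lock.lock();
            self.force_freeze_memtable(&state_lock)?;
        }
        Ok(())
    }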
    /// Remove a key from the storage by writing an empty value.
    pub fn delete(&self, _key: &[u8]) -> Result<()> {
        unimplemented!()
    }

    pub(crate) fn path_of_sst_static(path: impl AsRef<Path>, id: usize) -> PathBuf {
        path.as_ref().join(format!("{:05}.sst", id))
    }

    pub(crate) fn path_of_sst(&self, id: usize) -> PathBuf {
        Self::path_of_sst_static(&self.path, id)
    }

    pub(crate) fn path_of_wal_static(path: impl AsRef<Path>, id: usize) -> PathBuf {
        path.as_ref().join(format!("{:05}.wal", id))
    }

    pub(crate) fn path_of_wal(&self, id: usize) -> PathBuf {
        Self::path_of_wal_static(&self.path, id)
    }

    fn sync_dir(&self) -> Result<()> {
        unimplemented!()
    }

    /// Force freeze the current memtable to an immutable memtable
    pub fn force_freeze_memtable(&self, _state_lock_observer: &MutexGuard<'_, ()>) -> Result<()> {
        unimplemented!()
    }

    /// Force flush the earliest-created immutable memtable to disk
    pub fn force_flush_next_imm_memtable(&self) -> Result<()> {
        unimplemented!()
    }

    /// Create an iterator over a range of keys.
    pub fn scan(
        &self,
        _lower: Bound<&[u8]>,
        _upper: Bound<&[u8]>,
    ) -> Result<FusedIterator<LsmIterator>> {
        unimplemented!()
    }
}
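// A minimal sketch (not part of the original skeleton) of the copy-on-write state update
// `force_freeze_memtable` is meant to perform, using only APIs already referenced in this
// file (`MemTable::create`, `next_sst_id`, and the `Clone` derive on `LsmStorageState`).
// The caller-held `state_lock` guard serializes freeze/flush decisions, while the write
// lock on `state` is held only for the cheap swap; WAL handling under `enable_wal` is omitted.
fn freeze_memtable_sketch(inner: &LsmStorageInner) -> Result<()> {
    let new_memtable = Arc::new(MemTable::create(inner.next_sst_id()));
    let mut guard = inner.state.write();
    // Clone the state (cheap: ids and `Arc`s), mutate the copy, then publish it atomically
    // so in-flight readers keep serving from their old snapshot.
    let mut snapshot = guard.as_ref().clone();
    let old_memtable = std::mem::replace(&mut snapshot.memtable, new_memtable);
    snapshot.imm_memtables.insert(0, old_memtable); // latest-first, matching the field docs
    *guard = Arc::new(snapshot);
    Ok(())
}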