| 
									
										
										
										
											2024-01-20 11:14:19 +08:00
										 |  |  | #![allow(dead_code)] // REMOVE THIS LINE after fully implementing this functionality
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | use std::collections::HashMap;
 | 
					
						
							| 
									
										
										
										
											2022-12-24 10:11:06 -05:00
										 |  |  | use std::ops::Bound;
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  | use std::path::{Path, PathBuf};
 | 
					
						
							|  |  |  | use std::sync::atomic::AtomicUsize;
 | 
					
						
							| 
									
										
										
										
											2022-12-24 10:11:06 -05:00
										 |  |  | use std::sync::Arc;
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:14:19 +08:00
										 |  |  | use anyhow::Result;
 | 
					
						
							| 
									
										
										
										
											2022-12-24 10:11:06 -05:00
										 |  |  | use bytes::Bytes;
 | 
					
						
							| 
									
										
										
										
											2024-01-20 21:54:16 +08:00
										 |  |  | use parking_lot::{Mutex, MutexGuard, RwLock};
 | 
					
						
							| 
									
										
										
										
											2022-12-24 10:11:06 -05:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-12-24 18:07:18 -05:00
										 |  |  | use crate::block::Block;
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  | use crate::compact::{
 | 
					
						
							| 
									
										
										
										
											2024-01-20 21:54:16 +08:00
										 |  |  |     CompactionController, CompactionOptions, LeveledCompactionController, LeveledCompactionOptions,
 | 
					
						
							|  |  |  |     SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, TieredCompactionController,
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  | };
 | 
					
						
							| 
									
										
										
										
											2022-12-24 14:48:57 -05:00
										 |  |  | use crate::lsm_iterator::{FusedIterator, LsmIterator};
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:14:19 +08:00
										 |  |  | use crate::manifest::Manifest;
 | 
					
						
							|  |  |  | use crate::mem_table::MemTable;
 | 
					
						
							|  |  |  | use crate::table::SsTable;
 | 
					
						
							| 
									
										
										
										
											2022-12-24 10:11:06 -05:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-12-24 18:07:18 -05:00
										 |  |  | pub type BlockCache = moka::sync::Cache<(usize, usize), Arc<Block>>;
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-20 21:54:16 +08:00
										 |  |  | /// Represents the state of the storage engine.
 | 
					
						
							| 
									
										
										
										
											2022-12-24 14:48:57 -05:00
										 |  |  | #[derive(Clone)]
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  | pub struct LsmStorageState {
 | 
					
						
							| 
									
										
										
										
											2022-12-24 23:45:53 -05:00
										 |  |  |     /// The current memtable.
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  |     pub memtable: Arc<MemTable>,
 | 
					
						
							| 
									
										
										
										
											2024-01-20 21:54:16 +08:00
										 |  |  |     /// Immutable memtables, from latest to earliest.
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  |     pub imm_memtables: Vec<Arc<MemTable>>,
 | 
					
						
							| 
									
										
										
										
											2024-01-20 21:54:16 +08:00
										 |  |  |     /// L0 SSTs, from latest to earliest.
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  |     pub l0_sstables: Vec<usize>,
 | 
					
						
							|  |  |  |     /// SsTables sorted by key range; L1 - L_max for leveled compaction, or tiers for tiered
 | 
					
						
							|  |  |  |     /// compaction.
 | 
					
						
							|  |  |  |     pub levels: Vec<(usize, Vec<usize>)>,
 | 
					
						
							|  |  |  |     /// SST objects.
 | 
					
						
							|  |  |  |     pub sstables: HashMap<usize, Arc<SsTable>>,
 | 
					
						
							| 
									
										
										
										
											2022-12-24 10:11:06 -05:00
										 |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  | impl LsmStorageState {
 | 
					
						
							|  |  |  |     fn create(options: &LsmStorageOptions) -> Self {
 | 
					
						
							|  |  |  |         let levels = match &options.compaction_options {
 | 
					
						
							|  |  |  |             CompactionOptions::Leveled(LeveledCompactionOptions { max_levels, .. })
 | 
					
						
							|  |  |  |             | CompactionOptions::Simple(SimpleLeveledCompactionOptions { max_levels, .. }) => (1
 | 
					
						
							|  |  |  |                 ..=*max_levels)
 | 
					
						
							|  |  |  |                 .map(|level| (level, Vec::new()))
 | 
					
						
							|  |  |  |                 .collect::<Vec<_>>(),
 | 
					
						
							|  |  |  |             CompactionOptions::Tiered(_) | CompactionOptions::NoCompaction => Vec::new(),
 | 
					
						
							|  |  |  |         };
 | 
					
						
							|  |  |  |         Self {
 | 
					
						
							|  |  |  |             memtable: Arc::new(MemTable::create(0)),
 | 
					
						
							|  |  |  |             imm_memtables: Vec::new(),
 | 
					
						
							|  |  |  |             l0_sstables: Vec::new(),
 | 
					
						
							|  |  |  |             levels,
 | 
					
						
							|  |  |  |             sstables: Default::default(),
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | pub struct LsmStorageOptions {
 | 
					
						
							| 
									
										
										
										
											2024-01-20 21:54:16 +08:00
										 |  |  |     // Block size in bytes
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  |     pub block_size: usize,
 | 
					
						
							| 
									
										
										
										
											2024-01-20 21:54:16 +08:00
										 |  |  |     // SST size in bytes, also the approximate memtable capacity limit
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  |     pub target_sst_size: usize,
 | 
					
						
							| 
									
										
										
										
											2024-01-20 21:54:16 +08:00
										 |  |  |     // Maximum number of memtables in memory, flush to L0 when exceeding this limit
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  |     pub num_memtable_limit: usize,
 | 
					
						
							|  |  |  |     pub compaction_options: CompactionOptions,
 | 
					
						
							|  |  |  |     pub enable_wal: bool,
 | 
					
						
							|  |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | impl LsmStorageOptions {
 | 
					
						
							|  |  |  |     pub fn default_for_week1_test() -> Self {
 | 
					
						
							| 
									
										
										
										
											2022-12-24 10:11:06 -05:00
										 |  |  |         Self {
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  |             block_size: 4096,
 | 
					
						
							|  |  |  |             target_sst_size: 2 << 20,
 | 
					
						
							|  |  |  |             compaction_options: CompactionOptions::NoCompaction,
 | 
					
						
							|  |  |  |             enable_wal: false,
 | 
					
						
							| 
									
										
										
										
											2024-01-20 21:54:16 +08:00
										 |  |  |             num_memtable_limit: 50,
 | 
					
						
							| 
									
										
										
										
											2022-12-24 10:11:06 -05:00
										 |  |  |         }
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | /// The storage interface of the LSM tree.
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  | pub(crate) struct LsmStorageInner {
 | 
					
						
							|  |  |  |     pub(crate) state: Arc<RwLock<Arc<LsmStorageState>>>,
 | 
					
						
							|  |  |  |     pub(crate) state_lock: Mutex<()>,
 | 
					
						
							|  |  |  |     path: PathBuf,
 | 
					
						
							|  |  |  |     pub(crate) block_cache: Arc<BlockCache>,
 | 
					
						
							|  |  |  |     next_sst_id: AtomicUsize,
 | 
					
						
							|  |  |  |     pub(crate) options: Arc<LsmStorageOptions>,
 | 
					
						
							|  |  |  |     pub(crate) compaction_controller: CompactionController,
 | 
					
						
							| 
									
										
										
										
											2024-01-20 21:54:16 +08:00
										 |  |  |     pub(crate) manifest: Option<Manifest>,
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-20 21:54:16 +08:00
										 |  |  | /// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM.
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  | pub struct MiniLsm {
 | 
					
						
							|  |  |  |     pub(crate) inner: Arc<LsmStorageInner>,
 | 
					
						
							| 
									
										
										
										
											2024-01-20 21:54:16 +08:00
										 |  |  |     /// Notifies the compaction thread to stop working. (In week 2)
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  |     compaction_notifier: crossbeam_channel::Sender<()>,
 | 
					
						
							| 
									
										
										
										
											2024-01-20 21:54:16 +08:00
										 |  |  |     /// The handle for the compaction thread. (In week 2)
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  |     compaction_thread: Mutex<Option<std::thread::JoinHandle<()>>>,
 | 
					
						
							|  |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | impl Drop for MiniLsm {
 | 
					
						
							|  |  |  |     fn drop(&mut self) {
 | 
					
						
							|  |  |  |         self.compaction_notifier.send(()).ok();
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | impl MiniLsm {
 | 
					
						
							|  |  |  |     pub fn close(&self) -> Result<()> {
 | 
					
						
							|  |  |  |         unimplemented!()
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-20 21:54:16 +08:00
										 |  |  |     /// Start the storage engine by either loading an existing directory or creating a new one if the directory does
 | 
					
						
							|  |  |  |     /// not exist.
 | 
					
						
							|  |  |  |     pub fn open(path: impl AsRef<Path>, options: LsmStorageOptions) -> Result<Arc<Self>> {
 | 
					
						
							|  |  |  |         let inner = Arc::new(LsmStorageInner::open(path, options)?);
 | 
					
						
							|  |  |  |         let (tx, rx) = crossbeam_channel::unbounded();
 | 
					
						
							|  |  |  |         let compaction_thread = inner.spawn_compaction_thread(rx)?;
 | 
					
						
							|  |  |  |         Ok(Arc::new(Self {
 | 
					
						
							|  |  |  |             inner,
 | 
					
						
							|  |  |  |             compaction_notifier: tx,
 | 
					
						
							|  |  |  |             compaction_thread: Mutex::new(compaction_thread),
 | 
					
						
							|  |  |  |         }))
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
 | 
					
						
							|  |  |  |         self.inner.get(key)
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
 | 
					
						
							|  |  |  |         self.inner.put(key, value)
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub fn delete(&self, key: &[u8]) -> Result<()> {
 | 
					
						
							|  |  |  |         self.inner.delete(key)
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub fn scan(
 | 
					
						
							|  |  |  |         &self,
 | 
					
						
							|  |  |  |         lower: Bound<&[u8]>,
 | 
					
						
							|  |  |  |         upper: Bound<&[u8]>,
 | 
					
						
							|  |  |  |     ) -> Result<FusedIterator<LsmIterator>> {
 | 
					
						
							|  |  |  |         self.inner.scan(lower, upper)
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub fn force_flush(&self) -> Result<()> {
 | 
					
						
							| 
									
										
										
										
											2024-01-20 21:54:16 +08:00
										 |  |  |         self.inner
 | 
					
						
							|  |  |  |             .force_freeze_memtable(&self.inner.state_lock.lock())?;
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  |         self.inner.force_flush_next_imm_memtable()
 | 
					
						
							|  |  |  |     }
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:14:19 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  |     pub fn force_full_compaction(&self) -> Result<()> {
 | 
					
						
							|  |  |  |         self.inner.force_full_compaction()
 | 
					
						
							|  |  |  |     }
 | 
					
						
							| 
									
										
										
										
											2022-12-24 10:11:06 -05:00
										 |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  | impl LsmStorageInner {
 | 
					
						
							|  |  |  |     pub(crate) fn next_sst_id(&self) -> usize {
 | 
					
						
							|  |  |  |         self.next_sst_id
 | 
					
						
							|  |  |  |             .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-20 21:54:16 +08:00
										 |  |  |     /// Start the storage engine by either loading an existing directory or creating a new one if the directory does
 | 
					
						
							|  |  |  |     /// not exist.
 | 
					
						
							|  |  |  |     pub(crate) fn open(path: impl AsRef<Path>, options: LsmStorageOptions) -> Result<Self> {
 | 
					
						
							|  |  |  |         let path = path.as_ref();
 | 
					
						
							|  |  |  |         let state = LsmStorageState::create(&options);
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         let compaction_controller = match &options.compaction_options {
 | 
					
						
							|  |  |  |             CompactionOptions::Leveled(options) => {
 | 
					
						
							|  |  |  |                 CompactionController::Leveled(LeveledCompactionController::new(options.clone()))
 | 
					
						
							|  |  |  |             }
 | 
					
						
							|  |  |  |             CompactionOptions::Tiered(options) => {
 | 
					
						
							|  |  |  |                 CompactionController::Tiered(TieredCompactionController::new(options.clone()))
 | 
					
						
							|  |  |  |             }
 | 
					
						
							|  |  |  |             CompactionOptions::Simple(options) => CompactionController::Simple(
 | 
					
						
							|  |  |  |                 SimpleLeveledCompactionController::new(options.clone()),
 | 
					
						
							|  |  |  |             ),
 | 
					
						
							|  |  |  |             CompactionOptions::NoCompaction => CompactionController::NoCompaction,
 | 
					
						
							|  |  |  |         };
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         let storage = Self {
 | 
					
						
							|  |  |  |             state: Arc::new(RwLock::new(Arc::new(state))),
 | 
					
						
							|  |  |  |             state_lock: Mutex::new(()),
 | 
					
						
							|  |  |  |             path: path.to_path_buf(),
 | 
					
						
							|  |  |  |             block_cache: Arc::new(BlockCache::new(1024)),
 | 
					
						
							|  |  |  |             next_sst_id: AtomicUsize::new(1),
 | 
					
						
							|  |  |  |             compaction_controller,
 | 
					
						
							|  |  |  |             manifest: None,
 | 
					
						
							|  |  |  |             options: options.into(),
 | 
					
						
							|  |  |  |         };
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Ok(storage)
 | 
					
						
							| 
									
										
										
										
											2022-12-24 10:11:06 -05:00
										 |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-12-24 18:25:41 -05:00
										 |  |  |     /// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter.
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:14:19 +08:00
										 |  |  |     pub fn get(&self, _key: &[u8]) -> Result<Option<Bytes>> {
 | 
					
						
							| 
									
										
										
										
											2022-12-24 14:48:57 -05:00
										 |  |  |         unimplemented!()
 | 
					
						
							| 
									
										
										
										
											2022-12-24 10:11:06 -05:00
										 |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-12-24 18:25:41 -05:00
										 |  |  |     /// Put a key-value pair into the storage by writing into the current memtable.
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:14:19 +08:00
										 |  |  |     pub fn put(&self, _key: &[u8], _value: &[u8]) -> Result<()> {
 | 
					
						
							| 
									
										
										
										
											2022-12-24 10:11:06 -05:00
										 |  |  |         unimplemented!()
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-12-24 18:25:41 -05:00
										 |  |  |     /// Remove a key from the storage by writing an empty value.
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:14:19 +08:00
										 |  |  |     pub fn delete(&self, _key: &[u8]) -> Result<()> {
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  |         unimplemented!()
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub(crate) fn path_of_sst_static(path: impl AsRef<Path>, id: usize) -> PathBuf {
 | 
					
						
							|  |  |  |         path.as_ref().join(format!("{:05}.sst", id))
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub(crate) fn path_of_sst(&self, id: usize) -> PathBuf {
 | 
					
						
							|  |  |  |         Self::path_of_sst_static(&self.path, id)
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub(crate) fn path_of_wal_static(path: impl AsRef<Path>, id: usize) -> PathBuf {
 | 
					
						
							|  |  |  |         path.as_ref().join(format!("{:05}.wal", id))
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub(crate) fn path_of_wal(&self, id: usize) -> PathBuf {
 | 
					
						
							|  |  |  |         Self::path_of_wal_static(&self.path, id)
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     fn sync_dir(&self) -> Result<()> {
 | 
					
						
							|  |  |  |         unimplemented!()
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-20 21:54:16 +08:00
										 |  |  |     /// Force freeze the current memtable to an immutable memtable
 | 
					
						
							|  |  |  |     pub fn force_freeze_memtable(&self, _state_lock_observer: &MutexGuard<'_, ()>) -> Result<()> {
 | 
					
						
							| 
									
										
										
										
											2022-12-24 10:11:06 -05:00
										 |  |  |         unimplemented!()
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:11:09 +08:00
										 |  |  |     /// Force flush the earliest-created immutable memtable to disk
 | 
					
						
							|  |  |  |     pub fn force_flush_next_imm_memtable(&self) -> Result<()> {
 | 
					
						
							| 
									
										
										
										
											2022-12-24 10:11:06 -05:00
										 |  |  |         unimplemented!()
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-12-24 18:25:41 -05:00
										 |  |  |     /// Create an iterator over a range of keys.
 | 
					
						
							| 
									
										
										
										
											2022-12-24 14:48:57 -05:00
										 |  |  |     pub fn scan(
 | 
					
						
							|  |  |  |         &self,
 | 
					
						
							| 
									
										
										
										
											2024-01-20 11:14:19 +08:00
										 |  |  |         _lower: Bound<&[u8]>,
 | 
					
						
							|  |  |  |         _upper: Bound<&[u8]>,
 | 
					
						
							| 
									
										
										
										
											2022-12-24 14:48:57 -05:00
										 |  |  |     ) -> Result<FusedIterator<LsmIterator>> {
 | 
					
						
							| 
									
										
										
										
											2022-12-24 10:11:06 -05:00
										 |  |  |         unimplemented!()
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | }
 |