| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  | use std::collections::{BTreeSet, HashMap};
 | 
					
						
							|  |  |  | use std::fs::File;
 | 
					
						
							|  |  |  | use std::ops::Bound;
 | 
					
						
							|  |  |  | use std::path::{Path, PathBuf};
 | 
					
						
							| 
									
										
										
										
											2024-01-26 16:52:37 +08:00
										 |  |  | use std::sync::atomic::AtomicUsize;
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  | use std::sync::Arc;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | use anyhow::{Context, Result};
 | 
					
						
							|  |  |  | use bytes::Bytes;
 | 
					
						
							|  |  |  | use parking_lot::{Mutex, MutexGuard, RwLock};
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | use crate::block::Block;
 | 
					
						
							|  |  |  | use crate::compact::{
 | 
					
						
							|  |  |  |     CompactionController, CompactionOptions, LeveledCompactionController, LeveledCompactionOptions,
 | 
					
						
							|  |  |  |     SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, TieredCompactionController,
 | 
					
						
							|  |  |  | };
 | 
					
						
							|  |  |  | use crate::iterators::concat_iterator::SstConcatIterator;
 | 
					
						
							|  |  |  | use crate::iterators::merge_iterator::MergeIterator;
 | 
					
						
							|  |  |  | use crate::iterators::two_merge_iterator::TwoMergeIterator;
 | 
					
						
							|  |  |  | use crate::iterators::StorageIterator;
 | 
					
						
							|  |  |  | use crate::key::{self, KeySlice};
 | 
					
						
							|  |  |  | use crate::lsm_iterator::{FusedIterator, LsmIterator};
 | 
					
						
							|  |  |  | use crate::manifest::{Manifest, ManifestRecord};
 | 
					
						
							| 
									
										
										
										
											2024-01-25 23:09:16 +08:00
										 |  |  | use crate::mem_table::{map_bound, map_key_bound_plus_ts, MemTable};
 | 
					
						
							| 
									
										
										
										
											2024-01-26 18:14:34 +08:00
										 |  |  | use crate::mvcc::txn::{Transaction, TxnIterator};
 | 
					
						
							|  |  |  | use crate::mvcc::LsmMvccInner;
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  | use crate::table::{FileObject, SsTable, SsTableBuilder, SsTableIterator};
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | pub type BlockCache = moka::sync::Cache<(usize, usize), Arc<Block>>;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | /// Represents the state of the storage engine.
 | 
					
						
							|  |  |  | #[derive(Clone)]
 | 
					
						
							|  |  |  | pub struct LsmStorageState {
 | 
					
						
							|  |  |  |     /// The current memtable.
 | 
					
						
							|  |  |  |     pub memtable: Arc<MemTable>,
 | 
					
						
							|  |  |  |     /// Immutable memtables, from latest to earliest.
 | 
					
						
							|  |  |  |     pub imm_memtables: Vec<Arc<MemTable>>,
 | 
					
						
							|  |  |  |     /// L0 SSTs, from latest to earliest.
 | 
					
						
							|  |  |  |     pub l0_sstables: Vec<usize>,
 | 
					
						
							|  |  |  |     /// SsTables sorted by key range; L1 - L_max for leveled compaction, or tiers for tiered
 | 
					
						
							|  |  |  |     /// compaction.
 | 
					
						
							|  |  |  |     pub levels: Vec<(usize, Vec<usize>)>,
 | 
					
						
							|  |  |  |     /// SST objects.
 | 
					
						
							|  |  |  |     pub sstables: HashMap<usize, Arc<SsTable>>,
 | 
					
						
							|  |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-25 21:53:47 +08:00
										 |  |  | pub enum WriteBatchRecord<T: AsRef<[u8]>> {
 | 
					
						
							|  |  |  |     Put(T, T),
 | 
					
						
							|  |  |  |     Del(T),
 | 
					
						
							|  |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  | impl LsmStorageState {
 | 
					
						
							|  |  |  |     fn create(options: &LsmStorageOptions) -> Self {
 | 
					
						
							|  |  |  |         let levels = match &options.compaction_options {
 | 
					
						
							|  |  |  |             CompactionOptions::Leveled(LeveledCompactionOptions { max_levels, .. })
 | 
					
						
							|  |  |  |             | CompactionOptions::Simple(SimpleLeveledCompactionOptions { max_levels, .. }) => (1
 | 
					
						
							|  |  |  |                 ..=*max_levels)
 | 
					
						
							|  |  |  |                 .map(|level| (level, Vec::new()))
 | 
					
						
							|  |  |  |                 .collect::<Vec<_>>(),
 | 
					
						
							|  |  |  |             CompactionOptions::Tiered(_) => Vec::new(),
 | 
					
						
							|  |  |  |             CompactionOptions::NoCompaction => vec![(1, Vec::new())],
 | 
					
						
							|  |  |  |         };
 | 
					
						
							|  |  |  |         Self {
 | 
					
						
							|  |  |  |             memtable: Arc::new(MemTable::create(0)),
 | 
					
						
							|  |  |  |             imm_memtables: Vec::new(),
 | 
					
						
							|  |  |  |             l0_sstables: Vec::new(),
 | 
					
						
							|  |  |  |             levels,
 | 
					
						
							|  |  |  |             sstables: Default::default(),
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-26 15:19:56 +08:00
										 |  |  | #[derive(Debug, Clone)]
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  | pub struct LsmStorageOptions {
 | 
					
						
							|  |  |  |     // Block size in bytes
 | 
					
						
							|  |  |  |     pub block_size: usize,
 | 
					
						
							|  |  |  |     // SST size in bytes, also the approximate memtable capacity limit
 | 
					
						
							|  |  |  |     pub target_sst_size: usize,
 | 
					
						
							|  |  |  |     // Maximum number of memtables in memory, flush to L0 when exceeding this limit
 | 
					
						
							|  |  |  |     pub num_memtable_limit: usize,
 | 
					
						
							|  |  |  |     pub compaction_options: CompactionOptions,
 | 
					
						
							|  |  |  |     pub enable_wal: bool,
 | 
					
						
							| 
									
										
										
										
											2024-01-26 18:14:34 +08:00
										 |  |  |     pub serializable: bool,
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | impl LsmStorageOptions {
 | 
					
						
							|  |  |  |     pub fn default_for_week1_test() -> Self {
 | 
					
						
							|  |  |  |         Self {
 | 
					
						
							|  |  |  |             block_size: 4096,
 | 
					
						
							|  |  |  |             target_sst_size: 2 << 20,
 | 
					
						
							|  |  |  |             compaction_options: CompactionOptions::NoCompaction,
 | 
					
						
							|  |  |  |             enable_wal: false,
 | 
					
						
							|  |  |  |             num_memtable_limit: 50,
 | 
					
						
							| 
									
										
										
										
											2024-01-26 18:14:34 +08:00
										 |  |  |             serializable: false,
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |         }
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub fn default_for_week1_day6_test() -> Self {
 | 
					
						
							|  |  |  |         Self {
 | 
					
						
							|  |  |  |             block_size: 4096,
 | 
					
						
							|  |  |  |             target_sst_size: 2 << 20,
 | 
					
						
							|  |  |  |             compaction_options: CompactionOptions::NoCompaction,
 | 
					
						
							|  |  |  |             enable_wal: false,
 | 
					
						
							|  |  |  |             num_memtable_limit: 2,
 | 
					
						
							| 
									
										
										
										
											2024-01-26 18:14:34 +08:00
										 |  |  |             serializable: false,
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |         }
 | 
					
						
							|  |  |  |     }
 | 
					
						
							| 
									
										
										
										
											2024-01-25 15:25:23 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  |     pub fn default_for_week2_test(compaction_options: CompactionOptions) -> Self {
 | 
					
						
							|  |  |  |         Self {
 | 
					
						
							|  |  |  |             block_size: 4096,
 | 
					
						
							|  |  |  |             target_sst_size: 1 << 20, // 1MB
 | 
					
						
							|  |  |  |             compaction_options,
 | 
					
						
							|  |  |  |             enable_wal: false,
 | 
					
						
							|  |  |  |             num_memtable_limit: 2,
 | 
					
						
							| 
									
										
										
										
											2024-01-26 18:14:34 +08:00
										 |  |  |             serializable: false,
 | 
					
						
							| 
									
										
										
										
											2024-01-25 15:25:23 +08:00
										 |  |  |         }
 | 
					
						
							|  |  |  |     }
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | fn range_overlap(
 | 
					
						
							|  |  |  |     user_begin: Bound<&[u8]>,
 | 
					
						
							|  |  |  |     user_end: Bound<&[u8]>,
 | 
					
						
							|  |  |  |     table_begin: KeySlice,
 | 
					
						
							|  |  |  |     table_end: KeySlice,
 | 
					
						
							|  |  |  | ) -> bool {
 | 
					
						
							|  |  |  |     match user_end {
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:27:16 +08:00
										 |  |  |         Bound::Excluded(key) if key <= table_begin.key_ref() => {
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |             return false;
 | 
					
						
							|  |  |  |         }
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:27:16 +08:00
										 |  |  |         Bound::Included(key) if key < table_begin.key_ref() => {
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |             return false;
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  |         _ => {}
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  |     match user_begin {
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:27:16 +08:00
										 |  |  |         Bound::Excluded(key) if key >= table_end.key_ref() => {
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |             return false;
 | 
					
						
							|  |  |  |         }
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:27:16 +08:00
										 |  |  |         Bound::Included(key) if key > table_end.key_ref() => {
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |             return false;
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  |         _ => {}
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  |     true
 | 
					
						
							|  |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | fn key_within(user_key: &[u8], table_begin: KeySlice, table_end: KeySlice) -> bool {
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:27:16 +08:00
										 |  |  |     table_begin.key_ref() <= user_key && user_key <= table_end.key_ref()
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | /// The storage interface of the LSM tree.
 | 
					
						
							|  |  |  | pub(crate) struct LsmStorageInner {
 | 
					
						
							|  |  |  |     pub(crate) state: Arc<RwLock<Arc<LsmStorageState>>>,
 | 
					
						
							|  |  |  |     pub(crate) state_lock: Mutex<()>,
 | 
					
						
							|  |  |  |     path: PathBuf,
 | 
					
						
							|  |  |  |     pub(crate) block_cache: Arc<BlockCache>,
 | 
					
						
							|  |  |  |     next_sst_id: AtomicUsize,
 | 
					
						
							|  |  |  |     pub(crate) options: Arc<LsmStorageOptions>,
 | 
					
						
							|  |  |  |     pub(crate) compaction_controller: CompactionController,
 | 
					
						
							|  |  |  |     pub(crate) manifest: Option<Manifest>,
 | 
					
						
							| 
									
										
										
										
											2024-01-26 16:52:37 +08:00
										 |  |  |     pub(crate) mvcc: Option<LsmMvccInner>,
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | /// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM.
 | 
					
						
							|  |  |  | pub struct MiniLsm {
 | 
					
						
							|  |  |  |     pub(crate) inner: Arc<LsmStorageInner>,
 | 
					
						
							|  |  |  |     /// Notifies the L0 flush thread to stop working. (In week 1 day 6)
 | 
					
						
							|  |  |  |     flush_notifier: crossbeam_channel::Sender<()>,
 | 
					
						
							|  |  |  |     /// The handle for the compaction thread. (In week 1 day 6)
 | 
					
						
							|  |  |  |     flush_thread: Mutex<Option<std::thread::JoinHandle<()>>>,
 | 
					
						
							|  |  |  |     /// Notifies the compaction thread to stop working. (In week 2)
 | 
					
						
							|  |  |  |     compaction_notifier: crossbeam_channel::Sender<()>,
 | 
					
						
							|  |  |  |     /// The handle for the compaction thread. (In week 2)
 | 
					
						
							|  |  |  |     compaction_thread: Mutex<Option<std::thread::JoinHandle<()>>>,
 | 
					
						
							|  |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | impl Drop for MiniLsm {
 | 
					
						
							|  |  |  |     fn drop(&mut self) {
 | 
					
						
							|  |  |  |         self.compaction_notifier.send(()).ok();
 | 
					
						
							|  |  |  |         self.flush_notifier.send(()).ok();
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | impl MiniLsm {
 | 
					
						
							|  |  |  |     pub fn close(&self) -> Result<()> {
 | 
					
						
							|  |  |  |         self.inner.sync_dir()?;
 | 
					
						
							|  |  |  |         self.compaction_notifier.send(()).ok();
 | 
					
						
							|  |  |  |         self.flush_notifier.send(()).ok();
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if self.inner.options.enable_wal {
 | 
					
						
							|  |  |  |             self.inner.sync()?;
 | 
					
						
							|  |  |  |             self.inner.sync_dir()?;
 | 
					
						
							|  |  |  |             return Ok(());
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         let mut compaction_thread = self.compaction_thread.lock();
 | 
					
						
							|  |  |  |         if let Some(compaction_thread) = compaction_thread.take() {
 | 
					
						
							|  |  |  |             compaction_thread
 | 
					
						
							|  |  |  |                 .join()
 | 
					
						
							|  |  |  |                 .map_err(|e| anyhow::anyhow!("{:?}", e))?;
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  |         let mut flush_thread = self.flush_thread.lock();
 | 
					
						
							|  |  |  |         if let Some(flush_thread) = flush_thread.take() {
 | 
					
						
							|  |  |  |             flush_thread
 | 
					
						
							|  |  |  |                 .join()
 | 
					
						
							|  |  |  |                 .map_err(|e| anyhow::anyhow!("{:?}", e))?;
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         // create memtable and skip updating manifest
 | 
					
						
							|  |  |  |         if !self.inner.state.read().memtable.is_empty() {
 | 
					
						
							|  |  |  |             self.inner
 | 
					
						
							|  |  |  |                 .freeze_memtable_with_memtable(Arc::new(MemTable::create(
 | 
					
						
							|  |  |  |                     self.inner.next_sst_id(),
 | 
					
						
							|  |  |  |                 )))?;
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         while {
 | 
					
						
							|  |  |  |             let snapshot = self.inner.state.read();
 | 
					
						
							|  |  |  |             !snapshot.imm_memtables.is_empty()
 | 
					
						
							|  |  |  |         } {
 | 
					
						
							|  |  |  |             self.inner.force_flush_next_imm_memtable()?;
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  |         self.inner.sync_dir()?;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Ok(())
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     /// Start the storage engine by either loading an existing directory or creating a new one if the directory does
 | 
					
						
							|  |  |  |     /// not exist.
 | 
					
						
							|  |  |  |     pub fn open(path: impl AsRef<Path>, options: LsmStorageOptions) -> Result<Arc<Self>> {
 | 
					
						
							|  |  |  |         let inner = Arc::new(LsmStorageInner::open(path, options)?);
 | 
					
						
							|  |  |  |         let (tx1, rx) = crossbeam_channel::unbounded();
 | 
					
						
							|  |  |  |         let compaction_thread = inner.spawn_compaction_thread(rx)?;
 | 
					
						
							|  |  |  |         let (tx2, rx) = crossbeam_channel::unbounded();
 | 
					
						
							|  |  |  |         let flush_thread = inner.spawn_flush_thread(rx)?;
 | 
					
						
							|  |  |  |         Ok(Arc::new(Self {
 | 
					
						
							|  |  |  |             inner,
 | 
					
						
							|  |  |  |             flush_notifier: tx2,
 | 
					
						
							|  |  |  |             flush_thread: Mutex::new(flush_thread),
 | 
					
						
							|  |  |  |             compaction_notifier: tx1,
 | 
					
						
							|  |  |  |             compaction_thread: Mutex::new(compaction_thread),
 | 
					
						
							|  |  |  |         }))
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-26 16:52:37 +08:00
										 |  |  |     pub fn new_txn(&self) -> Result<Arc<Transaction>> {
 | 
					
						
							|  |  |  |         self.inner.new_txn()
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |     pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
 | 
					
						
							|  |  |  |         self.inner.get(key)
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-26 18:14:34 +08:00
										 |  |  |     pub fn write_batch<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<u64> {
 | 
					
						
							| 
									
										
										
										
											2024-01-25 21:53:47 +08:00
										 |  |  |         self.inner.write_batch(batch)
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |     pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
 | 
					
						
							|  |  |  |         self.inner.put(key, value)
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub fn delete(&self, key: &[u8]) -> Result<()> {
 | 
					
						
							|  |  |  |         self.inner.delete(key)
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub fn sync(&self) -> Result<()> {
 | 
					
						
							|  |  |  |         self.inner.sync()
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-26 16:52:37 +08:00
										 |  |  |     pub fn scan(&self, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> Result<TxnIterator> {
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |         self.inner.scan(lower, upper)
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     /// Only call this in test cases due to race conditions
 | 
					
						
							|  |  |  |     pub fn force_flush(&self) -> Result<()> {
 | 
					
						
							|  |  |  |         if !self.inner.state.read().memtable.is_empty() {
 | 
					
						
							|  |  |  |             self.inner
 | 
					
						
							|  |  |  |                 .force_freeze_memtable(&self.inner.state_lock.lock())?;
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  |         if !self.inner.state.read().imm_memtables.is_empty() {
 | 
					
						
							|  |  |  |             self.inner.force_flush_next_imm_memtable()?;
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  |         Ok(())
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub fn force_full_compaction(&self) -> Result<()> {
 | 
					
						
							|  |  |  |         self.inner.force_full_compaction()
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | impl LsmStorageInner {
 | 
					
						
							|  |  |  |     pub(crate) fn next_sst_id(&self) -> usize {
 | 
					
						
							|  |  |  |         self.next_sst_id
 | 
					
						
							|  |  |  |             .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-26 16:52:37 +08:00
										 |  |  |     pub(crate) fn mvcc(&self) -> &LsmMvccInner {
 | 
					
						
							|  |  |  |         self.mvcc.as_ref().unwrap()
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub(crate) fn manifest(&self) -> &Manifest {
 | 
					
						
							|  |  |  |         self.manifest.as_ref().unwrap()
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |     /// Start the storage engine by either loading an existing directory or creating a new one if the directory does
 | 
					
						
							|  |  |  |     /// not exist.
 | 
					
						
							|  |  |  |     pub(crate) fn open(path: impl AsRef<Path>, options: LsmStorageOptions) -> Result<Self> {
 | 
					
						
							|  |  |  |         let mut state = LsmStorageState::create(&options);
 | 
					
						
							|  |  |  |         let path = path.as_ref();
 | 
					
						
							|  |  |  |         let mut next_sst_id = 1;
 | 
					
						
							|  |  |  |         let block_cache = Arc::new(BlockCache::new(1 << 20)); // 4GB block cache,
 | 
					
						
							|  |  |  |         let manifest;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         let compaction_controller = match &options.compaction_options {
 | 
					
						
							|  |  |  |             CompactionOptions::Leveled(options) => {
 | 
					
						
							|  |  |  |                 CompactionController::Leveled(LeveledCompactionController::new(options.clone()))
 | 
					
						
							|  |  |  |             }
 | 
					
						
							|  |  |  |             CompactionOptions::Tiered(options) => {
 | 
					
						
							|  |  |  |                 CompactionController::Tiered(TieredCompactionController::new(options.clone()))
 | 
					
						
							|  |  |  |             }
 | 
					
						
							|  |  |  |             CompactionOptions::Simple(options) => CompactionController::Simple(
 | 
					
						
							|  |  |  |                 SimpleLeveledCompactionController::new(options.clone()),
 | 
					
						
							|  |  |  |             ),
 | 
					
						
							|  |  |  |             CompactionOptions::NoCompaction => CompactionController::NoCompaction,
 | 
					
						
							|  |  |  |         };
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if !path.exists() {
 | 
					
						
							|  |  |  |             std::fs::create_dir_all(path).context("failed to create DB dir")?;
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  |         let manifest_path = path.join("MANIFEST");
 | 
					
						
							|  |  |  |         if !manifest_path.exists() {
 | 
					
						
							|  |  |  |             if options.enable_wal {
 | 
					
						
							|  |  |  |                 state.memtable = Arc::new(MemTable::create_with_wal(
 | 
					
						
							|  |  |  |                     state.memtable.id(),
 | 
					
						
							|  |  |  |                     Self::path_of_wal_static(path, state.memtable.id()),
 | 
					
						
							|  |  |  |                 )?);
 | 
					
						
							|  |  |  |             }
 | 
					
						
							|  |  |  |             manifest = Manifest::create(&manifest_path).context("failed to create manifest")?;
 | 
					
						
							|  |  |  |             manifest.add_record_when_init(ManifestRecord::NewMemtable(state.memtable.id()))?;
 | 
					
						
							|  |  |  |         } else {
 | 
					
						
							|  |  |  |             let (m, records) = Manifest::recover(&manifest_path)?;
 | 
					
						
							|  |  |  |             let mut memtables = BTreeSet::new();
 | 
					
						
							|  |  |  |             for record in records {
 | 
					
						
							|  |  |  |                 match record {
 | 
					
						
							|  |  |  |                     ManifestRecord::Flush(sst_id) => {
 | 
					
						
							|  |  |  |                         let res = memtables.remove(&sst_id);
 | 
					
						
							|  |  |  |                         assert!(res, "memtable not exist?");
 | 
					
						
							|  |  |  |                         if compaction_controller.flush_to_l0() {
 | 
					
						
							|  |  |  |                             state.l0_sstables.insert(0, sst_id);
 | 
					
						
							|  |  |  |                         } else {
 | 
					
						
							|  |  |  |                             state.levels.insert(0, (sst_id, vec![sst_id]));
 | 
					
						
							|  |  |  |                         }
 | 
					
						
							|  |  |  |                         next_sst_id = next_sst_id.max(sst_id);
 | 
					
						
							|  |  |  |                     }
 | 
					
						
							|  |  |  |                     ManifestRecord::NewMemtable(x) => {
 | 
					
						
							|  |  |  |                         next_sst_id = next_sst_id.max(x);
 | 
					
						
							|  |  |  |                         memtables.insert(x);
 | 
					
						
							|  |  |  |                     }
 | 
					
						
							|  |  |  |                     ManifestRecord::Compaction(task, output) => {
 | 
					
						
							|  |  |  |                         let (new_state, _) =
 | 
					
						
							|  |  |  |                             compaction_controller.apply_compaction_result(&state, &task, &output);
 | 
					
						
							|  |  |  |                         // TODO: apply remove again
 | 
					
						
							|  |  |  |                         state = new_state;
 | 
					
						
							|  |  |  |                         next_sst_id =
 | 
					
						
							|  |  |  |                             next_sst_id.max(output.iter().max().copied().unwrap_or_default());
 | 
					
						
							|  |  |  |                     }
 | 
					
						
							|  |  |  |                 }
 | 
					
						
							|  |  |  |             }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             let mut sst_cnt = 0;
 | 
					
						
							|  |  |  |             // recover SSTs
 | 
					
						
							|  |  |  |             for table_id in state
 | 
					
						
							|  |  |  |                 .l0_sstables
 | 
					
						
							|  |  |  |                 .iter()
 | 
					
						
							|  |  |  |                 .chain(state.levels.iter().flat_map(|(_, files)| files))
 | 
					
						
							|  |  |  |             {
 | 
					
						
							|  |  |  |                 let table_id = *table_id;
 | 
					
						
							|  |  |  |                 let sst = SsTable::open(
 | 
					
						
							|  |  |  |                     table_id,
 | 
					
						
							|  |  |  |                     Some(block_cache.clone()),
 | 
					
						
							|  |  |  |                     FileObject::open(&Self::path_of_sst_static(path, table_id))
 | 
					
						
							|  |  |  |                         .context("failed to open SST")?,
 | 
					
						
							|  |  |  |                 )?;
 | 
					
						
							|  |  |  |                 state.sstables.insert(table_id, Arc::new(sst));
 | 
					
						
							|  |  |  |                 sst_cnt += 1;
 | 
					
						
							|  |  |  |             }
 | 
					
						
							|  |  |  |             println!("{} SSTs opened", sst_cnt);
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             next_sst_id += 1;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             // recover memtables
 | 
					
						
							|  |  |  |             if options.enable_wal {
 | 
					
						
							|  |  |  |                 let mut wal_cnt = 0;
 | 
					
						
							|  |  |  |                 for id in memtables.iter() {
 | 
					
						
							|  |  |  |                     let memtable =
 | 
					
						
							|  |  |  |                         MemTable::recover_from_wal(*id, Self::path_of_wal_static(path, *id))?;
 | 
					
						
							|  |  |  |                     if !memtable.is_empty() {
 | 
					
						
							|  |  |  |                         state.imm_memtables.insert(0, Arc::new(memtable));
 | 
					
						
							|  |  |  |                         wal_cnt += 1;
 | 
					
						
							|  |  |  |                     }
 | 
					
						
							|  |  |  |                 }
 | 
					
						
							|  |  |  |                 println!("{} WALs recovered", wal_cnt);
 | 
					
						
							|  |  |  |                 state.memtable = Arc::new(MemTable::create_with_wal(
 | 
					
						
							|  |  |  |                     next_sst_id,
 | 
					
						
							|  |  |  |                     Self::path_of_wal_static(path, next_sst_id),
 | 
					
						
							|  |  |  |                 )?);
 | 
					
						
							|  |  |  |             } else {
 | 
					
						
							|  |  |  |                 state.memtable = Arc::new(MemTable::create(next_sst_id));
 | 
					
						
							|  |  |  |             }
 | 
					
						
							|  |  |  |             m.add_record_when_init(ManifestRecord::NewMemtable(state.memtable.id()))?;
 | 
					
						
							|  |  |  |             next_sst_id += 1;
 | 
					
						
							|  |  |  |             manifest = m;
 | 
					
						
							|  |  |  |         };
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         let storage = Self {
 | 
					
						
							|  |  |  |             state: Arc::new(RwLock::new(Arc::new(state))),
 | 
					
						
							|  |  |  |             state_lock: Mutex::new(()),
 | 
					
						
							|  |  |  |             path: path.to_path_buf(),
 | 
					
						
							|  |  |  |             block_cache,
 | 
					
						
							|  |  |  |             next_sst_id: AtomicUsize::new(next_sst_id),
 | 
					
						
							|  |  |  |             compaction_controller,
 | 
					
						
							|  |  |  |             manifest: Some(manifest),
 | 
					
						
							|  |  |  |             options: options.into(),
 | 
					
						
							| 
									
										
										
										
											2024-01-26 16:52:37 +08:00
										 |  |  |             mvcc: Some(LsmMvccInner::new(0)),
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |         };
 | 
					
						
							|  |  |  |         storage.sync_dir()?;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Ok(storage)
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub fn sync(&self) -> Result<()> {
 | 
					
						
							|  |  |  |         self.state.read().memtable.sync_wal()
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-26 16:52:37 +08:00
										 |  |  |     pub fn new_txn(self: &Arc<Self>) -> Result<Arc<Transaction>> {
 | 
					
						
							| 
									
										
										
										
											2024-01-26 18:14:34 +08:00
										 |  |  |         Ok(self.mvcc().new_txn(self.clone(), self.options.serializable))
 | 
					
						
							| 
									
										
										
										
											2024-01-26 16:52:37 +08:00
										 |  |  |     }
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |     /// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter.
 | 
					
						
							| 
									
										
										
										
											2024-01-26 16:52:37 +08:00
										 |  |  |     pub fn get(self: &Arc<Self>, key: &[u8]) -> Result<Option<Bytes>> {
 | 
					
						
							| 
									
										
										
										
											2024-01-26 18:14:34 +08:00
										 |  |  |         let txn = self.mvcc().new_txn(self.clone(), self.options.serializable);
 | 
					
						
							| 
									
										
										
										
											2024-01-26 16:52:37 +08:00
										 |  |  |         txn.get(key)
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub(crate) fn get_with_ts(&self, key: &[u8], read_ts: u64) -> Result<Option<Bytes>> {
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |         let snapshot = {
 | 
					
						
							|  |  |  |             let guard = self.state.read();
 | 
					
						
							|  |  |  |             Arc::clone(&guard)
 | 
					
						
							|  |  |  |         }; // drop global lock here
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-25 23:09:16 +08:00
										 |  |  |         let mut memtable_iters = Vec::with_capacity(snapshot.imm_memtables.len() + 1);
 | 
					
						
							|  |  |  |         memtable_iters.push(Box::new(snapshot.memtable.scan(
 | 
					
						
							|  |  |  |             Bound::Included(KeySlice::from_slice(key, key::TS_RANGE_BEGIN)),
 | 
					
						
							|  |  |  |             Bound::Included(KeySlice::from_slice(key, key::TS_RANGE_END)),
 | 
					
						
							|  |  |  |         )));
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |         for memtable in snapshot.imm_memtables.iter() {
 | 
					
						
							| 
									
										
										
										
											2024-01-25 23:09:16 +08:00
										 |  |  |             memtable_iters.push(Box::new(memtable.scan(
 | 
					
						
							|  |  |  |                 Bound::Included(KeySlice::from_slice(key, key::TS_RANGE_BEGIN)),
 | 
					
						
							|  |  |  |                 Bound::Included(KeySlice::from_slice(key, key::TS_RANGE_END)),
 | 
					
						
							|  |  |  |             )));
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |         }
 | 
					
						
							| 
									
										
										
										
											2024-01-25 23:09:16 +08:00
										 |  |  |         let memtable_iter = MergeIterator::create(memtable_iters);
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  |         let mut l0_iters = Vec::with_capacity(snapshot.l0_sstables.len());
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         let keep_table = |key: &[u8], table: &SsTable| {
 | 
					
						
							|  |  |  |             if key_within(
 | 
					
						
							|  |  |  |                 key,
 | 
					
						
							|  |  |  |                 table.first_key().as_key_slice(),
 | 
					
						
							|  |  |  |                 table.last_key().as_key_slice(),
 | 
					
						
							|  |  |  |             ) {
 | 
					
						
							|  |  |  |                 if let Some(bloom) = &table.bloom {
 | 
					
						
							|  |  |  |                     if bloom.may_contain(farmhash::fingerprint32(key)) {
 | 
					
						
							|  |  |  |                         return true;
 | 
					
						
							|  |  |  |                     }
 | 
					
						
							|  |  |  |                 } else {
 | 
					
						
							|  |  |  |                     return true;
 | 
					
						
							|  |  |  |                 }
 | 
					
						
							|  |  |  |             }
 | 
					
						
							|  |  |  |             false
 | 
					
						
							|  |  |  |         };
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         for table in snapshot.l0_sstables.iter() {
 | 
					
						
							|  |  |  |             let table = snapshot.sstables[table].clone();
 | 
					
						
							|  |  |  |             if keep_table(key, &table) {
 | 
					
						
							|  |  |  |                 l0_iters.push(Box::new(SsTableIterator::create_and_seek_to_key(
 | 
					
						
							|  |  |  |                     table,
 | 
					
						
							| 
									
										
										
										
											2024-01-25 23:09:16 +08:00
										 |  |  |                     KeySlice::from_slice(key, key::TS_RANGE_BEGIN),
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |                 )?));
 | 
					
						
							|  |  |  |             }
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  |         let l0_iter = MergeIterator::create(l0_iters);
 | 
					
						
							|  |  |  |         let mut level_iters = Vec::with_capacity(snapshot.levels.len());
 | 
					
						
							|  |  |  |         for (_, level_sst_ids) in &snapshot.levels {
 | 
					
						
							|  |  |  |             let mut level_ssts = Vec::with_capacity(snapshot.levels[0].1.len());
 | 
					
						
							|  |  |  |             for table in level_sst_ids {
 | 
					
						
							|  |  |  |                 let table = snapshot.sstables[table].clone();
 | 
					
						
							|  |  |  |                 if keep_table(key, &table) {
 | 
					
						
							|  |  |  |                     level_ssts.push(table);
 | 
					
						
							|  |  |  |                 }
 | 
					
						
							|  |  |  |             }
 | 
					
						
							|  |  |  |             let level_iter = SstConcatIterator::create_and_seek_to_key(
 | 
					
						
							|  |  |  |                 level_ssts,
 | 
					
						
							| 
									
										
										
										
											2024-01-25 23:09:16 +08:00
										 |  |  |                 KeySlice::from_slice(key, key::TS_RANGE_BEGIN),
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |             )?;
 | 
					
						
							|  |  |  |             level_iters.push(Box::new(level_iter));
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-25 23:09:16 +08:00
										 |  |  |         let iter = LsmIterator::new(
 | 
					
						
							|  |  |  |             TwoMergeIterator::create(
 | 
					
						
							|  |  |  |                 TwoMergeIterator::create(memtable_iter, l0_iter)?,
 | 
					
						
							|  |  |  |                 MergeIterator::create(level_iters),
 | 
					
						
							|  |  |  |             )?,
 | 
					
						
							|  |  |  |             Bound::Unbounded,
 | 
					
						
							| 
									
										
										
										
											2024-01-26 16:52:37 +08:00
										 |  |  |             read_ts,
 | 
					
						
							| 
									
										
										
										
											2024-01-25 23:09:16 +08:00
										 |  |  |         )?;
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-25 23:09:16 +08:00
										 |  |  |         if iter.is_valid() && iter.key() == key && !iter.value().is_empty() {
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |             return Ok(Some(Bytes::copy_from_slice(iter.value())));
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  |         Ok(None)
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-26 18:14:34 +08:00
										 |  |  |     pub fn write_batch<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<u64> {
 | 
					
						
							| 
									
										
										
										
											2024-01-26 16:52:37 +08:00
										 |  |  |         let _lck = self.mvcc().write_lock.lock();
 | 
					
						
							|  |  |  |         let ts = self.mvcc().latest_commit_ts() + 1;
 | 
					
						
							| 
									
										
										
										
											2024-01-25 21:53:47 +08:00
										 |  |  |         for record in batch {
 | 
					
						
							|  |  |  |             match record {
 | 
					
						
							|  |  |  |                 WriteBatchRecord::Del(key) => {
 | 
					
						
							|  |  |  |                     let key = key.as_ref();
 | 
					
						
							|  |  |  |                     assert!(!key.is_empty(), "key cannot be empty");
 | 
					
						
							|  |  |  |                     let size;
 | 
					
						
							|  |  |  |                     {
 | 
					
						
							|  |  |  |                         let guard = self.state.read();
 | 
					
						
							| 
									
										
										
										
											2024-01-25 23:09:16 +08:00
										 |  |  |                         guard.memtable.put(KeySlice::from_slice(key, ts), b"")?;
 | 
					
						
							| 
									
										
										
										
											2024-01-25 21:53:47 +08:00
										 |  |  |                         size = guard.memtable.approximate_size();
 | 
					
						
							|  |  |  |                     }
 | 
					
						
							|  |  |  |                     self.try_freeze(size)?;
 | 
					
						
							|  |  |  |                 }
 | 
					
						
							|  |  |  |                 WriteBatchRecord::Put(key, value) => {
 | 
					
						
							|  |  |  |                     let key = key.as_ref();
 | 
					
						
							|  |  |  |                     let value = value.as_ref();
 | 
					
						
							|  |  |  |                     assert!(!key.is_empty(), "key cannot be empty");
 | 
					
						
							|  |  |  |                     assert!(!value.is_empty(), "value cannot be empty");
 | 
					
						
							|  |  |  |                     let size;
 | 
					
						
							|  |  |  |                     {
 | 
					
						
							|  |  |  |                         let guard = self.state.read();
 | 
					
						
							| 
									
										
										
										
											2024-01-25 23:09:16 +08:00
										 |  |  |                         guard.memtable.put(KeySlice::from_slice(key, ts), value)?;
 | 
					
						
							| 
									
										
										
										
											2024-01-25 21:53:47 +08:00
										 |  |  |                         size = guard.memtable.approximate_size();
 | 
					
						
							|  |  |  |                     }
 | 
					
						
							|  |  |  |                     self.try_freeze(size)?;
 | 
					
						
							|  |  |  |                 }
 | 
					
						
							|  |  |  |             }
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |         }
 | 
					
						
							| 
									
										
										
										
											2024-01-26 16:52:37 +08:00
										 |  |  |         self.mvcc().update_commit_ts(ts);
 | 
					
						
							| 
									
										
										
										
											2024-01-26 18:14:34 +08:00
										 |  |  |         Ok(ts)
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-25 21:53:47 +08:00
										 |  |  |     /// Put a key-value pair into the storage by writing into the current memtable.
 | 
					
						
							| 
									
										
										
										
											2024-01-26 22:20:06 +08:00
										 |  |  |     pub fn put(self: &Arc<Self>, key: &[u8], value: &[u8]) -> Result<()> {
 | 
					
						
							|  |  |  |         if !self.options.serializable {
 | 
					
						
							|  |  |  |             self.write_batch(&[WriteBatchRecord::Put(key, value)])?;
 | 
					
						
							|  |  |  |         } else {
 | 
					
						
							|  |  |  |             let txn = self.mvcc().new_txn(self.clone(), self.options.serializable);
 | 
					
						
							|  |  |  |             txn.put(key, value);
 | 
					
						
							|  |  |  |             txn.commit()?;
 | 
					
						
							|  |  |  |         }
 | 
					
						
							| 
									
										
										
										
											2024-01-26 18:14:34 +08:00
										 |  |  |         Ok(())
 | 
					
						
							| 
									
										
										
										
											2024-01-25 21:53:47 +08:00
										 |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |     /// Remove a key from the storage by writing an empty value.
 | 
					
						
							| 
									
										
										
										
											2024-01-26 22:20:06 +08:00
										 |  |  |     pub fn delete(self: &Arc<Self>, key: &[u8]) -> Result<()> {
 | 
					
						
							|  |  |  |         if !self.options.serializable {
 | 
					
						
							|  |  |  |             self.write_batch(&[WriteBatchRecord::Del(key)])?;
 | 
					
						
							|  |  |  |         } else {
 | 
					
						
							|  |  |  |             let txn = self.mvcc().new_txn(self.clone(), self.options.serializable);
 | 
					
						
							|  |  |  |             txn.delete(key);
 | 
					
						
							|  |  |  |             txn.commit()?;
 | 
					
						
							|  |  |  |         }
 | 
					
						
							| 
									
										
										
										
											2024-01-26 18:14:34 +08:00
										 |  |  |         Ok(())
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     fn try_freeze(&self, estimated_size: usize) -> Result<()> {
 | 
					
						
							|  |  |  |         if estimated_size >= self.options.target_sst_size {
 | 
					
						
							|  |  |  |             let state_lock = self.state_lock.lock();
 | 
					
						
							|  |  |  |             let guard = self.state.read();
 | 
					
						
							|  |  |  |             // the memtable could have already been frozen, check again to ensure we really need to freeze
 | 
					
						
							|  |  |  |             if guard.memtable.approximate_size() >= self.options.target_sst_size {
 | 
					
						
							|  |  |  |                 drop(guard);
 | 
					
						
							|  |  |  |                 self.force_freeze_memtable(&state_lock)?;
 | 
					
						
							|  |  |  |             }
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  |         Ok(())
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub(crate) fn path_of_sst_static(path: impl AsRef<Path>, id: usize) -> PathBuf {
 | 
					
						
							|  |  |  |         path.as_ref().join(format!("{:05}.sst", id))
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub(crate) fn path_of_sst(&self, id: usize) -> PathBuf {
 | 
					
						
							|  |  |  |         Self::path_of_sst_static(&self.path, id)
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub(crate) fn path_of_wal_static(path: impl AsRef<Path>, id: usize) -> PathBuf {
 | 
					
						
							|  |  |  |         path.as_ref().join(format!("{:05}.wal", id))
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub(crate) fn path_of_wal(&self, id: usize) -> PathBuf {
 | 
					
						
							|  |  |  |         Self::path_of_wal_static(&self.path, id)
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub(super) fn sync_dir(&self) -> Result<()> {
 | 
					
						
							|  |  |  |         File::open(&self.path)?.sync_all()?;
 | 
					
						
							|  |  |  |         Ok(())
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     fn freeze_memtable_with_memtable(&self, memtable: Arc<MemTable>) -> Result<()> {
 | 
					
						
							|  |  |  |         let mut guard = self.state.write();
 | 
					
						
							|  |  |  |         // Swap the current memtable with a new one.
 | 
					
						
							|  |  |  |         let mut snapshot = guard.as_ref().clone();
 | 
					
						
							|  |  |  |         let old_memtable = std::mem::replace(&mut snapshot.memtable, memtable);
 | 
					
						
							|  |  |  |         // Add the memtable to the immutable memtables.
 | 
					
						
							|  |  |  |         snapshot.imm_memtables.insert(0, old_memtable.clone());
 | 
					
						
							|  |  |  |         // Update the snapshot.
 | 
					
						
							|  |  |  |         *guard = Arc::new(snapshot);
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         drop(guard);
 | 
					
						
							|  |  |  |         old_memtable.sync_wal()?;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Ok(())
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     /// Force freeze the current memtable to an immutable memtable
 | 
					
						
							|  |  |  |     pub fn force_freeze_memtable(&self, state_lock_observer: &MutexGuard<'_, ()>) -> Result<()> {
 | 
					
						
							|  |  |  |         let memtable_id = self.next_sst_id();
 | 
					
						
							|  |  |  |         let memtable = if self.options.enable_wal {
 | 
					
						
							|  |  |  |             Arc::new(MemTable::create_with_wal(
 | 
					
						
							|  |  |  |                 memtable_id,
 | 
					
						
							|  |  |  |                 self.path_of_wal(memtable_id),
 | 
					
						
							|  |  |  |             )?)
 | 
					
						
							|  |  |  |         } else {
 | 
					
						
							|  |  |  |             Arc::new(MemTable::create(memtable_id))
 | 
					
						
							|  |  |  |         };
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.freeze_memtable_with_memtable(memtable)?;
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-26 16:52:37 +08:00
										 |  |  |         self.manifest().add_record(
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |             state_lock_observer,
 | 
					
						
							|  |  |  |             ManifestRecord::NewMemtable(memtable_id),
 | 
					
						
							|  |  |  |         )?;
 | 
					
						
							|  |  |  |         self.sync_dir()?;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Ok(())
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     /// Force flush the earliest-created immutable memtable to disk
 | 
					
						
							|  |  |  |     pub fn force_flush_next_imm_memtable(&self) -> Result<()> {
 | 
					
						
							|  |  |  |         let state_lock = self.state_lock.lock();
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         let flush_memtable;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         {
 | 
					
						
							|  |  |  |             let guard = self.state.read();
 | 
					
						
							|  |  |  |             flush_memtable = guard
 | 
					
						
							|  |  |  |                 .imm_memtables
 | 
					
						
							|  |  |  |                 .last()
 | 
					
						
							|  |  |  |                 .expect("no imm memtables!")
 | 
					
						
							|  |  |  |                 .clone();
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         let mut builder = SsTableBuilder::new(self.options.block_size);
 | 
					
						
							|  |  |  |         flush_memtable.flush(&mut builder)?;
 | 
					
						
							|  |  |  |         let sst_id = flush_memtable.id();
 | 
					
						
							|  |  |  |         let sst = Arc::new(builder.build(
 | 
					
						
							|  |  |  |             sst_id,
 | 
					
						
							|  |  |  |             Some(self.block_cache.clone()),
 | 
					
						
							|  |  |  |             self.path_of_sst(sst_id),
 | 
					
						
							|  |  |  |         )?);
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         // Add the flushed L0 table to the list.
 | 
					
						
							|  |  |  |         {
 | 
					
						
							|  |  |  |             let mut guard = self.state.write();
 | 
					
						
							|  |  |  |             let mut snapshot = guard.as_ref().clone();
 | 
					
						
							|  |  |  |             // Remove the memtable from the immutable memtables.
 | 
					
						
							|  |  |  |             let mem = snapshot.imm_memtables.pop().unwrap();
 | 
					
						
							|  |  |  |             assert_eq!(mem.id(), sst_id);
 | 
					
						
							|  |  |  |             // Add L0 table
 | 
					
						
							|  |  |  |             if self.compaction_controller.flush_to_l0() {
 | 
					
						
							|  |  |  |                 // In leveled compaction or no compaction, simply flush to L0
 | 
					
						
							|  |  |  |                 snapshot.l0_sstables.insert(0, sst_id);
 | 
					
						
							|  |  |  |             } else {
 | 
					
						
							|  |  |  |                 // In tiered compaction, create a new tier
 | 
					
						
							|  |  |  |                 snapshot.levels.insert(0, (sst_id, vec![sst_id]));
 | 
					
						
							|  |  |  |             }
 | 
					
						
							|  |  |  |             println!("flushed {}.sst with size={}", sst_id, sst.table_size());
 | 
					
						
							|  |  |  |             snapshot.sstables.insert(sst_id, sst);
 | 
					
						
							|  |  |  |             // Update the snapshot.
 | 
					
						
							|  |  |  |             *guard = Arc::new(snapshot);
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if self.options.enable_wal {
 | 
					
						
							|  |  |  |             std::fs::remove_file(self.path_of_wal(sst_id))?;
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-26 16:52:37 +08:00
										 |  |  |         self.manifest()
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |             .add_record(&state_lock, ManifestRecord::Flush(sst_id))?;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.sync_dir()?;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Ok(())
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     /// Create an iterator over a range of keys.
 | 
					
						
							| 
									
										
										
										
											2024-01-26 16:52:37 +08:00
										 |  |  |     pub fn scan<'a>(
 | 
					
						
							|  |  |  |         self: &'a Arc<Self>,
 | 
					
						
							|  |  |  |         lower: Bound<&[u8]>,
 | 
					
						
							|  |  |  |         upper: Bound<&[u8]>,
 | 
					
						
							|  |  |  |     ) -> Result<TxnIterator> {
 | 
					
						
							| 
									
										
										
										
											2024-01-26 18:14:34 +08:00
										 |  |  |         let txn = self.mvcc().new_txn(self.clone(), self.options.serializable);
 | 
					
						
							| 
									
										
										
										
											2024-01-26 16:52:37 +08:00
										 |  |  |         txn.scan(lower, upper)
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pub(crate) fn scan_with_ts(
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |         &self,
 | 
					
						
							|  |  |  |         lower: Bound<&[u8]>,
 | 
					
						
							|  |  |  |         upper: Bound<&[u8]>,
 | 
					
						
							| 
									
										
										
										
											2024-01-26 16:52:37 +08:00
										 |  |  |         read_ts: u64,
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |     ) -> Result<FusedIterator<LsmIterator>> {
 | 
					
						
							|  |  |  |         let snapshot = {
 | 
					
						
							|  |  |  |             let guard = self.state.read();
 | 
					
						
							|  |  |  |             Arc::clone(&guard)
 | 
					
						
							|  |  |  |         }; // drop global lock here
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         let mut memtable_iters = Vec::with_capacity(snapshot.imm_memtables.len() + 1);
 | 
					
						
							| 
									
										
										
										
											2024-01-25 23:09:16 +08:00
										 |  |  |         memtable_iters.push(Box::new(snapshot.memtable.scan(
 | 
					
						
							|  |  |  |             map_key_bound_plus_ts(lower, key::TS_RANGE_BEGIN),
 | 
					
						
							|  |  |  |             map_key_bound_plus_ts(upper, key::TS_RANGE_END),
 | 
					
						
							|  |  |  |         )));
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |         for memtable in snapshot.imm_memtables.iter() {
 | 
					
						
							| 
									
										
										
										
											2024-01-25 23:09:16 +08:00
										 |  |  |             memtable_iters.push(Box::new(memtable.scan(
 | 
					
						
							|  |  |  |                 map_key_bound_plus_ts(lower, key::TS_RANGE_BEGIN),
 | 
					
						
							|  |  |  |                 map_key_bound_plus_ts(upper, key::TS_RANGE_END),
 | 
					
						
							|  |  |  |             )));
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |         }
 | 
					
						
							|  |  |  |         let memtable_iter = MergeIterator::create(memtable_iters);
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         let mut table_iters = Vec::with_capacity(snapshot.l0_sstables.len());
 | 
					
						
							|  |  |  |         for table_id in snapshot.l0_sstables.iter() {
 | 
					
						
							|  |  |  |             let table = snapshot.sstables[table_id].clone();
 | 
					
						
							|  |  |  |             if range_overlap(
 | 
					
						
							|  |  |  |                 lower,
 | 
					
						
							|  |  |  |                 upper,
 | 
					
						
							|  |  |  |                 table.first_key().as_key_slice(),
 | 
					
						
							|  |  |  |                 table.last_key().as_key_slice(),
 | 
					
						
							|  |  |  |             ) {
 | 
					
						
							|  |  |  |                 let iter = match lower {
 | 
					
						
							|  |  |  |                     Bound::Included(key) => SsTableIterator::create_and_seek_to_key(
 | 
					
						
							|  |  |  |                         table,
 | 
					
						
							| 
									
										
										
										
											2024-01-25 23:09:16 +08:00
										 |  |  |                         KeySlice::from_slice(key, key::TS_RANGE_BEGIN),
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |                     )?,
 | 
					
						
							|  |  |  |                     Bound::Excluded(key) => {
 | 
					
						
							|  |  |  |                         let mut iter = SsTableIterator::create_and_seek_to_key(
 | 
					
						
							|  |  |  |                             table,
 | 
					
						
							| 
									
										
										
										
											2024-01-25 23:09:16 +08:00
										 |  |  |                             KeySlice::from_slice(key, key::TS_RANGE_BEGIN),
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |                         )?;
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:27:16 +08:00
										 |  |  |                         if iter.is_valid() && iter.key().key_ref() == key {
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |                             iter.next()?;
 | 
					
						
							|  |  |  |                         }
 | 
					
						
							|  |  |  |                         iter
 | 
					
						
							|  |  |  |                     }
 | 
					
						
							|  |  |  |                     Bound::Unbounded => SsTableIterator::create_and_seek_to_first(table)?,
 | 
					
						
							|  |  |  |                 };
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 table_iters.push(Box::new(iter));
 | 
					
						
							|  |  |  |             }
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         let l0_iter = MergeIterator::create(table_iters);
 | 
					
						
							|  |  |  |         let mut level_iters = Vec::with_capacity(snapshot.levels.len());
 | 
					
						
							|  |  |  |         for (_, level_sst_ids) in &snapshot.levels {
 | 
					
						
							|  |  |  |             let mut level_ssts = Vec::with_capacity(level_sst_ids.len());
 | 
					
						
							|  |  |  |             for table in level_sst_ids {
 | 
					
						
							|  |  |  |                 let table = snapshot.sstables[table].clone();
 | 
					
						
							|  |  |  |                 if range_overlap(
 | 
					
						
							|  |  |  |                     lower,
 | 
					
						
							|  |  |  |                     upper,
 | 
					
						
							|  |  |  |                     table.first_key().as_key_slice(),
 | 
					
						
							|  |  |  |                     table.last_key().as_key_slice(),
 | 
					
						
							|  |  |  |                 ) {
 | 
					
						
							|  |  |  |                     level_ssts.push(table);
 | 
					
						
							|  |  |  |                 }
 | 
					
						
							|  |  |  |             }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             let level_iter = match lower {
 | 
					
						
							|  |  |  |                 Bound::Included(key) => SstConcatIterator::create_and_seek_to_key(
 | 
					
						
							|  |  |  |                     level_ssts,
 | 
					
						
							| 
									
										
										
										
											2024-01-25 23:09:16 +08:00
										 |  |  |                     KeySlice::from_slice(key, key::TS_RANGE_BEGIN),
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |                 )?,
 | 
					
						
							|  |  |  |                 Bound::Excluded(key) => {
 | 
					
						
							|  |  |  |                     let mut iter = SstConcatIterator::create_and_seek_to_key(
 | 
					
						
							|  |  |  |                         level_ssts,
 | 
					
						
							| 
									
										
										
										
											2024-01-25 23:09:16 +08:00
										 |  |  |                         KeySlice::from_slice(key, key::TS_RANGE_BEGIN),
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |                     )?;
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:27:16 +08:00
										 |  |  |                     if iter.is_valid() && iter.key().key_ref() == key {
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |                         iter.next()?;
 | 
					
						
							|  |  |  |                     }
 | 
					
						
							|  |  |  |                     iter
 | 
					
						
							|  |  |  |                 }
 | 
					
						
							|  |  |  |                 Bound::Unbounded => SstConcatIterator::create_and_seek_to_first(level_ssts)?,
 | 
					
						
							|  |  |  |             };
 | 
					
						
							|  |  |  |             level_iters.push(Box::new(level_iter));
 | 
					
						
							|  |  |  |         }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         let iter = TwoMergeIterator::create(memtable_iter, l0_iter)?;
 | 
					
						
							|  |  |  |         let iter = TwoMergeIterator::create(iter, MergeIterator::create(level_iters))?;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Ok(FusedIterator::new(LsmIterator::new(
 | 
					
						
							|  |  |  |             iter,
 | 
					
						
							|  |  |  |             map_bound(upper),
 | 
					
						
							| 
									
										
										
										
											2024-01-26 16:52:37 +08:00
										 |  |  |             read_ts,
 | 
					
						
							| 
									
										
										
										
											2024-01-25 12:07:53 +08:00
										 |  |  |         )?))
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  | }
 |