use std::collections::HashMap;
use std::ops::Bound;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

use anyhow::Result;
use bytes::Bytes;
use parking_lot::{Mutex, RwLock};

use crate::block::Block;
use crate::iterators::merge_iterator::MergeIterator;
use crate::iterators::two_merge_iterator::TwoMergeIterator;
use crate::iterators::StorageIterator;
use crate::lsm_iterator::{FusedIterator, LsmIterator};
use crate::mem_table::{map_bound, MemTable};
use crate::table::{SsTable, SsTableBuilder, SsTableIterator};

pub type BlockCache = moka::sync::Cache<(usize, usize), Arc<Block>>;

#[derive(Clone)]
pub struct LsmStorageInner {
    /// The current memtable.
    pub memtable: Arc<MemTable>,
    /// Immutable memtables, from earliest to latest.
    pub imm_memtables: Vec<Arc<MemTable>>,
    /// L0 SsTables, from earliest to latest.
    pub l0_sstables: Vec<usize>,
    /// SsTables sorted by key range; L1 - L6 for leveled compaction, or tiers for tiered
    /// compaction.
    #[allow(dead_code)]
    pub levels: Vec<(usize, Vec<usize>)>,
    /// SsTable objects.
    pub sstables: HashMap<usize, Arc<SsTable>>,
}

impl LsmStorageInner {
    fn create() -> Self {
        Self {
            memtable: Arc::new(MemTable::create()),
            imm_memtables: vec![],
            l0_sstables: vec![],
            levels: vec![],
            sstables: Default::default(),
        }
    }
}

/// The storage interface of the LSM tree.
pub struct LsmStorage {
    pub(crate) inner: Arc<RwLock<Arc<LsmStorageInner>>>,
    flush_lock: Mutex<()>,
    path: PathBuf,
    pub(crate) block_cache: Arc<BlockCache>,
    next_sst_id: AtomicUsize,
}

impl LsmStorage {
    pub(crate) fn next_sst_id(&self) -> usize {
        self.next_sst_id
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
    }

    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
        Ok(Self {
            inner: Arc::new(RwLock::new(Arc::new(LsmStorageInner::create()))),
            flush_lock: Mutex::new(()),
            path: path.as_ref().to_path_buf(),
            block_cache: Arc::new(BlockCache::new(1 << 20)), // 4GB block cache (up to 1 << 20 blocks of 4KB each)
            next_sst_id: AtomicUsize::new(1),
        })
    }

    /// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter.
    pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
        let snapshot = {
            let guard = self.inner.read();
            Arc::clone(&guard)
        }; // drop global lock here

        // Search on the current memtable.
        if let Some(value) = snapshot.memtable.get(key) {
            if value.is_empty() {
                // found tombstone, return key not exists
                return Ok(None);
            }
            return Ok(Some(value));
        }
        // Search on immutable memtables.
        for memtable in snapshot.imm_memtables.iter().rev() {
            if let Some(value) = memtable.get(key) {
                if value.is_empty() {
                    // found tombstone, return key not exists
                    return Ok(None);
                }
                return Ok(Some(value));
            }
        }
        let mut iters = Vec::with_capacity(snapshot.l0_sstables.len());
        for table in snapshot.l0_sstables.iter().rev() {
            iters.push(Box::new(SsTableIterator::create_and_seek_to_key(
                snapshot.sstables[table].clone(),
                key,
            )?));
        }
        let iter = MergeIterator::create(iters);
        // The merge iterator is positioned at the first entry >= `key`; only return a
        // value on an exact key match that is not a tombstone.
        if iter.is_valid() && iter.key() == key && !iter.value().is_empty() {
            return Ok(Some(Bytes::copy_from_slice(iter.value())));
        }
        Ok(None)
    }

    /// Put a key-value pair into the storage by writing into the current memtable.
    pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
        assert!(!value.is_empty(), "value cannot be empty");
        assert!(!key.is_empty(), "key cannot be empty");
        let guard = self.inner.read();
        guard.memtable.put(key, value);
        Ok(())
    }

    /// Remove a key from the storage by writing an empty value.
    pub fn delete(&self, key: &[u8]) -> Result<()> {
        assert!(!key.is_empty(), "key cannot be empty");
        let guard = self.inner.read();
        guard.memtable.put(key, b"");
        Ok(())
    }

    pub(crate) fn path_of_sst(&self, id: usize) -> PathBuf {
        self.path.join(format!("{:05}.sst", id))
    }
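    // Note on the flush protocol used by `sync` below: the memtable is frozen under
    // the write lock (so writers atomically switch to a fresh memtable), the SST is
    // built outside any lock (the frozen memtable is immutable), and a final short
    // critical section publishes the new L0 table while dropping the frozen
    // memtable. `flush_lock` serializes concurrent `sync` calls.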
    /// Persist data to disk.
    pub fn sync(&self) -> Result<()> {
        let _flush_lock = self.flush_lock.lock();

        let flush_memtable;
        let sst_id;

        // Move mutable memtable to immutable memtables.
        {
            let mut guard = self.inner.write();
            // Swap the current memtable with a new one.
            let mut snapshot = guard.as_ref().clone();
            let memtable = std::mem::replace(&mut snapshot.memtable, Arc::new(MemTable::create()));
            flush_memtable = memtable.clone();
            sst_id = self.next_sst_id();
            // Add the memtable to the immutable memtables.
            snapshot.imm_memtables.push(memtable);
            // Update the snapshot.
            *guard = Arc::new(snapshot);
        }

        // At this point, the old memtable should be disabled for write, and all write threads
        // should be operating on the new memtable. We can safely flush the old memtable to
        // disk.
        let mut builder = SsTableBuilder::new(4096);
        flush_memtable.flush(&mut builder)?;
        let sst = Arc::new(builder.build(
            sst_id,
            Some(self.block_cache.clone()),
            self.path_of_sst(sst_id),
        )?);

        // Add the flushed L0 table to the list.
        {
            let mut guard = self.inner.write();
            let mut snapshot = guard.as_ref().clone();
            // Remove the memtable from the immutable memtables.
            snapshot.imm_memtables.pop();
            // Add L0 table.
            snapshot.l0_sstables.push(sst_id);
            snapshot.sstables.insert(sst_id, sst);
            // Update the snapshot.
            *guard = Arc::new(snapshot);
        }

        Ok(())
    }

    /// Create an iterator over a range of keys.
    pub fn scan(
        &self,
        lower: Bound<&[u8]>,
        upper: Bound<&[u8]>,
    ) -> Result<FusedIterator<LsmIterator>> {
        let snapshot = {
            let guard = self.inner.read();
            Arc::clone(&guard)
        }; // drop global lock here

        let mut memtable_iters = Vec::with_capacity(snapshot.imm_memtables.len() + 1);
        memtable_iters.push(Box::new(snapshot.memtable.scan(lower, upper)));
        for memtable in snapshot.imm_memtables.iter().rev() {
            memtable_iters.push(Box::new(memtable.scan(lower, upper)));
        }
        let memtable_iter = MergeIterator::create(memtable_iters);

        let mut table_iters = Vec::with_capacity(snapshot.l0_sstables.len());
        for table_id in snapshot.l0_sstables.iter().rev() {
            let table = snapshot.sstables[table_id].clone();
            let iter = match lower {
                Bound::Included(key) => SsTableIterator::create_and_seek_to_key(table, key)?,
                Bound::Excluded(key) => {
                    let mut iter = SsTableIterator::create_and_seek_to_key(table, key)?;
                    if iter.is_valid() && iter.key() == key {
                        iter.next()?;
                    }
                    iter
                }
                Bound::Unbounded => SsTableIterator::create_and_seek_to_first(table)?,
            };
            table_iters.push(Box::new(iter));
        }
        let table_iter = MergeIterator::create(table_iters);

        let iter = TwoMergeIterator::create(memtable_iter, table_iter)?;

        Ok(FusedIterator::new(LsmIterator::new(
            iter,
            map_bound(upper),
        )?))
    }
}
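// A minimal end-to-end sketch (not part of the original file): it exercises the
// write path, tombstone handling, flush, and range scans through the public API
// above. Assumes the `tempfile` crate is available as a dev-dependency and that
// `LsmIterator` skips tombstones, as the rest of this file expects.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn put_get_sync_scan() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let storage = LsmStorage::open(dir.path())?;

        // Writes go to the current memtable; deletes are empty-value tombstones.
        storage.put(b"a", b"1")?;
        storage.put(b"b", b"2")?;
        storage.delete(b"a")?;
        assert_eq!(storage.get(b"a")?, None);
        assert_eq!(storage.get(b"b")?, Some(Bytes::from_static(b"2")));

        // Flush the memtable to an L0 SsTable; reads should see the same data.
        storage.sync()?;
        assert_eq!(storage.get(b"b")?, Some(Bytes::from_static(b"2")));

        // A full scan merges memtables and L0 tables and skips the tombstone.
        let mut iter = storage.scan(Bound::Unbounded, Bound::Unbounded)?;
        assert!(iter.is_valid());
        assert_eq!(iter.key(), b"b");
        iter.next()?;
        assert!(!iter.is_valid());
        Ok(())
    }
}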