// Copyright (c) 2022-2025 Alex Chi Z // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. use std::collections::{BTreeSet, HashMap}; use std::fs::File; use std::ops::Bound; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::sync::atomic::AtomicUsize; use anyhow::{Context, Result}; use bytes::Bytes; use parking_lot::{Mutex, MutexGuard, RwLock}; use crate::block::Block; use crate::compact::{ CompactionController, CompactionOptions, LeveledCompactionController, LeveledCompactionOptions, SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, TieredCompactionController, }; use crate::iterators::StorageIterator; use crate::iterators::concat_iterator::SstConcatIterator; use crate::iterators::merge_iterator::MergeIterator; use crate::iterators::two_merge_iterator::TwoMergeIterator; use crate::key::{self, KeySlice}; use crate::lsm_iterator::{FusedIterator, LsmIterator}; use crate::manifest::{Manifest, ManifestRecord}; use crate::mem_table::{MemTable, map_bound, map_key_bound_plus_ts}; use crate::mvcc::LsmMvccInner; use crate::mvcc::txn::{Transaction, TxnIterator}; use crate::table::{FileObject, SsTable, SsTableBuilder, SsTableIterator}; pub type BlockCache = moka::sync::Cache<(usize, usize), Arc>; /// Represents the state of the storage engine. #[derive(Clone)] pub struct LsmStorageState { /// The current memtable. pub memtable: Arc, /// Immutable memtables, from latest to earliest. pub imm_memtables: Vec>, /// L0 SSTs, from latest to earliest. pub l0_sstables: Vec, /// SsTables sorted by key range; L1 - L_max for leveled compaction, or tiers for tiered /// compaction. pub levels: Vec<(usize, Vec)>, /// SST objects. pub sstables: HashMap>, } pub enum WriteBatchRecord> { Put(T, T), Del(T), } impl LsmStorageState { fn create(options: &LsmStorageOptions) -> Self { let levels = match &options.compaction_options { CompactionOptions::Leveled(LeveledCompactionOptions { max_levels, .. }) | CompactionOptions::Simple(SimpleLeveledCompactionOptions { max_levels, .. }) => (1 ..=*max_levels) .map(|level| (level, Vec::new())) .collect::>(), CompactionOptions::Tiered(_) => Vec::new(), CompactionOptions::NoCompaction => vec![(1, Vec::new())], }; Self { memtable: Arc::new(MemTable::create(0)), imm_memtables: Vec::new(), l0_sstables: Vec::new(), levels, sstables: Default::default(), } } } #[derive(Debug, Clone)] pub struct LsmStorageOptions { // Block size in bytes pub block_size: usize, // SST size in bytes, also the approximate memtable capacity limit pub target_sst_size: usize, // Maximum number of memtables in memory, flush to L0 when exceeding this limit pub num_memtable_limit: usize, pub compaction_options: CompactionOptions, pub enable_wal: bool, pub serializable: bool, } impl LsmStorageOptions { pub fn default_for_week1_test() -> Self { Self { block_size: 4096, target_sst_size: 2 << 20, compaction_options: CompactionOptions::NoCompaction, enable_wal: false, num_memtable_limit: 50, serializable: false, } } pub fn default_for_week1_day6_test() -> Self { Self { block_size: 4096, target_sst_size: 2 << 20, compaction_options: CompactionOptions::NoCompaction, enable_wal: false, num_memtable_limit: 2, serializable: false, } } pub fn default_for_week2_test(compaction_options: CompactionOptions) -> Self { Self { block_size: 4096, target_sst_size: 1 << 20, // 1MB compaction_options, enable_wal: false, num_memtable_limit: 2, serializable: false, } } } fn range_overlap( user_begin: Bound<&[u8]>, user_end: Bound<&[u8]>, table_begin: KeySlice, table_end: KeySlice, ) -> bool { match user_end { Bound::Excluded(key) if key <= table_begin.key_ref() => { return false; } Bound::Included(key) if key < table_begin.key_ref() => { return false; } _ => {} } match user_begin { Bound::Excluded(key) if key >= table_end.key_ref() => { return false; } Bound::Included(key) if key > table_end.key_ref() => { return false; } _ => {} } true } fn key_within(user_key: &[u8], table_begin: KeySlice, table_end: KeySlice) -> bool { table_begin.key_ref() <= user_key && user_key <= table_end.key_ref() } #[derive(Clone, Debug)] pub enum CompactionFilter { Prefix(Bytes), } /// The storage interface of the LSM tree. pub(crate) struct LsmStorageInner { pub(crate) state: Arc>>, pub(crate) state_lock: Mutex<()>, path: PathBuf, pub(crate) block_cache: Arc, next_sst_id: AtomicUsize, pub(crate) options: Arc, pub(crate) compaction_controller: CompactionController, pub(crate) manifest: Option, pub(crate) mvcc: Option, pub(crate) compaction_filters: Arc>>, } /// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM. pub struct MiniLsm { pub(crate) inner: Arc, /// Notifies the L0 flush thread to stop working. (In week 1 day 6) flush_notifier: crossbeam_channel::Sender<()>, /// The handle for the flush thread. (In week 1 day 6) flush_thread: Mutex>>, /// Notifies the compaction thread to stop working. (In week 2) compaction_notifier: crossbeam_channel::Sender<()>, /// The handle for the compaction thread. (In week 2) compaction_thread: Mutex>>, } impl Drop for MiniLsm { fn drop(&mut self) { self.compaction_notifier.send(()).ok(); self.flush_notifier.send(()).ok(); } } impl MiniLsm { pub fn close(&self) -> Result<()> { self.inner.sync_dir()?; self.compaction_notifier.send(()).ok(); self.flush_notifier.send(()).ok(); let mut compaction_thread = self.compaction_thread.lock(); if let Some(compaction_thread) = compaction_thread.take() { compaction_thread .join() .map_err(|e| anyhow::anyhow!("{:?}", e))?; } let mut flush_thread = self.flush_thread.lock(); if let Some(flush_thread) = flush_thread.take() { flush_thread .join() .map_err(|e| anyhow::anyhow!("{:?}", e))?; } if self.inner.options.enable_wal { self.inner.sync()?; self.inner.sync_dir()?; return Ok(()); } // create memtable and skip updating manifest if !self.inner.state.read().memtable.is_empty() { self.inner .freeze_memtable_with_memtable(Arc::new(MemTable::create( self.inner.next_sst_id(), )))?; } while { let snapshot = self.inner.state.read(); !snapshot.imm_memtables.is_empty() } { self.inner.force_flush_next_imm_memtable()?; } self.inner.sync_dir()?; Ok(()) } /// Start the storage engine by either loading an existing directory or creating a new one if the directory does /// not exist. pub fn open(path: impl AsRef, options: LsmStorageOptions) -> Result> { let inner = Arc::new(LsmStorageInner::open(path, options)?); let (tx1, rx) = crossbeam_channel::unbounded(); let compaction_thread = inner.spawn_compaction_thread(rx)?; let (tx2, rx) = crossbeam_channel::unbounded(); let flush_thread = inner.spawn_flush_thread(rx)?; Ok(Arc::new(Self { inner, flush_notifier: tx2, flush_thread: Mutex::new(flush_thread), compaction_notifier: tx1, compaction_thread: Mutex::new(compaction_thread), })) } pub fn add_compaction_filter(&self, compaction_filter: CompactionFilter) { self.inner.add_compaction_filter(compaction_filter) } pub fn get(&self, key: &[u8]) -> Result> { self.inner.get(key) } pub fn write_batch>(&self, batch: &[WriteBatchRecord]) -> Result<()> { self.inner.write_batch(batch) } pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { self.inner.put(key, value) } pub fn delete(&self, key: &[u8]) -> Result<()> { self.inner.delete(key) } pub fn sync(&self) -> Result<()> { self.inner.sync() } pub fn new_txn(&self) -> Result> { self.inner.new_txn() } pub fn scan(&self, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> Result { self.inner.scan(lower, upper) } /// Only call this in test cases due to race conditions pub fn force_flush(&self) -> Result<()> { if !self.inner.state.read().memtable.is_empty() { self.inner .force_freeze_memtable(&self.inner.state_lock.lock())?; } if !self.inner.state.read().imm_memtables.is_empty() { self.inner.force_flush_next_imm_memtable()?; } Ok(()) } pub fn force_full_compaction(&self) -> Result<()> { self.inner.force_full_compaction() } } impl LsmStorageInner { pub(crate) fn next_sst_id(&self) -> usize { self.next_sst_id .fetch_add(1, std::sync::atomic::Ordering::SeqCst) } pub(crate) fn mvcc(&self) -> &LsmMvccInner { self.mvcc.as_ref().unwrap() } pub(crate) fn manifest(&self) -> &Manifest { self.manifest.as_ref().unwrap() } /// Start the storage engine by either loading an existing directory or creating a new one if the directory does /// not exist. pub(crate) fn open(path: impl AsRef, options: LsmStorageOptions) -> Result { let mut state = LsmStorageState::create(&options); let path = path.as_ref(); let mut next_sst_id = 1; let block_cache = Arc::new(BlockCache::new(1 << 20)); // 4GB block cache, let manifest; let compaction_controller = match &options.compaction_options { CompactionOptions::Leveled(options) => { CompactionController::Leveled(LeveledCompactionController::new(options.clone())) } CompactionOptions::Tiered(options) => { CompactionController::Tiered(TieredCompactionController::new(options.clone())) } CompactionOptions::Simple(options) => CompactionController::Simple( SimpleLeveledCompactionController::new(options.clone()), ), CompactionOptions::NoCompaction => CompactionController::NoCompaction, }; if !path.exists() { std::fs::create_dir_all(path).context("failed to create DB dir")?; } let manifest_path = path.join("MANIFEST"); let mut last_commit_ts = 0; if !manifest_path.exists() { if options.enable_wal { state.memtable = Arc::new(MemTable::create_with_wal( state.memtable.id(), Self::path_of_wal_static(path, state.memtable.id()), )?); } manifest = Manifest::create(&manifest_path).context("failed to create manifest")?; manifest.add_record_when_init(ManifestRecord::NewMemtable(state.memtable.id()))?; } else { let (m, records) = Manifest::recover(&manifest_path)?; let mut memtables = BTreeSet::new(); for record in records { match record { ManifestRecord::Flush(sst_id) => { let res = memtables.remove(&sst_id); assert!(res, "memtable not exist?"); if compaction_controller.flush_to_l0() { state.l0_sstables.insert(0, sst_id); } else { state.levels.insert(0, (sst_id, vec![sst_id])); } next_sst_id = next_sst_id.max(sst_id); } ManifestRecord::NewMemtable(x) => { next_sst_id = next_sst_id.max(x); memtables.insert(x); } ManifestRecord::Compaction(task, output) => { let (new_state, _) = compaction_controller .apply_compaction_result(&state, &task, &output, true); // TODO: apply remove again state = new_state; next_sst_id = next_sst_id.max(output.iter().max().copied().unwrap_or_default()); } } } let mut sst_cnt = 0; // recover SSTs for table_id in state .l0_sstables .iter() .chain(state.levels.iter().flat_map(|(_, files)| files)) { let table_id = *table_id; let sst = SsTable::open( table_id, Some(block_cache.clone()), FileObject::open(&Self::path_of_sst_static(path, table_id)) .context("failed to open SST")?, )?; last_commit_ts = last_commit_ts.max(sst.max_ts()); state.sstables.insert(table_id, Arc::new(sst)); sst_cnt += 1; } println!("{} SSTs opened", sst_cnt); next_sst_id += 1; // Sort SSTs on each level (only for leveled compaction) if let CompactionController::Leveled(_) = &compaction_controller { for (_id, ssts) in &mut state.levels { ssts.sort_by(|x, y| { state .sstables .get(x) .unwrap() .first_key() .cmp(state.sstables.get(y).unwrap().first_key()) }) } } // recover memtables if options.enable_wal { let mut wal_cnt = 0; for id in memtables.iter() { let memtable = MemTable::recover_from_wal(*id, Self::path_of_wal_static(path, *id))?; let max_ts = memtable .map .iter() .map(|x| x.key().ts()) .max() .unwrap_or_default(); last_commit_ts = last_commit_ts.max(max_ts); if !memtable.is_empty() { state.imm_memtables.insert(0, Arc::new(memtable)); wal_cnt += 1; } } println!("{} WALs recovered", wal_cnt); state.memtable = Arc::new(MemTable::create_with_wal( next_sst_id, Self::path_of_wal_static(path, next_sst_id), )?); } else { state.memtable = Arc::new(MemTable::create(next_sst_id)); } m.add_record_when_init(ManifestRecord::NewMemtable(state.memtable.id()))?; next_sst_id += 1; manifest = m; }; let storage = Self { state: Arc::new(RwLock::new(Arc::new(state))), state_lock: Mutex::new(()), path: path.to_path_buf(), block_cache, next_sst_id: AtomicUsize::new(next_sst_id), compaction_controller, manifest: Some(manifest), options: options.into(), mvcc: Some(LsmMvccInner::new(last_commit_ts)), compaction_filters: Arc::new(Mutex::new(Vec::new())), }; storage.sync_dir()?; Ok(storage) } pub fn add_compaction_filter(&self, compaction_filter: CompactionFilter) { let mut compaction_filters = self.compaction_filters.lock(); compaction_filters.push(compaction_filter); } pub fn sync(&self) -> Result<()> { self.state.read().memtable.sync_wal() } /// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter. pub fn get(self: &Arc, key: &[u8]) -> Result> { let txn = self.mvcc().new_txn(self.clone(), self.options.serializable); txn.get(key) } pub(crate) fn get_with_ts(&self, key: &[u8], read_ts: u64) -> Result> { let snapshot = { let guard = self.state.read(); Arc::clone(&guard) }; // drop global lock here let mut memtable_iters = Vec::with_capacity(snapshot.imm_memtables.len() + 1); memtable_iters.push(Box::new(snapshot.memtable.scan( Bound::Included(KeySlice::from_slice(key, key::TS_RANGE_BEGIN)), Bound::Included(KeySlice::from_slice(key, key::TS_RANGE_END)), ))); for memtable in snapshot.imm_memtables.iter() { memtable_iters.push(Box::new(memtable.scan( Bound::Included(KeySlice::from_slice(key, key::TS_RANGE_BEGIN)), Bound::Included(KeySlice::from_slice(key, key::TS_RANGE_END)), ))); } let memtable_iter = MergeIterator::create(memtable_iters); let mut l0_iters = Vec::with_capacity(snapshot.l0_sstables.len()); let keep_table = |key: &[u8], table: &SsTable| { if key_within( key, table.first_key().as_key_slice(), table.last_key().as_key_slice(), ) { if let Some(bloom) = &table.bloom { if bloom.may_contain(farmhash::fingerprint32(key)) { return true; } } else { return true; } } false }; for table in snapshot.l0_sstables.iter() { let table = snapshot.sstables[table].clone(); if keep_table(key, &table) { l0_iters.push(Box::new(SsTableIterator::create_and_seek_to_key( table, KeySlice::from_slice(key, key::TS_RANGE_BEGIN), )?)); } } let l0_iter = MergeIterator::create(l0_iters); let mut level_iters = Vec::with_capacity(snapshot.levels.len()); for (_, level_sst_ids) in &snapshot.levels { let mut level_ssts = Vec::with_capacity(level_sst_ids.len()); for table in level_sst_ids { let table = snapshot.sstables[table].clone(); if keep_table(key, &table) { level_ssts.push(table); } } let level_iter = SstConcatIterator::create_and_seek_to_key( level_ssts, KeySlice::from_slice(key, key::TS_RANGE_BEGIN), )?; level_iters.push(Box::new(level_iter)); } let iter = LsmIterator::new( TwoMergeIterator::create( TwoMergeIterator::create(memtable_iter, l0_iter)?, MergeIterator::create(level_iters), )?, Bound::Unbounded, read_ts, )?; if iter.is_valid() && iter.key() == key && !iter.value().is_empty() { return Ok(Some(Bytes::copy_from_slice(iter.value()))); } Ok(None) } pub fn write_batch_inner>(&self, batch: &[WriteBatchRecord]) -> Result { let _lck = self.mvcc().write_lock.lock(); let ts = self.mvcc().latest_commit_ts() + 1; let mut batch_datas: Vec<(key::Key<&[u8]>, &[u8])> = vec![]; let size; for record in batch { match record { WriteBatchRecord::Del(key) => { let key = key.as_ref(); assert!(!key.is_empty(), "key cannot be empty"); batch_datas.push((KeySlice::from_slice(key, ts), b"")); } WriteBatchRecord::Put(key, value) => { let key = key.as_ref(); let value = value.as_ref(); assert!(!key.is_empty(), "key cannot be empty"); assert!(!value.is_empty(), "value cannot be empty"); batch_datas.push((KeySlice::from_slice(key, ts), value)); } } } { let guard = self.state.read(); guard.memtable.put_batch(&batch_datas)?; size = guard.memtable.approximate_size(); } self.try_freeze(size)?; self.mvcc().update_commit_ts(ts); Ok(ts) } pub fn write_batch>( self: &Arc, batch: &[WriteBatchRecord], ) -> Result<()> { if !self.options.serializable { self.write_batch_inner(batch)?; } else { let txn = self.mvcc().new_txn(self.clone(), self.options.serializable); for record in batch { match record { WriteBatchRecord::Del(key) => { txn.delete(key.as_ref()); } WriteBatchRecord::Put(key, value) => { txn.put(key.as_ref(), value.as_ref()); } } } txn.commit()?; } Ok(()) } /// Put a key-value pair into the storage by writing into the current memtable. pub fn put(self: &Arc, key: &[u8], value: &[u8]) -> Result<()> { if !self.options.serializable { self.write_batch_inner(&[WriteBatchRecord::Put(key, value)])?; } else { let txn = self.mvcc().new_txn(self.clone(), self.options.serializable); txn.put(key, value); txn.commit()?; } Ok(()) } /// Remove a key from the storage by writing an empty value. pub fn delete(self: &Arc, key: &[u8]) -> Result<()> { if !self.options.serializable { self.write_batch_inner(&[WriteBatchRecord::Del(key)])?; } else { let txn = self.mvcc().new_txn(self.clone(), self.options.serializable); txn.delete(key); txn.commit()?; } Ok(()) } fn try_freeze(&self, estimated_size: usize) -> Result<()> { if estimated_size >= self.options.target_sst_size { let state_lock = self.state_lock.lock(); let guard = self.state.read(); // the memtable could have already been frozen, check again to ensure we really need to freeze if guard.memtable.approximate_size() >= self.options.target_sst_size { drop(guard); self.force_freeze_memtable(&state_lock)?; } } Ok(()) } pub(crate) fn path_of_sst_static(path: impl AsRef, id: usize) -> PathBuf { path.as_ref().join(format!("{:05}.sst", id)) } pub(crate) fn path_of_sst(&self, id: usize) -> PathBuf { Self::path_of_sst_static(&self.path, id) } pub(crate) fn path_of_wal_static(path: impl AsRef, id: usize) -> PathBuf { path.as_ref().join(format!("{:05}.wal", id)) } pub(crate) fn path_of_wal(&self, id: usize) -> PathBuf { Self::path_of_wal_static(&self.path, id) } pub(super) fn sync_dir(&self) -> Result<()> { File::open(&self.path)?.sync_all()?; Ok(()) } fn freeze_memtable_with_memtable(&self, memtable: Arc) -> Result<()> { let mut guard = self.state.write(); // Swap the current memtable with a new one. let mut snapshot = guard.as_ref().clone(); let old_memtable = std::mem::replace(&mut snapshot.memtable, memtable); // Add the memtable to the immutable memtables. snapshot.imm_memtables.insert(0, old_memtable.clone()); // Update the snapshot. *guard = Arc::new(snapshot); drop(guard); old_memtable.sync_wal()?; Ok(()) } /// Force freeze the current memtable to an immutable memtable pub fn force_freeze_memtable(&self, state_lock_observer: &MutexGuard<'_, ()>) -> Result<()> { let memtable_id = self.next_sst_id(); let memtable = if self.options.enable_wal { Arc::new(MemTable::create_with_wal( memtable_id, self.path_of_wal(memtable_id), )?) } else { Arc::new(MemTable::create(memtable_id)) }; self.freeze_memtable_with_memtable(memtable)?; self.manifest().add_record( state_lock_observer, ManifestRecord::NewMemtable(memtable_id), )?; self.sync_dir()?; Ok(()) } /// Force flush the earliest-created immutable memtable to disk pub fn force_flush_next_imm_memtable(&self) -> Result<()> { let state_lock = self.state_lock.lock(); let flush_memtable; { let guard = self.state.read(); flush_memtable = guard .imm_memtables .last() .expect("no imm memtables!") .clone(); } let mut builder = SsTableBuilder::new(self.options.block_size); flush_memtable.flush(&mut builder)?; let sst_id = flush_memtable.id(); let sst = Arc::new(builder.build( sst_id, Some(self.block_cache.clone()), self.path_of_sst(sst_id), )?); // Add the flushed L0 table to the list. { let mut guard = self.state.write(); let mut snapshot = guard.as_ref().clone(); // Remove the memtable from the immutable memtables. let mem = snapshot.imm_memtables.pop().unwrap(); assert_eq!(mem.id(), sst_id); // Add L0 table if self.compaction_controller.flush_to_l0() { // In leveled compaction or no compaction, simply flush to L0 snapshot.l0_sstables.insert(0, sst_id); } else { // In tiered compaction, create a new tier snapshot.levels.insert(0, (sst_id, vec![sst_id])); } println!("flushed {}.sst with size={}", sst_id, sst.table_size()); snapshot.sstables.insert(sst_id, sst); // Update the snapshot. *guard = Arc::new(snapshot); } if self.options.enable_wal { std::fs::remove_file(self.path_of_wal(sst_id))?; } self.manifest() .add_record(&state_lock, ManifestRecord::Flush(sst_id))?; self.sync_dir()?; Ok(()) } pub fn new_txn(self: &Arc) -> Result> { Ok(self.mvcc().new_txn(self.clone(), self.options.serializable)) } /// Create an iterator over a range of keys. pub fn scan<'a>( self: &'a Arc, lower: Bound<&[u8]>, upper: Bound<&[u8]>, ) -> Result { let txn = self.mvcc().new_txn(self.clone(), self.options.serializable); txn.scan(lower, upper) } pub(crate) fn scan_with_ts( &self, lower: Bound<&[u8]>, upper: Bound<&[u8]>, read_ts: u64, ) -> Result> { let snapshot = { let guard = self.state.read(); Arc::clone(&guard) }; // drop global lock here let mut memtable_iters = Vec::with_capacity(snapshot.imm_memtables.len() + 1); memtable_iters.push(Box::new(snapshot.memtable.scan( map_key_bound_plus_ts(lower, key::TS_RANGE_BEGIN), map_key_bound_plus_ts(upper, key::TS_RANGE_END), ))); for memtable in snapshot.imm_memtables.iter() { memtable_iters.push(Box::new(memtable.scan( map_key_bound_plus_ts(lower, key::TS_RANGE_BEGIN), map_key_bound_plus_ts(upper, key::TS_RANGE_END), ))); } let memtable_iter = MergeIterator::create(memtable_iters); let mut table_iters = Vec::with_capacity(snapshot.l0_sstables.len()); for table_id in snapshot.l0_sstables.iter() { let table = snapshot.sstables[table_id].clone(); if range_overlap( lower, upper, table.first_key().as_key_slice(), table.last_key().as_key_slice(), ) { let iter = match lower { Bound::Included(key) => SsTableIterator::create_and_seek_to_key( table, KeySlice::from_slice(key, key::TS_RANGE_BEGIN), )?, Bound::Excluded(key) => { let mut iter = SsTableIterator::create_and_seek_to_key( table, KeySlice::from_slice(key, key::TS_RANGE_BEGIN), )?; while iter.is_valid() && iter.key().key_ref() == key { iter.next()?; } iter } Bound::Unbounded => SsTableIterator::create_and_seek_to_first(table)?, }; table_iters.push(Box::new(iter)); } } let l0_iter = MergeIterator::create(table_iters); let mut level_iters = Vec::with_capacity(snapshot.levels.len()); for (_, level_sst_ids) in &snapshot.levels { let mut level_ssts = Vec::with_capacity(level_sst_ids.len()); for table in level_sst_ids { let table = snapshot.sstables[table].clone(); if range_overlap( lower, upper, table.first_key().as_key_slice(), table.last_key().as_key_slice(), ) { level_ssts.push(table); } } let level_iter = match lower { Bound::Included(key) => SstConcatIterator::create_and_seek_to_key( level_ssts, KeySlice::from_slice(key, key::TS_RANGE_BEGIN), )?, Bound::Excluded(key) => { let mut iter = SstConcatIterator::create_and_seek_to_key( level_ssts, KeySlice::from_slice(key, key::TS_RANGE_BEGIN), )?; while iter.is_valid() && iter.key().key_ref() == key { iter.next()?; } iter } Bound::Unbounded => SstConcatIterator::create_and_seek_to_first(level_ssts)?, }; level_iters.push(Box::new(level_iter)); } let iter = TwoMergeIterator::create(memtable_iter, l0_iter)?; let iter = TwoMergeIterator::create(iter, MergeIterator::create(level_iters))?; Ok(FusedIterator::new(LsmIterator::new( iter, map_bound(upper), read_ts, )?)) } }