add compaction skeleton (#16)
* add compaction skeleton Signed-off-by: Alex Chi <iskyzh@gmail.com> * remove tombstone when compact to bottom-most level Signed-off-by: Alex Chi <iskyzh@gmail.com> * new plan Signed-off-by: Alex Chi Z <iskyzh@gmail.com> --------- Signed-off-by: Alex Chi <iskyzh@gmail.com> Signed-off-by: Alex Chi Z <iskyzh@gmail.com>
This commit is contained in:
72
mini-lsm/src/compact.rs
Normal file
72
mini-lsm/src/compact.rs
Normal file
@@ -0,0 +1,72 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
|
||||
use crate::{
|
||||
iterators::{merge_iterator::MergeIterator, StorageIterator},
|
||||
lsm_storage::LsmStorage,
|
||||
table::{SsTable, SsTableBuilder, SsTableIterator},
|
||||
};
|
||||
|
||||
/// Tuning knobs for a single [`LsmStorage::compact`] run.
#[derive(Debug, Clone, Copy)]
struct CompactOptions {
    /// Block size handed to every `SsTableBuilder` created during the run.
    block_size: usize,
    /// Once a builder's estimated size reaches this threshold, the output
    /// table is finished and a new one is started.
    target_sst_size: usize,
    /// When `true`, entries with an empty value (tombstones) are dropped
    /// instead of being copied into the output tables.
    compact_to_bottom_level: bool,
}
|
||||
|
||||
impl LsmStorage {
|
||||
#[allow(dead_code)]
|
||||
fn compact(
|
||||
&self,
|
||||
tables: Vec<Arc<SsTable>>,
|
||||
options: CompactOptions,
|
||||
) -> Result<Vec<Arc<SsTable>>> {
|
||||
let mut iters = Vec::new();
|
||||
iters.reserve(tables.len());
|
||||
for table in tables.iter() {
|
||||
iters.push(Box::new(SsTableIterator::create_and_seek_to_first(
|
||||
table.clone(),
|
||||
)?));
|
||||
}
|
||||
let mut iter = MergeIterator::create(iters);
|
||||
|
||||
let mut builder = None;
|
||||
let mut new_sst = vec![];
|
||||
|
||||
while iter.is_valid() {
|
||||
if builder.is_none() {
|
||||
builder = Some(SsTableBuilder::new(options.block_size));
|
||||
}
|
||||
let builder_inner = builder.as_mut().unwrap();
|
||||
if options.compact_to_bottom_level {
|
||||
if !iter.value().is_empty() {
|
||||
builder_inner.add(iter.key(), iter.value());
|
||||
}
|
||||
} else {
|
||||
builder_inner.add(iter.key(), iter.value());
|
||||
}
|
||||
iter.next()?;
|
||||
|
||||
if builder_inner.estimated_size() >= options.target_sst_size {
|
||||
let sst_id = self.next_sst_id(); // lock dropped here
|
||||
let builder = builder.take().unwrap();
|
||||
let sst = Arc::new(builder.build(
|
||||
sst_id,
|
||||
Some(self.block_cache.clone()),
|
||||
self.path_of_sst(sst_id),
|
||||
)?);
|
||||
new_sst.push(sst);
|
||||
}
|
||||
}
|
||||
if let Some(builder) = builder {
|
||||
let sst_id = self.next_sst_id(); // lock dropped here
|
||||
let sst = Arc::new(builder.build(
|
||||
sst_id,
|
||||
Some(self.block_cache.clone()),
|
||||
self.path_of_sst(sst_id),
|
||||
)?);
|
||||
new_sst.push(sst);
|
||||
}
|
||||
Ok(new_sst)
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,5 @@
|
||||
pub mod block;
|
||||
mod compact;
|
||||
pub mod iterators;
|
||||
pub mod lsm_iterator;
|
||||
pub mod lsm_storage;
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::ops::Bound;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
@@ -27,8 +28,6 @@ pub struct LsmStorageInner {
|
||||
/// L1 - L6 SsTables, sorted by key range.
|
||||
#[allow(dead_code)]
|
||||
levels: Vec<Vec<Arc<SsTable>>>,
|
||||
/// The next SSTable ID.
|
||||
next_sst_id: usize,
|
||||
}
|
||||
|
||||
impl LsmStorageInner {
|
||||
@@ -38,26 +37,32 @@ impl LsmStorageInner {
|
||||
imm_memtables: vec![],
|
||||
l0_sstables: vec![],
|
||||
levels: vec![],
|
||||
next_sst_id: 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The storage interface of the LSM tree.
|
||||
pub struct LsmStorage {
|
||||
inner: Arc<RwLock<Arc<LsmStorageInner>>>,
|
||||
pub(crate) inner: Arc<RwLock<Arc<LsmStorageInner>>>,
|
||||
flush_lock: Mutex<()>,
|
||||
path: PathBuf,
|
||||
block_cache: Arc<BlockCache>,
|
||||
pub(crate) block_cache: Arc<BlockCache>,
|
||||
next_sst_id: AtomicUsize,
|
||||
}
|
||||
|
||||
impl LsmStorage {
|
||||
/// Hands out a fresh SSTable id.
///
/// Atomically adds 1 to the `next_sst_id` counter with sequentially
/// consistent ordering and returns the value the counter held *before* the
/// increment, so each call observes a distinct id.
pub(crate) fn next_sst_id(&self) -> usize {
|
||||
self.next_sst_id
|
||||
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
|
||||
}
|
||||
|
||||
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
|
||||
Ok(Self {
|
||||
inner: Arc::new(RwLock::new(Arc::new(LsmStorageInner::create()))),
|
||||
flush_lock: Mutex::new(()),
|
||||
path: path.as_ref().to_path_buf(),
|
||||
block_cache: Arc::new(BlockCache::new(1 << 20)), // 4GB block cache
|
||||
block_cache: Arc::new(BlockCache::new(1 << 20)), // 4GB block cache,
|
||||
next_sst_id: AtomicUsize::new(1),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -121,7 +126,7 @@ impl LsmStorage {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn path_of_sst(&self, id: usize) -> PathBuf {
|
||||
pub(crate) fn path_of_sst(&self, id: usize) -> PathBuf {
|
||||
self.path.join(format!("{:05}.sst", id))
|
||||
}
|
||||
|
||||
@@ -142,7 +147,7 @@ impl LsmStorage {
|
||||
let mut snapshot = guard.as_ref().clone();
|
||||
let memtable = std::mem::replace(&mut snapshot.memtable, Arc::new(MemTable::create()));
|
||||
flush_memtable = memtable.clone();
|
||||
sst_id = snapshot.next_sst_id;
|
||||
sst_id = self.next_sst_id();
|
||||
// Add the memtable to the immutable memtables.
|
||||
snapshot.imm_memtables.push(memtable);
|
||||
// Update the snapshot.
|
||||
@@ -169,8 +174,6 @@ impl LsmStorage {
|
||||
snapshot.imm_memtables.pop();
|
||||
// Add L0 table
|
||||
snapshot.l0_sstables.push(sst);
|
||||
// Update SST ID
|
||||
snapshot.next_sst_id += 1;
|
||||
// Update the snapshot.
|
||||
*guard = Arc::new(snapshot);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user