patch memtable and add ts for wal

Signed-off-by: Alex Chi <iskyzh@gmail.com>
This commit is contained in:
Alex Chi
2024-01-25 23:09:16 +08:00
parent 89acc23208
commit 218c73f384
11 changed files with 256 additions and 96 deletions

View File

@@ -2,7 +2,7 @@ use std::collections::{BTreeSet, HashMap};
use std::fs::File;
use std::ops::Bound;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use anyhow::{Context, Result};
@@ -21,7 +21,7 @@ use crate::iterators::StorageIterator;
use crate::key::{self, KeySlice};
use crate::lsm_iterator::{FusedIterator, LsmIterator};
use crate::manifest::{Manifest, ManifestRecord};
use crate::mem_table::{map_bound, MemTable};
use crate::mem_table::{map_bound, map_key_bound_plus_ts, MemTable};
use crate::table::{FileObject, SsTable, SsTableBuilder, SsTableIterator};
pub type BlockCache = moka::sync::Cache<(usize, usize), Arc<Block>>;
@@ -152,6 +152,8 @@ pub(crate) struct LsmStorageInner {
pub(crate) options: Arc<LsmStorageOptions>,
pub(crate) compaction_controller: CompactionController,
pub(crate) manifest: Option<Manifest>,
pub(crate) ts: Arc<AtomicU64>,
pub(crate) write_lock: Mutex<()>,
}
/// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM.
@@ -405,6 +407,8 @@ impl LsmStorageInner {
compaction_controller,
manifest: Some(manifest),
options: options.into(),
ts: Arc::new(AtomicU64::new(0)),
write_lock: Mutex::new(()),
};
storage.sync_dir()?;
@@ -422,25 +426,18 @@ impl LsmStorageInner {
Arc::clone(&guard)
}; // drop global lock here
// Search on the current memtable.
if let Some(value) = snapshot.memtable.get(key) {
if value.is_empty() {
// found tomestone, return key not exists
return Ok(None);
}
return Ok(Some(value));
}
// Search on immutable memtables.
let mut memtable_iters = Vec::with_capacity(snapshot.imm_memtables.len() + 1);
memtable_iters.push(Box::new(snapshot.memtable.scan(
Bound::Included(KeySlice::from_slice(key, key::TS_RANGE_BEGIN)),
Bound::Included(KeySlice::from_slice(key, key::TS_RANGE_END)),
)));
for memtable in snapshot.imm_memtables.iter() {
if let Some(value) = memtable.get(key) {
if value.is_empty() {
// found tomestone, return key not exists
return Ok(None);
}
return Ok(Some(value));
}
memtable_iters.push(Box::new(memtable.scan(
Bound::Included(KeySlice::from_slice(key, key::TS_RANGE_BEGIN)),
Bound::Included(KeySlice::from_slice(key, key::TS_RANGE_END)),
)));
}
let memtable_iter = MergeIterator::create(memtable_iters);
let mut l0_iters = Vec::with_capacity(snapshot.l0_sstables.len());
@@ -466,7 +463,7 @@ impl LsmStorageInner {
if keep_table(key, &table) {
l0_iters.push(Box::new(SsTableIterator::create_and_seek_to_key(
table,
KeySlice::from_slice(key, key::TS_DEFAULT),
KeySlice::from_slice(key, key::TS_RANGE_BEGIN),
)?));
}
}
@@ -482,20 +479,29 @@ impl LsmStorageInner {
}
let level_iter = SstConcatIterator::create_and_seek_to_key(
level_ssts,
KeySlice::from_slice(key, key::TS_DEFAULT),
KeySlice::from_slice(key, key::TS_RANGE_BEGIN),
)?;
level_iters.push(Box::new(level_iter));
}
let iter = TwoMergeIterator::create(l0_iter, MergeIterator::create(level_iters))?;
let iter = LsmIterator::new(
TwoMergeIterator::create(
TwoMergeIterator::create(memtable_iter, l0_iter)?,
MergeIterator::create(level_iters),
)?,
Bound::Unbounded,
self.ts.load(Ordering::SeqCst),
)?;
if iter.is_valid() && iter.key().key_ref() == key && !iter.value().is_empty() {
if iter.is_valid() && iter.key() == key && !iter.value().is_empty() {
return Ok(Some(Bytes::copy_from_slice(iter.value())));
}
Ok(None)
}
pub fn write_batch<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<()> {
let _lck = self.write_lock.lock();
let ts = self.ts.fetch_add(1, Ordering::Relaxed);
for record in batch {
match record {
WriteBatchRecord::Del(key) => {
@@ -504,7 +510,7 @@ impl LsmStorageInner {
let size;
{
let guard = self.state.read();
guard.memtable.put(key, b"")?;
guard.memtable.put(KeySlice::from_slice(key, ts), b"")?;
size = guard.memtable.approximate_size();
}
self.try_freeze(size)?;
@@ -517,7 +523,7 @@ impl LsmStorageInner {
let size;
{
let guard = self.state.read();
guard.memtable.put(key, value)?;
guard.memtable.put(KeySlice::from_slice(key, ts), value)?;
size = guard.memtable.approximate_size();
}
self.try_freeze(size)?;
@@ -681,9 +687,15 @@ impl LsmStorageInner {
}; // drop global lock here
let mut memtable_iters = Vec::with_capacity(snapshot.imm_memtables.len() + 1);
memtable_iters.push(Box::new(snapshot.memtable.scan(lower, upper)));
memtable_iters.push(Box::new(snapshot.memtable.scan(
map_key_bound_plus_ts(lower, key::TS_RANGE_BEGIN),
map_key_bound_plus_ts(upper, key::TS_RANGE_END),
)));
for memtable in snapshot.imm_memtables.iter() {
memtable_iters.push(Box::new(memtable.scan(lower, upper)));
memtable_iters.push(Box::new(memtable.scan(
map_key_bound_plus_ts(lower, key::TS_RANGE_BEGIN),
map_key_bound_plus_ts(upper, key::TS_RANGE_END),
)));
}
let memtable_iter = MergeIterator::create(memtable_iters);
@@ -699,12 +711,12 @@ impl LsmStorageInner {
let iter = match lower {
Bound::Included(key) => SsTableIterator::create_and_seek_to_key(
table,
KeySlice::from_slice(key, key::TS_DEFAULT),
KeySlice::from_slice(key, key::TS_RANGE_BEGIN),
)?,
Bound::Excluded(key) => {
let mut iter = SsTableIterator::create_and_seek_to_key(
table,
KeySlice::from_slice(key, key::TS_DEFAULT),
KeySlice::from_slice(key, key::TS_RANGE_BEGIN),
)?;
if iter.is_valid() && iter.key().key_ref() == key {
iter.next()?;
@@ -737,12 +749,12 @@ impl LsmStorageInner {
let level_iter = match lower {
Bound::Included(key) => SstConcatIterator::create_and_seek_to_key(
level_ssts,
KeySlice::from_slice(key, key::TS_DEFAULT),
KeySlice::from_slice(key, key::TS_RANGE_BEGIN),
)?,
Bound::Excluded(key) => {
let mut iter = SstConcatIterator::create_and_seek_to_key(
level_ssts,
KeySlice::from_slice(key, key::TS_DEFAULT),
KeySlice::from_slice(key, key::TS_RANGE_BEGIN),
)?;
if iter.is_valid() && iter.key().key_ref() == key {
iter.next()?;
@@ -760,6 +772,7 @@ impl LsmStorageInner {
Ok(FusedIterator::new(LsmIterator::new(
iter,
map_bound(upper),
self.ts.load(Ordering::SeqCst),
)?))
}
}