@@ -19,7 +19,7 @@ use crate::iterators::merge_iterator::MergeIterator;
|
||||
use crate::iterators::two_merge_iterator::TwoMergeIterator;
|
||||
use crate::iterators::StorageIterator;
|
||||
use crate::key::KeySlice;
|
||||
use crate::lsm_storage::{LsmStorageInner, LsmStorageState};
|
||||
use crate::lsm_storage::{CompactionFilter, LsmStorageInner, LsmStorageState};
|
||||
use crate::manifest::ManifestRecord;
|
||||
use crate::table::{SsTable, SsTableBuilder, SsTableIterator};
|
||||
|
||||
@@ -122,7 +122,8 @@ impl LsmStorageInner {
|
||||
let watermark = self.mvcc().watermark();
|
||||
let mut last_key = Vec::<u8>::new();
|
||||
let mut first_key_below_watermark = false;
|
||||
while iter.is_valid() {
|
||||
let compaction_filters = self.compaction_filters.lock().clone();
|
||||
'outer: while iter.is_valid() {
|
||||
if builder.is_none() {
|
||||
builder = Some(SsTableBuilder::new(self.options.block_size));
|
||||
}
|
||||
@@ -144,12 +145,26 @@ impl LsmStorageInner {
|
||||
continue;
|
||||
}
|
||||
|
||||
if same_as_last_key && iter.key().ts() <= watermark {
|
||||
if !first_key_below_watermark {
|
||||
if iter.key().ts() <= watermark {
|
||||
if same_as_last_key && !first_key_below_watermark {
|
||||
iter.next()?;
|
||||
continue;
|
||||
}
|
||||
|
||||
first_key_below_watermark = false;
|
||||
|
||||
if !compaction_filters.is_empty() {
|
||||
for filter in &compaction_filters {
|
||||
match filter {
|
||||
CompactionFilter::Prefix(x) => {
|
||||
if iter.key().key_ref().starts_with(x) {
|
||||
iter.next()?;
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let builder_inner = builder.as_mut().unwrap();
|
||||
|
||||
@@ -53,8 +53,10 @@ impl<
|
||||
|
||||
/// Returns the key of the sub-iterator that is currently selected.
///
/// `choose_a` decides which side is read: `a` when it is set, `b` otherwise.
/// In debug builds the selected side is asserted to be valid before its key
/// is read.
fn key(&self) -> A::KeyType<'_> {
    if !self.choose_a {
        debug_assert!(self.b.is_valid());
        return self.b.key();
    }
    debug_assert!(self.a.is_valid());
    self.a.key()
}
|
||||
|
||||
@@ -149,6 +149,11 @@ fn key_within(user_key: &[u8], table_begin: KeySlice, table_end: KeySlice) -> bo
|
||||
table_begin.key_ref() <= user_key && user_key <= table_end.key_ref()
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum CompactionFilter {
|
||||
Prefix(Bytes),
|
||||
}
|
||||
|
||||
/// The storage interface of the LSM tree.
|
||||
pub(crate) struct LsmStorageInner {
|
||||
pub(crate) state: Arc<RwLock<Arc<LsmStorageState>>>,
|
||||
@@ -160,6 +165,7 @@ pub(crate) struct LsmStorageInner {
|
||||
pub(crate) compaction_controller: CompactionController,
|
||||
pub(crate) manifest: Option<Manifest>,
|
||||
pub(crate) mvcc: Option<LsmMvccInner>,
|
||||
pub(crate) compaction_filters: Arc<Mutex<Vec<CompactionFilter>>>,
|
||||
}
|
||||
|
||||
/// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM.
|
||||
@@ -243,6 +249,10 @@ impl MiniLsm {
|
||||
}))
|
||||
}
|
||||
|
||||
/// Registers a compaction filter by forwarding it to the inner storage
/// engine.
pub fn add_compaction_filter(&self, compaction_filter: CompactionFilter) {
    let engine = &self.inner;
    engine.add_compaction_filter(compaction_filter);
}
|
||||
|
||||
/// Looks up `key` by forwarding the request to the inner storage engine and
/// returning its result unchanged.
pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
    let engine = &self.inner;
    engine.get(key)
}
|
||||
@@ -431,12 +441,18 @@ impl LsmStorageInner {
|
||||
manifest: Some(manifest),
|
||||
options: options.into(),
|
||||
mvcc: Some(LsmMvccInner::new(last_commit_ts)),
|
||||
compaction_filters: Arc::new(Mutex::new(Vec::new())),
|
||||
};
|
||||
storage.sync_dir()?;
|
||||
|
||||
Ok(storage)
|
||||
}
|
||||
|
||||
/// Appends `compaction_filter` to the engine's list of compaction filters.
///
/// The list lives behind a mutex; the lock is held only for the duration of
/// the push.
pub fn add_compaction_filter(&self, compaction_filter: CompactionFilter) {
    self.compaction_filters.lock().push(compaction_filter);
}
|
||||
|
||||
/// Syncs the write-ahead log of the current memtable.
///
/// Takes a read lock on the storage state, calls `sync_wal` on the memtable
/// found there, and returns that call's result.
pub fn sync(&self) -> Result<()> {
    let snapshot = self.state.read();
    snapshot.memtable.sync_wal()
}
|
||||
|
||||
@@ -18,3 +18,4 @@ mod week3_day3;
|
||||
mod week3_day4;
|
||||
mod week3_day5;
|
||||
mod week3_day6;
|
||||
mod week3_day7;
|
||||
|
||||
70
mini-lsm-mvcc/src/tests/week3_day7.rs
Normal file
70
mini-lsm-mvcc/src/tests/week3_day7.rs
Normal file
@@ -0,0 +1,70 @@
|
||||
use bytes::Bytes;
|
||||
use tempfile::tempdir;
|
||||
|
||||
use crate::{
|
||||
compact::CompactionOptions,
|
||||
lsm_storage::{CompactionFilter, LsmStorageOptions, MiniLsm, WriteBatchRecord},
|
||||
};
|
||||
|
||||
use super::harness::{check_iter_result_by_key, construct_merge_iterator_over_storage};
|
||||
|
||||
/// Exercises a prefix compaction filter together with MVCC garbage collection.
///
/// Two versions of six keys (three under `table1_`, three under `table2_`)
/// are written and flushed, with a transaction opened between the two
/// batches. A `table2_` prefix filter is then registered.
///
/// * First full compaction (transaction still open): every `table1_` version
///   is expected to remain, while only the newer `table2_` versions remain.
/// * Second full compaction (transaction dropped): only the live `table1_`
///   values are expected to remain.
#[test]
fn test_task3_mvcc_compaction() {
    // Small constructors for the expected (key, value) pairs.
    let kv = |key: &'static str, value: &'static str| (Bytes::from(key), Bytes::from(value));
    let tombstone = |key: &'static str| (Bytes::from(key), Bytes::new());

    let dir = tempdir().unwrap();
    let options = LsmStorageOptions::default_for_week2_test(CompactionOptions::NoCompaction);
    let storage = MiniLsm::open(&dir, options.clone()).unwrap();

    // Version 1 of every key.
    let initial_batch = [
        WriteBatchRecord::Put("table1_a", "1"),
        WriteBatchRecord::Put("table1_b", "1"),
        WriteBatchRecord::Put("table1_c", "1"),
        WriteBatchRecord::Put("table2_a", "1"),
        WriteBatchRecord::Put("table2_b", "1"),
        WriteBatchRecord::Put("table2_c", "1"),
    ];
    storage.write_batch(&initial_batch).unwrap();
    storage.force_flush().unwrap();

    // Keep a transaction open so the first compaction must retain old versions.
    let snapshot0 = storage.new_txn().unwrap();

    // Version 2: overwrite `_a` and `_c`, delete `_b`, in both tables.
    let update_batch = [
        WriteBatchRecord::Put("table1_a", "2"),
        WriteBatchRecord::Del("table1_b"),
        WriteBatchRecord::Put("table1_c", "2"),
        WriteBatchRecord::Put("table2_a", "2"),
        WriteBatchRecord::Del("table2_b"),
        WriteBatchRecord::Put("table2_c", "2"),
    ];
    storage.write_batch(&update_batch).unwrap();
    storage.force_flush().unwrap();

    storage.add_compaction_filter(CompactionFilter::Prefix(Bytes::from("table2_")));
    storage.force_full_compaction().unwrap();

    let expected_with_snapshot = vec![
        kv("table1_a", "2"),
        kv("table1_a", "1"),
        tombstone("table1_b"),
        kv("table1_b", "1"),
        kv("table1_c", "2"),
        kv("table1_c", "1"),
        kv("table2_a", "2"),
        tombstone("table2_b"),
        kv("table2_c", "2"),
    ];
    let mut iter_with_snapshot =
        construct_merge_iterator_over_storage(&storage.inner.state.read());
    check_iter_result_by_key(&mut iter_with_snapshot, expected_with_snapshot);

    // Release the transaction, then compact again.
    drop(snapshot0);
    storage.force_full_compaction().unwrap();

    let expected_after_release = vec![kv("table1_a", "2"), kv("table1_c", "2")];
    let mut iter_after_release =
        construct_merge_iterator_over_storage(&storage.inner.state.read());
    check_iter_result_by_key(&mut iter_after_release, expected_after_release);
}
|
||||
Reference in New Issue
Block a user