implement mvcc compaction + snapshot

Signed-off-by: Alex Chi <iskyzh@gmail.com>
This commit is contained in:
Alex Chi
2024-01-26 16:52:37 +08:00
parent 14c3be390c
commit 6025bb8dca
17 changed files with 300 additions and 63 deletions

View File

@@ -17,4 +17,5 @@ This is an advanced part that deep dives into optimizations and applications of
| 4.11 | Key-Value Separation | | | |
| 4.12 | Column Families | | | |
| 4.13 | Sharding | | | |
| 4.14 | SQL over Mini-LSM | | | |
| 4.14 | Compaction Optimizations | | | |
| 4.15 | SQL over Mini-LSM | | | |

View File

@@ -115,20 +115,45 @@ impl LsmStorageInner {
fn compact_generate_sst_from_iter(
&self,
mut iter: impl for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
_compact_to_bottom_level: bool,
compact_to_bottom_level: bool,
) -> Result<Vec<Arc<SsTable>>> {
let mut builder = None;
let mut new_sst = Vec::new();
let watermark = self.mvcc().watermark();
let mut last_key = Vec::<u8>::new();
let mut first_key_below_watermark = false;
while iter.is_valid() {
if builder.is_none() {
builder = Some(SsTableBuilder::new(self.options.block_size));
}
let builder_inner = builder.as_mut().unwrap();
builder_inner.add(iter.key(), iter.value());
let same_as_last_key = iter.key().key_ref() == last_key;
if !same_as_last_key {
first_key_below_watermark = true;
}
if compact_to_bottom_level
&& !same_as_last_key
&& iter.key().ts() <= watermark
&& iter.value().is_empty()
{
last_key.clear();
last_key.extend(iter.key().key_ref());
iter.next()?;
first_key_below_watermark = false;
continue;
}
if same_as_last_key && iter.key().ts() < watermark {
if !first_key_below_watermark {
iter.next()?;
continue;
}
first_key_below_watermark = false;
}
let builder_inner = builder.as_mut().unwrap();
builder_inner.add(iter.key(), iter.value());
if builder_inner.estimated_size() >= self.options.target_sst_size && !same_as_last_key {
let sst_id = self.next_sst_id();
@@ -145,6 +170,7 @@ impl LsmStorageInner {
last_key.clear();
last_key.extend(iter.key().key_ref());
}
iter.next()?;
}
if let Some(builder) = builder {
@@ -343,9 +369,7 @@ impl LsmStorageInner {
*state = Arc::new(snapshot);
drop(state);
self.sync_dir()?;
self.manifest
.as_ref()
.unwrap()
self.manifest()
.add_record(&state_lock, ManifestRecord::Compaction(task, new_sst_ids))?;
ssts_to_remove
};

View File

@@ -7,6 +7,7 @@ pub mod lsm_iterator;
pub mod lsm_storage;
pub mod manifest;
pub mod mem_table;
pub mod mvcc;
pub mod table;
pub mod wal;

View File

@@ -2,7 +2,7 @@ use std::collections::{BTreeSet, HashMap};
use std::fs::File;
use std::ops::Bound;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use anyhow::{Context, Result};
@@ -22,6 +22,7 @@ use crate::key::{self, KeySlice};
use crate::lsm_iterator::{FusedIterator, LsmIterator};
use crate::manifest::{Manifest, ManifestRecord};
use crate::mem_table::{map_bound, map_key_bound_plus_ts, MemTable};
use crate::mvcc::{LsmMvccInner, Transaction, TxnIterator};
use crate::table::{FileObject, SsTable, SsTableBuilder, SsTableIterator};
pub type BlockCache = moka::sync::Cache<(usize, usize), Arc<Block>>;
@@ -153,8 +154,7 @@ pub(crate) struct LsmStorageInner {
pub(crate) options: Arc<LsmStorageOptions>,
pub(crate) compaction_controller: CompactionController,
pub(crate) manifest: Option<Manifest>,
pub(crate) ts: Arc<AtomicU64>,
pub(crate) write_lock: Mutex<()>,
pub(crate) mvcc: Option<LsmMvccInner>,
}
/// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM.
@@ -238,6 +238,10 @@ impl MiniLsm {
}))
}
pub fn new_txn(&self) -> Result<Arc<Transaction>> {
self.inner.new_txn()
}
pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
self.inner.get(key)
}
@@ -258,11 +262,7 @@ impl MiniLsm {
self.inner.sync()
}
pub fn scan(
&self,
lower: Bound<&[u8]>,
upper: Bound<&[u8]>,
) -> Result<FusedIterator<LsmIterator>> {
pub fn scan(&self, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> Result<TxnIterator> {
self.inner.scan(lower, upper)
}
@@ -289,6 +289,14 @@ impl LsmStorageInner {
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
}
pub(crate) fn mvcc(&self) -> &LsmMvccInner {
self.mvcc.as_ref().unwrap()
}
pub(crate) fn manifest(&self) -> &Manifest {
self.manifest.as_ref().unwrap()
}
/// Start the storage engine by either loading an existing directory or creating a new one if the directory does
/// not exist.
pub(crate) fn open(path: impl AsRef<Path>, options: LsmStorageOptions) -> Result<Self> {
@@ -408,8 +416,7 @@ impl LsmStorageInner {
compaction_controller,
manifest: Some(manifest),
options: options.into(),
ts: Arc::new(AtomicU64::new(0)),
write_lock: Mutex::new(()),
mvcc: Some(LsmMvccInner::new(0)),
};
storage.sync_dir()?;
@@ -420,8 +427,17 @@ impl LsmStorageInner {
self.state.read().memtable.sync_wal()
}
pub fn new_txn(self: &Arc<Self>) -> Result<Arc<Transaction>> {
Ok(self.mvcc().new_txn(self.clone()))
}
/// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter.
pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
pub fn get(self: &Arc<Self>, key: &[u8]) -> Result<Option<Bytes>> {
let txn = self.mvcc().new_txn(self.clone());
txn.get(key)
}
pub(crate) fn get_with_ts(&self, key: &[u8], read_ts: u64) -> Result<Option<Bytes>> {
let snapshot = {
let guard = self.state.read();
Arc::clone(&guard)
@@ -491,7 +507,7 @@ impl LsmStorageInner {
MergeIterator::create(level_iters),
)?,
Bound::Unbounded,
self.ts.load(Ordering::SeqCst),
read_ts,
)?;
if iter.is_valid() && iter.key() == key && !iter.value().is_empty() {
@@ -501,8 +517,8 @@ impl LsmStorageInner {
}
pub fn write_batch<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<()> {
let _lck = self.write_lock.lock();
let ts = self.ts.fetch_add(1, Ordering::Relaxed);
let _lck = self.mvcc().write_lock.lock();
let ts = self.mvcc().latest_commit_ts() + 1;
for record in batch {
match record {
WriteBatchRecord::Del(key) => {
@@ -531,6 +547,7 @@ impl LsmStorageInner {
}
}
}
self.mvcc().update_commit_ts(ts);
Ok(())
}
@@ -608,7 +625,7 @@ impl LsmStorageInner {
self.freeze_memtable_with_memtable(memtable)?;
self.manifest.as_ref().unwrap().add_record(
self.manifest().add_record(
state_lock_observer,
ManifestRecord::NewMemtable(memtable_id),
)?;
@@ -666,9 +683,7 @@ impl LsmStorageInner {
std::fs::remove_file(self.path_of_wal(sst_id))?;
}
self.manifest
.as_ref()
.unwrap()
self.manifest()
.add_record(&state_lock, ManifestRecord::Flush(sst_id))?;
self.sync_dir()?;
@@ -677,10 +692,20 @@ impl LsmStorageInner {
}
/// Create an iterator over a range of keys.
pub fn scan(
pub fn scan<'a>(
self: &'a Arc<Self>,
lower: Bound<&[u8]>,
upper: Bound<&[u8]>,
) -> Result<TxnIterator> {
let txn = self.mvcc().new_txn(self.clone());
txn.scan(lower, upper)
}
pub(crate) fn scan_with_ts(
&self,
lower: Bound<&[u8]>,
upper: Bound<&[u8]>,
read_ts: u64,
) -> Result<FusedIterator<LsmIterator>> {
let snapshot = {
let guard = self.state.read();
@@ -773,7 +798,7 @@ impl LsmStorageInner {
Ok(FusedIterator::new(LsmIterator::new(
iter,
map_bound(upper),
self.ts.load(Ordering::SeqCst),
read_ts,
)?))
}
}

103
mini-lsm-mvcc/src/mvcc.rs Normal file
View File

@@ -0,0 +1,103 @@
mod watermark;
use std::{ops::Bound, sync::Arc};
use anyhow::Result;
use bytes::Bytes;
use parking_lot::Mutex;
use crate::{
iterators::StorageIterator,
lsm_iterator::{FusedIterator, LsmIterator},
lsm_storage::LsmStorageInner,
};
use self::watermark::Watermark;
pub(crate) struct LsmMvccInner {
pub(crate) write_lock: Mutex<()>,
pub(crate) ts: Arc<Mutex<(u64, Watermark)>>,
}
impl LsmMvccInner {
pub fn new(initial_ts: u64) -> Self {
Self {
write_lock: Mutex::new(()),
ts: Arc::new(Mutex::new((initial_ts, Watermark::new()))),
}
}
pub fn latest_commit_ts(&self) -> u64 {
self.ts.lock().0
}
pub fn update_commit_ts(&self, ts: u64) {
self.ts.lock().0 = ts;
}
/// All ts (strictly) below this ts can be garbage collected.
pub fn watermark(&self) -> u64 {
let ts = self.ts.lock();
ts.1.watermark().unwrap_or(ts.0)
}
pub fn new_txn(&self, inner: Arc<LsmStorageInner>) -> Arc<Transaction> {
let mut ts = self.ts.lock();
let read_ts = ts.0;
ts.1.add_reader(read_ts);
Arc::new(Transaction { inner, read_ts })
}
}
pub struct Transaction {
read_ts: u64,
inner: Arc<LsmStorageInner>,
}
impl Transaction {
pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
self.inner.get_with_ts(key, self.read_ts)
}
pub fn scan(self: &Arc<Self>, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> Result<TxnIterator> {
Ok(TxnIterator {
_txn: self.clone(),
iter: self.inner.scan_with_ts(lower, upper, self.read_ts)?,
})
}
}
impl Drop for Transaction {
fn drop(&mut self) {
self.inner.mvcc().ts.lock().1.remove_reader(self.read_ts)
}
}
pub struct TxnIterator {
_txn: Arc<Transaction>,
iter: FusedIterator<LsmIterator>,
}
impl StorageIterator for TxnIterator {
type KeyType<'a> = &'a [u8] where Self: 'a;
fn value(&self) -> &[u8] {
self.iter.value()
}
fn key(&self) -> Self::KeyType<'_> {
self.iter.key()
}
fn is_valid(&self) -> bool {
self.iter.is_valid()
}
fn next(&mut self) -> Result<()> {
self.iter.next()
}
fn num_active_iterators(&self) -> usize {
self.iter.num_active_iterators()
}
}

View File

@@ -0,0 +1,29 @@
use std::collections::BTreeMap;
pub struct Watermark {
readers: BTreeMap<u64, usize>,
}
impl Watermark {
pub fn new() -> Self {
Self {
readers: BTreeMap::new(),
}
}
pub fn add_reader(&mut self, ts: u64) {
*self.readers.entry(ts).or_default() += 1;
}
pub fn remove_reader(&mut self, ts: u64) {
let cnt = self.readers.get_mut(&ts).unwrap();
*cnt -= 1;
if *cnt == 0 {
self.readers.remove(&ts);
}
}
pub fn watermark(&self) -> Option<u64> {
self.readers.first_key_value().map(|(ts, _)| *ts)
}
}

View File

@@ -10,3 +10,5 @@ mod week2_day1;
mod week2_day2;
mod week2_day3;
mod week2_day4;
// mod week2_day5;
// mod week2_day6;

View File

@@ -0,0 +1 @@
../../../mini-lsm/src/tests/week2_day5.rs

View File

@@ -0,0 +1 @@
../../../mini-lsm/src/tests/week2_day6.rs

View File

@@ -162,6 +162,10 @@ impl MiniLsm {
}))
}
pub fn new_txn(&self) -> Result<()> {
self.inner.new_txn()
}
pub fn write_batch<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<()> {
self.inner.write_batch(batch)
}
@@ -300,6 +304,11 @@ impl LsmStorageInner {
unimplemented!()
}
pub fn new_txn(&self) -> Result<()> {
// no-op
Ok(())
}
/// Create an iterator over a range of keys.
pub fn scan(
&self,

View File

@@ -256,6 +256,10 @@ impl MiniLsm {
self.inner.sync()
}
pub fn new_txn(&self) -> Result<()> {
self.inner.new_txn()
}
pub fn scan(
&self,
lower: Bound<&[u8]>,
@@ -668,6 +672,11 @@ impl LsmStorageInner {
Ok(())
}
pub fn new_txn(&self) -> Result<()> {
// no-op
Ok(())
}
/// Create an iterator over a range of keys.
pub fn scan(
&self,

View File

@@ -184,25 +184,7 @@ pub fn compaction_bench(storage: Arc<MiniLsm>) {
max_key = max_key.max(i);
}
}
let mut expected_key_value_pairs = Vec::new();
for i in 0..(max_key + 40000) {
let key = gen_key(i);
let value = storage.get(key.as_bytes()).unwrap();
if let Some(val) = key_map.get(&i) {
let expected_value = gen_value(*val);
assert_eq!(value, Some(Bytes::from(expected_value.clone())));
expected_key_value_pairs.push((Bytes::from(key), Bytes::from(expected_value)));
} else {
assert!(value.is_none());
}
}
check_lsm_iter_result_by_key(
&mut storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
expected_key_value_pairs,
);
std::thread::sleep(Duration::from_secs(1)); // wait until all memtables flush
while {
let snapshot = storage.inner.state.read();
!snapshot.imm_memtables.is_empty()
@@ -222,6 +204,24 @@ pub fn compaction_bench(storage: Arc<MiniLsm>) {
println!("waiting for compaction to converge");
}
let mut expected_key_value_pairs = Vec::new();
for i in 0..(max_key + 40000) {
let key = gen_key(i);
let value = storage.get(key.as_bytes()).unwrap();
if let Some(val) = key_map.get(&i) {
let expected_value = gen_value(*val);
assert_eq!(value, Some(Bytes::from(expected_value.clone())));
expected_key_value_pairs.push((Bytes::from(key), Bytes::from(expected_value)));
} else {
assert!(value.is_none());
}
}
check_lsm_iter_result_by_key(
&mut storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
expected_key_value_pairs,
);
storage.dump_structure();
println!("This test case does not guarantee your compaction algorithm produces a LSM state as expected. It only does minimal checks on the size of the levels. Please use the compaction simulator to check if the compaction is correctly going on.");
@@ -243,6 +243,11 @@ pub fn check_compaction_ratio(storage: Arc<MiniLsm>) {
};
level_size.push(size);
}
let num_iters = storage
.scan(Bound::Unbounded, Bound::Unbounded)
.unwrap()
.num_active_iterators();
let num_memtables = storage.inner.state.read().imm_memtables.len() + 1;
match compaction_options {
CompactionOptions::NoCompaction => unreachable!(),
CompactionOptions::Simple(SimpleLeveledCompactionOptions {
@@ -268,6 +273,10 @@ pub fn check_compaction_ratio(storage: Arc<MiniLsm>) {
size_ratio_percent
);
}
assert!(
num_iters <= l0_sst_num + num_memtables + max_levels,
"did you use concat iterators?"
);
}
CompactionOptions::Leveled(LeveledCompactionOptions {
level_size_multiplier,
@@ -291,6 +300,10 @@ pub fn check_compaction_ratio(storage: Arc<MiniLsm>) {
level_size_multiplier
);
}
assert!(
num_iters <= l0_sst_num + num_memtables + max_levels,
"did you use concat iterators?"
);
}
CompactionOptions::Tiered(TieredCompactionOptions {
num_tiers,
@@ -329,6 +342,10 @@ pub fn check_compaction_ratio(storage: Arc<MiniLsm>) {
}
sum_size += this_size;
}
assert!(
num_iters <= num_memtables + num_tiers,
"did you use concat iterators?"
);
}
}
}

View File

@@ -1,3 +1,5 @@
use std::sync::Arc;
use tempfile::tempdir;
use crate::{
@@ -51,8 +53,9 @@ fn test_task1_memtable_overwrite() {
#[test]
fn test_task2_storage_integration() {
let dir = tempdir().unwrap();
let storage =
LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap();
let storage = Arc::new(
LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap(),
);
assert_eq!(&storage.get(b"0").unwrap(), &None);
storage.put(b"1", b"233").unwrap();
storage.put(b"2", b"2333").unwrap();
@@ -99,7 +102,7 @@ fn test_task3_freeze_on_capacity() {
let mut options = LsmStorageOptions::default_for_week1_test();
options.target_sst_size = 1024;
options.num_memtable_limit = 1000;
let storage = LsmStorageInner::open(dir.path(), options).unwrap();
let storage = Arc::new(LsmStorageInner::open(dir.path(), options).unwrap());
for _ in 0..1000 {
storage.put(b"1", b"2333").unwrap();
}
@@ -117,8 +120,9 @@ fn test_task3_freeze_on_capacity() {
#[test]
fn test_task4_storage_integration() {
let dir = tempdir().unwrap();
let storage =
LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap();
let storage = Arc::new(
LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap(),
);
assert_eq!(&storage.get(b"0").unwrap(), &None);
storage.put(b"1", b"233").unwrap();
storage.put(b"2", b"2333").unwrap();

View File

@@ -1,4 +1,4 @@
use std::ops::Bound;
use std::{ops::Bound, sync::Arc};
use bytes::Bytes;
use tempfile::tempdir;
@@ -262,8 +262,9 @@ fn test_task3_fused_iterator() {
#[test]
fn test_task4_integration() {
let dir = tempdir().unwrap();
let storage =
LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap();
let storage = Arc::new(
LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap(),
);
storage.put(b"1", b"233").unwrap();
storage.put(b"2", b"2333").unwrap();
storage.put(b"3", b"23333").unwrap();

View File

@@ -1,4 +1,5 @@
use std::ops::Bound;
use std::sync::Arc;
use self::harness::{check_iter_result_by_key, MockIterator};
use self::harness::{check_lsm_iter_result_by_key, generate_sst};
@@ -130,7 +131,8 @@ fn test_task1_merge_5() {
#[test]
fn test_task2_storage_scan() {
let dir = tempdir().unwrap();
let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap();
let storage =
Arc::new(LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap());
storage.put(b"1", b"233").unwrap();
storage.put(b"2", b"2333").unwrap();
storage.put(b"00", b"2333").unwrap();
@@ -190,7 +192,8 @@ fn test_task2_storage_scan() {
#[test]
fn test_task3_storage_get() {
let dir = tempdir().unwrap();
let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap();
let storage =
Arc::new(LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap());
storage.put(b"1", b"233").unwrap();
storage.put(b"2", b"2333").unwrap();
storage.put(b"00", b"2333").unwrap();

View File

@@ -1,4 +1,4 @@
use std::{ops::Bound, time::Duration};
use std::{ops::Bound, sync::Arc, time::Duration};
use bytes::Bytes;
use tempfile::tempdir;
@@ -14,7 +14,8 @@ use crate::{
#[test]
fn test_task1_storage_scan() {
let dir = tempdir().unwrap();
let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap();
let storage =
Arc::new(LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap());
storage.put(b"0", b"2333333").unwrap();
storage.put(b"00", b"2333333").unwrap();
storage.put(b"4", b"23").unwrap();
@@ -67,7 +68,8 @@ fn test_task1_storage_scan() {
#[test]
fn test_task1_storage_get() {
let dir = tempdir().unwrap();
let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap();
let storage =
Arc::new(LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap());
storage.put(b"0", b"2333333").unwrap();
storage.put(b"00", b"2333333").unwrap();
storage.put(b"4", b"23").unwrap();
@@ -137,7 +139,8 @@ fn test_task2_auto_flush() {
#[test]
fn test_task3_sst_filter() {
let dir = tempdir().unwrap();
let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap();
let storage =
Arc::new(LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap());
for i in 1..=10000 {
if i % 1000 == 0 {

View File

@@ -39,7 +39,10 @@ fn construct_merge_iterator_over_storage(
fn test_task1_full_compaction() {
// We do not use LSM iterator in this test because it's implemented as part of task 3
let dir = tempdir().unwrap();
let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap();
let storage =
Arc::new(LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap());
#[allow(clippy::let_unit_value)]
let _txn = storage.new_txn().unwrap();
storage.put(b"0", b"v1").unwrap();
sync(&storage);
storage.put(b"0", b"v2").unwrap();
@@ -211,7 +214,8 @@ fn test_task2_concat_iterator() {
#[test]
fn test_task3_integration() {
let dir = tempdir().unwrap();
let storage = LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap();
let storage =
Arc::new(LsmStorageInner::open(&dir, LsmStorageOptions::default_for_week1_test()).unwrap());
storage.put(b"0", b"2333333").unwrap();
storage.put(b"00", b"2333333").unwrap();
storage.put(b"4", b"23").unwrap();