From 78ec7c937593f9b1a442bd7f762bb879dfbb0c64 Mon Sep 17 00:00:00 2001 From: Alex Chi Date: Fri, 26 Jan 2024 18:14:34 +0800 Subject: [PATCH] finish skeleton for serializability check Signed-off-by: Alex Chi --- README.md | 2 +- .../src/iterators/two_merge_iterator.rs | 14 +- mini-lsm-mvcc/src/lsm_storage.rs | 26 ++- mini-lsm-mvcc/src/mvcc.rs | 94 +++----- mini-lsm-mvcc/src/mvcc/txn.rs | 221 ++++++++++++++++++ mini-lsm-starter/src/bin/mini-lsm-cli.rs | 3 + mini-lsm-starter/src/lsm_storage.rs | 4 + mini-lsm/src/lsm_storage.rs | 4 + 8 files changed, 284 insertions(+), 84 deletions(-) create mode 100644 mini-lsm-mvcc/src/mvcc/txn.rs diff --git a/README.md b/README.md index 13a8f27..bcaff76 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ We are working on chapter 3 and more test cases for all existing contents. | 3.2 | Snapshot Read - Blocks, Memtables, and SSTs | ✅ | 🚧 | | | 3.3 | Snapshot Read - Engine Read Path | ✅ | 🚧 | | | 3.4 | Watermark and Garbage Collection | ✅ | 🚧 | | -| 3.5 | Transactions and Optimistic Concurrency Control | 🚧 | | | +| 3.5 | Transactions and Optimistic Concurrency Control | ✅ | | | | 3.6 | Serializable Snapshot Isolation | 🚧 | | | | 3.7 | Compaction Filter | 🚧 | | | diff --git a/mini-lsm-mvcc/src/iterators/two_merge_iterator.rs b/mini-lsm-mvcc/src/iterators/two_merge_iterator.rs index 781055a..8488cd2 100644 --- a/mini-lsm-mvcc/src/iterators/two_merge_iterator.rs +++ b/mini-lsm-mvcc/src/iterators/two_merge_iterator.rs @@ -1,7 +1,5 @@ use anyhow::Result; -use crate::key::KeySlice; - use super::StorageIterator; /// Merges two iterators of different types into one. If the two iterators have the same key, only @@ -13,8 +11,8 @@ pub struct TwoMergeIterator { } impl< - A: 'static + for<'a> StorageIterator = KeySlice<'a>>, - B: 'static + for<'a> StorageIterator = KeySlice<'a>>, + A: 'static + StorageIterator, + B: 'static + for<'a> StorageIterator = A::KeyType<'a>>, > TwoMergeIterator { fn choose_a(a: &A, b: &B) -> bool { @@ -47,13 +45,13 @@ impl< } impl< - A: 'static + for<'a> StorageIterator = KeySlice<'a>>, - B: 'static + for<'a> StorageIterator = KeySlice<'a>>, + A: 'static + StorageIterator, + B: 'static + for<'a> StorageIterator = A::KeyType<'a>>, > StorageIterator for TwoMergeIterator { - type KeyType<'a> = KeySlice<'a>; + type KeyType<'a> = A::KeyType<'a>; - fn key(&self) -> KeySlice { + fn key(&self) -> A::KeyType<'_> { if self.choose_a { self.a.key() } else { diff --git a/mini-lsm-mvcc/src/lsm_storage.rs b/mini-lsm-mvcc/src/lsm_storage.rs index 8bcc411..58a114d 100644 --- a/mini-lsm-mvcc/src/lsm_storage.rs +++ b/mini-lsm-mvcc/src/lsm_storage.rs @@ -22,7 +22,8 @@ use crate::key::{self, KeySlice}; use crate::lsm_iterator::{FusedIterator, LsmIterator}; use crate::manifest::{Manifest, ManifestRecord}; use crate::mem_table::{map_bound, map_key_bound_plus_ts, MemTable}; -use crate::mvcc::{LsmMvccInner, Transaction, TxnIterator}; +use crate::mvcc::txn::{Transaction, TxnIterator}; +use crate::mvcc::LsmMvccInner; use crate::table::{FileObject, SsTable, SsTableBuilder, SsTableIterator}; pub type BlockCache = moka::sync::Cache<(usize, usize), Arc>; @@ -79,6 +80,7 @@ pub struct LsmStorageOptions { pub num_memtable_limit: usize, pub compaction_options: CompactionOptions, pub enable_wal: bool, + pub serializable: bool, } impl LsmStorageOptions { @@ -89,6 +91,7 @@ impl LsmStorageOptions { compaction_options: CompactionOptions::NoCompaction, enable_wal: false, num_memtable_limit: 50, + serializable: false, } } @@ -99,6 +102,7 @@ impl LsmStorageOptions { compaction_options: CompactionOptions::NoCompaction, enable_wal: false, num_memtable_limit: 2, + serializable: false, } } @@ -109,6 +113,7 @@ impl LsmStorageOptions { compaction_options, enable_wal: false, num_memtable_limit: 2, + serializable: false, } } } @@ -246,7 +251,7 @@ impl MiniLsm { self.inner.get(key) } - pub fn write_batch>(&self, batch: &[WriteBatchRecord]) -> Result<()> { + pub fn write_batch>(&self, batch: &[WriteBatchRecord]) -> Result { self.inner.write_batch(batch) } @@ -428,12 +433,11 @@ impl LsmStorageInner { } pub fn new_txn(self: &Arc) -> Result> { - Ok(self.mvcc().new_txn(self.clone())) + Ok(self.mvcc().new_txn(self.clone(), self.options.serializable)) } - /// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter. pub fn get(self: &Arc, key: &[u8]) -> Result> { - let txn = self.mvcc().new_txn(self.clone()); + let txn = self.mvcc().new_txn(self.clone(), self.options.serializable); txn.get(key) } @@ -516,7 +520,7 @@ impl LsmStorageInner { Ok(None) } - pub fn write_batch>(&self, batch: &[WriteBatchRecord]) -> Result<()> { + pub fn write_batch>(&self, batch: &[WriteBatchRecord]) -> Result { let _lck = self.mvcc().write_lock.lock(); let ts = self.mvcc().latest_commit_ts() + 1; for record in batch { @@ -548,17 +552,19 @@ impl LsmStorageInner { } } self.mvcc().update_commit_ts(ts); - Ok(()) + Ok(ts) } /// Put a key-value pair into the storage by writing into the current memtable. pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { - self.write_batch(&[WriteBatchRecord::Put(key, value)]) + self.write_batch(&[WriteBatchRecord::Put(key, value)])?; + Ok(()) } /// Remove a key from the storage by writing an empty value. pub fn delete(&self, key: &[u8]) -> Result<()> { - self.write_batch(&[WriteBatchRecord::Del(key)]) + self.write_batch(&[WriteBatchRecord::Del(key)])?; + Ok(()) } fn try_freeze(&self, estimated_size: usize) -> Result<()> { @@ -697,7 +703,7 @@ impl LsmStorageInner { lower: Bound<&[u8]>, upper: Bound<&[u8]>, ) -> Result { - let txn = self.mvcc().new_txn(self.clone()); + let txn = self.mvcc().new_txn(self.clone(), self.options.serializable); txn.scan(lower, upper) } diff --git a/mini-lsm-mvcc/src/mvcc.rs b/mini-lsm-mvcc/src/mvcc.rs index 526c5ff..868be0e 100644 --- a/mini-lsm-mvcc/src/mvcc.rs +++ b/mini-lsm-mvcc/src/mvcc.rs @@ -1,22 +1,28 @@ +pub mod txn; mod watermark; -use std::{ops::Bound, sync::Arc}; - -use anyhow::Result; -use bytes::Bytes; -use parking_lot::Mutex; - -use crate::{ - iterators::StorageIterator, - lsm_iterator::{FusedIterator, LsmIterator}, - lsm_storage::LsmStorageInner, +use std::{ + collections::{BTreeMap, HashSet}, + sync::{atomic::AtomicBool, Arc}, }; -use self::watermark::Watermark; +use crossbeam_skiplist::SkipMap; +use parking_lot::Mutex; + +use crate::lsm_storage::LsmStorageInner; + +use self::{txn::Transaction, watermark::Watermark}; + +pub(crate) struct CommittedTxnData { + pub(crate) key_hashes: Vec, + pub(crate) read_ts: u64, + pub(crate) commit_ts: u64, +} pub(crate) struct LsmMvccInner { pub(crate) write_lock: Mutex<()>, pub(crate) ts: Arc>, + pub(crate) committed_txns: Arc>>, } impl LsmMvccInner { @@ -24,6 +30,7 @@ impl LsmMvccInner { Self { write_lock: Mutex::new(()), ts: Arc::new(Mutex::new((initial_ts, Watermark::new()))), + committed_txns: Arc::new(Mutex::new(BTreeMap::new())), } } @@ -41,63 +48,20 @@ impl LsmMvccInner { ts.1.watermark().unwrap_or(ts.0) } - pub fn new_txn(&self, inner: Arc) -> Arc { + pub fn new_txn(&self, inner: Arc, serializable: bool) -> Arc { let mut ts = self.ts.lock(); let read_ts = ts.0; ts.1.add_reader(read_ts); - Arc::new(Transaction { inner, read_ts }) - } -} - -pub struct Transaction { - read_ts: u64, - inner: Arc, -} - -impl Transaction { - pub fn get(&self, key: &[u8]) -> Result> { - self.inner.get_with_ts(key, self.read_ts) - } - - pub fn scan(self: &Arc, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> Result { - Ok(TxnIterator { - _txn: self.clone(), - iter: self.inner.scan_with_ts(lower, upper, self.read_ts)?, + Arc::new(Transaction { + inner, + read_ts, + local_storage: Arc::new(SkipMap::new()), + committed: Arc::new(AtomicBool::new(false)), + key_hashes: if serializable { + Some(Mutex::new(HashSet::new())) + } else { + None + }, }) } } - -impl Drop for Transaction { - fn drop(&mut self) { - self.inner.mvcc().ts.lock().1.remove_reader(self.read_ts) - } -} - -pub struct TxnIterator { - _txn: Arc, - iter: FusedIterator, -} - -impl StorageIterator for TxnIterator { - type KeyType<'a> = &'a [u8] where Self: 'a; - - fn value(&self) -> &[u8] { - self.iter.value() - } - - fn key(&self) -> Self::KeyType<'_> { - self.iter.key() - } - - fn is_valid(&self) -> bool { - self.iter.is_valid() - } - - fn next(&mut self) -> Result<()> { - self.iter.next() - } - - fn num_active_iterators(&self) -> usize { - self.iter.num_active_iterators() - } -} diff --git a/mini-lsm-mvcc/src/mvcc/txn.rs b/mini-lsm-mvcc/src/mvcc/txn.rs new file mode 100644 index 0000000..52d95ba --- /dev/null +++ b/mini-lsm-mvcc/src/mvcc/txn.rs @@ -0,0 +1,221 @@ +use std::{ + collections::HashSet, + ops::Bound, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; + +use anyhow::Result; +use bytes::Bytes; +use crossbeam_skiplist::{map::Entry, SkipMap}; +use ouroboros::self_referencing; +use parking_lot::Mutex; + +use crate::{ + iterators::{two_merge_iterator::TwoMergeIterator, StorageIterator}, + lsm_iterator::{FusedIterator, LsmIterator}, + lsm_storage::{LsmStorageInner, WriteBatchRecord}, + mem_table::map_bound, +}; + +use super::CommittedTxnData; + +pub struct Transaction { + pub(crate) read_ts: u64, + pub(crate) inner: Arc, + pub(crate) local_storage: Arc>, + pub(crate) committed: Arc, + pub(crate) key_hashes: Option>>, +} + +impl Transaction { + pub fn get(&self, key: &[u8]) -> Result> { + if self.committed.load(Ordering::SeqCst) { + panic!("cannot operate on committed txn!"); + } + if let Some(entry) = self.local_storage.get(key) { + return Ok(Some(entry.value().clone())); + } + self.inner.get_with_ts(key, self.read_ts) + } + + pub fn scan(self: &Arc, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> Result { + if self.committed.load(Ordering::SeqCst) { + panic!("cannot operate on committed txn!"); + } + let mut local_iter = TxnLocalIteratorBuilder { + map: self.local_storage.clone(), + iter_builder: |map| map.range((map_bound(lower), map_bound(upper))), + item: (Bytes::new(), Bytes::new()), + } + .build(); + let entry = local_iter.with_iter_mut(|iter| TxnLocalIterator::entry_to_item(iter.next())); + local_iter.with_mut(|x| *x.item = entry); + + TxnIterator::create( + self.clone(), + TwoMergeIterator::create( + local_iter, + self.inner.scan_with_ts(lower, upper, self.read_ts)?, + )?, + ) + } + + pub fn put(&self, key: &[u8], value: &[u8]) { + if self.committed.load(Ordering::SeqCst) { + panic!("cannot operate on committed txn!"); + } + self.local_storage + .insert(Bytes::copy_from_slice(key), Bytes::copy_from_slice(value)); + if let Some(key_hashes) = &self.key_hashes { + let mut key_hashes = key_hashes.lock(); + key_hashes.insert(crc32fast::hash(key)); + } + } + + pub fn delete(&self, key: &[u8]) { + if self.committed.load(Ordering::SeqCst) { + panic!("cannot operate on committed txn!"); + } + self.local_storage + .insert(Bytes::copy_from_slice(key), Bytes::new()); + if let Some(key_hashes) = &self.key_hashes { + let mut key_hashes = key_hashes.lock(); + key_hashes.insert(crc32fast::hash(key)); + } + } + + pub fn commit(&self) -> Result<()> { + self.committed + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .expect("cannot operate on committed txn!"); + let batch = self + .local_storage + .iter() + .map(|entry| { + if entry.value().is_empty() { + WriteBatchRecord::Del(entry.key().clone()) + } else { + WriteBatchRecord::Put(entry.key().clone(), entry.value().clone()) + } + }) + .collect::>(); + let ts = self.inner.write_batch(&batch)?; + { + let mut committed_txns = self.inner.mvcc().committed_txns.lock(); + let key_hashes = self.key_hashes.as_ref().unwrap().lock(); + + committed_txns.insert( + ts, + CommittedTxnData { + key_hashes: key_hashes.iter().copied().collect::>(), + read_ts: self.read_ts, + commit_ts: ts, + }, + ); + } + Ok(()) + } +} + +impl Drop for Transaction { + fn drop(&mut self) { + self.inner.mvcc().ts.lock().1.remove_reader(self.read_ts) + } +} + +type SkipMapRangeIter<'a> = + crossbeam_skiplist::map::Range<'a, Bytes, (Bound, Bound), Bytes, Bytes>; + +#[self_referencing] +pub struct TxnLocalIterator { + /// Stores a reference to the skipmap. + map: Arc>, + /// Stores a skipmap iterator that refers to the lifetime of `MemTableIterator` itself. + #[borrows(map)] + #[not_covariant] + iter: SkipMapRangeIter<'this>, + /// Stores the current key-value pair. + item: (Bytes, Bytes), +} + +impl TxnLocalIterator { + fn entry_to_item(entry: Option>) -> (Bytes, Bytes) { + entry + .map(|x| (x.key().clone(), x.value().clone())) + .unwrap_or_else(|| (Bytes::new(), Bytes::new())) + } +} + +impl StorageIterator for TxnLocalIterator { + type KeyType<'a> = &'a [u8]; + + fn value(&self) -> &[u8] { + &self.borrow_item().1[..] + } + + fn key(&self) -> &[u8] { + &self.borrow_item().0[..] + } + + fn is_valid(&self) -> bool { + !self.borrow_item().0.is_empty() + } + + fn next(&mut self) -> Result<()> { + let entry = self.with_iter_mut(|iter| TxnLocalIterator::entry_to_item(iter.next())); + self.with_mut(|x| *x.item = entry); + Ok(()) + } +} + +pub struct TxnIterator { + _txn: Arc, + iter: TwoMergeIterator>, +} + +impl TxnIterator { + pub fn create( + txn: Arc, + iter: TwoMergeIterator>, + ) -> Result { + let mut iter = Self { _txn: txn, iter }; + iter.skip_deletes()?; + Ok(iter) + } + + fn skip_deletes(&mut self) -> Result<()> { + while self.iter.is_valid() && self.iter.value().is_empty() { + self.iter.next()?; + } + Ok(()) + } +} + +impl StorageIterator for TxnIterator { + type KeyType<'a> = &'a [u8] where Self: 'a; + + fn value(&self) -> &[u8] { + self.iter.value() + } + + fn key(&self) -> Self::KeyType<'_> { + self.iter.key() + } + + fn is_valid(&self) -> bool { + self.iter.is_valid() + } + + fn next(&mut self) -> Result<()> { + self.iter.next()?; + self.skip_deletes()?; + Ok(()) + } + + fn num_active_iterators(&self) -> usize { + self.iter.num_active_iterators() + } +} diff --git a/mini-lsm-starter/src/bin/mini-lsm-cli.rs b/mini-lsm-starter/src/bin/mini-lsm-cli.rs index d210046..f44ef91 100644 --- a/mini-lsm-starter/src/bin/mini-lsm-cli.rs +++ b/mini-lsm-starter/src/bin/mini-lsm-cli.rs @@ -29,6 +29,8 @@ struct Args { compaction: CompactionStrategy, #[arg(long)] enable_wal: bool, + #[arg(long)] + serializable: bool, } fn main() -> Result<()> { @@ -64,6 +66,7 @@ fn main() -> Result<()> { } }, enable_wal: args.enable_wal, + serializable: args.serializable, }, )?; let mut epoch = 0; diff --git a/mini-lsm-starter/src/lsm_storage.rs b/mini-lsm-starter/src/lsm_storage.rs index e2d2702..338fde5 100644 --- a/mini-lsm-starter/src/lsm_storage.rs +++ b/mini-lsm-starter/src/lsm_storage.rs @@ -74,6 +74,7 @@ pub struct LsmStorageOptions { pub num_memtable_limit: usize, pub compaction_options: CompactionOptions, pub enable_wal: bool, + pub serializable: bool, } impl LsmStorageOptions { @@ -84,6 +85,7 @@ impl LsmStorageOptions { compaction_options: CompactionOptions::NoCompaction, enable_wal: false, num_memtable_limit: 50, + serializable: false, } } @@ -94,6 +96,7 @@ impl LsmStorageOptions { compaction_options: CompactionOptions::NoCompaction, enable_wal: false, num_memtable_limit: 2, + serializable: false, } } @@ -104,6 +107,7 @@ impl LsmStorageOptions { compaction_options, enable_wal: false, num_memtable_limit: 2, + serializable: false, } } } diff --git a/mini-lsm/src/lsm_storage.rs b/mini-lsm/src/lsm_storage.rs index 67aa386..a5a4c2a 100644 --- a/mini-lsm/src/lsm_storage.rs +++ b/mini-lsm/src/lsm_storage.rs @@ -78,6 +78,7 @@ pub struct LsmStorageOptions { pub num_memtable_limit: usize, pub compaction_options: CompactionOptions, pub enable_wal: bool, + pub serializable: bool, } impl LsmStorageOptions { @@ -88,6 +89,7 @@ impl LsmStorageOptions { compaction_options: CompactionOptions::NoCompaction, enable_wal: false, num_memtable_limit: 50, + serializable: false, } } @@ -98,6 +100,7 @@ impl LsmStorageOptions { compaction_options: CompactionOptions::NoCompaction, enable_wal: false, num_memtable_limit: 2, + serializable: false, } } @@ -108,6 +111,7 @@ impl LsmStorageOptions { compaction_options, enable_wal: false, num_memtable_limit: 2, + serializable: false, } } }