finish skeleton for serializability check

Signed-off-by: Alex Chi <iskyzh@gmail.com>
This commit is contained in:
Alex Chi
2024-01-26 18:14:34 +08:00
parent 7824e103df
commit 78ec7c9375
8 changed files with 284 additions and 84 deletions

View File

@@ -1,7 +1,5 @@
use anyhow::Result;
use crate::key::KeySlice;
use super::StorageIterator;
/// Merges two iterators of different types into one. If the two iterators have the same key, only
@@ -13,8 +11,8 @@ pub struct TwoMergeIterator<A: StorageIterator, B: StorageIterator> {
}
impl<
A: 'static + for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
B: 'static + for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
A: 'static + StorageIterator,
B: 'static + for<'a> StorageIterator<KeyType<'a> = A::KeyType<'a>>,
> TwoMergeIterator<A, B>
{
fn choose_a(a: &A, b: &B) -> bool {
@@ -47,13 +45,13 @@ impl<
}
impl<
A: 'static + for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
B: 'static + for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
A: 'static + StorageIterator,
B: 'static + for<'a> StorageIterator<KeyType<'a> = A::KeyType<'a>>,
> StorageIterator for TwoMergeIterator<A, B>
{
type KeyType<'a> = KeySlice<'a>;
type KeyType<'a> = A::KeyType<'a>;
fn key(&self) -> KeySlice {
fn key(&self) -> A::KeyType<'_> {
if self.choose_a {
self.a.key()
} else {

View File

@@ -22,7 +22,8 @@ use crate::key::{self, KeySlice};
use crate::lsm_iterator::{FusedIterator, LsmIterator};
use crate::manifest::{Manifest, ManifestRecord};
use crate::mem_table::{map_bound, map_key_bound_plus_ts, MemTable};
use crate::mvcc::{LsmMvccInner, Transaction, TxnIterator};
use crate::mvcc::txn::{Transaction, TxnIterator};
use crate::mvcc::LsmMvccInner;
use crate::table::{FileObject, SsTable, SsTableBuilder, SsTableIterator};
pub type BlockCache = moka::sync::Cache<(usize, usize), Arc<Block>>;
@@ -79,6 +80,7 @@ pub struct LsmStorageOptions {
pub num_memtable_limit: usize,
pub compaction_options: CompactionOptions,
pub enable_wal: bool,
pub serializable: bool,
}
impl LsmStorageOptions {
@@ -89,6 +91,7 @@ impl LsmStorageOptions {
compaction_options: CompactionOptions::NoCompaction,
enable_wal: false,
num_memtable_limit: 50,
serializable: false,
}
}
@@ -99,6 +102,7 @@ impl LsmStorageOptions {
compaction_options: CompactionOptions::NoCompaction,
enable_wal: false,
num_memtable_limit: 2,
serializable: false,
}
}
@@ -109,6 +113,7 @@ impl LsmStorageOptions {
compaction_options,
enable_wal: false,
num_memtable_limit: 2,
serializable: false,
}
}
}
@@ -246,7 +251,7 @@ impl MiniLsm {
self.inner.get(key)
}
pub fn write_batch<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<()> {
pub fn write_batch<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<u64> {
self.inner.write_batch(batch)
}
@@ -428,12 +433,11 @@ impl LsmStorageInner {
}
pub fn new_txn(self: &Arc<Self>) -> Result<Arc<Transaction>> {
Ok(self.mvcc().new_txn(self.clone()))
Ok(self.mvcc().new_txn(self.clone(), self.options.serializable))
}
/// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter.
pub fn get(self: &Arc<Self>, key: &[u8]) -> Result<Option<Bytes>> {
let txn = self.mvcc().new_txn(self.clone());
let txn = self.mvcc().new_txn(self.clone(), self.options.serializable);
txn.get(key)
}
@@ -516,7 +520,7 @@ impl LsmStorageInner {
Ok(None)
}
pub fn write_batch<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<()> {
pub fn write_batch<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<u64> {
let _lck = self.mvcc().write_lock.lock();
let ts = self.mvcc().latest_commit_ts() + 1;
for record in batch {
@@ -548,17 +552,19 @@ impl LsmStorageInner {
}
}
self.mvcc().update_commit_ts(ts);
Ok(())
Ok(ts)
}
/// Put a key-value pair into the storage by writing into the current memtable.
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
self.write_batch(&[WriteBatchRecord::Put(key, value)])
self.write_batch(&[WriteBatchRecord::Put(key, value)])?;
Ok(())
}
/// Remove a key from the storage by writing an empty value.
pub fn delete(&self, key: &[u8]) -> Result<()> {
self.write_batch(&[WriteBatchRecord::Del(key)])
self.write_batch(&[WriteBatchRecord::Del(key)])?;
Ok(())
}
fn try_freeze(&self, estimated_size: usize) -> Result<()> {
@@ -697,7 +703,7 @@ impl LsmStorageInner {
lower: Bound<&[u8]>,
upper: Bound<&[u8]>,
) -> Result<TxnIterator> {
let txn = self.mvcc().new_txn(self.clone());
let txn = self.mvcc().new_txn(self.clone(), self.options.serializable);
txn.scan(lower, upper)
}

View File

@@ -1,22 +1,28 @@
pub mod txn;
mod watermark;
use std::{ops::Bound, sync::Arc};
use anyhow::Result;
use bytes::Bytes;
use parking_lot::Mutex;
use crate::{
iterators::StorageIterator,
lsm_iterator::{FusedIterator, LsmIterator},
lsm_storage::LsmStorageInner,
use std::{
collections::{BTreeMap, HashSet},
sync::{atomic::AtomicBool, Arc},
};
use self::watermark::Watermark;
use crossbeam_skiplist::SkipMap;
use parking_lot::Mutex;
use crate::lsm_storage::LsmStorageInner;
use self::{txn::Transaction, watermark::Watermark};
pub(crate) struct CommittedTxnData {
pub(crate) key_hashes: Vec<u32>,
pub(crate) read_ts: u64,
pub(crate) commit_ts: u64,
}
pub(crate) struct LsmMvccInner {
pub(crate) write_lock: Mutex<()>,
pub(crate) ts: Arc<Mutex<(u64, Watermark)>>,
pub(crate) committed_txns: Arc<Mutex<BTreeMap<u64, CommittedTxnData>>>,
}
impl LsmMvccInner {
@@ -24,6 +30,7 @@ impl LsmMvccInner {
Self {
write_lock: Mutex::new(()),
ts: Arc::new(Mutex::new((initial_ts, Watermark::new()))),
committed_txns: Arc::new(Mutex::new(BTreeMap::new())),
}
}
@@ -41,63 +48,20 @@ impl LsmMvccInner {
ts.1.watermark().unwrap_or(ts.0)
}
pub fn new_txn(&self, inner: Arc<LsmStorageInner>) -> Arc<Transaction> {
pub fn new_txn(&self, inner: Arc<LsmStorageInner>, serializable: bool) -> Arc<Transaction> {
let mut ts = self.ts.lock();
let read_ts = ts.0;
ts.1.add_reader(read_ts);
Arc::new(Transaction { inner, read_ts })
}
}
pub struct Transaction {
read_ts: u64,
inner: Arc<LsmStorageInner>,
}
impl Transaction {
pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
self.inner.get_with_ts(key, self.read_ts)
}
pub fn scan(self: &Arc<Self>, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> Result<TxnIterator> {
Ok(TxnIterator {
_txn: self.clone(),
iter: self.inner.scan_with_ts(lower, upper, self.read_ts)?,
Arc::new(Transaction {
inner,
read_ts,
local_storage: Arc::new(SkipMap::new()),
committed: Arc::new(AtomicBool::new(false)),
key_hashes: if serializable {
Some(Mutex::new(HashSet::new()))
} else {
None
},
})
}
}
impl Drop for Transaction {
fn drop(&mut self) {
self.inner.mvcc().ts.lock().1.remove_reader(self.read_ts)
}
}
pub struct TxnIterator {
_txn: Arc<Transaction>,
iter: FusedIterator<LsmIterator>,
}
impl StorageIterator for TxnIterator {
type KeyType<'a> = &'a [u8] where Self: 'a;
fn value(&self) -> &[u8] {
self.iter.value()
}
fn key(&self) -> Self::KeyType<'_> {
self.iter.key()
}
fn is_valid(&self) -> bool {
self.iter.is_valid()
}
fn next(&mut self) -> Result<()> {
self.iter.next()
}
fn num_active_iterators(&self) -> usize {
self.iter.num_active_iterators()
}
}

View File

@@ -0,0 +1,221 @@
use std::{
collections::HashSet,
ops::Bound,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use anyhow::Result;
use bytes::Bytes;
use crossbeam_skiplist::{map::Entry, SkipMap};
use ouroboros::self_referencing;
use parking_lot::Mutex;
use crate::{
iterators::{two_merge_iterator::TwoMergeIterator, StorageIterator},
lsm_iterator::{FusedIterator, LsmIterator},
lsm_storage::{LsmStorageInner, WriteBatchRecord},
mem_table::map_bound,
};
use super::CommittedTxnData;
pub struct Transaction {
pub(crate) read_ts: u64,
pub(crate) inner: Arc<LsmStorageInner>,
pub(crate) local_storage: Arc<SkipMap<Bytes, Bytes>>,
pub(crate) committed: Arc<AtomicBool>,
pub(crate) key_hashes: Option<Mutex<HashSet<u32>>>,
}
impl Transaction {
pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
if self.committed.load(Ordering::SeqCst) {
panic!("cannot operate on committed txn!");
}
if let Some(entry) = self.local_storage.get(key) {
return Ok(Some(entry.value().clone()));
}
self.inner.get_with_ts(key, self.read_ts)
}
pub fn scan(self: &Arc<Self>, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> Result<TxnIterator> {
if self.committed.load(Ordering::SeqCst) {
panic!("cannot operate on committed txn!");
}
let mut local_iter = TxnLocalIteratorBuilder {
map: self.local_storage.clone(),
iter_builder: |map| map.range((map_bound(lower), map_bound(upper))),
item: (Bytes::new(), Bytes::new()),
}
.build();
let entry = local_iter.with_iter_mut(|iter| TxnLocalIterator::entry_to_item(iter.next()));
local_iter.with_mut(|x| *x.item = entry);
TxnIterator::create(
self.clone(),
TwoMergeIterator::create(
local_iter,
self.inner.scan_with_ts(lower, upper, self.read_ts)?,
)?,
)
}
pub fn put(&self, key: &[u8], value: &[u8]) {
if self.committed.load(Ordering::SeqCst) {
panic!("cannot operate on committed txn!");
}
self.local_storage
.insert(Bytes::copy_from_slice(key), Bytes::copy_from_slice(value));
if let Some(key_hashes) = &self.key_hashes {
let mut key_hashes = key_hashes.lock();
key_hashes.insert(crc32fast::hash(key));
}
}
pub fn delete(&self, key: &[u8]) {
if self.committed.load(Ordering::SeqCst) {
panic!("cannot operate on committed txn!");
}
self.local_storage
.insert(Bytes::copy_from_slice(key), Bytes::new());
if let Some(key_hashes) = &self.key_hashes {
let mut key_hashes = key_hashes.lock();
key_hashes.insert(crc32fast::hash(key));
}
}
pub fn commit(&self) -> Result<()> {
self.committed
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.expect("cannot operate on committed txn!");
let batch = self
.local_storage
.iter()
.map(|entry| {
if entry.value().is_empty() {
WriteBatchRecord::Del(entry.key().clone())
} else {
WriteBatchRecord::Put(entry.key().clone(), entry.value().clone())
}
})
.collect::<Vec<_>>();
let ts = self.inner.write_batch(&batch)?;
{
let mut committed_txns = self.inner.mvcc().committed_txns.lock();
let key_hashes = self.key_hashes.as_ref().unwrap().lock();
committed_txns.insert(
ts,
CommittedTxnData {
key_hashes: key_hashes.iter().copied().collect::<Vec<_>>(),
read_ts: self.read_ts,
commit_ts: ts,
},
);
}
Ok(())
}
}
impl Drop for Transaction {
fn drop(&mut self) {
self.inner.mvcc().ts.lock().1.remove_reader(self.read_ts)
}
}
type SkipMapRangeIter<'a> =
crossbeam_skiplist::map::Range<'a, Bytes, (Bound<Bytes>, Bound<Bytes>), Bytes, Bytes>;
#[self_referencing]
pub struct TxnLocalIterator {
/// Stores a reference to the skipmap.
map: Arc<SkipMap<Bytes, Bytes>>,
/// Stores a skipmap iterator that refers to the lifetime of `MemTableIterator` itself.
#[borrows(map)]
#[not_covariant]
iter: SkipMapRangeIter<'this>,
/// Stores the current key-value pair.
item: (Bytes, Bytes),
}
impl TxnLocalIterator {
fn entry_to_item(entry: Option<Entry<'_, Bytes, Bytes>>) -> (Bytes, Bytes) {
entry
.map(|x| (x.key().clone(), x.value().clone()))
.unwrap_or_else(|| (Bytes::new(), Bytes::new()))
}
}
impl StorageIterator for TxnLocalIterator {
type KeyType<'a> = &'a [u8];
fn value(&self) -> &[u8] {
&self.borrow_item().1[..]
}
fn key(&self) -> &[u8] {
&self.borrow_item().0[..]
}
fn is_valid(&self) -> bool {
!self.borrow_item().0.is_empty()
}
fn next(&mut self) -> Result<()> {
let entry = self.with_iter_mut(|iter| TxnLocalIterator::entry_to_item(iter.next()));
self.with_mut(|x| *x.item = entry);
Ok(())
}
}
pub struct TxnIterator {
_txn: Arc<Transaction>,
iter: TwoMergeIterator<TxnLocalIterator, FusedIterator<LsmIterator>>,
}
impl TxnIterator {
pub fn create(
txn: Arc<Transaction>,
iter: TwoMergeIterator<TxnLocalIterator, FusedIterator<LsmIterator>>,
) -> Result<Self> {
let mut iter = Self { _txn: txn, iter };
iter.skip_deletes()?;
Ok(iter)
}
fn skip_deletes(&mut self) -> Result<()> {
while self.iter.is_valid() && self.iter.value().is_empty() {
self.iter.next()?;
}
Ok(())
}
}
impl StorageIterator for TxnIterator {
type KeyType<'a> = &'a [u8] where Self: 'a;
fn value(&self) -> &[u8] {
self.iter.value()
}
fn key(&self) -> Self::KeyType<'_> {
self.iter.key()
}
fn is_valid(&self) -> bool {
self.iter.is_valid()
}
fn next(&mut self) -> Result<()> {
self.iter.next()?;
self.skip_deletes()?;
Ok(())
}
fn num_active_iterators(&self) -> usize {
self.iter.num_active_iterators()
}
}