finish serializable check

Signed-off-by: Alex Chi <iskyzh@gmail.com>
This commit is contained in:
Alex Chi
2024-01-26 22:20:06 +08:00
parent 78ec7c9375
commit 0d64ac090e
5 changed files with 60 additions and 16 deletions

View File

@@ -7,7 +7,7 @@ use std::{
},
};
use anyhow::Result;
use anyhow::{bail, Result};
use bytes::Bytes;
use crossbeam_skiplist::{map::Entry, SkipMap};
use ouroboros::self_referencing;
@@ -27,7 +27,8 @@ pub struct Transaction {
pub(crate) inner: Arc<LsmStorageInner>,
pub(crate) local_storage: Arc<SkipMap<Bytes, Bytes>>,
pub(crate) committed: Arc<AtomicBool>,
pub(crate) key_hashes: Option<Mutex<HashSet<u32>>>,
/// Write set and read set
pub(crate) key_hashes: Option<Mutex<(HashSet<u32>, HashSet<u32>)>>,
}
impl Transaction {
@@ -71,7 +72,8 @@ impl Transaction {
.insert(Bytes::copy_from_slice(key), Bytes::copy_from_slice(value));
if let Some(key_hashes) = &self.key_hashes {
let mut key_hashes = key_hashes.lock();
key_hashes.insert(crc32fast::hash(key));
let (write_hashes, _) = &mut *key_hashes;
write_hashes.insert(crc32fast::hash(key));
}
}
@@ -83,7 +85,8 @@ impl Transaction {
.insert(Bytes::copy_from_slice(key), Bytes::new());
if let Some(key_hashes) = &self.key_hashes {
let mut key_hashes = key_hashes.lock();
key_hashes.insert(crc32fast::hash(key));
let (write_hashes, _) = &mut *key_hashes;
write_hashes.insert(crc32fast::hash(key));
}
}
@@ -91,6 +94,20 @@ impl Transaction {
self.committed
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.expect("cannot operate on committed txn!");
if let Some(guard) = &self.key_hashes {
let guard = guard.lock();
let (write_set, read_set) = &*guard;
if !write_set.is_empty() {
let committed_txns = self.inner.mvcc().committed_txns.lock();
for (_, txn_data) in committed_txns.range(self.read_ts..) {
for key_hash in read_set {
if txn_data.key_hashes.contains(&key_hash) {
bail!("serializable check failed");
}
}
}
}
}
let batch = self
.local_storage
.iter()
@@ -105,16 +122,28 @@ impl Transaction {
let ts = self.inner.write_batch(&batch)?;
{
let mut committed_txns = self.inner.mvcc().committed_txns.lock();
let key_hashes = self.key_hashes.as_ref().unwrap().lock();
let mut key_hashes = self.key_hashes.as_ref().unwrap().lock();
let (write_set, _) = &mut *key_hashes;
committed_txns.insert(
let old_data = committed_txns.insert(
ts,
CommittedTxnData {
key_hashes: key_hashes.iter().copied().collect::<Vec<_>>(),
key_hashes: std::mem::take(write_set),
read_ts: self.read_ts,
commit_ts: ts,
},
);
assert!(old_data.is_none());
// remove unneeded txn data
let watermark = self.inner.mvcc().watermark();
while let Some(entry) = committed_txns.first_entry() {
if *entry.key() < watermark {
entry.remove();
} else {
break;
}
}
}
Ok(())
}