Signed-off-by: Alex Chi Z <iskyzh@gmail.com>
This commit is contained in:
Alex Chi Z
2024-01-30 14:48:03 +08:00
parent 9eab75ec1a
commit acc3c959aa
9 changed files with 345 additions and 24 deletions

View File

@@ -7,7 +7,7 @@ use std::{
},
};
use anyhow::Result;
use anyhow::{bail, Result};
use bytes::Bytes;
use crossbeam_skiplist::{map::Entry, SkipMap};
use ouroboros::self_referencing;
@@ -18,6 +18,7 @@ use crate::{
lsm_iterator::{FusedIterator, LsmIterator},
lsm_storage::{LsmStorageInner, WriteBatchRecord},
mem_table::map_bound,
mvcc::CommittedTxnData,
};
pub struct Transaction {
@@ -34,6 +35,11 @@ impl Transaction {
if self.committed.load(Ordering::SeqCst) {
panic!("cannot operate on committed txn!");
}
if let Some(guard) = &self.key_hashes {
let mut guard = guard.lock();
let (_, read_set) = &mut *guard;
read_set.insert(farmhash::hash32(key));
}
if let Some(entry) = self.local_storage.get(key) {
if entry.value().is_empty() {
return Ok(None);
@@ -75,7 +81,7 @@ impl Transaction {
if let Some(key_hashes) = &self.key_hashes {
let mut key_hashes = key_hashes.lock();
let (write_hashes, _) = &mut *key_hashes;
write_hashes.insert(crc32fast::hash(key));
write_hashes.insert(farmhash::hash32(key));
}
}
@@ -88,7 +94,7 @@ impl Transaction {
if let Some(key_hashes) = &self.key_hashes {
let mut key_hashes = key_hashes.lock();
let (write_hashes, _) = &mut *key_hashes;
write_hashes.insert(crc32fast::hash(key));
write_hashes.insert(farmhash::hash32(key));
}
}
@@ -96,6 +102,29 @@ impl Transaction {
self.committed
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.expect("cannot operate on committed txn!");
let _commit_lock = self.inner.mvcc().commit_lock.lock();
let serializability_check;
if let Some(guard) = &self.key_hashes {
let guard = guard.lock();
let (write_set, read_set) = &*guard;
println!(
"commit txn: write_set: {:?}, read_set: {:?}",
write_set, read_set
);
if !write_set.is_empty() {
let committed_txns = self.inner.mvcc().committed_txns.lock();
for (_, txn_data) in committed_txns.range((self.read_ts + 1)..) {
for key_hash in read_set {
if txn_data.key_hashes.contains(key_hash) {
bail!("serializable check failed");
}
}
}
}
serializability_check = true;
} else {
serializability_check = false;
}
let batch = self
.local_storage
.iter()
@@ -107,7 +136,32 @@ impl Transaction {
}
})
.collect::<Vec<_>>();
self.inner.write_batch(&batch)?;
let ts = self.inner.write_batch_inner(&batch)?;
if serializability_check {
let mut committed_txns = self.inner.mvcc().committed_txns.lock();
let mut key_hashes = self.key_hashes.as_ref().unwrap().lock();
let (write_set, _) = &mut *key_hashes;
let old_data = committed_txns.insert(
ts,
CommittedTxnData {
key_hashes: std::mem::take(write_set),
read_ts: self.read_ts,
commit_ts: ts,
},
);
assert!(old_data.is_none());
// remove unneeded txn data
let watermark = self.inner.mvcc().watermark();
while let Some(entry) = committed_txns.first_entry() {
if *entry.key() < watermark {
entry.remove();
} else {
break;
}
}
}
Ok(())
}
}
@@ -164,7 +218,7 @@ impl StorageIterator for TxnLocalIterator {
}
pub struct TxnIterator {
_txn: Arc<Transaction>,
txn: Arc<Transaction>,
iter: TwoMergeIterator<TxnLocalIterator, FusedIterator<LsmIterator>>,
}
@@ -173,8 +227,11 @@ impl TxnIterator {
txn: Arc<Transaction>,
iter: TwoMergeIterator<TxnLocalIterator, FusedIterator<LsmIterator>>,
) -> Result<Self> {
let mut iter = Self { _txn: txn, iter };
let mut iter = Self { txn, iter };
iter.skip_deletes()?;
if iter.is_valid() {
iter.add_to_read_set(iter.key());
}
Ok(iter)
}
@@ -184,6 +241,14 @@ impl TxnIterator {
}
Ok(())
}
fn add_to_read_set(&self, key: &[u8]) {
if let Some(guard) = &self.txn.key_hashes {
let mut guard = guard.lock();
let (_, read_set) = &mut *guard;
read_set.insert(farmhash::hash32(key));
}
}
}
impl StorageIterator for TxnIterator {
@@ -204,6 +269,9 @@ impl StorageIterator for TxnIterator {
fn next(&mut self) -> Result<()> {
self.iter.next()?;
self.skip_deletes()?;
if self.is_valid() {
self.add_to_read_set(self.key());
}
Ok(())
}