@@ -7,7 +7,7 @@ use std::{
|
||||
},
|
||||
};
|
||||
|
||||
use anyhow::{bail, Result};
|
||||
use anyhow::Result;
|
||||
use bytes::Bytes;
|
||||
use crossbeam_skiplist::{map::Entry, SkipMap};
|
||||
use ouroboros::self_referencing;
|
||||
@@ -20,8 +20,6 @@ use crate::{
|
||||
mem_table::map_bound,
|
||||
};
|
||||
|
||||
use super::CommittedTxnData;
|
||||
|
||||
pub struct Transaction {
|
||||
pub(crate) read_ts: u64,
|
||||
pub(crate) inner: Arc<LsmStorageInner>,
|
||||
@@ -37,7 +35,11 @@ impl Transaction {
|
||||
panic!("cannot operate on committed txn!");
|
||||
}
|
||||
if let Some(entry) = self.local_storage.get(key) {
|
||||
return Ok(Some(entry.value().clone()));
|
||||
if entry.value().is_empty() {
|
||||
return Ok(None);
|
||||
} else {
|
||||
return Ok(Some(entry.value().clone()));
|
||||
}
|
||||
}
|
||||
self.inner.get_with_ts(key, self.read_ts)
|
||||
}
|
||||
@@ -94,20 +96,6 @@ impl Transaction {
|
||||
self.committed
|
||||
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
|
||||
.expect("cannot operate on committed txn!");
|
||||
if let Some(guard) = &self.key_hashes {
|
||||
let guard = guard.lock();
|
||||
let (write_set, read_set) = &*guard;
|
||||
if !write_set.is_empty() {
|
||||
let committed_txns = self.inner.mvcc().committed_txns.lock();
|
||||
for (_, txn_data) in committed_txns.range(self.read_ts..) {
|
||||
for key_hash in read_set {
|
||||
if txn_data.key_hashes.contains(key_hash) {
|
||||
bail!("serializable check failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let batch = self
|
||||
.local_storage
|
||||
.iter()
|
||||
@@ -119,32 +107,7 @@ impl Transaction {
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let ts = self.inner.write_batch(&batch)?;
|
||||
{
|
||||
let mut committed_txns = self.inner.mvcc().committed_txns.lock();
|
||||
let mut key_hashes = self.key_hashes.as_ref().unwrap().lock();
|
||||
let (write_set, _) = &mut *key_hashes;
|
||||
|
||||
let old_data = committed_txns.insert(
|
||||
ts,
|
||||
CommittedTxnData {
|
||||
key_hashes: std::mem::take(write_set),
|
||||
read_ts: self.read_ts,
|
||||
commit_ts: ts,
|
||||
},
|
||||
);
|
||||
assert!(old_data.is_none());
|
||||
|
||||
// remove unneeded txn data
|
||||
let watermark = self.inner.mvcc().watermark();
|
||||
while let Some(entry) = committed_txns.first_entry() {
|
||||
if *entry.key() < watermark {
|
||||
entry.remove();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
self.inner.write_batch(&batch)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user