use std::{ collections::HashSet, ops::Bound, sync::{ atomic::{AtomicBool, Ordering}, Arc, }, }; use anyhow::{bail, Result}; use bytes::Bytes; use crossbeam_skiplist::{map::Entry, SkipMap}; use ouroboros::self_referencing; use parking_lot::Mutex; use crate::{ iterators::{two_merge_iterator::TwoMergeIterator, StorageIterator}, lsm_iterator::{FusedIterator, LsmIterator}, lsm_storage::{LsmStorageInner, WriteBatchRecord}, mem_table::map_bound, }; use super::CommittedTxnData; pub struct Transaction { pub(crate) read_ts: u64, pub(crate) inner: Arc, pub(crate) local_storage: Arc>, pub(crate) committed: Arc, /// Write set and read set pub(crate) key_hashes: Option, HashSet)>>, } impl Transaction { pub fn get(&self, key: &[u8]) -> Result> { if self.committed.load(Ordering::SeqCst) { panic!("cannot operate on committed txn!"); } if let Some(entry) = self.local_storage.get(key) { return Ok(Some(entry.value().clone())); } self.inner.get_with_ts(key, self.read_ts) } pub fn scan(self: &Arc, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> Result { if self.committed.load(Ordering::SeqCst) { panic!("cannot operate on committed txn!"); } let mut local_iter = TxnLocalIteratorBuilder { map: self.local_storage.clone(), iter_builder: |map| map.range((map_bound(lower), map_bound(upper))), item: (Bytes::new(), Bytes::new()), } .build(); let entry = local_iter.with_iter_mut(|iter| TxnLocalIterator::entry_to_item(iter.next())); local_iter.with_mut(|x| *x.item = entry); TxnIterator::create( self.clone(), TwoMergeIterator::create( local_iter, self.inner.scan_with_ts(lower, upper, self.read_ts)?, )?, ) } pub fn put(&self, key: &[u8], value: &[u8]) { if self.committed.load(Ordering::SeqCst) { panic!("cannot operate on committed txn!"); } self.local_storage .insert(Bytes::copy_from_slice(key), Bytes::copy_from_slice(value)); if let Some(key_hashes) = &self.key_hashes { let mut key_hashes = key_hashes.lock(); let (write_hashes, _) = &mut *key_hashes; write_hashes.insert(crc32fast::hash(key)); } } pub fn delete(&self, key: &[u8]) { if self.committed.load(Ordering::SeqCst) { panic!("cannot operate on committed txn!"); } self.local_storage .insert(Bytes::copy_from_slice(key), Bytes::new()); if let Some(key_hashes) = &self.key_hashes { let mut key_hashes = key_hashes.lock(); let (write_hashes, _) = &mut *key_hashes; write_hashes.insert(crc32fast::hash(key)); } } pub fn commit(&self) -> Result<()> { self.committed .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .expect("cannot operate on committed txn!"); if let Some(guard) = &self.key_hashes { let guard = guard.lock(); let (write_set, read_set) = &*guard; if !write_set.is_empty() { let committed_txns = self.inner.mvcc().committed_txns.lock(); for (_, txn_data) in committed_txns.range(self.read_ts..) { for key_hash in read_set { if txn_data.key_hashes.contains(key_hash) { bail!("serializable check failed"); } } } } } let batch = self .local_storage .iter() .map(|entry| { if entry.value().is_empty() { WriteBatchRecord::Del(entry.key().clone()) } else { WriteBatchRecord::Put(entry.key().clone(), entry.value().clone()) } }) .collect::>(); let ts = self.inner.write_batch(&batch)?; { let mut committed_txns = self.inner.mvcc().committed_txns.lock(); let mut key_hashes = self.key_hashes.as_ref().unwrap().lock(); let (write_set, _) = &mut *key_hashes; let old_data = committed_txns.insert( ts, CommittedTxnData { key_hashes: std::mem::take(write_set), read_ts: self.read_ts, commit_ts: ts, }, ); assert!(old_data.is_none()); // remove unneeded txn data let watermark = self.inner.mvcc().watermark(); while let Some(entry) = committed_txns.first_entry() { if *entry.key() < watermark { entry.remove(); } else { break; } } } Ok(()) } } impl Drop for Transaction { fn drop(&mut self) { self.inner.mvcc().ts.lock().1.remove_reader(self.read_ts) } } type SkipMapRangeIter<'a> = crossbeam_skiplist::map::Range<'a, Bytes, (Bound, Bound), Bytes, Bytes>; #[self_referencing] pub struct TxnLocalIterator { /// Stores a reference to the skipmap. map: Arc>, /// Stores a skipmap iterator that refers to the lifetime of `MemTableIterator` itself. #[borrows(map)] #[not_covariant] iter: SkipMapRangeIter<'this>, /// Stores the current key-value pair. item: (Bytes, Bytes), } impl TxnLocalIterator { fn entry_to_item(entry: Option>) -> (Bytes, Bytes) { entry .map(|x| (x.key().clone(), x.value().clone())) .unwrap_or_else(|| (Bytes::new(), Bytes::new())) } } impl StorageIterator for TxnLocalIterator { type KeyType<'a> = &'a [u8]; fn value(&self) -> &[u8] { &self.borrow_item().1[..] } fn key(&self) -> &[u8] { &self.borrow_item().0[..] } fn is_valid(&self) -> bool { !self.borrow_item().0.is_empty() } fn next(&mut self) -> Result<()> { let entry = self.with_iter_mut(|iter| TxnLocalIterator::entry_to_item(iter.next())); self.with_mut(|x| *x.item = entry); Ok(()) } } pub struct TxnIterator { _txn: Arc, iter: TwoMergeIterator>, } impl TxnIterator { pub fn create( txn: Arc, iter: TwoMergeIterator>, ) -> Result { let mut iter = Self { _txn: txn, iter }; iter.skip_deletes()?; Ok(iter) } fn skip_deletes(&mut self) -> Result<()> { while self.iter.is_valid() && self.iter.value().is_empty() { self.iter.next()?; } Ok(()) } } impl StorageIterator for TxnIterator { type KeyType<'a> = &'a [u8] where Self: 'a; fn value(&self) -> &[u8] { self.iter.value() } fn key(&self) -> Self::KeyType<'_> { self.iter.key() } fn is_valid(&self) -> bool { self.iter.is_valid() } fn next(&mut self) -> Result<()> { self.iter.next()?; self.skip_deletes()?; Ok(()) } fn num_active_iterators(&self) -> usize { self.iter.num_active_iterators() } }