mod watermark; use std::{ops::Bound, sync::Arc}; use anyhow::Result; use bytes::Bytes; use parking_lot::Mutex; use crate::{ iterators::StorageIterator, lsm_iterator::{FusedIterator, LsmIterator}, lsm_storage::LsmStorageInner, }; use self::watermark::Watermark; pub(crate) struct LsmMvccInner { pub(crate) write_lock: Mutex<()>, pub(crate) ts: Arc>, } impl LsmMvccInner { pub fn new(initial_ts: u64) -> Self { Self { write_lock: Mutex::new(()), ts: Arc::new(Mutex::new((initial_ts, Watermark::new()))), } } pub fn latest_commit_ts(&self) -> u64 { self.ts.lock().0 } pub fn update_commit_ts(&self, ts: u64) { self.ts.lock().0 = ts; } /// All ts (strictly) below this ts can be garbage collected. pub fn watermark(&self) -> u64 { let ts = self.ts.lock(); ts.1.watermark().unwrap_or(ts.0) } pub fn new_txn(&self, inner: Arc) -> Arc { let mut ts = self.ts.lock(); let read_ts = ts.0; ts.1.add_reader(read_ts); Arc::new(Transaction { inner, read_ts }) } } pub struct Transaction { read_ts: u64, inner: Arc, } impl Transaction { pub fn get(&self, key: &[u8]) -> Result> { self.inner.get_with_ts(key, self.read_ts) } pub fn scan(self: &Arc, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> Result { Ok(TxnIterator { _txn: self.clone(), iter: self.inner.scan_with_ts(lower, upper, self.read_ts)?, }) } } impl Drop for Transaction { fn drop(&mut self) { self.inner.mvcc().ts.lock().1.remove_reader(self.read_ts) } } pub struct TxnIterator { _txn: Arc, iter: FusedIterator, } impl StorageIterator for TxnIterator { type KeyType<'a> = &'a [u8] where Self: 'a; fn value(&self) -> &[u8] { self.iter.value() } fn key(&self) -> Self::KeyType<'_> { self.iter.key() } fn is_valid(&self) -> bool { self.iter.is_valid() } fn next(&mut self) -> Result<()> { self.iter.next() } fn num_active_iterators(&self) -> usize { self.iter.num_active_iterators() } }