diff --git a/README.md b/README.md index bcaff76..dcf7669 100644 --- a/README.md +++ b/README.md @@ -69,7 +69,7 @@ We are working on chapter 3 and more test cases for all existing contents. | 3.3 | Snapshot Read - Engine Read Path | ✅ | 🚧 | | | 3.4 | Watermark and Garbage Collection | ✅ | 🚧 | | | 3.5 | Transactions and Optimistic Concurrency Control | ✅ | | | -| 3.6 | Serializable Snapshot Isolation | 🚧 | | | +| 3.6 | Serializable Snapshot Isolation | ✅ | | | | 3.7 | Compaction Filter | 🚧 | | | ## License diff --git a/mini-lsm-mvcc/src/lsm_storage.rs b/mini-lsm-mvcc/src/lsm_storage.rs index 58a114d..886c514 100644 --- a/mini-lsm-mvcc/src/lsm_storage.rs +++ b/mini-lsm-mvcc/src/lsm_storage.rs @@ -556,14 +556,26 @@ impl LsmStorageInner { } /// Put a key-value pair into the storage by writing into the current memtable. - pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { - self.write_batch(&[WriteBatchRecord::Put(key, value)])?; + pub fn put(self: &Arc, key: &[u8], value: &[u8]) -> Result<()> { + if !self.options.serializable { + self.write_batch(&[WriteBatchRecord::Put(key, value)])?; + } else { + let txn = self.mvcc().new_txn(self.clone(), self.options.serializable); + txn.put(key, value); + txn.commit()?; + } Ok(()) } /// Remove a key from the storage by writing an empty value. - pub fn delete(&self, key: &[u8]) -> Result<()> { - self.write_batch(&[WriteBatchRecord::Del(key)])?; + pub fn delete(self: &Arc, key: &[u8]) -> Result<()> { + if !self.options.serializable { + self.write_batch(&[WriteBatchRecord::Del(key)])?; + } else { + let txn = self.mvcc().new_txn(self.clone(), self.options.serializable); + txn.delete(key); + txn.commit()?; + } Ok(()) } diff --git a/mini-lsm-mvcc/src/mvcc.rs b/mini-lsm-mvcc/src/mvcc.rs index 868be0e..4c93b11 100644 --- a/mini-lsm-mvcc/src/mvcc.rs +++ b/mini-lsm-mvcc/src/mvcc.rs @@ -14,8 +14,10 @@ use crate::lsm_storage::LsmStorageInner; use self::{txn::Transaction, watermark::Watermark}; pub(crate) struct CommittedTxnData { - pub(crate) key_hashes: Vec, + pub(crate) key_hashes: HashSet, + #[allow(dead_code)] pub(crate) read_ts: u64, + #[allow(dead_code)] pub(crate) commit_ts: u64, } @@ -58,7 +60,7 @@ impl LsmMvccInner { local_storage: Arc::new(SkipMap::new()), committed: Arc::new(AtomicBool::new(false)), key_hashes: if serializable { - Some(Mutex::new(HashSet::new())) + Some(Mutex::new((HashSet::new(), HashSet::new()))) } else { None }, diff --git a/mini-lsm-mvcc/src/mvcc/txn.rs b/mini-lsm-mvcc/src/mvcc/txn.rs index 52d95ba..0242fda 100644 --- a/mini-lsm-mvcc/src/mvcc/txn.rs +++ b/mini-lsm-mvcc/src/mvcc/txn.rs @@ -7,7 +7,7 @@ use std::{ }, }; -use anyhow::Result; +use anyhow::{bail, Result}; use bytes::Bytes; use crossbeam_skiplist::{map::Entry, SkipMap}; use ouroboros::self_referencing; @@ -27,7 +27,8 @@ pub struct Transaction { pub(crate) inner: Arc, pub(crate) local_storage: Arc>, pub(crate) committed: Arc, - pub(crate) key_hashes: Option>>, + /// Write set and read set + pub(crate) key_hashes: Option, HashSet)>>, } impl Transaction { @@ -71,7 +72,8 @@ impl Transaction { .insert(Bytes::copy_from_slice(key), Bytes::copy_from_slice(value)); if let Some(key_hashes) = &self.key_hashes { let mut key_hashes = key_hashes.lock(); - key_hashes.insert(crc32fast::hash(key)); + let (write_hashes, _) = &mut *key_hashes; + write_hashes.insert(crc32fast::hash(key)); } } @@ -83,7 +85,8 @@ impl Transaction { .insert(Bytes::copy_from_slice(key), Bytes::new()); if let Some(key_hashes) = &self.key_hashes { let mut key_hashes = key_hashes.lock(); - key_hashes.insert(crc32fast::hash(key)); + let (write_hashes, _) = &mut *key_hashes; + write_hashes.insert(crc32fast::hash(key)); } } @@ -91,6 +94,20 @@ impl Transaction { self.committed .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .expect("cannot operate on committed txn!"); + if let Some(guard) = &self.key_hashes { + let guard = guard.lock(); + let (write_set, read_set) = &*guard; + if !write_set.is_empty() { + let committed_txns = self.inner.mvcc().committed_txns.lock(); + for (_, txn_data) in committed_txns.range(self.read_ts..) { + for key_hash in read_set { + if txn_data.key_hashes.contains(&key_hash) { + bail!("serializable check failed"); + } + } + } + } + } let batch = self .local_storage .iter() @@ -105,16 +122,28 @@ impl Transaction { let ts = self.inner.write_batch(&batch)?; { let mut committed_txns = self.inner.mvcc().committed_txns.lock(); - let key_hashes = self.key_hashes.as_ref().unwrap().lock(); + let mut key_hashes = self.key_hashes.as_ref().unwrap().lock(); + let (write_set, _) = &mut *key_hashes; - committed_txns.insert( + let old_data = committed_txns.insert( ts, CommittedTxnData { - key_hashes: key_hashes.iter().copied().collect::>(), + key_hashes: std::mem::take(write_set), read_ts: self.read_ts, commit_ts: ts, }, ); + assert!(old_data.is_none()); + + // remove unneeded txn data + let watermark = self.inner.mvcc().watermark(); + while let Some(entry) = committed_txns.first_entry() { + if *entry.key() < watermark { + entry.remove(); + } else { + break; + } + } } Ok(()) } diff --git a/mini-lsm/src/tests/week1_day1.rs b/mini-lsm/src/tests/week1_day1.rs index bce740f..c024c5e 100644 --- a/mini-lsm/src/tests/week1_day1.rs +++ b/mini-lsm/src/tests/week1_day1.rs @@ -71,8 +71,9 @@ fn test_task2_storage_integration() { #[test] fn test_task3_storage_integration() { let dir = tempdir().unwrap(); - let storage = - LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap(); + let storage = Arc::new( + LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap(), + ); storage.put(b"1", b"233").unwrap(); storage.put(b"2", b"2333").unwrap(); storage.put(b"3", b"23333").unwrap();