@@ -1,6 +1,6 @@
|
||||
# Transaction and Optimistic Concurrency Control
|
||||
|
||||
In this chapter, you will implement all interfaces of `Transaction`. Your implementation will maintain a private workspace for modifications inside a transaction, and commit them in batch, so that all modifications within the transaction will only be visible to the transaction itself until commit.
|
||||
In this chapter, you will implement all interfaces of `Transaction`. Your implementation will maintain a private workspace for modifications inside a transaction, and commit them in batch, so that all modifications within the transaction will only be visible to the transaction itself until commit. We only check for conflicts (i.e., serializable conflicts) when commit, and this is optimistic concurrency control.
|
||||
|
||||
To run test cases,
|
||||
|
||||
@@ -45,6 +45,7 @@ Your commit implementation should simply collect all key-value pairs from the lo
|
||||
|
||||
* With all the things we have implemented up to this point, does the system satisfy snapshot isolation? If not, what else do we need to do to support snapshot isolation? (Note: snapshot isolation is different from serializable snapshot isolation we will talk about in the next chapter)
|
||||
* What if the user wants to batch import data (i.e., 1TB?) If they use the transaction API to do that, will you give them some advice? Is there any opportunity to optimize for this case?
|
||||
* What is optimistic concurrency control? What would the system be like if we implement pessimistic concurrency control instead in Mini-LSM?
|
||||
|
||||
## Bonus Tasks
|
||||
|
||||
|
@@ -4,6 +4,12 @@ pub struct Watermark {
|
||||
readers: BTreeMap<u64, usize>,
|
||||
}
|
||||
|
||||
impl Default for Watermark {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl Watermark {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
|
@@ -16,3 +16,4 @@ mod week3_day1;
|
||||
mod week3_day2;
|
||||
mod week3_day3;
|
||||
mod week3_day4;
|
||||
mod week3_day5;
|
||||
|
@@ -1,15 +1,10 @@
|
||||
use std::ops::Bound;
|
||||
|
||||
use bytes::Bytes;
|
||||
use tempfile::tempdir;
|
||||
|
||||
use crate::{
|
||||
compact::CompactionOptions,
|
||||
key::KeySlice,
|
||||
lsm_storage::{LsmStorageOptions, MiniLsm, WriteBatchRecord},
|
||||
mvcc::watermark::Watermark,
|
||||
table::SsTableBuilder,
|
||||
tests::harness::check_lsm_iter_result_by_key,
|
||||
};
|
||||
|
||||
use super::harness::{check_iter_result_by_key, construct_merge_iterator_over_storage};
|
||||
|
56
mini-lsm-mvcc/src/tests/week3_day5.rs
Normal file
56
mini-lsm-mvcc/src/tests/week3_day5.rs
Normal file
@@ -0,0 +1,56 @@
|
||||
use std::ops::Bound;
|
||||
|
||||
use bytes::Bytes;
|
||||
use tempfile::tempdir;
|
||||
|
||||
use crate::{
|
||||
compact::CompactionOptions,
|
||||
lsm_storage::{LsmStorageOptions, MiniLsm},
|
||||
tests::harness::check_lsm_iter_result_by_key,
|
||||
};
|
||||
|
||||
#[test]
|
||||
fn test_txn_integration() {
|
||||
let dir = tempdir().unwrap();
|
||||
let options = LsmStorageOptions::default_for_week2_test(CompactionOptions::NoCompaction);
|
||||
let storage = MiniLsm::open(&dir, options.clone()).unwrap();
|
||||
let txn1 = storage.new_txn().unwrap();
|
||||
let txn2 = storage.new_txn().unwrap();
|
||||
txn1.put(b"test1", b"233");
|
||||
txn2.put(b"test2", b"233");
|
||||
check_lsm_iter_result_by_key(
|
||||
&mut txn1.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
|
||||
vec![(Bytes::from("test1"), Bytes::from("233"))],
|
||||
);
|
||||
check_lsm_iter_result_by_key(
|
||||
&mut txn2.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
|
||||
vec![(Bytes::from("test2"), Bytes::from("233"))],
|
||||
);
|
||||
let txn3 = storage.new_txn().unwrap();
|
||||
check_lsm_iter_result_by_key(
|
||||
&mut txn3.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
|
||||
vec![],
|
||||
);
|
||||
txn1.commit().unwrap();
|
||||
txn2.commit().unwrap();
|
||||
check_lsm_iter_result_by_key(
|
||||
&mut txn3.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
|
||||
vec![],
|
||||
);
|
||||
drop(txn3);
|
||||
check_lsm_iter_result_by_key(
|
||||
&mut storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
|
||||
vec![
|
||||
(Bytes::from("test1"), Bytes::from("233")),
|
||||
(Bytes::from("test2"), Bytes::from("233")),
|
||||
],
|
||||
);
|
||||
let txn4 = storage.new_txn().unwrap();
|
||||
check_lsm_iter_result_by_key(
|
||||
&mut txn4.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
|
||||
vec![
|
||||
(Bytes::from("test1"), Bytes::from("233")),
|
||||
(Bytes::from("test2"), Bytes::from("233")),
|
||||
],
|
||||
);
|
||||
}
|
@@ -114,6 +114,7 @@ where
|
||||
assert!(!iter.is_valid());
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn check_iter_result_by_key_and_ts<I>(iter: &mut I, expected: Vec<((Bytes, u64), Bytes)>)
|
||||
where
|
||||
I: for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
|
||||
@@ -192,6 +193,7 @@ pub fn generate_sst(
|
||||
builder.build(id, block_cache, path.as_ref()).unwrap()
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn generate_sst_with_ts(
|
||||
id: usize,
|
||||
path: impl AsRef<Path>,
|
||||
|
@@ -7,12 +7,10 @@ use week2_day1::harness::construct_merge_iterator_over_storage;
|
||||
|
||||
use super::*;
|
||||
use crate::{
|
||||
iterators::{
|
||||
concat_iterator::SstConcatIterator, merge_iterator::MergeIterator, StorageIterator,
|
||||
},
|
||||
iterators::{concat_iterator::SstConcatIterator, StorageIterator},
|
||||
key::{KeySlice, TS_ENABLED},
|
||||
lsm_storage::{LsmStorageInner, LsmStorageOptions, LsmStorageState},
|
||||
table::{SsTable, SsTableBuilder, SsTableIterator},
|
||||
lsm_storage::{LsmStorageInner, LsmStorageOptions},
|
||||
table::{SsTable, SsTableBuilder},
|
||||
};
|
||||
|
||||
#[test]
|
||||
|
Reference in New Issue
Block a user