@@ -1,3 +1,7 @@
|
|||||||
# Snapshot Read - Memtables and SSTs
|
# Snapshot Read - Memtables and SSTs
|
||||||
|
|
||||||
During the refactor, you might need to change the signature of some functions from `&self` to `self: &Arc<Self>` as necessary.
|
During the refactor, you might need to change the signature of some functions from `&self` to `self: &Arc<Self>` as necessary.
|
||||||
|
|
||||||
|
## MemTable
|
||||||
|
|
||||||
|
## WAL
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ We have 7 chapters (days) in this part:
|
|||||||
|
|
||||||
* [Day 1: Timestamp Key Refactor](./week3-01-ts-key-refactor.md). You will change the `key` module to the MVCC one and refactor your system to use key with timestamp.
|
* [Day 1: Timestamp Key Refactor](./week3-01-ts-key-refactor.md). You will change the `key` module to the MVCC one and refactor your system to use key with timestamp.
|
||||||
* [Day 2: Snapshot Read - Memtables and SSTs](./week3-02-snapshot-read-part-1.md). You will refactor the memtable and the SST format to support multiple version.
|
* [Day 2: Snapshot Read - Memtables and SSTs](./week3-02-snapshot-read-part-1.md). You will refactor the memtable and the SST format to support multiple version.
|
||||||
* [Day 3: Snapshot Read - Engine Read Path](./week3-03-snapshot-read-part-2.md). You will implement a timestamp oracle to assign the timestamps and add multi-version support to the LSM engine by using the transaction API.
|
* [Day 3: Snapshot Read - Transaction API](./week3-03-snapshot-read-part-2.md). You will implement a timestamp oracle to assign the timestamps and add multi-version support to the LSM engine by using the transaction API.
|
||||||
* [Day 4: Watermark and Garbage Collection](./week3-04-watermark.md). You will implement the watermark computation algorithm and implement garbage collection at compaction time to remove old versions.
|
* [Day 4: Watermark and Garbage Collection](./week3-04-watermark.md). You will implement the watermark computation algorithm and implement garbage collection at compaction time to remove old versions.
|
||||||
* [Day 5: Transaction and Optimistic Concurrency Control](./week3-05-txn-occ.md). You will create a private workspace for all transactions and commit them in batch so that the modifications of a transaction will not be visible to other transactions.
|
* [Day 5: Transaction and Optimistic Concurrency Control](./week3-05-txn-occ.md). You will create a private workspace for all transactions and commit them in batch so that the modifications of a transaction will not be visible to other transactions.
|
||||||
* [Day 6: Serializable Snapshot Isolation](./week3-06-serializable.md). You will implement the OCC serializable checks to ensure the modifications to the database is serializable and abort transactions that violates serializability.
|
* [Day 6: Serializable Snapshot Isolation](./week3-06-serializable.md). You will implement the OCC serializable checks to ensure the modifications to the database is serializable and abort transactions that violates serializability.
|
||||||
|
|||||||
@@ -1,23 +0,0 @@
|
|||||||
use crate::lsm_storage::{LsmStorageInner, MiniLsm};
|
|
||||||
|
|
||||||
impl LsmStorageInner {
|
|
||||||
pub fn dump_structure(&self) {
|
|
||||||
let snapshot = self.state.read();
|
|
||||||
if !snapshot.l0_sstables.is_empty() {
|
|
||||||
println!(
|
|
||||||
"L0 ({}): {:?}",
|
|
||||||
snapshot.l0_sstables.len(),
|
|
||||||
snapshot.l0_sstables,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
for (level, files) in &snapshot.levels {
|
|
||||||
println!("L{level} ({}): {:?}", files.len(), files);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MiniLsm {
|
|
||||||
pub fn dump_structure(&self) {
|
|
||||||
self.inner.dump_structure()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
1
mini-lsm-mvcc/src/debug.rs
Symbolic link
1
mini-lsm-mvcc/src/debug.rs
Symbolic link
@@ -0,0 +1 @@
|
|||||||
|
../../mini-lsm-starter/src/debug.rs
|
||||||
@@ -1,137 +0,0 @@
|
|||||||
use super::*;
|
|
||||||
use crate::iterators::merge_iterator::MergeIterator;
|
|
||||||
|
|
||||||
fn as_bytes(x: &[u8]) -> Bytes {
|
|
||||||
Bytes::copy_from_slice(x)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>) {
|
|
||||||
let mut iter = iter;
|
|
||||||
for (k, v) in expected {
|
|
||||||
assert!(iter.is_valid());
|
|
||||||
assert_eq!(
|
|
||||||
k,
|
|
||||||
iter.key(),
|
|
||||||
"expected key: {:?}, actual key: {:?}",
|
|
||||||
k,
|
|
||||||
as_bytes(iter.key()),
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
v,
|
|
||||||
iter.value(),
|
|
||||||
"expected value: {:?}, actual value: {:?}",
|
|
||||||
v,
|
|
||||||
as_bytes(iter.value()),
|
|
||||||
);
|
|
||||||
iter.next().unwrap();
|
|
||||||
}
|
|
||||||
assert!(!iter.is_valid());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_merge_1() {
|
|
||||||
let i1 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.1")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.1")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.1")),
|
|
||||||
]);
|
|
||||||
let i2 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.2")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.2")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.2")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.2")),
|
|
||||||
]);
|
|
||||||
let i3 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("b"), Bytes::from("2.3")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.3")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.3")),
|
|
||||||
]);
|
|
||||||
|
|
||||||
let iter = MergeIterator::create(vec![
|
|
||||||
Box::new(i1.clone()),
|
|
||||||
Box::new(i2.clone()),
|
|
||||||
Box::new(i3.clone()),
|
|
||||||
]);
|
|
||||||
|
|
||||||
check_iter_result(
|
|
||||||
iter,
|
|
||||||
vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.1")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.1")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.1")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.2")),
|
|
||||||
],
|
|
||||||
);
|
|
||||||
|
|
||||||
let iter = MergeIterator::create(vec![Box::new(i3), Box::new(i1), Box::new(i2)]);
|
|
||||||
|
|
||||||
check_iter_result(
|
|
||||||
iter,
|
|
||||||
vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.1")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.3")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.3")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.3")),
|
|
||||||
],
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_merge_2() {
|
|
||||||
let i1 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.1")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.1")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.1")),
|
|
||||||
]);
|
|
||||||
let i2 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("d"), Bytes::from("1.2")),
|
|
||||||
(Bytes::from("e"), Bytes::from("2.2")),
|
|
||||||
(Bytes::from("f"), Bytes::from("3.2")),
|
|
||||||
(Bytes::from("g"), Bytes::from("4.2")),
|
|
||||||
]);
|
|
||||||
let i3 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("h"), Bytes::from("1.3")),
|
|
||||||
(Bytes::from("i"), Bytes::from("2.3")),
|
|
||||||
(Bytes::from("j"), Bytes::from("3.3")),
|
|
||||||
(Bytes::from("k"), Bytes::from("4.3")),
|
|
||||||
]);
|
|
||||||
let i4 = MockIterator::new(vec![]);
|
|
||||||
let result = vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.1")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.1")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.1")),
|
|
||||||
(Bytes::from("d"), Bytes::from("1.2")),
|
|
||||||
(Bytes::from("e"), Bytes::from("2.2")),
|
|
||||||
(Bytes::from("f"), Bytes::from("3.2")),
|
|
||||||
(Bytes::from("g"), Bytes::from("4.2")),
|
|
||||||
(Bytes::from("h"), Bytes::from("1.3")),
|
|
||||||
(Bytes::from("i"), Bytes::from("2.3")),
|
|
||||||
(Bytes::from("j"), Bytes::from("3.3")),
|
|
||||||
(Bytes::from("k"), Bytes::from("4.3")),
|
|
||||||
];
|
|
||||||
|
|
||||||
let iter = MergeIterator::create(vec![
|
|
||||||
Box::new(i1.clone()),
|
|
||||||
Box::new(i2.clone()),
|
|
||||||
Box::new(i3.clone()),
|
|
||||||
Box::new(i4.clone()),
|
|
||||||
]);
|
|
||||||
check_iter_result(iter, result.clone());
|
|
||||||
|
|
||||||
let iter = MergeIterator::create(vec![
|
|
||||||
Box::new(i2.clone()),
|
|
||||||
Box::new(i4.clone()),
|
|
||||||
Box::new(i3.clone()),
|
|
||||||
Box::new(i1.clone()),
|
|
||||||
]);
|
|
||||||
check_iter_result(iter, result.clone());
|
|
||||||
|
|
||||||
let iter = MergeIterator::create(vec![Box::new(i4), Box::new(i3), Box::new(i2), Box::new(i1)]);
|
|
||||||
check_iter_result(iter, result);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_merge_empty() {
|
|
||||||
let iter = MergeIterator::<MockIterator>::create(vec![]);
|
|
||||||
check_iter_result(iter, vec![]);
|
|
||||||
}
|
|
||||||
@@ -1,129 +0,0 @@
|
|||||||
use super::*;
|
|
||||||
use crate::iterators::two_merge_iterator::TwoMergeIterator;
|
|
||||||
|
|
||||||
fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>) {
|
|
||||||
let mut iter = iter;
|
|
||||||
for (k, v) in expected {
|
|
||||||
assert!(iter.is_valid());
|
|
||||||
assert_eq!(iter.key(), k.as_ref());
|
|
||||||
assert_eq!(iter.value(), v.as_ref());
|
|
||||||
iter.next().unwrap();
|
|
||||||
}
|
|
||||||
assert!(!iter.is_valid());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_merge_1() {
|
|
||||||
let i1 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.1")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.1")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.1")),
|
|
||||||
]);
|
|
||||||
let i2 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.2")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.2")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.2")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.2")),
|
|
||||||
]);
|
|
||||||
let iter = TwoMergeIterator::create(i1, i2).unwrap();
|
|
||||||
check_iter_result(
|
|
||||||
iter,
|
|
||||||
vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.1")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.1")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.1")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.2")),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_merge_2() {
|
|
||||||
let i2 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.1")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.1")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.1")),
|
|
||||||
]);
|
|
||||||
let i1 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.2")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.2")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.2")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.2")),
|
|
||||||
]);
|
|
||||||
let iter = TwoMergeIterator::create(i1, i2).unwrap();
|
|
||||||
check_iter_result(
|
|
||||||
iter,
|
|
||||||
vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.2")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.2")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.2")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.2")),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_merge_3() {
|
|
||||||
let i2 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.1")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.1")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.1")),
|
|
||||||
]);
|
|
||||||
let i1 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("b"), Bytes::from("2.2")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.2")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.2")),
|
|
||||||
]);
|
|
||||||
let iter = TwoMergeIterator::create(i1, i2).unwrap();
|
|
||||||
check_iter_result(
|
|
||||||
iter,
|
|
||||||
vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.1")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.2")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.2")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.2")),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_merge_4() {
|
|
||||||
let i2 = MockIterator::new(vec![]);
|
|
||||||
let i1 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("b"), Bytes::from("2.2")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.2")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.2")),
|
|
||||||
]);
|
|
||||||
let iter = TwoMergeIterator::create(i1, i2).unwrap();
|
|
||||||
check_iter_result(
|
|
||||||
iter,
|
|
||||||
vec![
|
|
||||||
(Bytes::from("b"), Bytes::from("2.2")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.2")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.2")),
|
|
||||||
],
|
|
||||||
);
|
|
||||||
let i1 = MockIterator::new(vec![]);
|
|
||||||
let i2 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("b"), Bytes::from("2.2")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.2")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.2")),
|
|
||||||
]);
|
|
||||||
let iter = TwoMergeIterator::create(i1, i2).unwrap();
|
|
||||||
check_iter_result(
|
|
||||||
iter,
|
|
||||||
vec![
|
|
||||||
(Bytes::from("b"), Bytes::from("2.2")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.2")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.2")),
|
|
||||||
],
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_merge_5() {
|
|
||||||
let i2 = MockIterator::new(vec![]);
|
|
||||||
let i1 = MockIterator::new(vec![]);
|
|
||||||
let iter = TwoMergeIterator::create(i1, i2).unwrap();
|
|
||||||
check_iter_result(iter, vec![])
|
|
||||||
}
|
|
||||||
@@ -1,8 +1,8 @@
|
|||||||
use crate::lsm_storage::MiniLsm;
|
use crate::lsm_storage::{LsmStorageInner, MiniLsm};
|
||||||
|
|
||||||
impl MiniLsm {
|
impl LsmStorageInner {
|
||||||
pub fn dump_structure(&self) {
|
pub fn dump_structure(&self) {
|
||||||
let snapshot = self.inner.state.read();
|
let snapshot = self.state.read();
|
||||||
if !snapshot.l0_sstables.is_empty() {
|
if !snapshot.l0_sstables.is_empty() {
|
||||||
println!(
|
println!(
|
||||||
"L0 ({}): {:?}",
|
"L0 ({}): {:?}",
|
||||||
@@ -15,3 +15,9 @@ impl MiniLsm {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl MiniLsm {
|
||||||
|
pub fn dump_structure(&self) {
|
||||||
|
self.inner.dump_structure()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -3,8 +3,6 @@
|
|||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
|
|
||||||
use crate::key::KeySlice;
|
|
||||||
|
|
||||||
use super::StorageIterator;
|
use super::StorageIterator;
|
||||||
|
|
||||||
/// Merges two iterators of different types into one. If the two iterators have the same key, only
|
/// Merges two iterators of different types into one. If the two iterators have the same key, only
|
||||||
@@ -16,8 +14,8 @@ pub struct TwoMergeIterator<A: StorageIterator, B: StorageIterator> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<
|
impl<
|
||||||
A: 'static + for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
|
A: 'static + StorageIterator,
|
||||||
B: 'static + for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
|
B: 'static + for<'a> StorageIterator<KeyType<'a> = A::KeyType<'a>>,
|
||||||
> TwoMergeIterator<A, B>
|
> TwoMergeIterator<A, B>
|
||||||
{
|
{
|
||||||
pub fn create(a: A, b: B) -> Result<Self> {
|
pub fn create(a: A, b: B) -> Result<Self> {
|
||||||
@@ -26,13 +24,13 @@ impl<
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<
|
impl<
|
||||||
A: 'static + for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
|
A: 'static + StorageIterator,
|
||||||
B: 'static + for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
|
B: 'static + for<'a> StorageIterator<KeyType<'a> = A::KeyType<'a>>,
|
||||||
> StorageIterator for TwoMergeIterator<A, B>
|
> StorageIterator for TwoMergeIterator<A, B>
|
||||||
{
|
{
|
||||||
type KeyType<'a> = KeySlice<'a>;
|
type KeyType<'a> = A::KeyType<'a>;
|
||||||
|
|
||||||
fn key(&self) -> KeySlice {
|
fn key(&self) -> Self::KeyType<'_> {
|
||||||
unimplemented!()
|
unimplemented!()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ pub mod lsm_iterator;
|
|||||||
pub mod lsm_storage;
|
pub mod lsm_storage;
|
||||||
pub mod manifest;
|
pub mod manifest;
|
||||||
pub mod mem_table;
|
pub mod mem_table;
|
||||||
|
pub mod mvcc;
|
||||||
pub mod table;
|
pub mod table;
|
||||||
pub mod wal;
|
pub mod wal;
|
||||||
|
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ use crate::compact::{
|
|||||||
use crate::lsm_iterator::{FusedIterator, LsmIterator};
|
use crate::lsm_iterator::{FusedIterator, LsmIterator};
|
||||||
use crate::manifest::Manifest;
|
use crate::manifest::Manifest;
|
||||||
use crate::mem_table::MemTable;
|
use crate::mem_table::MemTable;
|
||||||
|
use crate::mvcc::LsmMvccInner;
|
||||||
use crate::table::SsTable;
|
use crate::table::SsTable;
|
||||||
|
|
||||||
pub type BlockCache = moka::sync::Cache<(usize, usize), Arc<Block>>;
|
pub type BlockCache = moka::sync::Cache<(usize, usize), Arc<Block>>;
|
||||||
@@ -122,6 +123,7 @@ pub(crate) struct LsmStorageInner {
|
|||||||
pub(crate) options: Arc<LsmStorageOptions>,
|
pub(crate) options: Arc<LsmStorageOptions>,
|
||||||
pub(crate) compaction_controller: CompactionController,
|
pub(crate) compaction_controller: CompactionController,
|
||||||
pub(crate) manifest: Option<Manifest>,
|
pub(crate) manifest: Option<Manifest>,
|
||||||
|
pub(crate) mvcc: Option<LsmMvccInner>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM.
|
/// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM.
|
||||||
@@ -249,6 +251,7 @@ impl LsmStorageInner {
|
|||||||
compaction_controller,
|
compaction_controller,
|
||||||
manifest: None,
|
manifest: None,
|
||||||
options: options.into(),
|
options: options.into(),
|
||||||
|
mvcc: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(storage)
|
Ok(storage)
|
||||||
|
|||||||
58
mini-lsm-starter/src/mvcc.rs
Normal file
58
mini-lsm-starter/src/mvcc.rs
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
#![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod
|
||||||
|
#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod
|
||||||
|
|
||||||
|
pub mod txn;
|
||||||
|
mod watermark;
|
||||||
|
|
||||||
|
use std::{
|
||||||
|
collections::{BTreeMap, HashSet},
|
||||||
|
sync::Arc,
|
||||||
|
};
|
||||||
|
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
|
||||||
|
use crate::lsm_storage::LsmStorageInner;
|
||||||
|
|
||||||
|
use self::{txn::Transaction, watermark::Watermark};
|
||||||
|
|
||||||
|
pub(crate) struct CommittedTxnData {
|
||||||
|
pub(crate) key_hashes: HashSet<u32>,
|
||||||
|
#[allow(dead_code)]
|
||||||
|
pub(crate) read_ts: u64,
|
||||||
|
#[allow(dead_code)]
|
||||||
|
pub(crate) commit_ts: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) struct LsmMvccInner {
|
||||||
|
pub(crate) write_lock: Mutex<()>,
|
||||||
|
pub(crate) ts: Arc<Mutex<(u64, Watermark)>>,
|
||||||
|
pub(crate) committed_txns: Arc<Mutex<BTreeMap<u64, CommittedTxnData>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LsmMvccInner {
|
||||||
|
pub fn new(initial_ts: u64) -> Self {
|
||||||
|
Self {
|
||||||
|
write_lock: Mutex::new(()),
|
||||||
|
ts: Arc::new(Mutex::new((initial_ts, Watermark::new()))),
|
||||||
|
committed_txns: Arc::new(Mutex::new(BTreeMap::new())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn latest_commit_ts(&self) -> u64 {
|
||||||
|
self.ts.lock().0
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn update_commit_ts(&self, ts: u64) {
|
||||||
|
self.ts.lock().0 = ts;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// All ts (strictly) below this ts can be garbage collected.
|
||||||
|
pub fn watermark(&self) -> u64 {
|
||||||
|
let ts = self.ts.lock();
|
||||||
|
ts.1.watermark().unwrap_or(ts.0)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn new_txn(&self, inner: Arc<LsmStorageInner>, serializable: bool) -> Arc<Transaction> {
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
}
|
||||||
128
mini-lsm-starter/src/mvcc/txn.rs
Normal file
128
mini-lsm-starter/src/mvcc/txn.rs
Normal file
@@ -0,0 +1,128 @@
|
|||||||
|
#![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod
|
||||||
|
#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod
|
||||||
|
|
||||||
|
use std::{
|
||||||
|
collections::HashSet,
|
||||||
|
ops::Bound,
|
||||||
|
sync::{atomic::AtomicBool, Arc},
|
||||||
|
};
|
||||||
|
|
||||||
|
use anyhow::Result;
|
||||||
|
use bytes::Bytes;
|
||||||
|
use crossbeam_skiplist::SkipMap;
|
||||||
|
use ouroboros::self_referencing;
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
iterators::{two_merge_iterator::TwoMergeIterator, StorageIterator},
|
||||||
|
lsm_iterator::{FusedIterator, LsmIterator},
|
||||||
|
lsm_storage::LsmStorageInner,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct Transaction {
|
||||||
|
pub(crate) read_ts: u64,
|
||||||
|
pub(crate) inner: Arc<LsmStorageInner>,
|
||||||
|
pub(crate) local_storage: Arc<SkipMap<Bytes, Bytes>>,
|
||||||
|
pub(crate) committed: Arc<AtomicBool>,
|
||||||
|
/// Write set and read set
|
||||||
|
pub(crate) key_hashes: Option<Mutex<(HashSet<u32>, HashSet<u32>)>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Transaction {
|
||||||
|
pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn scan(self: &Arc<Self>, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> Result<TxnIterator> {
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn put(&self, key: &[u8], value: &[u8]) {
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn delete(&self, key: &[u8]) {
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn commit(&self) -> Result<()> {
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for Transaction {
|
||||||
|
fn drop(&mut self) {}
|
||||||
|
}
|
||||||
|
|
||||||
|
type SkipMapRangeIter<'a> =
|
||||||
|
crossbeam_skiplist::map::Range<'a, Bytes, (Bound<Bytes>, Bound<Bytes>), Bytes, Bytes>;
|
||||||
|
|
||||||
|
#[self_referencing]
|
||||||
|
pub struct TxnLocalIterator {
|
||||||
|
/// Stores a reference to the skipmap.
|
||||||
|
map: Arc<SkipMap<Bytes, Bytes>>,
|
||||||
|
/// Stores a skipmap iterator that refers to the lifetime of `MemTableIterator` itself.
|
||||||
|
#[borrows(map)]
|
||||||
|
#[not_covariant]
|
||||||
|
iter: SkipMapRangeIter<'this>,
|
||||||
|
/// Stores the current key-value pair.
|
||||||
|
item: (Bytes, Bytes),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StorageIterator for TxnLocalIterator {
|
||||||
|
type KeyType<'a> = &'a [u8];
|
||||||
|
|
||||||
|
fn value(&self) -> &[u8] {
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn key(&self) -> &[u8] {
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_valid(&self) -> bool {
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn next(&mut self) -> Result<()> {
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct TxnIterator {
|
||||||
|
_txn: Arc<Transaction>,
|
||||||
|
iter: TwoMergeIterator<TxnLocalIterator, FusedIterator<LsmIterator>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TxnIterator {
|
||||||
|
pub fn create(
|
||||||
|
txn: Arc<Transaction>,
|
||||||
|
iter: TwoMergeIterator<TxnLocalIterator, FusedIterator<LsmIterator>>,
|
||||||
|
) -> Result<Self> {
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StorageIterator for TxnIterator {
|
||||||
|
type KeyType<'a> = &'a [u8] where Self: 'a;
|
||||||
|
|
||||||
|
fn value(&self) -> &[u8] {
|
||||||
|
self.iter.value()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn key(&self) -> Self::KeyType<'_> {
|
||||||
|
self.iter.key()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_valid(&self) -> bool {
|
||||||
|
self.iter.is_valid()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn next(&mut self) -> Result<()> {
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn num_active_iterators(&self) -> usize {
|
||||||
|
self.iter.num_active_iterators()
|
||||||
|
}
|
||||||
|
}
|
||||||
24
mini-lsm-starter/src/mvcc/watermark.rs
Normal file
24
mini-lsm-starter/src/mvcc/watermark.rs
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
#![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod
|
||||||
|
#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod
|
||||||
|
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
|
pub struct Watermark {
|
||||||
|
readers: BTreeMap<u64, usize>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Watermark {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
readers: BTreeMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_reader(&mut self, ts: u64) {}
|
||||||
|
|
||||||
|
pub fn remove_reader(&mut self, ts: u64) {}
|
||||||
|
|
||||||
|
pub fn watermark(&self) -> Option<u64> {
|
||||||
|
Some(0)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,23 +0,0 @@
|
|||||||
use crate::lsm_storage::{LsmStorageInner, MiniLsm};
|
|
||||||
|
|
||||||
impl LsmStorageInner {
|
|
||||||
pub fn dump_structure(&self) {
|
|
||||||
let snapshot = self.state.read();
|
|
||||||
if !snapshot.l0_sstables.is_empty() {
|
|
||||||
println!(
|
|
||||||
"L0 ({}): {:?}",
|
|
||||||
snapshot.l0_sstables.len(),
|
|
||||||
snapshot.l0_sstables,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
for (level, files) in &snapshot.levels {
|
|
||||||
println!("L{level} ({}): {:?}", files.len(), files);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MiniLsm {
|
|
||||||
pub fn dump_structure(&self) {
|
|
||||||
self.inner.dump_structure()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
1
mini-lsm/src/debug.rs
Symbolic link
1
mini-lsm/src/debug.rs
Symbolic link
@@ -0,0 +1 @@
|
|||||||
|
../../mini-lsm-starter/src/debug.rs
|
||||||
@@ -1,137 +0,0 @@
|
|||||||
use super::*;
|
|
||||||
use crate::iterators::merge_iterator::MergeIterator;
|
|
||||||
|
|
||||||
fn as_bytes(x: &[u8]) -> Bytes {
|
|
||||||
Bytes::copy_from_slice(x)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>) {
|
|
||||||
let mut iter = iter;
|
|
||||||
for (k, v) in expected {
|
|
||||||
assert!(iter.is_valid());
|
|
||||||
assert_eq!(
|
|
||||||
k,
|
|
||||||
iter.key(),
|
|
||||||
"expected key: {:?}, actual key: {:?}",
|
|
||||||
k,
|
|
||||||
as_bytes(iter.key()),
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
v,
|
|
||||||
iter.value(),
|
|
||||||
"expected value: {:?}, actual value: {:?}",
|
|
||||||
v,
|
|
||||||
as_bytes(iter.value()),
|
|
||||||
);
|
|
||||||
iter.next().unwrap();
|
|
||||||
}
|
|
||||||
assert!(!iter.is_valid());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_merge_1() {
|
|
||||||
let i1 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.1")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.1")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.1")),
|
|
||||||
]);
|
|
||||||
let i2 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.2")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.2")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.2")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.2")),
|
|
||||||
]);
|
|
||||||
let i3 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("b"), Bytes::from("2.3")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.3")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.3")),
|
|
||||||
]);
|
|
||||||
|
|
||||||
let iter = MergeIterator::create(vec![
|
|
||||||
Box::new(i1.clone()),
|
|
||||||
Box::new(i2.clone()),
|
|
||||||
Box::new(i3.clone()),
|
|
||||||
]);
|
|
||||||
|
|
||||||
check_iter_result(
|
|
||||||
iter,
|
|
||||||
vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.1")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.1")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.1")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.2")),
|
|
||||||
],
|
|
||||||
);
|
|
||||||
|
|
||||||
let iter = MergeIterator::create(vec![Box::new(i3), Box::new(i1), Box::new(i2)]);
|
|
||||||
|
|
||||||
check_iter_result(
|
|
||||||
iter,
|
|
||||||
vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.1")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.3")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.3")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.3")),
|
|
||||||
],
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_merge_2() {
|
|
||||||
let i1 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.1")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.1")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.1")),
|
|
||||||
]);
|
|
||||||
let i2 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("d"), Bytes::from("1.2")),
|
|
||||||
(Bytes::from("e"), Bytes::from("2.2")),
|
|
||||||
(Bytes::from("f"), Bytes::from("3.2")),
|
|
||||||
(Bytes::from("g"), Bytes::from("4.2")),
|
|
||||||
]);
|
|
||||||
let i3 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("h"), Bytes::from("1.3")),
|
|
||||||
(Bytes::from("i"), Bytes::from("2.3")),
|
|
||||||
(Bytes::from("j"), Bytes::from("3.3")),
|
|
||||||
(Bytes::from("k"), Bytes::from("4.3")),
|
|
||||||
]);
|
|
||||||
let i4 = MockIterator::new(vec![]);
|
|
||||||
let result = vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.1")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.1")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.1")),
|
|
||||||
(Bytes::from("d"), Bytes::from("1.2")),
|
|
||||||
(Bytes::from("e"), Bytes::from("2.2")),
|
|
||||||
(Bytes::from("f"), Bytes::from("3.2")),
|
|
||||||
(Bytes::from("g"), Bytes::from("4.2")),
|
|
||||||
(Bytes::from("h"), Bytes::from("1.3")),
|
|
||||||
(Bytes::from("i"), Bytes::from("2.3")),
|
|
||||||
(Bytes::from("j"), Bytes::from("3.3")),
|
|
||||||
(Bytes::from("k"), Bytes::from("4.3")),
|
|
||||||
];
|
|
||||||
|
|
||||||
let iter = MergeIterator::create(vec![
|
|
||||||
Box::new(i1.clone()),
|
|
||||||
Box::new(i2.clone()),
|
|
||||||
Box::new(i3.clone()),
|
|
||||||
Box::new(i4.clone()),
|
|
||||||
]);
|
|
||||||
check_iter_result(iter, result.clone());
|
|
||||||
|
|
||||||
let iter = MergeIterator::create(vec![
|
|
||||||
Box::new(i2.clone()),
|
|
||||||
Box::new(i4.clone()),
|
|
||||||
Box::new(i3.clone()),
|
|
||||||
Box::new(i1.clone()),
|
|
||||||
]);
|
|
||||||
check_iter_result(iter, result.clone());
|
|
||||||
|
|
||||||
let iter = MergeIterator::create(vec![Box::new(i4), Box::new(i3), Box::new(i2), Box::new(i1)]);
|
|
||||||
check_iter_result(iter, result);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_merge_empty() {
|
|
||||||
let iter = MergeIterator::<MockIterator>::create(vec![]);
|
|
||||||
check_iter_result(iter, vec![]);
|
|
||||||
}
|
|
||||||
@@ -1,129 +0,0 @@
|
|||||||
use super::*;
|
|
||||||
use crate::iterators::two_merge_iterator::TwoMergeIterator;
|
|
||||||
|
|
||||||
fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>) {
|
|
||||||
let mut iter = iter;
|
|
||||||
for (k, v) in expected {
|
|
||||||
assert!(iter.is_valid());
|
|
||||||
assert_eq!(iter.key(), k.as_ref());
|
|
||||||
assert_eq!(iter.value(), v.as_ref());
|
|
||||||
iter.next().unwrap();
|
|
||||||
}
|
|
||||||
assert!(!iter.is_valid());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_merge_1() {
|
|
||||||
let i1 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.1")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.1")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.1")),
|
|
||||||
]);
|
|
||||||
let i2 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.2")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.2")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.2")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.2")),
|
|
||||||
]);
|
|
||||||
let iter = TwoMergeIterator::create(i1, i2).unwrap();
|
|
||||||
check_iter_result(
|
|
||||||
iter,
|
|
||||||
vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.1")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.1")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.1")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.2")),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_merge_2() {
|
|
||||||
let i2 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.1")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.1")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.1")),
|
|
||||||
]);
|
|
||||||
let i1 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.2")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.2")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.2")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.2")),
|
|
||||||
]);
|
|
||||||
let iter = TwoMergeIterator::create(i1, i2).unwrap();
|
|
||||||
check_iter_result(
|
|
||||||
iter,
|
|
||||||
vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.2")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.2")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.2")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.2")),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_merge_3() {
|
|
||||||
let i2 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.1")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.1")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.1")),
|
|
||||||
]);
|
|
||||||
let i1 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("b"), Bytes::from("2.2")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.2")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.2")),
|
|
||||||
]);
|
|
||||||
let iter = TwoMergeIterator::create(i1, i2).unwrap();
|
|
||||||
check_iter_result(
|
|
||||||
iter,
|
|
||||||
vec![
|
|
||||||
(Bytes::from("a"), Bytes::from("1.1")),
|
|
||||||
(Bytes::from("b"), Bytes::from("2.2")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.2")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.2")),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_merge_4() {
|
|
||||||
let i2 = MockIterator::new(vec![]);
|
|
||||||
let i1 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("b"), Bytes::from("2.2")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.2")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.2")),
|
|
||||||
]);
|
|
||||||
let iter = TwoMergeIterator::create(i1, i2).unwrap();
|
|
||||||
check_iter_result(
|
|
||||||
iter,
|
|
||||||
vec![
|
|
||||||
(Bytes::from("b"), Bytes::from("2.2")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.2")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.2")),
|
|
||||||
],
|
|
||||||
);
|
|
||||||
let i1 = MockIterator::new(vec![]);
|
|
||||||
let i2 = MockIterator::new(vec![
|
|
||||||
(Bytes::from("b"), Bytes::from("2.2")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.2")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.2")),
|
|
||||||
]);
|
|
||||||
let iter = TwoMergeIterator::create(i1, i2).unwrap();
|
|
||||||
check_iter_result(
|
|
||||||
iter,
|
|
||||||
vec![
|
|
||||||
(Bytes::from("b"), Bytes::from("2.2")),
|
|
||||||
(Bytes::from("c"), Bytes::from("3.2")),
|
|
||||||
(Bytes::from("d"), Bytes::from("4.2")),
|
|
||||||
],
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_merge_5() {
|
|
||||||
let i2 = MockIterator::new(vec![]);
|
|
||||||
let i1 = MockIterator::new(vec![]);
|
|
||||||
let iter = TwoMergeIterator::create(i1, i2).unwrap();
|
|
||||||
check_iter_result(iter, vec![])
|
|
||||||
}
|
|
||||||
@@ -1,7 +1,5 @@
|
|||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
|
|
||||||
use crate::key::KeySlice;
|
|
||||||
|
|
||||||
use super::StorageIterator;
|
use super::StorageIterator;
|
||||||
|
|
||||||
/// Merges two iterators of different types into one. If the two iterators have the same key, only
|
/// Merges two iterators of different types into one. If the two iterators have the same key, only
|
||||||
@@ -13,8 +11,8 @@ pub struct TwoMergeIterator<A: StorageIterator, B: StorageIterator> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<
|
impl<
|
||||||
A: 'static + for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
|
A: 'static + StorageIterator,
|
||||||
B: 'static + for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
|
B: 'static + for<'a> StorageIterator<KeyType<'a> = A::KeyType<'a>>,
|
||||||
> TwoMergeIterator<A, B>
|
> TwoMergeIterator<A, B>
|
||||||
{
|
{
|
||||||
fn choose_a(a: &A, b: &B) -> bool {
|
fn choose_a(a: &A, b: &B) -> bool {
|
||||||
@@ -47,13 +45,13 @@ impl<
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<
|
impl<
|
||||||
A: 'static + for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
|
A: 'static + StorageIterator,
|
||||||
B: 'static + for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
|
B: 'static + for<'a> StorageIterator<KeyType<'a> = A::KeyType<'a>>,
|
||||||
> StorageIterator for TwoMergeIterator<A, B>
|
> StorageIterator for TwoMergeIterator<A, B>
|
||||||
{
|
{
|
||||||
type KeyType<'a> = KeySlice<'a>;
|
type KeyType<'a> = A::KeyType<'a>;
|
||||||
|
|
||||||
fn key(&self) -> KeySlice {
|
fn key(&self) -> Self::KeyType<'_> {
|
||||||
if self.choose_a {
|
if self.choose_a {
|
||||||
self.a.key()
|
self.a.key()
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ pub mod lsm_iterator;
|
|||||||
pub mod lsm_storage;
|
pub mod lsm_storage;
|
||||||
pub mod manifest;
|
pub mod manifest;
|
||||||
pub mod mem_table;
|
pub mod mem_table;
|
||||||
|
pub mod mvcc;
|
||||||
pub mod table;
|
pub mod table;
|
||||||
pub mod wal;
|
pub mod wal;
|
||||||
|
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ use crate::key::KeySlice;
|
|||||||
use crate::lsm_iterator::{FusedIterator, LsmIterator};
|
use crate::lsm_iterator::{FusedIterator, LsmIterator};
|
||||||
use crate::manifest::{Manifest, ManifestRecord};
|
use crate::manifest::{Manifest, ManifestRecord};
|
||||||
use crate::mem_table::{map_bound, MemTable};
|
use crate::mem_table::{map_bound, MemTable};
|
||||||
|
use crate::mvcc::LsmMvccInner;
|
||||||
use crate::table::{FileObject, SsTable, SsTableBuilder, SsTableIterator};
|
use crate::table::{FileObject, SsTable, SsTableBuilder, SsTableIterator};
|
||||||
|
|
||||||
pub type BlockCache = moka::sync::Cache<(usize, usize), Arc<Block>>;
|
pub type BlockCache = moka::sync::Cache<(usize, usize), Arc<Block>>;
|
||||||
@@ -157,6 +158,8 @@ pub(crate) struct LsmStorageInner {
|
|||||||
pub(crate) options: Arc<LsmStorageOptions>,
|
pub(crate) options: Arc<LsmStorageOptions>,
|
||||||
pub(crate) compaction_controller: CompactionController,
|
pub(crate) compaction_controller: CompactionController,
|
||||||
pub(crate) manifest: Option<Manifest>,
|
pub(crate) manifest: Option<Manifest>,
|
||||||
|
#[allow(dead_code)]
|
||||||
|
pub(crate) mvcc: Option<LsmMvccInner>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM.
|
/// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM.
|
||||||
@@ -414,6 +417,7 @@ impl LsmStorageInner {
|
|||||||
compaction_controller,
|
compaction_controller,
|
||||||
manifest: Some(manifest),
|
manifest: Some(manifest),
|
||||||
options: options.into(),
|
options: options.into(),
|
||||||
|
mvcc: None,
|
||||||
};
|
};
|
||||||
storage.sync_dir()?;
|
storage.sync_dir()?;
|
||||||
|
|
||||||
|
|||||||
1
mini-lsm/src/mvcc.rs
Symbolic link
1
mini-lsm/src/mvcc.rs
Symbolic link
@@ -0,0 +1 @@
|
|||||||
|
../../mini-lsm-starter/src/mvcc.rs
|
||||||
1
mini-lsm/src/mvcc/txn.rs
Symbolic link
1
mini-lsm/src/mvcc/txn.rs
Symbolic link
@@ -0,0 +1 @@
|
|||||||
|
../../../mini-lsm-starter/src/mvcc/txn.rs
|
||||||
1
mini-lsm/src/mvcc/watermark.rs
Symbolic link
1
mini-lsm/src/mvcc/watermark.rs
Symbolic link
@@ -0,0 +1 @@
|
|||||||
|
../../../mini-lsm-starter/src/mvcc/watermark.rs
|
||||||
Reference in New Issue
Block a user