feat(code): part 3 iterators

Signed-off-by: Alex Chi <iskyzh@gmail.com>
This commit is contained in:
Alex Chi
2022-12-24 10:11:06 -05:00
parent b263ea4fac
commit 4eb2177a3e
39 changed files with 1304 additions and 73 deletions

View File

@@ -0,0 +1,15 @@
use anyhow::Result;
pub trait StorageIterator {
/// Get the current value.
fn value(&self) -> &[u8];
/// Get the current key.
fn key(&self) -> &[u8];
/// Check if the current iterator is valid.
fn is_valid(&self) -> bool;
/// Move to the next position.
fn next(&mut self) -> Result<()>;
}

View File

@@ -0,0 +1,127 @@
use std::cmp::{self};
use std::collections::binary_heap::PeekMut;
use std::collections::BinaryHeap;
use anyhow::Result;
use super::impls::StorageIterator;
struct HeapWrapper<I: StorageIterator>(pub usize, pub Box<I>);
impl<I: StorageIterator> PartialEq for HeapWrapper<I> {
fn eq(&self, other: &Self) -> bool {
self.partial_cmp(other).unwrap() == cmp::Ordering::Equal
}
}
impl<I: StorageIterator> Eq for HeapWrapper<I> {}
impl<I: StorageIterator> PartialOrd for HeapWrapper<I> {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
match self.1.key().cmp(other.1.key()) {
cmp::Ordering::Greater => Some(cmp::Ordering::Greater),
cmp::Ordering::Less => Some(cmp::Ordering::Less),
cmp::Ordering::Equal => self.0.partial_cmp(&other.0),
}
.map(|x| x.reverse())
}
}
impl<I: StorageIterator> Ord for HeapWrapper<I> {
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.partial_cmp(other).unwrap()
}
}
/// Merge multiple iterators of the same type. If the same key occurs multiple times in some
/// iterators, perfer the one with smaller index.
pub struct MergeIterator<I: StorageIterator> {
iters: BinaryHeap<HeapWrapper<I>>,
current: HeapWrapper<I>,
}
impl<I: StorageIterator> MergeIterator<I> {
pub fn create(iters: Vec<Box<I>>) -> Self {
assert!(!iters.is_empty());
let mut heap = BinaryHeap::new();
if iters.iter().all(|x| !x.is_valid()) {
// All invalid, select the last one as the current.
let mut iters = iters;
return Self {
iters: heap,
current: HeapWrapper(0, iters.pop().unwrap()),
};
}
for (idx, iter) in iters.into_iter().enumerate() {
if iter.is_valid() {
heap.push(HeapWrapper(idx, iter));
}
}
let current = heap.pop().unwrap();
Self {
iters: heap,
current,
}
}
}
impl<I: StorageIterator> StorageIterator for MergeIterator<I> {
fn key(&self) -> &[u8] {
self.current.1.key()
}
fn value(&self) -> &[u8] {
self.current.1.value()
}
fn is_valid(&self) -> bool {
self.current.1.is_valid()
}
fn next(&mut self) -> Result<()> {
// Pop the item out of the heap if they have the same value.
while let Some(mut inner_iter) = self.iters.peek_mut() {
debug_assert!(
inner_iter.1.key() >= self.current.1.key(),
"heap invariant violated"
);
if inner_iter.1.key() == self.current.1.key() {
// Case 1: an error occurred when calling `next`.
if let e @ Err(_) = inner_iter.1.next() {
PeekMut::pop(inner_iter);
return e;
}
// Case 2: iter is no longer valid.
if !inner_iter.1.is_valid() {
PeekMut::pop(inner_iter);
}
} else {
break;
}
}
self.current.1.next()?;
// If the current iterator is invalid, pop it out of the heap and select the next one.
if !self.current.1.is_valid() {
if let Some(iter) = self.iters.pop() {
self.current = iter;
}
return Ok(());
}
// Otherwise, compare with heap top and swap if necessary.
if let Some(mut inner_iter) = self.iters.peek_mut() {
if self.current < *inner_iter {
std::mem::swap(&mut *inner_iter, &mut self.current);
}
}
Ok(())
}
}

View File

@@ -0,0 +1,40 @@
use anyhow::Result;
use bytes::Bytes;
use super::impls::StorageIterator;
pub mod merge_iterator_test;
pub mod two_merge_iterator_test;
#[derive(Clone)]
pub struct MockIterator {
pub data: Vec<(Bytes, Bytes)>,
pub index: usize,
}
impl MockIterator {
pub fn new(data: Vec<(Bytes, Bytes)>) -> Self {
Self { data, index: 0 }
}
}
impl StorageIterator for MockIterator {
fn next(&mut self) -> Result<()> {
if self.index < self.data.len() {
self.index += 1;
}
Ok(())
}
fn key(&self) -> &[u8] {
self.data[self.index].0.as_ref()
}
fn value(&self) -> &[u8] {
self.data[self.index].1.as_ref()
}
fn is_valid(&self) -> bool {
self.index < self.data.len()
}
}

View File

@@ -0,0 +1,131 @@
use super::*;
use crate::iterators::merge_iterator::MergeIterator;
fn as_bytes(x: &[u8]) -> Bytes {
Bytes::copy_from_slice(x)
}
fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>) {
let mut iter = iter;
for (k, v) in expected {
assert!(iter.is_valid());
assert_eq!(
k,
iter.key(),
"expected key: {:?}, actual key: {:?}",
k,
as_bytes(iter.key()),
);
assert_eq!(
v,
iter.value(),
"expected value: {:?}, actual value: {:?}",
v,
as_bytes(iter.value()),
);
iter.next().unwrap();
}
assert!(!iter.is_valid());
}
#[test]
fn test_merge_1() {
let i1 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
]);
let i2 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.2")),
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
]);
let i3 = MockIterator::new(vec![
(Bytes::from("b"), Bytes::from("2.3")),
(Bytes::from("c"), Bytes::from("3.3")),
(Bytes::from("d"), Bytes::from("4.3")),
]);
let iter = MergeIterator::create(vec![
Box::new(i1.clone()),
Box::new(i2.clone()),
Box::new(i3.clone()),
]);
check_iter_result(
iter,
vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
(Bytes::from("d"), Bytes::from("4.2")),
],
);
let iter = MergeIterator::create(vec![Box::new(i3), Box::new(i1), Box::new(i2)]);
check_iter_result(
iter,
vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.3")),
(Bytes::from("c"), Bytes::from("3.3")),
(Bytes::from("d"), Bytes::from("4.3")),
],
);
}
#[test]
fn test_merge_2() {
let i1 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
]);
let i2 = MockIterator::new(vec![
(Bytes::from("d"), Bytes::from("1.2")),
(Bytes::from("e"), Bytes::from("2.2")),
(Bytes::from("f"), Bytes::from("3.2")),
(Bytes::from("g"), Bytes::from("4.2")),
]);
let i3 = MockIterator::new(vec![
(Bytes::from("h"), Bytes::from("1.3")),
(Bytes::from("i"), Bytes::from("2.3")),
(Bytes::from("j"), Bytes::from("3.3")),
(Bytes::from("k"), Bytes::from("4.3")),
]);
let i4 = MockIterator::new(vec![]);
let result = vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
(Bytes::from("d"), Bytes::from("1.2")),
(Bytes::from("e"), Bytes::from("2.2")),
(Bytes::from("f"), Bytes::from("3.2")),
(Bytes::from("g"), Bytes::from("4.2")),
(Bytes::from("h"), Bytes::from("1.3")),
(Bytes::from("i"), Bytes::from("2.3")),
(Bytes::from("j"), Bytes::from("3.3")),
(Bytes::from("k"), Bytes::from("4.3")),
];
let iter = MergeIterator::create(vec![
Box::new(i1.clone()),
Box::new(i2.clone()),
Box::new(i3.clone()),
Box::new(i4.clone()),
]);
check_iter_result(iter, result.clone());
let iter = MergeIterator::create(vec![
Box::new(i2.clone()),
Box::new(i4.clone()),
Box::new(i3.clone()),
Box::new(i1.clone()),
]);
check_iter_result(iter, result.clone());
let iter = MergeIterator::create(vec![Box::new(i4), Box::new(i3), Box::new(i2), Box::new(i1)]);
check_iter_result(iter, result);
}

View File

@@ -0,0 +1,129 @@
use super::*;
use crate::iterators::two_merge_iterator::TwoMergeIterator;
fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>) {
let mut iter = iter;
for (k, v) in expected {
assert!(iter.is_valid());
assert_eq!(iter.key(), k.as_ref());
assert_eq!(iter.value(), v.as_ref());
iter.next().unwrap();
}
assert!(!iter.is_valid());
}
#[test]
fn test_merge_1() {
let i1 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
]);
let i2 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.2")),
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
]);
let iter = TwoMergeIterator::create(i1, i2).unwrap();
check_iter_result(
iter,
vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
(Bytes::from("d"), Bytes::from("4.2")),
],
)
}
#[test]
fn test_merge_2() {
let i2 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
]);
let i1 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.2")),
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
]);
let iter = TwoMergeIterator::create(i1, i2).unwrap();
check_iter_result(
iter,
vec![
(Bytes::from("a"), Bytes::from("1.2")),
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
],
)
}
#[test]
fn test_merge_3() {
let i2 = MockIterator::new(vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.1")),
(Bytes::from("c"), Bytes::from("3.1")),
]);
let i1 = MockIterator::new(vec![
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
]);
let iter = TwoMergeIterator::create(i1, i2).unwrap();
check_iter_result(
iter,
vec![
(Bytes::from("a"), Bytes::from("1.1")),
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
],
)
}
#[test]
fn test_merge_4() {
let i2 = MockIterator::new(vec![]);
let i1 = MockIterator::new(vec![
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
]);
let iter = TwoMergeIterator::create(i1, i2).unwrap();
check_iter_result(
iter,
vec![
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
],
);
let i1 = MockIterator::new(vec![]);
let i2 = MockIterator::new(vec![
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
]);
let iter = TwoMergeIterator::create(i1, i2).unwrap();
check_iter_result(
iter,
vec![
(Bytes::from("b"), Bytes::from("2.2")),
(Bytes::from("c"), Bytes::from("3.2")),
(Bytes::from("d"), Bytes::from("4.2")),
],
);
}
#[test]
fn test_merge_5() {
let i2 = MockIterator::new(vec![]);
let i1 = MockIterator::new(vec![]);
let iter = TwoMergeIterator::create(i1, i2).unwrap();
check_iter_result(iter, vec![])
}

View File

@@ -0,0 +1,80 @@
use anyhow::Result;
use super::impls::StorageIterator;
/// Merges two iterators of different types into one. If the two iterators have the same key, only
/// produce the key once and prefer the entry from A.
pub struct TwoMergeIterator<A: StorageIterator, B: StorageIterator> {
a: A,
b: B,
choose_a: bool,
}
impl<A: StorageIterator, B: StorageIterator> TwoMergeIterator<A, B> {
fn choose_a(a: &A, b: &B) -> bool {
if !a.is_valid() {
return false;
}
if !b.is_valid() {
return true;
}
a.key() < b.key()
}
fn skip_b(&mut self) -> Result<()> {
if self.a.is_valid() {
while self.b.is_valid() && self.b.key() == self.a.key() {
self.b.next()?;
}
}
Ok(())
}
pub fn create(a: A, b: B) -> Result<Self> {
let mut iter = Self {
choose_a: false,
a,
b,
};
iter.skip_b()?;
iter.choose_a = Self::choose_a(&iter.a, &iter.b);
Ok(iter)
}
}
impl<A: StorageIterator, B: StorageIterator> StorageIterator for TwoMergeIterator<A, B> {
fn key(&self) -> &[u8] {
if self.choose_a {
self.a.key()
} else {
self.b.key()
}
}
fn value(&self) -> &[u8] {
if self.choose_a {
self.a.value()
} else {
self.b.value()
}
}
fn is_valid(&self) -> bool {
if self.choose_a {
self.a.is_valid()
} else {
self.b.is_valid()
}
}
fn next(&mut self) -> Result<()> {
if self.choose_a {
self.a.next()?;
} else {
self.b.next()?;
}
self.skip_b()?;
self.choose_a = Self::choose_a(&self.a, &self.b);
Ok(())
}
}