Files
mini_lsm/mini-lsm/src/iterators/merge_iterator.rs

140 lines
4.0 KiB
Rust
Raw Normal View History

use std::cmp::{self};
use std::collections::binary_heap::PeekMut;
use std::collections::BinaryHeap;
use anyhow::Result;
use super::StorageIterator;
/// Heap entry pairing a source iterator with its tie-break index.
///
/// Field `0` is the index used to break ties between equal keys and field `1` is the boxed
/// iterator. The ordering is deliberately *reversed*: `std::collections::BinaryHeap` is a
/// max-heap, so the entry that compares greatest is the one with the smallest key and, among
/// equal keys, the smallest index.
///
/// Comparing two wrappers calls `key()` on both iterators.
struct HeapWrapper<I: StorageIterator>(pub usize, pub Box<I>);

impl<I: StorageIterator> PartialEq for HeapWrapper<I> {
    /// Two entries are equal only when both their keys and their indices match.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == cmp::Ordering::Equal
    }
}

impl<I: StorageIterator> Eq for HeapWrapper<I> {}

impl<I: StorageIterator> PartialOrd for HeapWrapper<I> {
    /// Always `Some`: the ordering is total, so this simply forwards to [`Ord::cmp`].
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl<I: StorageIterator> Ord for HeapWrapper<I> {
    /// Compares by key first and by index second, then reverses the result so that the
    /// smallest `(key, index)` pair is the greatest element of a max-heap.
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        self.1
            .key()
            .cmp(other.1.key())
            .then_with(|| self.0.cmp(&other.0))
            .reverse()
    }
}
/// Merge multiple iterators of the same type. If the same key occurs multiple times in some
/// iterators, prefer the one with the smaller index.
pub struct MergeIterator<I: StorageIterator> {
/// Heap of the remaining source iterators; `create` only pushes iterators that are valid, and
/// `next` pops entries once they stop being valid.
iters: BinaryHeap<HeapWrapper<I>>,
/// The iterator the merge iterator is currently positioned on. `create` leaves this as `None`
/// only when it is given an empty list of iterators.
current: Option<HeapWrapper<I>>,
}
impl<I: StorageIterator> MergeIterator<I> {
pub fn create(iters: Vec<Box<I>>) -> Self {
if iters.is_empty() {
return Self {
iters: BinaryHeap::new(),
current: None,
};
}
let mut heap = BinaryHeap::new();
if iters.iter().all(|x| !x.is_valid()) {
// All invalid, select the last one as the current.
let mut iters = iters;
return Self {
iters: heap,
current: Some(HeapWrapper(0, iters.pop().unwrap())),
};
}
for (idx, iter) in iters.into_iter().enumerate() {
if iter.is_valid() {
heap.push(HeapWrapper(idx, iter));
}
}
let current = heap.pop().unwrap();
Self {
iters: heap,
current: Some(current),
}
}
}
impl<I: StorageIterator> StorageIterator for MergeIterator<I> {
    /// Returns the key of the entry the merge iterator is currently positioned on.
    ///
    /// # Panics
    ///
    /// Panics if the merge iterator has no current iterator, which happens when it was created
    /// from an empty list of iterators. Check [`is_valid`](Self::is_valid) first.
    fn key(&self) -> &[u8] {
        self.current
            .as_ref()
            .expect("MergeIterator::key called on a merge iterator with no sources")
            .1
            .key()
    }

    /// Returns the value of the entry the merge iterator is currently positioned on.
    ///
    /// # Panics
    ///
    /// Panics if the merge iterator has no current iterator, which happens when it was created
    /// from an empty list of iterators. Check [`is_valid`](Self::is_valid) first.
    fn value(&self) -> &[u8] {
        self.current
            .as_ref()
            .expect("MergeIterator::value called on a merge iterator with no sources")
            .1
            .value()
    }

    /// Returns `true` when there is a current iterator and that iterator is itself valid.
    fn is_valid(&self) -> bool {
        self.current
            .as_ref()
            .map(|x| x.1.is_valid())
            .unwrap_or(false)
    }

    /// Advances the merge iterator past the current key.
    ///
    /// Every heap iterator positioned on the same key as the current one is advanced first,
    /// so the same key is not yielded again from another source. Then the current iterator is
    /// advanced and, if needed, replaced by (or swapped with) the top of the heap.
    ///
    /// Calling this on a merge iterator that has no current iterator is a no-op.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by an underlying iterator's `next`. If the error comes
    /// from an iterator on the heap, that iterator is removed from the heap before returning.
    fn next(&mut self) -> Result<()> {
        // A merge iterator built from an empty input has nothing to advance. The previous
        // `unwrap_unchecked` made this case undefined behaviour.
        let current = match self.current.as_mut() {
            Some(current) => current,
            None => return Ok(()),
        };

        // Skip the current key in every other source that is positioned on it.
        while let Some(mut inner_iter) = self.iters.peek_mut() {
            debug_assert!(
                inner_iter.1.key() >= current.1.key(),
                "heap invariant violated"
            );
            if inner_iter.1.key() == current.1.key() {
                // Case 1: an error occurred when calling `next`.
                if let e @ Err(_) = inner_iter.1.next() {
                    PeekMut::pop(inner_iter);
                    return e;
                }
                // Case 2: the iterator is exhausted and must leave the heap.
                if !inner_iter.1.is_valid() {
                    PeekMut::pop(inner_iter);
                }
                // Otherwise dropping the `PeekMut` guard restores the heap order.
            } else {
                break;
            }
        }

        current.1.next()?;

        // If the current iterator is exhausted, promote the heap top (if any) in its place.
        if !current.1.is_valid() {
            if let Some(iter) = self.iters.pop() {
                *current = iter;
            }
            return Ok(());
        }

        // Otherwise compare with the heap top and swap if the top should come first. The
        // ordering is reversed, so `<` means "comes later in the merge order".
        if let Some(mut inner_iter) = self.iters.peek_mut() {
            if *current < *inner_iter {
                std::mem::swap(&mut *inner_iter, current);
            }
        }
        Ok(())
    }
}