use std::cmp::{self};
use std::collections::binary_heap::PeekMut;
use std::collections::BinaryHeap;

use anyhow::Result;

use crate::key::KeySlice;

use super::StorageIterator;

struct HeapWrapper<I: StorageIterator>(pub usize, pub Box<I>);
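
// `HeapWrapper` pairs a child iterator with its index in the original `iters` list. The
// comparison impls below order wrappers by the key the iterator currently points at,
// breaking ties by index, and then reverse the result so that `BinaryHeap` (a max-heap)
// pops the iterator with the smallest key, and on equal keys the smallest index, first.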
impl<I: StorageIterator> PartialEq for HeapWrapper<I> {
    fn eq(&self, other: &Self) -> bool {
        self.partial_cmp(other).unwrap() == cmp::Ordering::Equal
    }
}

impl<I: StorageIterator> Eq for HeapWrapper<I> {}

impl<I: StorageIterator> PartialOrd for HeapWrapper<I> {
    #[allow(clippy::non_canonical_partial_ord_impl)]
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        match self.1.key().cmp(&other.1.key()) {
            cmp::Ordering::Greater => Some(cmp::Ordering::Greater),
            cmp::Ordering::Less => Some(cmp::Ordering::Less),
            cmp::Ordering::Equal => self.0.partial_cmp(&other.0),
        }
        .map(|x| x.reverse())
    }
}

impl<I: StorageIterator> Ord for HeapWrapper<I> {
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        self.partial_cmp(other).unwrap()
    }
}

/// Merge multiple iterators of the same type. If the same key occurs multiple times in some
/// iterators, prefer the one with smaller index.
pub struct MergeIterator<I: StorageIterator> {
    iters: BinaryHeap<HeapWrapper<I>>,
    current: Option<HeapWrapper<I>>,
}
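
// While the merge iterator is valid, `current` holds the child iterator positioned on the
// smallest key (ties broken towards the smallest index); `iters` holds the remaining
// still-valid children.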
impl<I: StorageIterator> MergeIterator<I> {
    pub fn create(iters: Vec<Box<I>>) -> Self {
        if iters.is_empty() {
            return Self {
                iters: BinaryHeap::new(),
                current: None,
            };
        }
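
        // Build the heap from the still-valid iterators. If none are valid, return early,
        // keeping one exhausted iterator as `current`, so the `heap.pop().unwrap()` below
        // never runs on an empty heap and callers simply observe `is_valid() == false`.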
        let mut heap = BinaryHeap::new();

        if iters.iter().all(|x| !x.is_valid()) {
            // All invalid, select the last one as the current.
            let mut iters = iters;
            return Self {
                iters: heap,
                current: Some(HeapWrapper(0, iters.pop().unwrap())),
            };
        }

        for (idx, iter) in iters.into_iter().enumerate() {
            if iter.is_valid() {
                heap.push(HeapWrapper(idx, iter));
            }
        }

        let current = heap.pop().unwrap();
        Self {
            iters: heap,
            current: Some(current),
        }
    }
}
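
// A minimal usage sketch (not part of this module): `TestIter` is a hypothetical stand-in
// for any concrete `StorageIterator` implementation, such as a memtable or SST iterator.
//
//     let iters: Vec<Box<TestIter>> = vec![
//         Box::new(TestIter::from_pairs(&[(b"a", b"1"), (b"c", b"3")])),
//         Box::new(TestIter::from_pairs(&[(b"a", b"2"), (b"b", b"4")])),
//     ];
//     let mut merged = MergeIterator::create(iters);
//     while merged.is_valid() {
//         // Yields a=1 (index 0 wins on the duplicate key "a"), then b=4, then c=3.
//         println!("{:?} = {:?}", merged.key(), merged.value());
//         merged.next()?;
//     }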

impl<I: 'static + for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>> StorageIterator
    for MergeIterator<I>
{
    type KeyType<'a> = KeySlice<'a>;

    fn key(&self) -> KeySlice {
        self.current.as_ref().unwrap().1.key()
    }

    fn value(&self) -> &[u8] {
        self.current.as_ref().unwrap().1.value()
    }

    fn is_valid(&self) -> bool {
        self.current
            .as_ref()
            .map(|x| x.1.is_valid())
            .unwrap_or(false)
    }

    fn next(&mut self) -> Result<()> {
        let current = self.current.as_mut().unwrap();
        // Advance every other iterator that is positioned on the same key as `current`,
        // popping it from the heap once it becomes invalid, so duplicate keys from
        // higher-index iterators are skipped.
        while let Some(mut inner_iter) = self.iters.peek_mut() {
            debug_assert!(
                inner_iter.1.key() >= current.1.key(),
                "heap invariant violated"
            );
            if inner_iter.1.key() == current.1.key() {
                // Case 1: an error occurred when calling `next`.
                if let e @ Err(_) = inner_iter.1.next() {
                    PeekMut::pop(inner_iter);
                    return e;
                }

                // Case 2: iter is no longer valid.
                if !inner_iter.1.is_valid() {
                    PeekMut::pop(inner_iter);
                }
            } else {
                break;
            }
        }
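
        // Advance `current` itself past the entry it is positioned on.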
        current.1.next()?;

        // If the current iterator is invalid, pop it out of the heap and select the next one.
        if !current.1.is_valid() {
            if let Some(iter) = self.iters.pop() {
                *current = iter;
            }
            return Ok(());
        }

        // Otherwise, compare with heap top and swap if necessary.
        if let Some(mut inner_iter) = self.iters.peek_mut() {
            if *current < *inner_iter {
                std::mem::swap(&mut *inner_iter, current);
            }
        }

        Ok(())
    }
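
    // Sums the active iterator counts of all children, including the iterator currently
    // held in `current`.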
    fn num_active_iterators(&self) -> usize {
        self.iters
            .iter()
            .map(|x| x.1.num_active_iterators())
            .sum::<usize>()
            + self
                .current
                .as_ref()
                .map(|x| x.1.num_active_iterators())
                .unwrap_or(0)
    }
}