implement 2.7

Signed-off-by: Alex Chi <iskyzh@gmail.com>
This commit is contained in:
Alex Chi
2024-01-25 21:53:47 +08:00
parent 8dbaf54e38
commit 89acc23208
16 changed files with 237 additions and 81 deletions

View File

@@ -24,6 +24,7 @@ crossbeam-channel = "0.5.11"
serde_json = { version = "1.0" }
serde = { version = "1.0", features = ["derive"] }
farmhash = "1"
crc32fast = "1.3.2"
[dev-dependencies]
tempfile = "3"

View File

@@ -42,6 +42,11 @@ pub struct LsmStorageState {
pub sstables: HashMap<usize, Arc<SsTable>>,
}
/// One operation inside a write batch, generic over any byte-like type `T`.
pub enum WriteBatchRecord<T: AsRef<[u8]>> {
/// Store a value under a key; the fields are `(key, value)`.
Put(T, T),
/// Delete a key; `write_batch` records this by writing an empty value for the key.
Del(T),
}
impl LsmStorageState {
fn create(options: &LsmStorageOptions) -> Self {
let levels = match &options.compaction_options {
@@ -234,6 +239,10 @@ impl MiniLsm {
self.inner.get(key)
}
/// Apply a batch of put/delete records to the storage.
///
/// Forwards `batch` unchanged to `self.inner.write_batch` and returns its result.
pub fn write_batch<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<()> {
self.inner.write_batch(batch)
}
/// Store `value` under `key`.
///
/// Forwards both arguments unchanged to `self.inner.put` and returns its result.
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
self.inner.put(key, value)
}
@@ -484,37 +493,46 @@ impl LsmStorageInner {
Ok(None)
}
/// Apply every record of `batch`, in order, to the current memtable.
///
/// Each record is first resolved to a `(key, value)` pair: a `Put` keeps its own value,
/// while a `Del` is stored as an empty value. The pair is then written to the memtable
/// under a read lock on the state, and `try_freeze` is called with the memtable's
/// approximate size once the lock has been released.
///
/// # Panics
///
/// Panics if any key is empty, or if the value of a `Put` record is empty.
///
/// # Errors
///
/// Returns the first error produced by the memtable write or by `try_freeze`; records
/// applied before the failure are not undone.
pub fn write_batch<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<()> {
    for record in batch {
        // Normalize the record so both variants share a single write path below.
        let (key, value): (&[u8], &[u8]) = match record {
            WriteBatchRecord::Put(k, v) => {
                let (k, v) = (k.as_ref(), v.as_ref());
                assert!(!k.is_empty(), "key cannot be empty");
                assert!(!v.is_empty(), "value cannot be empty");
                (k, v)
            }
            WriteBatchRecord::Del(k) => {
                let k = k.as_ref();
                assert!(!k.is_empty(), "key cannot be empty");
                // A deletion is recorded as an empty value.
                (k, &b""[..])
            }
        };
        // Keep the state guard confined to this block so it is dropped before
        // `try_freeze` runs.
        let size = {
            let guard = self.state.read();
            guard.memtable.put(key, value)?;
            guard.memtable.approximate_size()
        };
        self.try_freeze(size)?;
    }
    Ok(())
}
/// Put a key-value pair into the storage by writing into the current memtable.
///
/// Implemented as a single-record write batch: validation, the memtable write and the
/// freeze check all happen inside `write_batch`, so they are not repeated here.
///
/// # Panics
///
/// Panics (inside `write_batch`) if `key` or `value` is empty.
///
/// # Errors
///
/// Returns whatever error `write_batch` reports.
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
    self.write_batch(&[WriteBatchRecord::Put(key, value)])
}
/// Remove a key from the storage by writing an empty value.
///
/// Implemented as a single-record write batch: validation, the memtable write and the
/// freeze check all happen inside `write_batch`, so they are not repeated here.
///
/// # Panics
///
/// Panics (inside `write_batch`) if `key` is empty.
///
/// # Errors
///
/// Returns whatever error `write_batch` reports.
pub fn delete(&self, key: &[u8]) -> Result<()> {
    self.write_batch(&[WriteBatchRecord::Del(key)])
}
fn try_freeze(&self, estimated_size: usize) -> Result<()> {

View File

@@ -3,10 +3,10 @@ use std::io::{Read, Write};
use std::path::Path;
use std::sync::Arc;
use anyhow::{Context, Result};
use anyhow::{bail, Context, Result};
use bytes::{Buf, BufMut};
use parking_lot::{Mutex, MutexGuard};
use serde::{Deserialize, Serialize};
use serde_json::Deserializer;
use crate::compact::CompactionTask;
@@ -43,10 +43,18 @@ impl Manifest {
.context("failed to recover manifest")?;
let mut buf = Vec::new();
file.read_to_end(&mut buf)?;
let stream = Deserializer::from_slice(&buf).into_iter::<ManifestRecord>();
let mut buf_ptr = buf.as_slice();
let mut records = Vec::new();
for x in stream {
records.push(x?);
while buf_ptr.has_remaining() {
let len = buf_ptr.get_u64();
let slice = &buf_ptr[..len as usize];
let json = serde_json::from_slice::<ManifestRecord>(slice)?;
buf_ptr.advance(len as usize);
let checksum = buf_ptr.get_u32();
if checksum != crc32fast::hash(slice) {
bail!("checksum mismatched!");
}
records.push(json);
}
Ok((
Self {
@@ -66,7 +74,10 @@ impl Manifest {
pub fn add_record_when_init(&self, record: ManifestRecord) -> Result<()> {
let mut file = self.file.lock();
let buf = serde_json::to_vec(&record)?;
let mut buf = serde_json::to_vec(&record)?;
let hash = crc32fast::hash(&buf);
file.write_all(&(buf.len() as u64).to_be_bytes())?;
buf.put_u32(hash);
file.write_all(&buf)?;
file.sync_all()?;
Ok(())

View File

@@ -6,7 +6,7 @@ use std::fs::File;
use std::path::Path;
use std::sync::Arc;
use anyhow::{anyhow, Result};
use anyhow::{anyhow, bail, Result};
pub use builder::SsTableBuilder;
use bytes::{Buf, BufMut};
pub use iterator::SsTableIterator;
@@ -30,7 +30,7 @@ pub struct BlockMeta {
impl BlockMeta {
/// Encode block meta to a buffer.
pub fn encode_block_meta(block_meta: &[BlockMeta], buf: &mut Vec<u8>) {
let mut estimated_size = 0;
let mut estimated_size = std::mem::size_of::<u32>();
for meta in block_meta {
// The size of offset
estimated_size += std::mem::size_of::<u32>();
@@ -43,10 +43,12 @@ impl BlockMeta {
// The size of actual key
estimated_size += meta.last_key.len();
}
estimated_size += std::mem::size_of::<u32>();
// Reserve the space to improve performance, especially when the size of incoming data is
// large
buf.reserve(estimated_size);
let original_len = buf.len();
buf.put_u32(block_meta.len() as u32);
for meta in block_meta {
buf.put_u32(meta.offset as u32);
buf.put_u16(meta.first_key.len() as u16);
@@ -54,13 +56,16 @@ impl BlockMeta {
buf.put_u16(meta.last_key.len() as u16);
buf.put_slice(meta.last_key.raw_ref());
}
buf.put_u32(crc32fast::hash(&buf[original_len + 4..]));
assert_eq!(estimated_size, buf.len() - original_len);
}
/// Decode block meta from a buffer.
pub fn decode_block_meta(mut buf: impl Buf) -> Vec<BlockMeta> {
pub fn decode_block_meta(mut buf: &[u8]) -> Result<Vec<BlockMeta>> {
let mut block_meta = Vec::new();
while buf.has_remaining() {
let num = buf.get_u32() as usize;
let checksum = crc32fast::hash(&buf[..buf.remaining() - 4]);
for _ in 0..num {
let offset = buf.get_u32() as usize;
let first_key_len = buf.get_u16() as usize;
let first_key = KeyBytes::from_bytes(buf.copy_to_bytes(first_key_len));
@@ -72,7 +77,11 @@ impl BlockMeta {
last_key,
});
}
block_meta
if buf.get_u32() != checksum {
bail!("meta checksum mismatched");
}
Ok(block_meta)
}
}
@@ -141,7 +150,7 @@ impl SsTable {
let raw_meta_offset = file.read(bloom_offset - 4, 4)?;
let block_meta_offset = (&raw_meta_offset[..]).get_u32() as u64;
let raw_meta = file.read(block_meta_offset, bloom_offset - 4 - block_meta_offset)?;
let block_meta = BlockMeta::decode_block_meta(&raw_meta[..]);
let block_meta = BlockMeta::decode_block_meta(&raw_meta[..])?;
Ok(Self {
file,
first_key: block_meta.first().unwrap().first_key.clone(),
@@ -180,10 +189,16 @@ impl SsTable {
.block_meta
.get(block_idx + 1)
.map_or(self.block_meta_offset, |x| x.offset);
let block_data = self
let block_len = offset_end - offset - 4;
let block_data_with_chksum: Vec<u8> = self
.file
.read(offset as u64, (offset_end - offset) as u64)?;
Ok(Arc::new(Block::decode(&block_data[..])))
let block_data = &block_data_with_chksum[..block_len];
let checksum = (&block_data_with_chksum[block_len..]).get_u32();
if checksum != crc32fast::hash(block_data) {
bail!("block checksum mismatched");
}
Ok(Arc::new(Block::decode(block_data)))
}
/// Read a block from disk, with block cache.

View File

@@ -70,7 +70,9 @@ impl SsTableBuilder {
first_key: std::mem::take(&mut self.first_key).into_key_bytes(),
last_key: std::mem::take(&mut self.last_key).into_key_bytes(),
});
let checksum = crc32fast::hash(&encoded_block);
self.data.extend(encoded_block);
self.data.put_u32(checksum);
}
/// Builds the SSTable and writes it to the given path. Use the `FileObject` structure to manipulate the disk objects.

View File

@@ -1,9 +1,10 @@
use std::fs::{File, OpenOptions};
use std::hash::Hasher;
use std::io::{Read, Write};
use std::path::Path;
use std::sync::Arc;
use anyhow::{Context, Result};
use anyhow::{bail, Context, Result};
use bytes::{Buf, BufMut, Bytes};
use crossbeam_skiplist::SkipMap;
use parking_lot::Mutex;
@@ -37,12 +38,21 @@ impl Wal {
file.read_to_end(&mut buf)?;
let mut rbuf: &[u8] = buf.as_slice();
while rbuf.has_remaining() {
let mut hasher = crc32fast::Hasher::new();
let key_len = rbuf.get_u16() as usize;
hasher.write_u16(key_len as u16);
let key = Bytes::copy_from_slice(&rbuf[..key_len]);
hasher.write(&key);
rbuf.advance(key_len);
let value_len = rbuf.get_u16() as usize;
hasher.write_u16(value_len as u16);
let value = Bytes::copy_from_slice(&rbuf[..value_len]);
hasher.write(&value);
rbuf.advance(value_len);
let checksum = rbuf.get_u32();
if hasher.finalize() != checksum {
bail!("checksum mismatch");
}
skiplist.insert(key, value);
}
Ok(Self {
@@ -54,10 +64,17 @@ impl Wal {
let mut file = self.file.lock();
let mut buf: Vec<u8> =
Vec::with_capacity(key.len() + value.len() + std::mem::size_of::<u16>());
let mut hasher = crc32fast::Hasher::new();
hasher.write_u16(key.len() as u16);
buf.put_u16(key.len() as u16);
hasher.write(key);
buf.put_slice(key);
hasher.write_u16(value.len() as u16);
buf.put_u16(value.len() as u16);
buf.put_slice(value);
hasher.write(value);
// add checksum: week 2 day 7
buf.put_u32(hasher.finalize());
file.write_all(&buf)?;
Ok(())
}