From 89acc232080ea508580950f2f64b92de2ed7d189 Mon Sep 17 00:00:00 2001 From: Alex Chi Date: Thu, 25 Jan 2024 21:53:47 +0800 Subject: [PATCH] implement 2.7 Signed-off-by: Alex Chi --- Cargo.lock | 11 +++++ README.md | 2 +- mini-lsm-book/src/week2-07-snacks.md | 4 ++ mini-lsm-mvcc/Cargo.toml | 1 + mini-lsm-mvcc/src/lsm_storage.rs | 68 ++++++++++++++++++---------- mini-lsm-mvcc/src/manifest.rs | 23 +++++++--- mini-lsm-mvcc/src/table.rs | 31 +++++++++---- mini-lsm-mvcc/src/wal.rs | 19 +++++++- mini-lsm-starter/src/compact.rs | 1 + mini-lsm-starter/src/lsm_storage.rs | 14 ++++++ mini-lsm/Cargo.toml | 1 + mini-lsm/src/lsm_storage.rs | 68 ++++++++++++++++++---------- mini-lsm/src/manifest.rs | 23 +++++++--- mini-lsm/src/table.rs | 31 +++++++++---- mini-lsm/src/table/builder.rs | 2 + mini-lsm/src/wal.rs | 19 +++++++- 16 files changed, 237 insertions(+), 81 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ca0ca2a..66dd13e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -195,6 +195,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "crc32fast" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if", +] + [[package]] name = "crossbeam-channel" version = "0.5.11" @@ -411,6 +420,7 @@ dependencies = [ "arc-swap", "bytes", "clap", + "crc32fast", "crossbeam-channel", "crossbeam-epoch", "crossbeam-skiplist", @@ -432,6 +442,7 @@ dependencies = [ "arc-swap", "bytes", "clap", + "crc32fast", "crossbeam-channel", "crossbeam-epoch", "crossbeam-skiplist", diff --git a/README.md b/README.md index dd284c4..37d4d14 100644 --- a/README.md +++ b/README.md @@ -63,7 +63,7 @@ We are working on chapter 3 and more test cases for all existing contents. | Week + Chapter | Topic | Solution | Starter Code | Writeup | | -------------- | ----------------------------------------------- | -------- | ------------ | ------- | -| 2.7 | Batch Write + Checksum | 🚧 | | | +| 2.7 | Batch Write + Checksum | 🚧 | 🚧 | | | 3.1 | Timestamp Key Encoding | 🚧 | | | | 3.2 | Snapshot Read - Blocks, Memtables, and SSTs | | | | | 3.3 | Snapshot Read - Engine Read Path | | | | diff --git a/mini-lsm-book/src/week2-07-snacks.md b/mini-lsm-book/src/week2-07-snacks.md index 7cd7b06..cacd46e 100644 --- a/mini-lsm-book/src/week2-07-snacks.md +++ b/mini-lsm-book/src/week2-07-snacks.md @@ -26,4 +26,8 @@ In this chapter, you will: We do not provide reference answers to the questions, and feel free to discuss about them in the Discord community. +## Bonus Tasks + +* **Try Recovering**. If there is a checksum error, open the database in a safe mode so that no writes can be performed and non-corrupted data can still be retrieved. + {{#include copyright.md}} diff --git a/mini-lsm-mvcc/Cargo.toml b/mini-lsm-mvcc/Cargo.toml index 92fc798..15213c3 100644 --- a/mini-lsm-mvcc/Cargo.toml +++ b/mini-lsm-mvcc/Cargo.toml @@ -24,6 +24,7 @@ crossbeam-channel = "0.5.11" serde_json = { version = "1.0" } serde = { version = "1.0", features = ["derive"] } farmhash = "1" +crc32fast = "1.3.2" [dev-dependencies] tempfile = "3" diff --git a/mini-lsm-mvcc/src/lsm_storage.rs b/mini-lsm-mvcc/src/lsm_storage.rs index e126eb3..fe5a105 100644 --- a/mini-lsm-mvcc/src/lsm_storage.rs +++ b/mini-lsm-mvcc/src/lsm_storage.rs @@ -42,6 +42,11 @@ pub struct LsmStorageState { pub sstables: HashMap>, } +pub enum WriteBatchRecord> { + Put(T, T), + Del(T), +} + impl LsmStorageState { fn create(options: &LsmStorageOptions) -> Self { let levels = match &options.compaction_options { @@ -234,6 +239,10 @@ impl MiniLsm { self.inner.get(key) } + pub fn write_batch>(&self, batch: &[WriteBatchRecord]) -> Result<()> { + self.inner.write_batch(batch) + } + pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { self.inner.put(key, value) } @@ -486,37 +495,46 @@ impl LsmStorageInner { Ok(None) } + pub fn write_batch>(&self, batch: &[WriteBatchRecord]) -> Result<()> { + for record in batch { + match record { + WriteBatchRecord::Del(key) => { + let key = key.as_ref(); + assert!(!key.is_empty(), "key cannot be empty"); + let size; + { + let guard = self.state.read(); + guard.memtable.put(key, b"")?; + size = guard.memtable.approximate_size(); + } + self.try_freeze(size)?; + } + WriteBatchRecord::Put(key, value) => { + let key = key.as_ref(); + let value = value.as_ref(); + assert!(!key.is_empty(), "key cannot be empty"); + assert!(!value.is_empty(), "value cannot be empty"); + let size; + { + let guard = self.state.read(); + guard.memtable.put(key, value)?; + size = guard.memtable.approximate_size(); + } + self.try_freeze(size)?; + } + } + } + Ok(()) + } + /// Put a key-value pair into the storage by writing into the current memtable. pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { - assert!(!value.is_empty(), "value cannot be empty"); - assert!(!key.is_empty(), "key cannot be empty"); - - let size; - { - let guard = self.state.read(); - guard.memtable.put(key, value)?; - size = guard.memtable.approximate_size(); - } - - self.try_freeze(size)?; - - Ok(()) + self.write_batch(&[WriteBatchRecord::Put(key, value)]) } /// Remove a key from the storage by writing an empty value. pub fn delete(&self, key: &[u8]) -> Result<()> { - assert!(!key.is_empty(), "key cannot be empty"); - - let size; - { - let guard = self.state.read(); - guard.memtable.put(key, b"")?; - size = guard.memtable.approximate_size(); - } - - self.try_freeze(size)?; - - Ok(()) + self.write_batch(&[WriteBatchRecord::Del(key)]) } fn try_freeze(&self, estimated_size: usize) -> Result<()> { diff --git a/mini-lsm-mvcc/src/manifest.rs b/mini-lsm-mvcc/src/manifest.rs index 62d968c..c28a99b 100644 --- a/mini-lsm-mvcc/src/manifest.rs +++ b/mini-lsm-mvcc/src/manifest.rs @@ -3,10 +3,10 @@ use std::io::{Read, Write}; use std::path::Path; use std::sync::Arc; -use anyhow::{Context, Result}; +use anyhow::{bail, Context, Result}; +use bytes::{Buf, BufMut}; use parking_lot::{Mutex, MutexGuard}; use serde::{Deserialize, Serialize}; -use serde_json::Deserializer; use crate::compact::CompactionTask; @@ -43,10 +43,18 @@ impl Manifest { .context("failed to recover manifest")?; let mut buf = Vec::new(); file.read_to_end(&mut buf)?; - let stream = Deserializer::from_slice(&buf).into_iter::(); + let mut buf_ptr = buf.as_slice(); let mut records = Vec::new(); - for x in stream { - records.push(x?); + while buf_ptr.has_remaining() { + let len = buf_ptr.get_u64(); + let slice = &buf_ptr[..len as usize]; + let json = serde_json::from_slice::(slice)?; + buf_ptr.advance(len as usize); + let checksum = buf_ptr.get_u32(); + if checksum != crc32fast::hash(slice) { + bail!("checksum mismatched!"); + } + records.push(json); } Ok(( Self { @@ -66,7 +74,10 @@ impl Manifest { pub fn add_record_when_init(&self, record: ManifestRecord) -> Result<()> { let mut file = self.file.lock(); - let buf = serde_json::to_vec(&record)?; + let mut buf = serde_json::to_vec(&record)?; + let hash = crc32fast::hash(&buf); + file.write_all(&(buf.len() as u64).to_be_bytes())?; + buf.put_u32(hash); file.write_all(&buf)?; file.sync_all()?; Ok(()) diff --git a/mini-lsm-mvcc/src/table.rs b/mini-lsm-mvcc/src/table.rs index 1dfda93..8d9589b 100644 --- a/mini-lsm-mvcc/src/table.rs +++ b/mini-lsm-mvcc/src/table.rs @@ -6,7 +6,7 @@ use std::fs::File; use std::path::Path; use std::sync::Arc; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, bail, Result}; pub use builder::SsTableBuilder; use bytes::{Buf, BufMut}; pub use iterator::SsTableIterator; @@ -30,7 +30,7 @@ pub struct BlockMeta { impl BlockMeta { /// Encode block meta to a buffer. pub fn encode_block_meta(block_meta: &[BlockMeta], buf: &mut Vec) { - let mut estimated_size = 0; + let mut estimated_size = std::mem::size_of::(); for meta in block_meta { // The size of offset estimated_size += std::mem::size_of::(); @@ -43,10 +43,12 @@ impl BlockMeta { // The size of actual key estimated_size += meta.last_key.raw_len(); } + estimated_size += std::mem::size_of::(); // Reserve the space to improve performance, especially when the size of incoming data is // large buf.reserve(estimated_size); let original_len = buf.len(); + buf.put_u32(block_meta.len() as u32); for meta in block_meta { buf.put_u32(meta.offset as u32); buf.put_u16(meta.first_key.key_len() as u16); @@ -56,13 +58,16 @@ impl BlockMeta { buf.put_slice(meta.last_key.key_ref()); buf.put_u64(meta.last_key.ts()); } + buf.put_u32(crc32fast::hash(&buf[original_len + 4..])); assert_eq!(estimated_size, buf.len() - original_len); } /// Decode block meta from a buffer. - pub fn decode_block_meta(mut buf: impl Buf) -> Vec { + pub fn decode_block_meta(mut buf: &[u8]) -> Result> { let mut block_meta = Vec::new(); - while buf.has_remaining() { + let num = buf.get_u32() as usize; + let checksum = crc32fast::hash(&buf[..buf.remaining() - 4]); + for _ in 0..num { let offset = buf.get_u32() as usize; let first_key_len = buf.get_u16() as usize; let first_key = @@ -76,7 +81,11 @@ impl BlockMeta { last_key, }); } - block_meta + if buf.get_u32() != checksum { + bail!("meta checksum mismatched"); + } + + Ok(block_meta) } } @@ -145,7 +154,7 @@ impl SsTable { let raw_meta_offset = file.read(bloom_offset - 4, 4)?; let block_meta_offset = (&raw_meta_offset[..]).get_u32() as u64; let raw_meta = file.read(block_meta_offset, bloom_offset - 4 - block_meta_offset)?; - let block_meta = BlockMeta::decode_block_meta(&raw_meta[..]); + let block_meta = BlockMeta::decode_block_meta(&raw_meta[..])?; Ok(Self { file, first_key: block_meta.first().unwrap().first_key.clone(), @@ -184,10 +193,16 @@ impl SsTable { .block_meta .get(block_idx + 1) .map_or(self.block_meta_offset, |x| x.offset); - let block_data = self + let block_len = offset_end - offset - 4; + let block_data_with_chksum: Vec = self .file .read(offset as u64, (offset_end - offset) as u64)?; - Ok(Arc::new(Block::decode(&block_data[..]))) + let block_data = &block_data_with_chksum[..block_len]; + let checksum = (&block_data_with_chksum[block_len..]).get_u32(); + if checksum != crc32fast::hash(block_data) { + bail!("block checksum mismatched"); + } + Ok(Arc::new(Block::decode(block_data))) } /// Read a block from disk, with block cache. diff --git a/mini-lsm-mvcc/src/wal.rs b/mini-lsm-mvcc/src/wal.rs index e14d53c..fabd69f 100644 --- a/mini-lsm-mvcc/src/wal.rs +++ b/mini-lsm-mvcc/src/wal.rs @@ -1,9 +1,10 @@ use std::fs::{File, OpenOptions}; +use std::hash::Hasher; use std::io::{Read, Write}; use std::path::Path; use std::sync::Arc; -use anyhow::{Context, Result}; +use anyhow::{bail, Context, Result}; use bytes::{Buf, BufMut, Bytes}; use crossbeam_skiplist::SkipMap; use parking_lot::Mutex; @@ -37,12 +38,21 @@ impl Wal { file.read_to_end(&mut buf)?; let mut rbuf: &[u8] = buf.as_slice(); while rbuf.has_remaining() { + let mut hasher = crc32fast::Hasher::new(); let key_len = rbuf.get_u16() as usize; + hasher.write_u16(key_len as u16); let key = Bytes::copy_from_slice(&rbuf[..key_len]); + hasher.write(&key); rbuf.advance(key_len); let value_len = rbuf.get_u16() as usize; + hasher.write_u16(value_len as u16); let value = Bytes::copy_from_slice(&rbuf[..value_len]); + hasher.write(&value); rbuf.advance(value_len); + let checksum = rbuf.get_u32(); + if hasher.finalize() != checksum { + bail!("checksum mismatch"); + } skiplist.insert(key, value); } Ok(Self { @@ -54,10 +64,17 @@ impl Wal { let mut file = self.file.lock(); let mut buf: Vec = Vec::with_capacity(key.len() + value.len() + std::mem::size_of::()); + let mut hasher = crc32fast::Hasher::new(); + hasher.write_u16(key.len() as u16); buf.put_u16(key.len() as u16); + hasher.write(key); buf.put_slice(key); + hasher.write_u16(value.len() as u16); buf.put_u16(value.len() as u16); buf.put_slice(value); + hasher.write(value); + // add checksum: week 2 day 7 + buf.put_u32(hasher.finalize()); file.write_all(&buf)?; Ok(()) } diff --git a/mini-lsm-starter/src/compact.rs b/mini-lsm-starter/src/compact.rs index b0debfb..d52cc13 100644 --- a/mini-lsm-starter/src/compact.rs +++ b/mini-lsm-starter/src/compact.rs @@ -93,6 +93,7 @@ impl CompactionController { } } +#[derive(Debug, Clone)] pub enum CompactionOptions { /// Leveled compaction with partial compaction + dynamic level support (= RocksDB's Leveled /// Compaction) diff --git a/mini-lsm-starter/src/lsm_storage.rs b/mini-lsm-starter/src/lsm_storage.rs index b5bbcf9..e3f8853 100644 --- a/mini-lsm-starter/src/lsm_storage.rs +++ b/mini-lsm-starter/src/lsm_storage.rs @@ -38,6 +38,11 @@ pub struct LsmStorageState { pub sstables: HashMap>, } +pub enum WriteBatchRecord> { + Put(T, T), + Del(T), +} + impl LsmStorageState { fn create(options: &LsmStorageOptions) -> Self { let levels = match &options.compaction_options { @@ -156,6 +161,10 @@ impl MiniLsm { })) } + pub fn write_batch>(&self, batch: &[WriteBatchRecord]) -> Result<()> { + self.inner.write_batch(batch) + } + pub fn get(&self, key: &[u8]) -> Result> { self.inner.get(key) } @@ -245,6 +254,11 @@ impl LsmStorageInner { unimplemented!() } + /// Write a batch of data into the storage. Implement in week 2 day 7. + pub fn write_batch>(&self, _batch: &[WriteBatchRecord]) -> Result<()> { + unimplemented!() + } + /// Put a key-value pair into the storage by writing into the current memtable. pub fn put(&self, _key: &[u8], _value: &[u8]) -> Result<()> { unimplemented!() diff --git a/mini-lsm/Cargo.toml b/mini-lsm/Cargo.toml index 441baa3..0936160 100644 --- a/mini-lsm/Cargo.toml +++ b/mini-lsm/Cargo.toml @@ -24,6 +24,7 @@ crossbeam-channel = "0.5.11" serde_json = { version = "1.0" } serde = { version = "1.0", features = ["derive"] } farmhash = "1" +crc32fast = "1.3.2" [dev-dependencies] tempfile = "3" diff --git a/mini-lsm/src/lsm_storage.rs b/mini-lsm/src/lsm_storage.rs index 961b7af..383a7c4 100644 --- a/mini-lsm/src/lsm_storage.rs +++ b/mini-lsm/src/lsm_storage.rs @@ -42,6 +42,11 @@ pub struct LsmStorageState { pub sstables: HashMap>, } +pub enum WriteBatchRecord> { + Put(T, T), + Del(T), +} + impl LsmStorageState { fn create(options: &LsmStorageOptions) -> Self { let levels = match &options.compaction_options { @@ -234,6 +239,10 @@ impl MiniLsm { self.inner.get(key) } + pub fn write_batch>(&self, batch: &[WriteBatchRecord]) -> Result<()> { + self.inner.write_batch(batch) + } + pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { self.inner.put(key, value) } @@ -484,37 +493,46 @@ impl LsmStorageInner { Ok(None) } + pub fn write_batch>(&self, batch: &[WriteBatchRecord]) -> Result<()> { + for record in batch { + match record { + WriteBatchRecord::Del(key) => { + let key = key.as_ref(); + assert!(!key.is_empty(), "key cannot be empty"); + let size; + { + let guard = self.state.read(); + guard.memtable.put(key, b"")?; + size = guard.memtable.approximate_size(); + } + self.try_freeze(size)?; + } + WriteBatchRecord::Put(key, value) => { + let key = key.as_ref(); + let value = value.as_ref(); + assert!(!key.is_empty(), "key cannot be empty"); + assert!(!value.is_empty(), "value cannot be empty"); + let size; + { + let guard = self.state.read(); + guard.memtable.put(key, value)?; + size = guard.memtable.approximate_size(); + } + self.try_freeze(size)?; + } + } + } + Ok(()) + } + /// Put a key-value pair into the storage by writing into the current memtable. pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { - assert!(!value.is_empty(), "value cannot be empty"); - assert!(!key.is_empty(), "key cannot be empty"); - - let size; - { - let guard = self.state.read(); - guard.memtable.put(key, value)?; - size = guard.memtable.approximate_size(); - } - - self.try_freeze(size)?; - - Ok(()) + self.write_batch(&[WriteBatchRecord::Put(key, value)]) } /// Remove a key from the storage by writing an empty value. pub fn delete(&self, key: &[u8]) -> Result<()> { - assert!(!key.is_empty(), "key cannot be empty"); - - let size; - { - let guard = self.state.read(); - guard.memtable.put(key, b"")?; - size = guard.memtable.approximate_size(); - } - - self.try_freeze(size)?; - - Ok(()) + self.write_batch(&[WriteBatchRecord::Del(key)]) } fn try_freeze(&self, estimated_size: usize) -> Result<()> { diff --git a/mini-lsm/src/manifest.rs b/mini-lsm/src/manifest.rs index 62d968c..c28a99b 100644 --- a/mini-lsm/src/manifest.rs +++ b/mini-lsm/src/manifest.rs @@ -3,10 +3,10 @@ use std::io::{Read, Write}; use std::path::Path; use std::sync::Arc; -use anyhow::{Context, Result}; +use anyhow::{bail, Context, Result}; +use bytes::{Buf, BufMut}; use parking_lot::{Mutex, MutexGuard}; use serde::{Deserialize, Serialize}; -use serde_json::Deserializer; use crate::compact::CompactionTask; @@ -43,10 +43,18 @@ impl Manifest { .context("failed to recover manifest")?; let mut buf = Vec::new(); file.read_to_end(&mut buf)?; - let stream = Deserializer::from_slice(&buf).into_iter::(); + let mut buf_ptr = buf.as_slice(); let mut records = Vec::new(); - for x in stream { - records.push(x?); + while buf_ptr.has_remaining() { + let len = buf_ptr.get_u64(); + let slice = &buf_ptr[..len as usize]; + let json = serde_json::from_slice::(slice)?; + buf_ptr.advance(len as usize); + let checksum = buf_ptr.get_u32(); + if checksum != crc32fast::hash(slice) { + bail!("checksum mismatched!"); + } + records.push(json); } Ok(( Self { @@ -66,7 +74,10 @@ impl Manifest { pub fn add_record_when_init(&self, record: ManifestRecord) -> Result<()> { let mut file = self.file.lock(); - let buf = serde_json::to_vec(&record)?; + let mut buf = serde_json::to_vec(&record)?; + let hash = crc32fast::hash(&buf); + file.write_all(&(buf.len() as u64).to_be_bytes())?; + buf.put_u32(hash); file.write_all(&buf)?; file.sync_all()?; Ok(()) diff --git a/mini-lsm/src/table.rs b/mini-lsm/src/table.rs index 99fa818..88c06f2 100644 --- a/mini-lsm/src/table.rs +++ b/mini-lsm/src/table.rs @@ -6,7 +6,7 @@ use std::fs::File; use std::path::Path; use std::sync::Arc; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, bail, Result}; pub use builder::SsTableBuilder; use bytes::{Buf, BufMut}; pub use iterator::SsTableIterator; @@ -30,7 +30,7 @@ pub struct BlockMeta { impl BlockMeta { /// Encode block meta to a buffer. pub fn encode_block_meta(block_meta: &[BlockMeta], buf: &mut Vec) { - let mut estimated_size = 0; + let mut estimated_size = std::mem::size_of::(); for meta in block_meta { // The size of offset estimated_size += std::mem::size_of::(); @@ -43,10 +43,12 @@ impl BlockMeta { // The size of actual key estimated_size += meta.last_key.len(); } + estimated_size += std::mem::size_of::(); // Reserve the space to improve performance, especially when the size of incoming data is // large buf.reserve(estimated_size); let original_len = buf.len(); + buf.put_u32(block_meta.len() as u32); for meta in block_meta { buf.put_u32(meta.offset as u32); buf.put_u16(meta.first_key.len() as u16); @@ -54,13 +56,16 @@ impl BlockMeta { buf.put_u16(meta.last_key.len() as u16); buf.put_slice(meta.last_key.raw_ref()); } + buf.put_u32(crc32fast::hash(&buf[original_len + 4..])); assert_eq!(estimated_size, buf.len() - original_len); } /// Decode block meta from a buffer. - pub fn decode_block_meta(mut buf: impl Buf) -> Vec { + pub fn decode_block_meta(mut buf: &[u8]) -> Result> { let mut block_meta = Vec::new(); - while buf.has_remaining() { + let num = buf.get_u32() as usize; + let checksum = crc32fast::hash(&buf[..buf.remaining() - 4]); + for _ in 0..num { let offset = buf.get_u32() as usize; let first_key_len = buf.get_u16() as usize; let first_key = KeyBytes::from_bytes(buf.copy_to_bytes(first_key_len)); @@ -72,7 +77,11 @@ impl BlockMeta { last_key, }); } - block_meta + if buf.get_u32() != checksum { + bail!("meta checksum mismatched"); + } + + Ok(block_meta) } } @@ -141,7 +150,7 @@ impl SsTable { let raw_meta_offset = file.read(bloom_offset - 4, 4)?; let block_meta_offset = (&raw_meta_offset[..]).get_u32() as u64; let raw_meta = file.read(block_meta_offset, bloom_offset - 4 - block_meta_offset)?; - let block_meta = BlockMeta::decode_block_meta(&raw_meta[..]); + let block_meta = BlockMeta::decode_block_meta(&raw_meta[..])?; Ok(Self { file, first_key: block_meta.first().unwrap().first_key.clone(), @@ -180,10 +189,16 @@ impl SsTable { .block_meta .get(block_idx + 1) .map_or(self.block_meta_offset, |x| x.offset); - let block_data = self + let block_len = offset_end - offset - 4; + let block_data_with_chksum: Vec = self .file .read(offset as u64, (offset_end - offset) as u64)?; - Ok(Arc::new(Block::decode(&block_data[..]))) + let block_data = &block_data_with_chksum[..block_len]; + let checksum = (&block_data_with_chksum[block_len..]).get_u32(); + if checksum != crc32fast::hash(block_data) { + bail!("block checksum mismatched"); + } + Ok(Arc::new(Block::decode(block_data))) } /// Read a block from disk, with block cache. diff --git a/mini-lsm/src/table/builder.rs b/mini-lsm/src/table/builder.rs index a8753d1..0167124 100644 --- a/mini-lsm/src/table/builder.rs +++ b/mini-lsm/src/table/builder.rs @@ -70,7 +70,9 @@ impl SsTableBuilder { first_key: std::mem::take(&mut self.first_key).into_key_bytes(), last_key: std::mem::take(&mut self.last_key).into_key_bytes(), }); + let checksum = crc32fast::hash(&encoded_block); self.data.extend(encoded_block); + self.data.put_u32(checksum); } /// Builds the SSTable and writes it to the given path. Use the `FileObject` structure to manipulate the disk objects. diff --git a/mini-lsm/src/wal.rs b/mini-lsm/src/wal.rs index e14d53c..fabd69f 100644 --- a/mini-lsm/src/wal.rs +++ b/mini-lsm/src/wal.rs @@ -1,9 +1,10 @@ use std::fs::{File, OpenOptions}; +use std::hash::Hasher; use std::io::{Read, Write}; use std::path::Path; use std::sync::Arc; -use anyhow::{Context, Result}; +use anyhow::{bail, Context, Result}; use bytes::{Buf, BufMut, Bytes}; use crossbeam_skiplist::SkipMap; use parking_lot::Mutex; @@ -37,12 +38,21 @@ impl Wal { file.read_to_end(&mut buf)?; let mut rbuf: &[u8] = buf.as_slice(); while rbuf.has_remaining() { + let mut hasher = crc32fast::Hasher::new(); let key_len = rbuf.get_u16() as usize; + hasher.write_u16(key_len as u16); let key = Bytes::copy_from_slice(&rbuf[..key_len]); + hasher.write(&key); rbuf.advance(key_len); let value_len = rbuf.get_u16() as usize; + hasher.write_u16(value_len as u16); let value = Bytes::copy_from_slice(&rbuf[..value_len]); + hasher.write(&value); rbuf.advance(value_len); + let checksum = rbuf.get_u32(); + if hasher.finalize() != checksum { + bail!("checksum mismatch"); + } skiplist.insert(key, value); } Ok(Self { @@ -54,10 +64,17 @@ impl Wal { let mut file = self.file.lock(); let mut buf: Vec = Vec::with_capacity(key.len() + value.len() + std::mem::size_of::()); + let mut hasher = crc32fast::Hasher::new(); + hasher.write_u16(key.len() as u16); buf.put_u16(key.len() as u16); + hasher.write(key); buf.put_slice(key); + hasher.write_u16(value.len() as u16); buf.put_u16(value.len() as u16); buf.put_slice(value); + hasher.write(value); + // add checksum: week 2 day 7 + buf.put_u32(hasher.finalize()); file.write_all(&buf)?; Ok(()) }