From 51e075c1ed018ae8719b2d3a30a5d5b5d96f2b87 Mon Sep 17 00:00:00 2001 From: Alex Chi Date: Sat, 24 Dec 2022 18:07:18 -0500 Subject: [PATCH] feat(code): add block cache Signed-off-by: Alex Chi --- mini-lsm-starter/src/lsm_storage.rs | 3 ++ mini-lsm-starter/src/table.rs | 14 +++++- mini-lsm-starter/src/table/builder.rs | 14 +++++- mini-lsm/src/lsm_storage.rs | 33 +++++++++++-- mini-lsm/src/mem_table/tests.rs | 5 +- mini-lsm/src/table.rs | 68 ++++++++++++++++++++++++--- mini-lsm/src/table/builder.rs | 16 ++++++- mini-lsm/src/table/iterator.rs | 13 +++-- mini-lsm/src/table/tests.rs | 23 +++++---- mini-lsm/src/tests/day3_tests.rs | 25 +++++----- 10 files changed, 173 insertions(+), 41 deletions(-) diff --git a/mini-lsm-starter/src/lsm_storage.rs b/mini-lsm-starter/src/lsm_storage.rs index 8ade102..e7c50fd 100644 --- a/mini-lsm-starter/src/lsm_storage.rs +++ b/mini-lsm-starter/src/lsm_storage.rs @@ -9,10 +9,13 @@ use anyhow::Result; use arc_swap::ArcSwap; use bytes::Bytes; +use crate::block::Block; use crate::lsm_iterator::{FusedIterator, LsmIterator}; use crate::mem_table::MemTable; use crate::table::SsTable; +pub type BlockCache = moka::sync::Cache<(usize, usize), Arc>; + #[derive(Clone)] pub struct LsmStorageInner { /// MemTables, from oldest to earliest. diff --git a/mini-lsm-starter/src/table.rs b/mini-lsm-starter/src/table.rs index 0e1ee80..47fb4e4 100644 --- a/mini-lsm-starter/src/table.rs +++ b/mini-lsm-starter/src/table.rs @@ -13,6 +13,7 @@ use bytes::{Buf, Bytes}; pub use iterator::SsTableIterator; use crate::block::Block; +use crate::lsm_storage::BlockCache; #[derive(Clone, Debug, PartialEq, Eq)] pub struct BlockMeta { @@ -50,6 +51,7 @@ impl FileObject { self.0.len() as u64 } + /// Create a new file object (day 2) and write the file to the disk (day 4). pub fn create(path: &Path, data: Vec) -> Result { unimplemented!() } @@ -66,8 +68,13 @@ pub struct SsTable { } impl SsTable { + #[cfg(test)] + pub(crate) fn open_for_test(file: FileObject) -> Result { + Self::open(0, None, file) + } + /// Open SSTable from a file. - pub fn open(file: FileObject) -> Result { + pub fn open(id: usize, block_cache: Option>, file: FileObject) -> Result { unimplemented!() } @@ -76,6 +83,11 @@ impl SsTable { unimplemented!() } + /// Read a block from disk, with block cache. (Day 4) + pub fn read_block_cached(&self, block_idx: usize) -> Result> { + unimplemented!() + } + /// Find the block that may contain `key`. pub fn find_block_idx(&self, key: &[u8]) -> usize { unimplemented!() diff --git a/mini-lsm-starter/src/table/builder.rs b/mini-lsm-starter/src/table/builder.rs index 2104ea5..9e53417 100644 --- a/mini-lsm-starter/src/table/builder.rs +++ b/mini-lsm-starter/src/table/builder.rs @@ -2,10 +2,12 @@ #![allow(dead_code)] // TODO(you): remove this lint after implementing this mod use std::path::Path; +use std::sync::Arc; use anyhow::Result; use super::{BlockMeta, SsTable}; +use crate::lsm_storage::BlockCache; /// Builds an SSTable from key-value pairs. pub struct SsTableBuilder { @@ -31,7 +33,17 @@ impl SsTableBuilder { /// Builds the SSTable and writes it to the given path. No need to actually write to disk until /// chapter 4 block cache. - pub fn build(self, path: impl AsRef) -> Result { + pub fn build( + self, + id: usize, + block_cache: Option>, + path: impl AsRef, + ) -> Result { unimplemented!() } + + #[cfg(test)] + pub(crate) fn build_for_test(self, path: impl AsRef) -> Result { + self.build(0, None, path) + } } diff --git a/mini-lsm/src/lsm_storage.rs b/mini-lsm/src/lsm_storage.rs index 487a5f7..6fc0722 100644 --- a/mini-lsm/src/lsm_storage.rs +++ b/mini-lsm/src/lsm_storage.rs @@ -1,11 +1,12 @@ use std::ops::Bound; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::Arc; use anyhow::Result; use bytes::Bytes; use parking_lot::{Mutex, RwLock}; +use crate::block::Block; use crate::iterators::impls::StorageIterator; use crate::iterators::merge_iterator::MergeIterator; use crate::iterators::two_merge_iterator::TwoMergeIterator; @@ -13,6 +14,8 @@ use crate::lsm_iterator::{FusedIterator, LsmIterator}; use crate::mem_table::{map_bound, MemTable}; use crate::table::{SsTable, SsTableBuilder, SsTableIterator}; +pub type BlockCache = moka::sync::Cache<(usize, usize), Arc>; + #[derive(Clone)] pub struct LsmStorageInner { /// The current memtable. @@ -24,6 +27,8 @@ pub struct LsmStorageInner { /// L1 - L6 SsTables, sorted by key range. #[allow(dead_code)] levels: Vec>>, + /// The next SSTable ID. + next_sst_id: usize, } impl LsmStorageInner { @@ -33,6 +38,7 @@ impl LsmStorageInner { imm_memtables: vec![], l0_sstables: vec![], levels: vec![], + next_sst_id: 1, } } } @@ -41,13 +47,17 @@ impl LsmStorageInner { pub struct LsmStorage { inner: Arc>>, flush_lock: Mutex<()>, + path: PathBuf, + block_cache: Arc, } impl LsmStorage { - pub fn open(_path: impl AsRef) -> Result { + pub fn open(path: impl AsRef) -> Result { Ok(Self { inner: Arc::new(RwLock::new(Arc::new(LsmStorageInner::create()))), flush_lock: Mutex::new(()), + path: path.as_ref().to_path_buf(), + block_cache: Arc::new(BlockCache::new(1 << 20)), // 4GB block cache }) } @@ -112,12 +122,17 @@ impl LsmStorage { Ok(()) } + fn path_of_sst(&self, id: usize) -> PathBuf { + self.path.join(format!("{:05}.sst", id)) + } + /// In day 3: flush the current memtable to disk as L0 SST. /// In day 6: call `fsync` on WAL. pub fn sync(&self) -> Result<()> { let _flush_lock = self.flush_lock.lock(); let flush_memtable; + let sst_id; // Move mutable memtable to immutable memtables. { @@ -126,18 +141,24 @@ impl LsmStorage { let mut snapshot = guard.as_ref().clone(); let memtable = std::mem::replace(&mut snapshot.memtable, Arc::new(MemTable::create())); flush_memtable = memtable.clone(); + sst_id = snapshot.next_sst_id; // Add the memtable to the immutable memtables. snapshot.imm_memtables.push(memtable); // Update the snapshot. *guard = Arc::new(snapshot); } - // At this point, the old memtable should be disabled for write, and all write threads should be - // operating on the new memtable. We can safely flush the old memtable to disk. + // At this point, the old memtable should be disabled for write, and all write threads + // should be operating on the new memtable. We can safely flush the old memtable to + // disk. let mut builder = SsTableBuilder::new(4096); flush_memtable.flush(&mut builder)?; - let sst = Arc::new(builder.build("")?); + let sst = Arc::new(builder.build( + sst_id, + Some(self.block_cache.clone()), + self.path_of_sst(sst_id), + )?); // Add the flushed L0 table to the list. { @@ -147,6 +168,8 @@ impl LsmStorage { snapshot.imm_memtables.pop(); // Add L0 table snapshot.l0_sstables.push(sst); + // Update SST ID + snapshot.next_sst_id += 1; // Update the snapshot. *guard = Arc::new(snapshot); } diff --git a/mini-lsm/src/mem_table/tests.rs b/mini-lsm/src/mem_table/tests.rs index b96f516..6010e84 100644 --- a/mini-lsm/src/mem_table/tests.rs +++ b/mini-lsm/src/mem_table/tests.rs @@ -1,3 +1,5 @@ +use tempfile::tempdir; + use super::MemTable; use crate::iterators::impls::StorageIterator; use crate::table::{SsTableBuilder, SsTableIterator}; @@ -35,7 +37,8 @@ fn test_memtable_flush() { memtable.put(b"key3", b"value3"); let mut builder = SsTableBuilder::new(128); memtable.flush(&mut builder).unwrap(); - let sst = builder.build("").unwrap(); + let dir = tempdir().unwrap(); + let sst = builder.build_for_test(dir.path().join("1.sst")).unwrap(); let mut iter = SsTableIterator::create_and_seek_to_first(sst.into()).unwrap(); assert_eq!(iter.key(), b"key1"); assert_eq!(iter.value(), b"value1"); diff --git a/mini-lsm/src/table.rs b/mini-lsm/src/table.rs index fab64a3..2a36fd2 100644 --- a/mini-lsm/src/table.rs +++ b/mini-lsm/src/table.rs @@ -1,15 +1,17 @@ mod builder; mod iterator; +use std::fs::File; use std::path::Path; use std::sync::Arc; -use anyhow::Result; +use anyhow::{anyhow, Result}; pub use builder::SsTableBuilder; use bytes::{Buf, BufMut, Bytes}; pub use iterator::SsTableIterator; use crate::block::Block; +use crate::lsm_storage::BlockCache; #[derive(Clone, Debug, PartialEq, Eq)] pub struct BlockMeta { @@ -52,19 +54,50 @@ impl BlockMeta { } /// A file object. -pub struct FileObject(Bytes); +/// +/// Before day 4, it should look like: +/// +/// ```ignore +/// pub struct FileObject(Bytes); +/// +/// impl FileObject { +/// pub fn read(&self, offset: u64, len: u64) -> Result> { +/// Ok(self.0[offset as usize..(offset + len) as usize].to_vec()) +/// } +/// pub fn size(&self) -> u64 { +/// self.0.len() as u64 +/// } +/// +/// pub fn create(_path: &Path, data: Vec) -> Result { +/// Ok(FileObject(data.into())) +/// } +/// +/// pub fn open(_path: &Path) -> Result { +/// unimplemented!() +/// } +/// } +/// ``` +pub struct FileObject(File, u64); impl FileObject { pub fn read(&self, offset: u64, len: u64) -> Result> { - Ok(self.0[offset as usize..(offset + len) as usize].to_vec()) + use std::os::unix::fs::FileExt; + let mut data = vec![0; len as usize]; + self.0.read_exact_at(&mut data[..], offset)?; + Ok(data) } pub fn size(&self) -> u64 { - self.0.len() as u64 + self.1 } - pub fn create(_path: &Path, data: Vec) -> Result { - Ok(FileObject(data.into())) + /// Create a new file object (day 2) and write the file to the disk (day 4). + pub fn create(path: &Path, data: Vec) -> Result { + std::fs::write(path, &data)?; + Ok(FileObject( + File::options().read(true).write(false).open(path)?, + data.len() as u64, + )) } pub fn open(_path: &Path) -> Result { @@ -76,11 +109,18 @@ pub struct SsTable { file: FileObject, block_metas: Vec, block_meta_offset: usize, + id: usize, + block_cache: Option>, } impl SsTable { + #[cfg(test)] + pub(crate) fn open_for_test(file: FileObject) -> Result { + Self::open(0, None, file) + } + /// Open SSTable from a file. - pub fn open(file: FileObject) -> Result { + pub fn open(id: usize, block_cache: Option>, file: FileObject) -> Result { let len = file.size(); let raw_meta_offset = file.read(len - 4, 4)?; let block_meta_offset = (&raw_meta_offset[..]).get_u32() as u64; @@ -89,6 +129,8 @@ impl SsTable { file, block_metas: BlockMeta::decode_block_meta(&raw_meta[..]), block_meta_offset: block_meta_offset as usize, + id, + block_cache, }) } @@ -106,6 +148,18 @@ impl SsTable { Ok(Arc::new(Block::decode(&block_data[..]))) } + /// Read a block from disk, with block cache. + pub fn read_block_cached(&self, block_idx: usize) -> Result> { + if let Some(ref block_cache) = self.block_cache { + let blk = block_cache + .try_get_with((self.id, block_idx), || self.read_block(block_idx)) + .map_err(|e| anyhow!("{}", e))?; + Ok(blk) + } else { + self.read_block(block_idx) + } + } + /// Find the block that may contain `key`. pub fn find_block_idx(&self, key: &[u8]) -> usize { self.block_metas diff --git a/mini-lsm/src/table/builder.rs b/mini-lsm/src/table/builder.rs index db7f476..7a35437 100644 --- a/mini-lsm/src/table/builder.rs +++ b/mini-lsm/src/table/builder.rs @@ -1,10 +1,12 @@ use std::path::Path; +use std::sync::Arc; use anyhow::Result; use bytes::BufMut; use super::{BlockMeta, FileObject, SsTable}; use crate::block::BlockBuilder; +use crate::lsm_storage::BlockCache; /// Builds an SSTable from key-value pairs. pub struct SsTableBuilder { @@ -61,7 +63,12 @@ impl SsTableBuilder { /// Builds the SSTable and writes it to the given path. No need to actually write to disk until /// chapter 4 block cache. - pub fn build(mut self, path: impl AsRef) -> Result { + pub fn build( + mut self, + id: usize, + block_cache: Option>, + path: impl AsRef, + ) -> Result { self.finish_block(); let mut buf = self.data; let meta_offset = buf.len(); @@ -69,9 +76,16 @@ impl SsTableBuilder { buf.put_u32(meta_offset as u32); let file = FileObject::create(path.as_ref(), buf)?; Ok(SsTable { + id, file, block_metas: self.meta, block_meta_offset: meta_offset, + block_cache, }) } + + #[cfg(test)] + pub(crate) fn build_for_test(self, path: impl AsRef) -> Result { + self.build(0, None, path) + } } diff --git a/mini-lsm/src/table/iterator.rs b/mini-lsm/src/table/iterator.rs index a6bf1f4..feb9b79 100644 --- a/mini-lsm/src/table/iterator.rs +++ b/mini-lsm/src/table/iterator.rs @@ -17,7 +17,7 @@ impl SsTableIterator { fn seek_to_first_inner(table: &Arc) -> Result<(usize, BlockIterator)> { Ok(( 0, - BlockIterator::create_and_seek_to_first(table.read_block(0)?), + BlockIterator::create_and_seek_to_first(table.read_block_cached(0)?), )) } @@ -42,11 +42,13 @@ impl SsTableIterator { fn seek_to_key_inner(table: &Arc, key: &[u8]) -> Result<(usize, BlockIterator)> { let mut blk_idx = table.find_block_idx(key); - let mut blk_iter = BlockIterator::create_and_seek_to_key(table.read_block(blk_idx)?, key); + let mut blk_iter = + BlockIterator::create_and_seek_to_key(table.read_block_cached(blk_idx)?, key); if !blk_iter.is_valid() { blk_idx += 1; if blk_idx < table.num_of_blocks() { - blk_iter = BlockIterator::create_and_seek_to_first(table.read_block(blk_idx)?); + blk_iter = + BlockIterator::create_and_seek_to_first(table.read_block_cached(blk_idx)?); } } Ok((blk_idx, blk_iter)) @@ -90,8 +92,9 @@ impl StorageIterator for SsTableIterator { if !self.blk_iter.is_valid() { self.blk_idx += 1; if self.blk_idx < self.table.num_of_blocks() { - self.blk_iter = - BlockIterator::create_and_seek_to_first(self.table.read_block(self.blk_idx)?); + self.blk_iter = BlockIterator::create_and_seek_to_first( + self.table.read_block_cached(self.blk_idx)?, + ); } } Ok(()) diff --git a/mini-lsm/src/table/tests.rs b/mini-lsm/src/table/tests.rs index 177346c..349ce6c 100644 --- a/mini-lsm/src/table/tests.rs +++ b/mini-lsm/src/table/tests.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use bytes::Bytes; +use tempfile::{tempdir, TempDir}; use super::*; use crate::iterators::impls::StorageIterator; @@ -10,7 +11,8 @@ use crate::table::SsTableBuilder; fn test_sst_build_single_key() { let mut builder = SsTableBuilder::new(16); builder.add(b"233", b"233333"); - builder.build("").unwrap(); + let dir = tempdir().unwrap(); + builder.build_for_test(dir.path().join("1.sst")).unwrap(); } #[test] @@ -23,7 +25,8 @@ fn test_sst_build_two_blocks() { builder.add(b"55", b"11"); builder.add(b"66", b"22"); assert!(builder.meta.len() >= 2); - builder.build("").unwrap(); + let dir = tempdir().unwrap(); + builder.build_for_test(dir.path().join("1.sst")).unwrap(); } fn key_of(idx: usize) -> Vec { @@ -38,14 +41,16 @@ fn num_of_keys() -> usize { 100 } -fn generate_sst() -> SsTable { +fn generate_sst() -> (TempDir, SsTable) { let mut builder = SsTableBuilder::new(128); for idx in 0..num_of_keys() { let key = key_of(idx); let value = value_of(idx); builder.add(&key[..], &value[..]); } - builder.build("").unwrap() + let dir = tempdir().unwrap(); + let path = dir.path().join("1.sst"); + (dir, builder.build_for_test(path).unwrap()) } #[test] @@ -55,9 +60,9 @@ fn test_sst_build_all() { #[test] fn test_sst_decode() { - let sst = generate_sst(); + let (_dir, sst) = generate_sst(); let meta = sst.block_metas.clone(); - let new_sst = SsTable::open(sst.file).unwrap(); + let new_sst = SsTable::open_for_test(sst.file).unwrap(); assert_eq!(new_sst.block_metas, meta); } @@ -67,7 +72,8 @@ fn as_bytes(x: &[u8]) -> Bytes { #[test] fn test_sst_iterator() { - let sst = Arc::new(generate_sst()); + let (_dir, sst) = generate_sst(); + let sst = Arc::new(sst); let mut iter = SsTableIterator::create_and_seek_to_first(sst).unwrap(); for _ in 0..5 { for i in 0..num_of_keys() { @@ -95,7 +101,8 @@ fn test_sst_iterator() { #[test] fn test_sst_seek_key() { - let sst = Arc::new(generate_sst()); + let (_dir, sst) = generate_sst(); + let sst = Arc::new(sst); let mut iter = SsTableIterator::create_and_seek_to_key(sst, &key_of(0)).unwrap(); for offset in 1..=5 { for i in 0..num_of_keys() { diff --git a/mini-lsm/src/tests/day3_tests.rs b/mini-lsm/src/tests/day3_tests.rs index b591ca3..c4e041e 100644 --- a/mini-lsm/src/tests/day3_tests.rs +++ b/mini-lsm/src/tests/day3_tests.rs @@ -1,6 +1,7 @@ use std::ops::Bound; use bytes::Bytes; +use tempfile::tempdir; use crate::iterators::impls::StorageIterator; @@ -34,8 +35,8 @@ fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>) #[test] fn test_storage_get() { use crate::lsm_storage::LsmStorage; - - let storage = LsmStorage::open("").unwrap(); + let dir = tempdir().unwrap(); + let storage = LsmStorage::open(&dir).unwrap(); storage.put(b"1", b"233").unwrap(); storage.put(b"2", b"2333").unwrap(); storage.put(b"3", b"23333").unwrap(); @@ -49,8 +50,8 @@ fn test_storage_get() { #[test] fn test_storage_scan_memtable_1() { use crate::lsm_storage::LsmStorage; - - let storage = LsmStorage::open("").unwrap(); + let dir = tempdir().unwrap(); + let storage = LsmStorage::open(&dir).unwrap(); storage.put(b"1", b"233").unwrap(); storage.put(b"2", b"2333").unwrap(); storage.put(b"3", b"23333").unwrap(); @@ -79,8 +80,8 @@ fn test_storage_scan_memtable_1() { #[test] fn test_storage_scan_memtable_2() { use crate::lsm_storage::LsmStorage; - - let storage = LsmStorage::open("").unwrap(); + let dir = tempdir().unwrap(); + let storage = LsmStorage::open(&dir).unwrap(); storage.put(b"1", b"233").unwrap(); storage.put(b"2", b"2333").unwrap(); storage.put(b"3", b"23333").unwrap(); @@ -109,8 +110,8 @@ fn test_storage_scan_memtable_2() { #[test] fn test_storage_get_after_sync() { use crate::lsm_storage::LsmStorage; - - let storage = LsmStorage::open("").unwrap(); + let dir = tempdir().unwrap(); + let storage = LsmStorage::open(&dir).unwrap(); storage.put(b"1", b"233").unwrap(); storage.put(b"2", b"2333").unwrap(); storage.sync().unwrap(); @@ -125,8 +126,8 @@ fn test_storage_get_after_sync() { #[test] fn test_storage_scan_memtable_1_after_sync() { use crate::lsm_storage::LsmStorage; - - let storage = LsmStorage::open("").unwrap(); + let dir = tempdir().unwrap(); + let storage = LsmStorage::open(&dir).unwrap(); storage.put(b"1", b"233").unwrap(); storage.put(b"2", b"2333").unwrap(); storage.sync().unwrap(); @@ -156,8 +157,8 @@ fn test_storage_scan_memtable_1_after_sync() { #[test] fn test_storage_scan_memtable_2_after_sync() { use crate::lsm_storage::LsmStorage; - - let storage = LsmStorage::open("").unwrap(); + let dir = tempdir().unwrap(); + let storage = LsmStorage::open(&dir).unwrap(); storage.put(b"1", b"233").unwrap(); storage.put(b"2", b"2333").unwrap(); storage.put(b"3", b"23333").unwrap();