feat(code): add block cache

Signed-off-by: Alex Chi <iskyzh@gmail.com>
This commit is contained in:
Alex Chi
2022-12-24 18:07:18 -05:00
parent 86503ac58d
commit 51e075c1ed
10 changed files with 173 additions and 41 deletions

View File

@@ -9,10 +9,13 @@ use anyhow::Result;
use arc_swap::ArcSwap;
use bytes::Bytes;
use crate::block::Block;
use crate::lsm_iterator::{FusedIterator, LsmIterator};
use crate::mem_table::MemTable;
use crate::table::SsTable;
pub type BlockCache = moka::sync::Cache<(usize, usize), Arc<Block>>;
#[derive(Clone)]
pub struct LsmStorageInner {
/// MemTables, ordered from oldest to newest.

View File

@@ -13,6 +13,7 @@ use bytes::{Buf, Bytes};
pub use iterator::SsTableIterator;
use crate::block::Block;
use crate::lsm_storage::BlockCache;
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BlockMeta {
@@ -50,6 +51,7 @@ impl FileObject {
self.0.len() as u64
}
/// Create a new file object (day 2) and write the file to the disk (day 4).
///
/// Starter-code stub: the body is `unimplemented!()`, so calling it panics until the
/// implementation is filled in.
pub fn create(path: &Path, data: Vec<u8>) -> Result<Self> {
unimplemented!()
}
@@ -66,8 +68,13 @@ pub struct SsTable {
}
impl SsTable {
/// Test-only shortcut for `SsTable::open`: opens `file` with SST id 0 and no block cache.
#[cfg(test)]
pub(crate) fn open_for_test(file: FileObject) -> Result<Self> {
    let (id, block_cache) = (0, None);
    Self::open(id, block_cache, file)
}
/// Open SSTable from a file.
pub fn open(file: FileObject) -> Result<Self> {
pub fn open(id: usize, block_cache: Option<Arc<BlockCache>>, file: FileObject) -> Result<Self> {
unimplemented!()
}
@@ -76,6 +83,11 @@ impl SsTable {
unimplemented!()
}
/// Read a block from disk, with block cache. (Day 4)
///
/// Starter-code stub: the body is `unimplemented!()`, so calling it panics until the
/// implementation is filled in.
pub fn read_block_cached(&self, block_idx: usize) -> Result<Arc<Block>> {
unimplemented!()
}
/// Find the block that may contain `key`.
pub fn find_block_idx(&self, key: &[u8]) -> usize {
unimplemented!()

View File

@@ -2,10 +2,12 @@
#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod
use std::path::Path;
use std::sync::Arc;
use anyhow::Result;
use super::{BlockMeta, SsTable};
use crate::lsm_storage::BlockCache;
/// Builds an SSTable from key-value pairs.
pub struct SsTableBuilder {
@@ -31,7 +33,17 @@ impl SsTableBuilder {
/// Builds the SSTable and writes it to the given path. No need to actually write to disk until
/// chapter 4 block cache.
pub fn build(self, path: impl AsRef<Path>) -> Result<SsTable> {
pub fn build(
self,
id: usize,
block_cache: Option<Arc<BlockCache>>,
path: impl AsRef<Path>,
) -> Result<SsTable> {
unimplemented!()
}
/// Test-only shortcut for `build`: builds the SSTable at `path` with SST id 0 and no
/// block cache.
#[cfg(test)]
pub(crate) fn build_for_test(self, path: impl AsRef<Path>) -> Result<SsTable> {
    let (id, block_cache) = (0, None);
    self.build(id, block_cache, path)
}
}

View File

@@ -1,11 +1,12 @@
use std::ops::Bound;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::Result;
use bytes::Bytes;
use parking_lot::{Mutex, RwLock};
use crate::block::Block;
use crate::iterators::impls::StorageIterator;
use crate::iterators::merge_iterator::MergeIterator;
use crate::iterators::two_merge_iterator::TwoMergeIterator;
@@ -13,6 +14,8 @@ use crate::lsm_iterator::{FusedIterator, LsmIterator};
use crate::mem_table::{map_bound, MemTable};
use crate::table::{SsTable, SsTableBuilder, SsTableIterator};
pub type BlockCache = moka::sync::Cache<(usize, usize), Arc<Block>>;
#[derive(Clone)]
pub struct LsmStorageInner {
/// The current memtable.
@@ -24,6 +27,8 @@ pub struct LsmStorageInner {
/// L1 - L6 SsTables, sorted by key range.
#[allow(dead_code)]
levels: Vec<Vec<Arc<SsTable>>>,
/// The next SSTable ID.
next_sst_id: usize,
}
impl LsmStorageInner {
@@ -33,6 +38,7 @@ impl LsmStorageInner {
imm_memtables: vec![],
l0_sstables: vec![],
levels: vec![],
next_sst_id: 1,
}
}
}
@@ -41,13 +47,17 @@ impl LsmStorageInner {
pub struct LsmStorage {
inner: Arc<RwLock<Arc<LsmStorageInner>>>,
flush_lock: Mutex<()>,
path: PathBuf,
block_cache: Arc<BlockCache>,
}
impl LsmStorage {
pub fn open(_path: impl AsRef<Path>) -> Result<Self> {
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
Ok(Self {
inner: Arc::new(RwLock::new(Arc::new(LsmStorageInner::create()))),
flush_lock: Mutex::new(()),
path: path.as_ref().to_path_buf(),
// The capacity argument is `1 << 20` cache entries (blocks), not bytes. With the
// 4096-byte block size passed to `SsTableBuilder::new` in `sync`, that comes to
// roughly 4GB of cached block data.
// NOTE(review): assumes moka counts capacity in entries when no weigher is set — confirm.
block_cache: Arc::new(BlockCache::new(1 << 20)), // 4GB block cache
})
}
@@ -112,12 +122,17 @@ impl LsmStorage {
Ok(())
}
/// Returns the on-disk location of the SST with the given `id`.
///
/// The file name is the id zero-padded to at least five digits followed by `.sst`
/// (e.g. `00001.sst`), joined onto `self.path`.
fn path_of_sst(&self, id: usize) -> PathBuf {
    let file_name = format!("{:05}.sst", id);
    self.path.join(file_name)
}
/// In day 3: flush the current memtable to disk as L0 SST.
/// In day 6: call `fsync` on WAL.
pub fn sync(&self) -> Result<()> {
let _flush_lock = self.flush_lock.lock();
let flush_memtable;
let sst_id;
// Move mutable memtable to immutable memtables.
{
@@ -126,18 +141,24 @@ impl LsmStorage {
let mut snapshot = guard.as_ref().clone();
let memtable = std::mem::replace(&mut snapshot.memtable, Arc::new(MemTable::create()));
flush_memtable = memtable.clone();
sst_id = snapshot.next_sst_id;
// Add the memtable to the immutable memtables.
snapshot.imm_memtables.push(memtable);
// Update the snapshot.
*guard = Arc::new(snapshot);
}
// At this point, the old memtable should be disabled for write, and all write threads should be
// operating on the new memtable. We can safely flush the old memtable to disk.
// At this point, the old memtable should be disabled for write, and all write threads
// should be operating on the new memtable. We can safely flush the old memtable to
// disk.
let mut builder = SsTableBuilder::new(4096);
flush_memtable.flush(&mut builder)?;
let sst = Arc::new(builder.build("")?);
let sst = Arc::new(builder.build(
sst_id,
Some(self.block_cache.clone()),
self.path_of_sst(sst_id),
)?);
// Add the flushed L0 table to the list.
{
@@ -147,6 +168,8 @@ impl LsmStorage {
snapshot.imm_memtables.pop();
// Add L0 table
snapshot.l0_sstables.push(sst);
// Update SST ID
snapshot.next_sst_id += 1;
// Update the snapshot.
*guard = Arc::new(snapshot);
}

View File

@@ -1,3 +1,5 @@
use tempfile::tempdir;
use super::MemTable;
use crate::iterators::impls::StorageIterator;
use crate::table::{SsTableBuilder, SsTableIterator};
@@ -35,7 +37,8 @@ fn test_memtable_flush() {
memtable.put(b"key3", b"value3");
let mut builder = SsTableBuilder::new(128);
memtable.flush(&mut builder).unwrap();
let sst = builder.build("").unwrap();
let dir = tempdir().unwrap();
let sst = builder.build_for_test(dir.path().join("1.sst")).unwrap();
let mut iter = SsTableIterator::create_and_seek_to_first(sst.into()).unwrap();
assert_eq!(iter.key(), b"key1");
assert_eq!(iter.value(), b"value1");

View File

@@ -1,15 +1,17 @@
mod builder;
mod iterator;
use std::fs::File;
use std::path::Path;
use std::sync::Arc;
use anyhow::Result;
use anyhow::{anyhow, Result};
pub use builder::SsTableBuilder;
use bytes::{Buf, BufMut, Bytes};
pub use iterator::SsTableIterator;
use crate::block::Block;
use crate::lsm_storage::BlockCache;
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BlockMeta {
@@ -52,19 +54,50 @@ impl BlockMeta {
}
/// A file object.
pub struct FileObject(Bytes);
///
/// Before day 4, it should look like:
///
/// ```ignore
/// pub struct FileObject(Bytes);
///
/// impl FileObject {
/// pub fn read(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
/// Ok(self.0[offset as usize..(offset + len) as usize].to_vec())
/// }
/// pub fn size(&self) -> u64 {
/// self.0.len() as u64
/// }
///
/// pub fn create(_path: &Path, data: Vec<u8>) -> Result<Self> {
/// Ok(FileObject(data.into()))
/// }
///
/// pub fn open(_path: &Path) -> Result<Self> {
/// unimplemented!()
/// }
/// }
/// ```
pub struct FileObject(File, u64);
impl FileObject {
pub fn read(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
Ok(self.0[offset as usize..(offset + len) as usize].to_vec())
use std::os::unix::fs::FileExt;
let mut data = vec![0; len as usize];
self.0.read_exact_at(&mut data[..], offset)?;
Ok(data)
}
pub fn size(&self) -> u64 {
self.0.len() as u64
self.1
}
pub fn create(_path: &Path, data: Vec<u8>) -> Result<Self> {
Ok(FileObject(data.into()))
/// Create a new file object (day 2) and write the file to the disk (day 4).
pub fn create(path: &Path, data: Vec<u8>) -> Result<Self> {
std::fs::write(path, &data)?;
Ok(FileObject(
File::options().read(true).write(false).open(path)?,
data.len() as u64,
))
}
pub fn open(_path: &Path) -> Result<Self> {
@@ -76,11 +109,18 @@ pub struct SsTable {
file: FileObject,
block_metas: Vec<BlockMeta>,
block_meta_offset: usize,
id: usize,
block_cache: Option<Arc<BlockCache>>,
}
impl SsTable {
#[cfg(test)]
pub(crate) fn open_for_test(file: FileObject) -> Result<Self> {
    // Tests do not need a real SST id or a block cache.
    let (id, block_cache) = (0, None);
    Self::open(id, block_cache, file)
}
/// Open SSTable from a file.
pub fn open(file: FileObject) -> Result<Self> {
pub fn open(id: usize, block_cache: Option<Arc<BlockCache>>, file: FileObject) -> Result<Self> {
let len = file.size();
let raw_meta_offset = file.read(len - 4, 4)?;
let block_meta_offset = (&raw_meta_offset[..]).get_u32() as u64;
@@ -89,6 +129,8 @@ impl SsTable {
file,
block_metas: BlockMeta::decode_block_meta(&raw_meta[..]),
block_meta_offset: block_meta_offset as usize,
id,
block_cache,
})
}
@@ -106,6 +148,18 @@ impl SsTable {
Ok(Arc::new(Block::decode(&block_data[..])))
}
/// Read a block, going through the block cache when this table has one.
///
/// With a cache attached, the block is looked up under the key `(self.id, block_idx)`
/// and `read_block` is supplied as the loader for that key; per moka's documentation the
/// loader runs only when the key is absent. Without a cache, the call goes straight to
/// `read_block`.
///
/// A failure reported by the cache lookup is converted into a fresh `anyhow` error
/// carrying the original error's `Display` text.
pub fn read_block_cached(&self, block_idx: usize) -> Result<Arc<Block>> {
    match &self.block_cache {
        Some(cache) => cache
            .try_get_with((self.id, block_idx), || self.read_block(block_idx))
            .map_err(|err| anyhow!("{}", err)),
        None => self.read_block(block_idx),
    }
}
/// Find the block that may contain `key`.
pub fn find_block_idx(&self, key: &[u8]) -> usize {
self.block_metas

View File

@@ -1,10 +1,12 @@
use std::path::Path;
use std::sync::Arc;
use anyhow::Result;
use bytes::BufMut;
use super::{BlockMeta, FileObject, SsTable};
use crate::block::BlockBuilder;
use crate::lsm_storage::BlockCache;
/// Builds an SSTable from key-value pairs.
pub struct SsTableBuilder {
@@ -61,7 +63,12 @@ impl SsTableBuilder {
/// Builds the SSTable and writes it to the given path. No need to actually write to disk until
/// chapter 4 block cache.
pub fn build(mut self, path: impl AsRef<Path>) -> Result<SsTable> {
pub fn build(
mut self,
id: usize,
block_cache: Option<Arc<BlockCache>>,
path: impl AsRef<Path>,
) -> Result<SsTable> {
self.finish_block();
let mut buf = self.data;
let meta_offset = buf.len();
@@ -69,9 +76,16 @@ impl SsTableBuilder {
buf.put_u32(meta_offset as u32);
let file = FileObject::create(path.as_ref(), buf)?;
Ok(SsTable {
id,
file,
block_metas: self.meta,
block_meta_offset: meta_offset,
block_cache,
})
}
#[cfg(test)]
pub(crate) fn build_for_test(self, path: impl AsRef<Path>) -> Result<SsTable> {
    // Tests do not need a real SST id or a block cache.
    let (id, block_cache) = (0, None);
    self.build(id, block_cache, path)
}
}

View File

@@ -17,7 +17,7 @@ impl SsTableIterator {
fn seek_to_first_inner(table: &Arc<SsTable>) -> Result<(usize, BlockIterator)> {
Ok((
0,
BlockIterator::create_and_seek_to_first(table.read_block(0)?),
BlockIterator::create_and_seek_to_first(table.read_block_cached(0)?),
))
}
@@ -42,11 +42,13 @@ impl SsTableIterator {
fn seek_to_key_inner(table: &Arc<SsTable>, key: &[u8]) -> Result<(usize, BlockIterator)> {
let mut blk_idx = table.find_block_idx(key);
let mut blk_iter = BlockIterator::create_and_seek_to_key(table.read_block(blk_idx)?, key);
let mut blk_iter =
BlockIterator::create_and_seek_to_key(table.read_block_cached(blk_idx)?, key);
if !blk_iter.is_valid() {
blk_idx += 1;
if blk_idx < table.num_of_blocks() {
blk_iter = BlockIterator::create_and_seek_to_first(table.read_block(blk_idx)?);
blk_iter =
BlockIterator::create_and_seek_to_first(table.read_block_cached(blk_idx)?);
}
}
Ok((blk_idx, blk_iter))
@@ -90,8 +92,9 @@ impl StorageIterator for SsTableIterator {
if !self.blk_iter.is_valid() {
self.blk_idx += 1;
if self.blk_idx < self.table.num_of_blocks() {
self.blk_iter =
BlockIterator::create_and_seek_to_first(self.table.read_block(self.blk_idx)?);
self.blk_iter = BlockIterator::create_and_seek_to_first(
self.table.read_block_cached(self.blk_idx)?,
);
}
}
Ok(())

View File

@@ -1,6 +1,7 @@
use std::sync::Arc;
use bytes::Bytes;
use tempfile::{tempdir, TempDir};
use super::*;
use crate::iterators::impls::StorageIterator;
@@ -10,7 +11,8 @@ use crate::table::SsTableBuilder;
fn test_sst_build_single_key() {
let mut builder = SsTableBuilder::new(16);
builder.add(b"233", b"233333");
builder.build("").unwrap();
let dir = tempdir().unwrap();
builder.build_for_test(dir.path().join("1.sst")).unwrap();
}
#[test]
@@ -23,7 +25,8 @@ fn test_sst_build_two_blocks() {
builder.add(b"55", b"11");
builder.add(b"66", b"22");
assert!(builder.meta.len() >= 2);
builder.build("").unwrap();
let dir = tempdir().unwrap();
builder.build_for_test(dir.path().join("1.sst")).unwrap();
}
fn key_of(idx: usize) -> Vec<u8> {
@@ -38,14 +41,16 @@ fn num_of_keys() -> usize {
100
}
fn generate_sst() -> SsTable {
fn generate_sst() -> (TempDir, SsTable) {
let mut builder = SsTableBuilder::new(128);
for idx in 0..num_of_keys() {
let key = key_of(idx);
let value = value_of(idx);
builder.add(&key[..], &value[..]);
}
builder.build("").unwrap()
let dir = tempdir().unwrap();
let path = dir.path().join("1.sst");
(dir, builder.build_for_test(path).unwrap())
}
#[test]
@@ -55,9 +60,9 @@ fn test_sst_build_all() {
#[test]
fn test_sst_decode() {
let sst = generate_sst();
let (_dir, sst) = generate_sst();
let meta = sst.block_metas.clone();
let new_sst = SsTable::open(sst.file).unwrap();
let new_sst = SsTable::open_for_test(sst.file).unwrap();
assert_eq!(new_sst.block_metas, meta);
}
@@ -67,7 +72,8 @@ fn as_bytes(x: &[u8]) -> Bytes {
#[test]
fn test_sst_iterator() {
let sst = Arc::new(generate_sst());
let (_dir, sst) = generate_sst();
let sst = Arc::new(sst);
let mut iter = SsTableIterator::create_and_seek_to_first(sst).unwrap();
for _ in 0..5 {
for i in 0..num_of_keys() {
@@ -95,7 +101,8 @@ fn test_sst_iterator() {
#[test]
fn test_sst_seek_key() {
let sst = Arc::new(generate_sst());
let (_dir, sst) = generate_sst();
let sst = Arc::new(sst);
let mut iter = SsTableIterator::create_and_seek_to_key(sst, &key_of(0)).unwrap();
for offset in 1..=5 {
for i in 0..num_of_keys() {

View File

@@ -1,6 +1,7 @@
use std::ops::Bound;
use bytes::Bytes;
use tempfile::tempdir;
use crate::iterators::impls::StorageIterator;
@@ -34,8 +35,8 @@ fn check_iter_result(iter: impl StorageIterator, expected: Vec<(Bytes, Bytes)>)
#[test]
fn test_storage_get() {
use crate::lsm_storage::LsmStorage;
let storage = LsmStorage::open("").unwrap();
let dir = tempdir().unwrap();
let storage = LsmStorage::open(&dir).unwrap();
storage.put(b"1", b"233").unwrap();
storage.put(b"2", b"2333").unwrap();
storage.put(b"3", b"23333").unwrap();
@@ -49,8 +50,8 @@ fn test_storage_get() {
#[test]
fn test_storage_scan_memtable_1() {
use crate::lsm_storage::LsmStorage;
let storage = LsmStorage::open("").unwrap();
let dir = tempdir().unwrap();
let storage = LsmStorage::open(&dir).unwrap();
storage.put(b"1", b"233").unwrap();
storage.put(b"2", b"2333").unwrap();
storage.put(b"3", b"23333").unwrap();
@@ -79,8 +80,8 @@ fn test_storage_scan_memtable_1() {
#[test]
fn test_storage_scan_memtable_2() {
use crate::lsm_storage::LsmStorage;
let storage = LsmStorage::open("").unwrap();
let dir = tempdir().unwrap();
let storage = LsmStorage::open(&dir).unwrap();
storage.put(b"1", b"233").unwrap();
storage.put(b"2", b"2333").unwrap();
storage.put(b"3", b"23333").unwrap();
@@ -109,8 +110,8 @@ fn test_storage_scan_memtable_2() {
#[test]
fn test_storage_get_after_sync() {
use crate::lsm_storage::LsmStorage;
let storage = LsmStorage::open("").unwrap();
let dir = tempdir().unwrap();
let storage = LsmStorage::open(&dir).unwrap();
storage.put(b"1", b"233").unwrap();
storage.put(b"2", b"2333").unwrap();
storage.sync().unwrap();
@@ -125,8 +126,8 @@ fn test_storage_get_after_sync() {
#[test]
fn test_storage_scan_memtable_1_after_sync() {
use crate::lsm_storage::LsmStorage;
let storage = LsmStorage::open("").unwrap();
let dir = tempdir().unwrap();
let storage = LsmStorage::open(&dir).unwrap();
storage.put(b"1", b"233").unwrap();
storage.put(b"2", b"2333").unwrap();
storage.sync().unwrap();
@@ -156,8 +157,8 @@ fn test_storage_scan_memtable_1_after_sync() {
#[test]
fn test_storage_scan_memtable_2_after_sync() {
use crate::lsm_storage::LsmStorage;
let storage = LsmStorage::open("").unwrap();
let dir = tempdir().unwrap();
let storage = LsmStorage::open(&dir).unwrap();
storage.put(b"1", b"233").unwrap();
storage.put(b"2", b"2333").unwrap();
storage.put(b"3", b"23333").unwrap();