pub(crate) mod bloom; mod builder; mod iterator; use std::fs::File; use std::path::Path; use std::sync::Arc; use anyhow::{anyhow, bail, Result}; pub use builder::SsTableBuilder; use bytes::{Buf, BufMut}; pub use iterator::SsTableIterator; use crate::block::Block; use crate::key::{KeyBytes, KeySlice}; use crate::lsm_storage::BlockCache; use self::bloom::Bloom; #[derive(Clone, Debug, PartialEq, Eq)] pub struct BlockMeta { /// Offset of this data block. pub offset: usize, /// The first key of the data block. pub first_key: KeyBytes, /// The last key of the data block. pub last_key: KeyBytes, } impl BlockMeta { /// Encode block meta to a buffer. pub fn encode_block_meta(block_meta: &[BlockMeta], buf: &mut Vec) { let mut estimated_size = std::mem::size_of::(); for meta in block_meta { // The size of offset estimated_size += std::mem::size_of::(); // The size of key length estimated_size += std::mem::size_of::(); // The size of actual key estimated_size += meta.first_key.len(); // The size of key length estimated_size += std::mem::size_of::(); // The size of actual key estimated_size += meta.last_key.len(); } estimated_size += std::mem::size_of::(); // Reserve the space to improve performance, especially when the size of incoming data is // large buf.reserve(estimated_size); let original_len = buf.len(); buf.put_u32(block_meta.len() as u32); for meta in block_meta { buf.put_u32(meta.offset as u32); buf.put_u16(meta.first_key.len() as u16); buf.put_slice(meta.first_key.raw_ref()); buf.put_u16(meta.last_key.len() as u16); buf.put_slice(meta.last_key.raw_ref()); } buf.put_u32(crc32fast::hash(&buf[original_len + 4..])); assert_eq!(estimated_size, buf.len() - original_len); } /// Decode block meta from a buffer. pub fn decode_block_meta(mut buf: &[u8]) -> Result> { let mut block_meta = Vec::new(); let num = buf.get_u32() as usize; let checksum = crc32fast::hash(&buf[..buf.remaining() - 4]); for _ in 0..num { let offset = buf.get_u32() as usize; let first_key_len = buf.get_u16() as usize; let first_key = KeyBytes::from_bytes(buf.copy_to_bytes(first_key_len)); let last_key_len: usize = buf.get_u16() as usize; let last_key = KeyBytes::from_bytes(buf.copy_to_bytes(last_key_len)); block_meta.push(BlockMeta { offset, first_key, last_key, }); } if buf.get_u32() != checksum { bail!("meta checksum mismatched"); } Ok(block_meta) } } /// A file object. pub struct FileObject(Option, u64); impl FileObject { pub fn read(&self, offset: u64, len: u64) -> Result> { use std::os::unix::fs::FileExt; let mut data = vec![0; len as usize]; self.0 .as_ref() .unwrap() .read_exact_at(&mut data[..], offset)?; Ok(data) } pub fn size(&self) -> u64 { self.1 } /// Create a new file object (day 2) and write the file to the disk (day 4). pub fn create(path: &Path, data: Vec) -> Result { std::fs::write(path, &data)?; File::open(path)?.sync_all()?; Ok(FileObject( Some(File::options().read(true).write(false).open(path)?), data.len() as u64, )) } pub fn open(path: &Path) -> Result { let file = File::options().read(true).write(false).open(path)?; let size = file.metadata()?.len(); Ok(FileObject(Some(file), size)) } } /// An SSTable. pub struct SsTable { /// The actual storage unit of SsTable, the format is as above. pub(crate) file: FileObject, /// The meta blocks that hold info for data blocks. pub(crate) block_meta: Vec, /// The offset that indicates the start point of meta blocks in `file`. pub(crate) block_meta_offset: usize, id: usize, block_cache: Option>, first_key: KeyBytes, last_key: KeyBytes, pub(crate) bloom: Option, } impl SsTable { #[cfg(test)] pub(crate) fn open_for_test(file: FileObject) -> Result { Self::open(0, None, file) } /// Open SSTable from a file. pub fn open(id: usize, block_cache: Option>, file: FileObject) -> Result { let len = file.size(); let raw_bloom_offset = file.read(len - 4, 4)?; let bloom_offset = (&raw_bloom_offset[..]).get_u32() as u64; let raw_bloom = file.read(bloom_offset, len - 4 - bloom_offset)?; let bloom_filter = Bloom::decode(&raw_bloom)?; let raw_meta_offset = file.read(bloom_offset - 4, 4)?; let block_meta_offset = (&raw_meta_offset[..]).get_u32() as u64; let raw_meta = file.read(block_meta_offset, bloom_offset - 4 - block_meta_offset)?; let block_meta = BlockMeta::decode_block_meta(&raw_meta[..])?; Ok(Self { file, first_key: block_meta.first().unwrap().first_key.clone(), last_key: block_meta.last().unwrap().last_key.clone(), block_meta, block_meta_offset: block_meta_offset as usize, id, block_cache, bloom: Some(bloom_filter), }) } /// Create a mock SST with only first key + last key metadata pub fn create_meta_only( id: usize, file_size: u64, first_key: KeyBytes, last_key: KeyBytes, ) -> Self { Self { file: FileObject(None, file_size), block_meta: vec![], block_meta_offset: 0, id, block_cache: None, first_key, last_key, bloom: None, } } /// Read a block from the disk. pub fn read_block(&self, block_idx: usize) -> Result> { let offset = self.block_meta[block_idx].offset; let offset_end = self .block_meta .get(block_idx + 1) .map_or(self.block_meta_offset, |x| x.offset); let block_len = offset_end - offset - 4; let block_data_with_chksum: Vec = self .file .read(offset as u64, (offset_end - offset) as u64)?; let block_data = &block_data_with_chksum[..block_len]; let checksum = (&block_data_with_chksum[block_len..]).get_u32(); if checksum != crc32fast::hash(block_data) { bail!("block checksum mismatched"); } Ok(Arc::new(Block::decode(block_data))) } /// Read a block from disk, with block cache. pub fn read_block_cached(&self, block_idx: usize) -> Result> { if let Some(ref block_cache) = self.block_cache { let blk = block_cache .try_get_with((self.id, block_idx), || self.read_block(block_idx)) .map_err(|e| anyhow!("{}", e))?; Ok(blk) } else { self.read_block(block_idx) } } /// Find the block that may contain `key`. pub fn find_block_idx(&self, key: KeySlice) -> usize { self.block_meta .partition_point(|meta| meta.first_key.as_key_slice() <= key) .saturating_sub(1) } /// Get number of data blocks. pub fn num_of_blocks(&self) -> usize { self.block_meta.len() } pub fn first_key(&self) -> &KeyBytes { &self.first_key } pub fn last_key(&self) -> &KeyBytes { &self.last_key } pub fn table_size(&self) -> u64 { self.file.1 } pub fn sst_id(&self) -> usize { self.id } }