@@ -302,6 +302,8 @@ impl LsmStorageInner {
|
|||||||
self.manifest.as_ref().unwrap()
|
self.manifest.as_ref().unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Start the storage engine by either loading an existing directory or creating a new one if the directory does
|
||||||
|
/// not exist.
|
||||||
/// Start the storage engine by either loading an existing directory or creating a new one if the directory does
|
/// Start the storage engine by either loading an existing directory or creating a new one if the directory does
|
||||||
/// not exist.
|
/// not exist.
|
||||||
pub(crate) fn open(path: impl AsRef<Path>, options: LsmStorageOptions) -> Result<Self> {
|
pub(crate) fn open(path: impl AsRef<Path>, options: LsmStorageOptions) -> Result<Self> {
|
||||||
@@ -328,6 +330,7 @@ impl LsmStorageInner {
|
|||||||
std::fs::create_dir_all(path).context("failed to create DB dir")?;
|
std::fs::create_dir_all(path).context("failed to create DB dir")?;
|
||||||
}
|
}
|
||||||
let manifest_path = path.join("MANIFEST");
|
let manifest_path = path.join("MANIFEST");
|
||||||
|
let mut last_commit_ts = 0;
|
||||||
if !manifest_path.exists() {
|
if !manifest_path.exists() {
|
||||||
if options.enable_wal {
|
if options.enable_wal {
|
||||||
state.memtable = Arc::new(MemTable::create_with_wal(
|
state.memtable = Arc::new(MemTable::create_with_wal(
|
||||||
@@ -381,6 +384,7 @@ impl LsmStorageInner {
|
|||||||
FileObject::open(&Self::path_of_sst_static(path, table_id))
|
FileObject::open(&Self::path_of_sst_static(path, table_id))
|
||||||
.context("failed to open SST")?,
|
.context("failed to open SST")?,
|
||||||
)?;
|
)?;
|
||||||
|
last_commit_ts = last_commit_ts.max(sst.max_ts());
|
||||||
state.sstables.insert(table_id, Arc::new(sst));
|
state.sstables.insert(table_id, Arc::new(sst));
|
||||||
sst_cnt += 1;
|
sst_cnt += 1;
|
||||||
}
|
}
|
||||||
@@ -394,6 +398,13 @@ impl LsmStorageInner {
|
|||||||
for id in memtables.iter() {
|
for id in memtables.iter() {
|
||||||
let memtable =
|
let memtable =
|
||||||
MemTable::recover_from_wal(*id, Self::path_of_wal_static(path, *id))?;
|
MemTable::recover_from_wal(*id, Self::path_of_wal_static(path, *id))?;
|
||||||
|
let max_ts = memtable
|
||||||
|
.map
|
||||||
|
.iter()
|
||||||
|
.map(|x| x.key().ts())
|
||||||
|
.max()
|
||||||
|
.unwrap_or_default();
|
||||||
|
last_commit_ts = last_commit_ts.max(max_ts);
|
||||||
if !memtable.is_empty() {
|
if !memtable.is_empty() {
|
||||||
state.imm_memtables.insert(0, Arc::new(memtable));
|
state.imm_memtables.insert(0, Arc::new(memtable));
|
||||||
wal_cnt += 1;
|
wal_cnt += 1;
|
||||||
@@ -421,7 +432,7 @@ impl LsmStorageInner {
|
|||||||
compaction_controller,
|
compaction_controller,
|
||||||
manifest: Some(manifest),
|
manifest: Some(manifest),
|
||||||
options: options.into(),
|
options: options.into(),
|
||||||
mvcc: Some(LsmMvccInner::new(0)),
|
mvcc: Some(LsmMvccInner::new(last_commit_ts)),
|
||||||
};
|
};
|
||||||
storage.sync_dir()?;
|
storage.sync_dir()?;
|
||||||
|
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ use crate::wal::Wal;
|
|||||||
/// An initial implementation of memtable is part of week 1, day 1. It will be incrementally implemented in other
|
/// An initial implementation of memtable is part of week 1, day 1. It will be incrementally implemented in other
|
||||||
/// chapters of week 1 and week 2.
|
/// chapters of week 1 and week 2.
|
||||||
pub struct MemTable {
|
pub struct MemTable {
|
||||||
map: Arc<SkipMap<KeyBytes, Bytes>>,
|
pub(crate) map: Arc<SkipMap<KeyBytes, Bytes>>,
|
||||||
wal: Option<Wal>,
|
wal: Option<Wal>,
|
||||||
id: usize,
|
id: usize,
|
||||||
approximate_size: Arc<AtomicUsize>,
|
approximate_size: Arc<AtomicUsize>,
|
||||||
|
|||||||
@@ -29,8 +29,8 @@ pub struct BlockMeta {
|
|||||||
|
|
||||||
impl BlockMeta {
|
impl BlockMeta {
|
||||||
/// Encode block meta to a buffer.
|
/// Encode block meta to a buffer.
|
||||||
pub fn encode_block_meta(block_meta: &[BlockMeta], buf: &mut Vec<u8>) {
|
pub fn encode_block_meta(block_meta: &[BlockMeta], max_ts: u64, buf: &mut Vec<u8>) {
|
||||||
let mut estimated_size = std::mem::size_of::<u32>();
|
let mut estimated_size = std::mem::size_of::<u32>(); // number of blocks
|
||||||
for meta in block_meta {
|
for meta in block_meta {
|
||||||
// The size of offset
|
// The size of offset
|
||||||
estimated_size += std::mem::size_of::<u32>();
|
estimated_size += std::mem::size_of::<u32>();
|
||||||
@@ -43,7 +43,9 @@ impl BlockMeta {
|
|||||||
// The size of actual key
|
// The size of actual key
|
||||||
estimated_size += meta.last_key.raw_len();
|
estimated_size += meta.last_key.raw_len();
|
||||||
}
|
}
|
||||||
estimated_size += std::mem::size_of::<u32>();
|
estimated_size += std::mem::size_of::<u64>(); // max timestamp
|
||||||
|
estimated_size += std::mem::size_of::<u32>(); // checksum
|
||||||
|
|
||||||
// Reserve the space to improve performance, especially when the size of incoming data is
|
// Reserve the space to improve performance, especially when the size of incoming data is
|
||||||
// large
|
// large
|
||||||
buf.reserve(estimated_size);
|
buf.reserve(estimated_size);
|
||||||
@@ -58,12 +60,13 @@ impl BlockMeta {
|
|||||||
buf.put_slice(meta.last_key.key_ref());
|
buf.put_slice(meta.last_key.key_ref());
|
||||||
buf.put_u64(meta.last_key.ts());
|
buf.put_u64(meta.last_key.ts());
|
||||||
}
|
}
|
||||||
|
buf.put_u64(max_ts);
|
||||||
buf.put_u32(crc32fast::hash(&buf[original_len + 4..]));
|
buf.put_u32(crc32fast::hash(&buf[original_len + 4..]));
|
||||||
assert_eq!(estimated_size, buf.len() - original_len);
|
assert_eq!(estimated_size, buf.len() - original_len);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Decode block meta from a buffer.
|
/// Decode block meta from a buffer.
|
||||||
pub fn decode_block_meta(mut buf: &[u8]) -> Result<Vec<BlockMeta>> {
|
pub fn decode_block_meta(mut buf: &[u8]) -> Result<(Vec<BlockMeta>, u64)> {
|
||||||
let mut block_meta = Vec::new();
|
let mut block_meta = Vec::new();
|
||||||
let num = buf.get_u32() as usize;
|
let num = buf.get_u32() as usize;
|
||||||
let checksum = crc32fast::hash(&buf[..buf.remaining() - 4]);
|
let checksum = crc32fast::hash(&buf[..buf.remaining() - 4]);
|
||||||
@@ -81,11 +84,12 @@ impl BlockMeta {
|
|||||||
last_key,
|
last_key,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
let max_ts = buf.get_u64();
|
||||||
if buf.get_u32() != checksum {
|
if buf.get_u32() != checksum {
|
||||||
bail!("meta checksum mismatched");
|
bail!("meta checksum mismatched");
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(block_meta)
|
Ok((block_meta, max_ts))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -137,6 +141,7 @@ pub struct SsTable {
|
|||||||
first_key: KeyBytes,
|
first_key: KeyBytes,
|
||||||
last_key: KeyBytes,
|
last_key: KeyBytes,
|
||||||
pub(crate) bloom: Option<Bloom>,
|
pub(crate) bloom: Option<Bloom>,
|
||||||
|
max_ts: u64,
|
||||||
}
|
}
|
||||||
impl SsTable {
|
impl SsTable {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -154,7 +159,7 @@ impl SsTable {
|
|||||||
let raw_meta_offset = file.read(bloom_offset - 4, 4)?;
|
let raw_meta_offset = file.read(bloom_offset - 4, 4)?;
|
||||||
let block_meta_offset = (&raw_meta_offset[..]).get_u32() as u64;
|
let block_meta_offset = (&raw_meta_offset[..]).get_u32() as u64;
|
||||||
let raw_meta = file.read(block_meta_offset, bloom_offset - 4 - block_meta_offset)?;
|
let raw_meta = file.read(block_meta_offset, bloom_offset - 4 - block_meta_offset)?;
|
||||||
let block_meta = BlockMeta::decode_block_meta(&raw_meta[..])?;
|
let (block_meta, max_ts) = BlockMeta::decode_block_meta(&raw_meta[..])?;
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
file,
|
file,
|
||||||
first_key: block_meta.first().unwrap().first_key.clone(),
|
first_key: block_meta.first().unwrap().first_key.clone(),
|
||||||
@@ -164,6 +169,7 @@ impl SsTable {
|
|||||||
id,
|
id,
|
||||||
block_cache,
|
block_cache,
|
||||||
bloom: Some(bloom_filter),
|
bloom: Some(bloom_filter),
|
||||||
|
max_ts,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -183,6 +189,7 @@ impl SsTable {
|
|||||||
first_key,
|
first_key,
|
||||||
last_key,
|
last_key,
|
||||||
bloom: None,
|
bloom: None,
|
||||||
|
max_ts: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -246,8 +253,6 @@ impl SsTable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn max_ts(&self) -> u64 {
|
pub fn max_ts(&self) -> u64 {
|
||||||
// TODO(you): implement me
|
self.max_ts
|
||||||
// self.max_ts
|
|
||||||
0
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ pub struct SsTableBuilder {
|
|||||||
pub(crate) meta: Vec<BlockMeta>,
|
pub(crate) meta: Vec<BlockMeta>,
|
||||||
block_size: usize,
|
block_size: usize,
|
||||||
key_hashes: Vec<u32>,
|
key_hashes: Vec<u32>,
|
||||||
|
max_ts: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SsTableBuilder {
|
impl SsTableBuilder {
|
||||||
@@ -32,6 +33,7 @@ impl SsTableBuilder {
|
|||||||
block_size,
|
block_size,
|
||||||
builder: BlockBuilder::new(block_size),
|
builder: BlockBuilder::new(block_size),
|
||||||
key_hashes: Vec::new(),
|
key_hashes: Vec::new(),
|
||||||
|
max_ts: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -41,6 +43,9 @@ impl SsTableBuilder {
|
|||||||
self.first_key.set_from_slice(key);
|
self.first_key.set_from_slice(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if key.ts() > self.max_ts {
|
||||||
|
self.max_ts = key.ts();
|
||||||
|
}
|
||||||
self.key_hashes.push(farmhash::fingerprint32(key.key_ref()));
|
self.key_hashes.push(farmhash::fingerprint32(key.key_ref()));
|
||||||
|
|
||||||
if self.builder.add(key, value) {
|
if self.builder.add(key, value) {
|
||||||
@@ -85,7 +90,7 @@ impl SsTableBuilder {
|
|||||||
self.finish_block();
|
self.finish_block();
|
||||||
let mut buf = self.data;
|
let mut buf = self.data;
|
||||||
let meta_offset = buf.len();
|
let meta_offset = buf.len();
|
||||||
BlockMeta::encode_block_meta(&self.meta, &mut buf);
|
BlockMeta::encode_block_meta(&self.meta, self.max_ts, &mut buf);
|
||||||
buf.put_u32(meta_offset as u32);
|
buf.put_u32(meta_offset as u32);
|
||||||
let bloom = Bloom::build_from_key_hashes(
|
let bloom = Bloom::build_from_key_hashes(
|
||||||
&self.key_hashes,
|
&self.key_hashes,
|
||||||
@@ -104,6 +109,7 @@ impl SsTableBuilder {
|
|||||||
block_meta_offset: meta_offset,
|
block_meta_offset: meta_offset,
|
||||||
block_cache,
|
block_cache,
|
||||||
bloom: Some(bloom),
|
bloom: Some(bloom),
|
||||||
|
max_ts: self.max_ts,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -10,8 +10,8 @@ mod week2_day1;
|
|||||||
mod week2_day2;
|
mod week2_day2;
|
||||||
mod week2_day3;
|
mod week2_day3;
|
||||||
mod week2_day4;
|
mod week2_day4;
|
||||||
// mod week2_day5;
|
mod week2_day5;
|
||||||
// mod week2_day6;
|
mod week2_day6;
|
||||||
mod week3_day1;
|
mod week3_day1;
|
||||||
mod week3_day2;
|
mod week3_day2;
|
||||||
mod week3_day3;
|
mod week3_day3;
|
||||||
|
|||||||
@@ -104,6 +104,7 @@ impl SsTableBuilder {
|
|||||||
block_meta_offset: meta_offset,
|
block_meta_offset: meta_offset,
|
||||||
block_cache,
|
block_cache,
|
||||||
bloom: Some(bloom),
|
bloom: Some(bloom),
|
||||||
|
max_ts: 0, // will be changed to latest ts in week 2
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user