finish persistence

Signed-off-by: Alex Chi <iskyzh@gmail.com>
This commit is contained in:
Alex Chi
2024-01-19 17:28:47 +08:00
parent 13ae8fe8fb
commit 6b24d6dfab
9 changed files with 219 additions and 50 deletions

View File

@@ -1,11 +1,11 @@
use std::collections::HashMap;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fs::File;
use std::ops::Bound;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use anyhow::Result;
use anyhow::{Context, Result};
use bytes::Bytes;
use parking_lot::{Mutex, RwLock};
@@ -20,7 +20,7 @@ use crate::iterators::StorageIterator;
use crate::lsm_iterator::{FusedIterator, LsmIterator};
use crate::manifest::{Manifest, ManifestRecord};
use crate::mem_table::{map_bound, MemTable};
use crate::table::{SsTable, SsTableBuilder, SsTableIterator};
use crate::table::{self, FileObject, SsTable, SsTableBuilder, SsTableIterator};
pub type BlockCache = moka::sync::Cache<(usize, usize), Arc<Block>>;
@@ -103,6 +103,7 @@ impl Drop for MiniLsm {
impl MiniLsm {
pub fn close(&self) -> Result<()> {
self.inner.sync_dir()?;
self.compaction_notifier.send(()).ok();
let mut compaction_thread = self.compaction_thread.lock();
if let Some(compaction_thread) = compaction_thread.take() {
@@ -157,36 +158,101 @@ impl LsmStorageInner {
}
pub(crate) fn open(path: impl AsRef<Path>, options: LsmStorageOptions) -> Result<Self> {
let path = path.as_ref();
if !path.exists() {
std::fs::create_dir_all(path)?;
}
let mut state = LsmStorageState::create(&options);
if options.enable_wal {
state.memtable = Arc::new(MemTable::create_with_wal(
state.memtable.id(),
Self::path_of_wal_static(path, state.memtable.id()),
)?);
}
let path = path.as_ref();
let mut next_sst_id = 1;
let block_cache = Arc::new(BlockCache::new(1 << 20)); // 4GB block cache,
let manifest;
let compaction_controller = match &options.compaction_options {
CompactionOptions::Leveled(options) => {
CompactionController::Leveled(LeveledCompactionController::new(options.clone()))
}
CompactionOptions::Tiered(options) => {
CompactionController::Tiered(TieredCompactionController::new(options.clone()))
}
CompactionOptions::Simple(options) => CompactionController::Simple(
SimpleLeveledCompactionController::new(options.clone()),
),
CompactionOptions::NoCompaction => CompactionController::NoCompaction,
};
if !path.exists() {
std::fs::create_dir_all(path).context("failed to create DB dir")?;
if options.enable_wal {
state.memtable = Arc::new(MemTable::create_with_wal(
state.memtable.id(),
Self::path_of_wal_static(path, state.memtable.id()),
)?);
}
manifest =
Manifest::create(path.join("MANIFEST")).context("failed to create manifest")?;
manifest.add_record_when_init(ManifestRecord::NewMemtable(state.memtable.id()))?;
} else {
let (m, records) = Manifest::recover(path.join("MANIFEST"))?;
let mut memtables = BTreeSet::new();
for record in records {
match record {
ManifestRecord::Flush(sst_id) => {
let res = memtables.remove(&sst_id);
assert!(res, "memtable not exist?");
state.l0_sstables.insert(0, sst_id);
}
ManifestRecord::NewMemtable(x) => {
next_sst_id = x + 1;
memtables.insert(x);
}
ManifestRecord::Compaction(task, output) => {
let (new_state, _) =
compaction_controller.apply_compaction_result(&state, &task, &output);
// TODO: apply remove again
state = new_state;
}
}
}
// recover SSTs
for table_id in state
.l0_sstables
.iter()
.chain(state.levels.iter().map(|(_, files)| files).flatten())
{
let table_id = *table_id;
let sst = SsTable::open(
table_id,
Some(block_cache.clone()),
FileObject::open(&Self::path_of_sst_static(path, table_id))
.context("failed to open SST")?,
)?;
state.sstables.insert(table_id, Arc::new(sst));
}
// recover memtables
if options.enable_wal {
for id in memtables.iter() {
let memtable =
MemTable::recover_from_wal(*id, Self::path_of_wal_static(path, *id))?;
state.imm_memtables.insert(0, Arc::new(memtable));
next_sst_id = *id + 1;
}
state.memtable = Arc::new(MemTable::create_with_wal(
next_sst_id,
Self::path_of_wal_static(path, next_sst_id),
)?);
} else {
state.memtable = Arc::new(MemTable::create(next_sst_id));
}
m.add_record_when_init(ManifestRecord::NewMemtable(state.memtable.id()))?;
next_sst_id += 1;
manifest = m;
};
let storage = Self {
state: Arc::new(RwLock::new(Arc::new(state))),
state_lock: Mutex::new(()),
path: path.to_path_buf(),
block_cache: Arc::new(BlockCache::new(1 << 20)), // 4GB block cache,
next_sst_id: AtomicUsize::new(1),
compaction_controller: match &options.compaction_options {
CompactionOptions::Leveled(options) => {
CompactionController::Leveled(LeveledCompactionController::new(options.clone()))
}
CompactionOptions::Tiered(options) => {
CompactionController::Tiered(TieredCompactionController::new(options.clone()))
}
CompactionOptions::Simple(options) => CompactionController::Simple(
SimpleLeveledCompactionController::new(options.clone()),
),
CompactionOptions::NoCompaction => CompactionController::NoCompaction,
},
manifest: Manifest::create(path.join("MANIFEST"))?,
block_cache,
next_sst_id: AtomicUsize::new(next_sst_id),
compaction_controller,
manifest,
options: options.into(),
};
storage.sync_dir()?;
@@ -259,8 +325,12 @@ impl LsmStorageInner {
Ok(())
}
pub(crate) fn path_of_sst_static(path: impl AsRef<Path>, id: usize) -> PathBuf {
path.as_ref().join(format!("{:05}.sst", id))
}
pub(crate) fn path_of_sst(&self, id: usize) -> PathBuf {
self.path.join(format!("{:05}.sst", id))
Self::path_of_sst_static(&self.path, id)
}
pub(crate) fn path_of_wal_static(path: impl AsRef<Path>, id: usize) -> PathBuf {
@@ -303,7 +373,7 @@ impl LsmStorageInner {
old_memtable.sync_wal()?;
self.manifest
.add_record(&state_lock, ManifestRecord::NewWal(memtable_id))?;
.add_record(&state_lock, ManifestRecord::NewMemtable(memtable_id))?;
Ok(())
}