add w1d1 and update starter code

Signed-off-by: Alex Chi <iskyzh@gmail.com>
This commit is contained in:
Alex Chi
2024-01-20 21:54:16 +08:00
parent 1ece52a2d9
commit f7f2fd37e4
8 changed files with 331 additions and 25 deletions

View File

@@ -8,12 +8,12 @@ use std::sync::Arc;
use anyhow::Result;
use bytes::Bytes;
use parking_lot::{Mutex, RwLock};
use parking_lot::{Mutex, MutexGuard, RwLock};
use crate::block::Block;
use crate::compact::{
CompactionController, CompactionOptions, LeveledCompactionOptions,
SimpleLeveledCompactionOptions,
CompactionController, CompactionOptions, LeveledCompactionController, LeveledCompactionOptions,
SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, TieredCompactionController,
};
use crate::lsm_iterator::{FusedIterator, LsmIterator};
use crate::manifest::Manifest;
@@ -22,13 +22,14 @@ use crate::table::SsTable;
pub type BlockCache = moka::sync::Cache<(usize, usize), Arc<Block>>;
/// Represents the state of the storage engine.
#[derive(Clone)]
pub struct LsmStorageState {
/// The current memtable.
pub memtable: Arc<MemTable>,
/// Immutable memtables, from earliest to latest.
/// Immutable memtables, from latest to earliest.
pub imm_memtables: Vec<Arc<MemTable>>,
/// L0 SSTs, from earliest to latest.
/// L0 SSTs, from latest to earliest.
pub l0_sstables: Vec<usize>,
/// SsTables sorted by key range; L1 - L_max for leveled compaction, or tiers for tiered
/// compaction.
@@ -58,8 +59,11 @@ impl LsmStorageState {
}
pub struct LsmStorageOptions {
// Block size in bytes
pub block_size: usize,
// SST size in bytes, also the approximate memtable capacity limit
pub target_sst_size: usize,
// Maximum number of memtables in memory, flush to L0 when exceeding this limit
pub num_memtable_limit: usize,
pub compaction_options: CompactionOptions,
pub enable_wal: bool,
@@ -72,7 +76,7 @@ impl LsmStorageOptions {
target_sst_size: 2 << 20,
compaction_options: CompactionOptions::NoCompaction,
enable_wal: false,
num_memtable_limit: 3,
num_memtable_limit: 50,
}
}
}
@@ -86,12 +90,15 @@ pub(crate) struct LsmStorageInner {
next_sst_id: AtomicUsize,
pub(crate) options: Arc<LsmStorageOptions>,
pub(crate) compaction_controller: CompactionController,
pub(crate) manifest: Manifest,
pub(crate) manifest: Option<Manifest>,
}
/// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM.
pub struct MiniLsm {
pub(crate) inner: Arc<LsmStorageInner>,
/// Notifies the compaction thread to stop working. (In week 2)
compaction_notifier: crossbeam_channel::Sender<()>,
/// The handle for the compaction thread. (In week 2)
compaction_thread: Mutex<Option<std::thread::JoinHandle<()>>>,
}
@@ -106,8 +113,17 @@ impl MiniLsm {
unimplemented!()
}
pub fn open(_path: impl AsRef<Path>, _options: LsmStorageOptions) -> Result<Arc<Self>> {
unimplemented!()
/// Start the storage engine by either loading an existing directory or creating a new one if the directory does
/// not exist.
pub fn open(path: impl AsRef<Path>, options: LsmStorageOptions) -> Result<Arc<Self>> {
let inner = Arc::new(LsmStorageInner::open(path, options)?);
let (tx, rx) = crossbeam_channel::unbounded();
let compaction_thread = inner.spawn_compaction_thread(rx)?;
Ok(Arc::new(Self {
inner,
compaction_notifier: tx,
compaction_thread: Mutex::new(compaction_thread),
}))
}
pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
@@ -131,7 +147,8 @@ impl MiniLsm {
}
pub fn force_flush(&self) -> Result<()> {
self.inner.force_freeze_memtable()?;
self.inner
.force_freeze_memtable(&self.inner.state_lock.lock())?;
self.inner.force_flush_next_imm_memtable()
}
@@ -146,8 +163,37 @@ impl LsmStorageInner {
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
}
pub(crate) fn open(_path: impl AsRef<Path>, _options: LsmStorageOptions) -> Result<Self> {
unimplemented!()
/// Start the storage engine by either loading an existing directory or creating a new one if the directory does
/// not exist.
pub(crate) fn open(path: impl AsRef<Path>, options: LsmStorageOptions) -> Result<Self> {
let path = path.as_ref();
let state = LsmStorageState::create(&options);
let compaction_controller = match &options.compaction_options {
CompactionOptions::Leveled(options) => {
CompactionController::Leveled(LeveledCompactionController::new(options.clone()))
}
CompactionOptions::Tiered(options) => {
CompactionController::Tiered(TieredCompactionController::new(options.clone()))
}
CompactionOptions::Simple(options) => CompactionController::Simple(
SimpleLeveledCompactionController::new(options.clone()),
),
CompactionOptions::NoCompaction => CompactionController::NoCompaction,
};
let storage = Self {
state: Arc::new(RwLock::new(Arc::new(state))),
state_lock: Mutex::new(()),
path: path.to_path_buf(),
block_cache: Arc::new(BlockCache::new(1024)),
next_sst_id: AtomicUsize::new(1),
compaction_controller,
manifest: None,
options: options.into(),
};
Ok(storage)
}
/// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter.
@@ -185,8 +231,8 @@ impl LsmStorageInner {
unimplemented!()
}
/// Force freeze the current memetable to an immutable memtable
pub fn force_freeze_memtable(&self) -> Result<()> {
/// Force freeze the current memtable to an immutable memtable
pub fn force_freeze_memtable(&self, _state_lock_observer: &MutexGuard<'_, ()>) -> Result<()> {
unimplemented!()
}