diff --git a/.gitignore b/.gitignore index cc2f724..f6d1fec 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ .vscode/ sync-tmp/ mini-lsm.db/ +lsm.db/ diff --git a/README.md b/README.md index f9beb9a..4be904d 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ We are working on a new version of the mini-lsm tutorial that is split into 3 we | 2.4 | Compaction Strategy - Leveled | ✅ | | | | 2.5 | Manifest | 🚧 | | | | 2.6 | Write-Ahead Log | 🚧 | | | -| 2.7 | Batch Write (and preparations for MVCC) | | | | +| 2.7 | Batch Write + Checksum | | | | | 3.1 | Timestamp Encoding + Prefix Bloom Filter | | | | | 3.2 | Snapshot Read | | | | | 3.3 | Watermark and Garbage Collection | | | | diff --git a/mini-lsm/src/bin/mini_lsm_cli.rs b/mini-lsm/src/bin/mini_lsm_cli.rs index abaf1ce..256744b 100644 --- a/mini-lsm/src/bin/mini_lsm_cli.rs +++ b/mini-lsm/src/bin/mini_lsm_cli.rs @@ -3,7 +3,6 @@ use std::path::PathBuf; use anyhow::Result; use bytes::Bytes; use clap::{Parser, ValueEnum}; - use mini_lsm::compact::{ CompactionOptions, LeveledCompactionOptions, SimpleLeveledCompactionOptions, TieredCompactionOptions, @@ -21,11 +20,11 @@ enum CompactionStrategy { #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Args { - #[arg(long, default_value = "mini-lsm.db")] + #[arg(long, default_value = "lsm.db")] path: PathBuf, #[arg(long, default_value = "leveled")] compaction: CompactionStrategy, - #[arg(long)] + #[arg(long, default_value = "true")] enable_wal: bool, } diff --git a/mini-lsm/src/compact.rs b/mini-lsm/src/compact.rs index 2aeeb61..ec063f3 100644 --- a/mini-lsm/src/compact.rs +++ b/mini-lsm/src/compact.rs @@ -46,7 +46,7 @@ pub(crate) enum CompactionController { } impl CompactionController { - fn generate_compaction_task(&self, snapshot: &LsmStorageState) -> Option { + pub fn generate_compaction_task(&self, snapshot: &LsmStorageState) -> Option { match self { CompactionController::Leveled(ctrl) => ctrl .generate_compaction_task(&snapshot) @@ -61,7 +61,7 @@ impl CompactionController { } } - fn apply_compaction_result( + pub fn apply_compaction_result( &self, snapshot: &LsmStorageState, task: &CompactionTask, @@ -247,14 +247,16 @@ impl LsmStorageInner { assert!(result.is_some()); ssts_to_remove.push(result.unwrap()); } + let mut new_sst_ids = Vec::new(); for file_to_add in sstables { + new_sst_ids.push(file_to_add.sst_id()); let result = snapshot.sstables.insert(file_to_add.sst_id(), file_to_add); assert!(result.is_none()); } let mut state = self.state.write(); *state = Arc::new(snapshot); self.manifest - .add_record(&state_lock, ManifestRecord::Compaction(task))?; + .add_record(&state_lock, ManifestRecord::Compaction(task, new_sst_ids))?; ssts_to_remove }; for sst in ssts_to_remove { diff --git a/mini-lsm/src/lsm_storage.rs b/mini-lsm/src/lsm_storage.rs index a491bc3..1d228fd 100644 --- a/mini-lsm/src/lsm_storage.rs +++ b/mini-lsm/src/lsm_storage.rs @@ -1,11 +1,11 @@ -use std::collections::HashMap; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::fs::File; use std::ops::Bound; use std::path::{Path, PathBuf}; use std::sync::atomic::AtomicUsize; use std::sync::Arc; -use anyhow::Result; +use anyhow::{Context, Result}; use bytes::Bytes; use parking_lot::{Mutex, RwLock}; @@ -20,7 +20,7 @@ use crate::iterators::StorageIterator; use crate::lsm_iterator::{FusedIterator, LsmIterator}; use crate::manifest::{Manifest, ManifestRecord}; use crate::mem_table::{map_bound, MemTable}; -use crate::table::{SsTable, SsTableBuilder, SsTableIterator}; +use crate::table::{self, FileObject, SsTable, SsTableBuilder, SsTableIterator}; pub type BlockCache = moka::sync::Cache<(usize, usize), Arc>; @@ -103,6 +103,7 @@ impl Drop for MiniLsm { impl MiniLsm { pub fn close(&self) -> Result<()> { + self.inner.sync_dir()?; self.compaction_notifier.send(()).ok(); let mut compaction_thread = self.compaction_thread.lock(); if let Some(compaction_thread) = compaction_thread.take() { @@ -157,36 +158,101 @@ impl LsmStorageInner { } pub(crate) fn open(path: impl AsRef, options: LsmStorageOptions) -> Result { - let path = path.as_ref(); - if !path.exists() { - std::fs::create_dir_all(path)?; - } let mut state = LsmStorageState::create(&options); - if options.enable_wal { - state.memtable = Arc::new(MemTable::create_with_wal( - state.memtable.id(), - Self::path_of_wal_static(path, state.memtable.id()), - )?); - } + let path = path.as_ref(); + let mut next_sst_id = 1; + let block_cache = Arc::new(BlockCache::new(1 << 20)); // 4GB block cache, + let manifest; + + let compaction_controller = match &options.compaction_options { + CompactionOptions::Leveled(options) => { + CompactionController::Leveled(LeveledCompactionController::new(options.clone())) + } + CompactionOptions::Tiered(options) => { + CompactionController::Tiered(TieredCompactionController::new(options.clone())) + } + CompactionOptions::Simple(options) => CompactionController::Simple( + SimpleLeveledCompactionController::new(options.clone()), + ), + CompactionOptions::NoCompaction => CompactionController::NoCompaction, + }; + + if !path.exists() { + std::fs::create_dir_all(path).context("failed to create DB dir")?; + if options.enable_wal { + state.memtable = Arc::new(MemTable::create_with_wal( + state.memtable.id(), + Self::path_of_wal_static(path, state.memtable.id()), + )?); + } + manifest = + Manifest::create(path.join("MANIFEST")).context("failed to create manifest")?; + manifest.add_record_when_init(ManifestRecord::NewMemtable(state.memtable.id()))?; + } else { + let (m, records) = Manifest::recover(path.join("MANIFEST"))?; + let mut memtables = BTreeSet::new(); + for record in records { + match record { + ManifestRecord::Flush(sst_id) => { + let res = memtables.remove(&sst_id); + assert!(res, "memtable not exist?"); + state.l0_sstables.insert(0, sst_id); + } + ManifestRecord::NewMemtable(x) => { + next_sst_id = x + 1; + memtables.insert(x); + } + ManifestRecord::Compaction(task, output) => { + let (new_state, _) = + compaction_controller.apply_compaction_result(&state, &task, &output); + // TODO: apply remove again + state = new_state; + } + } + } + // recover SSTs + for table_id in state + .l0_sstables + .iter() + .chain(state.levels.iter().map(|(_, files)| files).flatten()) + { + let table_id = *table_id; + let sst = SsTable::open( + table_id, + Some(block_cache.clone()), + FileObject::open(&Self::path_of_sst_static(path, table_id)) + .context("failed to open SST")?, + )?; + state.sstables.insert(table_id, Arc::new(sst)); + } + // recover memtables + if options.enable_wal { + for id in memtables.iter() { + let memtable = + MemTable::recover_from_wal(*id, Self::path_of_wal_static(path, *id))?; + state.imm_memtables.insert(0, Arc::new(memtable)); + next_sst_id = *id + 1; + } + state.memtable = Arc::new(MemTable::create_with_wal( + next_sst_id, + Self::path_of_wal_static(path, next_sst_id), + )?); + } else { + state.memtable = Arc::new(MemTable::create(next_sst_id)); + } + m.add_record_when_init(ManifestRecord::NewMemtable(state.memtable.id()))?; + next_sst_id += 1; + manifest = m; + }; + let storage = Self { state: Arc::new(RwLock::new(Arc::new(state))), state_lock: Mutex::new(()), path: path.to_path_buf(), - block_cache: Arc::new(BlockCache::new(1 << 20)), // 4GB block cache, - next_sst_id: AtomicUsize::new(1), - compaction_controller: match &options.compaction_options { - CompactionOptions::Leveled(options) => { - CompactionController::Leveled(LeveledCompactionController::new(options.clone())) - } - CompactionOptions::Tiered(options) => { - CompactionController::Tiered(TieredCompactionController::new(options.clone())) - } - CompactionOptions::Simple(options) => CompactionController::Simple( - SimpleLeveledCompactionController::new(options.clone()), - ), - CompactionOptions::NoCompaction => CompactionController::NoCompaction, - }, - manifest: Manifest::create(path.join("MANIFEST"))?, + block_cache, + next_sst_id: AtomicUsize::new(next_sst_id), + compaction_controller, + manifest, options: options.into(), }; storage.sync_dir()?; @@ -259,8 +325,12 @@ impl LsmStorageInner { Ok(()) } + pub(crate) fn path_of_sst_static(path: impl AsRef, id: usize) -> PathBuf { + path.as_ref().join(format!("{:05}.sst", id)) + } + pub(crate) fn path_of_sst(&self, id: usize) -> PathBuf { - self.path.join(format!("{:05}.sst", id)) + Self::path_of_sst_static(&self.path, id) } pub(crate) fn path_of_wal_static(path: impl AsRef, id: usize) -> PathBuf { @@ -303,7 +373,7 @@ impl LsmStorageInner { old_memtable.sync_wal()?; self.manifest - .add_record(&state_lock, ManifestRecord::NewWal(memtable_id))?; + .add_record(&state_lock, ManifestRecord::NewMemtable(memtable_id))?; Ok(()) } diff --git a/mini-lsm/src/manifest.rs b/mini-lsm/src/manifest.rs index 4f9d44a..a71e61c 100644 --- a/mini-lsm/src/manifest.rs +++ b/mini-lsm/src/manifest.rs @@ -1,29 +1,59 @@ -use std::fs::File; +use std::fs::{File, OpenOptions}; +use std::io::{Read, Write}; use std::path::Path; use std::sync::Arc; -use anyhow::Result; +use anyhow::{Context, Result}; use parking_lot::{Mutex, MutexGuard}; use serde::{Deserialize, Serialize}; +use serde_json::Deserializer; use crate::compact::CompactionTask; -pub struct Manifest {} +pub struct Manifest { + file: Arc>, +} #[derive(Serialize, Deserialize)] pub enum ManifestRecord { Flush(usize), - NewWal(usize), - Compaction(CompactionTask), + NewMemtable(usize), + Compaction(CompactionTask, Vec), } impl Manifest { pub fn create(path: impl AsRef) -> Result { - Ok(Self {}) + Ok(Self { + file: Arc::new(Mutex::new( + OpenOptions::new() + .read(true) + .create_new(true) + .write(true) + .open(path) + .context("failed to create manifest")?, + )), + }) } pub fn recover(path: impl AsRef) -> Result<(Self, Vec)> { - Ok((Self {}, Vec::new())) + let mut file = OpenOptions::new() + .read(true) + .append(true) + .open(path) + .context("failed to recover manifest")?; + let mut buf = Vec::new(); + file.read_to_end(&mut buf)?; + let mut stream = Deserializer::from_slice(&buf).into_iter::(); + let mut records = Vec::new(); + while let Some(x) = stream.next() { + records.push(x?); + } + Ok(( + Self { + file: Arc::new(Mutex::new(file)), + }, + records, + )) } pub fn add_record( @@ -31,6 +61,14 @@ impl Manifest { _state_lock_observer: &MutexGuard<()>, record: ManifestRecord, ) -> Result<()> { + self.add_record_when_init(record) + } + + pub fn add_record_when_init(&self, record: ManifestRecord) -> Result<()> { + let mut file = self.file.lock(); + let buf = serde_json::to_vec(&record)?; + file.write(&buf)?; + file.sync_all()?; Ok(()) } } diff --git a/mini-lsm/src/mem_table.rs b/mini-lsm/src/mem_table.rs index 2b2b0e4..fb6fbc8 100644 --- a/mini-lsm/src/mem_table.rs +++ b/mini-lsm/src/mem_table.rs @@ -47,8 +47,14 @@ impl MemTable { }) } + /// Create a memtable from WAL pub fn recover_from_wal(id: usize, path: impl AsRef) -> Result { - unimplemented!() + let map = Arc::new(SkipMap::new()); + Ok(Self { + id, + wal: Some(Wal::recover(path.as_ref(), &map)?), + map, + }) } /// Get a value by key. diff --git a/mini-lsm/src/table.rs b/mini-lsm/src/table.rs index a8083a3..c37aa99 100644 --- a/mini-lsm/src/table.rs +++ b/mini-lsm/src/table.rs @@ -102,8 +102,10 @@ impl FileObject { )) } - pub fn open(_path: &Path) -> Result { - unimplemented!() + pub fn open(path: &Path) -> Result { + let file = File::options().read(true).write(false).open(path)?; + let size = file.metadata()?.len(); + Ok(FileObject(Some(file), size)) } } diff --git a/mini-lsm/src/wal.rs b/mini-lsm/src/wal.rs index 9d026c2..b26f3c6 100644 --- a/mini-lsm/src/wal.rs +++ b/mini-lsm/src/wal.rs @@ -1,19 +1,70 @@ +use std::fs::{File, OpenOptions}; +use std::io::{Read, Write}; use std::path::Path; +use std::sync::Arc; -use anyhow::Result; +use anyhow::{Context, Result}; +use bytes::{Buf, BufMut, Bytes}; +use crossbeam_skiplist::SkipMap; +use parking_lot::Mutex; -pub struct Wal {} +pub struct Wal { + file: Arc>, +} impl Wal { pub fn create(path: impl AsRef) -> Result { - unimplemented!() + Ok(Self { + file: Arc::new(Mutex::new( + OpenOptions::new() + .read(true) + .create_new(true) + .write(true) + .open(path) + .context("failed to create WAL")?, + )), + }) + } + + pub fn recover(path: impl AsRef, skiplist: &SkipMap) -> Result { + let path = path.as_ref(); + let mut file = OpenOptions::new() + .read(true) + .append(true) + .open(path) + .context("failed to recover from WAL")?; + let mut buf = Vec::new(); + file.read_to_end(&mut buf)?; + let mut rbuf: &[u8] = buf.as_slice(); + while rbuf.has_remaining() { + let key_len = rbuf.get_u16() as usize; + let key = Bytes::copy_from_slice(&rbuf[..key_len]); + rbuf.advance(key_len); + let value_len = rbuf.get_u16() as usize; + let value = Bytes::copy_from_slice(&rbuf[..value_len]); + rbuf.advance(value_len); + skiplist.insert(key, value); + } + Ok(Self { + file: Arc::new(Mutex::new(file)), + }) } pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { + let mut file = self.file.lock(); + let mut buf: Vec = + Vec::with_capacity(key.len() + value.len() + std::mem::size_of::()); + buf.put_u16(key.len() as u16); + buf.put_slice(key); + buf.put_u16(value.len() as u16); + buf.put_slice(value); + file.write(&buf)?; Ok(()) } pub fn sync(&self) -> Result<()> { + let file = self.file.lock(); + file.sync_all()?; Ok(()) } }