fix wal close not waiting for threads, better test harness

Signed-off-by: Alex Chi <iskyzh@gmail.com>
This commit is contained in:
Alex Chi
2024-01-28 16:38:56 +08:00
parent 9b75e72e58
commit 85acf69dcc
4 changed files with 17 additions and 16 deletions

View File

@@ -55,7 +55,7 @@ If WAL is enabled, you will need to recover the memtables based on WALs when loa
cargo run --bin mini-lsm-cli -- --enable-wal cargo run --bin mini-lsm-cli -- --enable-wal
``` ```
Remember to recover the correct `next_sst_id` from the state, which should be `max{memtable id, sst id}` + 1. In your `close` function, you should not flush memtables to SSTs if `enable_wal` is set to true, as WAL itself provides persistency. Remember to recover the correct `next_sst_id` from the state, which should be `max{memtable id, sst id}` + 1. In your `close` function, you should not flush memtables to SSTs if `enable_wal` is set to true, as WAL itself provides persistency. You should wait until all compaction and flush threads to exit before closing the database.
## Test Your Understanding ## Test Your Understanding

View File

@@ -188,12 +188,6 @@ impl MiniLsm {
self.compaction_notifier.send(()).ok(); self.compaction_notifier.send(()).ok();
self.flush_notifier.send(()).ok(); self.flush_notifier.send(()).ok();
if self.inner.options.enable_wal {
self.inner.sync()?;
self.inner.sync_dir()?;
return Ok(());
}
let mut compaction_thread = self.compaction_thread.lock(); let mut compaction_thread = self.compaction_thread.lock();
if let Some(compaction_thread) = compaction_thread.take() { if let Some(compaction_thread) = compaction_thread.take() {
compaction_thread compaction_thread
@@ -207,6 +201,12 @@ impl MiniLsm {
.map_err(|e| anyhow::anyhow!("{:?}", e))?; .map_err(|e| anyhow::anyhow!("{:?}", e))?;
} }
if self.inner.options.enable_wal {
self.inner.sync()?;
self.inner.sync_dir()?;
return Ok(());
}
// create memtable and skip updating manifest // create memtable and skip updating manifest
if !self.inner.state.read().memtable.is_empty() { if !self.inner.state.read().memtable.is_empty() {
self.inner self.inner

View File

@@ -188,12 +188,6 @@ impl MiniLsm {
self.compaction_notifier.send(()).ok(); self.compaction_notifier.send(()).ok();
self.flush_notifier.send(()).ok(); self.flush_notifier.send(()).ok();
if self.inner.options.enable_wal {
self.inner.sync()?;
self.inner.sync_dir()?;
return Ok(());
}
let mut compaction_thread = self.compaction_thread.lock(); let mut compaction_thread = self.compaction_thread.lock();
if let Some(compaction_thread) = compaction_thread.take() { if let Some(compaction_thread) = compaction_thread.take() {
compaction_thread compaction_thread
@@ -207,6 +201,12 @@ impl MiniLsm {
.map_err(|e| anyhow::anyhow!("{:?}", e))?; .map_err(|e| anyhow::anyhow!("{:?}", e))?;
} }
if self.inner.options.enable_wal {
self.inner.sync()?;
self.inner.sync_dir()?;
return Ok(());
}
// create memtable and skip updating manifest // create memtable and skip updating manifest
if !self.inner.state.read().memtable.is_empty() { if !self.inner.state.read().memtable.is_empty() {
self.inner self.inner

View File

@@ -401,12 +401,13 @@ pub fn check_compaction_ratio(storage: Arc<MiniLsm>) {
} }
pub fn dump_files_in_dir(path: impl AsRef<Path>) { pub fn dump_files_in_dir(path: impl AsRef<Path>) {
println!("--- DIR DUMP ---");
for f in path.as_ref().read_dir().unwrap() { for f in path.as_ref().read_dir().unwrap() {
let f = f.unwrap(); let f = f.unwrap();
print!("{}", f.path().display());
println!( println!(
"{}, size={:.3}KB", ", size={:.3}KB",
f.path().display(),
f.metadata().unwrap().size() as f64 / 1024.0 f.metadata().unwrap().size() as f64 / 1024.0
) );
} }
} }