fix: ensure WAL is atomic for each write batch (#84)

Signed-off-by: Alex Chi <iskyzh@gmail.com>
This commit is contained in:
Alex Chi Z
2024-07-02 20:23:33 -04:00
committed by GitHub
parent 6d16ae2d01
commit 2b527fd6dc
9 changed files with 124 additions and 38 deletions

View File

@@ -122,16 +122,25 @@ impl MemTable {
///
/// In week 1, day 1, simply put the key-value pair into the skipmap.
/// In week 2, day 6, also flush the data to WAL.
/// In week 3, day 5, modify the function to use the batch API.
pub fn put(&self, key: KeySlice, value: &[u8]) -> Result<()> {
let estimated_size = key.raw_len() + value.len();
self.map.insert(
key.to_key_vec().into_key_bytes(),
Bytes::copy_from_slice(value),
);
self.put_batch(&[(key, value)])
}
/// Implement this in week 3, day 5.
pub fn put_batch(&self, data: &[(KeySlice, &[u8])]) -> Result<()> {
let mut estimated_size = 0;
for (key, value) in data {
estimated_size += key.raw_len() + value.len();
self.map.insert(
key.to_key_vec().into_key_bytes(),
Bytes::copy_from_slice(value),
);
}
self.approximate_size
.fetch_add(estimated_size, std::sync::atomic::Ordering::Relaxed);
if let Some(ref wal) = self.wal {
wal.put(key, value)?;
wal.put_batch(data)?;
}
Ok(())
}

View File

@@ -40,51 +40,71 @@ impl Wal {
file.read_to_end(&mut buf)?;
let mut rbuf: &[u8] = buf.as_slice();
while rbuf.has_remaining() {
let batch_size = rbuf.get_u32() as usize;
if rbuf.remaining() < batch_size {
bail!("incomplete WAL");
}
let mut batch_buf = &rbuf[..batch_size];
let mut kv_pairs = Vec::new();
let mut hasher = crc32fast::Hasher::new();
let key_len = rbuf.get_u16() as usize;
hasher.write_u16(key_len as u16);
let key = Bytes::copy_from_slice(&rbuf[..key_len]);
hasher.write(&key);
rbuf.advance(key_len);
let ts = rbuf.get_u64();
hasher.write_u64(ts);
let value_len = rbuf.get_u16() as usize;
hasher.write_u16(value_len as u16);
let value = Bytes::copy_from_slice(&rbuf[..value_len]);
hasher.write(&value);
rbuf.advance(value_len);
let checksum = rbuf.get_u32();
if hasher.finalize() != checksum {
// The checksum computed from the individual components should be the same as a direct checksum on the buffer.
// Students' implementation only needs to do a single checksum on the buffer. We compute both for verification purpose.
let single_checksum = crc32fast::hash(batch_buf);
while batch_buf.has_remaining() {
let key_len = batch_buf.get_u16() as usize;
hasher.write(&(key_len as u16).to_be_bytes());
let key = Bytes::copy_from_slice(&batch_buf[..key_len]);
hasher.write(&key);
batch_buf.advance(key_len);
let ts = batch_buf.get_u64();
hasher.write(&ts.to_be_bytes());
let value_len = batch_buf.get_u16() as usize;
hasher.write(&(value_len as u16).to_be_bytes());
let value = Bytes::copy_from_slice(&batch_buf[..value_len]);
hasher.write(&value);
kv_pairs.push((key, ts, value));
batch_buf.advance(value_len);
}
rbuf.advance(batch_size);
let expected_checksum = rbuf.get_u32();
let component_checksum = hasher.finalize();
assert_eq!(component_checksum, single_checksum);
if single_checksum != expected_checksum {
bail!("checksum mismatch");
}
skiplist.insert(KeyBytes::from_bytes_with_ts(key, ts), value);
for (key, ts, value) in kv_pairs {
skiplist.insert(KeyBytes::from_bytes_with_ts(key, ts), value);
}
}
Ok(Self {
file: Arc::new(Mutex::new(BufWriter::new(file))),
})
}
pub fn put(&self, key: KeySlice, value: &[u8]) -> Result<()> {
/// Implement this in week 3, day 5.
pub fn put_batch(&self, data: &[(KeySlice, &[u8])]) -> Result<()> {
let mut file = self.file.lock();
let mut buf: Vec<u8> =
Vec::with_capacity(key.raw_len() + value.len() + std::mem::size_of::<u16>());
let mut hasher = crc32fast::Hasher::new();
hasher.write_u16(key.key_len() as u16);
buf.put_u16(key.key_len() as u16);
hasher.write(key.key_ref());
buf.put_slice(key.key_ref());
hasher.write_u64(key.ts());
buf.put_u64(key.ts());
hasher.write_u16(value.len() as u16);
buf.put_u16(value.len() as u16);
buf.put_slice(value);
hasher.write(value);
// add checksum: week 2 day 7
buf.put_u32(hasher.finalize());
let mut buf = Vec::<u8>::new();
for (key, value) in data {
buf.put_u16(key.key_len() as u16);
buf.put_slice(key.key_ref());
buf.put_u64(key.ts());
buf.put_u16(value.len() as u16);
buf.put_slice(value);
}
// write batch_size header (u32)
file.write_all(&(buf.len() as u32).to_be_bytes())?;
// write key-value pairs body
file.write_all(&buf)?;
// write checksum (u32)
file.write_all(&crc32fast::hash(&buf).to_be_bytes())?;
Ok(())
}
pub fn put(&self, key: KeySlice, value: &[u8]) -> Result<()> {
self.put_batch(&[(key, value)])
}
pub fn sync(&self) -> Result<()> {
let mut file = self.file.lock();
file.flush()?;