// Copyright (c) 2022-2025 Alex Chi Z // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. use std::fs::{File, OpenOptions}; use std::hash::Hasher; use std::io::{BufWriter, Read, Write}; use std::path::Path; use std::sync::Arc; use anyhow::{Context, Result, bail}; use bytes::{Buf, BufMut, Bytes}; use crossbeam_skiplist::SkipMap; use parking_lot::Mutex; pub struct Wal { file: Arc>>, } impl Wal { pub fn create(path: impl AsRef) -> Result { Ok(Self { file: Arc::new(Mutex::new(BufWriter::new( OpenOptions::new() .read(true) .create_new(true) .write(true) .open(path) .context("failed to create WAL")?, ))), }) } pub fn recover(path: impl AsRef, skiplist: &SkipMap) -> Result { let path = path.as_ref(); let mut file = OpenOptions::new() .read(true) .append(true) .open(path) .context("failed to recover from WAL")?; let mut buf = Vec::new(); file.read_to_end(&mut buf)?; let mut rbuf: &[u8] = buf.as_slice(); while rbuf.has_remaining() { let mut hasher = crc32fast::Hasher::new(); let key_len = rbuf.get_u16() as usize; hasher.write_u16(key_len as u16); let key = Bytes::copy_from_slice(&rbuf[..key_len]); hasher.write(&key); rbuf.advance(key_len); let value_len = rbuf.get_u16() as usize; hasher.write_u16(value_len as u16); let value = Bytes::copy_from_slice(&rbuf[..value_len]); hasher.write(&value); rbuf.advance(value_len); let checksum = rbuf.get_u32(); if hasher.finalize() != checksum { bail!("checksum mismatch"); } skiplist.insert(key, value); } Ok(Self { file: Arc::new(Mutex::new(BufWriter::new(file))), }) } pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { let mut file = self.file.lock(); let mut buf: Vec = Vec::with_capacity(key.len() + value.len() + std::mem::size_of::()); let mut hasher = crc32fast::Hasher::new(); hasher.write_u16(key.len() as u16); buf.put_u16(key.len() as u16); hasher.write(key); buf.put_slice(key); hasher.write_u16(value.len() as u16); buf.put_u16(value.len() as u16); buf.put_slice(value); hasher.write(value); // add checksum: week 2 day 7 buf.put_u32(hasher.finalize()); file.write_all(&buf)?; Ok(()) } /// Implement this in week 3, day 5. pub fn put_batch(&self, _data: &[(&[u8], &[u8])]) -> Result<()> { unimplemented!() } pub fn sync(&self) -> Result<()> { let mut file = self.file.lock(); file.flush()?; file.get_mut().sync_all()?; Ok(()) } }