make write batch aotmic under txn sys (#91)
This commit is contained in:
@@ -573,34 +573,31 @@ impl LsmStorageInner {
|
||||
pub fn write_batch_inner<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<u64> {
|
||||
let _lck = self.mvcc().write_lock.lock();
|
||||
let ts = self.mvcc().latest_commit_ts() + 1;
|
||||
let mut batch_datas: Vec<(key::Key<&[u8]>, &[u8])> = vec![];
|
||||
let size;
|
||||
for record in batch {
|
||||
match record {
|
||||
WriteBatchRecord::Del(key) => {
|
||||
let key = key.as_ref();
|
||||
assert!(!key.is_empty(), "key cannot be empty");
|
||||
let size;
|
||||
{
|
||||
let guard = self.state.read();
|
||||
guard.memtable.put(KeySlice::from_slice(key, ts), b"")?;
|
||||
size = guard.memtable.approximate_size();
|
||||
}
|
||||
self.try_freeze(size)?;
|
||||
batch_datas.push((KeySlice::from_slice(key, ts), b""));
|
||||
}
|
||||
WriteBatchRecord::Put(key, value) => {
|
||||
let key = key.as_ref();
|
||||
let value = value.as_ref();
|
||||
assert!(!key.is_empty(), "key cannot be empty");
|
||||
assert!(!value.is_empty(), "value cannot be empty");
|
||||
let size;
|
||||
{
|
||||
let guard = self.state.read();
|
||||
guard.memtable.put(KeySlice::from_slice(key, ts), value)?;
|
||||
size = guard.memtable.approximate_size();
|
||||
}
|
||||
self.try_freeze(size)?;
|
||||
batch_datas.push((KeySlice::from_slice(key, ts), value));
|
||||
}
|
||||
}
|
||||
}
|
||||
{
|
||||
let guard = self.state.read();
|
||||
guard.memtable.put_batch(&batch_datas)?;
|
||||
size = guard.memtable.approximate_size();
|
||||
}
|
||||
self.try_freeze(size)?;
|
||||
|
||||
self.mvcc().update_commit_ts(ts);
|
||||
Ok(ts)
|
||||
}
|
||||
|
Reference in New Issue
Block a user