Skip to content

Commit 12008ad

Browse files
committed
re-acquire txn on every new injection.
caused bugs with the savepoint not being updated and frame_count not being computed correctly.
1 parent 2e7685d commit 12008ad

1 file changed

Lines changed: 43 additions & 23 deletions

File tree

libsql-wal/src/replication/injector.rs

Lines changed: 43 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,20 @@ pub struct Injector<IO: Io> {
1616
buffer: Vec<Box<Frame>>,
1717
/// capacity of the frame buffer
1818
capacity: usize,
19-
tx: TxGuardOwned<IO::File>,
19+
tx: Option<TxGuardOwned<IO::File>>,
2020
max_tx_frame_no: u64,
2121
previous_durable_frame_no: u64,
2222
}
2323

2424
impl<IO: Io> Injector<IO> {
2525
pub fn new(wal: Arc<SharedWal<IO>>, buffer_capacity: usize) -> Result<Self> {
26-
let mut tx = Transaction::Read(wal.begin_read(u64::MAX));
27-
wal.upgrade(&mut tx)?;
28-
let tx = tx
29-
.into_write()
30-
.unwrap_or_else(|_| unreachable!())
31-
.into_lock_owned();
3226
Ok(Self {
3327
wal,
3428
buffer: Vec::with_capacity(buffer_capacity),
3529
capacity: buffer_capacity,
36-
tx,
30+
tx: None,
3731
max_tx_frame_no: 0,
32+
previous_durable_frame_no: 0,
3833
})
3934
}
4035

@@ -51,7 +46,22 @@ impl<IO: Io> Injector<IO> {
5146
pub fn current_durable(&self) -> u64 {
5247
*self.wal.durable_frame_no.lock()
5348
}
49+
pub fn maybe_begin_txn(&mut self) -> Result<()> {
50+
if self.tx.is_none() {
51+
let mut tx = Transaction::Read(self.wal.begin_read(u64::MAX));
52+
self.wal.upgrade(&mut tx)?;
53+
let tx = tx
54+
.into_write()
55+
.unwrap_or_else(|_| unreachable!())
56+
.into_lock_owned();
57+
assert!(self.tx.replace(tx).is_none());
58+
}
59+
60+
Ok(())
61+
}
62+
5463
pub async fn insert_frame(&mut self, frame: Box<Frame>) -> Result<Option<u64>> {
64+
self.maybe_begin_txn()?;
5565
let size_after = frame.size_after();
5666
self.max_tx_frame_no = self.max_tx_frame_no.max(frame.header().frame_no());
5767
self.buffer.push(frame);
@@ -64,28 +74,38 @@ impl<IO: Io> Injector<IO> {
6474
}
6575

6676
pub async fn flush(&mut self, size_after: Option<u32>) -> Result<()> {
67-
let buffer = std::mem::take(&mut self.buffer);
68-
let current = self.wal.current.load();
69-
let commit_data = size_after.map(|size| (size, self.max_tx_frame_no));
70-
if commit_data.is_some() {
71-
self.max_tx_frame_no = 0;
77+
if !self.buffer.is_empty() && self.tx.is_some() {
78+
let last_committed_frame_no = self.max_tx_frame_no;
79+
{
80+
let tx = self.tx.as_mut().expect("we just checked that tx was there");
81+
let buffer = std::mem::take(&mut self.buffer);
82+
let current = self.wal.current.load();
83+
let commit_data = size_after.map(|size| (size, self.max_tx_frame_no));
84+
if commit_data.is_some() {
85+
self.max_tx_frame_no = 0;
86+
}
87+
let buffer = current
88+
.inject_frames(buffer, commit_data, tx)
89+
.await?;
90+
self.buffer = buffer;
91+
self.buffer.clear();
92+
}
93+
94+
if size_after.is_some() {
95+
let mut tx = self.tx.take().unwrap();
96+
self.wal.new_frame_notifier.send_replace(last_committed_frame_no);
97+
}
98+
}
7299
}
73-
let buffer = current
74-
.inject_frames(buffer, commit_data, &mut self.tx)
75-
.await?;
76-
self.buffer = buffer;
77-
self.buffer.clear();
78100

79101
Ok(())
80102
}
81103

82104
pub fn rollback(&mut self) {
83105
self.buffer.clear();
84-
self.tx.reset(0);
85-
}
86-
87-
pub(crate) fn into_guard(self) -> TxGuardOwned<IO::File> {
88-
self.tx
106+
if let Some(tx) = self.tx.as_mut() {
107+
tx.reset(0);
108+
}
89109
}
90110
}
91111

0 commit comments

Comments
 (0)