Skip to content

Commit 15e392c

Browse files
committed
swap segments in injector
1 parent e22ba69 commit 15e392c

1 file changed

Lines changed: 10 additions & 1 deletion

File tree

libsql-wal/src/replication/injector.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
//! The injector is the module in charge of injecting frames into a replica database.
22
3-
use std::sync::atomic::Ordering;
43
use std::sync::Arc;
54

65
use crate::error::Result;
@@ -46,6 +45,7 @@ impl<IO: Io> Injector<IO> {
4645
pub fn current_durable(&self) -> u64 {
4746
*self.wal.durable_frame_no.lock()
4847
}
48+
4949
pub fn maybe_begin_txn(&mut self) -> Result<()> {
5050
if self.tx.is_none() {
5151
let mut tx = Transaction::Read(self.wal.begin_read(u64::MAX));
@@ -94,6 +94,15 @@ impl<IO: Io> Injector<IO> {
9494
if size_after.is_some() {
9595
let mut tx = self.tx.take().unwrap();
9696
self.wal.new_frame_notifier.send_replace(last_committed_frame_no);
97+
// the strategy to swap the current log is to do it on change of durable boundary,
98+
// when we have caught up with the current durable frame_no
99+
if self.current_durable() != self.previous_durable_frame_no && self.current_durable() >= self.max_tx_frame_no {
100+
let wal = self.wal.clone();
101+
// FIXME: tokio dependency here is annoying, we need an async version of swap_current.
102+
tokio::task::spawn_blocking(move || {
103+
tx.commit();
104+
wal.swap_current(&tx)
105+
}).await.unwrap()?
97106
}
98107
}
99108
}

0 commit comments

Comments
 (0)