Skip to content

Commit ee7a4ed

Browse files
committed
optionally wait for more frames to become available in libsql_wal Replicator
When forking, we only want to replicate up to the current frame_no, not wait for more write
1 parent d3914ef commit ee7a4ed

3 files changed

Lines changed: 13 additions & 6 deletions

File tree

libsql-server/src/rpc/replication/libsql_replicator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ impl ReplicationLog for LibsqlReplicationService {
157157
let replicator = libsql_wal::replication::replicator::Replicator::new(
158158
shared.clone(),
159159
req.next_offset.max(1),
160+
true,
160161
);
161162

162163
let flavor = req.wal_flavor();

libsql-wal/src/replication/injector.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ mod test {
137137
let primary_conn = primary_env.open_conn("test");
138138
let primary_shared = primary_env.shared("test");
139139

140-
let replicator = Replicator::new(primary_shared.clone(), 1);
140+
let replicator = Replicator::new(primary_shared.clone(), 1, true);
141141
let stream = replicator.into_frame_stream();
142142

143143
tokio::pin!(stream);

libsql-wal/src/replication/replicator.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,17 @@ pub struct Replicator<IO: Io> {
1515
shared: Arc<SharedWal<IO>>,
1616
new_frame_notifier: watch::Receiver<u64>,
1717
next_frame_no: u64,
18+
wait_for_more: bool,
1819
}
1920

2021
impl<IO: Io> Replicator<IO> {
21-
pub fn new(shared: Arc<SharedWal<IO>>, next_frame_no: u64) -> Self {
22+
pub fn new(shared: Arc<SharedWal<IO>>, next_frame_no: u64, wait_for_more: bool) -> Self {
2223
let new_frame_notifier = shared.new_frame_notifier.subscribe();
2324
Self {
2425
shared,
2526
new_frame_notifier,
2627
next_frame_no,
28+
wait_for_more,
2729
}
2830
}
2931

@@ -59,7 +61,7 @@ impl<IO: Io> Replicator<IO> {
5961
// we have stuff to replicate
6062
if most_recent_frame_no >= self.next_frame_no {
6163
// first replicate the most recent version of each page from the current
62-
// segment. We also return how far we have replicated from the current log
64+
// segment. We also return how far back we have replicated from the current log
6365
let current = self.shared.current.load();
6466
let mut seen = RoaringBitmap::new();
6567
let (stream, replicated_until, size_after) = current.frame_stream_from(self.next_frame_no, &mut seen);
@@ -139,6 +141,10 @@ impl<IO: Io> Replicator<IO> {
139141
}
140142
}
141143
}
144+
145+
if !self.wait_for_more {
146+
break
147+
}
142148
}
143149
}
144150
}
@@ -169,7 +175,7 @@ mod test {
169175
.unwrap();
170176
}
171177

172-
let replicator = Replicator::new(shared.clone(), 1);
178+
let replicator = Replicator::new(shared.clone(), 1, true);
173179

174180
let tmp = NamedTempFile::new().unwrap();
175181
let stream = replicator.into_frame_stream();
@@ -240,7 +246,7 @@ mod test {
240246
// replicate everything from scratch again
241247
{
242248
let tmp = NamedTempFile::new().unwrap();
243-
let replicator = Replicator::new(shared.clone(), 1);
249+
let replicator = Replicator::new(shared.clone(), 1, true);
244250
let stream = replicator.into_frame_stream();
245251

246252
tokio::pin!(stream);
@@ -302,7 +308,7 @@ mod test {
302308

303309
let db_content = std::fs::read(&env.db_path("test").join("data")).unwrap();
304310

305-
let replicator = Replicator::new(shared, 1);
311+
let replicator = Replicator::new(shared, 1, true);
306312
let stream = replicator.into_frame_stream().take(3);
307313

308314
tokio::pin!(stream);

0 commit comments

Comments
 (0)