Skip to content

Commit 59d6eb8

Browse files
committed
replicator: send durable_frame_no
1 parent 718b27c commit 59d6eb8

1 file changed

Lines changed: 13 additions & 5 deletions

File tree

libsql-server/src/rpc/replication/libsql_replicator.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use libsql_replication::rpc::replication::{
1313
use libsql_wal::io::StdIO;
1414
use libsql_wal::registry::WalRegistry;
1515
use libsql_wal::segment::Frame;
16+
use libsql_wal::shared_wal::SharedWal;
1617
use md5::{Digest as _, Md5};
1718
use tokio_stream::Stream;
1819
use tonic::Status;
@@ -72,12 +73,13 @@ pin_project_lite::pin_project! {
7273
#[pin]
7374
inner: S,
7475
flavor: WalFlavor,
76+
shared: Arc<SharedWal<StdIO>>,
7577
}
7678
}
7779

7880
impl<S> FrameStreamAdapter<S> {
79-
fn new(inner: S, flavor: WalFlavor) -> Self {
80-
Self { inner, flavor }
81+
fn new(inner: S, flavor: WalFlavor, shared: Arc<SharedWal<StdIO>>) -> Self {
82+
Self { inner, flavor, shared }
8183
}
8284
}
8385

@@ -93,6 +95,11 @@ where
9395
Some(Ok(f)) => {
9496
match this.flavor {
9597
WalFlavor::Libsql => {
98+
let durable_frame_no = if f.header().is_commit() {
99+
Some(this.shared.durable_frame_no())
100+
} else {
101+
None
102+
};
96103
// safety: frame implemements zerocopy traits, so it can safely be interpreted as a
97104
// byte slize of the same size
98105
let bytes: Box<[u8; size_of::<Frame>()]> =
@@ -102,7 +109,7 @@ where
102109
Poll::Ready(Some(Ok(RpcFrame {
103110
data,
104111
timestamp: None,
105-
durable_frame_no: None,
112+
durable_frame_no,
106113
})))
107114
}
108115
WalFlavor::Sqlite => {
@@ -133,6 +140,7 @@ impl ReplicationLog for LibsqlReplicationService {
133140
type LogEntriesStream = BoxStream<'static, Result<RpcFrame, Status>>;
134141
type SnapshotStream = BoxStream<'static, Result<RpcFrame, Status>>;
135142

143+
#[tracing::instrument(skip_all, fields(namespace))]
136144
async fn log_entries(
137145
&self,
138146
req: tonic::Request<LogOffset>,
@@ -143,10 +151,10 @@ impl ReplicationLog for LibsqlReplicationService {
143151
let req = req.into_inner();
144152
// TODO: replicator should only accecpt NonZero
145153
let replicator =
146-
libsql_wal::replication::replicator::Replicator::new(shared, req.next_offset.max(1));
154+
libsql_wal::replication::replicator::Replicator::new(shared.clone(), req.next_offset.max(1));
147155

148156
let flavor = req.wal_flavor();
149-
let stream = FrameStreamAdapter::new(replicator.into_frame_stream(), flavor);
157+
let stream = FrameStreamAdapter::new(replicator.into_frame_stream(), flavor, shared);
150158
Ok(tonic::Response::new(Box::pin(stream)))
151159
}
152160

0 commit comments

Comments
 (0)