Skip to content

Commit a2bdc80

Browse files
committed
pass RpcFrame to client methods
necessary to pass different underlying frames
1 parent 566664e commit a2bdc80

8 files changed

Lines changed: 79 additions & 42 deletions

File tree

bottomless/src/replicator.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use aws_sdk_s3::{Client, Config};
1818
use bytes::{Buf, Bytes};
1919
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
2020
use libsql_replication::injector::Injector as _;
21+
use libsql_replication::rpc::replication::Frame as RpcFrame;
2122
use libsql_sys::{Cipher, EncryptionConfig};
2223
use std::ops::Deref;
2324
use std::path::{Path, PathBuf};
@@ -1554,7 +1555,11 @@ impl Replicator {
15541555
},
15551556
page_buf.as_slice(),
15561557
);
1557-
injector.inject_frame(frame_to_inject).await?;
1558+
let frame = RpcFrame {
1559+
data: frame_to_inject.bytes(),
1560+
timestamp: None,
1561+
};
1562+
injector.inject_frame(frame).await?;
15581563
applied_wal_frame = true;
15591564
}
15601565
}

libsql-replication/src/injector/libsql_injector.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ use libsql_wal::replication::injector::Injector;
55
use libsql_wal::segment::Frame as WalFrame;
66
use zerocopy::{AsBytes, FromZeroes};
77

8-
use crate::frame::{Frame, FrameNo};
8+
use crate::frame::FrameNo;
9+
use crate::rpc::replication::Frame as RpcFrame;
910

1011
use super::error::{Error, Result};
1112

@@ -14,15 +15,15 @@ pub struct LibsqlInjector {
1415
}
1516

1617
impl super::Injector for LibsqlInjector {
17-
async fn inject_frame(&mut self, frame: Frame) -> Result<Option<FrameNo>> {
18+
async fn inject_frame(&mut self, frame: RpcFrame) -> Result<Option<FrameNo>> {
1819
// this is a bit annoying be we want to read the frame, and it has to be aligned, so we
1920
// must copy it...
2021
// FIXME: optimize this.
2122
let mut wal_frame = WalFrame::new_box_zeroed();
22-
if frame.bytes().len() != size_of::<WalFrame>() {
23+
if frame.data.len() != size_of::<WalFrame>() {
2324
todo!("invalid frame");
2425
}
25-
wal_frame.as_bytes_mut().copy_from_slice(&frame.bytes()[..]);
26+
wal_frame.as_bytes_mut().copy_from_slice(&frame.data[..]);
2627
Ok(self
2728
.injector
2829
.insert_frame(wal_frame)

libsql-replication/src/injector/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use std::future::Future;
22

3+
use super::rpc::replication::Frame as RpcFrame;
34
pub use libsql_injector::LibsqlInjector;
45
pub use sqlite_injector::SqliteInjector;
56

6-
use crate::frame::{Frame, FrameNo};
7+
use crate::frame::FrameNo;
78

89
pub use error::Error;
910
use error::Result;
@@ -16,7 +17,7 @@ pub trait Injector {
1617
/// Inject a singular frame.
1718
fn inject_frame(
1819
&mut self,
19-
frame: Frame,
20+
frame: RpcFrame,
2021
) -> impl Future<Output = Result<Option<FrameNo>>> + Send;
2122

2223
/// Discard any uncommintted frames.

libsql-replication/src/injector/sqlite_injector/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use rusqlite::OpenFlags;
77
use tokio::task::spawn_blocking;
88

99
use crate::frame::{Frame, FrameNo};
10+
use crate::rpc::replication::Frame as RpcFrame;
1011

1112
use self::injector_wal::{
1213
InjectorWal, InjectorWalManager, LIBSQL_INJECT_FATAL, LIBSQL_INJECT_OK, LIBSQL_INJECT_OK_TXN,
@@ -25,8 +26,10 @@ pub struct SqliteInjector {
2526
}
2627

2728
impl Injector for SqliteInjector {
28-
async fn inject_frame(&mut self, frame: Frame) -> Result<Option<FrameNo>> {
29+
async fn inject_frame(&mut self, frame: RpcFrame) -> Result<Option<FrameNo>> {
2930
let inner = self.inner.clone();
31+
let frame =
32+
Frame::try_from(&frame.data[..]).map_err(|e| Error::FatalInjectError(e.into()))?;
3033
spawn_blocking(move || inner.lock().inject_frame(frame))
3134
.await
3235
.unwrap()

libsql-replication/src/replicator.rs

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ impl From<tokio::task::JoinError> for Error {
6363

6464
#[async_trait::async_trait]
6565
pub trait ReplicatorClient {
66-
type FrameStream: Stream<Item = Result<Frame, Error>> + Unpin + Send;
66+
type FrameStream: Stream<Item = Result<RpcFrame, Error>> + Unpin + Send;
6767

6868
/// Perform handshake with remote
6969
async fn handshake(&mut self) -> Result<(), Error>;
@@ -318,7 +318,7 @@ where
318318
}
319319
}
320320

321-
async fn inject_frame(&mut self, frame: Frame) -> Result<(), Error> {
321+
async fn inject_frame(&mut self, frame: RpcFrame) -> Result<(), Error> {
322322
self.frames_synced += 1;
323323

324324
match self.injector.inject_frame(frame).await? {
@@ -360,6 +360,7 @@ mod test {
360360
use async_stream::stream;
361361

362362
use crate::frame::{FrameBorrowed, FrameMut};
363+
use crate::rpc::replication::Frame as RpcFrame;
363364

364365
use super::*;
365366

@@ -370,7 +371,8 @@ mod test {
370371

371372
#[async_trait::async_trait]
372373
impl ReplicatorClient for Client {
373-
type FrameStream = Pin<Box<dyn Stream<Item = Result<Frame, Error>> + Send + 'static>>;
374+
type FrameStream =
375+
Pin<Box<dyn Stream<Item = Result<RpcFrame, Error>> + Send + 'static>>;
374376

375377
/// Perform handshake with remote
376378
async fn handshake(&mut self) -> Result<(), Error> {
@@ -414,7 +416,8 @@ mod test {
414416

415417
#[async_trait::async_trait]
416418
impl ReplicatorClient for Client {
417-
type FrameStream = Pin<Box<dyn Stream<Item = Result<Frame, Error>> + Send + 'static>>;
419+
type FrameStream =
420+
Pin<Box<dyn Stream<Item = Result<RpcFrame, Error>> + Send + 'static>>;
418421

419422
/// Perform handshake with remote
420423
async fn handshake(&mut self) -> Result<(), Error> {
@@ -456,7 +459,8 @@ mod test {
456459

457460
#[async_trait::async_trait]
458461
impl ReplicatorClient for Client {
459-
type FrameStream = Pin<Box<dyn Stream<Item = Result<Frame, Error>> + Send + 'static>>;
462+
type FrameStream =
463+
Pin<Box<dyn Stream<Item = Result<RpcFrame, Error>> + Send + 'static>>;
460464

461465
/// Perform handshake with remote
462466
async fn handshake(&mut self) -> Result<(), Error> {
@@ -500,7 +504,8 @@ mod test {
500504

501505
#[async_trait::async_trait]
502506
impl ReplicatorClient for Client {
503-
type FrameStream = Pin<Box<dyn Stream<Item = Result<Frame, Error>> + Send + 'static>>;
507+
type FrameStream =
508+
Pin<Box<dyn Stream<Item = Result<RpcFrame, Error>> + Send + 'static>>;
504509

505510
/// Perform handshake with remote
506511
async fn handshake(&mut self) -> Result<(), Error> {
@@ -544,7 +549,8 @@ mod test {
544549

545550
#[async_trait::async_trait]
546551
impl ReplicatorClient for Client {
547-
type FrameStream = Pin<Box<dyn Stream<Item = Result<Frame, Error>> + Send + 'static>>;
552+
type FrameStream =
553+
Pin<Box<dyn Stream<Item = Result<RpcFrame, Error>> + Send + 'static>>;
548554

549555
/// Perform handshake with remote
550556
async fn handshake(&mut self) -> Result<(), Error> {
@@ -586,7 +592,8 @@ mod test {
586592

587593
#[async_trait::async_trait]
588594
impl ReplicatorClient for Client {
589-
type FrameStream = Pin<Box<dyn Stream<Item = Result<Frame, Error>> + Send + 'static>>;
595+
type FrameStream =
596+
Pin<Box<dyn Stream<Item = Result<RpcFrame, Error>> + Send + 'static>>;
590597

591598
/// Perform handshake with remote
592599
async fn handshake(&mut self) -> Result<(), Error> {
@@ -627,7 +634,8 @@ mod test {
627634

628635
#[async_trait::async_trait]
629636
impl ReplicatorClient for Client {
630-
type FrameStream = Pin<Box<dyn Stream<Item = Result<Frame, Error>> + Send + 'static>>;
637+
type FrameStream =
638+
Pin<Box<dyn Stream<Item = Result<RpcFrame, Error>> + Send + 'static>>;
631639

632640
/// Perform handshake with remote
633641
async fn handshake(&mut self) -> Result<(), Error> {
@@ -672,7 +680,8 @@ mod test {
672680

673681
#[async_trait::async_trait]
674682
impl ReplicatorClient for Client {
675-
type FrameStream = Pin<Box<dyn Stream<Item = Result<Frame, Error>> + Send + 'static>>;
683+
type FrameStream =
684+
Pin<Box<dyn Stream<Item = Result<RpcFrame, Error>> + Send + 'static>>;
676685

677686
/// Perform handshake with remote
678687
async fn handshake(&mut self) -> Result<(), Error> {
@@ -740,7 +749,8 @@ mod test {
740749

741750
#[async_trait::async_trait]
742751
impl ReplicatorClient for Client {
743-
type FrameStream = Pin<Box<dyn Stream<Item = Result<Frame, Error>> + Send + 'static>>;
752+
type FrameStream =
753+
Pin<Box<dyn Stream<Item = Result<RpcFrame, Error>> + Send + 'static>>;
744754

745755
/// Perform handshake with remote
746756
async fn handshake(&mut self) -> Result<(), Error> {
@@ -752,15 +762,26 @@ mod test {
752762
let frames = self
753763
.frames
754764
.iter()
765+
.map(|f| RpcFrame {
766+
data: f.bytes(),
767+
timestamp: None,
768+
})
755769
.take(2)
756-
.cloned()
757770
.map(Ok)
758771
.chain(Some(Err(Error::Client("some client error".into()))))
759772
.collect::<Vec<_>>();
760773
Ok(Box::pin(tokio_stream::iter(frames)))
761774
} else {
762-
let stream = tokio_stream::iter(self.frames.clone().into_iter().map(Ok));
763-
Ok(Box::pin(stream))
775+
let iter = self
776+
.frames
777+
.iter()
778+
.map(|f| RpcFrame {
779+
data: f.bytes(),
780+
timestamp: None,
781+
})
782+
.map(Ok)
783+
.collect::<Vec<_>>();
784+
Ok(Box::pin(tokio_stream::iter(iter)))
764785
}
765786
}
766787
/// Return a snapshot for the current replication index. Called after next_frame has returned a

libsql-server/src/replication/replicator_client.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,17 @@ use std::pin::Pin;
44
use bytes::Bytes;
55
use chrono::{DateTime, Utc};
66
use futures::TryStreamExt;
7-
use libsql_replication::frame::Frame;
87
use libsql_replication::meta::WalIndexMeta;
9-
use libsql_replication::replicator::{map_frame_err, Error, ReplicatorClient};
8+
use libsql_replication::replicator::{Error, ReplicatorClient};
109
use libsql_replication::rpc::replication::hello_request::WalFlavor;
1110
use libsql_replication::rpc::replication::replication_log_client::ReplicationLogClient;
1211
use libsql_replication::rpc::replication::{
13-
verify_session_token, HelloRequest, LogOffset, NAMESPACE_METADATA_KEY, SESSION_TOKEN_KEY,
12+
verify_session_token, Frame as RpcFrame, HelloRequest, LogOffset, NAMESPACE_METADATA_KEY,
13+
SESSION_TOKEN_KEY,
1414
};
1515
use tokio::sync::watch;
16-
use tokio_stream::{Stream, StreamExt};
16+
use tokio_stream::Stream;
17+
1718
use tonic::metadata::{AsciiMetadataValue, BinaryMetadataValue};
1819
use tonic::transport::Channel;
1920
use tonic::{Code, Request, Status};
@@ -95,7 +96,7 @@ impl Client {
9596

9697
#[async_trait::async_trait]
9798
impl ReplicatorClient for Client {
98-
type FrameStream = Pin<Box<dyn Stream<Item = Result<Frame, Error>> + Send + 'static>>;
99+
type FrameStream = Pin<Box<dyn Stream<Item = Result<RpcFrame, Error>> + Send + 'static>>;
99100

100101
#[tracing::instrument(skip(self))]
101102
async fn handshake(&mut self) -> Result<(), Error> {
@@ -169,7 +170,7 @@ impl ReplicatorClient for Client {
169170
None => REPLICATION_LATENCY_CACHE_MISS.increment(1),
170171
}
171172
})
172-
.map(map_frame_err);
173+
.map_err(Into::into);
173174

174175
Ok(Box::pin(stream))
175176
}
@@ -181,7 +182,7 @@ impl ReplicatorClient for Client {
181182
let req = self.make_request(offset);
182183
match self.client.snapshot(req).await {
183184
Ok(resp) => {
184-
let stream = resp.into_inner().map(map_frame_err);
185+
let stream = resp.into_inner().map_err(Into::into);
185186
Ok(Box::pin(stream))
186187
}
187188
Err(e) if e.code() == Code::Unavailable => Err(Error::SnapshotPending),

libsql/src/replication/local_client.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::pin::Pin;
33

44
use futures::{StreamExt, TryStreamExt};
55
use libsql_replication::{
6+
rpc::replication::Frame as RpcFrame,
67
frame::{Frame, FrameNo},
78
meta::WalIndexMeta,
89
replicator::{Error, ReplicatorClient},
@@ -35,7 +36,7 @@ impl LocalClient {
3536

3637
#[async_trait::async_trait]
3738
impl ReplicatorClient for LocalClient {
38-
type FrameStream = Pin<Box<dyn Stream<Item = Result<Frame, Error>> + Send + 'static>>;
39+
type FrameStream = Pin<Box<dyn Stream<Item = Result<RpcFrame, Error>> + Send + 'static>>;
3940

4041
/// Perform handshake with remote
4142
async fn handshake(&mut self) -> Result<(), Error> {
@@ -46,7 +47,7 @@ impl ReplicatorClient for LocalClient {
4647
async fn next_frames(&mut self) -> Result<Self::FrameStream, Error> {
4748
match self.frames.take() {
4849
Some(Frames::Vec(f)) => {
49-
let iter = f.into_iter().map(Ok);
50+
let iter = f.into_iter().map(|f| RpcFrame { data: f.bytes(), timestamp: None }).map(Ok);
5051
Ok(Box::pin(tokio_stream::iter(iter)))
5152
}
5253
Some(f @ Frames::Snapshot(_)) => {
@@ -70,7 +71,8 @@ impl ReplicatorClient for LocalClient {
7071
if s.as_mut().peek().await.is_none() {
7172
next.header_mut().size_after = size_after.into();
7273
}
73-
yield Frame::from(next);
74+
let frame = Frame::from(next);
75+
yield RpcFrame { data: frame.bytes(), timestamp: None };
7476
}
7577
};
7678

@@ -95,8 +97,9 @@ impl ReplicatorClient for LocalClient {
9597

9698
#[cfg(test)]
9799
mod test {
98-
use libsql_replication::snapshot::SnapshotFile;
100+
use libsql_replication::{frame::FrameHeader, snapshot::SnapshotFile};
99101
use tempfile::tempdir;
102+
use zerocopy::FromBytes;
100103

101104
use super::*;
102105

@@ -111,7 +114,8 @@ mod test {
111114
let mut s = client.snapshot().await.unwrap();
112115
assert!(matches!(s.next().await, Some(Ok(_))));
113116
let last = s.next().await.unwrap().unwrap();
114-
assert_eq!(last.header().size_after.get(), 2);
117+
let header: FrameHeader = FrameHeader::read_from_prefix(&last.data[..]).unwrap();
118+
assert_eq!(header.size_after.get(), 2);
115119
assert!(s.next().await.is_none());
116120
}
117121
}

libsql/src/replication/remote_client.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ use std::pin::Pin;
44
use std::time::{Duration, Instant};
55

66
use bytes::Bytes;
7-
use futures::StreamExt as _;
8-
use libsql_replication::frame::{Frame, FrameHeader, FrameNo};
7+
use futures::{StreamExt as _, TryStreamExt};
8+
use libsql_replication::frame::{FrameHeader, FrameNo};
99
use libsql_replication::meta::WalIndexMeta;
10-
use libsql_replication::replicator::{map_frame_err, Error, ReplicatorClient};
10+
use libsql_replication::replicator::{Error, ReplicatorClient};
1111
use libsql_replication::rpc::replication::hello_request::WalFlavor;
1212
use libsql_replication::rpc::replication::{
13-
verify_session_token, Frames, HelloRequest, HelloResponse, LogOffset, SESSION_TOKEN_KEY,
13+
Frame as RpcFrame, verify_session_token, Frames, HelloRequest, HelloResponse, LogOffset, SESSION_TOKEN_KEY,
1414
};
1515
use tokio_stream::Stream;
1616
use tonic::metadata::AsciiMetadataValue;
@@ -161,7 +161,7 @@ impl RemoteClient {
161161

162162
let frames_iter = frames
163163
.into_iter()
164-
.map(|f| Frame::try_from(&*f.data).map_err(|e| Error::Client(e.into())));
164+
.map(Ok);
165165

166166
let stream = tokio_stream::iter(frames_iter);
167167

@@ -197,15 +197,16 @@ impl RemoteClient {
197197
.snapshot(req)
198198
.await?
199199
.into_inner()
200-
.map(map_frame_err)
200+
.map_err(|e| e.into())
201201
.peekable();
202202

203203
{
204204
let frames = Pin::new(&mut frames);
205205

206206
// the first frame is the one with the highest frame_no in the snapshot
207207
if let Some(Ok(f)) = frames.peek().await {
208-
self.last_received = Some(f.header().frame_no.get());
208+
let header: FrameHeader = FrameHeader::read_from_prefix(&f.data[..]).unwrap();
209+
self.last_received = Some(header.frame_no.get());
209210
}
210211
}
211212

@@ -240,7 +241,7 @@ fn maybe_log<T>(
240241

241242
#[async_trait::async_trait]
242243
impl ReplicatorClient for RemoteClient {
243-
type FrameStream = Pin<Box<dyn Stream<Item = Result<Frame, Error>> + Send + 'static>>;
244+
type FrameStream = Pin<Box<dyn Stream<Item = Result<RpcFrame, Error>> + Send + 'static>>;
244245

245246
/// Perform handshake with remote
246247
async fn handshake(&mut self) -> Result<(), Error> {

0 commit comments

Comments
 (0)