Skip to content

Commit d29ca7f

Browse files
committed
fix conflicts
1 parent e97026c commit d29ca7f

7 files changed

Lines changed: 39 additions & 29 deletions

File tree

libsql-replication/proto/replication_log.proto

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,18 @@ import "metadata.proto";
55

66
message LogOffset {
77
uint64 next_offset = 1;
8-
}
9-
10-
message HelloRequest {
11-
optional uint64 handshake_version = 1;
128
enum WalFlavor {
139
Sqlite = 0;
1410
Libsql = 1;
1511
}
16-
// the type of wal that the client is expecting
12+
// the type of wal frames that the client is expecting
1713
optional WalFlavor wal_flavor = 2;
1814
}
1915

16+
message HelloRequest {
17+
optional uint64 handshake_version = 1;
18+
}
19+
2020
message HelloResponse {
2121
/// Uuid of the current generation
2222
string generation_id = 1;

libsql-replication/src/generated/wal_log.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,12 @@
44
pub struct LogOffset {
55
#[prost(uint64, tag = "1")]
66
pub next_offset: u64,
7-
}
8-
#[allow(clippy::derive_partial_eq_without_eq)]
9-
#[derive(Clone, PartialEq, ::prost::Message)]
10-
pub struct HelloRequest {
11-
#[prost(uint64, optional, tag = "1")]
12-
pub handshake_version: ::core::option::Option<u64>,
13-
/// the type of wal that the client is expecting
14-
#[prost(enumeration = "hello_request::WalFlavor", optional, tag = "2")]
7+
/// the type of wal frames that the client is expecting
8+
#[prost(enumeration = "log_offset::WalFlavor", optional, tag = "2")]
159
pub wal_flavor: ::core::option::Option<i32>,
1610
}
17-
/// Nested message and enum types in `HelloRequest`.
18-
pub mod hello_request {
11+
/// Nested message and enum types in `LogOffset`.
12+
pub mod log_offset {
1913
#[derive(
2014
Clone,
2115
Copy,
@@ -55,6 +49,12 @@ pub mod hello_request {
5549
}
5650
#[allow(clippy::derive_partial_eq_without_eq)]
5751
#[derive(Clone, PartialEq, ::prost::Message)]
52+
pub struct HelloRequest {
53+
#[prost(uint64, optional, tag = "1")]
54+
pub handshake_version: ::core::option::Option<u64>,
55+
}
56+
#[allow(clippy::derive_partial_eq_without_eq)]
57+
#[derive(Clone, PartialEq, ::prost::Message)]
5858
pub struct HelloResponse {
5959
/// / Uuid of the current generation
6060
#[prost(string, tag = "1")]

libsql-replication/src/rpc.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ pub mod replication {
2626

2727
use uuid::Uuid;
2828

29-
use self::hello_request::WalFlavor;
3029
include!("generated/wal_log.rs");
3130

3231
pub const NO_HELLO_ERROR_MSG: &str = "NO_HELLO";
@@ -48,10 +47,9 @@ pub mod replication {
4847
}
4948

5049
impl HelloRequest {
51-
pub fn new(wal_flavor: WalFlavor) -> Self {
50+
pub fn new() -> Self {
5251
Self {
5352
handshake_version: Some(1),
54-
wal_flavor: Some(wal_flavor.into()),
5553
}
5654
}
5755
}

libsql-server/src/namespace/configurator/replica.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::sync::Arc;
33

44
use futures::Future;
55
use hyper::Uri;
6+
use libsql_replication::rpc::replication::log_offset::WalFlavor;
67
use libsql_replication::rpc::replication::replication_log_client::ReplicationLogClient;
78
use tokio::task::JoinSet;
89
use tonic::transport::Channel;
@@ -68,10 +69,11 @@ impl ConfigureNamespace for ReplicaConfigurator {
6869
&db_path,
6970
meta_store_handle.clone(),
7071
store.clone(),
72+
WalFlavor::Sqlite,
7173
)
7274
.await?;
7375
let applied_frame_no_receiver = client.current_frame_no_notifier.subscribe();
74-
let mut replicator = libsql_replication::replicator::Replicator::new(
76+
let mut replicator = libsql_replication::replicator::Replicator::new_sqlite(
7577
client,
7678
db_path.join("data"),
7779
DEFAULT_AUTO_CHECKPOINT,

libsql-server/src/replication/replicator_client.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use chrono::{DateTime, Utc};
66
use futures::TryStreamExt;
77
use libsql_replication::meta::WalIndexMeta;
88
use libsql_replication::replicator::{Error, ReplicatorClient};
9-
use libsql_replication::rpc::replication::hello_request::WalFlavor;
9+
use libsql_replication::rpc::replication::log_offset::WalFlavor;
1010
use libsql_replication::rpc::replication::replication_log_client::ReplicationLogClient;
1111
use libsql_replication::rpc::replication::{
1212
verify_session_token, Frame as RpcFrame, HelloRequest, LogOffset, NAMESPACE_METADATA_KEY,
@@ -101,7 +101,7 @@ impl ReplicatorClient for Client {
101101
#[tracing::instrument(skip(self))]
102102
async fn handshake(&mut self) -> Result<(), Error> {
103103
tracing::debug!("Attempting to perform handshake with primary.");
104-
let req = self.make_request(HelloRequest::new(self.wal_flavor));
104+
let req = self.make_request(HelloRequest::new());
105105
let resp = self.client.hello(req).await?;
106106
let hello = resp.into_inner();
107107
verify_session_token(&hello.session_token).map_err(Error::Client)?;
@@ -143,6 +143,7 @@ impl ReplicatorClient for Client {
143143
async fn next_frames(&mut self) -> Result<Self::FrameStream, Error> {
144144
let offset = LogOffset {
145145
next_offset: self.next_frame_no(),
146+
wal_flavor: Some(self.wal_flavor.into()),
146147
};
147148
let req = self.make_request(offset);
148149
let stream = self
@@ -178,6 +179,7 @@ impl ReplicatorClient for Client {
178179
async fn snapshot(&mut self) -> Result<Self::FrameStream, Error> {
179180
let offset = LogOffset {
180181
next_offset: self.next_frame_no(),
182+
wal_flavor: Some(self.wal_flavor.into()),
181183
};
182184
let req = self.make_request(offset);
183185
match self.client.snapshot(req).await {

libsql-server/src/rpc/replication_log.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use chrono::{DateTime, Utc};
88
use futures::stream::BoxStream;
99
use futures_core::Future;
1010
pub use libsql_replication::rpc::replication as rpc;
11-
use libsql_replication::rpc::replication::hello_request::WalFlavor;
11+
use libsql_replication::rpc::replication::log_offset::WalFlavor;
1212
use libsql_replication::rpc::replication::replication_log_server::ReplicationLog;
1313
use libsql_replication::rpc::replication::{
1414
Frame, Frames, HelloRequest, HelloResponse, LogOffset, NAMESPACE_DOESNT_EXIST,
@@ -260,6 +260,9 @@ impl ReplicationLog for ReplicationLogService {
260260
&self,
261261
req: tonic::Request<LogOffset>,
262262
) -> Result<tonic::Response<Self::LogEntriesStream>, Status> {
263+
if let WalFlavor::Libsql = req.get_ref().wal_flavor() {
264+
return Err(Status::invalid_argument("libsql wal not supported"));
265+
}
263266
let namespace = super::extract_namespace(self.disable_namespaces, &req)?;
264267

265268
self.authenticate(&req, namespace.clone()).await?;
@@ -305,6 +308,9 @@ impl ReplicationLog for ReplicationLogService {
305308
&self,
306309
req: tonic::Request<LogOffset>,
307310
) -> Result<tonic::Response<Frames>, Status> {
311+
if let WalFlavor::Libsql = req.get_ref().wal_flavor() {
312+
return Err(Status::invalid_argument("libsql wal not supported"));
313+
}
308314
let namespace = super::extract_namespace(self.disable_namespaces, &req)?;
309315
self.authenticate(&req, namespace.clone()).await?;
310316

@@ -355,11 +361,6 @@ impl ReplicationLog for ReplicationLogService {
355361
guard.insert((replica_addr, namespace.clone()));
356362
}
357363
}
358-
359-
if let WalFlavor::Libsql = req.get_ref().wal_flavor() {
360-
return Err(Status::invalid_argument("libsql wal not supported"));
361-
}
362-
363364
let (logger, config, version, _, _) =
364365
self.logger_from_namespace(namespace, &req, false).await?;
365366

@@ -381,7 +382,12 @@ impl ReplicationLog for ReplicationLogService {
381382
&self,
382383
req: tonic::Request<LogOffset>,
383384
) -> Result<tonic::Response<Self::SnapshotStream>, Status> {
385+
if let WalFlavor::Libsql = req.get_ref().wal_flavor() {
386+
return Err(Status::invalid_argument("libsql wal not supported"));
387+
}
388+
384389
let namespace = super::extract_namespace(self.disable_namespaces, &req)?;
390+
385391
self.authenticate(&req, namespace.clone()).await?;
386392

387393
let (logger, _, _, stats, _) = self.logger_from_namespace(namespace, &req, true).await?;

libsql/src/replication/remote_client.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use futures::{StreamExt as _, TryStreamExt};
88
use libsql_replication::frame::{FrameHeader, FrameNo};
99
use libsql_replication::meta::WalIndexMeta;
1010
use libsql_replication::replicator::{Error, ReplicatorClient};
11-
use libsql_replication::rpc::replication::hello_request::WalFlavor;
1211
use libsql_replication::rpc::replication::{
1312
Frame as RpcFrame, verify_session_token, Frames, HelloRequest, HelloResponse, LogOffset, SESSION_TOKEN_KEY,
1413
};
@@ -117,9 +116,10 @@ impl RemoteClient {
117116
self.dirty = false;
118117
}
119118
let prefetch = self.session_token.is_some();
120-
let hello_req = self.make_request(HelloRequest::new(WalFlavor::Sqlite));
119+
let hello_req = self.make_request(HelloRequest::new());
121120
let log_offset_req = self.make_request(LogOffset {
122121
next_offset: self.next_offset(),
122+
wal_flavor: None,
123123
});
124124
let mut client_clone = self.remote.clone();
125125
let hello_fut = time(async {
@@ -179,6 +179,7 @@ impl RemoteClient {
179179
None => {
180180
let req = self.make_request(LogOffset {
181181
next_offset: self.next_offset(),
182+
wal_flavor: None,
182183
});
183184
time(self.remote.replication.batch_log_entries(req)).await
184185
}
@@ -190,6 +191,7 @@ impl RemoteClient {
190191
async fn do_snapshot(&mut self) -> Result<<Self as ReplicatorClient>::FrameStream, Error> {
191192
let req = self.make_request(LogOffset {
192193
next_offset: self.next_offset(),
194+
wal_flavor: None,
193195
});
194196
let mut frames = self
195197
.remote

0 commit comments

Comments
 (0)