Skip to content

Commit 5d25018

Browse files
authored
Merge pull request #1667 from tursodatabase/libsql-wal-rpc-server
libsql-wal final integration
2 parents 42a6f1b + 97c0d9c commit 5d25018

73 files changed

Lines changed: 4013 additions & 2675 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.lock

Lines changed: 99 additions & 552 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libsql-replication/src/injector/libsql_injector.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@ pub struct LibsqlInjector {
1414
injector: Injector<StdIO>,
1515
}
1616

17+
impl LibsqlInjector {
18+
pub fn new(injector: Injector<StdIO>) -> Self {
19+
Self { injector }
20+
}
21+
}
22+
1723
impl super::Injector for LibsqlInjector {
1824
async fn inject_frame(&mut self, frame: RpcFrame) -> Result<Option<FrameNo>> {
1925
// this is a bit annoying be we want to read the frame, and it has to be aligned, so we
@@ -24,6 +30,7 @@ impl super::Injector for LibsqlInjector {
2430
todo!("invalid frame");
2531
}
2632
wal_frame.as_bytes_mut().copy_from_slice(&frame.data[..]);
33+
2734
Ok(self
2835
.injector
2936
.insert_frame(wal_frame)

libsql-replication/src/rpc.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,14 @@ pub mod proxy {
2323

2424
pub mod replication {
2525
#![allow(clippy::all)]
26+
use std::pin::Pin;
2627

28+
use tokio_stream::Stream;
2729
use uuid::Uuid;
2830

31+
pub type BoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + 'a>>;
32+
33+
use self::replication_log_server::ReplicationLog;
2934
include!("generated/wal_log.rs");
3035

3136
pub const NO_HELLO_ERROR_MSG: &str = "NO_HELLO";
@@ -53,6 +58,47 @@ pub mod replication {
5358
}
5459
}
5560
}
61+
62+
pub type BoxReplicationService = Box<
63+
dyn ReplicationLog<
64+
LogEntriesStream = BoxStream<'static, Result<Frame, tonic::Status>>,
65+
SnapshotStream = BoxStream<'static, Result<Frame, tonic::Status>>,
66+
>,
67+
>;
68+
69+
#[tonic::async_trait]
70+
impl ReplicationLog for BoxReplicationService {
71+
type LogEntriesStream = BoxStream<'static, Result<Frame, tonic::Status>>;
72+
type SnapshotStream = BoxStream<'static, Result<Frame, tonic::Status>>;
73+
74+
async fn log_entries(
75+
&self,
76+
req: tonic::Request<LogOffset>,
77+
) -> Result<tonic::Response<Self::LogEntriesStream>, tonic::Status> {
78+
self.as_ref().log_entries(req).await
79+
}
80+
81+
async fn batch_log_entries(
82+
&self,
83+
req: tonic::Request<LogOffset>,
84+
) -> Result<tonic::Response<Frames>, tonic::Status> {
85+
self.as_ref().batch_log_entries(req).await
86+
}
87+
88+
async fn hello(
89+
&self,
90+
req: tonic::Request<HelloRequest>,
91+
) -> Result<tonic::Response<HelloResponse>, tonic::Status> {
92+
self.as_ref().hello(req).await
93+
}
94+
95+
async fn snapshot(
96+
&self,
97+
req: tonic::Request<LogOffset>,
98+
) -> Result<tonic::Response<Self::SnapshotStream>, tonic::Status> {
99+
self.as_ref().snapshot(req).await
100+
}
101+
}
56102
}
57103

58104
pub mod metadata {

libsql-server/Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ hyper-tungstenite = "0.11"
3737
itertools = "0.10.5"
3838
jsonwebtoken = "9"
3939
libsql = { path = "../libsql/", optional = true }
40-
libsql_replication = { path = "../libsql-replication" }
40+
libsql_replication = { path = "../libsql-replication", features = ["libsql_wal"] }
4141
libsql-wal = { path = "../libsql-wal/" }
4242
libsql-storage = { path = "../libsql-storage", optional = true }
4343
metrics = "0.21.1"
@@ -92,11 +92,12 @@ async-recursion = "1"
9292
mimalloc = "0.1.42"
9393
rheaper = { git = "https://github.com/MarinPostma/rheaper.git", tag = "v0.2.0", default-features = false, features = ["allocator"] }
9494
tar = "0.4.41"
95+
aws-config = "1"
96+
aws-sdk-s3 = "1"
97+
aws-smithy-runtime = "1.6.2"
9598

9699
[dev-dependencies]
97100
arbitrary = { version = "1.3.0", features = ["derive_arbitrary"] }
98-
aws-config = "0.55"
99-
aws-sdk-s3 = "0.28"
100101
env_logger = "0.10"
101102
hyper = { workspace = true, features = ["client"] }
102103
insta = { version = "1.26.0", features = ["json"] }

0 commit comments

Comments
 (0)