Skip to content

Commit e4c2afc

Browse files
authored
Merge pull request #1652 from tursodatabase/libsql-wal-replicator
libsql wal replicator
2 parents 8077948 + d29ca7f commit e4c2afc

27 files changed

Lines changed: 695 additions & 400 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ rusqlite = { package = "libsql-rusqlite", path = "vendored/rusqlite", version =
4545
] }
4646
hyper = { version = "0.14" }
4747
tower = { version = "0.4.13" }
48+
zerocopy = { version = "0.7.32", features = ["derive", "alloc"] }
4849

4950
# Config for 'cargo dist'
5051
[workspace.metadata.dist]

bottomless/src/replicator.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ use aws_sdk_s3::primitives::ByteStream;
1717
use aws_sdk_s3::{Client, Config};
1818
use bytes::{Buf, Bytes};
1919
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
20+
use libsql_replication::injector::Injector as _;
21+
use libsql_replication::rpc::replication::Frame as RpcFrame;
2022
use libsql_sys::{Cipher, EncryptionConfig};
2123
use std::ops::Deref;
2224
use std::path::{Path, PathBuf};
@@ -1449,12 +1451,13 @@ impl Replicator {
14491451
db_path: &Path,
14501452
) -> Result<bool> {
14511453
let encryption_config = self.encryption_config.clone();
1452-
let mut injector = libsql_replication::injector::Injector::new(
1453-
db_path,
1454+
let mut injector = libsql_replication::injector::SqliteInjector::new(
1455+
db_path.to_path_buf(),
14541456
4096,
14551457
libsql_sys::connection::NO_AUTOCHECKPOINT,
14561458
encryption_config,
1457-
)?;
1459+
)
1460+
.await?;
14581461
let prefix = format!("{}-{}/", self.db_name, generation);
14591462
let mut page_buf = {
14601463
let mut v = Vec::with_capacity(page_size);
@@ -1552,7 +1555,11 @@ impl Replicator {
15521555
},
15531556
page_buf.as_slice(),
15541557
);
1555-
injector.inject_frame(frame_to_inject)?;
1558+
let frame = RpcFrame {
1559+
data: frame_to_inject.bytes(),
1560+
timestamp: None,
1561+
};
1562+
injector.inject_frame(frame).await?;
15561563
applied_wal_frame = true;
15571564
}
15581565
}

libsql-replication/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ license = "MIT"
1212
tonic = { version = "0.11", features = ["tls"] }
1313
prost = "0.12"
1414
libsql-sys = { version = "0.7", path = "../libsql-sys", default-features = false, features = ["wal", "rusqlite", "api"] }
15+
libsql-wal = { path = "../libsql-wal/", optional = true }
1516
rusqlite = { workspace = true }
1617
parking_lot = "0.12.1"
1718
bytes = { version = "1.5.0", features = ["serde"] }
@@ -37,3 +38,4 @@ tonic-build = "0.11"
3738

3839
[features]
3940
encryption = ["libsql-sys/encryption"]
41+
libsql_wal = ["dep:libsql-wal"]

libsql-replication/proto/replication_log.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ import "metadata.proto";
55

66
message LogOffset {
77
uint64 next_offset = 1;
8+
enum WalFlavor {
9+
Sqlite = 0;
10+
Libsql = 1;
11+
}
12+
// the type of wal frames that the client is expecting
13+
optional WalFlavor wal_flavor = 2;
814
}
915

1016
message HelloRequest {

libsql-replication/src/frame.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use crate::LIBSQL_PAGE_SIZE;
1313
pub type FrameNo = u64;
1414

1515
/// The file header for the WAL log. All fields are represented in little-endian ordering.
16-
/// See `encode` and `decode` for actual layout.
1716
// repr C for stable sizing
1817
#[repr(C)]
1918
#[derive(Debug, Clone, Copy, zerocopy::FromZeroes, zerocopy::FromBytes, zerocopy::AsBytes)]
@@ -22,7 +21,7 @@ pub struct FrameHeader {
2221
pub frame_no: lu64,
2322
/// Rolling checksum of all the previous frames, including this one.
2423
pub checksum: lu64,
25-
/// page number, if frame_type is FrameType::Page
24+
/// page number
2625
pub page_no: lu32,
2726
/// Size of the database (in page) after committing the transaction. This is passed from sqlite,
2827
/// and serves as commit transaction boundary

libsql-replication/src/generated/wal_log.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,48 @@
44
pub struct LogOffset {
55
#[prost(uint64, tag = "1")]
66
pub next_offset: u64,
7+
/// the type of wal frames that the client is expecting
8+
#[prost(enumeration = "log_offset::WalFlavor", optional, tag = "2")]
9+
pub wal_flavor: ::core::option::Option<i32>,
10+
}
11+
/// Nested message and enum types in `LogOffset`.
12+
pub mod log_offset {
13+
#[derive(
14+
Clone,
15+
Copy,
16+
Debug,
17+
PartialEq,
18+
Eq,
19+
Hash,
20+
PartialOrd,
21+
Ord,
22+
::prost::Enumeration
23+
)]
24+
#[repr(i32)]
25+
pub enum WalFlavor {
26+
Sqlite = 0,
27+
Libsql = 1,
28+
}
29+
impl WalFlavor {
30+
/// String value of the enum field names used in the ProtoBuf definition.
31+
///
32+
/// The values are not transformed in any way and thus are considered stable
33+
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
34+
pub fn as_str_name(&self) -> &'static str {
35+
match self {
36+
WalFlavor::Sqlite => "Sqlite",
37+
WalFlavor::Libsql => "Libsql",
38+
}
39+
}
40+
/// Creates an enum from field names used in the ProtoBuf definition.
41+
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
42+
match value {
43+
"Sqlite" => Some(Self::Sqlite),
44+
"Libsql" => Some(Self::Libsql),
45+
_ => None,
46+
}
47+
}
48+
}
749
}
850
#[allow(clippy::derive_partial_eq_without_eq)]
951
#[derive(Clone, PartialEq, ::prost::Message)]
Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
pub type Result<T, E = Error> = std::result::Result<T, E>;
2+
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
3+
14
#[derive(Debug, thiserror::Error)]
25
pub enum Error {
36
#[error("IO error: {0}")]
47
Io(#[from] std::io::Error),
58
#[error("SQLite error: {0}")]
69
Sqlite(#[from] rusqlite::Error),
7-
#[error("A fatal error occured injecting frames")]
8-
FatalInjectError,
10+
#[error("A fatal error occured injecting frames: {0}")]
11+
FatalInjectError(BoxError),
912
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
use std::mem::size_of;
2+
3+
use libsql_wal::io::StdIO;
4+
use libsql_wal::replication::injector::Injector;
5+
use libsql_wal::segment::Frame as WalFrame;
6+
use zerocopy::{AsBytes, FromZeroes};
7+
8+
use crate::frame::FrameNo;
9+
use crate::rpc::replication::Frame as RpcFrame;
10+
11+
use super::error::{Error, Result};
12+
13+
pub struct LibsqlInjector {
14+
injector: Injector<StdIO>,
15+
}
16+
17+
impl super::Injector for LibsqlInjector {
18+
async fn inject_frame(&mut self, frame: RpcFrame) -> Result<Option<FrameNo>> {
19+
// this is a bit annoying be we want to read the frame, and it has to be aligned, so we
20+
// must copy it...
21+
// FIXME: optimize this.
22+
let mut wal_frame = WalFrame::new_box_zeroed();
23+
if frame.data.len() != size_of::<WalFrame>() {
24+
todo!("invalid frame");
25+
}
26+
wal_frame.as_bytes_mut().copy_from_slice(&frame.data[..]);
27+
Ok(self
28+
.injector
29+
.insert_frame(wal_frame)
30+
.await
31+
.map_err(|e| Error::FatalInjectError(e.into()))?)
32+
}
33+
34+
async fn rollback(&mut self) {
35+
self.injector.rollback();
36+
}
37+
38+
async fn flush(&mut self) -> Result<Option<FrameNo>> {
39+
self.injector
40+
.flush(None)
41+
.await
42+
.map_err(|e| Error::FatalInjectError(e.into()))?;
43+
Ok(None)
44+
}
45+
}

0 commit comments

Comments
 (0)