Skip to content

Commit ef0ad61

Browse files
authored
Merge pull request #1665 from tursodatabase/replication_index
libsql: Add max_write_replication_index and sync_until in Database
2 parents 4023a3a + 2cf3494 commit ef0ad61

6 files changed

Lines changed: 97 additions & 9 deletions

File tree

libsql-server/tests/embedded_replica/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,8 @@ fn execute_batch() {
179179
conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ())
180180
.await?;
181181

182+
assert_eq!(db.max_write_replication_index(), Some(1));
183+
182184
let n = db.sync().await?.frame_no();
183185
assert_eq!(n, Some(1));
184186

@@ -231,6 +233,7 @@ fn stream() {
231233

232234
conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ())
233235
.await?;
236+
assert_eq!(db.max_write_replication_index(), Some(1));
234237

235238
let n = db.sync().await?.frame_no();
236239
assert_eq!(n, Some(1));
@@ -244,8 +247,10 @@ fn stream() {
244247
",
245248
)
246249
.await?;
250+
let replication_index = db.max_write_replication_index();
247251

248-
db.sync().await.unwrap();
252+
let synced_replication_index = db.sync().await.unwrap().frame_no();
253+
assert_eq!(synced_replication_index, replication_index);
249254

250255
let rows = conn.query("select * from user", ()).await.unwrap();
251256

libsql/src/database.rs

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ pub use builder::Builder;
77
#[cfg(feature = "core")]
88
pub use libsql_sys::{Cipher, EncryptionConfig};
99

10-
use std::fmt;
11-
1210
use crate::{Connection, Result};
11+
use std::fmt;
12+
use std::sync::atomic::AtomicU64;
13+
use std::sync::Arc;
1314

1415
cfg_core! {
1516
bitflags::bitflags! {
@@ -76,6 +77,9 @@ impl fmt::Debug for DbType {
7677
/// not do much work until the [`Database::connect`] fn is called.
7778
pub struct Database {
7879
db_type: DbType,
80+
/// The maximum replication index returned from a write performed using any connection created using this Database object.
81+
#[allow(dead_code)]
82+
max_write_replication_index: Arc<AtomicU64>,
7983
}
8084

8185
cfg_core! {
@@ -87,6 +91,7 @@ cfg_core! {
8791

8892
Ok(Database {
8993
db_type: DbType::Memory { db },
94+
max_write_replication_index: Default::default(),
9095
})
9196
}
9297

@@ -105,6 +110,7 @@ cfg_core! {
105110
flags,
106111
encryption_config: None,
107112
},
113+
max_write_replication_index: Default::default(),
108114
})
109115
}
110116
}
@@ -130,6 +136,7 @@ cfg_replication! {
130136

131137
Ok(Database {
132138
db_type: DbType::Sync { db, encryption_config },
139+
max_write_replication_index: Default::default(),
133140
})
134141
}
135142

@@ -191,6 +198,7 @@ cfg_replication! {
191198

192199
Ok(Database {
193200
db_type: DbType::Sync { db, encryption_config },
201+
max_write_replication_index: Default::default(),
194202
})
195203
}
196204

@@ -317,6 +325,7 @@ cfg_replication! {
317325

318326
Ok(Database {
319327
db_type: DbType::Sync { db, encryption_config },
328+
max_write_replication_index: Default::default(),
320329
})
321330
}
322331

@@ -331,6 +340,16 @@ cfg_replication! {
331340
}
332341
}
333342

343+
/// Sync database from remote until it gets to a given replication_index or further,
344+
/// and returns the committed frame_no after syncing, if applicable.
345+
pub async fn sync_until(&self, replication_index: FrameNo) -> Result<crate::replication::Replicated> {
346+
if let DbType::Sync { db, encryption_config: _ } = &self.db_type {
347+
db.sync_until(replication_index).await
348+
} else {
349+
Err(Error::SyncNotSupported(format!("{:?}", self.db_type)))
350+
}
351+
}
352+
334353
/// Apply a set of frames to the database and returns the committed frame_no after syncing, if
335354
/// applicable.
336355
pub async fn sync_frames(&self, frames: crate::replication::Frames) -> Result<Option<FrameNo>> {
@@ -372,12 +391,25 @@ cfg_replication! {
372391
DbType::Sync { db, .. } => {
373392
let path = db.path().to_string();
374393
Ok(Database {
375-
db_type: DbType::File { path, flags: OpenFlags::default(), encryption_config: None}
394+
db_type: DbType::File { path, flags: OpenFlags::default(), encryption_config: None},
395+
max_write_replication_index: Default::default(),
376396
})
377397
}
378398
t => Err(Error::FreezeNotSupported(format!("{:?}", t)))
379399
}
380400
}
401+
402+
/// Get the maximum replication index returned from a write performed using any connection created using this Database object.
403+
pub fn max_write_replication_index(&self) -> Option<FrameNo> {
404+
let index = self
405+
.max_write_replication_index
406+
.load(std::sync::atomic::Ordering::SeqCst);
407+
if index == 0 {
408+
None
409+
} else {
410+
Some(index)
411+
}
412+
}
381413
}
382414
}
383415

@@ -445,6 +477,7 @@ cfg_remote! {
445477
connector: crate::util::ConnectorService::new(svc),
446478
version,
447479
},
480+
max_write_replication_index: Default::default(),
448481
})
449482
}
450483
}
@@ -552,7 +585,11 @@ impl Database {
552585

553586
let local = LibsqlConnection { conn };
554587
let writer = local.conn.new_connection_writer();
555-
let remote = crate::replication::RemoteConnection::new(local, writer);
588+
let remote = crate::replication::RemoteConnection::new(
589+
local,
590+
writer,
591+
self.max_write_replication_index.clone(),
592+
);
556593
let conn = std::sync::Arc::new(remote);
557594

558595
Ok(Connection { conn })

libsql/src/database/builder.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ cfg_core! {
135135
let db = crate::local::Database::open(":memory:", crate::OpenFlags::default())?;
136136
Database {
137137
db_type: DbType::Memory { db } ,
138+
max_write_replication_index: Default::default(),
138139
}
139140
} else {
140141
let path = self
@@ -150,6 +151,7 @@ cfg_core! {
150151
flags: self.inner.flags,
151152
encryption_config: self.inner.encryption_config,
152153
},
154+
max_write_replication_index: Default::default(),
153155
}
154156
};
155157

@@ -291,6 +293,7 @@ cfg_replication! {
291293

292294
Ok(Database {
293295
db_type: DbType::Sync { db, encryption_config },
296+
max_write_replication_index: Default::default(),
294297
})
295298
}
296299
}
@@ -360,6 +363,7 @@ cfg_replication! {
360363

361364
Ok(Database {
362365
db_type: DbType::Sync { db, encryption_config },
366+
max_write_replication_index: Default::default(),
363367
})
364368
}
365369
}
@@ -414,6 +418,7 @@ cfg_remote! {
414418
connector,
415419
version,
416420
},
421+
max_write_replication_index: Default::default(),
417422
})
418423
}
419424
}

libsql/src/local/database.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,29 @@ impl Database {
277277
Ok(self.sync_oneshot().await?)
278278
}
279279

280+
#[cfg(feature = "replication")]
281+
/// Sync with primary at least to a given replication index
282+
pub async fn sync_until(&self, replication_index: FrameNo) -> Result<crate::replication::Replicated> {
283+
if let Some(ctx) = &self.replication_ctx {
284+
let mut frame_no: Option<FrameNo> = ctx.replicator.committed_frame_no().await;
285+
let mut frames_synced: usize = 0;
286+
while frame_no.unwrap_or(0) < replication_index {
287+
let res = ctx.replicator.sync_oneshot().await?;
288+
frame_no = res.frame_no();
289+
frames_synced += res.frames_synced();
290+
}
291+
Ok(crate::replication::Replicated {
292+
frame_no,
293+
frames_synced,
294+
})
295+
} else {
296+
Err(crate::errors::Error::Misuse(
297+
"No replicator available. Use Database::with_replicator() to enable replication"
298+
.to_string(),
299+
))
300+
}
301+
}
302+
280303
#[cfg(feature = "replication")]
281304
pub async fn sync_frames(&self, frames: Frames) -> Result<Option<FrameNo>> {
282305
if let Some(ref ctx) = self.replication_ctx {

libsql/src/replication/connection.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
use std::str::FromStr;
44
use std::sync::Arc;
5-
5+
use std::sync::atomic::AtomicU64;
66
use libsql_replication::rpc::proxy::{
77
describe_result, query_result::RowResult, Cond, DescribeResult, ExecuteResults, NotCond,
88
OkCond, Positional, Query, ResultRows, State as RemoteState, Step,
@@ -28,6 +28,7 @@ pub struct RemoteConnection {
2828
pub(self) local: LibsqlConnection,
2929
writer: Option<Writer>,
3030
inner: Arc<Mutex<Inner>>,
31+
max_write_replication_index: Arc<AtomicU64>,
3132
}
3233

3334
#[derive(Default, Debug)]
@@ -166,12 +167,25 @@ impl From<RemoteState> for State {
166167
}
167168

168169
impl RemoteConnection {
169-
pub(crate) fn new(local: LibsqlConnection, writer: Option<Writer>) -> Self {
170+
pub(crate) fn new(local: LibsqlConnection, writer: Option<Writer>, max_write_replication_index: Arc<AtomicU64>) -> Self {
170171
let state = Arc::new(Mutex::new(Inner::default()));
171172
Self {
172173
local,
173174
writer,
174175
inner: state,
176+
max_write_replication_index,
177+
}
178+
}
179+
180+
fn update_max_write_replication_index(&self, index: Option<u64>) {
181+
if let Some(index) = index {
182+
let mut current = self.max_write_replication_index.load(std::sync::atomic::Ordering::SeqCst);
183+
while index > current {
184+
match self.max_write_replication_index.compare_exchange(current, index, std::sync::atomic::Ordering::SeqCst, std::sync::atomic::Ordering::SeqCst) {
185+
Ok(_) => break,
186+
Err(new_current) => current = new_current,
187+
}
188+
}
175189
}
176190
}
177191

@@ -201,6 +215,8 @@ impl RemoteConnection {
201215
.into();
202216
}
203217

218+
self.update_max_write_replication_index(res.current_frame_no);
219+
204220
if let Some(replicator) = writer.replicator() {
205221
replicator.sync_oneshot().await?;
206222
}
@@ -226,6 +242,8 @@ impl RemoteConnection {
226242
.into();
227243
}
228244

245+
self.update_max_write_replication_index(res.current_frame_no);
246+
229247
if let Some(replicator) = writer.replicator() {
230248
replicator.sync_oneshot().await?;
231249
}

libsql/src/replication/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ pub(crate) mod remote_client;
3636

3737
#[derive(Debug)]
3838
pub struct Replicated {
39-
frame_no: Option<FrameNo>,
40-
frames_synced: usize,
39+
pub(crate) frame_no: Option<FrameNo>,
40+
pub(crate) frames_synced: usize,
4141
}
4242

4343
impl Replicated {

0 commit comments

Comments
 (0)