Skip to content

Commit c050655

Browse files
authored
Merge pull request #1777 from tursodatabase/libsql-wal-replicate-from-db-file
libsql wal replicate from db file
2 parents 21ae561 + 18e7c0c commit c050655

7 files changed

Lines changed: 184 additions & 96 deletions

File tree

libsql-wal/src/io/buf.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,17 @@ impl<T> ZeroCopyBoxIoBuf<T> {
154154
Self { init: 0, inner }
155155
}
156156

157+
/// Same as `new_uninit`, but marks the first `offset` bytes as already initialized, so the buffer is filled starting at `offset`
158+
///
159+
/// # Safety: The caller must ensure that the first `offset` bytes of `inner` are already initialized
160+
pub unsafe fn new_uninit_partial(inner: Box<T>, offset: usize) -> Self {
161+
assert!(offset < size_of::<T>());
162+
Self {
163+
inner,
164+
init: offset,
165+
}
166+
}
167+
157168
fn is_init(&self) -> bool {
158169
self.init == size_of::<T>()
159170
}

libsql-wal/src/io/file.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -193,16 +193,17 @@ impl FileExt for File {
193193
let (buffer, ret) = tokio::task::spawn_blocking(move || {
194194
// let mut read = 0;
195195

196+
let len = buf.bytes_total();
197+
let init = buf.bytes_init();
196198
let chunk = unsafe {
197-
let len = buf.bytes_total();
198-
let ptr = buf.stable_mut_ptr();
199-
std::slice::from_raw_parts_mut(ptr, len)
199+
let ptr = buf.stable_mut_ptr().offset(init as _);
200+
std::slice::from_raw_parts_mut(ptr, len - init)
200201
};
201202

202203
let ret = file.read_exact_at(chunk, offset);
203204
if ret.is_ok() {
204205
unsafe {
205-
buf.set_init(buf.bytes_total());
206+
buf.set_init(init + chunk.len());
206207
}
207208
}
208209
(buf, ret)
@@ -222,16 +223,17 @@ impl FileExt for File {
222223
let (buffer, ret) = tokio::task::spawn_blocking(move || {
223224
// let mut read = 0;
224225

226+
let len = buf.bytes_total();
227+
let init = buf.bytes_init();
225228
let chunk = unsafe {
226-
let len = buf.bytes_total();
227-
let ptr = buf.stable_mut_ptr();
228-
std::slice::from_raw_parts_mut(ptr, len)
229+
let ptr = buf.stable_mut_ptr().offset(init as _);
230+
std::slice::from_raw_parts_mut(ptr, len - init)
229231
};
230232

231233
let ret = file.read_at(chunk, offset);
232234
if let Ok(n) = ret {
233235
unsafe {
234-
buf.set_init(n);
236+
buf.set_init(init + n);
235237
}
236238
}
237239
(buf, ret)
@@ -358,13 +360,13 @@ mod test {
358360
file.write_all(&[1; 12345]).unwrap();
359361
file.write_all(&[2; 50]).unwrap();
360362

361-
let buf = vec![0u8; 12345];
363+
let buf = Vec::with_capacity(12345);
362364
let (buf, ret) = file.read_exact_at_async(buf, 0).await;
363365
ret.unwrap();
364366
assert_eq!(buf.len(), 12345);
365367
assert!(buf.iter().all(|x| *x == 1));
366368

367-
let buf = vec![2u8; 50];
369+
let buf = Vec::with_capacity(50);
368370
let (buf, ret) = file.read_exact_at_async(buf, 12345).await;
369371
ret.unwrap();
370372
assert_eq!(buf.len(), 50);

libsql-wal/src/replication/replicator.rs

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1+
use std::pin::Pin;
12
use std::sync::Arc;
23

34
use roaring::RoaringBitmap;
45
use tokio::sync::watch;
5-
use tokio_stream::{Stream, StreamExt};
6+
use tokio_stream::{Stream, StreamExt as _};
67

78
use crate::io::Io;
89
use crate::replication::Error;
@@ -58,13 +59,13 @@ impl<IO: Io> Replicator<IO> {
5859
tracing::debug!(most_recent_frame_no, "new frame_no available");
5960

6061
let mut commit_frame_no = 0;
62+
let tx = self.shared.begin_read(u64::MAX);
6163
// we have stuff to replicate
6264
if most_recent_frame_no >= self.next_frame_no {
6365
// first replicate the most recent version of each page from the current
6466
// segment. We also return how far back we have replicated from the current log
65-
let current = self.shared.current.load();
6667
let mut seen = RoaringBitmap::new();
67-
let (stream, replicated_until, size_after) = current.frame_stream_from(self.next_frame_no, &mut seen);
68+
let (stream, replicated_until) = tx.current.frame_stream_from(self.next_frame_no, &mut seen, &tx);
6869
let should_replicate_from_tail = replicated_until != self.next_frame_no;
6970

7071
{
@@ -78,7 +79,7 @@ impl<IO: Io> Replicator<IO> {
7879
let mut frame = frame.map_err(|e| Error::CurrentSegment(e.into()))?;
7980
commit_frame_no = frame.header().frame_no().max(commit_frame_no);
8081
if stream.peek().await.is_none() && !should_replicate_from_tail {
81-
frame.header_mut().set_size_after(size_after);
82+
frame.header_mut().set_size_after(tx.db_size);
8283
self.next_frame_no = commit_frame_no + 1;
8384
}
8485

@@ -90,9 +91,9 @@ impl<IO: Io> Replicator<IO> {
9091
// we need to take frames from the sealed segments.
9192
if should_replicate_from_tail {
9293
let replicated_until = {
93-
let (stream, replicated_until) = current
94+
let (stream, replicated_until) = tx.current
9495
.tail()
95-
.stream_pages_from(replicated_until, self.next_frame_no, &mut seen).await;
96+
.stream_pages_from(replicated_until, self.next_frame_no, &mut seen, &tx).await;
9697
tokio::pin!(stream);
9798

9899
tracing::debug!(replicated_until, "replicating from tail");
@@ -105,7 +106,7 @@ impl<IO: Io> Replicator<IO> {
105106
let mut frame = frame.map_err(|e| Error::SealedSegment(e.into()))?;
106107
commit_frame_no = frame.header().frame_no().max(commit_frame_no);
107108
if stream.peek().await.is_none() && !should_replicate_from_storage {
108-
frame.header_mut().set_size_after(size_after);
109+
frame.header_mut().set_size_after(tx.db_size);
109110
self.next_frame_no = commit_frame_no + 1;
110111
}
111112

@@ -118,12 +119,21 @@ impl<IO: Io> Replicator<IO> {
118119
// Replicating from sealed segments was not enough, so we replicate from
119120
// durable storage
120121
if let Some(replicated_until) = replicated_until {
121-
tracing::debug!("replicating from durable storage");
122-
let stream = self
123-
.shared
124-
.stored_segments
125-
.stream(&mut seen, replicated_until, self.next_frame_no)
126-
.peekable();
122+
let stream: Pin<Box<dyn Stream<Item = _> + Send>> = if self.next_frame_no == 1 {
123+
// we're replicating from scratch, read straight from the main db
124+
// file
125+
tracing::debug!("replicating main db file");
126+
Box::pin(self.shared.replicate_from_db_file(&mut seen, &tx, replicated_until))
127+
} else {
128+
tracing::debug!("replicating from durable storage");
129+
Box::pin(self
130+
.shared
131+
.stored_segments
132+
.stream(&mut seen, replicated_until, self.next_frame_no)
133+
.peekable())
134+
};
135+
136+
let stream = stream.peekable();
127137

128138
tokio::pin!(stream);
129139

@@ -132,7 +142,7 @@ impl<IO: Io> Replicator<IO> {
132142
let mut frame = frame?;
133143
commit_frame_no = frame.header().frame_no().max(commit_frame_no);
134144
if stream.peek().await.is_none() {
135-
frame.header_mut().set_size_after(size_after);
145+
frame.header_mut().set_size_after(tx.db_size);
136146
self.next_frame_no = commit_frame_no + 1;
137147
}
138148

libsql-wal/src/replication/storage.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ where
6161
let segment = match maybe_seg {
6262
Some(ref seg) => seg,
6363
None => {
64+
tracing::debug!(key = %key, "fetching segment");
6465
maybe_seg = Some(storage.fetch_segment_data(&namespace, &key, None).await?);
6566
maybe_seg.as_ref().unwrap()
6667
},

libsql-wal/src/segment/current.rs

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::io::file::FileExt;
2424
use crate::io::Inspect;
2525
use crate::segment::{checked_frame_offset, SegmentFlags};
2626
use crate::segment::{frame_offset, page_offset, sealed::SealedSegment};
27-
use crate::transaction::{Transaction, TxGuardOwned, TxGuardShared};
27+
use crate::transaction::{ReadTransaction, Transaction, TxGuardOwned, TxGuardShared};
2828
use crate::{LIBSQL_MAGIC, LIBSQL_PAGE_SIZE, LIBSQL_WAL_VERSION};
2929

3030
use super::list::SegmentList;
@@ -507,24 +507,25 @@ impl<F> CurrentSegment<F> {
507507
&'a self,
508508
start_frame_no: u64,
509509
seen: &'a mut RoaringBitmap,
510-
) -> (impl Stream<Item = Result<Box<Frame>>> + 'a, u64, u32)
510+
// not actually used, but ensures that a read lock is held while this method is called
511+
tx: &'a ReadTransaction<F>,
512+
) -> (impl Stream<Item = Result<Box<Frame>>> + 'a, u64)
511513
where
512514
F: FileExt,
513515
{
514-
let (seg_start_frame_no, last_committed, db_size) =
515-
self.with_header(|h| (h.start_frame_no.get(), h.last_committed(), h.size_after()));
516+
let seg_start_frame_no = tx.current.with_header(|h| h.start_frame_no.get());
516517
let replicated_until = seg_start_frame_no
517518
// if current is empty, start_frame_no doesn't exist
518-
.min(last_committed)
519+
.min(tx.max_frame_no)
519520
.max(start_frame_no);
520521

521522
// TODO: optim, we could read fewer frames if we had a mapping from frame_no to page_no in
522523
// the index
523524
let stream = async_stream::try_stream! {
524525
if !self.is_empty() {
525-
let mut frame_offset = (last_committed - seg_start_frame_no) as u32;
526+
let mut frame_offset = (tx.max_frame_no - seg_start_frame_no) as u32;
526527
loop {
527-
let buf = ZeroCopyBoxIoBuf::new(Frame::new_box_zeroed());
528+
let buf = ZeroCopyBoxIoBuf::new_uninit(Frame::new_box_zeroed());
528529
let (buf, res) = self.read_frame_offset_async(frame_offset, buf).await;
529530
res?;
530531

@@ -551,7 +552,7 @@ impl<F> CurrentSegment<F> {
551552
}
552553
};
553554

554-
(stream, replicated_until, db_size)
555+
(stream, replicated_until)
555556
}
556557

557558
fn recompute_checksum(&self, start_offset: u32, until_offset: u32) -> Result<u32>
@@ -714,18 +715,20 @@ mod test {
714715
.unwrap();
715716
}
716717

717-
let mut seen = RoaringBitmap::new();
718-
let current = shared.current.load();
719-
let (stream, replicated_until, size_after) = current.frame_stream_from(1, &mut seen);
720-
tokio::pin!(stream);
721-
assert_eq!(replicated_until, 1);
722-
assert_eq!(size_after, 6);
723-
724718
let mut tmp = tempfile().unwrap();
725-
while let Some(frame) = stream.next().await {
726-
let frame = frame.unwrap();
727-
let offset = (frame.header().page_no() - 1) * 4096;
728-
tmp.write_all_at(frame.data(), offset as _).unwrap();
719+
{
720+
let tx = shared.begin_read(u64::MAX);
721+
let mut seen = RoaringBitmap::new();
722+
let (stream, replicated_until) = tx.current.frame_stream_from(1, &mut seen, &tx);
723+
tokio::pin!(stream);
724+
assert_eq!(replicated_until, 1);
725+
assert_eq!(tx.db_size, 6);
726+
727+
while let Some(frame) = stream.next().await {
728+
let frame = frame.unwrap();
729+
let offset = (frame.header().page_no() - 1) * 4096;
730+
tmp.write_all_at(frame.data(), offset as _).unwrap();
731+
}
729732
}
730733

731734
seal_current_segment(&shared);
@@ -768,11 +771,11 @@ mod test {
768771

769772
let mut seen = RoaringBitmap::new();
770773
{
771-
let current = shared.current.load();
772-
let (stream, replicated_until, size_after) = current.frame_stream_from(1, &mut seen);
774+
let tx = shared.begin_read(u64::MAX);
775+
let (stream, replicated_until) = tx.current.frame_stream_from(1, &mut seen, &tx);
773776
tokio::pin!(stream);
774777
assert_eq!(replicated_until, 60);
775-
assert_eq!(size_after, 9);
778+
assert_eq!(tx.db_size, 9);
776779
assert_eq!(stream.fold(0, |count, _| count + 1).await, 6);
777780
}
778781
assert_debug_snapshot!(seen);
@@ -787,12 +790,12 @@ mod test {
787790
conn.execute("create table test (x)", ()).unwrap();
788791

789792
let mut seen = RoaringBitmap::new();
790-
let current = shared.current.load();
791-
let (stream, replicated_until, size_after) = current.frame_stream_from(100, &mut seen);
793+
let tx = shared.begin_read(u64::MAX);
794+
let (stream, replicated_until) = tx.current.frame_stream_from(100, &mut seen, &tx);
792795
tokio::pin!(stream);
793796
assert_eq!(replicated_until, 100);
794797
assert_eq!(stream.fold(0, |count, _| count + 1).await, 0);
795-
assert_eq!(size_after, 2);
798+
assert_eq!(tx.db_size, 2);
796799
}
797800

798801
#[tokio::test]
@@ -805,11 +808,11 @@ mod test {
805808
seal_current_segment(&shared);
806809

807810
let mut seen = RoaringBitmap::new();
808-
let current = shared.current.load();
809-
let (stream, replicated_until, size_after) = current.frame_stream_from(1, &mut seen);
811+
let tx = shared.begin_read(u64::MAX);
812+
let (stream, replicated_until) = tx.current.frame_stream_from(1, &mut seen, &tx);
810813
tokio::pin!(stream);
811814
assert_eq!(replicated_until, 2);
812-
assert_eq!(size_after, 2);
815+
assert_eq!(tx.db_size, 2);
813816
assert_eq!(stream.fold(0, |count, _| count + 1).await, 0);
814817
}
815818

@@ -1014,8 +1017,8 @@ mod test {
10141017
}
10151018
}
10161019

1017-
fn db_payload(db: &[u8]) -> &[u8] {
1020+
fn db_payload(db: &[u8]) -> u32 {
10181021
let size = (db.len() / 4096) * 4096;
1019-
&db[..size]
1022+
crc32fast::hash(&db[..size])
10201023
}
10211024
}

0 commit comments

Comments
 (0)