Skip to content

Commit 5d7c676

Browse files
committed
pass segment timestamp to storage
1 parent 0dde100 commit 5d7c676

6 files changed

Lines changed: 51 additions & 33 deletions

File tree

libsql-wal/src/segment/mod.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ use std::mem::size_of;
1515
use std::num::NonZeroU64;
1616
use std::sync::Arc;
1717

18+
use chrono::DateTime;
19+
use chrono::Utc;
1820
use zerocopy::byteorder::little_endian::{U128, U16, U32, U64};
1921
use zerocopy::AsBytes;
2022

@@ -171,6 +173,7 @@ pub trait Segment: Send + Sync + 'static {
171173
async fn read_frame_offset_async<B>(&self, offset: u32, buf: B) -> (B, Result<()>)
172174
where
173175
B: IoBufMut + Send + 'static;
176+
fn timestamp(&self) -> DateTime<Utc>;
174177

175178
fn destroy<IO: Io>(&self, io: &IO) -> impl Future<Output = ()>;
176179
}
@@ -196,15 +199,12 @@ impl<T: Segment> Segment for Arc<T> {
196199
self.as_ref().index()
197200
}
198201

199-
fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> io::Result<bool> {
200-
self.as_ref().read_page(page_no, max_frame_no, buf)
202+
fn is_storable(&self) -> bool {
203+
self.as_ref().is_storable()
201204
}
202205

203-
async fn read_frame_offset_async<B>(&self, offset: u32, buf: B) -> (B, Result<()>)
204-
where
205-
B: IoBufMut + Send + 'static,
206-
{
207-
self.as_ref().read_frame_offset_async(offset, buf).await
206+
fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> io::Result<bool> {
207+
self.as_ref().read_page(page_no, max_frame_no, buf)
208208
}
209209

210210
fn is_checkpointable(&self) -> bool {
@@ -215,12 +215,19 @@ impl<T: Segment> Segment for Arc<T> {
215215
self.as_ref().size_after()
216216
}
217217

218+
async fn read_frame_offset_async<B>(&self, offset: u32, buf: B) -> (B, Result<()>)
219+
where
220+
B: IoBufMut + Send + 'static,
221+
{
222+
self.as_ref().read_frame_offset_async(offset, buf).await
223+
}
224+
218225
fn destroy<IO: Io>(&self, io: &IO) -> impl Future<Output = ()> {
219226
self.as_ref().destroy(io)
220227
}
221228

222-
fn is_storable(&self) -> bool {
223-
self.as_ref().is_storable()
229+
fn timestamp(&self) -> DateTime<Utc> {
230+
self.as_ref().timestamp()
224231
}
225232
}
226233

libsql-wal/src/segment/sealed.rs

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::sync::{
99
Arc,
1010
};
1111

12+
use chrono::prelude::{DateTime, Utc};
1213
use fst::{Map, MapBuilder, Streamer};
1314
use zerocopy::{AsBytes, FromZeroes};
1415

@@ -156,6 +157,17 @@ where
156157
&self.index
157158
}
158159

160+
fn is_storable(&self) -> bool {
161+
// we don't store unordered segments, since they only happen in two cases:
162+
// - in a replica: no need for storage
163+
// - in a primary, on recovery from storage: we don't want to override remote
164+
// segment.
165+
!self
166+
.header()
167+
.flags()
168+
.contains(SegmentFlags::FRAME_UNORDERED)
169+
}
170+
159171
fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> std::io::Result<bool> {
160172
if self.header().start_frame_no.get() > max_frame_no {
161173
return Ok(false);
@@ -171,16 +183,6 @@ where
171183
Ok(false)
172184
}
173185

174-
async fn read_frame_offset_async<B>(&self, offset: u32, buf: B) -> (B, Result<()>)
175-
where
176-
B: IoBufMut + Send + 'static,
177-
{
178-
assert_eq!(buf.bytes_total(), size_of::<Frame>());
179-
let frame_offset = frame_offset(offset);
180-
let (buf, ret) = self.file.read_exact_at_async(buf, frame_offset as _).await;
181-
(buf, ret.map_err(Into::into))
182-
}
183-
184186
fn is_checkpointable(&self) -> bool {
185187
let read_locks = self.read_locks.load(Ordering::Relaxed);
186188
tracing::debug!(read_locks);
@@ -191,6 +193,16 @@ where
191193
self.header().size_after()
192194
}
193195

196+
async fn read_frame_offset_async<B>(&self, offset: u32, buf: B) -> (B, Result<()>)
197+
where
198+
B: IoBufMut + Send + 'static,
199+
{
200+
assert_eq!(buf.bytes_total(), size_of::<Frame>());
201+
let frame_offset = frame_offset(offset);
202+
let (buf, ret) = self.file.read_exact_at_async(buf, frame_offset as _).await;
203+
(buf, ret.map_err(Into::into))
204+
}
205+
194206
fn destroy<IO: crate::io::Io>(&self, io: &IO) -> impl std::future::Future<Output = ()> {
195207
async move {
196208
if let Err(e) = io.remove_file_async(&self.path).await {
@@ -199,15 +211,9 @@ where
199211
}
200212
}
201213

202-
fn is_storable(&self) -> bool {
203-
// we don't store unordered segments, since they only happen in two cases:
204-
// - in a replica: no need for storage
205-
// - in a primary, on recovery from storage: we don't want to override remote
206-
// segment.
207-
!self
208-
.header()
209-
.flags()
210-
.contains(SegmentFlags::FRAME_UNORDERED)
214+
fn timestamp(&self) -> DateTime<Utc> {
215+
assert_ne!(self.header().sealed_at_timestamp.get(), 0, "segment was not sealed properly");
216+
DateTime::from_timestamp_millis(self.header().sealed_at_timestamp.get() as _).expect("this should be a guaranteed roundtrip with DateTime::timestamp_millis")
211217
}
212218
}
213219

libsql-wal/src/storage/backend/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pub struct SegmentMeta {
2222
pub segment_id: Uuid,
2323
pub start_frame_no: u64,
2424
pub end_frame_no: u64,
25-
pub created_at: DateTime<Utc>,
25+
pub segment_timestamp: DateTime<Utc>,
2626
}
2727

2828
pub struct RestoreRequest {}

libsql-wal/src/storage/backend/s3.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -754,7 +754,7 @@ mod tests {
754754
segment_id: Uuid::new_v4(),
755755
start_frame_no: 0u64.into(),
756756
end_frame_no: 64u64.into(),
757-
created_at: Utc::now(),
757+
segment_timestamp: Utc::now(),
758758
},
759759
std::fs::File::open(&f_path).unwrap(),
760760
index,
@@ -776,7 +776,7 @@ mod tests {
776776
segment_id: Uuid::new_v4(),
777777
start_frame_no: 64u64.into(),
778778
end_frame_no: 128u64.into(),
779-
created_at: Utc::now(),
779+
segment_timestamp: Utc::now(),
780780
},
781781
std::fs::File::open(&f_path).unwrap(),
782782
index,

libsql-wal/src/storage/compaction/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ impl<B> Compactor<B> {
304304
segment_id: Uuid::new_v4(),
305305
start_frame_no: start,
306306
end_frame_no: end,
307-
created_at: Utc::now(),
307+
segment_timestamp: Utc::now(),
308308
},
309309
out_file,
310310
out_index.into_inner().unwrap(),

libsql-wal/src/storage/job.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ where
6969
namespace: self.request.namespace.clone(),
7070
start_frame_no: segment.start_frame_no(),
7171
end_frame_no: segment.last_committed(),
72-
created_at: io.now(),
72+
segment_timestamp: segment.timestamp(),
7373
};
7474
let config = self
7575
.request
@@ -108,6 +108,7 @@ mod test {
108108
// use std::sync::atomic::AtomicBool;
109109
use std::sync::Arc;
110110

111+
use chrono::prelude::DateTime;
111112
use chrono::Utc;
112113
// use fst::{Map, Streamer};
113114
// use libsql_sys::rusqlite::OpenFlags;
@@ -428,6 +429,10 @@ mod test {
428429
fn is_storable(&self) -> bool {
429430
true
430431
}
432+
433+
fn timestamp(&self) -> DateTime<Utc> {
434+
Utc::now()
435+
}
431436
}
432437

433438
struct TestBackend;

0 commit comments

Comments
 (0)