Skip to content

Commit 9338ef2

Browse files
committed
store segment timestamp in SegmentKey
1 parent efdded6 commit 9338ef2

3 files changed

Lines changed: 31 additions & 88 deletions

File tree

libsql-wal/src/storage/backend/s3.rs

Lines changed: 4 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ use aws_sdk_s3::operation::get_object::GetObjectOutput;
1414
use aws_sdk_s3::primitives::{ByteStream, SdkBody};
1515
use aws_sdk_s3::types::CreateBucketConfiguration;
1616
use aws_sdk_s3::Client;
17-
use aws_smithy_types_convert::date_time::DateTimeExt;
1817
use bytes::{Bytes, BytesMut};
18+
use chrono::{DateTime, Utc};
1919
use http_body::{Frame as HttpFrame, SizeHint};
2020
use libsql_sys::name::NamespaceName;
2121
use roaring::RoaringBitmap;
@@ -229,72 +229,9 @@ impl<IO: Io> S3Backend<IO> {
229229
Ok(key)
230230
}
231231

232-
/// finding a segment by timestamp should not be a common operation, and it's costly. We will
233-
/// perform a binary search until we find a satisfying segment. This forces us to iteratively:
234-
/// - find a segment for some frame_no
235-
/// - fetch that segment index to retrieve the metata (we don't need to download the data, just
236-
/// the metata is of use to us)
237-
/// - check the timestamp, rince and repeat
238-
///
239-
/// unfortunately, s3 doesn't have a way to list object with their metadata...
240-
#[tracing::instrument(skip(self, config, folder_key))]
241-
async fn find_segment_by_timestamp(&self, config: &S3Config, folder_key: &FolderKey<'_>, timestamp: DateTime<Utc>) -> Result<Option<SegmentKey>> {
242-
let mut attempted_frame_no = u64::MAX;
243-
let mut max_seen = 0;
244-
let mut best_so_far = None;
245-
loop {
246-
let Some(seg_key) = self.find_segment_by_frame_no(config, folder_key, attempted_frame_no).await? else { return Ok(None) };
247-
max_seen = max_seen.max(seg_key.end_frame_no);
248-
let key = s3_segment_index_key(folder_key, &seg_key);
249-
let object = self.s3_get(config, &key).await?;
250-
let Some(segment_timestamp_rfc3339) = object.metadata().and_then(|m| m.get(SEGMENT_TIMESTAMP_META_KEY)) else {
251-
tracing::warn!("{key} is missing metadata. Cannot be used as a restore point!");
252-
todo!("update next attempt frame_no");
253-
// continue
254-
};
255-
256-
let segment_timestamp = match DateTime::parse_from_rfc3339(segment_timestamp_rfc3339) {
257-
Ok(s) => s.to_utc(),
258-
Err(e) => {
259-
tracing::warn!("timestamp for `{key}` is invalid, skipping restore point: {e}");
260-
todo!("update next attempt frame_no");
261-
// continue
262-
}
263-
};
264-
265-
if segment_timestamp >= timestamp {
266-
if segment_timestamp == timestamp {
267-
// look no further!
268-
tracing::debug!("found exact match for timestamp: {seg_key}");
269-
return Ok(Some(seg_key))
270-
}
271-
best_so_far = match best_so_far {
272-
Some((best, key)) => {
273-
assert!(best >= timestamp);
274-
if best - timestamp <= segment_timestamp - timestamp {
275-
Some((best, key))
276-
} else {
277-
Some((segment_timestamp, seg_key))
278-
}
279-
}
280-
None => Some((segment_timestamp, seg_key)),
281-
};
282-
283-
let next_attempt = attempted_frame_no + ((max_seen - attempted_frame_no) / 2);
284-
if next_attempt == attempted_frame_no {
285-
let (ts, key) = best_so_far.unwrap();
286-
tracing::debug!("found best match for {timestamp}: {key}@{ts}");
287-
return Ok(Some(key))
288-
}
289-
290-
attempted_frame_no = next_attempt;
291-
} else {
292-
let next_attempt = attempted_frame_no - (attempted_frame_no / 2);
293-
if next_attempt <= 1 {
294-
return Ok(best_so_far.map(|x| x.1))
295-
}
296-
}
297-
}
232+
// #[tracing::instrument(skip(self, config, folder_key))]
233+
async fn find_segment_by_timestamp(&self, _config: &S3Config, _folder_key: &FolderKey<'_>, _timestamp: DateTime<Utc>) -> Result<Option<SegmentKey>> {
234+
todo!()
298235
}
299236

300237
// This method could probably be optimized a lot by using indexes and only downloading useful
@@ -407,7 +344,6 @@ impl<IO: Io> S3Backend<IO> {
407344
let infos = SegmentInfo {
408345
key,
409346
size: entry.size().unwrap_or(0) as usize,
410-
created_at: entry.last_modified().unwrap().to_chrono_utc().unwrap(),
411347
};
412348

413349
yield infos;

libsql-wal/src/storage/compaction/mod.rs

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use std::path::PathBuf;
44
use std::sync::Arc;
55

66
use chrono::DateTime;
7-
use chrono::Utc;
87
use fst::map::OpBuilder;
98
use fst::Streamer;
109
use libsql_sys::name::NamespaceName;
@@ -58,7 +57,7 @@ impl<B> Compactor<B> {
5857
"CREATE TABLE IF NOT EXISTS segments (
5958
start_frame_no INTEGER,
6059
end_frame_no INTEGER,
61-
created_at DATE,
60+
timestamp DATE,
6261
size INTEGER,
6362
namespace_id INTEGER,
6463
PRIMARY KEY (start_frame_no, end_frame_no),
@@ -93,7 +92,7 @@ impl<B> Compactor<B> {
9392
pub fn analyze(&self, namespace: &NamespaceName) -> Result<AnalyzedSegments> {
9493
let mut stmt = self.meta.prepare_cached(
9594
r#"
96-
SELECT start_frame_no, end_frame_no
95+
SELECT start_frame_no, end_frame_no, timestamp
9796
FROM segments as s
9897
JOIN monitored_namespaces as m
9998
ON m.id = s.namespace_id
@@ -105,11 +104,12 @@ impl<B> Compactor<B> {
105104
while let Some(row) = rows.next()? {
106105
let start_frame_no: u64 = row.get(0)?;
107106
let end_frame_no: u64 = row.get(1)?;
107+
let timestamp: u64 = row.get(2)?;
108108
// it's free to go from one end of a segment to the next
109109
graph.add_edge(start_frame_no, end_frame_no, 0);
110110
if start_frame_no != 1 {
111111
// going from a segment to the next costs us
112-
graph.add_edge(start_frame_no - 1, start_frame_no, 1);
112+
graph.add_edge(start_frame_no - 1, start_frame_no, timestamp);
113113
}
114114
last_frame_no = last_frame_no.max(end_frame_no);
115115
}
@@ -304,7 +304,7 @@ impl<B> Compactor<B> {
304304
segment_id: Uuid::new_v4(),
305305
start_frame_no: start,
306306
end_frame_no: end,
307-
segment_timestamp: Utc::now(),
307+
segment_timestamp: DateTime::from_timestamp_millis(set.last().unwrap().timestamp as _).unwrap().to_utc(),
308308
},
309309
out_file,
310310
out_index.into_inner().unwrap(),
@@ -375,9 +375,13 @@ impl AnalyzedSegments {
375375
match path {
376376
Some((_len, nodes)) => {
377377
for chunk in nodes.chunks(2) {
378+
let start_frame_no = chunk[0];
379+
let end_frame_no = chunk[1];
380+
let timestamp = *self.graph.edges(start_frame_no).find_map(|(_, to, ts)| (to == end_frame_no).then_some(ts)).unwrap();
378381
let key = SegmentKey {
379-
start_frame_no: chunk[0],
380-
end_frame_no: chunk[1],
382+
start_frame_no,
383+
end_frame_no,
384+
timestamp,
381385
};
382386
segments.push(key);
383387
}
@@ -448,7 +452,7 @@ fn list_segments<'a>(
448452
) -> Result<()> {
449453
let mut stmt = conn.prepare_cached(
450454
r#"
451-
SELECT created_at, size, start_frame_no, end_frame_no
455+
SELECT timestamp, size, start_frame_no, end_frame_no
452456
FROM segments as s
453457
JOIN monitored_namespaces as m
454458
ON m.id == s.namespace_id
@@ -462,9 +466,9 @@ fn list_segments<'a>(
462466
key: SegmentKey {
463467
start_frame_no: r.get(2)?,
464468
end_frame_no: r.get(3)?,
469+
timestamp: r.get(0)?,
465470
},
466471
size: r.get(1)?,
467-
created_at: DateTime::from_timestamp(r.get(0)?, 0).unwrap(),
468472
})
469473
})?;
470474

@@ -486,7 +490,7 @@ fn register_segment_info(
486490
INSERT OR IGNORE INTO segments (
487491
start_frame_no,
488492
end_frame_no,
489-
created_at,
493+
timestamp,
490494
size,
491495
namespace_id
492496
)
@@ -495,7 +499,7 @@ fn register_segment_info(
495499
stmt.execute((
496500
info.key.start_frame_no,
497501
info.key.end_frame_no,
498-
info.created_at.timestamp(),
502+
info.key.timestamp,
499503
info.size,
500504
namespace_id,
501505
))?;
@@ -508,7 +512,7 @@ fn segments_range(
508512
) -> Result<Option<(SegmentInfo, SegmentInfo)>> {
509513
let mut stmt = conn.prepare_cached(
510514
r#"
511-
SELECT min(created_at), size, start_frame_no, end_frame_no
515+
SELECT min(timestamp), size, start_frame_no, end_frame_no
512516
FROM segments as s
513517
JOIN monitored_namespaces as m
514518
ON m.id == s.namespace_id
@@ -522,16 +526,16 @@ fn segments_range(
522526
key: SegmentKey {
523527
start_frame_no: r.get(2)?,
524528
end_frame_no: r.get(3)?,
529+
timestamp: r.get(0)?,
525530
},
526531
size: r.get(1)?,
527-
created_at: DateTime::from_timestamp(r.get(0)?, 0).unwrap(),
528532
})
529533
})
530534
.optional()?;
531535

532536
let mut stmt = conn.prepare_cached(
533537
r#"
534-
SELECT max(created_at), size, start_frame_no, end_frame_no
538+
SELECT max(timestamp), size, start_frame_no, end_frame_no
535539
FROM segments as s
536540
JOIN monitored_namespaces as m
537541
ON m.id == s.namespace_id
@@ -545,9 +549,9 @@ fn segments_range(
545549
key: SegmentKey {
546550
start_frame_no: r.get(2)?,
547551
end_frame_no: r.get(3)?,
552+
timestamp: r.get(0)?,
548553
},
549554
size: r.get(1)?,
550-
created_at: DateTime::from_timestamp(r.get(0)?, 0).unwrap(),
551555
})
552556
})
553557
.optional()?;

libsql-wal/src/storage/mod.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ pub enum RestoreOptions {
6464
pub struct SegmentKey {
6565
pub start_frame_no: u64,
6666
pub end_frame_no: u64,
67+
pub timestamp: u64,
6768
}
6869

6970
impl PartialOrd for SegmentKey {
@@ -115,6 +116,7 @@ impl From<&SegmentMeta> for SegmentKey {
115116
Self {
116117
start_frame_no: value.start_frame_no,
117118
end_frame_no: value.end_frame_no,
119+
timestamp: value.segment_timestamp.timestamp_millis() as _,
118120
}
119121
}
120122
}
@@ -125,11 +127,13 @@ impl FromStr for SegmentKey {
125127
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
126128
let (rev_start_fno, s) = s.split_at(20);
127129
let start_frame_no = u64::MAX - rev_start_fno.parse::<u64>().map_err(|_| ())?;
128-
let (_, rev_end_fno) = s.split_at(1);
130+
let (rev_end_fno, timestamp) = s[1..].split_at(20);
129131
let end_frame_no = u64::MAX - rev_end_fno.parse::<u64>().map_err(|_| ())?;
132+
let timestamp = timestamp[1..].parse().map_err(|_| ())?;
130133
Ok(Self {
131134
start_frame_no,
132135
end_frame_no,
136+
timestamp,
133137
})
134138
}
135139
}
@@ -138,9 +142,10 @@ impl fmt::Display for SegmentKey {
138142
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139143
write!(
140144
f,
141-
"{:020}-{:020}",
145+
"{:020}-{:020}-{:020}",
142146
u64::MAX - self.start_frame_no,
143147
u64::MAX - self.end_frame_no,
148+
self.timestamp,
144149
)
145150
}
146151
}
@@ -219,9 +224,6 @@ pub trait Storage: Send + Sync + 'static {
219224
pub struct SegmentInfo {
220225
pub key: SegmentKey,
221226
pub size: usize,
222-
/// when that segment was created. This is different from the segment timestamp, corresponding
223-
/// to the last commit date in this segment
224-
pub created_at: DateTime<Utc>,
225227
}
226228

227229
/// special zip function for Either storage implementation
@@ -506,6 +508,7 @@ impl<IO: Io> Storage for TestStorage<IO> {
506508
let key = SegmentKey {
507509
start_frame_no: seg.header().start_frame_no.get(),
508510
end_frame_no,
511+
timestamp: seg.header().sealed_at_timestamp.get(),
509512
};
510513
let index = Map::new(index.into()).unwrap();
511514
inner

0 commit comments

Comments
 (0)