Skip to content

Commit 956c6eb

Browse files
committed
add find_segment_by_timestamp to s3 backend
1 parent 0c54654 commit 956c6eb

1 file changed

Lines changed: 68 additions & 0 deletions

File tree

  • libsql-wal/src/storage/backend

libsql-wal/src/storage/backend/s3.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,74 @@ impl<IO: Io> S3Backend<IO> {
229229
Ok(key)
230230
}
231231

232+
/// finding a segment by timestamp should not be a common operation, and it's costly. We will
233+
/// perform a binary search until we find a satisfying segment. This forces us to iteratively:
234+
/// - find a segment for some frame_no
235+
/// - fetch that segment index to retrieve the metata (we don't need to download the data, just
236+
/// the metata is of use to us)
237+
/// - check the timestamp, rince and repeat
238+
///
239+
/// unfortunately, s3 doesn't have a way to list object with their metadata...
240+
#[tracing::instrument(skip(self, config, folder_key))]
241+
async fn find_segment_by_timestamp(&self, config: &S3Config, folder_key: &FolderKey<'_>, timestamp: DateTime<Utc>) -> Result<Option<SegmentKey>> {
242+
let mut attempted_frame_no = u64::MAX;
243+
let mut max_seen = 0;
244+
let mut best_so_far = None;
245+
loop {
246+
let Some(seg_key) = self.find_segment_by_frame_no(config, folder_key, attempted_frame_no).await? else { return Ok(None) };
247+
max_seen = max_seen.max(seg_key.end_frame_no);
248+
let key = s3_segment_index_key(folder_key, &seg_key);
249+
let object = self.s3_get(config, &key).await?;
250+
let Some(segment_timestamp_rfc3339) = object.metadata().and_then(|m| m.get(SEGMENT_TIMESTAMP_META_KEY)) else {
251+
tracing::warn!("{key} is missing metadata. Cannot be used as a restore point!");
252+
todo!("update next attempt frame_no");
253+
// continue
254+
};
255+
256+
let segment_timestamp = match DateTime::parse_from_rfc3339(segment_timestamp_rfc3339) {
257+
Ok(s) => s.to_utc(),
258+
Err(e) => {
259+
tracing::warn!("timestamp for `{key}` is invalid, skipping restore point: {e}");
260+
todo!("update next attempt frame_no");
261+
// continue
262+
}
263+
};
264+
265+
if segment_timestamp >= timestamp {
266+
if segment_timestamp == timestamp {
267+
// look no further!
268+
tracing::debug!("found exact match for timestamp: {seg_key}");
269+
return Ok(Some(seg_key))
270+
}
271+
best_so_far = match best_so_far {
272+
Some((best, key)) => {
273+
assert!(best >= timestamp);
274+
if best - timestamp <= segment_timestamp - timestamp {
275+
Some((best, key))
276+
} else {
277+
Some((segment_timestamp, seg_key))
278+
}
279+
}
280+
None => Some((segment_timestamp, seg_key)),
281+
};
282+
283+
let next_attempt = attempted_frame_no + ((max_seen - attempted_frame_no) / 2);
284+
if next_attempt == attempted_frame_no {
285+
let (ts, key) = best_so_far.unwrap();
286+
tracing::debug!("found best match for {timestamp}: {key}@{ts}");
287+
return Ok(Some(key))
288+
}
289+
290+
attempted_frame_no = next_attempt;
291+
} else {
292+
let next_attempt = attempted_frame_no - (attempted_frame_no / 2);
293+
if next_attempt <= 1 {
294+
return Ok(best_so_far.map(|x| x.1))
295+
}
296+
}
297+
}
298+
}
299+
232300
// This method could probably be optimized a lot by using indexes and only downloading useful
233301
// segments
234302
async fn restore_latest(

0 commit comments

Comments
 (0)