Skip to content

Commit cd2696c

Browse files
committed
implement find segment by timestamp
1 parent 1ec3aa0 commit cd2696c

1 file changed

Lines changed: 122 additions & 15 deletions

File tree

  • libsql-wal/src/storage/backend

libsql-wal/src/storage/backend/s3.rs

Lines changed: 122 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use aws_config::SdkConfig;
1212
use aws_sdk_s3::operation::create_bucket::CreateBucketError;
1313
use aws_sdk_s3::operation::get_object::GetObjectOutput;
1414
use aws_sdk_s3::primitives::{ByteStream, SdkBody};
15-
use aws_sdk_s3::types::CreateBucketConfiguration;
15+
use aws_sdk_s3::types::{CreateBucketConfiguration, Object};
1616
use aws_sdk_s3::Client;
1717
use bytes::{Bytes, BytesMut};
1818
use chrono::{DateTime, Utc};
@@ -181,7 +181,11 @@ impl<IO: Io> S3Backend<IO> {
181181
segment_key: &SegmentKey,
182182
) -> Result<fst::Map<Arc<[u8]>>> {
183183
let s3_index_key = s3_segment_index_key(folder_key, segment_key);
184-
let mut stream = self.s3_get(config, s3_index_key).await?.body.into_async_read();
184+
let mut stream = self
185+
.s3_get(config, s3_index_key)
186+
.await?
187+
.body
188+
.into_async_read();
185189
let mut header: SegmentIndexHeader = SegmentIndexHeader::new_zeroed();
186190
stream.read_exact(header.as_bytes_mut()).await?;
187191
if header.magic.get() != LIBSQL_MAGIC && header.version.get() != 1 {
@@ -229,9 +233,99 @@ impl<IO: Io> S3Backend<IO> {
229233
Ok(key)
230234
}
231235

232-
// #[tracing::instrument(skip(self, config, folder_key))]
233-
async fn find_segment_by_timestamp(&self, _config: &S3Config, _folder_key: &FolderKey<'_>, _timestamp: DateTime<Utc>) -> Result<Option<SegmentKey>> {
234-
todo!()
236+
/// We are kinda brute-forcing our way into finding a segment that fits the bill, this can very
237+
/// probably be optimized
238+
#[tracing::instrument(skip(self, config, folder_key))]
239+
async fn find_segment_by_timestamp(
240+
&self,
241+
config: &S3Config,
242+
folder_key: &FolderKey<'_>,
243+
timestamp: DateTime<Utc>,
244+
) -> Result<Option<SegmentKey>> {
245+
let object_to_key = |o: &Object| {
246+
let key_path = o.key().unwrap();
247+
SegmentKey::validate_from_path(key_path.as_ref(), &folder_key.namespace)
248+
};
249+
250+
let lookup_key_prefix = s3_segment_index_lookup_key_prefix(&folder_key);
251+
252+
let mut continuation_token = None;
253+
loop {
254+
let objects = self
255+
.client
256+
.list_objects_v2()
257+
.set_continuation_token(continuation_token.take())
258+
.bucket(&config.bucket)
259+
.prefix(lookup_key_prefix.to_string())
260+
.send()
261+
.await
262+
.map_err(|e| Error::unhandled(e, "failed to list bucket"))?;
263+
264+
// there is nothing to restore
265+
if objects.contents().is_empty() {
266+
return Ok(None);
267+
}
268+
269+
let ts = timestamp.timestamp_millis() as u64;
270+
let search_result =
271+
objects
272+
.contents()
273+
.binary_search_by_key(&std::cmp::Reverse(ts), |o| {
274+
let key = object_to_key(o).unwrap();
275+
std::cmp::Reverse(key.timestamp)
276+
});
277+
278+
match search_result {
279+
Ok(i) => {
280+
let key = object_to_key(&objects.contents()[i]).unwrap();
281+
tracing::trace!("found perfect match for `{timestamp}`: {key}");
282+
return Ok(Some(key));
283+
}
284+
Err(i) if i == 0 => {
285+
// this is caught by the first iteration of the loop, anything that's more
286+
// recent than the most recent should be interpreted as the most recent
287+
let key = object_to_key(&objects.contents()[i]).unwrap();
288+
tracing::trace!("best match for `{timestamp}` is most recent segment: {key}");
289+
return Ok(Some(key));
290+
}
291+
Err(i) if i == objects.contents().len() => {
292+
// there are two scenarios. Either there are more pages with the request, and
293+
// we fetch older entries, or there aren't. If there are older segments, search
294+
// in those, otherwise, just take the oldest segment and return that
295+
if objects.continuation_token().is_some() {
296+
// nothing to do; fetch next page
297+
} else {
298+
let key = object_to_key(&objects.contents().last().unwrap()).unwrap();
299+
return Ok(Some(key));
300+
}
301+
}
302+
// This is the index where timestamp would be inserted, we look left and right of that
303+
// key and pick the closest one.
304+
Err(i) => {
305+
// i - 1 is well defined since we already catch the case where i == 0 above
306+
let left_key = object_to_key(&objects.contents()[i - 1]).unwrap();
307+
let right_key = object_to_key(&objects.contents()[i]).unwrap();
308+
let time_to_left = left_key.timestamp().signed_duration_since(timestamp).abs();
309+
let time_to_right =
310+
right_key.timestamp().signed_duration_since(timestamp).abs();
311+
312+
if time_to_left < time_to_right {
313+
return Ok(Some(left_key));
314+
} else {
315+
return Ok(Some(right_key));
316+
}
317+
}
318+
}
319+
320+
match objects.continuation_token {
321+
Some(token) => {
322+
continuation_token = Some(token);
323+
}
324+
None => {
325+
unreachable!("the absence of continuation token should be dealt with earlier");
326+
}
327+
}
328+
}
235329
}
236330

237331
// This method could probably be optimized a lot by using indexes and only downloading useful
@@ -389,7 +483,10 @@ impl fmt::Display for SegmentDataKey<'_> {
389483
}
390484
}
391485

392-
fn s3_segment_data_key<'a>(folder_key: &'a FolderKey, segment_key: &'a SegmentKey) -> SegmentDataKey<'a> {
486+
fn s3_segment_data_key<'a>(
487+
folder_key: &'a FolderKey,
488+
segment_key: &'a SegmentKey,
489+
) -> SegmentDataKey<'a> {
393490
SegmentDataKey(folder_key, segment_key)
394491
}
395492

@@ -401,7 +498,10 @@ impl fmt::Display for SegmentIndexKey<'_> {
401498
}
402499
}
403500

404-
fn s3_segment_index_key<'a>(folder_key: &'a FolderKey, segment_key: &'a SegmentKey) -> SegmentIndexKey<'a> {
501+
fn s3_segment_index_key<'a>(
502+
folder_key: &'a FolderKey,
503+
segment_key: &'a SegmentKey,
504+
) -> SegmentIndexKey<'a> {
405505
SegmentIndexKey(folder_key, segment_key)
406506
}
407507

@@ -413,7 +513,9 @@ impl fmt::Display for SegmentIndexLookupKeyPrefix<'_> {
413513
}
414514
}
415515

416-
fn s3_segment_index_lookup_key_prefix<'a>(folder_key: &'a FolderKey) -> SegmentIndexLookupKeyPrefix<'a> {
516+
fn s3_segment_index_lookup_key_prefix<'a>(
517+
folder_key: &'a FolderKey,
518+
) -> SegmentIndexLookupKeyPrefix<'a> {
417519
SegmentIndexLookupKeyPrefix(folder_key)
418520
}
419521

@@ -425,7 +527,10 @@ impl fmt::Display for SegmentIndexLookupKey<'_> {
425527
}
426528
}
427529

428-
fn s3_segment_index_lookup_key<'a>(folder_key: &'a FolderKey, frame_no: u64) -> SegmentIndexLookupKey<'a> {
530+
fn s3_segment_index_lookup_key<'a>(
531+
folder_key: &'a FolderKey,
532+
frame_no: u64,
533+
) -> SegmentIndexLookupKey<'a> {
429534
SegmentIndexLookupKey(folder_key, frame_no)
430535
}
431536

@@ -553,12 +658,14 @@ where
553658
};
554659

555660
match req {
556-
FindSegmentReq::Frame(frame_no) => {
557-
self.find_segment_by_frame_no(config, &folder_key, frame_no)
558-
.await?
559-
.ok_or_else(|| Error::FrameNotFound(frame_no))
560-
},
561-
FindSegmentReq::Timestamp(_) => todo!(),
661+
FindSegmentReq::Frame(frame_no) => self
662+
.find_segment_by_frame_no(config, &folder_key, frame_no)
663+
.await?
664+
.ok_or_else(|| Error::FrameNotFound(frame_no)),
665+
FindSegmentReq::Timestamp(ts) => self
666+
.find_segment_by_timestamp(config, &folder_key, ts)
667+
.await?
668+
.ok_or_else(|| Error::SegmentNotFoundTimestamp(ts)),
562669
}
563670
}
564671

0 commit comments

Comments
 (0)