Skip to content

Commit a0a8e54

Browse files
committed
add ability to list segments
1 parent 9afc7ca commit a0a8e54

5 files changed

Lines changed: 151 additions & 4 deletions

File tree

libsql-wal/src/storage/async_storage.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use chrono::Utc;
77
use libsql_sys::name::NamespaceName;
88
use tokio::sync::{mpsc, oneshot};
99
use tokio::task::JoinSet;
10+
use tokio_stream::Stream;
1011

1112
use crate::io::{FileExt, Io, StdIO};
1213
use crate::segment::compacted::CompactedSegment;
@@ -267,6 +268,16 @@ where
267268
let segment = CompactedSegment::open(file).await?;
268269
Ok(segment)
269270
}
271+
272+
fn list_segments<'a>(
273+
&'a self,
274+
namespace: &'a NamespaceName,
275+
until: u64,
276+
config_override: Option<Self::Config>,
277+
) -> impl Stream<Item = super::Result<super::SegmentInfo>> + 'a {
278+
let config = config_override.unwrap_or_else(|| self.backend.default_config());
279+
self.backend.list_segments(config, namespace, until)
280+
}
270281
}
271282

272283
pub struct AsyncStorageInitConfig<B> {

libsql-wal/src/storage/backend/mod.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ use std::{future::Future, path::Path};
44

55
use chrono::{DateTime, Utc};
66
use fst::Map;
7+
use tokio_stream::Stream;
78
use uuid::Uuid;
89

9-
use super::{RestoreOptions, Result, SegmentKey};
10+
use super::{RestoreOptions, Result, SegmentInfo, SegmentKey};
1011
use crate::io::file::FileExt;
1112
use crate::segment::compacted::CompactedSegmentDataHeader;
1213
use libsql_sys::name::NamespaceName;
@@ -99,6 +100,13 @@ pub trait Backend: Send + Sync + 'static {
99100
dest: impl FileExt,
100101
) -> Result<()>;
101102

103+
/// Lists the segments stored for `namespace` using `config`, yielding one
/// `SegmentInfo` (or an error) per stream item.
///
/// NOTE(review): `until` is presumably an upper bound limiting which segments
/// are listed; its exact meaning is not specified here — confirm.
fn list_segments<'a>(
104+
&'a self,
105+
config: Self::Config,
106+
namespace: &'a NamespaceName,
107+
until: u64,
108+
) -> impl Stream<Item = Result<SegmentInfo>> + 'a;
109+
102110
/// Returns the default configuration for this storage
103111
fn default_config(&self) -> Self::Config;
104112
}
@@ -195,4 +203,13 @@ impl<T: Backend> Backend for Arc<T> {
195203
.fetch_segment_data(config, namespace, key)
196204
.await
197205
}
206+
207+
fn list_segments<'a>(
208+
&'a self,
209+
config: Self::Config,
210+
namespace: &'a NamespaceName,
211+
until: u64,
212+
) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
213+
self.as_ref().list_segments(config, namespace, until)
214+
}
198215
}

libsql-wal/src/storage/backend/s3.rs

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@ use aws_sdk_s3::operation::create_bucket::CreateBucketError;
1313
use aws_sdk_s3::primitives::{ByteStream, SdkBody};
1414
use aws_sdk_s3::types::CreateBucketConfiguration;
1515
use aws_sdk_s3::Client;
16+
use aws_smithy_types_convert::date_time::DateTimeExt;
1617
use bytes::{Bytes, BytesMut};
1718
use http_body::{Frame as HttpFrame, SizeHint};
1819
use libsql_sys::name::NamespaceName;
1920
use roaring::RoaringBitmap;
2021
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader};
22+
use tokio_stream::Stream;
2123
use tokio_util::sync::ReusableBoxFuture;
2224
use zerocopy::byteorder::little_endian::{U16 as lu16, U32 as lu32, U64 as lu64};
2325
use zerocopy::{AsBytes, FromBytes, FromZeroes};
@@ -28,7 +30,7 @@ use crate::io::compat::copy_to_file;
2830
use crate::io::{FileExt, Io, StdIO};
2931
use crate::segment::compacted::CompactedSegmentDataHeader;
3032
use crate::segment::Frame;
31-
use crate::storage::{Error, RestoreOptions, Result, SegmentKey};
33+
use crate::storage::{Error, RestoreOptions, Result, SegmentInfo, SegmentKey};
3234
use crate::LIBSQL_MAGIC;
3335

3436
pub struct S3Backend<IO> {
@@ -306,6 +308,52 @@ impl<IO: Io> S3Backend<IO> {
306308

307309
Ok(index)
308310
}
311+
312+
fn list_segments_inner<'a>(
313+
&'a self,
314+
config: Arc<S3Config>,
315+
namespace: &'a NamespaceName,
316+
_until: u64,
317+
) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
318+
async_stream::try_stream! {
319+
let folder_key = FolderKey { cluster_id: &config.cluster_id, namespace };
320+
let lookup_key_prefix = s3_segment_index_lookup_key_prefix(&folder_key);
321+
322+
let mut continuation_token = None;
323+
loop {
324+
let objects = self
325+
.client
326+
.list_objects_v2()
327+
.bucket(&config.bucket)
328+
.prefix(lookup_key_prefix.clone())
329+
.set_continuation_token(continuation_token.take())
330+
.send()
331+
.await
332+
.map_err(|e| Error::unhandled(e, "failed to list bucket"))?;
333+
334+
for entry in objects.contents() {
335+
let key = entry.key().expect("misssing key?");
336+
let key_path: &Path = key.as_ref();
337+
let Some(key) = SegmentKey::validate_from_path(key_path, &folder_key.namespace) else { continue };
338+
339+
let infos = SegmentInfo {
340+
key,
341+
size: entry.size().unwrap_or(0) as usize,
342+
created_at: entry.last_modified().unwrap().to_chrono_utc().unwrap(),
343+
};
344+
345+
yield infos;
346+
}
347+
348+
if objects.is_truncated().unwrap_or(false) {
349+
assert!(objects.next_continuation_token.is_some());
350+
continuation_token = objects.next_continuation_token;
351+
} else {
352+
break
353+
}
354+
}
355+
}
356+
}
309357
}
310358

311359
pub struct S3Config {
@@ -514,6 +562,15 @@ where
514562
.await?;
515563
Ok(file)
516564
}
565+
566+
fn list_segments<'a>(
567+
&'a self,
568+
config: Self::Config,
569+
namespace: &'a NamespaceName,
570+
until: u64,
571+
) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
572+
self.list_segments_inner(config, namespace, until)
573+
}
517574
}
518575

519576
#[derive(Clone, Copy)]

libsql-wal/src/storage/job.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,16 @@ mod test {
520520
) -> Result<fst::Map<Arc<[u8]>>> {
521521
todo!()
522522
}
523+
524+
fn list_segments<'a>(
525+
&'a self,
526+
_config: Self::Config,
527+
_namespace: &'a NamespaceName,
528+
_until: u64,
529+
) -> impl tokio_stream::Stream<Item = Result<crate::storage::SegmentInfo>> + 'a
530+
{
531+
tokio_stream::iter(std::iter::from_fn(|| todo!()))
532+
}
523533
}
524534

525535
let job = Job {

libsql-wal/src/storage/mod.rs

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
use std::collections::BTreeMap;
2-
use std::fmt;
3-
use std::future::Future;
42
use std::path::{Path, PathBuf};
53
use std::pin::Pin;
64
use std::str::FromStr;
75
use std::sync::Arc;
6+
use std::{fmt, future::Future};
87

98
use chrono::{DateTime, Utc};
109
use fst::Map;
1110
use hashbrown::HashMap;
1211
use libsql_sys::name::NamespaceName;
1312
use libsql_sys::wal::either::Either;
1413
use tempfile::{tempdir, TempDir};
14+
use tokio_stream::Stream;
1515

1616
use crate::io::{FileExt, Io, StdIO};
1717
use crate::segment::compacted::CompactedSegment;
@@ -205,6 +205,20 @@ pub trait Storage: Send + Sync + 'static {
205205
fn shutdown(&self) -> impl Future<Output = ()> + Send {
206206
async { () }
207207
}
208+
209+
/// Lists the segments stored for `namespace`, yielding one `SegmentInfo`
/// (or an error) per stream item.
///
/// NOTE(review): `config_override` presumably replaces the storage's default
/// configuration when `Some`, and `until` presumably bounds which segments
/// are listed — neither is specified by this declaration; confirm.
fn list_segments<'a>(
210+
&'a self,
211+
namespace: &'a NamespaceName,
212+
until: u64,
213+
config_override: Option<Self::Config>,
214+
) -> impl Stream<Item = Result<SegmentInfo>> + 'a;
215+
}
216+
217+
/// Metadata about one stored segment, as yielded by `list_segments`.
#[derive(Debug)]
218+
pub struct SegmentInfo {
219+
/// Key identifying the segment.
pub key: SegmentKey,
220+
/// Size of the stored segment (presumably in bytes — confirm against the backends).
pub size: usize,
221+
/// Timestamp associated with the segment; the S3 backend fills it from the
/// object's last-modified time.
pub created_at: DateTime<Utc>,
208222
}
209223

210224
/// special zip function for Either storage implementation
@@ -323,6 +337,22 @@ where
323337
Either::B(b) => b.shutdown().await,
324338
}
325339
}
340+
341+
fn list_segments<'a>(
342+
&'a self,
343+
namespace: &'a NamespaceName,
344+
until: u64,
345+
config_override: Option<Self::Config>,
346+
) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
347+
match zip(self, config_override) {
348+
Either::A((s, c)) => {
349+
tokio_util::either::Either::Left(s.list_segments(namespace, until, c))
350+
}
351+
Either::B((s, c)) => {
352+
tokio_util::either::Either::Right(s.list_segments(namespace, until, c))
353+
}
354+
}
355+
}
326356
}
327357

328358
/// a placeholder storage that doesn't store segment
@@ -388,6 +418,17 @@ impl Storage for NoStorage {
388418
#[allow(unreachable_code)]
389419
Result::<CompactedSegment<std::fs::File>>::Err(Error::InvalidIndex(""))
390420
}
421+
422+
/// `NoStorage` cannot list segments: calling this panics with "no storage!".
fn list_segments<'a>(
423+
&'a self,
424+
_namespace: &'a NamespaceName,
425+
_until: u64,
426+
_config_override: Option<Self::Config>,
427+
) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
428+
unimplemented!("no storage!");
429+
// Never reached; it only gives the `impl Stream` return type a concrete type.
#[allow(unreachable_code)]
430+
tokio_stream::empty()
431+
}
391432
}
392433

393434
#[doc(hidden)]
@@ -555,6 +596,17 @@ impl<IO: Io> Storage for TestStorage<IO> {
555596
panic!("not storing")
556597
}
557598
}
599+
600+
/// Segment listing is not implemented for `TestStorage` yet: calling this
/// panics via `todo!()`.
fn list_segments<'a>(
601+
&'a self,
602+
_namespace: &'a NamespaceName,
603+
_until: u64,
604+
_config_override: Option<Self::Config>,
605+
) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
606+
todo!();
607+
// Never reached; it only gives the `impl Stream` return type a concrete type.
#[allow(unreachable_code)]
608+
tokio_stream::empty()
609+
}
558610
}
559611

560612
pub struct StoreSegmentRequest<S, C> {

0 commit comments

Comments
 (0)