Skip to content

Commit 9afc7ca

Browse files
committed
return CompactedSegmentHeader when fetching compacted segment
1 parent d9f7a46 commit 9afc7ca

3 files changed

Lines changed: 17 additions & 9 deletions

File tree

libsql-wal/src/storage/backend/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use uuid::Uuid;
88

99
use super::{RestoreOptions, Result, SegmentKey};
1010
use crate::io::file::FileExt;
11+
use crate::segment::compacted::CompactedSegmentDataHeader;
1112
use libsql_sys::name::NamespaceName;
1213

1314
// pub mod fs;
@@ -63,7 +64,7 @@ pub trait Backend: Send + Sync + 'static {
6364
namespace: &NamespaceName,
6465
key: &SegmentKey,
6566
file: &impl FileExt,
66-
) -> Result<()>;
67+
) -> Result<CompactedSegmentDataHeader>;
6768

6869
// this method taking self: Arc<Self> is an infortunate consequence of rust type system making
6970
// impl FileExt variant with all the arguments, with no escape hatch...
@@ -176,7 +177,7 @@ impl<T: Backend> Backend for Arc<T> {
176177
namespace: &NamespaceName,
177178
key: &SegmentKey,
178179
file: &impl FileExt,
179-
) -> Result<()> {
180+
) -> Result<CompactedSegmentDataHeader> {
180181
self.as_ref()
181182
.fetch_segment_data_to_file(config, namespace, key, file)
182183
.await

libsql-wal/src/storage/backend/s3.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use bytes::{Bytes, BytesMut};
1717
use http_body::{Frame as HttpFrame, SizeHint};
1818
use libsql_sys::name::NamespaceName;
1919
use roaring::RoaringBitmap;
20-
use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
20+
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader};
2121
use tokio_util::sync::ReusableBoxFuture;
2222
use zerocopy::byteorder::little_endian::{U16 as lu16, U32 as lu32, U64 as lu64};
2323
use zerocopy::{AsBytes, FromBytes, FromZeroes};
@@ -135,12 +135,17 @@ impl<IO: Io> S3Backend<IO> {
135135
folder_key: &FolderKey<'_>,
136136
segment_key: &SegmentKey,
137137
file: &impl FileExt,
138-
) -> Result<()> {
138+
) -> Result<CompactedSegmentDataHeader> {
139139
let reader = self
140140
.fetch_segment_data_reader(config, folder_key, segment_key)
141141
.await?;
142+
let mut reader = tokio::io::BufReader::with_capacity(8196, reader);
143+
while reader.fill_buf().await?.len() < size_of::<CompactedSegmentDataHeader>() {}
144+
let header = CompactedSegmentDataHeader::read_from_prefix(reader.buffer()).unwrap();
145+
142146
copy_to_file(reader, file).await?;
143-
Ok(())
147+
148+
Ok(header)
144149
}
145150

146151
async fn s3_get(&self, config: &S3Config, key: String) -> Result<ByteStream> {
@@ -487,14 +492,15 @@ where
487492
namespace: &NamespaceName,
488493
key: &SegmentKey,
489494
file: &impl FileExt,
490-
) -> Result<()> {
495+
) -> Result<CompactedSegmentDataHeader> {
491496
let folder_key = FolderKey {
492497
cluster_id: &config.cluster_id,
493498
namespace: &namespace,
494499
};
495-
self.fetch_segment_data_inner(config, &folder_key, key, file)
500+
let header = self
501+
.fetch_segment_data_inner(config, &folder_key, key, file)
496502
.await?;
497-
Ok(())
503+
Ok(header)
498504
}
499505

500506
async fn fetch_segment_data(

libsql-wal/src/storage/job.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ mod test {
116116

117117
use crate::io::file::FileExt;
118118
use crate::io::StdIO;
119+
use crate::segment::compacted::CompactedSegmentDataHeader;
119120
use crate::storage::{RestoreOptions, SegmentKey};
120121
// use crate::registry::WalRegistry;
121122
// use crate::segment::compacted::CompactedSegmentDataHeader;
@@ -497,7 +498,7 @@ mod test {
497498
_namespace: &NamespaceName,
498499
_key: &SegmentKey,
499500
_file: &impl FileExt,
500-
) -> Result<()> {
501+
) -> Result<CompactedSegmentDataHeader> {
501502
todo!()
502503
}
503504

0 commit comments

Comments
 (0)