Skip to content

Commit 4c0cfda

Browse files
authored
Merge pull request #1737 from tursodatabase/fix-s3-keyspace
libsql-wal: fix SegmentKey
2 parents 6e0daad + c5359a6 commit 4c0cfda

File tree

8 files changed

+37
-116
lines changed

8 files changed

+37
-116
lines changed

libsql-wal/src/replication/storage.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use zerocopy::FromZeroes;
99

1010
use crate::io::buf::ZeroCopyBoxIoBuf;
1111
use crate::segment::Frame;
12+
use crate::storage::backend::FindSegmentReq;
1213
use crate::storage::Storage;
1314

1415
use super::Result;
@@ -45,7 +46,7 @@ where
4546
) -> Pin<Box<dyn Stream<Item = Result<Box<Frame>>> + Send + 'a>> {
4647
Box::pin(async_stream::try_stream! {
4748
loop {
48-
let key = self.storage.find_segment(&self.namespace, current, None).await?;
49+
let key = self.storage.find_segment(&self.namespace, FindSegmentReq::EndFrameNoLessThan(current), None).await?;
4950
let index = self.storage.fetch_segment_index(&self.namespace, &key, None).await?;
5051
let mut pages = index.into_stream();
5152
let mut maybe_seg = None;

libsql-wal/src/storage/async_storage.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::io::{FileExt, Io, StdIO};
1313
use crate::segment::compacted::CompactedSegment;
1414
use crate::segment::Segment;
1515

16-
use super::backend::Backend;
16+
use super::backend::{Backend, FindSegmentReq};
1717
use super::scheduler::Scheduler;
1818
use super::{OnStoreCallback, RestoreOptions, Storage, StoreSegmentRequest};
1919

@@ -228,18 +228,11 @@ where
228228
async fn find_segment(
229229
&self,
230230
namespace: &NamespaceName,
231-
frame_no: u64,
231+
req: FindSegmentReq,
232232
config_override: Option<Self::Config>,
233233
) -> super::Result<super::SegmentKey> {
234234
let config = config_override.unwrap_or_else(|| self.backend.default_config());
235-
let key = self
236-
.backend
237-
.find_segment(
238-
&config,
239-
namespace,
240-
super::backend::FindSegmentReq::Frame(frame_no),
241-
)
242-
.await?;
235+
let key = self.backend.find_segment(&config, namespace, req).await?;
243236
Ok(key)
244237
}
245238

libsql-wal/src/storage/backend/mod.rs

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#![allow(dead_code)]
2+
use std::future::Future;
23
use std::sync::Arc;
3-
use std::{future::Future, path::Path};
44

55
use chrono::{DateTime, Utc};
66
use fst::Map;
@@ -31,9 +31,10 @@ pub struct DbMeta {
3131
pub max_frame_no: u64,
3232
}
3333

34+
#[derive(Debug, Clone, Copy)]
3435
pub enum FindSegmentReq {
3536
/// returns a segment containing this frame
36-
Frame(u64),
37+
EndFrameNoLessThan(u64),
3738
/// Returns the segment with closest timestamp less than or equal to the requested timestamp
3839
Timestamp(DateTime<Utc>),
3940
}
@@ -83,15 +84,6 @@ pub trait Backend: Send + Sync + 'static {
8384
key: SegmentKey,
8485
) -> impl Future<Output = Result<impl FileExt>> + Send;
8586

86-
// /// Fetch a segment for `namespace` containing `frame_no`, and writes it to `dest`.
87-
async fn fetch_segment(
88-
&self,
89-
config: &Self::Config,
90-
namespace: &NamespaceName,
91-
frame_no: u64,
92-
dest_path: &Path,
93-
) -> Result<Map<Arc<[u8]>>>;
94-
9587
/// Fetch meta for `namespace`
9688
fn meta(
9789
&self,
@@ -132,18 +124,6 @@ impl<T: Backend> Backend for Arc<T> {
132124
.store(config, meta, segment_data, segment_index)
133125
}
134126

135-
async fn fetch_segment(
136-
&self,
137-
config: &Self::Config,
138-
namespace: &NamespaceName,
139-
frame_no: u64,
140-
dest_path: &Path,
141-
) -> Result<fst::Map<Arc<[u8]>>> {
142-
self.as_ref()
143-
.fetch_segment(config, namespace, frame_no, dest_path)
144-
.await
145-
}
146-
147127
async fn meta(&self, config: &Self::Config, namespace: &NamespaceName) -> Result<DbMeta> {
148128
self.as_ref().meta(config, namespace).await
149129
}

libsql-wal/src/storage/backend/s3.rs

Lines changed: 12 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ pub struct S3Backend<IO> {
3939
default_config: Arc<S3Config>,
4040
io: IO,
4141
}
42-
4342
impl S3Backend<StdIO> {
4443
pub async fn from_sdk_config(
4544
aws_config: SdkConfig,
@@ -210,7 +209,7 @@ impl<IO: Io> S3Backend<IO> {
210209
frame_no: u64,
211210
) -> Result<Option<SegmentKey>> {
212211
let lookup_key_prefix = s3_segment_index_lookup_key_prefix(&folder_key);
213-
let lookup_key = s3_segment_index_lookup_key(&folder_key, frame_no);
212+
let lookup_key = s3_segment_index_ends_before_lookup_key(&folder_key, frame_no);
214213

215214
let objects = self
216215
.client
@@ -527,7 +526,8 @@ impl fmt::Display for SegmentIndexLookupKey<'_> {
527526
}
528527
}
529528

530-
fn s3_segment_index_lookup_key<'a>(
529+
/// return the biggest segment whose end frame number is less than frame_no
530+
fn s3_segment_index_ends_before_lookup_key<'a>(
531531
folder_key: &'a FolderKey,
532532
frame_no: u64,
533533
) -> SegmentIndexLookupKey<'a> {
@@ -580,35 +580,6 @@ where
580580
Ok(())
581581
}
582582

583-
async fn fetch_segment(
584-
&self,
585-
config: &Self::Config,
586-
namespace: &NamespaceName,
587-
frame_no: u64,
588-
dest_path: &Path,
589-
) -> Result<fst::Map<Arc<[u8]>>> {
590-
let folder_key = FolderKey {
591-
cluster_id: &config.cluster_id,
592-
namespace: &namespace,
593-
};
594-
595-
let Some(segment_key) = self
596-
.find_segment_by_frame_no(config, &folder_key, frame_no)
597-
.await?
598-
else {
599-
return Err(Error::FrameNotFound(frame_no));
600-
};
601-
602-
if segment_key.includes(frame_no) {
603-
// TODO: make open async
604-
let file = self.io.open(false, false, true, dest_path)?;
605-
self.fetch_segment_from_key(config, &folder_key, &segment_key, &file)
606-
.await
607-
} else {
608-
return Err(Error::FrameNotFound(frame_no));
609-
}
610-
}
611-
612583
async fn meta(
613584
&self,
614585
config: &Self::Config,
@@ -658,14 +629,14 @@ where
658629
};
659630

660631
match req {
661-
FindSegmentReq::Frame(frame_no) => self
632+
FindSegmentReq::EndFrameNoLessThan(frame_no) => self
662633
.find_segment_by_frame_no(config, &folder_key, frame_no)
663634
.await?
664-
.ok_or_else(|| Error::FrameNotFound(frame_no)),
635+
.ok_or_else(|| Error::SegmentNotFound(req)),
665636
FindSegmentReq::Timestamp(ts) => self
666637
.find_segment_by_timestamp(config, &folder_key, ts)
667638
.await?
668-
.ok_or_else(|| Error::SegmentNotFoundTimestamp(ts)),
639+
.ok_or_else(|| Error::SegmentNotFound(req)),
669640
}
670641
}
671642

@@ -831,7 +802,6 @@ mod tests {
831802
use fst::MapBuilder;
832803
use s3s::auth::SimpleAuth;
833804
use s3s::service::{S3ServiceBuilder, SharedS3Service};
834-
use tempfile::NamedTempFile;
835805
use uuid::Uuid;
836806

837807
use crate::io::StdIO;
@@ -901,7 +871,7 @@ mod tests {
901871
SegmentMeta {
902872
namespace: ns.clone(),
903873
segment_id: Uuid::new_v4(),
904-
start_frame_no: 0u64.into(),
874+
start_frame_no: 1u64.into(),
905875
end_frame_no: 64u64.into(),
906876
segment_timestamp: Utc::now(),
907877
},
@@ -936,30 +906,17 @@ mod tests {
936906
let db_meta = storage.meta(&s3_config, &ns).await.unwrap();
937907
assert_eq!(db_meta.max_frame_no, 128);
938908

939-
let tmp = NamedTempFile::new().unwrap();
940-
941-
let index = storage
942-
.fetch_segment(&s3_config, &ns, 1, tmp.path())
909+
let key = storage
910+
.find_segment(&s3_config, &ns, FindSegmentReq::EndFrameNoLessThan(65))
943911
.await
944912
.unwrap();
945-
assert_eq!(index.get(42u32.to_be_bytes()).unwrap(), 42);
913+
assert_eq!(key.start_frame_no, 1);
914+
assert_eq!(key.end_frame_no, 64);
946915

947916
let index = storage
948-
.fetch_segment(&s3_config, &ns, 63, tmp.path())
917+
.fetch_segment_index(&s3_config, &ns, &key)
949918
.await
950919
.unwrap();
951920
assert_eq!(index.get(42u32.to_be_bytes()).unwrap(), 42);
952-
953-
let index = storage
954-
.fetch_segment(&s3_config, &ns, 64, tmp.path())
955-
.await
956-
.unwrap();
957-
assert_eq!(index.get(44u32.to_be_bytes()).unwrap(), 44);
958-
959-
let index = storage
960-
.fetch_segment(&s3_config, &ns, 65, tmp.path())
961-
.await
962-
.unwrap();
963-
assert_eq!(index.get(44u32.to_be_bytes()).unwrap(), 44);
964921
}
965922
}

libsql-wal/src/storage/compaction/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,6 @@ impl AnalyzedSegments {
392392
end_frame_no,
393393
timestamp,
394394
};
395-
dbg!(&key);
396395
segments.push(key);
397396
}
398397
}

libsql-wal/src/storage/error.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::panic::Location;
22

3-
use chrono::{DateTime, Utc};
3+
use super::backend::FindSegmentReq;
44

55
#[derive(thiserror::Error, Debug)]
66
pub enum Error {
@@ -10,10 +10,8 @@ pub enum Error {
1010
Store(String),
1111
#[error("error compacting segment: {0}")]
1212
Compact(#[from] crate::error::Error),
13-
#[error("frame not {0} found")]
14-
FrameNotFound(u64),
15-
#[error("No satisfying segment found for timestamp {0}")]
16-
SegmentNotFoundTimestamp(DateTime<Utc>),
13+
#[error("segment not found for request {0:?}")]
14+
SegmentNotFound(FindSegmentReq),
1715
#[error("unhandled storage error: {error}, in {context}")]
1816
UnhandledStorageError {
1917
error: Box<dyn std::error::Error + Send + Sync + 'static>,

libsql-wal/src/storage/job.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -266,16 +266,6 @@ mod test {
266266
Ok(std::fs::File::open("").unwrap())
267267
}
268268

269-
async fn fetch_segment(
270-
&self,
271-
_config: &Self::Config,
272-
_namespace: &NamespaceName,
273-
_frame_no: u64,
274-
_dest_path: &std::path::Path,
275-
) -> Result<fst::Map<Arc<[u8]>>> {
276-
todo!()
277-
}
278-
279269
fn list_segments<'a>(
280270
&'a self,
281271
_config: Self::Config,

libsql-wal/src/storage/mod.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::io::{FileExt, Io, StdIO};
1818
use crate::segment::compacted::CompactedSegment;
1919
use crate::segment::{sealed::SealedSegment, Segment};
2020

21-
use self::backend::SegmentMeta;
21+
use self::backend::{FindSegmentReq, SegmentMeta};
2222
pub use self::error::Error;
2323

2424
pub mod async_storage;
@@ -142,10 +142,10 @@ impl FromStr for SegmentKey {
142142
type Err = ();
143143

144144
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
145-
let (rev_start_fno, s) = s.split_at(20);
146-
let start_frame_no = u64::MAX - rev_start_fno.parse::<u64>().map_err(|_| ())?;
147-
let (rev_end_fno, timestamp) = s[1..].split_at(20);
145+
let (rev_end_fno, s) = s.split_at(20);
148146
let end_frame_no = u64::MAX - rev_end_fno.parse::<u64>().map_err(|_| ())?;
147+
let (start_fno, timestamp) = s[1..].split_at(20);
148+
let start_frame_no = start_fno.parse::<u64>().map_err(|_| ())?;
149149
let timestamp = timestamp[1..].parse().map_err(|_| ())?;
150150
Ok(Self {
151151
start_frame_no,
@@ -160,8 +160,8 @@ impl fmt::Display for SegmentKey {
160160
write!(
161161
f,
162162
"{:020}-{:020}-{:020}",
163-
u64::MAX - self.start_frame_no,
164163
u64::MAX - self.end_frame_no,
164+
self.start_frame_no,
165165
self.timestamp,
166166
)
167167
}
@@ -207,7 +207,7 @@ pub trait Storage: Send + Sync + 'static {
207207
fn find_segment(
208208
&self,
209209
namespace: &NamespaceName,
210-
frame_no: u64,
210+
frame_no: FindSegmentReq,
211211
config_override: Option<Self::Config>,
212212
) -> impl Future<Output = Result<SegmentKey>> + Send;
213213

@@ -306,7 +306,7 @@ where
306306
fn find_segment(
307307
&self,
308308
namespace: &NamespaceName,
309-
frame_no: u64,
309+
frame_no: FindSegmentReq,
310310
config_override: Option<Self::Config>,
311311
) -> impl Future<Output = Result<SegmentKey>> + Send {
312312
async move {
@@ -415,7 +415,7 @@ impl Storage for NoStorage {
415415
async fn find_segment(
416416
&self,
417417
_namespace: &NamespaceName,
418-
_frame_no: u64,
418+
_frame_no: FindSegmentReq,
419419
_config_override: Option<Self::Config>,
420420
) -> Result<SegmentKey> {
421421
unimplemented!()
@@ -564,14 +564,17 @@ impl<IO: Io> Storage for TestStorage<IO> {
564564
async fn find_segment(
565565
&self,
566566
namespace: &NamespaceName,
567-
frame_no: u64,
567+
req: FindSegmentReq,
568568
_config_override: Option<Self::Config>,
569569
) -> Result<SegmentKey> {
570570
let inner = self.inner.lock().await;
571571
if inner.store {
572+
let FindSegmentReq::EndFrameNoLessThan(fno) = req else {
573+
panic!("unsupported lookup by ts")
574+
};
572575
if let Some(segs) = inner.stored.get(namespace) {
573-
let Some((key, _path)) = segs.iter().find(|(k, _)| k.includes(frame_no)) else {
574-
return Err(Error::FrameNotFound(frame_no));
576+
let Some((key, _path)) = segs.iter().find(|(k, _)| k.includes(fno)) else {
577+
return Err(Error::SegmentNotFound(req));
575578
};
576579
return Ok(*key);
577580
} else {

0 commit comments

Comments
 (0)