Skip to content

Commit a7c4533

Browse files
committed
allow opening unexisting shared wal
1 parent 8ffd3f7 commit a7c4533

4 files changed

Lines changed: 86 additions & 30 deletions

File tree

libsql-wal/src/error.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ pub enum Error {
1919
InvalidPageSize,
2020
#[error("Registry is shutting down")]
2121
ShuttingDown,
22+
23+
#[error("invalid db footer magic")]
24+
InvalidFooterMagic,
25+
#[error("invalid db footer version")]
26+
InvalidFooterVersion,
2227
}
2328

2429
impl Into<libsql_sys::ffi::Error> for Error {

libsql-wal/src/io/mod.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,11 @@ impl Io for StdIO {
6464
path: &Path,
6565
) -> io::Result<Self::File> {
6666
std::fs::OpenOptions::new()
67-
.create_new(create_new)
68-
.read(read)
69-
.write(write)
70-
.open(path)
67+
.create_new(create_new)
68+
.create(write)
69+
.read(read)
70+
.write(write)
71+
.open(path)
7172
}
7273

7374
fn tempfile(&self) -> io::Result<Self::TempFile> {

libsql-wal/src/lib.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,18 @@ impl LibsqlFooter {
3838
pub fn log_id(&self) -> Uuid {
3939
Uuid::from_u128(self.log_id.get())
4040
}
41+
42+
fn validate(&self) -> error::Result<()> {
43+
if self.magic.get() != LIBSQL_MAGIC {
44+
return Err(error::Error::InvalidFooterMagic)
45+
}
46+
47+
if self.version.get() != LIBSQL_WAL_VERSION {
48+
return Err(error::Error::InvalidFooterVersion)
49+
}
50+
51+
Ok(())
52+
}
4153
}
4254

4355
#[cfg(any(debug_assertions, test))]

libsql-wal/src/registry.rs

Lines changed: 64 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use crate::io::file::FileExt;
1919
use crate::io::{Io, StdIO};
2020
use crate::replication::storage::StorageReplicator;
2121
use crate::segment::list::SegmentList;
22+
use crate::segment::Segment;
2223
use crate::segment::{current::CurrentSegment, sealed::SealedSegment};
2324
use crate::shared_wal::{SharedWal, SwapLog};
2425
use crate::storage::{OnStoreCallback, Storage};
@@ -271,47 +272,70 @@ where
271272
}
272273

273274
let db_file = self.io.open(false, true, true, db_path)?;
275+
let db_file_len = db_file.len()?;
276+
let header = if db_file_len > 0 {
277+
let mut header: Sqlite3DbHeader = Sqlite3DbHeader::new_zeroed();
278+
db_file.read_exact_at(header.as_bytes_mut(), 0)?;
279+
Some(header)
280+
} else {
281+
None
282+
};
274283

275-
let mut header: Sqlite3DbHeader = Sqlite3DbHeader::new_zeroed();
276-
db_file.read_exact_at(header.as_bytes_mut(), 0)?;
284+
let footer = self.try_read_footer(&db_file)?;
277285

278-
let log_id = if db_file.len()? <= LIBSQL_PAGE_SIZE as u64 && tail.is_empty() {
279-
// this is a new database
280-
self.io.uuid()
281-
} else if let Some(log_id) = tail.with_head(|h| h.header().log_id.get()) {
282-
// there is a segment list, read the logid from there.
283-
let log_id = Uuid::from_u128(log_id);
284-
#[cfg(debug_assertions)]
285-
{
286-
// if the main db file has footer, then the logid must match that of the segment
287-
if let Ok(db_log_id) =
288-
read_log_id_from_footer(&db_file, header.db_size.get() as u64)
289-
{
290-
assert_eq!(db_log_id, log_id);
291-
}
286+
let log_id = match footer {
287+
Some(footer) if tail.is_empty() => {
288+
footer.log_id()
289+
}
290+
None if tail.is_empty() => {
291+
self.io.uuid()
292+
}
293+
Some(footer) => {
294+
let log_id = tail.with_head(|h| h.header().log_id.get()).expect("non-empty list should have a head");
295+
let log_id = Uuid::from_u128(log_id);
296+
assert_eq!(log_id, footer.log_id());
297+
log_id
298+
}
299+
None => {
300+
let log_id = tail.with_head(|h| h.header().log_id.get()).expect("non-empty list should have a head");
301+
Uuid::from_u128(log_id)
292302
}
293-
294-
log_id
295-
} else {
296-
read_log_id_from_footer(&db_file, header.db_size.get() as u64)?
297303
};
298304

299305
let (db_size, next_frame_no) = tail
300306
.with_head(|segment| {
301307
let header = segment.header();
302308
(header.size_after(), header.next_frame_no())
303309
})
304-
.unwrap_or((
305-
header.db_size.get(),
306-
NonZeroU64::new(header.replication_index.get() + 1)
307-
.unwrap_or(NonZeroU64::new(1).unwrap()),
308-
));
310+
.unwrap_or_else(|| {
311+
match header {
312+
Some(header) => (
313+
header.db_size.get(),
314+
NonZeroU64::new(header.replication_index.get() + 1)
315+
.unwrap_or(NonZeroU64::new(1).unwrap()),
316+
),
317+
None => (0, NonZeroU64::new(1).unwrap())
318+
}
319+
});
309320

310321
let current_path = path.join(format!("{namespace}:{next_frame_no:020}.seg"));
311322

312323
let segment_file = self.io.open(true, true, true, &current_path)?;
313324
let salt = self.io.with_rng(|rng| rng.gen());
314325

326+
let checkpointed_frame_no = match tail.last() {
327+
// if there is a tail, then the latest checkpointed frame_no is one before the the
328+
// start frame_no of the tail. We must read it from the tail, because a partial
329+
// checkpoint may have occured before a crash.
330+
Some(last) => {
331+
(last.start_frame_no() - 1).max(1)
332+
}
333+
// otherwise, we read the it from the footer.
334+
None => {
335+
footer.map(|f| f.replication_index.get()).unwrap_or(0)
336+
}
337+
};
338+
315339
let current = arc_swap::ArcSwap::new(Arc::new(CurrentSegment::create(
316340
segment_file,
317341
current_path,
@@ -330,7 +354,7 @@ where
330354
db_file,
331355
registry: self.clone(),
332356
namespace: namespace.clone(),
333-
checkpointed_frame_no: header.replication_index.get().into(),
357+
checkpointed_frame_no: checkpointed_frame_no.into(),
334358
new_frame_notifier,
335359
durable_frame_no,
336360
stored_segments: Box::new(StorageReplicator::new(
@@ -349,6 +373,20 @@ where
349373
return Ok(shared);
350374
}
351375

376+
fn try_read_footer(&self, db_file: &impl FileExt) -> Result<Option<LibsqlFooter>>{
377+
let len = db_file.len()?;
378+
if len as usize % LIBSQL_PAGE_SIZE as usize == size_of::<LibsqlFooter>() {
379+
let mut footer: LibsqlFooter = LibsqlFooter::new_zeroed();
380+
let footer_offset = LIBSQL_PAGE_SIZE as u64 * len;
381+
db_file.read_exact_at(footer.as_bytes_mut(), footer_offset)?;
382+
footer.validate()?;
383+
Ok(Some(footer))
384+
} else {
385+
Ok(None)
386+
}
387+
}
388+
389+
352390
// On shutdown, we checkpoint all the WALs. This require sealing the current segment, and when
353391
// checkpointing all the segments
354392
pub async fn shutdown(self: Arc<Self>) -> Result<()> {

0 commit comments

Comments
 (0)