Skip to content

Commit a636c9c

Browse files
authored
Merge pull request #1734 from tursodatabase/fix-replica-snapshot-bug
fix replica snapshot isolation bug
2 parents c1cf1f4 + d6214f1 commit a636c9c

3 files changed

Lines changed: 14 additions & 98 deletions

File tree

libsql-wal/src/segment/current.rs

Lines changed: 3 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ impl<F> CurrentSegment<F> {
372372
}
373373

374374
// not a write tx, or page is not in write tx, look into the segment
375-
self.index.locate(page_no, tx.max_frame_no)
375+
self.index.locate(page_no, tx.max_offset)
376376
}
377377

378378
/// reads the page conainted in frame at offset into buf
@@ -647,13 +647,13 @@ impl SegmentIndex {
647647
}
648648
}
649649

650-
fn locate(&self, page_no: u32, max_frame_no: u64) -> Option<u32> {
650+
fn locate(&self, page_no: u32, max_offset: u64) -> Option<u32> {
651651
let offsets = self.index.get(&page_no)?;
652652
let offsets = offsets.value().read();
653653
offsets
654654
.iter()
655655
.rev()
656-
.find(|fno| self.start_frame_no + **fno as u64 <= max_frame_no)
656+
.find(|fno| **fno as u64 <= max_offset)
657657
.copied()
658658
}
659659

@@ -675,42 +675,6 @@ impl SegmentIndex {
675675
Ok(())
676676
}
677677

678-
/// returns an iterator over (page_no, offset, frame_no), where the returned offset is the most
679-
/// recent version of the page contained in start_frame_no..end_frame_no
680-
/// This method assumes that the current segment is ordered.
681-
pub(crate) fn iter(
682-
&self,
683-
start_frame_no: u64,
684-
end_frame_no: u64,
685-
) -> impl Iterator<Item = (u32, u32, u64)> + '_ {
686-
// todo: assert segment is sorted
687-
let mut entry = self.index.front();
688-
let mut fused = false;
689-
let start_offset = (start_frame_no - self.start_frame_no) as u32;
690-
let end_offset = (end_frame_no - self.start_frame_no) as u32;
691-
std::iter::from_fn(move || loop {
692-
if fused {
693-
return None;
694-
}
695-
let entry = entry.as_mut()?;
696-
let ret = {
697-
let offsets = entry.value();
698-
let offsets = offsets.read();
699-
if offsets[0] > end_offset || *offsets.last().unwrap() < start_offset {
700-
drop(offsets);
701-
fused = !entry.move_next();
702-
continue;
703-
}
704-
let offset = *offsets.iter().rev().find(|x| **x <= end_offset).unwrap();
705-
Some((*entry.key(), offset, self.start_frame_no + offset as u64))
706-
};
707-
708-
fused = !entry.move_next();
709-
710-
return ret;
711-
})
712-
}
713-
714678
pub(crate) fn insert(&self, page_no: u32, offset: u32) {
715679
let entry = self.index.get_or_insert(page_no, Default::default());
716680
let mut offsets = entry.value().write();
@@ -737,40 +701,6 @@ mod test {
737701

738702
use super::*;
739703

740-
#[test]
741-
fn index_iter() {
742-
let index = SegmentIndex::new(42);
743-
index.insert(1, 0);
744-
index.insert(1, 3);
745-
index.insert(2, 1);
746-
index.insert(2, 2);
747-
index.insert(3, 5);
748-
index.insert(3, 15);
749-
let mut iter = index.iter(42, 50);
750-
assert_eq!(iter.next(), Some((1, 3, 42 + 3)));
751-
assert_eq!(iter.next(), Some((2, 2, 42 + 2)));
752-
assert_eq!(iter.next(), Some((3, 5, 42 + 5)));
753-
assert_eq!(iter.next(), None);
754-
755-
let mut iter = index.iter(42, 100);
756-
assert_eq!(iter.next(), Some((1, 3, 42 + 3)));
757-
assert_eq!(iter.next(), Some((2, 2, 42 + 2)));
758-
assert_eq!(iter.next(), Some((3, 15, 42 + 15)));
759-
assert_eq!(iter.next(), None);
760-
}
761-
762-
#[should_panic]
763-
#[test]
764-
fn index_iter_out_of_bounds() {
765-
let index = SegmentIndex::new(42);
766-
index.insert(1, 0);
767-
index.insert(1, 3);
768-
index.insert(2, 1);
769-
index.insert(2, 2);
770-
assert_eq!(index.iter(1, 41).count(), 0);
771-
assert_eq!(index.iter(43, 72).count(), 0);
772-
}
773-
774704
#[tokio::test]
775705
async fn current_stream_frames() {
776706
let env = TestEnv::new();

libsql-wal/src/shared_wal.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,13 @@ impl<IO: Io> SharedWal<IO> {
106106
// is not sealed. If the segment is sealed, retry with the current segment
107107
let current = self.current.load();
108108
current.inc_reader_count();
109-
let (max_frame_no, db_size) =
110-
current.with_header(|header| (header.last_committed(), header.size_after()));
109+
let (max_frame_no, db_size, max_offset) = current.with_header(|header| {
110+
(
111+
header.last_committed(),
112+
header.size_after(),
113+
header.frame_count() as u64,
114+
)
115+
});
111116
let id = self.wal_lock.next_tx_id.fetch_add(1, Ordering::Relaxed);
112117
ReadTransaction {
113118
id,
@@ -119,6 +124,7 @@ impl<IO: Io> SharedWal<IO> {
119124
pages_read: 0,
120125
namespace: self.namespace.clone(),
121126
checkpoint_notifier: self.checkpoint_notifier.clone(),
127+
max_offset,
122128
}
123129
}
124130

libsql-wal/src/transaction.rs

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ pub struct ReadTransaction<F> {
7979
pub id: u64,
8080
/// Max frame number that this transaction can read
8181
pub max_frame_no: u64,
82+
// max offset that can be read from the current log
83+
pub max_offset: u64,
8284
pub db_size: u32,
8385
/// The segment to which we have a read lock
8486
pub current: Arc<CurrentSegment<F>>,
@@ -105,6 +107,7 @@ impl<F> Clone for ReadTransaction<F> {
105107
pages_read: self.pages_read,
106108
namespace: self.namespace.clone(),
107109
checkpoint_notifier: self.checkpoint_notifier.clone(),
110+
max_offset: self.max_offset,
108111
}
109112
}
110113
}
@@ -360,26 +363,3 @@ impl<F> DerefMut for WriteTransaction<F> {
360363
&mut self.read_tx
361364
}
362365
}
363-
364-
#[cfg(test)]
365-
mod test {
366-
use std::collections::BTreeMap;
367-
368-
use crate::segment::current::SegmentIndex;
369-
370-
use super::merge_savepoints;
371-
372-
#[test]
373-
fn test_merge_savepoints() {
374-
let first = [(1, 1), (3, 2)].into_iter().collect::<BTreeMap<_, _>>();
375-
let second = [(1, 3), (4, 6)].into_iter().collect::<BTreeMap<_, _>>();
376-
377-
let out = SegmentIndex::new(0);
378-
merge_savepoints([first, second].iter().rev(), &out);
379-
380-
let mut iter = out.iter(0, 100);
381-
assert_eq!(iter.next(), Some((1, 3, 3)));
382-
assert_eq!(iter.next(), Some((3, 2, 2)));
383-
assert_eq!(iter.next(), Some((4, 6, 6)));
384-
}
385-
}

0 commit comments

Comments
 (0)