Skip to content

Commit d6214f1

Browse files
committed
fix replica snapshot isolation bug
1 parent 8a1ba79 commit d6214f1

3 files changed

Lines changed: 14 additions & 98 deletions

File tree

libsql-wal/src/segment/current.rs

Lines changed: 3 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ impl<F> CurrentSegment<F> {
370370
}
371371

372372
// not a write tx, or page is not in write tx, look into the segment
373-
self.index.locate(page_no, tx.max_frame_no)
373+
self.index.locate(page_no, tx.max_offset)
374374
}
375375

376376
/// reads the page conainted in frame at offset into buf
@@ -644,13 +644,13 @@ impl SegmentIndex {
644644
}
645645
}
646646

647-
fn locate(&self, page_no: u32, max_frame_no: u64) -> Option<u32> {
647+
fn locate(&self, page_no: u32, max_offset: u64) -> Option<u32> {
648648
let offsets = self.index.get(&page_no)?;
649649
let offsets = offsets.value().read();
650650
offsets
651651
.iter()
652652
.rev()
653-
.find(|fno| self.start_frame_no + **fno as u64 <= max_frame_no)
653+
.find(|fno| **fno as u64 <= max_offset)
654654
.copied()
655655
}
656656

@@ -672,42 +672,6 @@ impl SegmentIndex {
672672
Ok(())
673673
}
674674

675-
/// returns an iterator over (page_no, offset, frame_no), where the returned offset is the most
676-
/// recent version of the page contained in start_frame_no..end_frame_no
677-
/// This method assumes that the current segment is ordered.
678-
pub(crate) fn iter(
679-
&self,
680-
start_frame_no: u64,
681-
end_frame_no: u64,
682-
) -> impl Iterator<Item = (u32, u32, u64)> + '_ {
683-
// todo: assert segment is sorted
684-
let mut entry = self.index.front();
685-
let mut fused = false;
686-
let start_offset = (start_frame_no - self.start_frame_no) as u32;
687-
let end_offset = (end_frame_no - self.start_frame_no) as u32;
688-
std::iter::from_fn(move || loop {
689-
if fused {
690-
return None;
691-
}
692-
let entry = entry.as_mut()?;
693-
let ret = {
694-
let offsets = entry.value();
695-
let offsets = offsets.read();
696-
if offsets[0] > end_offset || *offsets.last().unwrap() < start_offset {
697-
drop(offsets);
698-
fused = !entry.move_next();
699-
continue;
700-
}
701-
let offset = *offsets.iter().rev().find(|x| **x <= end_offset).unwrap();
702-
Some((*entry.key(), offset, self.start_frame_no + offset as u64))
703-
};
704-
705-
fused = !entry.move_next();
706-
707-
return ret;
708-
})
709-
}
710-
711675
pub(crate) fn insert(&self, page_no: u32, offset: u32) {
712676
let entry = self.index.get_or_insert(page_no, Default::default());
713677
let mut offsets = entry.value().write();
@@ -734,40 +698,6 @@ mod test {
734698

735699
use super::*;
736700

737-
#[test]
738-
fn index_iter() {
739-
let index = SegmentIndex::new(42);
740-
index.insert(1, 0);
741-
index.insert(1, 3);
742-
index.insert(2, 1);
743-
index.insert(2, 2);
744-
index.insert(3, 5);
745-
index.insert(3, 15);
746-
let mut iter = index.iter(42, 50);
747-
assert_eq!(iter.next(), Some((1, 3, 42 + 3)));
748-
assert_eq!(iter.next(), Some((2, 2, 42 + 2)));
749-
assert_eq!(iter.next(), Some((3, 5, 42 + 5)));
750-
assert_eq!(iter.next(), None);
751-
752-
let mut iter = index.iter(42, 100);
753-
assert_eq!(iter.next(), Some((1, 3, 42 + 3)));
754-
assert_eq!(iter.next(), Some((2, 2, 42 + 2)));
755-
assert_eq!(iter.next(), Some((3, 15, 42 + 15)));
756-
assert_eq!(iter.next(), None);
757-
}
758-
759-
#[should_panic]
760-
#[test]
761-
fn index_iter_out_of_bounds() {
762-
let index = SegmentIndex::new(42);
763-
index.insert(1, 0);
764-
index.insert(1, 3);
765-
index.insert(2, 1);
766-
index.insert(2, 2);
767-
assert_eq!(index.iter(1, 41).count(), 0);
768-
assert_eq!(index.iter(43, 72).count(), 0);
769-
}
770-
771701
#[tokio::test]
772702
async fn current_stream_frames() {
773703
let env = TestEnv::new();

libsql-wal/src/shared_wal.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,13 @@ impl<IO: Io> SharedWal<IO> {
106106
// is not sealed. If the segment is sealed, retry with the current segment
107107
let current = self.current.load();
108108
current.inc_reader_count();
109-
let (max_frame_no, db_size) =
110-
current.with_header(|header| (header.last_committed(), header.size_after()));
109+
let (max_frame_no, db_size, max_offset) = current.with_header(|header| {
110+
(
111+
header.last_committed(),
112+
header.size_after(),
113+
header.frame_count() as u64,
114+
)
115+
});
111116
let id = self.wal_lock.next_tx_id.fetch_add(1, Ordering::Relaxed);
112117
ReadTransaction {
113118
id,
@@ -119,6 +124,7 @@ impl<IO: Io> SharedWal<IO> {
119124
pages_read: 0,
120125
namespace: self.namespace.clone(),
121126
checkpoint_notifier: self.checkpoint_notifier.clone(),
127+
max_offset,
122128
}
123129
}
124130

libsql-wal/src/transaction.rs

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ pub struct ReadTransaction<F> {
7979
pub id: u64,
8080
/// Max frame number that this transaction can read
8181
pub max_frame_no: u64,
82+
// max offset that can be read from the current log
83+
pub max_offset: u64,
8284
pub db_size: u32,
8385
/// The segment to which we have a read lock
8486
pub current: Arc<CurrentSegment<F>>,
@@ -105,6 +107,7 @@ impl<F> Clone for ReadTransaction<F> {
105107
pages_read: self.pages_read,
106108
namespace: self.namespace.clone(),
107109
checkpoint_notifier: self.checkpoint_notifier.clone(),
110+
max_offset: self.max_offset,
108111
}
109112
}
110113
}
@@ -360,26 +363,3 @@ impl<F> DerefMut for WriteTransaction<F> {
360363
&mut self.read_tx
361364
}
362365
}
363-
364-
#[cfg(test)]
365-
mod test {
366-
use std::collections::BTreeMap;
367-
368-
use crate::segment::current::SegmentIndex;
369-
370-
use super::merge_savepoints;
371-
372-
#[test]
373-
fn test_merge_savepoints() {
374-
let first = [(1, 1), (3, 2)].into_iter().collect::<BTreeMap<_, _>>();
375-
let second = [(1, 3), (4, 6)].into_iter().collect::<BTreeMap<_, _>>();
376-
377-
let out = SegmentIndex::new(0);
378-
merge_savepoints([first, second].iter().rev(), &out);
379-
380-
let mut iter = out.iter(0, 100);
381-
assert_eq!(iter.next(), Some((1, 3, 3)));
382-
assert_eq!(iter.next(), Some((3, 2, 2)));
383-
assert_eq!(iter.next(), Some((4, 6, 6)));
384-
}
385-
}

0 commit comments

Comments
 (0)