Skip to content

Commit 348c683

Browse files
committed
introduce TxnGuard trait to pass either TxnGuardOwned/TxnGuardShared
1 parent 980ef5b commit 348c683

3 files changed

Lines changed: 61 additions & 49 deletions

File tree

libsql-wal/src/registry.rs

Lines changed: 49 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::segment::Segment;
2727
use crate::segment::{current::CurrentSegment, sealed::SealedSegment};
2828
use crate::shared_wal::{SharedWal, SwapLog};
2929
use crate::storage::{OnStoreCallback, Storage};
30-
use crate::transaction::{Transaction, TxGuard};
30+
use crate::transaction::TxGuard;
3131
use crate::{LibsqlFooter, LIBSQL_PAGE_SIZE};
3232
use libsql_sys::name::NamespaceName;
3333

@@ -100,48 +100,13 @@ where
100100
S: Storage<Segment = SealedSegment<IO::File>>,
101101
{
102102
#[tracing::instrument(skip_all)]
103-
fn swap_current(&self, shared: &SharedWal<IO>, tx: &TxGuard<<IO as Io>::File>) -> Result<()> {
103+
fn swap_current(
104+
&self,
105+
shared: &SharedWal<IO>,
106+
tx: &dyn TxGuard<<IO as Io>::File>,
107+
) -> Result<()> {
104108
assert!(tx.is_commited());
105-
// at this point we must hold a lock to a commited transaction.
106-
107-
let current = shared.current.load();
108-
if current.is_empty() {
109-
return Ok(());
110-
}
111-
let start_frame_no = current.next_frame_no();
112-
let path = self
113-
.path
114-
.join(shared.namespace().as_str())
115-
.join(format!("{}:{start_frame_no:020}.seg", shared.namespace()));
116-
117-
let segment_file = self.io.open(true, true, true, &path)?;
118-
let salt = self.io.with_rng(|rng| rng.gen());
119-
let new = CurrentSegment::create(
120-
segment_file,
121-
path,
122-
start_frame_no,
123-
current.db_size(),
124-
current.tail().clone(),
125-
salt,
126-
current.log_id(),
127-
)?;
128-
// sealing must the last fallible operation, because we don't want to end up in a situation
129-
// where the current log is sealed and it wasn't swapped.
130-
if let Some(sealed) = current.seal()? {
131-
new.tail().push(sealed.clone());
132-
maybe_store_segment(
133-
self.storage.as_ref(),
134-
&self.checkpoint_notifier,
135-
&shared.namespace,
136-
&shared.durable_frame_no,
137-
sealed,
138-
);
139-
}
140-
141-
shared.current.swap(Arc::new(new));
142-
tracing::debug!("current segment swapped");
143-
144-
Ok(())
109+
self.swap_current_inner(shared)
145110
}
146111
}
147112

@@ -498,6 +463,48 @@ where
498463

499464
Ok(())
500465
}
466+
467+
#[tracing::instrument(skip_all)]
468+
fn swap_current_inner(&self, shared: &SharedWal<IO>) -> Result<()> {
469+
let current = shared.current.load();
470+
if current.is_empty() {
471+
return Ok(());
472+
}
473+
let start_frame_no = current.next_frame_no();
474+
let path = self
475+
.path
476+
.join(shared.namespace().as_str())
477+
.join(format!("{}:{start_frame_no:020}.seg", shared.namespace()));
478+
479+
let segment_file = self.io.open(true, true, true, &path)?;
480+
let salt = self.io.with_rng(|rng| rng.gen());
481+
let new = CurrentSegment::create(
482+
segment_file,
483+
path,
484+
start_frame_no,
485+
current.db_size(),
486+
current.tail().clone(),
487+
salt,
488+
current.log_id(),
489+
)?;
490+
// sealing must the last fallible operation, because we don't want to end up in a situation
491+
// where the current log is sealed and it wasn't swapped.
492+
if let Some(sealed) = current.seal()? {
493+
new.tail().push(sealed.clone());
494+
maybe_store_segment(
495+
self.storage.as_ref(),
496+
&self.checkpoint_notifier,
497+
&shared.namespace,
498+
&shared.durable_frame_no,
499+
sealed,
500+
);
501+
}
502+
503+
shared.current.swap(Arc::new(new));
504+
tracing::debug!("current segment swapped");
505+
506+
Ok(())
507+
}
501508
}
502509

503510
#[tracing::instrument(skip_all, fields(namespace = shared.namespace().as_str()))]

libsql-wal/src/shared_wal.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pub struct WalLock {
3535
}
3636

3737
pub(crate) trait SwapLog<IO: Io>: Sync + Send + 'static {
38-
fn swap_current(&self, shared: &SharedWal<IO>, tx: &TxGuard<IO::File>) -> Result<()>;
38+
fn swap_current(&self, shared: &SharedWal<IO>, tx: &dyn TxGuard<IO::File>) -> Result<()>;
3939
}
4040

4141
pub struct SharedWal<IO: Io> {
@@ -300,7 +300,7 @@ impl<IO: Io> SharedWal<IO> {
300300
}
301301

302302
/// Swap the current log. A write lock must be held, but the transaction must be must be committed already.
303-
pub(crate) fn swap_current(&self, tx: &TxGuard<IO::File>) -> Result<()> {
303+
pub(crate) fn swap_current(&self, tx: &impl TxGuard<IO::File>) -> Result<()> {
304304
self.registry.swap_current(self, tx)?;
305305
Ok(())
306306
}

libsql-wal/src/transaction.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -189,20 +189,25 @@ impl<F> DerefMut for TxGuardOwned<F> {
189189
}
190190
}
191191

192-
pub struct TxGuard<'a, F> {
192+
pub trait TxGuard<F>: Deref<Target = WriteTransaction<F>> + DerefMut + Send + Sync { }
193+
194+
impl<'a, F: Send + Sync> TxGuard<F> for TxGuardShared<'a, F> { }
195+
impl<F: Send + Sync> TxGuard<F> for TxGuardOwned<F> { }
196+
197+
pub struct TxGuardShared<'a, F> {
193198
_lock: async_lock::MutexGuardArc<Option<u64>>,
194199
inner: &'a mut WriteTransaction<F>,
195200
}
196201

197-
impl<'a, F> Deref for TxGuard<'a, F> {
202+
impl<'a, F> Deref for TxGuardShared<'a, F> {
198203
type Target = WriteTransaction<F>;
199204

200205
fn deref(&self) -> &Self::Target {
201206
&self.inner
202207
}
203208
}
204209

205-
impl<'a, F> DerefMut for TxGuard<'a, F> {
210+
impl<'a, F> DerefMut for TxGuardShared<'a, F> {
206211
fn deref_mut(&mut self) -> &mut Self::Target {
207212
self.inner
208213
}
@@ -225,11 +230,11 @@ impl<F> WriteTransaction<F> {
225230
savepoint_id
226231
}
227232

228-
pub fn lock(&mut self) -> TxGuard<F> {
233+
pub fn lock(&mut self) -> TxGuardShared<F> {
229234
let g = self.wal_lock.tx_id.lock_arc_blocking();
230235
match *g {
231236
// we still hold the lock, we can proceed
232-
Some(id) if self.id == id => TxGuard {
237+
Some(id) if self.id == id => TxGuardShared {
233238
_lock: g,
234239
inner: self,
235240
},

0 commit comments

Comments
 (0)