Skip to content

Commit 8918d29

Browse files
committed
fmt
1 parent 7ad2188 commit 8918d29

7 files changed

Lines changed: 30 additions & 17 deletions

File tree

libsql-replication/src/injector/sqlite_injector/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ impl Injector for SqliteInjector {
4848
}
4949

5050
#[inline]
51-
fn durable_frame_no(&mut self, _frame_no: u64) { }
51+
fn durable_frame_no(&mut self, _frame_no: u64) {}
5252
}
5353

5454
impl SqliteInjector {

libsql-server/src/rpc/replication/libsql_replicator.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,11 @@ pin_project_lite::pin_project! {
7979

8080
impl<S> FrameStreamAdapter<S> {
8181
fn new(inner: S, flavor: WalFlavor, shared: Arc<SharedWal<StdIO>>) -> Self {
82-
Self { inner, flavor, shared }
82+
Self {
83+
inner,
84+
flavor,
85+
shared,
86+
}
8387
}
8488
}
8589

@@ -150,8 +154,10 @@ impl ReplicationLog for LibsqlReplicationService {
150154
let shared = self.registry.get_async(&namespace.into()).await.unwrap();
151155
let req = req.into_inner();
152156
// TODO: replicator should only accecpt NonZero
153-
let replicator =
154-
libsql_wal::replication::replicator::Replicator::new(shared.clone(), req.next_offset.max(1));
157+
let replicator = libsql_wal::replication::replicator::Replicator::new(
158+
shared.clone(),
159+
req.next_offset.max(1),
160+
);
155161

156162
let flavor = req.wal_flavor();
157163
let stream = FrameStreamAdapter::new(replicator.into_frame_stream(), flavor, shared);

libsql-wal/src/replication/injector.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ impl<IO: Io> Injector<IO> {
2828
capacity: buffer_capacity,
2929
tx: None,
3030
max_tx_frame_no: 0,
31-
previous_durable_frame_no: 0,
31+
previous_durable_frame_no: 0,
3232
})
3333
}
3434

@@ -84,25 +84,29 @@ impl<IO: Io> Injector<IO> {
8484
if commit_data.is_some() {
8585
self.max_tx_frame_no = 0;
8686
}
87-
let buffer = current
88-
.inject_frames(buffer, commit_data, tx)
89-
.await?;
87+
let buffer = current.inject_frames(buffer, commit_data, tx).await?;
9088
self.buffer = buffer;
9189
self.buffer.clear();
9290
}
9391

9492
if size_after.is_some() {
9593
let mut tx = self.tx.take().unwrap();
96-
self.wal.new_frame_notifier.send_replace(last_committed_frame_no);
94+
self.wal
95+
.new_frame_notifier
96+
.send_replace(last_committed_frame_no);
9797
// the strategy to swap the current log is to do it on change of durable boundary,
9898
// when we have caught up with the current durable frame_no
99-
if self.current_durable() != self.previous_durable_frame_no && self.current_durable() >= self.max_tx_frame_no {
99+
if self.current_durable() != self.previous_durable_frame_no
100+
&& self.current_durable() >= self.max_tx_frame_no
101+
{
100102
let wal = self.wal.clone();
101103
// FIXME: tokio dependency here is annoying, we need an async version of swap_current.
102104
tokio::task::spawn_blocking(move || {
103105
tx.commit();
104106
wal.swap_current(&tx)
105-
}).await.unwrap()?
107+
})
108+
.await
109+
.unwrap()?
106110
}
107111
}
108112
}

libsql-wal/src/segment/current.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::io::file::FileExt;
2323
use crate::io::Inspect;
2424
use crate::segment::{checked_frame_offset, SegmentFlags};
2525
use crate::segment::{frame_offset, page_offset, sealed::SealedSegment};
26-
use crate::transaction::{Transaction, TxGuardShared, TxGuardOwned};
26+
use crate::transaction::{Transaction, TxGuardOwned, TxGuardShared};
2727
use crate::{LIBSQL_MAGIC, LIBSQL_PAGE_SIZE, LIBSQL_WAL_VERSION};
2828

2929
use super::list::SegmentList;

libsql-wal/src/segment/list.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,10 @@ where
109109
// readers pointing to them
110110
while let Some(segment) = &*current {
111111
// skip any segment more recent than until_frame_no
112-
tracing::debug!(last_committed = segment.last_committed(), until = until_frame_no);
112+
tracing::debug!(
113+
last_committed = segment.last_committed(),
114+
until = until_frame_no
115+
);
113116
if segment.last_committed() <= until_frame_no {
114117
if !segment.is_checkpointable() {
115118
segs.clear();

libsql-wal/src/shared_wal.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ impl<IO: Io> SharedWal<IO> {
9999
pub fn durable_frame_no(&self) -> u64 {
100100
*self.durable_frame_no.lock()
101101
}
102-
102+
103103
#[tracing::instrument(skip_all)]
104104
pub fn begin_read(&self, conn_id: u64) -> ReadTransaction<IO::File> {
105105
// FIXME: this is not enough to just increment the counter, we must make sure that the segment

libsql-wal/src/transaction.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,10 +189,10 @@ impl<F> DerefMut for TxGuardOwned<F> {
189189
}
190190
}
191191

192-
pub trait TxGuard<F>: Deref<Target = WriteTransaction<F>> + DerefMut + Send + Sync { }
192+
pub trait TxGuard<F>: Deref<Target = WriteTransaction<F>> + DerefMut + Send + Sync {}
193193

194-
impl<'a, F: Send + Sync> TxGuard<F> for TxGuardShared<'a, F> { }
195-
impl<F: Send + Sync> TxGuard<F> for TxGuardOwned<F> { }
194+
impl<'a, F: Send + Sync> TxGuard<F> for TxGuardShared<'a, F> {}
195+
impl<F: Send + Sync> TxGuard<F> for TxGuardOwned<F> {}
196196

197197
pub struct TxGuardShared<'a, F> {
198198
_lock: async_lock::MutexGuardArc<Option<u64>>,

0 commit comments

Comments
 (0)