Skip to content

Commit 34389a3

Browse files
committed
add more logging
1 parent 15e392c commit 34389a3

6 files changed

Lines changed: 19 additions & 5 deletions

File tree

libsql-wal/src/replication/replicator.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,21 @@ impl<IO: Io> Replicator<IO> {
4040
///
4141
/// In a single replication step, the replicator guarantees that a minimal set of frames is
4242
/// sent to the replica.
43+
#[tracing::instrument(skip(self))]
4344
pub fn into_frame_stream(mut self) -> impl Stream<Item = Result<Box<Frame>>> + Send {
4445
async_stream::try_stream! {
4546
loop {
4647
// First we decide up to what frame_no we want to replicate in this step. If we are
4748
// already up to date, wait for something to happen
49+
tracing::debug!(next_frame_no = self.next_frame_no);
4850
let most_recent_frame_no = *self
4951
.new_frame_notifier
5052
.wait_for(|fno| *fno >= self.next_frame_no)
5153
.await
5254
.expect("channel cannot be closed because we hold a ref to the sending end");
5355

56+
tracing::debug!(most_recent_frame_no, "new frame_no available");
57+
5458
let mut commit_frame_no = 0;
5559
// we have stuff to replicate
5660
if most_recent_frame_no >= self.next_frame_no {
@@ -66,6 +70,7 @@ impl<IO: Io> Replicator<IO> {
6670

6771
let mut stream = stream.peekable();
6872

73+
tracing::debug!(replicated_until, "replicating from current log");
6974
loop {
7075
let Some(frame) = stream.next().await else { break };
7176
let mut frame = frame.map_err(|e| Error::CurrentSegment(e.into()))?;
@@ -88,6 +93,7 @@ impl<IO: Io> Replicator<IO> {
8893
.stream_pages_from(replicated_until, self.next_frame_no, &mut seen).await;
8994
tokio::pin!(stream);
9095

96+
tracing::debug!(replicated_until, "replicating from tail");
9197
let mut stream = stream.peekable();
9298

9399
let should_replicate_from_storage = replicated_until != self.next_frame_no;
@@ -110,6 +116,7 @@ impl<IO: Io> Replicator<IO> {
110116
// Replicating from sealed segments was not enough, so we replicate from
111117
// durable storage
112118
if let Some(replicated_until) = replicated_until {
119+
tracing::debug!("replicating from durable storage");
113120
let stream = self
114121
.shared
115122
.stored_segments

libsql-wal/src/segment/list.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ where
7878

7979
/// Checkpoints as many segments as possible to the main db file, and return the checkpointed
8080
/// frame_no, if anything was checkpointed
81+
#[tracing::instrument(skip_all)]
8182
pub async fn checkpoint<IO: Io>(
8283
&self,
8384
db_file: &IO::File,
@@ -108,6 +109,7 @@ where
108109
// readers pointing to them
109110
while let Some(segment) = &*current {
110111
// skip any segment more recent than until_frame_no
112+
tracing::debug!(last_committed = segment.last_committed(), until = until_frame_no);
111113
if segment.last_committed() <= until_frame_no {
112114
if !segment.is_checkpointable() {
113115
segs.clear();
@@ -120,6 +122,7 @@ where
120122

121123
// nothing to checkpoint rn
122124
if segs.is_empty() {
125+
tracing::debug!("nothing to checkpoint");
123126
return Ok(None);
124127
}
125128

@@ -133,6 +136,7 @@ where
133136
let mut last_replication_index = 0;
134137
while let Some((k, v)) = union.next() {
135138
let page_no = u32::from_be_bytes(k.try_into().unwrap());
139+
tracing::debug!(page_no);
136140
let v = v.iter().min_by_key(|i| i.index).unwrap();
137141
let offset = v.value as u32;
138142

libsql-wal/src/segment/sealed.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,9 @@ where
181181
}
182182

183183
fn is_checkpointable(&self) -> bool {
184-
self.read_locks.load(Ordering::Relaxed) == 0
184+
let read_locks = self.read_locks.load(Ordering::Relaxed);
185+
tracing::debug!(read_locks);
186+
read_locks == 0
185187
}
186188

187189
fn size_after(&self) -> u32 {

libsql-wal/src/shared_wal.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,7 @@ impl<IO: Io> SharedWal<IO> {
305305
Ok(())
306306
}
307307

308+
#[tracing::instrument(skip(self))]
308309
pub async fn checkpoint(&self) -> Result<Option<u64>> {
309310
let durable_frame_no = *self.durable_frame_no.lock();
310311
let checkpointed_frame_no = self

libsql-wal/src/transaction.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ impl<F> WriteTransaction<F> {
318318
}
319319
}
320320

321-
tracing::debug!(id = read_tx.id, "lock released");
321+
tracing::trace!(id = read_tx.id, "lock released");
322322

323323
read_tx
324324
}

libsql-wal/src/wal.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ impl<FS: Io> Wal for LibsqlWal<FS> {
131131
self.last_read_frame_no = Some(tx.max_frame_no);
132132
self.tx = Some(Transaction::Read(tx));
133133

134-
tracing::debug!(invalidate_cache, "read started");
134+
tracing::trace!(invalidate_cache, "read started");
135135
Ok(invalidate_cache)
136136
}
137137

@@ -182,9 +182,9 @@ impl<FS: Io> Wal for LibsqlWal<FS> {
182182
match self.tx.as_mut() {
183183
Some(tx) => {
184184
self.shared.upgrade(tx).map_err(Into::into)?;
185-
tracing::debug!("write lock acquired");
185+
tracing::trace!("write lock acquired");
186186
}
187-
None => todo!("should acquire read txn first"),
187+
None => panic!("should acquire read txn first"),
188188
}
189189

190190
Ok(())

0 commit comments

Comments
 (0)