@@ -20,6 +20,7 @@ use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
2020use libsql_replication:: injector:: Injector as _;
2121use libsql_replication:: rpc:: replication:: Frame as RpcFrame ;
2222use libsql_sys:: { Cipher , EncryptionConfig } ;
23+ use metrics:: { counter, gauge, histogram} ;
2324use std:: ops:: Deref ;
2425use std:: path:: { Path , PathBuf } ;
2526use std:: str:: FromStr ;
@@ -264,6 +265,52 @@ impl Replicator {
264265 Self :: with_options ( db_path, Options :: from_env ( ) ?) . await
265266 }
266267
268+ fn set_local_last_frame_no ( db_name : & str , last_frame_no : u32 ) {
269+ let db_name = db_name. to_string ( ) ;
270+ gauge ! ( "bottomless_local_last_frame_no" , last_frame_no as f64 , "db_name" => db_name) ;
271+ }
272+
273+ fn increment_local_ready_frame_ranges ( db_name : & str , ready_ranges : u32 ) {
274+ let db_name = db_name. to_string ( ) ;
275+ counter ! ( "bottomless_local_ready_frame_ranges" , ready_ranges as u64 , "db_name" => db_name) ;
276+ }
277+
278+ fn record_local_flush_time ( db_name : & str , duration : Duration ) {
279+ let db_name = db_name. to_string ( ) ;
280+ histogram ! ( "bottomless_local_flush_time" , duration. as_secs_f64( ) , "db_name" => db_name) ;
281+ }
282+
283+ fn record_s3_write_time ( db_name : & str , duration : Duration ) {
284+ let db_name = db_name. to_string ( ) ;
285+ histogram ! ( "bottomless_s3_write_time" , duration. as_secs_f64( ) , "db_name" => db_name) ;
286+ }
287+
288+ fn increment_s3_processed_frame_ranges ( db_name : & str , processed : u64 ) {
289+ let db_name = db_name. to_string ( ) ;
290+ counter ! ( "bottomless_s3_processed_frame_ranges" , processed, "db_name" => db_name) ;
291+ }
292+
293+ fn record_snapshot_upload_time ( db_name : & str , duration : Duration ) {
294+ let db_name = db_name. to_string ( ) ;
295+ histogram ! ( "bottomless_snapshot_upload_time" , duration. as_secs_f64( ) , "db_name" => db_name) ;
296+ }
297+ fn record_restore_upload_files_time ( db_name : & str , duration : Duration ) {
298+ let db_name = db_name. to_string ( ) ;
299+ histogram ! ( "bottomless_restore_upload_files_time" , duration. as_secs_f64( ) , "db_name" => db_name) ;
300+ }
301+ fn record_restore_time ( db_name : & str , duration : Duration ) {
302+ let db_name = db_name. to_string ( ) ;
303+ histogram ! ( "bottomless_restore_time" , duration. as_secs_f64( ) , "db_name" => db_name) ;
304+ }
305+ fn set_s3_processing_frame_no ( db_name : & str , frame_no : u32 ) {
306+ let db_name = db_name. to_string ( ) ;
307+ gauge ! ( "bottomless_s3_processing_frame_no" , frame_no as f64 , "db_name" => db_name) ;
308+ }
309+ fn set_s3_queue_size ( db_name : & str , size : usize ) {
310+ let db_name = db_name. to_string ( ) ;
311+ gauge ! ( "bottomless_s3_queue_size" , size as f64 , "db_name" => db_name) ;
312+ }
313+
267314 pub async fn with_options < S : Into < String > > ( db_path : S , options : Options ) -> Result < Self > {
268315 let config = options. client_config ( ) . await ?;
269316 let client = Client :: from_conf ( config) ;
@@ -317,6 +364,7 @@ impl Replicator {
317364 let next_frame_no = next_frame_no. clone ( ) ;
318365 let last_sent_frame_no = last_sent_frame_no. clone ( ) ;
319366 let batch_interval = options. max_batch_interval ;
367+ let db_name = db_name. clone ( ) ;
320368 join_set. spawn ( async move {
321369 loop {
322370 let timeout = Instant :: now ( ) + batch_interval;
@@ -336,8 +384,18 @@ impl Replicator {
336384 let frames = ( last_sent_frame + 1 ) ..next_frame;
337385
338386 if !frames. is_empty ( ) {
387+ let start_time = Instant :: now ( ) ;
339388 let res = copier. flush ( frames) . await ;
340- if last_committed_frame_no_sender. send ( res) . is_err ( ) {
389+ Self :: record_local_flush_time ( & db_name, start_time. elapsed ( ) ) ;
390+
391+ if let Ok ( ( last_frame_no, ready_ranges) ) = res {
392+ Self :: set_local_last_frame_no ( & db_name, last_frame_no) ;
393+ Self :: increment_local_ready_frame_ranges ( & db_name, ready_ranges) ;
394+ }
395+ if last_committed_frame_no_sender
396+ . send ( res. map ( |r| r. 0 ) )
397+ . is_err ( )
398+ {
341399 // Replicator was probably dropped and therefore corresponding
342400 // receiver has been closed
343401 return ;
@@ -355,6 +413,7 @@ impl Replicator {
355413 let bucket = options. bucket_name . clone ( ) ;
356414 let max_parallelism = options. s3_upload_max_parallelism ;
357415 let upload_progress = upload_progress. clone ( ) ;
416+ let db_name = db_name. clone ( ) ;
358417 join_set. spawn ( async move {
359418 let sem = Arc :: new ( tokio:: sync:: Semaphore :: new ( max_parallelism) ) ;
360419 let mut join_set = JoinSet :: new ( ) ;
@@ -366,17 +425,26 @@ impl Replicator {
366425 let client = client. clone ( ) ;
367426 let bucket = bucket. clone ( ) ;
368427 let upload_progress = upload_progress. clone ( ) ;
428+
429+ if let Some ( ref frames) = req. frames {
430+ Self :: set_s3_processing_frame_no ( & db_name, * frames. end ( ) ) ;
431+ }
432+ Self :: set_s3_queue_size ( & db_name, frames_inbox. len ( ) ) ;
433+
434+ let db_name = db_name. clone ( ) ;
369435 join_set. spawn ( async move {
370436 let fpath = format ! ( "{}/{}" , bucket, req. path) ;
371437 let body = ByteStream :: from_path ( & fpath) . await . unwrap ( ) ;
372- if let Err ( e) = client
438+ let start_time = Instant :: now ( ) ;
439+ let response = client
373440 . put_object ( )
374441 . bucket ( bucket)
375442 . key ( req. path )
376443 . body ( body)
377444 . send ( )
378- . await
379- {
445+ . await ;
446+ Self :: record_s3_write_time ( & db_name, start_time. elapsed ( ) ) ;
447+ if let Err ( e) = response {
380448 tracing:: error!( "Failed to send {} to S3: {}" , fpath, e) ;
381449 } else {
382450 tokio:: fs:: remove_file ( & fpath) . await . unwrap ( ) ;
@@ -386,6 +454,7 @@ impl Replicator {
386454 if let Some ( frames) = req. frames {
387455 let mut up = upload_progress. lock ( ) . await ;
388456 up. update ( * frames. start ( ) , * frames. end ( ) ) ;
457+ Self :: increment_s3_processed_frame_ranges ( & db_name, 1 ) ;
389458 }
390459 drop ( permit) ;
391460 } ) ;
@@ -1039,6 +1108,7 @@ impl Replicator {
10391108 }
10401109 } ) ;
10411110 let elapsed = Instant :: now ( ) - start_ts;
1111+ Self :: record_snapshot_upload_time ( & self . db_name , elapsed) ;
10421112 tracing:: debug!( "Scheduled DB snapshot {} (took {:?})" , generation, elapsed) ;
10431113
10441114 Ok ( Some ( handle) )
@@ -1201,6 +1271,7 @@ impl Replicator {
12011271 // first check if there are any remaining files that we didn't manage to upload
12021272 // on time in the last run
12031273 self . upload_remaining_files ( & generation) . await ?;
1274+ Self :: record_restore_upload_files_time ( & self . db_name , start_ts. elapsed ( ) ) ;
12041275
12051276 tracing:: debug!( "done uploading remaining files" ) ;
12061277
@@ -1219,10 +1290,12 @@ impl Replicator {
12191290 . await
12201291 {
12211292 Ok ( result) => {
1222- let elapsed = Instant :: now ( ) - start_ts;
1223- tracing:: info!( "Finished database restoration in {:?}" , elapsed) ;
12241293 tokio:: fs:: rename ( & restore_path, & self . db_path ) . await ?;
12251294 let _ = self . remove_wal_files ( ) . await ; // best effort, WAL files may not exists
1295+
1296+ let elapsed = Instant :: now ( ) - start_ts;
1297+ tracing:: info!( "Finished database restoration in {:?}" , elapsed) ;
1298+ Self :: record_restore_time ( & self . db_name , elapsed) ;
12261299 Ok ( result)
12271300 }
12281301 Err ( e) => {
0 commit comments