@@ -56,6 +56,7 @@ pub struct Replicator {
5656 /// Always: [last_committed_frame_no] <= [last_sent_frame_no].
5757 last_committed_frame_no : Receiver < Result < u32 > > ,
5858 flush_trigger : Option < Sender < ( ) > > ,
59+ shutdown_trigger : Option < tokio:: sync:: watch:: Sender < ( ) > > ,
5960 snapshot_waiter : Receiver < Result < Option < Uuid > > > ,
6061 snapshot_notifier : Arc < Sender < Result < Option < Uuid > > > > ,
6162
@@ -350,6 +351,7 @@ impl Replicator {
350351
351352 let mut join_set = JoinSet :: new ( ) ;
352353
354+ let ( shutdown_trigger, shutdown_watch) = tokio:: sync:: watch:: channel ( ( ) ) ;
353355 let ( frames_outbox, mut frames_inbox) = tokio:: sync:: mpsc:: unbounded_channel ( ) ;
354356 let _local_backup = {
355357 let mut copier = WalCopier :: new (
@@ -414,6 +416,7 @@ impl Replicator {
414416 let max_parallelism = options. s3_upload_max_parallelism ;
415417 let upload_progress = upload_progress. clone ( ) ;
416418 let db_name = db_name. clone ( ) ;
419+ let shutdown_watch = Arc :: new ( shutdown_watch) ;
417420 join_set. spawn ( async move {
418421 let sem = Arc :: new ( tokio:: sync:: Semaphore :: new ( max_parallelism) ) ;
419422 let mut join_set = JoinSet :: new ( ) ;
@@ -432,6 +435,7 @@ impl Replicator {
432435 Self :: set_s3_queue_size ( & db_name, frames_inbox. len ( ) ) ;
433436
434437 let db_name = db_name. clone ( ) ;
438+ let shutdown_watch = shutdown_watch. clone ( ) ;
435439 join_set. spawn ( async move {
436440 let fpath = format ! ( "{}/{}" , & bucket, & req. path) ;
437441 loop {
@@ -445,16 +449,15 @@ impl Replicator {
445449 . send ( )
446450 . await ;
447451 Self :: record_s3_write_time ( & db_name, start_time. elapsed ( ) ) ;
448- if let Err ( e) = response {
449- tracing:: error!(
450- "Failed to send {} to S3: {}, will retry after 1 second" ,
451- fpath,
452- e
453- ) ;
454- tokio:: time:: sleep ( Duration :: from_millis ( 1000 ) ) . await ;
455- } else {
452+ if response. is_ok ( ) {
456453 break ;
457454 }
455+ tracing:: error!( "Failed to send {} to S3: {}, will retry after 1 second" , fpath, response. err( ) . unwrap( ) ) ;
456+ if shutdown_watch. has_changed ( ) . is_err ( ) {
457+ tracing:: error!( "stop retry for failed S3 frames upload because shutdown was requested" ) ;
458+ return
459+ }
460+ tokio:: time:: sleep ( Duration :: from_millis ( 1000 ) ) . await ;
458461 }
459462 tokio:: fs:: remove_file ( & fpath) . await . unwrap ( ) ;
460463 let elapsed = Instant :: now ( ) - start;
@@ -480,6 +483,7 @@ impl Replicator {
480483 next_frame_no,
481484 last_sent_frame_no,
482485 flush_trigger : Some ( flush_trigger) ,
486+ shutdown_trigger : Some ( shutdown_trigger) ,
483487 last_committed_frame_no,
484488 verify_crc : options. verify_crc ,
485489 db_path,
@@ -528,17 +532,25 @@ impl Replicator {
528532 tracing:: info!( "bottomless replicator: shutting down..." ) ;
529533 // 1. wait for all committed WAL frames to be committed locally
530534 let last_frame_no = self . last_known_frame ( ) ;
531- // force flush in order to not wait for periodic wake up of local back up process
535+ // force flush in order to not wait for periodic wake up of local back up process
532536 if let Some ( tx) = & self . flush_trigger {
533537 let _ = tx. send ( ( ) ) ;
534538 }
535539 self . wait_until_committed ( last_frame_no) . await ?;
540+ tracing:: info!(
541+ "bottomless replicator: local backup replicated frames until {}" ,
542+ last_frame_no
543+ ) ;
536544 // 2. wait for snapshot upload to S3 to finish
537545 self . wait_until_snapshotted ( ) . await ?;
546+ tracing:: info!( "bottomless replicator: snapshot succesfully uploaded to S3" ) ;
538547 // 3. drop flush trigger, which will cause WAL upload loop to close. Since this action will
539548 // close the channel used by wait_until_committed, it must happen after wait_until_committed
540549 // has finished. If trigger won't be dropped, tasks from join_set will never finish.
541550 self . flush_trigger . take ( ) ;
551+ // 4. drop shutdown trigger which will notify S3 upload process to stop all retry attempts
552+ // and finish upload process
553+ self . shutdown_trigger . take ( ) ;
542554 while let Some ( t) = self . join_set . join_next ( ) . await {
543555 // one of the tasks we're waiting for is upload of local WAL segment from pt.1 to S3
544556 // this should ensure that all WAL frames are one S3
0 commit comments