@@ -56,6 +56,7 @@ pub struct Replicator {
5656 /// Always: [last_committed_frame_no] <= [last_sent_frame_no].
5757 last_committed_frame_no : Receiver < Result < u32 > > ,
5858 flush_trigger : Option < Sender < ( ) > > ,
59+ shutdown_trigger : Option < tokio:: sync:: watch:: Sender < ( ) > > ,
5960 snapshot_waiter : Receiver < Result < Option < Uuid > > > ,
6061 snapshot_notifier : Arc < Sender < Result < Option < Uuid > > > > ,
6162
@@ -350,7 +351,8 @@ impl Replicator {
350351
351352 let mut join_set = JoinSet :: new ( ) ;
352353
353- let ( frames_outbox, mut frames_inbox) = tokio:: sync:: mpsc:: channel ( 64 ) ;
354+ let ( shutdown_trigger, shutdown_watch) = tokio:: sync:: watch:: channel ( ( ) ) ;
355+ let ( frames_outbox, mut frames_inbox) = tokio:: sync:: mpsc:: unbounded_channel ( ) ;
354356 let _local_backup = {
355357 let mut copier = WalCopier :: new (
356358 bucket. clone ( ) ,
@@ -414,6 +416,7 @@ impl Replicator {
414416 let max_parallelism = options. s3_upload_max_parallelism ;
415417 let upload_progress = upload_progress. clone ( ) ;
416418 let db_name = db_name. clone ( ) ;
419+ let shutdown_watch = Arc :: new ( shutdown_watch) ;
417420 join_set. spawn ( async move {
418421 let sem = Arc :: new ( tokio:: sync:: Semaphore :: new ( max_parallelism) ) ;
419422 let mut join_set = JoinSet :: new ( ) ;
@@ -432,25 +435,33 @@ impl Replicator {
432435 Self :: set_s3_queue_size ( & db_name, frames_inbox. len ( ) ) ;
433436
434437 let db_name = db_name. clone ( ) ;
438+ let shutdown_watch = shutdown_watch. clone ( ) ;
435439 join_set. spawn ( async move {
436- let fpath = format ! ( "{}/{}" , bucket, req. path) ;
437- let body = ByteStream :: from_path ( & fpath) . await . unwrap ( ) ;
438- let start_time = Instant :: now ( ) ;
439- let response = client
440- . put_object ( )
441- . bucket ( bucket)
442- . key ( req. path )
443- . body ( body)
444- . send ( )
445- . await ;
446- Self :: record_s3_write_time ( & db_name, start_time. elapsed ( ) ) ;
447- if let Err ( e) = response {
448- tracing:: error!( "Failed to send {} to S3: {}" , fpath, e) ;
449- } else {
450- tokio:: fs:: remove_file ( & fpath) . await . unwrap ( ) ;
451- let elapsed = Instant :: now ( ) - start;
452- tracing:: debug!( "Uploaded to S3: {} in {:?}" , fpath, elapsed) ;
440+ let fpath = format ! ( "{}/{}" , & bucket, & req. path) ;
441+ loop {
442+ let start_time = Instant :: now ( ) ;
443+ let body = ByteStream :: from_path ( & fpath) . await . unwrap ( ) ;
444+ let response = client
445+ . put_object ( )
446+ . bucket ( & bucket)
447+ . key ( & req. path )
448+ . body ( body)
449+ . send ( )
450+ . await ;
451+ Self :: record_s3_write_time ( & db_name, start_time. elapsed ( ) ) ;
452+ if response. is_ok ( ) {
453+ break ;
454+ }
455+ tracing:: error!( "Failed to send {} to S3: {}, will retry after 1 second" , fpath, response. err( ) . unwrap( ) ) ;
456+ if shutdown_watch. has_changed ( ) . is_err ( ) {
457+ tracing:: error!( "stop retry for failed S3 frames upload because shutdown was requested" ) ;
458+ return
459+ }
460+ tokio:: time:: sleep ( Duration :: from_millis ( 1000 ) ) . await ;
453461 }
462+ tokio:: fs:: remove_file ( & fpath) . await . unwrap ( ) ;
463+ let elapsed = Instant :: now ( ) - start;
464+ tracing:: debug!( "Uploaded to S3: {} in {:?}" , fpath, elapsed) ;
454465 if let Some ( frames) = req. frames {
455466 let mut up = upload_progress. lock ( ) . await ;
456467 up. update ( * frames. start ( ) , * frames. end ( ) ) ;
@@ -472,6 +483,7 @@ impl Replicator {
472483 next_frame_no,
473484 last_sent_frame_no,
474485 flush_trigger : Some ( flush_trigger) ,
486+ shutdown_trigger : Some ( shutdown_trigger) ,
475487 last_committed_frame_no,
476488 verify_crc : options. verify_crc ,
477489 db_path,
@@ -520,13 +532,25 @@ impl Replicator {
520532 tracing:: info!( "bottomless replicator: shutting down..." ) ;
521533 // 1. wait for all committed WAL frames to be committed locally
522534 let last_frame_no = self . last_known_frame ( ) ;
535+ // force flush in order to not wait for periodic wake up of local back up process
536+ if let Some ( tx) = & self . flush_trigger {
537+ let _ = tx. send ( ( ) ) ;
538+ }
523539 self . wait_until_committed ( last_frame_no) . await ?;
540+ tracing:: info!(
541+ "bottomless replicator: local backup replicated frames until {}" ,
542+ last_frame_no
543+ ) ;
524544 // 2. wait for snapshot upload to S3 to finish
525545 self . wait_until_snapshotted ( ) . await ?;
546+ tracing:: info!( "bottomless replicator: snapshot succesfully uploaded to S3" ) ;
526547 // 3. drop flush trigger, which will cause WAL upload loop to close. Since this action will
527548 // close the channel used by wait_until_committed, it must happen after wait_until_committed
528549 // has finished. If trigger won't be dropped, tasks from join_set will never finish.
529550 self . flush_trigger . take ( ) ;
551+ // 4. drop shutdown trigger which will notify S3 upload process to stop all retry attempts
552+ // and finish upload process
553+ self . shutdown_trigger . take ( ) ;
530554 while let Some ( t) = self . join_set . join_next ( ) . await {
531555 // one of the tasks we're waiting for is upload of local WAL segment from pt.1 to S3
532556 // this should ensure that all WAL frames are one S3
0 commit comments