Skip to content

Commit 58e518f

Browse files
committed
try to send frames to S3 even if there was a temporary failure
1 parent 99e16d8 commit 58e518f

2 files changed

Lines changed: 26 additions & 23 deletions

File tree

bottomless/src/backup.rs

Lines changed: 4 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -5,13 +5,13 @@ use arc_swap::ArcSwapOption;
55
use std::ops::{Range, RangeInclusive};
66
use std::sync::Arc;
77
use tokio::io::AsyncWriteExt;
8-
use tokio::sync::mpsc::Sender;
8+
use tokio::sync::mpsc::UnboundedSender;
99
use tokio::time::Instant;
1010
use uuid::Uuid;
1111

1212
#[derive(Debug)]
1313
pub(crate) struct WalCopier {
14-
outbox: Sender<SendReq>,
14+
outbox: UnboundedSender<SendReq>,
1515
use_compression: CompressionKind,
1616
max_frames_per_batch: usize,
1717
wal_path: String,
@@ -28,7 +28,7 @@ impl WalCopier {
2828
db_path: &str,
2929
max_frames_per_batch: usize,
3030
use_compression: CompressionKind,
31-
outbox: Sender<SendReq>,
31+
outbox: UnboundedSender<SendReq>,
3232
) -> Self {
3333
WalCopier {
3434
bucket,
@@ -76,7 +76,7 @@ impl WalCopier {
7676
meta_file.write_all(buf.as_ref()).await?;
7777
meta_file.flush().await?;
7878
let msg = format!("{}-{}/.meta", self.db_name, generation);
79-
if self.outbox.send(SendReq::new(msg)).await.is_err() {
79+
if self.outbox.send(SendReq::new(msg)).is_err() {
8080
return Err(anyhow!("couldn't initialize local backup dir: {}", dir));
8181
}
8282
}
@@ -125,7 +125,6 @@ impl WalCopier {
125125
if self
126126
.outbox
127127
.send(SendReq::wal_segment(fdesc, start, end - 1))
128-
.await
129128
.is_err()
130129
{
131130
tracing::warn!(

bottomless/src/replicator.rs

Lines changed: 22 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -350,7 +350,7 @@ impl Replicator {
350350

351351
let mut join_set = JoinSet::new();
352352

353-
let (frames_outbox, mut frames_inbox) = tokio::sync::mpsc::channel(64);
353+
let (frames_outbox, mut frames_inbox) = tokio::sync::mpsc::unbounded_channel();
354354
let _local_backup = {
355355
let mut copier = WalCopier::new(
356356
bucket.clone(),
@@ -433,24 +433,28 @@ impl Replicator {
433433

434434
let db_name = db_name.clone();
435435
join_set.spawn(async move {
436-
let fpath = format!("{}/{}", bucket, req.path);
437-
let body = ByteStream::from_path(&fpath).await.unwrap();
438-
let start_time = Instant::now();
439-
let response = client
440-
.put_object()
441-
.bucket(bucket)
442-
.key(req.path)
443-
.body(body)
444-
.send()
445-
.await;
446-
Self::record_s3_write_time(&db_name, start_time.elapsed());
447-
if let Err(e) = response {
448-
tracing::error!("Failed to send {} to S3: {}", fpath, e);
449-
} else {
450-
tokio::fs::remove_file(&fpath).await.unwrap();
451-
let elapsed = Instant::now() - start;
452-
tracing::debug!("Uploaded to S3: {} in {:?}", fpath, elapsed);
436+
let fpath = format!("{}/{}", &bucket, &req.path);
437+
loop {
438+
let start_time = Instant::now();
439+
let body = ByteStream::from_path(&fpath).await.unwrap();
440+
let response = client
441+
.put_object()
442+
.bucket(&bucket)
443+
.key(&req.path)
444+
.body(body)
445+
.send()
446+
.await;
447+
Self::record_s3_write_time(&db_name, start_time.elapsed());
448+
if let Err(e) = response {
449+
tracing::error!("Failed to send {} to S3: {}, will retry after 1 second", fpath, e);
450+
tokio::time::sleep(Duration::from_millis(1000)).await;
451+
} else {
452+
break;
453+
}
453454
}
455+
tokio::fs::remove_file(&fpath).await.unwrap();
456+
let elapsed = Instant::now() - start;
457+
tracing::debug!("Uploaded to S3: {} in {:?}", fpath, elapsed);
454458
if let Some(frames) = req.frames {
455459
let mut up = upload_progress.lock().await;
456460
up.update(*frames.start(), *frames.end());

0 commit comments

Comments
 (0)