1+ use crate :: Result ;
2+
13const DEFAULT_MAX_RETRIES : usize = 5 ;
24pub struct SyncContext {
35 pub sync_url : String ,
@@ -15,4 +17,59 @@ impl SyncContext {
1517 max_retries : DEFAULT_MAX_RETRIES ,
1618 }
1719 }
20+
21+ pub ( crate ) async fn push_with_retry (
22+ & self ,
23+ uri : String ,
24+ auth_token : & Option < String > ,
25+ frame : Vec < u8 > ,
26+ max_retries : usize ,
27+ ) -> Result < u32 > {
28+ let mut nr_retries = 0 ;
29+ loop {
30+ // TODO(lucio): add custom connector + tls support here
31+ let client = hyper:: client:: Client :: builder ( ) . build_http :: < hyper:: Body > ( ) ;
32+
33+ let mut req = http:: Request :: post ( uri. clone ( ) ) ;
34+
35+ match auth_token {
36+ Some ( ref auth_token) => {
37+ let auth_header =
38+ http:: HeaderValue :: try_from ( format ! ( "Bearer {}" , auth_token. to_owned( ) ) )
39+ . unwrap ( ) ;
40+
41+ req. headers_mut ( )
42+ . expect ( "valid http request" )
43+ . insert ( "Authorization" , auth_header) ;
44+ }
45+ None => { }
46+ }
47+
48+ // TODO(lucio): convert this to use bytes to make this clone cheap, it should be
49+ // to possible use BytesMut when reading frames from the WAL and efficiently use Bytes
50+ // from that.
51+ let req = req. body ( frame. clone ( ) . into ( ) ) . expect ( "valid body" ) ;
52+
53+ let res = client. request ( req) . await . unwrap ( ) ;
54+
55+ // TODO(lucio): only retry on server side errors
56+ if res. status ( ) . is_success ( ) {
57+ let res_body = hyper:: body:: to_bytes ( res. into_body ( ) ) . await . unwrap ( ) ;
58+ let resp = serde_json:: from_slice :: < serde_json:: Value > ( & res_body[ ..] ) . unwrap ( ) ;
59+
60+ let max_frame_no = resp. get ( "max_frame_no" ) . unwrap ( ) . as_u64 ( ) . unwrap ( ) ;
61+ return Ok ( max_frame_no as u32 ) ;
62+ }
63+
64+ if nr_retries > max_retries {
65+ return Err ( crate :: errors:: Error :: ConnectionFailed ( format ! (
66+ "Failed to push frame: {}" ,
67+ res. status( )
68+ ) ) ) ;
69+ }
70+ let delay = std:: time:: Duration :: from_millis ( 100 * ( 1 << nr_retries) ) ;
71+ tokio:: time:: sleep ( delay) . await ;
72+ nr_retries += 1 ;
73+ }
74+ }
1875}
0 commit comments