11use std:: sync:: Arc ;
2+ use std:: time:: Duration ;
23
34use futures_core:: future:: BoxFuture ;
45use futures_core:: Stream ;
@@ -11,7 +12,7 @@ use parking_lot::Mutex as PMutex;
1112use tokio:: sync:: { mpsc, watch, Mutex } ;
1213use tokio_stream:: StreamExt ;
1314use tonic:: transport:: Channel ;
14- use tonic:: { Request , Streaming } ;
15+ use tonic:: { Code , Request , Streaming } ;
1516
1617use crate :: connection:: program:: { DescribeCol , DescribeParam } ;
1718use crate :: error:: Error ;
@@ -249,19 +250,35 @@ impl RemoteConnection {
249250 ctx : RequestContext ,
250251 builder_config : QueryBuilderConfig ,
251252 ) -> crate :: Result < Self > {
252- let ( request_sender , receiver ) = mpsc :: channel ( 1 ) ;
253+ let mut retries = 0 ;
253254
254- let stream = tokio_stream:: wrappers:: ReceiverStream :: new ( receiver) ;
255- let mut req = Request :: new ( stream) ;
256- ctx. upgrade_grpc_request ( & mut req) ;
257- let response_stream = client. stream_exec ( req) . await ?. into_inner ( ) ;
255+ loop {
256+ let ( request_sender, receiver) = mpsc:: channel ( 1 ) ;
258257
259- Ok ( Self {
260- response_stream,
261- request_sender,
262- current_request_id : 0 ,
263- builder_config,
264- } )
258+ let stream = tokio_stream:: wrappers:: ReceiverStream :: new ( receiver) ;
259+ let mut req = Request :: new ( stream) ;
260+ ctx. upgrade_grpc_request ( & mut req) ;
261+ let response_stream = match client. stream_exec ( req) . await {
262+ Ok ( i) => i. into_inner ( ) ,
263+ Err ( e) => {
264+ if e. code ( ) == Code :: Unavailable {
265+ tracing:: error!( "retrying proxy connection: {}" , e) ;
266+ tokio:: time:: sleep ( Duration :: from_millis ( 500 ) * 2u32 . pow ( retries) ) . await ;
267+ retries += 1 ;
268+ continue ;
269+ } else {
270+ return Err ( e. into ( ) ) ;
271+ }
272+ }
273+ } ;
274+
275+ return Ok ( Self {
276+ response_stream,
277+ request_sender,
278+ current_request_id : 0 ,
279+ builder_config,
280+ } ) ;
281+ }
265282 }
266283}
267284
0 commit comments