@@ -8,7 +8,9 @@ use futures::StreamExt as _;
88use libsql_replication:: frame:: { Frame , FrameHeader , FrameNo } ;
99use libsql_replication:: meta:: WalIndexMeta ;
1010use libsql_replication:: replicator:: { map_frame_err, Error , ReplicatorClient } ;
11- use libsql_replication:: rpc:: replication:: { verify_session_token, Frames , HelloRequest , LogOffset , SESSION_TOKEN_KEY , HelloResponse } ;
11+ use libsql_replication:: rpc:: replication:: {
12+ verify_session_token, Frames , HelloRequest , HelloResponse , LogOffset , SESSION_TOKEN_KEY ,
13+ } ;
1214use tokio_stream:: Stream ;
1315use tonic:: metadata:: AsciiMetadataValue ;
1416use tonic:: { Response , Status } ;
@@ -81,7 +83,10 @@ impl RemoteClient {
8183 self . last_handshake_replication_index
8284 }
8385
84- async fn handle_handshake_response ( & mut self , hello : Result < Response < HelloResponse > , Status > ) -> Result < bool , Error > {
86+ async fn handle_handshake_response (
87+ & mut self ,
88+ hello : Result < Response < HelloResponse > , Status > ,
89+ ) -> Result < bool , Error > {
8590 let hello = hello?. into_inner ( ) ;
8691 verify_session_token ( & hello. session_token ) . map_err ( Error :: Client ) ?;
8792 let new_session = self . session_token != Some ( hello. session_token . clone ( ) ) ;
@@ -130,7 +135,9 @@ impl RemoteClient {
130135 ( hello_fut. await , None )
131136 } ;
132137 self . prefetched_batch_log_entries = if let Ok ( true ) = hello. 0 {
133- tracing:: warn!( "Frames prefetching failed because of new session token returned by handshake" ) ;
138+ tracing:: warn!(
139+ "Frames prefetching failed because of new session token returned by handshake"
140+ ) ;
134141 None
135142 } else {
136143 frames
@@ -139,7 +146,10 @@ impl RemoteClient {
139146 hello
140147 }
141148
142- async fn handle_next_frames_response ( & mut self , frames : Result < Response < Frames > , Status > ) -> Result < <Self as ReplicatorClient >:: FrameStream , Error > {
149+ async fn handle_next_frames_response (
150+ & mut self ,
151+ frames : Result < Response < Frames > , Status > ,
152+ ) -> Result < <Self as ReplicatorClient >:: FrameStream , Error > {
143153 let frames = frames?. into_inner ( ) . frames ;
144154
145155 if let Some ( f) = frames. last ( ) {
@@ -157,7 +167,12 @@ impl RemoteClient {
157167 Ok ( Box :: pin ( stream) )
158168 }
159169
160- async fn do_next_frames ( & mut self ) -> ( Result < <Self as ReplicatorClient >:: FrameStream , Error > , Duration ) {
170+ async fn do_next_frames (
171+ & mut self ,
172+ ) -> (
173+ Result < <Self as ReplicatorClient >:: FrameStream , Error > ,
174+ Duration ,
175+ ) {
161176 let ( frames, time) = match self . prefetched_batch_log_entries . take ( ) {
162177 Some ( ( result, time) ) => ( result, time) ,
163178 None => {
@@ -197,7 +212,13 @@ impl RemoteClient {
197212 }
198213}
199214
200- fn maybe_log < T > ( time : Duration , sum : & mut Duration , count : & mut u128 , result : & Result < T , Error > , op_name : & str ) {
215+ fn maybe_log < T > (
216+ time : Duration ,
217+ sum : & mut Duration ,
218+ count : & mut u128 ,
219+ result : & Result < T , Error > ,
220+ op_name : & str ,
221+ ) {
201222 if let Err ( e) = & result {
202223 tracing:: warn!( "Failed {} in {} ms: {:?}" , op_name, time. as_millis( ) , e) ;
203224 } else {
@@ -206,7 +227,12 @@ fn maybe_log<T>(time: Duration, sum: &mut Duration, count: &mut u128, result: &R
206227 let avg = ( * sum) . as_millis ( ) / * count;
207228 let time = time. as_millis ( ) ;
208229 if * count > 10 && time > 2 * avg {
209- tracing:: warn!( "Unusually long {}. Took {} ms, average {} ms" , op_name, time, avg) ;
230+ tracing:: warn!(
231+ "Unusually long {}. Took {} ms, average {} ms" ,
232+ op_name,
233+ time,
234+ avg
235+ ) ;
210236 }
211237 }
212238}
@@ -218,22 +244,40 @@ impl ReplicatorClient for RemoteClient {
218244 /// Perform handshake with remote
219245 async fn handshake ( & mut self ) -> Result < ( ) , Error > {
220246 let ( result, time) = self . do_handshake_with_prefetch ( ) . await ;
221- maybe_log ( time, & mut self . handshake_latency_sum , & mut self . handshake_latency_count , & result, "handshake" ) ;
247+ maybe_log (
248+ time,
249+ & mut self . handshake_latency_sum ,
250+ & mut self . handshake_latency_count ,
251+ & result,
252+ "handshake" ,
253+ ) ;
222254 result. map ( |_| ( ) )
223255 }
224256
225257 /// Return a stream of frames to apply to the database
226258 async fn next_frames ( & mut self ) -> Result < Self :: FrameStream , Error > {
227259 let ( result, time) = self . do_next_frames ( ) . await ;
228- maybe_log ( time, & mut self . frames_latency_sum , & mut self . frames_latency_count , & result, "frames fetch" ) ;
260+ maybe_log (
261+ time,
262+ & mut self . frames_latency_sum ,
263+ & mut self . frames_latency_count ,
264+ & result,
265+ "frames fetch" ,
266+ ) ;
229267 result
230268 }
231269
232270 /// Return a snapshot for the current replication index. Called after next_frame has returned a
233271 /// NeedSnapshot error
234272 async fn snapshot ( & mut self ) -> Result < Self :: FrameStream , Error > {
235273 let ( snapshot, time) = time ( self . do_snapshot ( ) ) . await ;
236- maybe_log ( time, & mut self . snapshot_latency_sum , & mut self . snapshot_latency_count , & snapshot, "snapshot fetch" ) ;
274+ maybe_log (
275+ time,
276+ & mut self . snapshot_latency_sum ,
277+ & mut self . snapshot_latency_count ,
278+ & snapshot,
279+ "snapshot fetch" ,
280+ ) ;
237281 snapshot
238282 }
239283
0 commit comments