11use std:: future:: Future ;
22use std:: path:: Path ;
33use std:: pin:: Pin ;
4+ use std:: sync:: Arc ;
5+ use std:: sync:: atomic:: AtomicU64 ;
46use std:: time:: { Duration , Instant } ;
57
68use bytes:: Bytes ;
@@ -22,6 +24,63 @@ async fn time<O>(fut: impl Future<Output = O>) -> (O, Duration) {
2224 ( out, before. elapsed ( ) )
2325}
2426
27+ pub ( crate ) struct SyncStats {
28+ pub prefetched_bytes : AtomicU64 ,
29+ pub prefetched_bytes_discarded_due_to_new_session : AtomicU64 ,
30+ pub prefetched_bytes_discarded_due_to_consecutive_handshake : AtomicU64 ,
31+ pub prefetched_bytes_discarded_due_to_invalid_frame_header : AtomicU64 ,
32+ pub synced_bytes_discarded_due_to_invalid_frame_header : AtomicU64 ,
33+ pub prefetched_bytes_used : AtomicU64 ,
34+ pub synced_bytes_used : AtomicU64 ,
35+ pub snapshot_bytes : AtomicU64 ,
36+ }
37+
38+ impl SyncStats {
39+ fn new ( ) -> Self {
40+ Self {
41+ prefetched_bytes : AtomicU64 :: new ( 0 ) ,
42+ prefetched_bytes_discarded_due_to_new_session : AtomicU64 :: new ( 0 ) ,
43+ prefetched_bytes_discarded_due_to_consecutive_handshake : AtomicU64 :: new ( 0 ) ,
44+ prefetched_bytes_discarded_due_to_invalid_frame_header : AtomicU64 :: new ( 0 ) ,
45+ synced_bytes_discarded_due_to_invalid_frame_header : AtomicU64 :: new ( 0 ) ,
46+ prefetched_bytes_used : AtomicU64 :: new ( 0 ) ,
47+ synced_bytes_used : AtomicU64 :: new ( 0 ) ,
48+ snapshot_bytes : AtomicU64 :: new ( 0 ) ,
49+ }
50+ }
51+
52+ fn add_prefetched_bytes ( & self , bytes : u64 ) {
53+ self . prefetched_bytes . fetch_add ( bytes, std:: sync:: atomic:: Ordering :: SeqCst ) ;
54+ }
55+
56+ fn add_prefetched_bytes_discarded_due_to_new_session ( & self , bytes : u64 ) {
57+ self . prefetched_bytes_discarded_due_to_new_session . fetch_add ( bytes, std:: sync:: atomic:: Ordering :: SeqCst ) ;
58+ }
59+
60+ fn add_prefetched_bytes_discarded_due_to_consecutive_handshake ( & self , bytes : u64 ) {
61+ self . prefetched_bytes_discarded_due_to_consecutive_handshake . fetch_add ( bytes, std:: sync:: atomic:: Ordering :: SeqCst ) ;
62+ }
63+
64+ fn add_prefetched_bytes_discarded_due_to_invalid_frame_header ( & self , bytes : u64 ) {
65+ self . prefetched_bytes_discarded_due_to_invalid_frame_header . fetch_add ( bytes, std:: sync:: atomic:: Ordering :: SeqCst ) ;
66+ }
67+
68+ fn add_synced_bytes_discarded_due_to_invalid_frame_headear ( & self , bytes : u64 ) {
69+ self . synced_bytes_discarded_due_to_invalid_frame_header . fetch_add ( bytes, std:: sync:: atomic:: Ordering :: SeqCst ) ;
70+ }
71+
72+ fn add_prefetched_bytes_used ( & self , bytes : u64 ) {
73+ self . prefetched_bytes_used . fetch_add ( bytes, std:: sync:: atomic:: Ordering :: SeqCst ) ;
74+ }
75+
76+ fn add_synced_bytes_used ( & self , bytes : u64 ) {
77+ self . synced_bytes_used . fetch_add ( bytes, std:: sync:: atomic:: Ordering :: SeqCst ) ;
78+ }
79+ fn add_snapshot_bytes ( & self , bytes : u64 ) {
80+ self . snapshot_bytes . fetch_add ( bytes, std:: sync:: atomic:: Ordering :: SeqCst ) ;
81+ }
82+ }
83+
2584/// A remote replicator client, that pulls frames over RPC
2685pub struct RemoteClient {
2786 remote : super :: client:: Client ,
@@ -38,6 +97,7 @@ pub struct RemoteClient {
3897 frames_latency_count : u128 ,
3998 snapshot_latency_sum : Duration ,
4099 snapshot_latency_count : u128 ,
100+ sync_stats : Arc < SyncStats > ,
41101}
42102
43103impl RemoteClient {
@@ -57,9 +117,14 @@ impl RemoteClient {
57117 frames_latency_count : 0 ,
58118 snapshot_latency_sum : Duration :: default ( ) ,
59119 snapshot_latency_count : 0 ,
120+ sync_stats : Arc :: new ( SyncStats :: new ( ) ) ,
60121 } )
61122 }
62123
124+ pub ( crate ) fn sync_stats ( & self ) -> Arc < SyncStats > {
125+ self . sync_stats . clone ( )
126+ }
127+
63128 fn next_offset ( & self ) -> FrameNo {
64129 match self . last_received {
65130 Some ( fno) => fno + 1 ,
@@ -89,7 +154,7 @@ impl RemoteClient {
89154 ) -> Result < bool , Error > {
90155 let hello = hello?. into_inner ( ) ;
91156 verify_session_token ( & hello. session_token ) . map_err ( Error :: Client ) ?;
92- let new_session = self . session_token != Some ( hello. session_token . clone ( ) ) ;
157+ let new_session = self . session_token . as_ref ( ) != Some ( & hello. session_token ) ;
93158 self . session_token = Some ( hello. session_token . clone ( ) ) ;
94159 let current_replication_index = hello. current_replication_index ;
95160 if let Err ( e) = self . meta . init_from_hello ( hello) {
@@ -107,8 +172,13 @@ impl RemoteClient {
107172 Ok ( new_session)
108173 }
109174
110- async fn do_handshake_with_prefetch ( & mut self ) -> ( Result < bool , Error > , Duration ) {
175+ async fn do_handshake_with_prefetch ( & mut self ) -> ( Result < ( ) , Error > , Duration ) {
111176 tracing:: info!( "Attempting to perform handshake with primary." ) ;
177+ if let Some ( ( Ok ( frames) , _) ) = & self . prefetched_batch_log_entries {
178+ // TODO: check if it's ok to just do 4096 * frames.len()
179+ let bytes = frames. get_ref ( ) . frames . iter ( ) . map ( |f| f. data . len ( ) as u64 ) . sum ( ) ;
180+ self . sync_stats . add_prefetched_bytes_discarded_due_to_consecutive_handshake ( bytes) ;
181+ }
112182 if self . dirty {
113183 self . prefetched_batch_log_entries = None ;
114184 self . meta . reset ( ) ;
@@ -135,30 +205,55 @@ impl RemoteClient {
135205 } else {
136206 ( hello_fut. await , None )
137207 } ;
208+ let mut prefetched_bytes = None ;
209+ if let Some ( ( Ok ( frames) , _) ) = & frames {
210+ let bytes = frames. get_ref ( ) . frames . iter ( ) . map ( |f| f. data . len ( ) as u64 ) . sum ( ) ;
211+ self . sync_stats . add_prefetched_bytes ( bytes) ;
212+ prefetched_bytes = Some ( bytes) ;
213+ }
138214 self . prefetched_batch_log_entries = if let Ok ( true ) = hello. 0 {
139215 tracing:: debug!(
140216 "Frames prefetching failed because of new session token returned by handshake"
141217 ) ;
218+ if let Some ( bytes) = prefetched_bytes {
219+ self . sync_stats . add_prefetched_bytes_discarded_due_to_new_session ( bytes) ;
220+ }
142221 None
143222 } else {
144223 frames
145224 } ;
146225
147- hello
226+ ( hello. 0 . map ( |_| ( ) ) , hello . 1 )
148227 }
149228
150229 async fn handle_next_frames_response (
151230 & mut self ,
152231 frames : Result < Response < Frames > , Status > ,
232+ prefetched : bool ,
153233 ) -> Result < <Self as ReplicatorClient >:: FrameStream , Error > {
154234 let frames = frames?. into_inner ( ) . frames ;
235+ let bytes = frames. iter ( ) . map ( |f| f. data . len ( ) as u64 ) . sum ( ) ;
155236
156237 if let Some ( f) = frames. last ( ) {
157- let header: FrameHeader = FrameHeader :: read_from_prefix ( & f. data )
238+ let header_result = FrameHeader :: read_from_prefix ( & f. data ) ;
239+ if header_result. is_none ( ) {
240+ if prefetched {
241+ self . sync_stats . add_prefetched_bytes_discarded_due_to_invalid_frame_header ( bytes) ;
242+ } else {
243+ self . sync_stats . add_synced_bytes_discarded_due_to_invalid_frame_headear ( bytes) ;
244+ }
245+ }
246+ let header: FrameHeader = header_result
158247 . ok_or_else ( || Error :: Internal ( "invalid frame header" . into ( ) ) ) ?;
159248 self . last_received = Some ( header. frame_no . get ( ) ) ;
160249 }
161250
251+ if prefetched {
252+ self . sync_stats . add_prefetched_bytes_used ( bytes) ;
253+ } else {
254+ self . sync_stats . add_synced_bytes_used ( bytes) ;
255+ }
256+
162257 let frames_iter = frames
163258 . into_iter ( )
164259 . map ( Ok ) ;
@@ -174,17 +269,18 @@ impl RemoteClient {
174269 Result < <Self as ReplicatorClient >:: FrameStream , Error > ,
175270 Duration ,
176271 ) {
177- let ( frames, time) = match self . prefetched_batch_log_entries . take ( ) {
178- Some ( ( result, time) ) => ( result, time) ,
272+ let ( ( frames, time) , prefetched ) = match self . prefetched_batch_log_entries . take ( ) {
273+ Some ( ( result, time) ) => ( ( result, time) , true ) ,
179274 None => {
180275 let req = self . make_request ( LogOffset {
181276 next_offset : self . next_offset ( ) ,
182277 wal_flavor : None ,
183278 } ) ;
184- time ( self . remote . replication . batch_log_entries ( req) ) . await
279+ let result = time ( self . remote . replication . batch_log_entries ( req) ) . await ;
280+ ( result, false )
185281 }
186282 } ;
187- let res = self . handle_next_frames_response ( frames) . await ;
283+ let res = self . handle_next_frames_response ( frames, prefetched ) . await ;
188284 ( res, time)
189285 }
190286
@@ -193,13 +289,18 @@ impl RemoteClient {
193289 next_offset : self . next_offset ( ) ,
194290 wal_flavor : None ,
195291 } ) ;
292+ let sync_stats = self . sync_stats . clone ( ) ;
196293 let mut frames = self
197294 . remote
198295 . replication
199296 . snapshot ( req)
200297 . await ?
201298 . into_inner ( )
202299 . map_err ( |e| e. into ( ) )
300+ . map_ok ( move |f| {
301+ sync_stats. add_snapshot_bytes ( f. data . len ( ) as u64 ) ;
302+ f
303+ } )
203304 . peekable ( ) ;
204305
205306 {
@@ -212,6 +313,7 @@ impl RemoteClient {
212313 }
213314 }
214315
316+
215317 Ok ( Box :: pin ( frames) )
216318 }
217319}
@@ -255,7 +357,7 @@ impl ReplicatorClient for RemoteClient {
255357 & result,
256358 "handshake" ,
257359 ) ;
258- result. map ( |_| ( ) )
360+ result
259361 }
260362
261363 /// Return a stream of frames to apply to the database
0 commit comments