11use std:: future:: Future ;
22use std:: path:: Path ;
33use std:: pin:: Pin ;
4+ use std:: sync:: Arc ;
5+ use std:: sync:: atomic:: AtomicU64 ;
46use std:: time:: { Duration , Instant } ;
57
68use bytes:: Bytes ;
@@ -22,6 +24,63 @@ async fn time<O>(fut: impl Future<Output = O>) -> (O, Duration) {
2224 ( out, before. elapsed ( ) )
2325}
2426
27+ struct SyncStats {
28+ pub prefetched_bytes : AtomicU64 ,
29+ pub prefetched_bytes_discarded_due_to_new_session : AtomicU64 ,
30+ pub prefetched_bytes_discarded_due_to_consecutive_handshake : AtomicU64 ,
31+ pub prefetched_bytes_discarded_due_to_invalid_frame_header : AtomicU64 ,
32+ pub synced_bytes_discarded_due_to_invalid_frame_header : AtomicU64 ,
33+ pub prefetched_bytes_used : AtomicU64 ,
34+ pub synced_bytes_used : AtomicU64 ,
35+ pub snapshot_bytes : AtomicU64 ,
36+ }
37+
38+ impl SyncStats {
39+ fn new ( ) -> Self {
40+ Self {
41+ prefetched_bytes : AtomicU64 :: new ( 0 ) ,
42+ prefetched_bytes_discarded_due_to_new_session : AtomicU64 :: new ( 0 ) ,
43+ prefetched_bytes_discarded_due_to_consecutive_handshake : AtomicU64 :: new ( 0 ) ,
44+ prefetched_bytes_discarded_due_to_invalid_frame_header : AtomicU64 :: new ( 0 ) ,
45+ synced_bytes_discarded_due_to_invalid_frame_header : AtomicU64 :: new ( 0 ) ,
46+ prefetched_bytes_used : AtomicU64 :: new ( 0 ) ,
47+ synced_bytes_used : AtomicU64 :: new ( 0 ) ,
48+ snapshot_bytes : AtomicU64 :: new ( 0 ) ,
49+ }
50+ }
51+
52+ fn add_prefetched_bytes ( & self , bytes : u64 ) {
53+ self . prefetched_bytes . fetch_add ( bytes, std:: sync:: atomic:: Ordering :: SeqCst ) ;
54+ }
55+
56+ fn add_prefetched_bytes_discarded_due_to_new_session ( & self , bytes : u64 ) {
57+ self . prefetched_bytes_discarded_due_to_new_session . fetch_add ( bytes, std:: sync:: atomic:: Ordering :: SeqCst ) ;
58+ }
59+
60+ fn add_prefetched_bytes_discarded_due_to_consecutive_handshake ( & self , bytes : u64 ) {
61+ self . prefetched_bytes_discarded_due_to_consecutive_handshake . fetch_add ( bytes, std:: sync:: atomic:: Ordering :: SeqCst ) ;
62+ }
63+
64+ fn add_prefetched_bytes_discarded_due_to_invalid_frame_header ( & self , bytes : u64 ) {
65+ self . prefetched_bytes_discarded_due_to_invalid_frame_header . fetch_add ( bytes, std:: sync:: atomic:: Ordering :: SeqCst ) ;
66+ }
67+
68+ fn add_synced_bytes_discarded_due_to_invalid_frame_headear ( & self , bytes : u64 ) {
69+ self . synced_bytes_discarded_due_to_invalid_frame_header . fetch_add ( bytes, std:: sync:: atomic:: Ordering :: SeqCst ) ;
70+ }
71+
72+ fn add_prefetched_bytes_used ( & self , bytes : u64 ) {
73+ self . prefetched_bytes_used . fetch_add ( bytes, std:: sync:: atomic:: Ordering :: SeqCst ) ;
74+ }
75+
76+ fn add_synced_bytes_used ( & self , bytes : u64 ) {
77+ self . synced_bytes_used . fetch_add ( bytes, std:: sync:: atomic:: Ordering :: SeqCst ) ;
78+ }
79+ fn add_snapshot_bytes ( & self , bytes : u64 ) {
80+ self . snapshot_bytes . fetch_add ( bytes, std:: sync:: atomic:: Ordering :: SeqCst ) ;
81+ }
82+ }
83+
2584/// A remote replicator client, that pulls frames over RPC
2685pub struct RemoteClient {
2786 remote : super :: client:: Client ,
@@ -38,6 +97,7 @@ pub struct RemoteClient {
3897 frames_latency_count : u128 ,
3998 snapshot_latency_sum : Duration ,
4099 snapshot_latency_count : u128 ,
100+ sync_stats : Arc < SyncStats > ,
41101}
42102
43103impl RemoteClient {
@@ -57,6 +117,7 @@ impl RemoteClient {
57117 frames_latency_count : 0 ,
58118 snapshot_latency_sum : Duration :: default ( ) ,
59119 snapshot_latency_count : 0 ,
120+ sync_stats : Arc :: new ( SyncStats :: new ( ) ) ,
60121 } )
61122 }
62123
@@ -109,6 +170,11 @@ impl RemoteClient {
109170
110171 async fn do_handshake_with_prefetch ( & mut self ) -> ( Result < ( ) , Error > , Duration ) {
111172 tracing:: info!( "Attempting to perform handshake with primary." ) ;
173+ if let Some ( ( Ok ( frames) , _) ) = & self . prefetched_batch_log_entries {
174+ // TODO: check if it's ok to just do 4096 * frames.len()
175+ let bytes = frames. get_ref ( ) . frames . iter ( ) . map ( |f| f. data . len ( ) as u64 ) . sum ( ) ;
176+ self . sync_stats . add_prefetched_bytes_discarded_due_to_consecutive_handshake ( bytes) ;
177+ }
112178 if self . dirty {
113179 self . prefetched_batch_log_entries = None ;
114180 self . meta . reset ( ) ;
@@ -135,10 +201,19 @@ impl RemoteClient {
135201 } else {
136202 ( hello_fut. await , None )
137203 } ;
204+ let mut prefetched_bytes = None ;
205+ if let Some ( ( Ok ( frames) , _) ) = & frames {
206+ let bytes = frames. get_ref ( ) . frames . iter ( ) . map ( |f| f. data . len ( ) as u64 ) . sum ( ) ;
207+ self . sync_stats . add_prefetched_bytes ( bytes) ;
208+ prefetched_bytes = Some ( bytes) ;
209+ }
138210 self . prefetched_batch_log_entries = if let Ok ( true ) = hello. 0 {
139211 tracing:: debug!(
140212 "Frames prefetching failed because of new session token returned by handshake"
141213 ) ;
214+ if let Some ( bytes) = prefetched_bytes {
215+ self . sync_stats . add_prefetched_bytes_discarded_due_to_new_session ( bytes) ;
216+ }
142217 None
143218 } else {
144219 frames
@@ -150,15 +225,31 @@ impl RemoteClient {
150225 async fn handle_next_frames_response (
151226 & mut self ,
152227 frames : Result < Response < Frames > , Status > ,
228+ prefetched : bool ,
153229 ) -> Result < <Self as ReplicatorClient >:: FrameStream , Error > {
154230 let frames = frames?. into_inner ( ) . frames ;
231+ let bytes = frames. iter ( ) . map ( |f| f. data . len ( ) as u64 ) . sum ( ) ;
155232
156233 if let Some ( f) = frames. last ( ) {
157- let header: FrameHeader = FrameHeader :: read_from_prefix ( & f. data )
234+ let header_result = FrameHeader :: read_from_prefix ( & f. data ) ;
235+ if header_result. is_none ( ) {
236+ if prefetched {
237+ self . sync_stats . add_prefetched_bytes_discarded_due_to_invalid_frame_header ( bytes) ;
238+ } else {
239+ self . sync_stats . add_synced_bytes_discarded_due_to_invalid_frame_headear ( bytes) ;
240+ }
241+ }
242+ let header: FrameHeader = header_result
158243 . ok_or_else ( || Error :: Internal ( "invalid frame header" . into ( ) ) ) ?;
159244 self . last_received = Some ( header. frame_no . get ( ) ) ;
160245 }
161246
247+ if prefetched {
248+ self . sync_stats . add_prefetched_bytes_used ( bytes) ;
249+ } else {
250+ self . sync_stats . add_synced_bytes_used ( bytes) ;
251+ }
252+
162253 let frames_iter = frames
163254 . into_iter ( )
164255 . map ( Ok ) ;
@@ -174,17 +265,18 @@ impl RemoteClient {
174265 Result < <Self as ReplicatorClient >:: FrameStream , Error > ,
175266 Duration ,
176267 ) {
177- let ( frames, time) = match self . prefetched_batch_log_entries . take ( ) {
178- Some ( ( result, time) ) => ( result, time) ,
268+ let ( ( frames, time) , prefetched ) = match self . prefetched_batch_log_entries . take ( ) {
269+ Some ( ( result, time) ) => ( ( result, time) , true ) ,
179270 None => {
180271 let req = self . make_request ( LogOffset {
181272 next_offset : self . next_offset ( ) ,
182273 wal_flavor : None ,
183274 } ) ;
184- time ( self . remote . replication . batch_log_entries ( req) ) . await
275+ let result = time ( self . remote . replication . batch_log_entries ( req) ) . await ;
276+ ( result, false )
185277 }
186278 } ;
187- let res = self . handle_next_frames_response ( frames) . await ;
279+ let res = self . handle_next_frames_response ( frames, prefetched ) . await ;
188280 ( res, time)
189281 }
190282
@@ -193,13 +285,18 @@ impl RemoteClient {
193285 next_offset : self . next_offset ( ) ,
194286 wal_flavor : None ,
195287 } ) ;
288+ let sync_stats = self . sync_stats . clone ( ) ;
196289 let mut frames = self
197290 . remote
198291 . replication
199292 . snapshot ( req)
200293 . await ?
201294 . into_inner ( )
202295 . map_err ( |e| e. into ( ) )
296+ . map_ok ( move |f| {
297+ sync_stats. add_snapshot_bytes ( f. data . len ( ) as u64 ) ;
298+ f
299+ } )
203300 . peekable ( ) ;
204301
205302 {
@@ -212,6 +309,7 @@ impl RemoteClient {
212309 }
213310 }
214311
312+
215313 Ok ( Box :: pin ( frames) )
216314 }
217315}
0 commit comments