@@ -33,7 +33,7 @@ pub struct Database {
3333 #[ cfg( feature = "replication" ) ]
3434 pub replication_ctx : Option < ReplicationContext > ,
3535 #[ cfg( feature = "sync" ) ]
36- pub sync_ctx : Option < SyncContext > ,
36+ pub sync_ctx : Option < tokio :: sync :: Mutex < SyncContext > > ,
3737}
3838
3939impl Database {
@@ -143,7 +143,10 @@ impl Database {
143143 endpoint
144144 } ;
145145 let mut db = Database :: open ( & db_path, flags) ?;
146- db. sync_ctx = Some ( SyncContext :: new ( endpoint, Some ( auth_token) ) ) ;
146+ db. sync_ctx = Some ( tokio:: sync:: Mutex :: new ( SyncContext :: new (
147+ endpoint,
148+ Some ( auth_token) ,
149+ ) ) ) ;
147150 Ok ( db)
148151 }
149152
@@ -383,7 +386,7 @@ impl Database {
383386 #[ cfg( feature = "sync" ) ]
384387 /// Push WAL frames to remote.
385388 pub async fn push ( & self ) -> Result < crate :: database:: Replicated > {
386- let sync_ctx = self . sync_ctx . as_ref ( ) . unwrap ( ) ;
389+ let sync_ctx = self . sync_ctx . as_ref ( ) . unwrap ( ) . lock ( ) . await ;
387390 let conn = self . connect ( ) ?;
388391
389392 let page_size = {
@@ -398,17 +401,20 @@ impl Database {
398401 let max_frame_no = conn. wal_frame_count ( ) ;
399402
400403 let generation = 1 ; // TODO: Probe from WAL.
401- let start_frame_no = sync_ctx. durable_frame_num + 1 ;
404+ let start_frame_no = sync_ctx. durable_frame_num ( ) + 1 ;
402405 let end_frame_no = max_frame_no;
403406
404407 let mut frame_no = start_frame_no;
405408 while frame_no <= end_frame_no {
409+ let frame = conn. wal_get_frame ( frame_no, page_size) ?;
410+
406411 // The server returns its maximum frame number. To avoid resending
407412 // frames the server already knows about, we need to update the
408413 // frame number to the one returned by the server.
409- let max_frame_no = self
410- . push_one_frame ( & conn , & sync_ctx , generation, frame_no, page_size )
414+ let max_frame_no = sync_ctx
415+ . push_one_frame ( frame . to_vec ( ) , generation, frame_no)
411416 . await ?;
417+
412418 if max_frame_no > frame_no {
413419 frame_no = max_frame_no;
414420 }
@@ -422,72 +428,6 @@ impl Database {
422428 } )
423429 }
424430
425- #[ cfg( feature = "sync" ) ]
426- async fn push_one_frame (
427- & self ,
428- conn : & Connection ,
429- sync_ctx : & SyncContext ,
430- generation : u32 ,
431- frame_no : u32 ,
432- page_size : u32 ,
433- ) -> Result < u32 > {
434- let frame = conn. wal_get_frame ( frame_no, page_size) ?;
435-
436- let uri = format ! (
437- "{}/sync/{}/{}/{}" ,
438- sync_ctx. sync_url,
439- generation,
440- frame_no,
441- frame_no + 1
442- ) ;
443- let max_frame_no = self
444- . push_with_retry (
445- uri,
446- & sync_ctx. auth_token ,
447- frame. to_vec ( ) ,
448- sync_ctx. max_retries ,
449- )
450- . await ?;
451- Ok ( max_frame_no)
452- }
453-
454- #[ cfg( feature = "sync" ) ]
455- async fn push_with_retry (
456- & self ,
457- uri : String ,
458- auth_token : & Option < String > ,
459- frame : Vec < u8 > ,
460- max_retries : usize ,
461- ) -> Result < u32 > {
462- let mut nr_retries = 0 ;
463- loop {
464- let client = reqwest:: Client :: new ( ) ;
465- let mut builder = client. post ( uri. to_owned ( ) ) ;
466- match auth_token {
467- Some ( ref auth_token) => {
468- builder = builder
469- . header ( "Authorization" , format ! ( "Bearer {}" , auth_token. to_owned( ) ) ) ;
470- }
471- None => { }
472- }
473- let res = builder. body ( frame. to_vec ( ) ) . send ( ) . await . unwrap ( ) ;
474- if res. status ( ) . is_success ( ) {
475- let resp = res. json :: < serde_json:: Value > ( ) . await . unwrap ( ) ;
476- let max_frame_no = resp. get ( "max_frame_no" ) . unwrap ( ) . as_u64 ( ) . unwrap ( ) ;
477- return Ok ( max_frame_no as u32 ) ;
478- }
479- if nr_retries > max_retries {
480- return Err ( crate :: errors:: Error :: ConnectionFailed ( format ! (
481- "Failed to push frame: {}" ,
482- res. status( )
483- ) ) ) ;
484- }
485- let delay = std:: time:: Duration :: from_millis ( 100 * ( 1 << nr_retries) ) ;
486- tokio:: time:: sleep ( delay) . await ;
487- nr_retries += 1 ;
488- }
489- }
490-
491431 pub ( crate ) fn path ( & self ) -> & str {
492432 & self . db_path
493433 }
0 commit comments