1+ use std:: pin:: Pin ;
12use std:: sync:: Arc ;
23
4+ use futures:: Future ;
35use futures_core:: future:: BoxFuture ;
46use futures_core:: Stream ;
57use libsql_replication:: rpc:: proxy:: proxy_client:: ProxyClient ;
@@ -8,7 +10,7 @@ use libsql_replication::rpc::proxy::{
810} ;
911use libsql_sys:: EncryptionConfig ;
1012use parking_lot:: Mutex as PMutex ;
11- use tokio:: sync:: { mpsc, watch , Mutex } ;
13+ use tokio:: sync:: { mpsc, Mutex } ;
1214use tokio_stream:: StreamExt ;
1315use tonic:: transport:: Channel ;
1416use tonic:: { Request , Streaming } ;
@@ -22,16 +24,21 @@ use crate::replication::FrameNo;
2224use crate :: stats:: Stats ;
2325use crate :: { Result , DEFAULT_AUTO_CHECKPOINT } ;
2426
27+ use super :: connection_core:: GetCurrentFrameNo ;
2528use super :: program:: DescribeResponse ;
2629use super :: { Connection , RequestContext } ;
2730use super :: { MakeConnection , Program } ;
2831
2932pub type RpcStream = Streaming < ExecResp > ;
33+ pub type WaitForFrameNo = Arc <
34+ dyn Fn ( FrameNo ) -> Pin < Box < dyn Future < Output = ( ) > + Send + ' static > > + Send + ' static + Sync ,
35+ > ;
3036
3137pub struct MakeWriteProxyConn < M > {
3238 client : ProxyClient < Channel > ,
3339 stats : Arc < Stats > ,
34- applied_frame_no_receiver : watch:: Receiver < Option < FrameNo > > ,
40+ wait_for_frame_no : WaitForFrameNo ,
41+ get_current_frame_no : GetCurrentFrameNo ,
3542 max_response_size : u64 ,
3643 max_total_response_size : u64 ,
3744 primary_replication_index : Option < FrameNo > ,
@@ -46,23 +53,25 @@ impl<M> MakeWriteProxyConn<M> {
4653 channel : Channel ,
4754 uri : tonic:: transport:: Uri ,
4855 stats : Arc < Stats > ,
49- applied_frame_no_receiver : watch :: Receiver < Option < FrameNo > > ,
56+ wait_for_frame_no : WaitForFrameNo ,
5057 max_response_size : u64 ,
5158 max_total_response_size : u64 ,
5259 primary_replication_index : Option < FrameNo > ,
5360 encryption_config : Option < EncryptionConfig > ,
5461 make_read_only_conn : M ,
62+ get_current_frame_no : GetCurrentFrameNo ,
5563 ) -> Self {
5664 let client = ProxyClient :: with_origin ( channel, uri) ;
5765 Self {
5866 client,
5967 stats,
60- applied_frame_no_receiver ,
68+ wait_for_frame_no ,
6169 max_response_size,
6270 max_total_response_size,
6371 make_read_only_conn,
6472 primary_replication_index,
6573 encryption_config,
74+ get_current_frame_no,
6675 }
6776 }
6877}
@@ -77,14 +86,15 @@ where
7786 Ok ( WriteProxyConnection :: new (
7887 self . client . clone ( ) ,
7988 self . stats . clone ( ) ,
80- self . applied_frame_no_receiver . clone ( ) ,
89+ self . wait_for_frame_no . clone ( ) ,
8190 QueryBuilderConfig {
8291 max_size : Some ( self . max_response_size ) ,
8392 max_total_size : Some ( self . max_total_response_size ) ,
8493 auto_checkpoint : DEFAULT_AUTO_CHECKPOINT ,
8594 encryption_config : self . encryption_config . clone ( ) ,
8695 } ,
8796 self . primary_replication_index ,
97+ self . get_current_frame_no . clone ( ) ,
8898 self . make_read_only_conn . create ( ) . await ?,
8999 ) ?)
90100 }
@@ -99,8 +109,9 @@ pub struct WriteProxyConnection<R, C> {
99109 /// any subsequent read on this connection must wait for the replicator to catch up with this
100110 /// frame_no
101111 last_write_frame_no : PMutex < Option < FrameNo > > ,
102- /// Notifier from the repliator of the currently applied frameno
103- applied_frame_no_receiver : watch:: Receiver < Option < FrameNo > > ,
112+ /// Notifier from the replicator of the currently applied frame_no
113+ wait_for_frame_no : WaitForFrameNo ,
114+ get_current_frame_no : GetCurrentFrameNo ,
104115 builder_config : QueryBuilderConfig ,
105116 stats : Arc < Stats > ,
106117
@@ -114,21 +125,23 @@ impl<C: Connection> WriteProxyConnection<RpcStream, C> {
114125 fn new (
115126 write_proxy : ProxyClient < Channel > ,
116127 stats : Arc < Stats > ,
117- applied_frame_no_receiver : watch :: Receiver < Option < FrameNo > > ,
128+ wait_for_frame_no : WaitForFrameNo ,
118129 builder_config : QueryBuilderConfig ,
119130 primary_replication_index : Option < u64 > ,
131+ get_current_frame_no : GetCurrentFrameNo ,
120132 read_conn : C ,
121133 ) -> Result < Self > {
122134 Ok ( Self {
123135 read_conn,
124136 write_proxy,
125137 state : Mutex :: new ( TxnStatus :: Init ) ,
126138 last_write_frame_no : Default :: default ( ) ,
127- applied_frame_no_receiver ,
139+ wait_for_frame_no ,
128140 builder_config,
129141 stats,
130142 remote_conn : Default :: default ( ) ,
131143 primary_replication_index,
144+ get_current_frame_no,
132145 } )
133146 }
134147
@@ -199,15 +212,7 @@ impl<C: Connection> WriteProxyConnection<RpcStream, C> {
199212 let current_fno = replication_index. or_else ( || * self . last_write_frame_no . lock ( ) ) ;
200213 match current_fno {
201214 Some ( current_frame_no) => {
202- let mut receiver = self . applied_frame_no_receiver . clone ( ) ;
203- receiver
204- . wait_for ( |last_applied| match last_applied {
205- Some ( x) => * x >= current_frame_no,
206- None => true ,
207- } )
208- . await
209- . map_err ( |_| Error :: ReplicatorExited ) ?;
210-
215+ ( self . wait_for_frame_no ) ( current_frame_no) . await ;
211216 Ok ( ( ) )
212217 }
213218 None => Ok ( ( ) ) ,
@@ -219,7 +224,7 @@ impl<C: Connection> WriteProxyConnection<RpcStream, C> {
219224 fn should_proxy ( & self ) -> bool {
220225 // There primary has data
221226 if let Some ( primary_index) = self . primary_replication_index {
222- let last_applied = * self . applied_frame_no_receiver . borrow ( ) ;
227+ let last_applied = ( self . get_current_frame_no ) ( ) ;
223228 // if we either don't have data while the primary has, or the data we have is
224229 // anterior to that of the primary when we loaded the namespace, then proxy the
225230 // request to the primary
0 commit comments