@@ -129,11 +129,12 @@ pub(crate) static BLOCKING_RT: Lazy<Runtime> = Lazy::new(|| {
129129type Result < T , E = Error > = std:: result:: Result < T , E > ;
130130type StatsSender = mpsc:: Sender < ( NamespaceName , MetaStoreHandle , Weak < Stats > ) > ;
131131type MakeReplicationSvc = Box <
132- dyn FnOnce (
132+ dyn Fn (
133133 NamespaceStore ,
134134 Option < Auth > ,
135135 Option < IdleShutdownKicker > ,
136136 bool ,
137+ bool ,
137138 ) -> BoxReplicationService
138139 + Send
139140 + ' static ,
@@ -620,17 +621,18 @@ where
620621
621622 let replication_service = make_replication_svc (
622623 namespace_store. clone ( ) ,
623- None ,
624+ Some ( user_auth_strategy . clone ( ) ) ,
624625 idle_shutdown_kicker. clone ( ) ,
625626 false ,
627+ true ,
626628 ) ;
627629
628630 task_manager. spawn_until_shutdown ( run_rpc_server (
629631 proxy_service,
630632 config. acceptor ,
631633 config. tls_config ,
632634 idle_shutdown_kicker. clone ( ) ,
633- replication_service,
635+ replication_service, // internal replicaton service
634636 ) ) ;
635637 }
636638
@@ -658,12 +660,12 @@ where
658660 . await ?;
659661 }
660662
661- let replication_svc = ReplicationLogService :: new (
663+ let replication_svc = make_replication_svc (
662664 namespace_store. clone ( ) ,
663- idle_shutdown_kicker. clone ( ) ,
664665 Some ( user_auth_strategy. clone ( ) ) ,
665- self . disable_namespaces ,
666+ idle_shutdown_kicker . clone ( ) ,
666667 true ,
668+ false , // external replication service
667669 ) ;
668670
669671 let proxy_svc = ProxyService :: new (
@@ -936,9 +938,9 @@ where
936938 let make_replication_svc = Box :: new ( {
937939 let registry = registry. clone ( ) ;
938940 let disable_namespaces = self . disable_namespaces ;
939- move |store, user_auth, _, _| -> BoxReplicationService {
941+ move |store, user_auth, _, _, _ | -> BoxReplicationService {
940942 Box :: new ( LibsqlReplicationService :: new (
941- registry,
943+ registry. clone ( ) ,
942944 store,
943945 user_auth,
944946 disable_namespaces,
@@ -1023,13 +1025,19 @@ where
10231025
10241026 let make_replication_svc = Box :: new ( {
10251027 let disable_namespaces = self . disable_namespaces ;
1026- move |store, client_auth, idle_shutdown, collect_stats| -> BoxReplicationService {
1028+ move |store,
1029+ client_auth,
1030+ idle_shutdown,
1031+ collect_stats,
1032+ is_internal|
1033+ -> BoxReplicationService {
10271034 Box :: new ( ReplicationLogService :: new (
10281035 store,
10291036 idle_shutdown,
10301037 client_auth,
10311038 disable_namespaces,
10321039 collect_stats,
1040+ is_internal,
10331041 ) )
10341042 }
10351043 } ) ;
@@ -1055,13 +1063,19 @@ where
10551063
10561064 let make_replication_svc = Box :: new ( {
10571065 let disable_namespaces = self . disable_namespaces ;
1058- move |store, client_auth, idle_shutdown, collect_stats| -> BoxReplicationService {
1066+ move |store,
1067+ client_auth,
1068+ idle_shutdown,
1069+ collect_stats,
1070+ is_internal|
1071+ -> BoxReplicationService {
10591072 Box :: new ( ReplicationLogService :: new (
10601073 store,
10611074 idle_shutdown,
10621075 client_auth,
10631076 disable_namespaces,
10641077 collect_stats,
1078+ is_internal,
10651079 ) )
10661080 }
10671081 } ) ;
0 commit comments