@@ -28,6 +28,7 @@ use auth::Auth;
2828use config:: {
2929 AdminApiConfig , DbConfig , HeartbeatConfig , RpcClientConfig , RpcServerConfig , UserApiConfig ,
3030} ;
31+ use futures:: future:: { pending, ready} ;
3132use futures:: Future ;
3233use http:: user:: UserApi ;
3334use hyper:: client:: HttpConnector ;
@@ -40,6 +41,10 @@ use libsql_sys::wal::either::Either as EitherWAL;
4041#[ cfg( feature = "durable-wal" ) ]
4142use libsql_sys:: wal:: either:: Either3 as EitherWAL ;
4243use libsql_sys:: wal:: Sqlite3WalManager ;
44+ use libsql_wal:: checkpointer:: LibsqlCheckpointer ;
45+ use libsql_wal:: registry:: WalRegistry ;
46+ use libsql_wal:: storage:: NoStorage ;
47+ use libsql_wal:: wal:: LibsqlWalManager ;
4348use namespace:: meta_store:: MetaStoreHandle ;
4449use namespace:: NamespaceName ;
4550use net:: Connector ;
@@ -458,9 +463,10 @@ where
458463 let configurators = self
459464 . make_configurators (
460465 base_config,
461- scripted_backup,
462- scheduler_sender. into ( ) ,
463466 client_config. clone ( ) ,
467+ & mut join_set,
468+ scheduler_sender. into ( ) ,
469+ scripted_backup,
464470 )
465471 . await ?;
466472
@@ -596,7 +602,6 @@ where
596602 self . disable_namespaces ,
597603 ) ;
598604
599- dbg ! ( ) ;
600605 self . make_services (
601606 namespace_store. clone ( ) ,
602607 idle_shutdown_kicker,
@@ -647,42 +652,125 @@ where
647652 async fn make_configurators (
648653 & self ,
649654 base_config : BaseNamespaceConfig ,
650- scripted_backup : Option < ScriptBackupManager > ,
651- migration_scheduler_handle : SchedulerHandle ,
652655 client_config : Option < ( Channel , Uri ) > ,
656+ join_set : & mut JoinSet < anyhow:: Result < ( ) > > ,
657+ migration_scheduler_handle : SchedulerHandle ,
658+ scripted_backup : Option < ScriptBackupManager > ,
653659 ) -> anyhow:: Result < NamespaceConfigurators > {
660+ let wal_path = base_config. base_path . join ( "wals" ) ;
661+ let enable_libsql_wal_test = {
662+ let is_primary = self . rpc_server_config . is_some ( ) ;
663+ let is_libsql_wal_test = std:: env:: var ( "LIBSQL_WAL_TEST" ) . is_ok ( ) ;
664+ is_primary && is_libsql_wal_test
665+ } ;
666+ let use_libsql_wal =
667+ self . use_custom_wal == Some ( CustomWAL :: LibsqlWal ) || enable_libsql_wal_test;
668+ if !use_libsql_wal {
669+ if wal_path. try_exists ( ) ? {
670+ anyhow:: bail!( "database was previously setup to use libsql-wal" ) ;
671+ }
672+ }
673+
674+ if self . use_custom_wal . is_some ( ) {
675+ if self . db_config . bottomless_replication . is_some ( ) {
676+ anyhow:: bail!( "bottomless not supported with custom WAL" ) ;
677+ }
678+ if self . rpc_client_config . is_some ( ) {
679+ anyhow:: bail!( "custom WAL not supported in replica mode" ) ;
680+ }
681+ }
682+
654683 match self . use_custom_wal {
655- Some ( CustomWAL :: LibsqlWal ) => self . libsql_wal_configurators ( ) ,
684+ Some ( CustomWAL :: LibsqlWal ) => self . libsql_wal_configurators (
685+ base_config,
686+ client_config,
687+ join_set,
688+ migration_scheduler_handle,
689+ scripted_backup,
690+ wal_path,
691+ ) ,
656692 #[ cfg( feature = "durable-wal" ) ]
657693 Some ( CustomWAL :: DurableWal ) => self . durable_wal_configurators (
658694 base_config,
659- scripted_backup,
660- migration_scheduler_handle,
661695 client_config,
696+ migration_scheduler_handle,
697+ scripted_backup,
662698 ) ,
663699 None => {
664700 self . legacy_configurators (
665701 base_config,
666- scripted_backup,
667- migration_scheduler_handle,
668702 client_config,
703+ migration_scheduler_handle,
704+ scripted_backup,
669705 )
670706 . await
671707 }
672708 }
673709 }
674710
675- fn libsql_wal_configurators ( & self ) -> anyhow:: Result < NamespaceConfigurators > {
676- todo ! ( )
711+ fn libsql_wal_configurators (
712+ & self ,
713+ base_config : BaseNamespaceConfig ,
714+ client_config : Option < ( Channel , Uri ) > ,
715+ join_set : & mut JoinSet < anyhow:: Result < ( ) > > ,
716+ migration_scheduler_handle : SchedulerHandle ,
717+ scripted_backup : Option < ScriptBackupManager > ,
718+ wal_path : PathBuf ,
719+ ) -> anyhow:: Result < NamespaceConfigurators > {
720+ tracing:: info!( "using libsql wal" ) ;
721+ let ( sender, receiver) = tokio:: sync:: mpsc:: channel ( 64 ) ;
722+ let registry = Arc :: new ( WalRegistry :: new ( wal_path, NoStorage , sender) ?) ;
723+ let checkpointer = LibsqlCheckpointer :: new ( registry. clone ( ) , receiver, 8 ) ;
724+ self . spawn_until_shutdown_on ( join_set, async move {
725+ checkpointer. run ( ) . await ;
726+ Ok ( ( ) )
727+ } ) ;
728+
729+ let namespace_resolver = |path : & Path | {
730+ NamespaceName :: from_string (
731+ path. parent ( )
732+ . unwrap ( )
733+ . file_name ( )
734+ . unwrap ( )
735+ . to_str ( )
736+ . unwrap ( )
737+ . to_string ( ) ,
738+ )
739+ . unwrap ( )
740+ . into ( )
741+ } ;
742+ let wal = LibsqlWalManager :: new ( registry. clone ( ) , Arc :: new ( namespace_resolver) ) ;
743+
744+ self . spawn_until_shutdown_with_teardown ( join_set, pending ( ) , async move {
745+ registry. shutdown ( ) . await ?;
746+ Ok ( ( ) )
747+ } ) ;
748+
749+ let make_wal_manager = Arc :: new ( move || EitherWAL :: B ( wal. clone ( ) ) ) ;
750+ let mut configurators = NamespaceConfigurators :: empty ( ) ;
751+
752+ match client_config {
753+ Some ( _) => todo ! ( "configure replica" ) ,
754+ // configure primary
755+ None => self . configure_primary_common (
756+ base_config,
757+ & mut configurators,
758+ make_wal_manager,
759+ migration_scheduler_handle,
760+ scripted_backup,
761+ ) ,
762+ }
763+
764+ Ok ( configurators)
677765 }
678766
679767 #[ cfg( feature = "durable-wal" ) ]
680768 fn durable_wal_configurators (
681769 & self ,
682770 base_config : BaseNamespaceConfig ,
683- scripted_backup : Option < ScriptBackupManager > ,
684- migration_scheduler_handle : SchedulerHandle ,
685771 client_config : Option < ( Channel , Uri ) > ,
772+ migration_scheduler_handle : SchedulerHandle ,
773+ scripted_backup : Option < ScriptBackupManager > ,
686774 ) -> anyhow:: Result < NamespaceConfigurators > {
687775 tracing:: info!( "using durable wal" ) ;
688776 let lock_manager = Arc :: new ( std:: sync:: Mutex :: new ( LockManager :: new ( ) ) ) ;
@@ -706,22 +794,37 @@ where
706794 ) ;
707795 let make_wal_manager = Arc :: new ( move || EitherWAL :: C ( wal. clone ( ) ) ) ;
708796 self . configurators_common (
709- client_config,
710797 base_config,
798+ client_config,
711799 make_wal_manager,
712- scripted_backup,
713800 migration_scheduler_handle,
801+ scripted_backup,
714802 )
715803 }
716804
717805 fn spawn_until_shutdown_on < F > ( & self , join_set : & mut JoinSet < anyhow:: Result < ( ) > > , fut : F )
718806 where
719807 F : Future < Output = anyhow:: Result < ( ) > > + Send + ' static ,
808+ {
809+ self . spawn_until_shutdown_with_teardown ( join_set, fut, ready ( Ok ( ( ) ) ) )
810+ }
811+
812+ /// run the passed future until shutdown is called, then call the passed teardown future
813+ fn spawn_until_shutdown_with_teardown < F , T > (
814+ & self ,
815+ join_set : & mut JoinSet < anyhow:: Result < ( ) > > ,
816+ fut : F ,
817+ teardown : T ,
818+ ) where
819+ F : Future < Output = anyhow:: Result < ( ) > > + Send + ' static ,
820+ T : Future < Output = anyhow:: Result < ( ) > > + Send + ' static ,
720821 {
721822 let shutdown = self . shutdown . clone ( ) ;
722823 join_set. spawn ( async move {
723824 tokio:: select! {
724- _ = shutdown. notified( ) => Ok ( ( ) ) ,
825+ _ = shutdown. notified( ) => {
826+ teardown. await
827+ } ,
725828 ret = fut => ret
726829 }
727830 } ) ;
@@ -730,30 +833,29 @@ where
730833 async fn legacy_configurators (
731834 & self ,
732835 base_config : BaseNamespaceConfig ,
733- scripted_backup : Option < ScriptBackupManager > ,
734- migration_scheduler_handle : SchedulerHandle ,
735836 client_config : Option < ( Channel , Uri ) > ,
837+ migration_scheduler_handle : SchedulerHandle ,
838+ scripted_backup : Option < ScriptBackupManager > ,
736839 ) -> anyhow:: Result < NamespaceConfigurators > {
737840 let make_wal_manager = Arc :: new ( || EitherWAL :: A ( Sqlite3WalManager :: default ( ) ) ) ;
738841 self . configurators_common (
739- client_config,
740842 base_config,
843+ client_config,
741844 make_wal_manager,
742- scripted_backup,
743845 migration_scheduler_handle,
846+ scripted_backup,
744847 )
745848 }
746849
747850 fn configurators_common (
748851 & self ,
749- client_config : Option < ( Channel , Uri ) > ,
750852 base_config : BaseNamespaceConfig ,
853+ client_config : Option < ( Channel , Uri ) > ,
751854 make_wal_manager : Arc < dyn Fn ( ) -> InnerWalManager + Sync + Send + ' static > ,
752- scripted_backup : Option < ScriptBackupManager > ,
753855 migration_scheduler_handle : SchedulerHandle ,
856+ scripted_backup : Option < ScriptBackupManager > ,
754857 ) -> anyhow:: Result < NamespaceConfigurators > {
755858 let mut configurators = NamespaceConfigurators :: empty ( ) ;
756-
757859 match client_config {
758860 // replica mode
759861 Some ( ( channel, uri) ) => {
@@ -762,34 +864,49 @@ where
762864 configurators. with_replica ( replica_configurator) ;
763865 }
764866 // primary mode
765- None => {
766- let primary_config = PrimaryExtraConfig {
767- max_log_size : self . db_config . max_log_size ,
768- max_log_duration : self . db_config . max_log_duration . map ( Duration :: from_secs_f32 ) ,
769- bottomless_replication : self . db_config . bottomless_replication . clone ( ) ,
770- scripted_backup,
771- checkpoint_interval : self . db_config . checkpoint_interval ,
772- } ;
867+ None => self . configure_primary_common (
868+ base_config ,
869+ & mut configurators ,
870+ make_wal_manager ,
871+ migration_scheduler_handle ,
872+ scripted_backup,
873+ ) ,
874+ }
773875
774- let primary_configurator = PrimaryConfigurator :: new (
775- base_config. clone ( ) ,
776- primary_config. clone ( ) ,
777- make_wal_manager. clone ( ) ,
778- ) ;
876+ Ok ( configurators)
877+ }
779878
780- let schema_configurator = SchemaConfigurator :: new (
781- base_config. clone ( ) ,
782- primary_config,
783- make_wal_manager. clone ( ) ,
784- migration_scheduler_handle,
785- ) ;
879+ fn configure_primary_common (
880+ & self ,
881+ base_config : BaseNamespaceConfig ,
882+ configurators : & mut NamespaceConfigurators ,
883+ make_wal_manager : Arc < dyn Fn ( ) -> InnerWalManager + Sync + Send + ' static > ,
884+ migration_scheduler_handle : SchedulerHandle ,
885+ scripted_backup : Option < ScriptBackupManager > ,
886+ ) {
887+ let primary_config = PrimaryExtraConfig {
888+ max_log_size : self . db_config . max_log_size ,
889+ max_log_duration : self . db_config . max_log_duration . map ( Duration :: from_secs_f32) ,
890+ bottomless_replication : self . db_config . bottomless_replication . clone ( ) ,
891+ scripted_backup,
892+ checkpoint_interval : self . db_config . checkpoint_interval ,
893+ } ;
786894
787- configurators. with_schema ( schema_configurator) ;
788- configurators. with_primary ( primary_configurator) ;
789- }
790- }
895+ let primary_configurator = PrimaryConfigurator :: new (
896+ base_config. clone ( ) ,
897+ primary_config. clone ( ) ,
898+ make_wal_manager. clone ( ) ,
899+ ) ;
791900
792- Ok ( configurators)
901+ let schema_configurator = SchemaConfigurator :: new (
902+ base_config. clone ( ) ,
903+ primary_config,
904+ make_wal_manager. clone ( ) ,
905+ migration_scheduler_handle,
906+ ) ;
907+
908+ configurators. with_schema ( schema_configurator) ;
909+ configurators. with_primary ( primary_configurator) ;
793910 }
794911
795912 fn setup_shutdown ( & self ) -> Option < IdleShutdownKicker > {
0 commit comments