@@ -6,26 +6,29 @@ use std::time::Duration;
66use anyhow:: Context as _;
77use bottomless:: replicator:: Options ;
88use bytes:: Bytes ;
9+ use enclose:: enclose;
910use futures:: Stream ;
1011use libsql_sys:: wal:: Sqlite3WalManager ;
1112use tokio:: io:: AsyncBufReadExt as _;
1213use tokio:: sync:: watch;
1314use tokio:: task:: JoinSet ;
1415use tokio_util:: io:: StreamReader ;
15- use enclose:: enclose;
1616
1717use crate :: connection:: config:: DatabaseConfig ;
1818use crate :: connection:: connection_manager:: InnerWalManager ;
1919use crate :: connection:: libsql:: { open_conn, MakeLibSqlConn } ;
2020use crate :: connection:: { Connection as _, MakeConnection as _} ;
21+ use crate :: database:: { PrimaryConnection , PrimaryConnectionMaker } ;
2122use crate :: error:: LoadDumpError ;
23+ use crate :: namespace:: broadcasters:: BroadcasterHandle ;
24+ use crate :: namespace:: meta_store:: MetaStoreHandle ;
25+ use crate :: namespace:: replication_wal:: { make_replication_wal_wrapper, ReplicationWalWrapper } ;
26+ use crate :: namespace:: {
27+ NamespaceBottomlessDbId , NamespaceBottomlessDbIdInit , NamespaceName , ResolveNamespacePathFn ,
28+ RestoreOption ,
29+ } ;
2230use crate :: replication:: { FrameNo , ReplicationLogger } ;
2331use crate :: stats:: Stats ;
24- use crate :: namespace:: { NamespaceBottomlessDbId , NamespaceBottomlessDbIdInit , NamespaceName , ResolveNamespacePathFn , RestoreOption } ;
25- use crate :: namespace:: replication_wal:: { make_replication_wal_wrapper, ReplicationWalWrapper } ;
26- use crate :: namespace:: meta_store:: MetaStoreHandle ;
27- use crate :: namespace:: broadcasters:: BroadcasterHandle ;
28- use crate :: database:: { PrimaryConnection , PrimaryConnectionMaker } ;
2932use crate :: { StatsSender , BLOCKING_RT , DB_CREATE_TIMEOUT , DEFAULT_AUTO_CHECKPOINT } ;
3033
3134use super :: { BaseNamespaceConfig , PrimaryExtraConfig } ;
@@ -74,8 +77,7 @@ pub(super) async fn make_primary_connection_maker(
7477 tracing:: debug!( "Checkpointed before initializing bottomless" ) ;
7578 let options = make_bottomless_options ( options, bottomless_db_id, name. clone ( ) ) ;
7679 let ( replicator, did_recover) =
77- init_bottomless_replicator ( db_path. join ( "data" ) , options, & restore_option)
78- . await ?;
80+ init_bottomless_replicator ( db_path. join ( "data" ) , options, & restore_option) . await ?;
7981 tracing:: debug!( "Completed init of bottomless replicator" ) ;
8082 is_dirty |= did_recover;
8183 Some ( replicator)
@@ -93,14 +95,14 @@ pub(super) async fn make_primary_connection_maker(
9395 } ;
9496
9597 let logger = Arc :: new ( ReplicationLogger :: open (
96- & db_path,
97- primary_config. max_log_size ,
98- primary_config. max_log_duration ,
99- is_dirty,
100- auto_checkpoint,
101- primary_config. scripted_backup . clone ( ) ,
102- name. clone ( ) ,
103- None ,
98+ & db_path,
99+ primary_config. max_log_size ,
100+ primary_config. max_log_duration ,
101+ is_dirty,
102+ auto_checkpoint,
103+ primary_config. scripted_backup . clone ( ) ,
104+ name. clone ( ) ,
105+ None ,
104106 ) ?) ;
105107
106108 tracing:: debug!( "sending stats" ) ;
@@ -113,7 +115,7 @@ pub(super) async fn make_primary_connection_maker(
113115 name. clone ( ) ,
114116 logger. new_frame_notifier . subscribe ( ) ,
115117 )
116- . await ?;
118+ . await ?;
117119
118120 tracing:: debug!( "Making replication wal wrapper" ) ;
119121 let wal_wrapper = make_replication_wal_wrapper ( bottomless_replicator, logger. clone ( ) ) ;
@@ -136,13 +138,13 @@ pub(super) async fn make_primary_connection_maker(
136138 resolve_attach_path,
137139 make_wal_manager. clone ( ) ,
138140 )
139- . await ?
140- . throttled (
141- base_config. max_concurrent_connections . clone ( ) ,
142- Some ( DB_CREATE_TIMEOUT ) ,
143- base_config. max_total_response_size ,
144- base_config. max_concurrent_requests ,
145- ) ;
141+ . await ?
142+ . throttled (
143+ base_config. max_concurrent_connections . clone ( ) ,
144+ Some ( DB_CREATE_TIMEOUT ) ,
145+ base_config. max_total_response_size ,
146+ base_config. max_concurrent_requests ,
147+ ) ;
146148
147149 tracing:: debug!( "Completed opening libsql connection" ) ;
148150
@@ -356,10 +358,7 @@ pub(super) async fn make_stats(
356358 }
357359 } ) ;
358360
359- join_set. spawn ( run_storage_monitor (
360- db_path. into ( ) ,
361- Arc :: downgrade ( & stats) ,
362- ) ) ;
361+ join_set. spawn ( run_storage_monitor ( db_path. into ( ) , Arc :: downgrade ( & stats) ) ) ;
363362
364363 tracing:: debug!( "done sending stats, and creating bg tasks" ) ;
365364
@@ -369,10 +368,7 @@ pub(super) async fn make_stats(
369368// Periodically check the storage used by the database and save it in the Stats structure.
370369// TODO: Once we have a separate fiber that does WAL checkpoints, running this routine
371370// right after checkpointing is exactly where it should be done.
372- async fn run_storage_monitor (
373- db_path : PathBuf ,
374- stats : Weak < Stats > ,
375- ) -> anyhow:: Result < ( ) > {
371+ async fn run_storage_monitor ( db_path : PathBuf , stats : Weak < Stats > ) -> anyhow:: Result < ( ) > {
376372 // on initialization, the database file doesn't exist yet, so we wait a bit for it to be
377373 // created
378374 tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
0 commit comments