@@ -70,8 +70,8 @@ struct MetaStoreInner {
7070 // TODO(lucio): Use a concurrent hashmap so we don't block connection creation
7171 // when we are updating the config. The config si already synced via the watch
7272 // channel.
73- configs : Mutex < HashMap < NamespaceName , Sender < InnerConfig > > > ,
74- conn : Mutex < MetaStoreConnection > ,
73+ configs : tokio :: sync :: Mutex < HashMap < NamespaceName , Sender < InnerConfig > > > ,
74+ conn : tokio :: sync :: Mutex < MetaStoreConnection > ,
7575 wal_manager : MetaStoreWalManager ,
7676}
7777
@@ -313,7 +313,7 @@ fn process(msg: ChangeMsg, inner: Arc<MetaStoreInner>) {
313313 } else {
314314 Ok ( ( ) )
315315 } ;
316- let mut configs = inner. configs . lock ( ) ;
316+ let mut configs = inner. configs . blocking_lock ( ) ;
317317 if let Some ( config_watch) = configs. get_mut ( & namespace) {
318318 let new_version = config_watch. borrow ( ) . version . wrapping_add ( 1 ) ;
319319
@@ -330,7 +330,7 @@ fn process(msg: ChangeMsg, inner: Arc<MetaStoreInner>) {
330330 let _ = ret_chan. send ( ret) ;
331331 } else {
332332 let ret = if flush {
333- let mut configs = inner. configs . lock ( ) ;
333+ let mut configs = inner. configs . blocking_lock ( ) ;
334334 if let Some ( config_watch) = configs. get_mut ( & namespace) {
335335 let config = config_watch. subscribe ( ) . borrow ( ) . clone ( ) ;
336336 try_process ( & inner, & namespace, & config. config )
@@ -351,7 +351,7 @@ fn try_process(
351351) -> Result < ( ) > {
352352 let config_encoded = metadata:: DatabaseConfig :: from ( & * config) . encode_to_vec ( ) ;
353353
354- let mut conn = inner. conn . lock ( ) ;
354+ let mut conn = inner. conn . blocking_lock ( ) ;
355355 if let Some ( schema) = config. shared_schema_name . as_ref ( ) {
356356 let tx = conn. transaction ( ) ?;
357357 if let Some ( ref schema) = config. shared_schema_name {
@@ -470,11 +470,11 @@ impl MetaStore {
470470 Ok ( Self { changes_tx, inner } )
471471 }
472472
473- pub fn handle ( & self , namespace : NamespaceName ) -> MetaStoreHandle {
473+ pub async fn handle ( & self , namespace : NamespaceName ) -> MetaStoreHandle {
474474 tracing:: debug!( "getting meta store handle" ) ;
475475 let change_tx = self . changes_tx . clone ( ) ;
476476
477- let mut configs = self . inner . configs . lock ( ) ;
477+ let mut configs = self . inner . configs . lock ( ) . await ;
478478 let sender = configs. entry ( namespace. clone ( ) ) . or_insert_with ( || {
479479 // TODO(lucio): if no entry exists we need to ensure we send the update to
480480 // the bg channel.
@@ -495,11 +495,11 @@ impl MetaStore {
495495 pub fn remove ( & self , namespace : NamespaceName ) -> Result < Option < Arc < DatabaseConfig > > > {
496496 tracing:: debug!( "removing namespace `{}` from meta store" , namespace) ;
497497
498- let mut configs = self . inner . configs . lock ( ) ;
498+ let mut configs = self . inner . configs . blocking_lock ( ) ;
499499 let r = if let Some ( sender) = configs. get ( & namespace) {
500500 tracing:: debug!( "removed namespace `{}` from meta store" , namespace) ;
501501 let config = sender. borrow ( ) . clone ( ) ;
502- let mut conn = self . inner . conn . lock ( ) ;
502+ let mut conn = self . inner . conn . blocking_lock ( ) ;
503503 let tx = conn. transaction ( ) ?;
504504 if config. config . is_shared_schema {
505505 if crate :: schema:: db:: schema_has_linked_dbs ( & tx, & namespace) ? {
@@ -535,8 +535,8 @@ impl MetaStore {
535535 // TODO: we need to either make sure that the metastore is restored
536536 // before we start accepting connections or we need to contact bottomless
537537 // here to check if a namespace exists. Preferably the former.
538- pub fn exists ( & self , namespace : & NamespaceName ) -> bool {
539- self . inner . configs . lock ( ) . contains_key ( namespace)
538+ pub async fn exists ( & self , namespace : & NamespaceName ) -> bool {
539+ self . inner . configs . lock ( ) . await . contains_key ( namespace)
540540 }
541541
542542 pub ( crate ) async fn shutdown ( & self ) -> crate :: Result < ( ) > {
@@ -559,7 +559,7 @@ impl MetaStore {
559559 ) -> crate :: Result < MigrationSummary > {
560560 let inner = self . inner . clone ( ) ;
561561 let summary = tokio:: task:: spawn_blocking ( move || {
562- let mut conn = inner. conn . lock ( ) ;
562+ let mut conn = inner. conn . blocking_lock ( ) ;
563563 crate :: schema:: get_migrations_summary ( & mut conn, schema)
564564 } )
565565 . await
@@ -574,7 +574,7 @@ impl MetaStore {
574574 ) -> crate :: Result < Option < MigrationDetails > > {
575575 let inner = self . inner . clone ( ) ;
576576 let details = tokio:: task:: spawn_blocking ( move || {
577- let mut conn = inner. conn . lock ( ) ;
577+ let mut conn = inner. conn . blocking_lock ( ) ;
578578 crate :: schema:: get_migration_details ( & mut conn, schema, job_id)
579579 } )
580580 . await
0 commit comments