Skip to content

Commit 6ba85b4

Browse files
authored
Merge pull request #1668 from tursodatabase/use-tokio-sync-instead-of-parking-lot
replace parking_lot with tokio::sync::Mutex in some places
2 parents 4fa2ce8 + 47bf462 commit 6ba85b4

7 files changed

Lines changed: 40 additions & 29 deletions

File tree

libsql-server/src/connection/libsql.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,8 @@ where
415415

416416
PROGRAM_EXEC_COUNT.increment(1);
417417

418-
check_program_auth(&ctx, &pgm, &self.inner.lock().config_store.get())?;
418+
let config = self.inner.lock().config_store.get();
419+
check_program_auth(&ctx, &pgm, &config).await?;
419420

420421
// create the bomb right before spawning the blocking task.
421422
let mut bomb = Bomb {

libsql-server/src/connection/program.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ fn value_size(val: &rusqlite::types::ValueRef) -> usize {
341341
}
342342
}
343343

344-
pub fn check_program_auth(
344+
pub async fn check_program_auth(
345345
ctx: &RequestContext,
346346
pgm: &Program,
347347
config: &DatabaseConfig,
@@ -363,7 +363,7 @@ pub fn check_program_auth(
363363
}
364364
StmtKind::Attach(ref ns) => {
365365
ctx.auth.has_right(ns, Permission::AttachRead)?;
366-
if !ctx.meta_store.handle(ns.clone()).get().allow_attach {
366+
if !ctx.meta_store.handle(ns.clone()).await.get().allow_attach {
367367
return Err(Error::NotAuthorized(format!(
368368
"Namespace `{ns}` doesn't allow attach"
369369
)));

libsql-server/src/database/schema.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ impl crate::connection::Connection for SchemaConnection {
5050

5151
res
5252
} else {
53-
check_program_auth(&ctx, &migration, &self.config.get())?;
53+
check_program_auth(&ctx, &migration, &self.config.get()).await?;
5454
let connection = self.connection.clone();
5555
validate_migration(&mut migration)?;
5656
let migration = Arc::new(migration);

libsql-server/src/http/admin/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ async fn handle_create_namespace<C: Connector>(
331331
));
332332
}
333333
// TODO: move this check into meta store
334-
if !app_state.namespaces.exists(&ns) {
334+
if !app_state.namespaces.exists(&ns).await {
335335
return Err(Error::NamespaceDoesntExist(ns.to_string()));
336336
}
337337

libsql-server/src/namespace/meta_store.rs

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ struct MetaStoreInner {
7070
// TODO(lucio): Use a concurrent hashmap so we don't block connection creation
7171
// when we are updating the config. The config si already synced via the watch
7272
// channel.
73-
configs: Mutex<HashMap<NamespaceName, Sender<InnerConfig>>>,
74-
conn: Mutex<MetaStoreConnection>,
73+
configs: tokio::sync::Mutex<HashMap<NamespaceName, Sender<InnerConfig>>>,
74+
conn: tokio::sync::Mutex<MetaStoreConnection>,
7575
wal_manager: MetaStoreWalManager,
7676
}
7777

@@ -313,7 +313,7 @@ fn process(msg: ChangeMsg, inner: Arc<MetaStoreInner>) {
313313
} else {
314314
Ok(())
315315
};
316-
let mut configs = inner.configs.lock();
316+
let mut configs = inner.configs.blocking_lock();
317317
if let Some(config_watch) = configs.get_mut(&namespace) {
318318
let new_version = config_watch.borrow().version.wrapping_add(1);
319319

@@ -330,7 +330,7 @@ fn process(msg: ChangeMsg, inner: Arc<MetaStoreInner>) {
330330
let _ = ret_chan.send(ret);
331331
} else {
332332
let ret = if flush {
333-
let mut configs = inner.configs.lock();
333+
let mut configs = inner.configs.blocking_lock();
334334
if let Some(config_watch) = configs.get_mut(&namespace) {
335335
let config = config_watch.subscribe().borrow().clone();
336336
try_process(&inner, &namespace, &config.config)
@@ -351,7 +351,7 @@ fn try_process(
351351
) -> Result<()> {
352352
let config_encoded = metadata::DatabaseConfig::from(&*config).encode_to_vec();
353353

354-
let mut conn = inner.conn.lock();
354+
let mut conn = inner.conn.blocking_lock();
355355
if let Some(schema) = config.shared_schema_name.as_ref() {
356356
let tx = conn.transaction()?;
357357
if let Some(ref schema) = config.shared_schema_name {
@@ -470,11 +470,11 @@ impl MetaStore {
470470
Ok(Self { changes_tx, inner })
471471
}
472472

473-
pub fn handle(&self, namespace: NamespaceName) -> MetaStoreHandle {
473+
pub async fn handle(&self, namespace: NamespaceName) -> MetaStoreHandle {
474474
tracing::debug!("getting meta store handle");
475475
let change_tx = self.changes_tx.clone();
476476

477-
let mut configs = self.inner.configs.lock();
477+
let mut configs = self.inner.configs.lock().await;
478478
let sender = configs.entry(namespace.clone()).or_insert_with(|| {
479479
// TODO(lucio): if no entry exists we need to ensure we send the update to
480480
// the bg channel.
@@ -495,11 +495,18 @@ impl MetaStore {
495495
pub fn remove(&self, namespace: NamespaceName) -> Result<Option<Arc<DatabaseConfig>>> {
496496
tracing::debug!("removing namespace `{}` from meta store", namespace);
497497

498-
let mut configs = self.inner.configs.lock();
498+
// "configs" lock can be used in both async and sync contexts while "conn" lock always used
499+
// in blocking context
500+
//
501+
// so, we better to acquire "conn" lock first in order to prevent situation when "configs"
502+
// lock is taken but "conn" lock is not free (so, we potentially will block async tasks for
503+
// indefinite amount of time while "conn" lock will be acquired by other thread)
504+
let mut conn = self.inner.conn.blocking_lock();
505+
506+
let mut configs = self.inner.configs.blocking_lock();
499507
let r = if let Some(sender) = configs.get(&namespace) {
500508
tracing::debug!("removed namespace `{}` from meta store", namespace);
501509
let config = sender.borrow().clone();
502-
let mut conn = self.inner.conn.lock();
503510
let tx = conn.transaction()?;
504511
if config.config.is_shared_schema {
505512
if crate::schema::db::schema_has_linked_dbs(&tx, &namespace)? {
@@ -535,8 +542,8 @@ impl MetaStore {
535542
// TODO: we need to either make sure that the metastore is restored
536543
// before we start accepting connections or we need to contact bottomless
537544
// here to check if a namespace exists. Preferably the former.
538-
pub fn exists(&self, namespace: &NamespaceName) -> bool {
539-
self.inner.configs.lock().contains_key(namespace)
545+
pub async fn exists(&self, namespace: &NamespaceName) -> bool {
546+
self.inner.configs.lock().await.contains_key(namespace)
540547
}
541548

542549
pub(crate) async fn shutdown(&self) -> crate::Result<()> {
@@ -559,7 +566,7 @@ impl MetaStore {
559566
) -> crate::Result<MigrationSummary> {
560567
let inner = self.inner.clone();
561568
let summary = tokio::task::spawn_blocking(move || {
562-
let mut conn = inner.conn.lock();
569+
let mut conn = inner.conn.blocking_lock();
563570
crate::schema::get_migrations_summary(&mut conn, schema)
564571
})
565572
.await
@@ -574,7 +581,7 @@ impl MetaStore {
574581
) -> crate::Result<Option<MigrationDetails>> {
575582
let inner = self.inner.clone();
576583
let details = tokio::task::spawn_blocking(move || {
577-
let mut conn = inner.conn.lock();
584+
let mut conn = inner.conn.blocking_lock();
578585
crate::schema::get_migration_details(&mut conn, schema, job_id)
579586
})
580587
.await

libsql-server/src/namespace/store.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,8 @@ impl NamespaceStore {
9999
})
100100
}
101101

102-
pub fn exists(&self, namespace: &NamespaceName) -> bool {
103-
self.inner.metadata.exists(namespace)
102+
pub async fn exists(&self, namespace: &NamespaceName) -> bool {
103+
self.inner.metadata.exists(namespace).await
104104
}
105105

106106
pub async fn destroy(&self, namespace: NamespaceName, prune_all: bool) -> crate::Result<()> {
@@ -173,7 +173,7 @@ impl NamespaceStore {
173173
ns.destroy().await?;
174174
}
175175

176-
let db_config = self.inner.metadata.handle(namespace.clone());
176+
let db_config = self.inner.metadata.handle(namespace.clone()).await;
177177
// destroy on-disk database
178178
self.cleanup(
179179
&namespace,
@@ -226,7 +226,7 @@ impl NamespaceStore {
226226
}
227227

228228
// check that the source namespace exists
229-
if !self.inner.metadata.exists(&from) {
229+
if !self.inner.metadata.exists(&from).await {
230230
return Err(crate::error::Error::NamespaceDoesntExist(from.to_string()));
231231
}
232232

@@ -241,11 +241,11 @@ impl NamespaceStore {
241241
}
242242

243243
// FIXME: we could potentially delete the namespace while trying to fork it
244-
if !self.inner.metadata.exists(&from) {
244+
if !self.inner.metadata.exists(&from).await {
245245
return Err(crate::Error::NamespaceDoesntExist(from.to_string()));
246246
}
247247

248-
let from_config = self.inner.metadata.handle(from.clone());
248+
let from_config = self.inner.metadata.handle(from.clone()).await;
249249
let from_entry = self
250250
.load_namespace(&from, from_config.clone(), RestoreOption::Latest)
251251
.await?;
@@ -280,7 +280,7 @@ impl NamespaceStore {
280280
should_delete: true,
281281
};
282282

283-
let handle = self.inner.metadata.handle(to.clone());
283+
let handle = self.inner.metadata.handle(to.clone()).await;
284284
handle
285285
.store_and_maybe_flush(Some(to_config.into()), false)
286286
.await?;
@@ -328,7 +328,7 @@ impl NamespaceStore {
328328
Fun: FnOnce(&Namespace) -> R,
329329
{
330330
if namespace != NamespaceName::default()
331-
&& !self.inner.metadata.exists(&namespace)
331+
&& !self.inner.metadata.exists(&namespace).await
332332
&& !self.inner.allow_lazy_creation
333333
{
334334
return Err(Error::NamespaceDoesntExist(namespace.to_string()));
@@ -346,7 +346,7 @@ impl NamespaceStore {
346346
}
347347
};
348348

349-
let handle = self.inner.metadata.handle(namespace.to_owned());
349+
let handle = self.inner.metadata.handle(namespace.to_owned()).await;
350350
f(self
351351
.load_namespace(&namespace, handle, RestoreOption::Latest)
352352
.await?)
@@ -440,12 +440,12 @@ impl NamespaceStore {
440440
// FIXME: move the default namespace check out of this function.
441441
if self.inner.allow_lazy_creation || namespace == NamespaceName::default() {
442442
tracing::trace!("auto-creating the namespace");
443-
} else if self.inner.metadata.exists(&namespace) {
443+
} else if self.inner.metadata.exists(&namespace).await {
444444
return Err(Error::NamespaceAlreadyExist(namespace.to_string()));
445445
}
446446

447447
let db_config = Arc::new(db_config);
448-
let handle = self.inner.metadata.handle(namespace.clone());
448+
let handle = self.inner.metadata.handle(namespace.clone()).await;
449449
tracing::debug!("storing db config");
450450
handle.store(db_config).await?;
451451
tracing::debug!("completed storing db config, loading namespace");

libsql-server/src/schema/db.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,7 @@ mod test {
482482
async fn register_schema(meta_store: &MetaStore, schema: &'static str) {
483483
meta_store
484484
.handle(schema.into())
485+
.await
485486
.store(DatabaseConfig {
486487
is_shared_schema: true,
487488
..Default::default()
@@ -497,6 +498,7 @@ mod test {
497498
) -> crate::Result<()> {
498499
meta_store
499500
.handle(name.into())
501+
.await
500502
.store(DatabaseConfig {
501503
shared_schema_name: Some(schema.into()),
502504
..Default::default()
@@ -561,6 +563,7 @@ mod test {
561563
// necessary checks beforehand, and return a nice error message.
562564
assert!(meta_store
563565
.handle("ns1".into())
566+
.await
564567
.store(DatabaseConfig {
565568
shared_schema_name: Some("schema1".into()),
566569
..Default::default()

0 commit comments

Comments
 (0)