Skip to content

Commit 9893b67

Browse files
authored
Merge pull request #1730 from tursodatabase/fix-namespace-deletion
fix namespace deletion
2 parents 8e06705 + 11066a4 commit 9893b67

8 files changed

Lines changed: 124 additions & 31 deletions

File tree

libsql-server/src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ pub enum Error {
124124
AttachInMigration,
125125
#[error("join failure: {0}")]
126126
RuntimeTaskJoinError(#[from] tokio::task::JoinError),
127+
#[error("wal error: {0}")]
128+
LibsqlWal(#[from] libsql_wal::error::Error),
127129
}
128130

129131
impl AsRef<Self> for Error {
@@ -218,6 +220,7 @@ impl IntoResponse for &Error {
218220
HasLinkedDbs(_) => self.format_err(StatusCode::BAD_REQUEST),
219221
AttachInMigration => self.format_err(StatusCode::BAD_REQUEST),
220222
RuntimeTaskJoinError(_) => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
223+
LibsqlWal(_) => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
221224
}
222225
}
223226
}

libsql-server/src/namespace/configurator/helpers.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use bytes::Bytes;
99
use enclose::enclose;
1010
use futures::Stream;
1111
use libsql_sys::EncryptionConfig;
12+
use libsql_wal::io::StdIO;
13+
use libsql_wal::registry::WalRegistry;
1214
use tokio::io::AsyncBufReadExt as _;
1315
use tokio::sync::watch;
1416
use tokio::task::JoinSet;
@@ -29,7 +31,7 @@ use crate::namespace::{
2931
};
3032
use crate::replication::{FrameNo, ReplicationLogger};
3133
use crate::stats::Stats;
32-
use crate::{StatsSender, BLOCKING_RT, DB_CREATE_TIMEOUT, DEFAULT_AUTO_CHECKPOINT};
34+
use crate::{SqldStorage, StatsSender, BLOCKING_RT, DB_CREATE_TIMEOUT, DEFAULT_AUTO_CHECKPOINT};
3335

3436
use super::{BaseNamespaceConfig, PrimaryConfig};
3537

@@ -464,3 +466,53 @@ pub(super) async fn cleanup_primary(
464466

465467
Ok(())
466468
}
469+
470+
pub async fn cleanup_libsql(
471+
namespace: &NamespaceName,
472+
registry: &WalRegistry<StdIO, SqldStorage>,
473+
base_path: &Path,
474+
) -> crate::Result<()> {
475+
let namespace = namespace.clone().into();
476+
if let Some(shared) = registry.tombstone(&namespace).await {
477+
// shutdown the registry, don't seal the current segment so that it's not
478+
tokio::task::spawn_blocking({
479+
let shared = shared.clone();
480+
move || shared.shutdown()
481+
})
482+
.await
483+
.unwrap()?;
484+
}
485+
486+
let ns_db_path = base_path.join("dbs").join(namespace.as_str());
487+
if ns_db_path.try_exists()? {
488+
tracing::debug!("removing database directory: {}", ns_db_path.display());
489+
let _ = tokio::fs::remove_dir_all(ns_db_path).await;
490+
}
491+
492+
let ns_wals_path = base_path.join("wals").join(namespace.as_str());
493+
if ns_wals_path.try_exists()? {
494+
tracing::debug!("removing database directory: {}", ns_wals_path.display());
495+
if let Err(e) = tokio::fs::remove_dir_all(ns_wals_path).await {
496+
// what can go wrong?:
497+
match e.kind() {
498+
// alright, there's nothing to delete anyway
499+
std::io::ErrorKind::NotFound => (),
500+
_ => {
501+
// something unexpected happened, this namespaces is in a bad state.
502+
// The entry will not be removed from the registry to prevent another
503+
// namespace with the same name to be reuse the same wal files. a
504+
// manual intervention is necessary
505+
// FIXME: on namespace creation, we could ensure that this directory is
506+
// clean.
507+
tracing::error!("error deleting `{namespace}` wal directory, manual intervention may be necessary: {e}");
508+
return Err(e.into());
509+
}
510+
}
511+
}
512+
}
513+
514+
// when all is cleaned, leave place for next one
515+
registry.remove(&namespace).await;
516+
517+
Ok(())
518+
}

libsql-server/src/namespace/configurator/libsql_primary.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::schema::{has_pending_migration_task, setup_migration_table};
2525
use crate::stats::Stats;
2626
use crate::{SqldStorage, DB_CREATE_TIMEOUT, DEFAULT_AUTO_CHECKPOINT};
2727

28+
use super::helpers::cleanup_libsql;
2829
use super::{BaseNamespaceConfig, ConfigureNamespace, PrimaryConfig};
2930

3031
pub struct LibsqlPrimaryConfigurator {
@@ -248,12 +249,16 @@ impl ConfigureNamespace for LibsqlPrimaryConfigurator {
248249

249250
fn cleanup<'a>(
250251
&'a self,
251-
_namespace: &'a NamespaceName,
252+
namespace: &'a NamespaceName,
252253
_db_config: &'a DatabaseConfig,
253254
_prune_all: bool,
254255
_bottomless_db_id_init: NamespaceBottomlessDbIdInit,
255256
) -> Pin<Box<dyn Future<Output = crate::Result<()>> + Send + 'a>> {
256-
unimplemented!()
257+
Box::pin(cleanup_libsql(
258+
namespace,
259+
&self.registry,
260+
&self.base.base_path,
261+
))
257262
}
258263

259264
fn fork<'a>(

libsql-server/src/namespace/configurator/libsql_replica.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::namespace::{
2929
};
3030
use crate::{SqldStorage, DB_CREATE_TIMEOUT};
3131

32+
use super::helpers::cleanup_libsql;
3233
use super::{BaseNamespaceConfig, ConfigureNamespace};
3334

3435
pub struct LibsqlReplicaConfigurator {
@@ -251,14 +252,11 @@ impl ConfigureNamespace for LibsqlReplicaConfigurator {
251252
_prune_all: bool,
252253
_bottomless_db_id_init: NamespaceBottomlessDbIdInit,
253254
) -> Pin<Box<dyn Future<Output = crate::Result<()>> + Send + 'a>> {
254-
Box::pin(async move {
255-
let ns_path = self.base.base_path.join("dbs").join(namespace.as_str());
256-
if ns_path.try_exists()? {
257-
tracing::debug!("removing database directory: {}", ns_path.display());
258-
tokio::fs::remove_dir_all(ns_path).await?;
259-
}
260-
Ok(())
261-
})
255+
Box::pin(cleanup_libsql(
256+
namespace,
257+
&self.registry,
258+
&self.base.base_path,
259+
))
262260
}
263261

264262
fn fork<'a>(

libsql-server/src/namespace/configurator/libsql_schema.rs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::namespace::{
1616
use crate::schema::SchedulerHandle;
1717
use crate::SqldStorage;
1818

19-
use super::helpers::cleanup_primary;
19+
use super::helpers::cleanup_libsql;
2020
use super::libsql_primary::{libsql_primary_common, LibsqlPrimaryCommon};
2121
use super::{BaseNamespaceConfig, ConfigureNamespace, PrimaryConfig};
2222

@@ -146,21 +146,15 @@ impl ConfigureNamespace for LibsqlSchemaConfigurator {
146146
fn cleanup<'a>(
147147
&'a self,
148148
namespace: &'a NamespaceName,
149-
db_config: &'a DatabaseConfig,
150-
prune_all: bool,
151-
bottomless_db_id_init: crate::namespace::NamespaceBottomlessDbIdInit,
149+
_db_config: &'a DatabaseConfig,
150+
_prune_all: bool,
151+
_bottomless_db_id_init: crate::namespace::NamespaceBottomlessDbIdInit,
152152
) -> std::pin::Pin<Box<dyn Future<Output = crate::Result<()>> + Send + 'a>> {
153-
Box::pin(async move {
154-
cleanup_primary(
155-
&self.base,
156-
&self.primary_config,
157-
namespace,
158-
db_config,
159-
prune_all,
160-
bottomless_db_id_init,
161-
)
162-
.await
163-
})
153+
Box::pin(cleanup_libsql(
154+
namespace,
155+
&self.registry,
156+
&self.base.base_path,
157+
))
164158
}
165159

166160
fn fork<'a>(

libsql-wal/src/checkpointer.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,9 @@ where
5959
) -> impl Future<Output = crate::error::Result<()>> + Send {
6060
let namespace = namespace.clone();
6161
async move {
62-
let registry = self
63-
.get_async(&namespace)
64-
.await
65-
.expect("namespace not openned");
66-
registry.checkpoint().await?;
62+
if let Some(registry) = self.get_async(&namespace).await {
63+
registry.checkpoint().await?;
64+
}
6765
Ok(())
6866
}
6967
}

libsql-wal/src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ pub enum Error {
2727

2828
#[error("storage error: {0}")]
2929
Storage(#[from] Box<crate::storage::Error>),
30+
#[error("wal is being deleted")]
31+
DeletingWal,
3032
}
3133

3234
impl Into<libsql_sys::ffi::Error> for Error {

libsql-wal/src/registry.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ enum Slot<IO: Io> {
3737
/// entry in the registry map puts a building slot. Other connections will wait for the mutex
3838
/// to turn to true, after the slot has been updated to contain the wal
3939
Building(Arc<(Condvar, Mutex<bool>)>, Arc<Notify>),
40+
/// The namespace was removed
41+
Tombstone,
4042
}
4143

4244
/// Wal Registry maintains a set of shared Wal, and their respective set of files.
@@ -85,6 +87,7 @@ impl<IO: Io, S> WalRegistry<IO, S> {
8587
match self.opened.get(namespace).as_deref() {
8688
Some(Slot::Wal(wal)) => return Some(wal.clone()),
8789
Some(Slot::Building(_, notify)) => notify.clone(),
90+
Some(Slot::Tombstone) => return None,
8891
None => return None,
8992
}
9093
};
@@ -178,13 +181,15 @@ where
178181
// the slot was updated: try again
179182
continue;
180183
}
184+
Slot::Tombstone => return Err(crate::error::Error::DeletingWal),
181185
}
182186
}
183187

184188
let action = match self.opened.entry(namespace.clone()) {
185189
dashmap::Entry::Occupied(e) => match e.get() {
186190
Slot::Wal(shared) => return Ok(shared.clone()),
187191
Slot::Building(wait, _) => Err(wait.clone()),
192+
Slot::Tombstone => return Err(crate::error::Error::DeletingWal),
188193
},
189194
dashmap::Entry::Vacant(e) => {
190195
let notifier = Arc::new((Condvar::new(), Mutex::new(false)));
@@ -370,6 +375,37 @@ where
370375
}
371376
}
372377

378+
pub async fn tombstone(&self, namespace: &NamespaceName) -> Option<Arc<SharedWal<IO>>> {
379+
// if a wal is currently being openned, let it
380+
{
381+
let v = self.opened.get(namespace)?;
382+
if let Slot::Building(_, ref notify) = *v {
383+
notify.clone().notified().await;
384+
}
385+
}
386+
387+
match self.opened.insert(namespace.clone(), Slot::Tombstone) {
388+
Some(Slot::Tombstone) => None,
389+
Some(Slot::Building(_, _)) => {
390+
unreachable!("already waited for ns to open")
391+
}
392+
Some(Slot::Wal(wal)) => Some(wal),
393+
None => None,
394+
}
395+
}
396+
397+
pub async fn remove(&self, namespace: &NamespaceName) {
398+
// if a wal is currently being openned, let it
399+
{
400+
let v = self.opened.get(namespace);
401+
if let Some(Slot::Building(_, ref notify)) = v.as_deref() {
402+
notify.clone().notified().await;
403+
}
404+
}
405+
406+
self.opened.remove(namespace);
407+
}
408+
373409
/// Attempts to sync all loaded dbs with durable storage
374410
pub async fn sync_all(&self, conccurency: usize) -> Result<()>
375411
where
@@ -445,6 +481,7 @@ where
445481
// wait for shared to finish building
446482
notify.notified().await;
447483
}
484+
Slot::Tombstone => continue,
448485
}
449486
}
450487
}
@@ -507,6 +544,10 @@ where
507544

508545
Ok(())
509546
}
547+
548+
/// Returns a shared reference to the `storage` field of this registry.
pub fn storage(&self) -> &S {
    &self.storage
}
510551
}
511552

512553
#[tracing::instrument(skip_all, fields(namespace = shared.namespace().as_str()))]

0 commit comments

Comments
 (0)