Skip to content

Commit 1ea3f3d

Browse files
committed
allow tompstoning and removing wal from registry
1 parent 6948789 commit 1ea3f3d

2 files changed

Lines changed: 43 additions & 0 deletions

File tree

libsql-wal/src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ pub enum Error {
2727

2828
#[error("storage error: {0}")]
2929
Storage(#[from] Box<crate::storage::Error>),
30+
#[error("wal is being deleted")]
31+
DeletingWal,
3032
}
3133

3234
impl Into<libsql_sys::ffi::Error> for Error {

libsql-wal/src/registry.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ enum Slot<IO: Io> {
3737
/// entry in the registry map puts a building slot. Other connections will wait for the mutex
3838
/// to turn to true, after the slot has been updated to contain the wal
3939
Building(Arc<(Condvar, Mutex<bool>)>, Arc<Notify>),
40+
/// The namespace was removed
41+
Tombstone,
4042
}
4143

4244
/// Wal Registry maintains a set of shared Wal, and their respective set of files.
@@ -85,6 +87,7 @@ impl<IO: Io, S> WalRegistry<IO, S> {
8587
match self.opened.get(namespace).as_deref() {
8688
Some(Slot::Wal(wal)) => return Some(wal.clone()),
8789
Some(Slot::Building(_, notify)) => notify.clone(),
90+
Some(Slot::Tombstone) => return None,
8891
None => return None,
8992
}
9093
};
@@ -178,13 +181,15 @@ where
178181
// the slot was updated: try again
179182
continue;
180183
}
184+
Slot::Tombstone => return Err(crate::error::Error::DeletingWal),
181185
}
182186
}
183187

184188
let action = match self.opened.entry(namespace.clone()) {
185189
dashmap::Entry::Occupied(e) => match e.get() {
186190
Slot::Wal(shared) => return Ok(shared.clone()),
187191
Slot::Building(wait, _) => Err(wait.clone()),
192+
Slot::Tombstone => return Err(crate::error::Error::DeletingWal),
188193
},
189194
dashmap::Entry::Vacant(e) => {
190195
let notifier = Arc::new((Condvar::new(), Mutex::new(false)));
@@ -370,6 +375,37 @@ where
370375
}
371376
}
372377

378+
pub async fn tombstone(&self, namespace: &NamespaceName) -> Option<Arc<SharedWal<IO>>> {
379+
// if a wal is currently being openned, let it
380+
{
381+
let v = self.opened.get(namespace)?;
382+
if let Slot::Building(_, ref notify) = *v {
383+
notify.clone().notified().await;
384+
}
385+
}
386+
387+
match self.opened.insert(namespace.clone(), Slot::Tombstone) {
388+
Some(Slot::Tombstone) => None,
389+
Some(Slot::Building(_, _)) => {
390+
unreachable!("already waited for ns to open")
391+
}
392+
Some(Slot::Wal(wal)) => Some(wal),
393+
None => None,
394+
}
395+
}
396+
397+
pub async fn remove(&self, namespace: &NamespaceName) {
398+
// if a wal is currently being openned, let it
399+
{
400+
let v = self.opened.get(namespace);
401+
if let Some(Slot::Building(_, ref notify)) = v.as_deref() {
402+
notify.clone().notified().await;
403+
}
404+
}
405+
406+
self.opened.remove(namespace);
407+
}
408+
373409
/// Attempts to sync all loaded dbs with durable storage
374410
pub async fn sync_all(&self, conccurency: usize) -> Result<()>
375411
where
@@ -445,6 +481,7 @@ where
445481
// wait for shared to finish building
446482
notify.notified().await;
447483
}
484+
Slot::Tombstone => continue,
448485
}
449486
}
450487
}
@@ -507,6 +544,10 @@ where
507544

508545
Ok(())
509546
}
547+
548+
pub fn storage(&self) -> &S {
549+
&self.storage
550+
}
510551
}
511552

512553
#[tracing::instrument(skip_all, fields(namespace = shared.namespace().as_str()))]

0 commit comments

Comments
 (0)