Skip to content

Commit 8db5ea8

Browse files
authored
Merge pull request #1644 from tursodatabase/query-cancel
cancel query when request is dropped
2 parents 0511ec3 + 9595315 commit 8db5ea8

2 files changed

Lines changed: 57 additions & 3 deletions

File tree

libsql-server/src/connection/libsql.rs

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ use tokio::sync::watch;
1515
use tokio::time::{Duration, Instant};
1616

1717
use crate::error::Error;
18-
use crate::metrics::{DESCRIBE_COUNT, PROGRAM_EXEC_COUNT, VACUUM_COUNT, WAL_CHECKPOINT_COUNT};
18+
use crate::metrics::{
19+
DESCRIBE_COUNT, PROGRAM_EXEC_COUNT, QUERY_CANCELED, VACUUM_COUNT, WAL_CHECKPOINT_COUNT,
20+
};
1921
use crate::namespace::broadcasters::BroadcasterHandle;
2022
use crate::namespace::meta_store::MetaStoreHandle;
2123
use crate::namespace::ResolveNamespacePathFn;
@@ -391,14 +393,44 @@ where
391393
ctx: RequestContext,
392394
builder: B,
393395
) -> Result<(B, Program)> {
396+
struct Bomb {
397+
canceled: Arc<AtomicBool>,
398+
defused: bool,
399+
}
400+
401+
impl Drop for Bomb {
402+
fn drop(&mut self) {
403+
if !self.defused {
404+
tracing::trace!("cancelling request");
405+
self.canceled.store(true, Ordering::Relaxed);
406+
}
407+
}
408+
}
409+
410+
let canceled = {
411+
let cancelled = self.inner.lock().canceled.clone();
412+
cancelled.store(false, Ordering::Relaxed);
413+
cancelled
414+
};
415+
394416
PROGRAM_EXEC_COUNT.increment(1);
395417

396418
check_program_auth(&ctx, &pgm, &self.inner.lock().config_store.get())?;
419+
420+
// create the bomb right before spawning the blocking task.
421+
let mut bomb = Bomb {
422+
canceled,
423+
defused: false,
424+
};
397425
let conn = self.inner.clone();
398-
BLOCKING_RT
426+
let ret = BLOCKING_RT
399427
.spawn_blocking(move || Connection::run(conn, pgm, builder))
400428
.await
401-
.unwrap()
429+
.unwrap();
430+
431+
bomb.defused = true;
432+
433+
ret
402434
}
403435
}
404436

@@ -413,6 +445,7 @@ pub(super) struct Connection<W> {
413445
forced_rollback: bool,
414446
broadcaster: BroadcasterHandle,
415447
hooked: bool,
448+
canceled: Arc<AtomicBool>,
416449
}
417450

418451
fn update_stats(
@@ -475,6 +508,20 @@ impl<W: Wal> Connection<W> {
475508
);
476509
}
477510

511+
let canceled = Arc::new(AtomicBool::new(false));
512+
513+
conn.progress_handler(100, {
514+
let canceled = canceled.clone();
515+
Some(move || {
516+
let canceled = canceled.load(Ordering::Relaxed);
517+
if canceled {
518+
QUERY_CANCELED.increment(1);
519+
tracing::trace!("request canceled");
520+
}
521+
canceled
522+
})
523+
});
524+
478525
let this = Self {
479526
conn,
480527
stats,
@@ -486,6 +533,7 @@ impl<W: Wal> Connection<W> {
486533
forced_rollback: false,
487534
broadcaster,
488535
hooked: false,
536+
canceled,
489537
};
490538

491539
for ext in extensions.iter() {
@@ -795,6 +843,7 @@ mod test {
795843
forced_rollback: false,
796844
broadcaster: Default::default(),
797845
hooked: false,
846+
canceled: Arc::new(false.into()),
798847
};
799848

800849
let conn = Arc::new(Mutex::new(conn));

libsql-server/src/metrics.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,3 +153,8 @@ pub static LISTEN_EVENTS_DROPPED: Lazy<Counter> = Lazy::new(|| {
153153
describe_counter!(NAME, "Number of listen events dropped");
154154
register_counter!(NAME)
155155
});
156+
pub static QUERY_CANCELED: Lazy<Counter> = Lazy::new(|| {
157+
const NAME: &str = "libsql_server_query_canceled";
158+
describe_counter!(NAME, "Number of canceled queries");
159+
register_counter!(NAME)
160+
});

0 commit comments

Comments
 (0)