@@ -322,5 +322,132 @@ describe("RunEngine cancelling", () => {
322322 }
323323 } ) ;
324324
325+ containerTest ( "Cancelling a run (dequeued)" , async ( { prisma, redisOptions } ) => {
326+ //create environment
327+ const authenticatedEnvironment = await setupAuthenticatedEnvironment ( prisma , "PRODUCTION" ) ;
328+
329+ const engine = new RunEngine ( {
330+ prisma,
331+ worker : {
332+ redis : redisOptions ,
333+ workers : 1 ,
334+ tasksPerWorker : 10 ,
335+ pollIntervalMs : 100 ,
336+ } ,
337+ queue : {
338+ redis : redisOptions ,
339+ masterQueueConsumersDisabled : true ,
340+ processWorkerQueueDebounceMs : 50 ,
341+ } ,
342+ runLock : {
343+ redis : redisOptions ,
344+ } ,
345+ machines : {
346+ defaultMachine : "small-1x" ,
347+ machines : {
348+ "small-1x" : {
349+ name : "small-1x" as const ,
350+ cpu : 0.5 ,
351+ memory : 0.5 ,
352+ centsPerMs : 0.0001 ,
353+ } ,
354+ } ,
355+ baseCostInCents : 0.0001 ,
356+ } ,
357+ tracer : trace . getTracer ( "test" , "0.0.0" ) ,
358+ } ) ;
359+
360+ try {
361+ const parentTask = "parent-task" ;
362+
363+ //create background worker
364+ await setupBackgroundWorker ( engine , authenticatedEnvironment , [ parentTask ] ) ;
365+
366+ //trigger the run
367+ const parentRun = await engine . trigger (
368+ {
369+ number : 1 ,
370+ friendlyId : "run_p1234" ,
371+ environment : authenticatedEnvironment ,
372+ taskIdentifier : parentTask ,
373+ payload : "{}" ,
374+ payloadType : "application/json" ,
375+ context : { } ,
376+ traceContext : { } ,
377+ traceId : "t12345" ,
378+ spanId : "s12345" ,
379+ workerQueue : "main" ,
380+ queue : `task/${ parentTask } ` ,
381+ isTest : false ,
382+ tags : [ ] ,
383+ } ,
384+ prisma
385+ ) ;
386+
387+ //dequeue the run, but don't start an attempt — this leaves TaskRun.status = DEQUEUED
388+ //and execution snapshot = PENDING_EXECUTING (a worker has claimed the run)
389+ await setTimeout ( 500 ) ;
390+ const dequeued = await engine . dequeueFromWorkerQueue ( {
391+ consumerId : "test_12345" ,
392+ workerQueue : "main" ,
393+ } ) ;
394+ expect ( dequeued . length ) . toBe ( 1 ) ;
395+
396+ const dequeuedRun = await prisma . taskRun . findFirstOrThrow ( {
397+ where : { id : parentRun . id } ,
398+ } ) ;
399+ expect ( dequeuedRun . status ) . toBe ( "DEQUEUED" ) ;
400+
401+ //cancel the dequeued run — a worker has already claimed it, so the snapshot goes to
402+ //PENDING_CANCEL pending the worker ack. TaskRun.status flips to CANCELED immediately
403+ //so the UI reflects cancellation without waiting.
404+ const result = await engine . cancelRun ( {
405+ runId : parentRun . id ,
406+ completedAt : new Date ( ) ,
407+ reason : "Cancelled by the user" ,
408+ } ) ;
409+ expect ( result . snapshot . executionStatus ) . toBe ( "PENDING_CANCEL" ) ;
410+
411+ const pendingCancel = await engine . getRunExecutionData ( { runId : parentRun . id } ) ;
412+ expect ( pendingCancel ?. snapshot . executionStatus ) . toBe ( "PENDING_CANCEL" ) ;
413+ expect ( pendingCancel ?. run . status ) . toBe ( "CANCELED" ) ;
414+
415+ let cancelledEventData : EventBusEventArgs < "runCancelled" > [ 0 ] [ ] = [ ] ;
416+ engine . eventBus . on ( "runCancelled" , ( result ) => {
417+ cancelledEventData . push ( result ) ;
418+ } ) ;
419+
420+ //simulate worker acknowledging the cancellation
421+ const completeResult = await engine . completeRunAttempt ( {
422+ runId : parentRun . id ,
423+ snapshotId : pendingCancel ! . snapshot . id ,
424+ completion : {
425+ ok : false ,
426+ id : parentRun . id ,
427+ error : {
428+ type : "INTERNAL_ERROR" as const ,
429+ code : "TASK_RUN_CANCELLED" as const ,
430+ } ,
431+ } ,
432+ } ) ;
433+ expect ( completeResult . snapshot . executionStatus ) . toBe ( "FINISHED" ) ;
434+ expect ( completeResult . run . status ) . toBe ( "CANCELED" ) ;
435+
436+ //check emitted event after worker ack
437+ expect ( cancelledEventData . length ) . toBe ( 1 ) ;
438+ const parentEvent = cancelledEventData . find ( ( r ) => r . run . id === parentRun . id ) ;
439+ assertNonNullable ( parentEvent ) ;
440+ expect ( parentEvent . run . spanId ) . toBe ( parentRun . spanId ) ;
441+
442+ //concurrency should have been released
443+ const envConcurrencyCompleted = await engine . runQueue . currentConcurrencyOfEnvironment (
444+ authenticatedEnvironment
445+ ) ;
446+ expect ( envConcurrencyCompleted ) . toBe ( 0 ) ;
447+ } finally {
448+ await engine . quit ( ) ;
449+ }
450+ } ) ;
451+
325452 //todo bulk cancelling runs
326453} ) ;
0 commit comments