Skip to content

Commit f30f251

Browse files
committed
feat(webapp): add skipIfActive trigger option for drop-on-conflict dedup
`skipIfActive: boolean` is a new option on `tasks.trigger()` (and batch items). When set, and at least one `tag` is supplied, the webapp checks Postgres for an in-flight (non-terminal) TaskRun with the same `taskIdentifier` + environment + all-of the supplied tags. If one exists, the trigger is a no-op: the existing run is returned with `isCached: true` + new `wasSkipped: true` flag, and no new run is created. Why: current options don't cover the "cron scanner drops duplicates" pattern. - `idempotencyKey` caches successful completions too -> scheduled retriggers blocked until TTL expires. - `concurrencyKey` queues duplicates FIFO -> backlog grows when a sync hangs; dashboard fills with redundant QUEUED rows. - Manual `runs.list()` dedup in user code is expensive (ClickHouse `task_runs_v2 FINAL + hasAny(tags)` under load) and gets reinvented poorly in every downstream project. Implementation follows the `IdempotencyKeyConcern` pattern: - `SkipIfActiveConcern.handleTriggerRequest()` does one indexed SELECT on `"TaskRun"` (GIN `runTags_idx` + composite status/env index). - Invoked in `RunEngineTriggerTaskService` AFTER idempotency (explicit idempotency match wins) but BEFORE run creation (skipped triggers never touch the queue). - Route surfaces `wasSkipped: true` in the response. Additive — older SDK clients ignore unknown field. Scope: - Engine v2 only (the V1 `TriggerTaskServiceV1` path is not touched; new code in v1 is frozen upstream). - Per-item honor in batch trigger via the same per-item concern call. - No schema migration; reuses existing indexes. Docs: `docs/skip-if-active.mdx` compares with `idempotencyKey` / `concurrencyKey` and documents the interaction matrix. Tests: unit tests for `SkipIfActiveConcern` covering no-flag, no-tags, no-match, single-match, string-tag normalization, and the row-disappeared-mid-query race. Refs: #3428 (related context for cron-scanner load patterns that motivated this option).
1 parent 28a66ac commit f30f251

8 files changed

Lines changed: 370 additions & 0 deletions

File tree

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ const { action, loader } = createActionApiRoute(
135135
{
136136
id: result.run.friendlyId,
137137
isCached: result.isCached,
138+
...(result.wasSkipped ? { wasSkipped: true } : {}),
138139
},
139140
{
140141
headers: $responseHeaders,
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import { Prisma, type PrismaClientOrTransaction, type TaskRun } from "@trigger.dev/database";
2+
import { logger } from "~/services/logger.server";
3+
import type { TriggerTaskRequest } from "../types";
4+
5+
export type SkipIfActiveConcernResult =
6+
| { wasSkipped: true; run: TaskRun }
7+
| { wasSkipped: false };
8+
9+
/**
10+
* DB-level `TaskRunStatus` values that represent a run that has not reached
11+
* a terminal state — i.e. still counts as "active" for dedup purposes.
12+
* Mirrors the non-final statuses in `TaskRunStatus` (see
13+
* `internal-packages/database/prisma/schema.prisma`).
14+
*/
15+
const ACTIVE_TASK_RUN_STATUSES = [
16+
"DELAYED",
17+
"PENDING",
18+
"PENDING_VERSION",
19+
"WAITING_FOR_DEPLOY",
20+
"DEQUEUED",
21+
"EXECUTING",
22+
"WAITING_TO_RESUME",
23+
"RETRYING_AFTER_FAILURE",
24+
"PAUSED",
25+
] as const;
26+
27+
/**
28+
* Implements the `skipIfActive` trigger option.
29+
*
30+
* When `body.options.skipIfActive === true` and at least one tag is set, the
31+
* concern looks for any in-flight TaskRun with:
32+
*
33+
* runtimeEnvironmentId = <env>
34+
* taskIdentifier = <task>
35+
* status IN (non-terminal)
36+
* runTags @> <supplied tags>
37+
*
38+
* If found, the existing run is returned and the trigger is short-circuited.
39+
* If not found, the caller proceeds to create a new run as usual.
40+
*
41+
* Intended use case: cron-style scanners that poll at a fixed cadence but
42+
* should drop duplicate triggers while a prior invocation is still running —
43+
* without generating queue backlog (`concurrencyKey`) or caching successful
44+
* completions (`idempotencyKey`).
45+
*/
46+
export class SkipIfActiveConcern {
47+
constructor(private readonly prisma: PrismaClientOrTransaction) {}
48+
49+
async handleTriggerRequest(request: TriggerTaskRequest): Promise<SkipIfActiveConcernResult> {
50+
if (request.body.options?.skipIfActive !== true) {
51+
return { wasSkipped: false };
52+
}
53+
54+
const rawTags = request.body.options?.tags;
55+
const tags = Array.isArray(rawTags) ? rawTags : rawTags ? [rawTags] : [];
56+
57+
if (tags.length === 0) {
58+
// `skipIfActive` requires a tag scope — without tags, every run of this
59+
// task would dedup against every other, which is rarely the intent.
60+
// Treat as no-op rather than silently matching.
61+
logger.debug("[SkipIfActiveConcern] skipIfActive=true with no tags — skipping the check", {
62+
taskIdentifier: request.taskId,
63+
});
64+
return { wasSkipped: false };
65+
}
66+
67+
// `runTags @> ARRAY[...]::text[]` hits the GIN ArrayOps index on
68+
// `TaskRun.runTags` and AND-matches every supplied tag. Bounded by
69+
// runtimeEnvironmentId + taskIdentifier + status via existing indexes.
70+
const statusArray = Prisma.sql`ARRAY[${Prisma.join(
71+
ACTIVE_TASK_RUN_STATUSES.map((s) => Prisma.sql`${s}`)
72+
)}]::"TaskRunStatus"[]`;
73+
74+
const existing = await this.prisma.$queryRaw<Array<{ id: string }>>`
75+
SELECT id
76+
FROM "TaskRun"
77+
WHERE "runtimeEnvironmentId" = ${request.environment.id}
78+
AND "taskIdentifier" = ${request.taskId}
79+
AND status = ANY(${statusArray})
80+
AND "runTags" @> ${tags}::text[]
81+
LIMIT 1
82+
`;
83+
84+
if (existing.length === 0) {
85+
return { wasSkipped: false };
86+
}
87+
88+
const run = await this.prisma.taskRun.findUnique({ where: { id: existing[0].id } });
89+
if (!run) {
90+
// Row disappeared between the existence probe and the fetch (e.g.
91+
// completed + deleted mid-query). Treat as "no active run" so the
92+
// caller creates a fresh one instead of failing.
93+
return { wasSkipped: false };
94+
}
95+
96+
logger.debug("[SkipIfActiveConcern] active run matched, skipping new trigger", {
97+
runId: run.id,
98+
taskIdentifier: request.taskId,
99+
environmentId: request.environment.id,
100+
tags,
101+
});
102+
103+
return { wasSkipped: true, run };
104+
}
105+
}

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import type {
3131
} from "../../v3/services/triggerTask.server";
3232
import { clampMaxDuration } from "../../v3/utils/maxDuration";
3333
import { IdempotencyKeyConcern } from "../concerns/idempotencyKeys.server";
34+
import { SkipIfActiveConcern } from "../concerns/skipIfActive.server";
3435
import type {
3536
PayloadProcessor,
3637
QueueManager,
@@ -54,6 +55,7 @@ export class RunEngineTriggerTaskService {
5455
private readonly validator: TriggerTaskValidator;
5556
private readonly payloadProcessor: PayloadProcessor;
5657
private readonly idempotencyKeyConcern: IdempotencyKeyConcern;
58+
private readonly skipIfActiveConcern: SkipIfActiveConcern;
5759
private readonly runNumberIncrementer: RunNumberIncrementer;
5860
private readonly prisma: PrismaClientOrTransaction;
5961
private readonly engine: RunEngine;
@@ -69,6 +71,7 @@ export class RunEngineTriggerTaskService {
6971
validator: TriggerTaskValidator;
7072
payloadProcessor: PayloadProcessor;
7173
idempotencyKeyConcern: IdempotencyKeyConcern;
74+
skipIfActiveConcern?: SkipIfActiveConcern;
7275
runNumberIncrementer: RunNumberIncrementer;
7376
traceEventConcern: TraceEventConcern;
7477
tracer: Tracer;
@@ -81,6 +84,7 @@ export class RunEngineTriggerTaskService {
8184
this.validator = opts.validator;
8285
this.payloadProcessor = opts.payloadProcessor;
8386
this.idempotencyKeyConcern = opts.idempotencyKeyConcern;
87+
this.skipIfActiveConcern = opts.skipIfActiveConcern ?? new SkipIfActiveConcern(opts.prisma);
8488
this.runNumberIncrementer = opts.runNumberIncrementer;
8589
this.tracer = opts.tracer;
8690
this.traceEventConcern = opts.traceEventConcern;
@@ -207,6 +211,14 @@ export class RunEngineTriggerTaskService {
207211

208212
const { idempotencyKey, idempotencyKeyExpiresAt } = idempotencyKeyConcernResult;
209213

214+
// `skipIfActive` — drop-on-conflict. Runs *after* idempotency so a
215+
// deliberate idempotency cache-hit wins, but *before* run creation so
216+
// we never touch the queue for a skipped trigger.
217+
const skipIfActiveResult = await this.skipIfActiveConcern.handleTriggerRequest(triggerRequest);
218+
if (skipIfActiveResult.wasSkipped) {
219+
return { run: skipIfActiveResult.run, isCached: true, wasSkipped: true };
220+
}
221+
210222
if (idempotencyKey) {
211223
await this.triggerRacepointSystem.waitForRacepoint({
212224
racepoint: "idempotencyKey",

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,12 @@ export class OutOfEntitlementError extends Error {
4545
export type TriggerTaskServiceResult = {
4646
run: TaskRun;
4747
isCached: boolean;
48+
/**
49+
* True when the run returned was matched by the `skipIfActive` option and
50+
* no new run was created. Always false/undefined for idempotency-cached
51+
* runs — `isCached` distinguishes those.
52+
*/
53+
wasSkipped?: boolean;
4854
};
4955

5056
export const MAX_ATTEMPTS = 2;
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
import { describe, expect, it, vi } from "vitest";
2+
3+
vi.mock("~/services/logger.server", () => ({
4+
logger: {
5+
debug: vi.fn(),
6+
info: vi.fn(),
7+
warn: vi.fn(),
8+
error: vi.fn(),
9+
},
10+
}));
11+
12+
import { SkipIfActiveConcern } from "../../app/runEngine/concerns/skipIfActive.server";
13+
import type { TriggerTaskRequest } from "../../app/runEngine/types";
14+
15+
type MockPrisma = {
16+
$queryRaw: ReturnType<typeof vi.fn>;
17+
taskRun: { findUnique: ReturnType<typeof vi.fn> };
18+
};
19+
20+
function buildRequest(overrides: {
21+
skipIfActive?: boolean;
22+
tags?: string | string[];
23+
taskId?: string;
24+
environmentId?: string;
25+
}): TriggerTaskRequest {
26+
return {
27+
taskId: overrides.taskId ?? "ezderm-notes-fetch",
28+
friendlyId: "run_test",
29+
environment: {
30+
id: overrides.environmentId ?? "env_123",
31+
organizationId: "org_1",
32+
projectId: "proj_1",
33+
} as TriggerTaskRequest["environment"],
34+
body: {
35+
payload: {},
36+
options: {
37+
skipIfActive: overrides.skipIfActive,
38+
tags: overrides.tags,
39+
},
40+
},
41+
options: {},
42+
} as TriggerTaskRequest;
43+
}
44+
45+
function mockPrisma(initial?: {
46+
existing?: Array<{ id: string }>;
47+
run?: { id: string } | null;
48+
}): MockPrisma {
49+
return {
50+
$queryRaw: vi.fn().mockResolvedValue(initial?.existing ?? []),
51+
taskRun: { findUnique: vi.fn().mockResolvedValue(initial?.run ?? null) },
52+
};
53+
}
54+
55+
describe("SkipIfActiveConcern", () => {
56+
it("returns wasSkipped=false when the flag is not set", async () => {
57+
const prisma = mockPrisma();
58+
const concern = new SkipIfActiveConcern(prisma as never);
59+
60+
const result = await concern.handleTriggerRequest(
61+
buildRequest({ skipIfActive: undefined, tags: ["connector:abc"] })
62+
);
63+
64+
expect(result).toEqual({ wasSkipped: false });
65+
expect(prisma.$queryRaw).not.toHaveBeenCalled();
66+
});
67+
68+
it("returns wasSkipped=false when skipIfActive=false", async () => {
69+
const prisma = mockPrisma();
70+
const concern = new SkipIfActiveConcern(prisma as never);
71+
72+
const result = await concern.handleTriggerRequest(
73+
buildRequest({ skipIfActive: false, tags: ["connector:abc"] })
74+
);
75+
76+
expect(result).toEqual({ wasSkipped: false });
77+
expect(prisma.$queryRaw).not.toHaveBeenCalled();
78+
});
79+
80+
it("returns wasSkipped=false when no tags are supplied (tag scope required)", async () => {
81+
const prisma = mockPrisma();
82+
const concern = new SkipIfActiveConcern(prisma as never);
83+
84+
const result = await concern.handleTriggerRequest(
85+
buildRequest({ skipIfActive: true, tags: undefined })
86+
);
87+
88+
expect(result).toEqual({ wasSkipped: false });
89+
expect(prisma.$queryRaw).not.toHaveBeenCalled();
90+
});
91+
92+
it("returns wasSkipped=false when no active run matches", async () => {
93+
const prisma = mockPrisma({ existing: [] });
94+
const concern = new SkipIfActiveConcern(prisma as never);
95+
96+
const result = await concern.handleTriggerRequest(
97+
buildRequest({ skipIfActive: true, tags: ["connector:abc"] })
98+
);
99+
100+
expect(result).toEqual({ wasSkipped: false });
101+
expect(prisma.$queryRaw).toHaveBeenCalledTimes(1);
102+
expect(prisma.taskRun.findUnique).not.toHaveBeenCalled();
103+
});
104+
105+
it("returns wasSkipped=true with the existing run when an active run matches all tags", async () => {
106+
const existingRun = { id: "run_existing", status: "EXECUTING", runTags: ["connector:abc"] };
107+
const prisma = mockPrisma({ existing: [{ id: existingRun.id }], run: existingRun });
108+
const concern = new SkipIfActiveConcern(prisma as never);
109+
110+
const result = await concern.handleTriggerRequest(
111+
buildRequest({ skipIfActive: true, tags: ["connector:abc"] })
112+
);
113+
114+
expect(result).toMatchObject({ wasSkipped: true, run: existingRun });
115+
expect(prisma.$queryRaw).toHaveBeenCalledTimes(1);
116+
expect(prisma.taskRun.findUnique).toHaveBeenCalledWith({ where: { id: "run_existing" } });
117+
});
118+
119+
it("normalizes a single tag string into an array", async () => {
120+
const prisma = mockPrisma({
121+
existing: [{ id: "run_x" }],
122+
run: { id: "run_x", status: "PENDING", runTags: ["connector:abc"] },
123+
});
124+
const concern = new SkipIfActiveConcern(prisma as never);
125+
126+
const result = await concern.handleTriggerRequest(
127+
// @ts-expect-error Zod allows both string and string[] for tags
128+
buildRequest({ skipIfActive: true, tags: "connector:abc" })
129+
);
130+
131+
expect(result.wasSkipped).toBe(true);
132+
});
133+
134+
it("recovers gracefully when the row disappears between the probe and the fetch", async () => {
135+
const prisma = mockPrisma({ existing: [{ id: "run_gone" }], run: null });
136+
const concern = new SkipIfActiveConcern(prisma as never);
137+
138+
const result = await concern.handleTriggerRequest(
139+
buildRequest({ skipIfActive: true, tags: ["connector:abc"] })
140+
);
141+
142+
expect(result).toEqual({ wasSkipped: false });
143+
});
144+
});

docs/docs.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
"versioning",
7171
"machines",
7272
"idempotency",
73+
"skip-if-active",
7374
"runs/max-duration",
7475
"tags",
7576
"runs/metadata",

0 commit comments

Comments
 (0)