diff --git a/.changeset/quiet-dogs-fly.md b/.changeset/quiet-dogs-fly.md new file mode 100644 index 0000000000..e601730476 --- /dev/null +++ b/.changeset/quiet-dogs-fly.md @@ -0,0 +1,7 @@ +--- +"@trigger.dev/sdk": patch +"@trigger.dev/core": patch +"trigger.dev": patch +--- + +Add support for setting TTL (time-to-live) defaults at the task level and globally in trigger.config.ts, with per-trigger overrides still taking precedence diff --git a/apps/webapp/CLAUDE.md b/apps/webapp/CLAUDE.md index 00ed6aaf78..b0f5e09b82 100644 --- a/apps/webapp/CLAUDE.md +++ b/apps/webapp/CLAUDE.md @@ -99,6 +99,17 @@ The `app/v3/` directory name is misleading - most code is actively used by V2. O Some services (e.g., `cancelTaskRun.server.ts`, `batchTriggerV3.server.ts`) branch on `RunEngineVersion` to support both V1 and V2. When editing these, only modify V2 code paths. +## Performance: Trigger Hot Path + +The `triggerTask.server.ts` service is the **highest-throughput code path** in the system. Every API trigger call goes through it. Keep it fast: + +- **Do NOT add database queries** to `triggerTask.server.ts` or `batchTriggerV3.server.ts`. Task defaults (TTL, etc.) are resolved via `backgroundWorkerTask.findFirst()` in the queue concern (`queues.server.ts`) - one query per request, in mutually exclusive branches depending on locked/non-locked path. Piggyback on the existing query instead of adding new ones. +- **Two-stage resolution pattern**: Task metadata is resolved in two stages by design: + 1. **Trigger time** (`triggerTask.server.ts`): Only TTL is resolved from task defaults. Everything else uses whatever the caller provides. + 2. **Dequeue time** (`dequeueSystem.ts`): Full `BackgroundWorkerTask` is loaded and retry config, machine config, maxDuration, etc. are resolved against task defaults. +- If you need to add a new task-level default, **add it to the existing `select` clause** in the `backgroundWorkerTask.findFirst()` query — do NOT add a second query. If the default doesn't need to be known at trigger time, resolve it at dequeue time instead. +- Batch triggers (`batchTriggerV3.server.ts`) follow the same pattern — keep batch paths equally fast. + ## Prisma Query Patterns - **Always use `findFirst` instead of `findUnique`.** Prisma's `findUnique` has an implicit DataLoader that batches concurrent calls into a single `IN` query. This batching cannot be disabled and has active bugs even in Prisma 6.x: uppercase UUIDs returning null (#25484, confirmed 6.4.1), composite key SQL correctness issues (#22202), and 5-10x worse performance than manual DataLoader (#6573, open since 2021). `findFirst` is never batched and avoids this entire class of issues. diff --git a/apps/webapp/app/runEngine/concerns/queues.server.ts b/apps/webapp/app/runEngine/concerns/queues.server.ts index 12b0b29c5f..4a4dcc8e98 100644 --- a/apps/webapp/app/runEngine/concerns/queues.server.ts +++ b/apps/webapp/app/runEngine/concerns/queues.server.ts @@ -73,19 +73,20 @@ export class DefaultQueueManager implements QueueManager { ): Promise { let queueName: string; let lockedQueueId: string | undefined; + let taskTtl: string | null | undefined; // Determine queue name based on lockToVersion and provided options if (lockedBackgroundWorker) { // Task is locked to a specific worker version const specifiedQueueName = extractQueueName(request.body.options?.queue); + if (specifiedQueueName) { - // A specific queue name is provided + // A specific queue name is provided, validate it exists for the locked worker const specifiedQueue = await this.prisma.taskQueue.findFirst({ - // Validate it exists for the locked worker where: { name: specifiedQueueName, runtimeEnvironmentId: request.environment.id, - workers: { some: { id: lockedBackgroundWorker.id } }, // Ensure the queue is associated with any task of the locked worker + workers: { some: { id: lockedBackgroundWorker.id } }, }, }); @@ -95,11 +96,26 @@ export class DefaultQueueManager implements QueueManager { }'.` ); } + // Use the validated queue name directly queueName = specifiedQueue.name; lockedQueueId = specifiedQueue.id; + + // Only fetch task for TTL if caller didn't provide a per-trigger TTL + if (request.body.options?.ttl === undefined) { + const lockedTask = await this.prisma.backgroundWorkerTask.findFirst({ + where: { + workerId: lockedBackgroundWorker.id, + runtimeEnvironmentId: request.environment.id, + slug: request.taskId, + }, + select: { ttl: true }, + }); + + taskTtl = lockedTask?.ttl; + } } else { - // No specific queue name provided, use the default queue for the task on the locked worker + // No queue override - fetch task with queue to get both default queue and TTL const lockedTask = await this.prisma.backgroundWorkerTask.findFirst({ where: { workerId: lockedBackgroundWorker.id, @@ -118,6 +134,8 @@ export class DefaultQueueManager implements QueueManager { ); } + taskTtl = lockedTask.ttl; + if (!lockedTask.queue) { // This case should ideally be prevented by earlier checks or schema constraints, // but handle it defensively. @@ -131,6 +149,7 @@ export class DefaultQueueManager implements QueueManager { }'.` ); } + // Use the task's default queue name queueName = lockedTask.queue.name; lockedQueueId = lockedTask.queue.id; @@ -145,7 +164,9 @@ export class DefaultQueueManager implements QueueManager { } // Get queue name using the helper for non-locked case (handles provided name or finds default) - queueName = await this.getQueueName(request); + const taskInfo = await this.getTaskQueueInfo(request); + queueName = taskInfo.queueName; + taskTtl = taskInfo.taskTtl; } // Sanitize the final determined queue name once @@ -161,21 +182,27 @@ export class DefaultQueueManager implements QueueManager { return { queueName, lockedQueueId, + taskTtl, }; } - async getQueueName(request: TriggerTaskRequest): Promise { + private async getTaskQueueInfo( + request: TriggerTaskRequest + ): Promise<{ queueName: string; taskTtl?: string | null }> { const { taskId, environment, body } = request; const { queue } = body.options ?? {}; // Use extractQueueName to handle double-wrapped queue objects - const queueName = extractQueueName(queue); - if (queueName) { - return queueName; - } + const overriddenQueueName = extractQueueName(queue); const defaultQueueName = `task/${taskId}`; + // When caller provides both a queue override and a per-trigger TTL, + // we don't need any DB queries - the per-trigger TTL takes precedence + if (overriddenQueueName && body.options?.ttl !== undefined) { + return { queueName: overriddenQueueName, taskTtl: undefined }; + } + // Find the current worker for the environment const worker = await findCurrentWorkerFromEnvironment(environment, this.prisma); @@ -185,7 +212,21 @@ export class DefaultQueueManager implements QueueManager { environmentId: environment.id, }); - return defaultQueueName; + return { queueName: overriddenQueueName ?? defaultQueueName, taskTtl: undefined }; + } + + // When queue is overridden, we only need TTL from the task (no queue join needed) + if (overriddenQueueName) { + const task = await this.prisma.backgroundWorkerTask.findFirst({ + where: { + workerId: worker.id, + runtimeEnvironmentId: environment.id, + slug: taskId, + }, + select: { ttl: true }, + }); + + return { queueName: overriddenQueueName, taskTtl: task?.ttl }; } const task = await this.prisma.backgroundWorkerTask.findFirst({ @@ -205,7 +246,7 @@ export class DefaultQueueManager implements QueueManager { environmentId: environment.id, }); - return defaultQueueName; + return { queueName: defaultQueueName, taskTtl: undefined }; } if (!task.queue) { @@ -215,10 +256,10 @@ export class DefaultQueueManager implements QueueManager { queueConfig: task.queueConfig, }); - return defaultQueueName; + return { queueName: defaultQueueName, taskTtl: task.ttl }; } - return task.queue.name ?? defaultQueueName; + return { queueName: task.queue.name ?? defaultQueueName, taskTtl: task.ttl }; } async validateQueueLimits( diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index 1fe3b839e3..d87bfa67db 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -191,11 +191,6 @@ export class RunEngineTriggerTaskService { } } - const ttl = - typeof body.options?.ttl === "number" - ? stringifyDuration(body.options?.ttl) - : body.options?.ttl ?? (environment.type === "DEVELOPMENT" ? "10m" : undefined); - // Get parent run if specified const parentRun = body.options?.parentRunId ? await this.prisma.taskRun.findFirst({ @@ -251,10 +246,23 @@ export class RunEngineTriggerTaskService { }) : undefined; - const { queueName, lockedQueueId } = await this.queueConcern.resolveQueueProperties( - triggerRequest, - lockedToBackgroundWorker ?? undefined - ); + const { queueName, lockedQueueId, taskTtl } = + await this.queueConcern.resolveQueueProperties( + triggerRequest, + lockedToBackgroundWorker ?? undefined + ); + + // Resolve TTL with precedence: per-trigger > task-level > dev default + let ttl: string | undefined; + + if (body.options?.ttl !== undefined) { + ttl = + typeof body.options.ttl === "number" + ? stringifyDuration(body.options.ttl) + : body.options.ttl; + } else { + ttl = taskTtl ?? (environment.type === "DEVELOPMENT" ? "10m" : undefined); + } if (!options.skipChecks) { const queueSizeGuard = await this.queueConcern.validateQueueLimits( diff --git a/apps/webapp/app/runEngine/types.ts b/apps/webapp/app/runEngine/types.ts index 3fc8d8034b..310e0b32e5 100644 --- a/apps/webapp/app/runEngine/types.ts +++ b/apps/webapp/app/runEngine/types.ts @@ -48,6 +48,7 @@ export type QueueValidationResult = export type QueueProperties = { queueName: string; lockedQueueId?: string; + taskTtl?: string | null; }; export type LockedBackgroundWorker = Pick< @@ -61,7 +62,6 @@ export interface QueueManager { request: TriggerTaskRequest, lockedBackgroundWorker?: LockedBackgroundWorker ): Promise; - getQueueName(request: TriggerTaskRequest): Promise; validateQueueLimits( env: AuthenticatedEnvironment, queueName: string, diff --git a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts index c191ae5f39..1b51ec04ae 100644 --- a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts +++ b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts @@ -6,7 +6,7 @@ import { QueueManifest, TaskResource, } from "@trigger.dev/core/v3"; -import { BackgroundWorkerId } from "@trigger.dev/core/v3/isomorphic"; +import { BackgroundWorkerId, stringifyDuration } from "@trigger.dev/core/v3/isomorphic"; import type { BackgroundWorker, TaskQueue, TaskQueueType } from "@trigger.dev/database"; import cronstrue from "cronstrue"; import { $transaction, Prisma, PrismaClientOrTransaction } from "~/db.server"; @@ -286,6 +286,8 @@ async function createWorkerTask( triggerSource: task.triggerSource === "schedule" ? "SCHEDULED" : "STANDARD", fileId: tasksToBackgroundFiles?.get(task.id) ?? null, maxDurationInSeconds: task.maxDuration ? clampMaxDuration(task.maxDuration) : null, + ttl: + typeof task.ttl === "number" ? stringifyDuration(task.ttl) ?? null : task.ttl ?? null, queueId: queue.id, payloadSchema: task.payloadSchema as any, }, diff --git a/internal-packages/database/prisma/migrations/20260309120000_add_ttl_to_background_worker_task/migration.sql b/internal-packages/database/prisma/migrations/20260309120000_add_ttl_to_background_worker_task/migration.sql new file mode 100644 index 0000000000..569fb94643 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260309120000_add_ttl_to_background_worker_task/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "public"."BackgroundWorkerTask" ADD COLUMN IF NOT EXISTS "ttl" TEXT; diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 9b2388f87b..142fcb6dff 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -648,6 +648,8 @@ model BackgroundWorkerTask { maxDurationInSeconds Int? + ttl String? + triggerSource TaskTriggerSource @default(STANDARD) payloadSchema Json? diff --git a/packages/cli-v3/src/entryPoints/dev-index-worker.ts b/packages/cli-v3/src/entryPoints/dev-index-worker.ts index 3328f553ca..0b8ee54b0a 100644 --- a/packages/cli-v3/src/entryPoints/dev-index-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-index-worker.ts @@ -132,6 +132,20 @@ if (typeof config.maxDuration === "number") { }); } +// If the config has a TTL, we need to apply it to all tasks that don't have a TTL +if (config.ttl !== undefined) { + tasks = tasks.map((task) => { + if (task.ttl === undefined) { + return { + ...task, + ttl: config.ttl, + } satisfies TaskManifest; + } + + return task; + }); +} + // If the config has a machine preset, we need to apply it to all tasks that don't have a machine preset if (typeof config.machine === "string") { tasks = tasks.map((task) => { diff --git a/packages/cli-v3/src/entryPoints/managed-index-worker.ts b/packages/cli-v3/src/entryPoints/managed-index-worker.ts index f300e7ac71..efc226b99e 100644 --- a/packages/cli-v3/src/entryPoints/managed-index-worker.ts +++ b/packages/cli-v3/src/entryPoints/managed-index-worker.ts @@ -132,6 +132,20 @@ if (typeof config.maxDuration === "number") { }); } +// If the config has a TTL, we need to apply it to all tasks that don't have a TTL +if (config.ttl !== undefined) { + tasks = tasks.map((task) => { + if (task.ttl === undefined) { + return { + ...task, + ttl: config.ttl, + } satisfies TaskManifest; + } + + return task; + }); +} + // If the config has a machine preset, we need to apply it to all tasks that don't have a machine preset if (typeof config.machine === "string") { tasks = tasks.map((task) => { diff --git a/packages/core/src/v3/config.ts b/packages/core/src/v3/config.ts index bf1e6c687c..40334f0428 100644 --- a/packages/core/src/v3/config.ts +++ b/packages/core/src/v3/config.ts @@ -176,6 +176,25 @@ export type TriggerConfig = { */ maxDuration: number; + /** + * Set a default time-to-live (TTL) for all task runs in the project. If a run is not executed within this time, it will be removed from the queue and never execute. + * + * This can be a string like "1h" (1 hour), "30m" (30 minutes), "1d" (1 day), or a number of seconds. + * + * You can override this on a per-task basis by setting the `ttl` option on the task definition, or per-trigger by setting the `ttl` option when triggering. + * + * @example + * + * ```ts + * export default defineConfig({ + * project: "my-project", + * maxDuration: 3600, + * ttl: "1h", + * }); + * ``` + */ + ttl?: string | number; + /** * Enable console logging while running the dev CLI. This will print out logs from console.log, console.warn, and console.error. By default all logs are sent to the trigger.dev backend, and not logged to the console. */ diff --git a/packages/core/src/v3/schemas/resources.ts b/packages/core/src/v3/schemas/resources.ts index 9a9a86e4e9..e681c72841 100644 --- a/packages/core/src/v3/schemas/resources.ts +++ b/packages/core/src/v3/schemas/resources.ts @@ -13,6 +13,7 @@ export const TaskResource = z.object({ triggerSource: z.string().optional(), schedule: ScheduleMetadata.optional(), maxDuration: z.number().optional(), + ttl: z.string().or(z.number().nonnegative().int()).optional(), // JSONSchema type - using z.unknown() for runtime validation to accept JSONSchema7 payloadSchema: z.unknown().optional(), }); diff --git a/packages/core/src/v3/schemas/schemas.ts b/packages/core/src/v3/schemas/schemas.ts index 2d79d01742..4ec559ebf4 100644 --- a/packages/core/src/v3/schemas/schemas.ts +++ b/packages/core/src/v3/schemas/schemas.ts @@ -189,6 +189,7 @@ const taskMetadata = { triggerSource: z.string().optional(), schedule: ScheduleMetadata.optional(), maxDuration: z.number().optional(), + ttl: z.string().or(z.number().nonnegative().int()).optional(), payloadSchema: z.unknown().optional(), }; diff --git a/packages/core/src/v3/types/tasks.ts b/packages/core/src/v3/types/tasks.ts index 184aebe78a..d04d088ef1 100644 --- a/packages/core/src/v3/types/tasks.ts +++ b/packages/core/src/v3/types/tasks.ts @@ -278,6 +278,29 @@ type CommonTaskOptions< */ maxDuration?: number; + /** + * Set a default time-to-live for runs of this task. If the run is not executed within this time, it will be removed from the queue and never execute. + * + * This can be a string like "1h" (1 hour), "30m" (30 minutes), "1d" (1 day), or a number of seconds. + * + * If omitted it will use the value in your `trigger.config.ts` file, if set. + * + * You can override this on a per-trigger basis by setting the `ttl` option when triggering the task. + * + * @example + * + * ```ts + * export const myTask = task({ + * id: "my-task", + * ttl: "10m", + * run: async (payload) => { + * //... + * }, + * }); + * ``` + */ + ttl?: string | number; + /** This gets called when a task is triggered. It's where you put the code you want to execute. * * @param payload - The payload that is passed to your task when it's triggered. This must be JSON serializable. diff --git a/packages/trigger-sdk/src/v3/shared.ts b/packages/trigger-sdk/src/v3/shared.ts index c03732c12e..c69bceeb53 100644 --- a/packages/trigger-sdk/src/v3/shared.ts +++ b/packages/trigger-sdk/src/v3/shared.ts @@ -236,6 +236,7 @@ export function createTask< retry: params.retry ? { ...defaultRetryOptions, ...params.retry } : undefined, machine: typeof params.machine === "string" ? { preset: params.machine } : params.machine, maxDuration: params.maxDuration, + ttl: params.ttl, payloadSchema: params.jsonSchema, fns: { run: params.run, @@ -367,6 +368,7 @@ export function createSchemaTask< retry: params.retry ? { ...defaultRetryOptions, ...params.retry } : undefined, machine: typeof params.machine === "string" ? { preset: params.machine } : params.machine, maxDuration: params.maxDuration, + ttl: params.ttl, fns: { run: params.run, parsePayload, diff --git a/references/hello-world/src/trigger/example.ts b/references/hello-world/src/trigger/example.ts index 8ecfdb033e..5e04f57144 100644 --- a/references/hello-world/src/trigger/example.ts +++ b/references/hello-world/src/trigger/example.ts @@ -5,6 +5,7 @@ import { fixedLengthTask } from "./batches.js"; export const helloWorldTask = task({ id: "hello-world", + ttl: "10m", retry: { maxAttempts: 3, minTimeoutInMs: 500, diff --git a/references/hello-world/src/trigger/schedule.ts b/references/hello-world/src/trigger/schedule.ts index 29c363d64c..46a7d9a18c 100644 --- a/references/hello-world/src/trigger/schedule.ts +++ b/references/hello-world/src/trigger/schedule.ts @@ -3,6 +3,7 @@ import { schedules } from "@trigger.dev/sdk/v3"; export const simpleSchedule = schedules.task({ id: "simple-schedule", cron: "0 0 * * *", + ttl: "30m", run: async (payload, { ctx }) => { return { message: "Hello, world!", diff --git a/references/hello-world/trigger.config.ts b/references/hello-world/trigger.config.ts index 2c3751b041..e0b875cd6d 100644 --- a/references/hello-world/trigger.config.ts +++ b/references/hello-world/trigger.config.ts @@ -11,6 +11,7 @@ export default defineConfig({ }, logLevel: "debug", maxDuration: 3600, + ttl: "1h", retries: { enabledInDev: true, default: {