Commit 331d425

Merge branch 'feat/admin-back-office-rate-limit' of https://github.com/triggerdotdev/trigger.dev into feat/admin-back-office-rate-limit
2 parents: 76be07a + 0e28498

13 files changed: 286 additions & 59 deletions

.github/workflows/pr_checks.yml

Lines changed: 1 addition & 0 deletions

```diff
@@ -6,6 +6,7 @@ on:
     paths-ignore:
       - "docs/**"
       - ".changeset/**"
+      - "hosting/**"
 
 concurrency:
   group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
```
Lines changed: 6 additions & 0 deletions

```diff
@@ -0,0 +1,6 @@
+---
+area: webapp
+type: fix
+---
+
+Fix memory leak where every aborted SSE connection pinned the full request/response graph on Node 20, caused by `AbortSignal.any()` in `sse.ts` retaining its source signals indefinitely (see nodejs/node#54614, nodejs/node#55351). Also clear the `setTimeout(abort)` timer in `entry.server.tsx` so successful HTML renders don't pin the React tree for 30s per request.
```
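
The mechanism is compact enough to sketch in isolation. Below is a minimal illustration of the leak shape, assuming an affected Node 20.x build; it is not the webapp's code, and the array is a stand-in for the request/response graph the changeset mentions:

```ts
// Sketch of the leak shape on an affected Node 20.x build (assumption: the
// AbortSignal.any() retention behavior described in nodejs/node#54614).
// Nothing retains the composite signal, yet `big` stays reachable: the
// pending 30s timeout keeps its signal alive, the signal keeps the composite
// alive, and the composite keeps its source signals, their listeners, and
// the listeners' closures alive.
for (let i = 0; i < 10_000; i++) {
  const internal = new AbortController();
  const big = new Array(1_024).fill(i); // stand-in for the request/response graph

  internal.signal.addEventListener("abort", () => {
    big.length = 0; // the closure captures `big`
  });

  AbortSignal.any([internal.signal, AbortSignal.timeout(30_000)]);
}

(globalThis as any).gc?.(); // run with --expose-gc for a cleaner reading

// On an affected build the heap stays inflated until the timeouts fire;
// with the single-controller pattern this commit switches to, it stays flat.
console.log(`${Math.round(process.memoryUsage().heapUsed / 2 ** 20)} MiB heap used`);
```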

apps/webapp/app/entry.server.tsx

Lines changed: 14 additions & 2 deletions

```diff
@@ -83,6 +83,10 @@ function handleBotRequest(
 ) {
   return new Promise((resolve, reject) => {
     let shellRendered = false;
+    // Timer handle is cleared in every terminal callback so the abort closure
+    // (which captures the full React render tree + remixContext) doesn't pin
+    // memory for 30s per successful request. See react-router PR #14200.
+    let abortTimer: NodeJS.Timeout | undefined;
     const { pipe, abort } = renderToPipeableStream(
       <OperatingSystemContextProvider platform={platform}>
         <LocaleContextProvider locales={locales}>
@@ -105,8 +109,10 @@
         );
 
         pipe(body);
+        clearTimeout(abortTimer);
       },
       onShellError(error: unknown) {
+        clearTimeout(abortTimer);
         reject(error);
       },
       onError(error: unknown) {
@@ -121,7 +127,7 @@
       }
     );
 
-    setTimeout(abort, ABORT_DELAY);
+    abortTimer = setTimeout(abort, ABORT_DELAY);
   });
 }
 
@@ -135,6 +141,10 @@ function handleBrowserRequest(
 ) {
   return new Promise((resolve, reject) => {
     let shellRendered = false;
+    // Timer handle is cleared in every terminal callback so the abort closure
+    // (which captures the full React render tree + remixContext) doesn't pin
+    // memory for 30s per successful request. See react-router PR #14200.
+    let abortTimer: NodeJS.Timeout | undefined;
     const { pipe, abort } = renderToPipeableStream(
       <OperatingSystemContextProvider platform={platform}>
         <LocaleContextProvider locales={locales}>
@@ -157,8 +167,10 @@
         );
 
         pipe(body);
+        clearTimeout(abortTimer);
       },
       onShellError(error: unknown) {
+        clearTimeout(abortTimer);
         reject(error);
       },
       onError(error: unknown) {
@@ -173,7 +185,7 @@
       }
     );
 
-    setTimeout(abort, ABORT_DELAY);
+    abortTimer = setTimeout(abort, ABORT_DELAY);
   });
 }
```
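
The timer half of the fix is ordinary closure hygiene rather than a Node bug: a pending `setTimeout` keeps its callback, and everything the callback closes over, reachable until it fires or is cleared. A sketch with illustrative names (`renderOnce` and `renderTree` are stand-ins, not the real entry.server.tsx code):

```ts
const ABORT_DELAY = 30_000;

// Illustrative stand-ins: in the real file the `abort` closure captures the
// React render tree and remixContext.
function renderOnce(onDone: () => void) {
  const renderTree = new Array(1_000_000).fill("node"); // large render state
  const abort = () => console.log("aborted with", renderTree.length, "nodes");

  const abortTimer = setTimeout(abort, ABORT_DELAY);

  // Simulate a successful render finishing quickly.
  setImmediate(() => {
    // Without this clearTimeout, `abort` (and therefore `renderTree`) stays
    // reachable for the remaining ~30s even though the request is done.
    clearTimeout(abortTimer);
    onDone();
  });
}

renderOnce(() => console.log("render complete, timer cleared, tree collectable"));
```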

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions

```diff
@@ -438,6 +438,7 @@ const EnvironmentSchema = z
   INTERNAL_OTEL_TRACE_SAMPLING_RATE: z.string().default("20"),
   INTERNAL_OTEL_TRACE_INSTRUMENT_PRISMA_ENABLED: z.string().default("0"),
   INTERNAL_OTEL_TRACE_DISABLED: z.string().default("0"),
+  DISABLE_HTTP_INSTRUMENTATION: BoolEnv.default(false),
 
   INTERNAL_OTEL_LOG_EXPORTER_URL: z.string().optional(),
   INTERNAL_OTEL_METRIC_EXPORTER_URL: z.string().optional(),
```
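
`BoolEnv` is a helper defined elsewhere in env.server.ts. For readers outside the codebase, a plausible shape (an assumption shown for illustration, not the actual helper) is a zod preprocessor that coerces truthy strings:

```ts
import { z } from "zod";

// Assumed shape of BoolEnv: the real helper lives elsewhere in the
// trigger.dev webapp and may accept a different set of truthy spellings.
const BoolEnv = z.preprocess((value) => {
  if (typeof value === "string") {
    return ["1", "true", "yes"].includes(value.toLowerCase());
  }
  return value;
}, z.boolean());

const Schema = z.object({
  DISABLE_HTTP_INSTRUMENTATION: BoolEnv.default(false),
});

console.log(Schema.parse({})); // { DISABLE_HTTP_INSTRUMENTATION: false }
console.log(Schema.parse({ DISABLE_HTTP_INSTRUMENTATION: "1" })); // { DISABLE_HTTP_INSTRUMENTATION: true }
```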

apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts

Lines changed: 5 additions & 3 deletions

```diff
@@ -1,7 +1,7 @@
 import { type PrismaClient, prisma } from "~/db.server";
 import { logger } from "~/services/logger.server";
 import { singleton } from "~/utils/singleton";
-import { createSSELoader, SendFunction } from "~/utils/sse";
+import { ABORT_REASON_SEND_ERROR, createSSELoader, SendFunction } from "~/utils/sse";
 import { throttle } from "~/utils/throttle";
 import { tracePubSub } from "~/v3/services/tracePubSub.server";
 
@@ -66,8 +66,10 @@
           });
         }
       }
-      // Abort the stream on send error
-      context.controller.abort("Send error");
+      // Abort the stream on send error. Uses a stackless string sentinel
+      // from sse.ts — a no-arg abort() would create a DOMException with a
+      // stack trace, which is unnecessary retention on the signal.reason.
+      context.controller.abort(ABORT_REASON_SEND_ERROR);
     }
   },
   1000
```
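
The stackless-sentinel rationale is easy to verify on current Node versions:

```ts
// abort() with no argument manufactures an "AbortError" DOMException; Node
// attaches a stack trace to DOMException instances. A string reason is
// stored on signal.reason as-is, with nothing extra retained.
const noArg = new AbortController();
noArg.abort();
console.log(noArg.signal.reason.name); // "AbortError"
console.log(typeof noArg.signal.reason.stack); // "string": a captured stack trace

const sentinel = new AbortController();
sentinel.abort("send_error");
console.log(sentinel.signal.reason); // "send_error"
```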

apps/webapp/app/runEngine/services/streamBatchItems.server.ts

Lines changed: 27 additions & 10 deletions

```diff
@@ -212,15 +212,18 @@ export class StreamBatchItemsService extends WithRunEngine {
     // Validate we received the expected number of items
     if (enqueuedCount !== batch.runCount) {
       // The batch queue consumers may have already processed all items and
-      // cleaned up the Redis keys before we got here (especially likely when
-      // items include pre-failed runs that complete instantly). Check if the
-      // batch was already sealed/completed in Postgres.
-      const currentBatch = await this._prisma.batchTaskRun.findUnique({
+      // cleaned up the Redis keys before we got here. This happens when all
+      // runs complete fast enough that cleanup() deletes the enqueuedItemsKey
+      // before we read it — typically when the last item executes in the
+      // milliseconds between the loop ending and getBatchEnqueuedCount() being called.
+      // Check both sealed (sealed by this endpoint on a concurrent request) and
+      // COMPLETED (sealed by the BatchQueue completion path before we got here).
+      const currentBatch = await this._prisma.batchTaskRun.findFirst({
         where: { id: batchId },
         select: { sealed: true, status: true },
       });
 
-      if (currentBatch?.sealed) {
+      if (currentBatch?.sealed || currentBatch?.status === "COMPLETED") {
         logger.info("Batch already sealed before count check (fast completion)", {
           batchId: batchFriendlyId,
           itemsAccepted,
@@ -279,8 +282,18 @@
 
     // Check if we won the race to seal the batch
     if (sealResult.count === 0) {
-      // Another request sealed the batch first - re-query to check current state
-      const currentBatch = await this._prisma.batchTaskRun.findUnique({
+      // The conditional update failed because the batch was no longer in
+      // PENDING status. Re-query to determine which path got there first:
+      // - A concurrent streaming request already sealed and moved it to
+      //   PROCESSING.
+      // - The BatchQueue completion path finished all runs and set it to
+      //   COMPLETED (without setting sealed=true — that's this endpoint's
+      //   job). This window exists between completionCallback (which calls
+      //   tryCompleteBatch) and cleanup() in BatchQueue — see
+      //   batch-queue/index.ts.
+      // Either way the goal — a durable batch that the SDK stops retrying —
+      // has been achieved, so we return sealed: true.
+      const currentBatch = await this._prisma.batchTaskRun.findFirst({
         where: { id: batchId },
         select: {
           id: true,
@@ -290,13 +303,17 @@
         },
       });
 
-      if (currentBatch?.sealed && currentBatch.status === "PROCESSING") {
-        // The batch was sealed by another request - this is fine, the goal was achieved
-        logger.info("Batch already sealed by concurrent request", {
+      if (
+        (currentBatch?.sealed && currentBatch.status === "PROCESSING") ||
+        currentBatch?.status === "COMPLETED"
+      ) {
+        logger.info("Batch already sealed/completed by concurrent path", {
           batchId: batchFriendlyId,
           itemsAccepted,
           itemsDeduplicated,
           envId: environment.id,
+          batchStatus: currentBatch.status,
+          batchSealed: currentBatch.sealed,
         });
 
         span.setAttribute("itemsAccepted", itemsAccepted);
```
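
Stripped of Prisma, the race resolution reduces to a compare-and-set on batch status followed by a tolerant re-read. A minimal in-memory sketch (`trySeal` stands in for the conditional `updateMany`; none of these names are the service's actual code):

```ts
type BatchStatus = "PENDING" | "PROCESSING" | "COMPLETED";
type Batch = { status: BatchStatus; sealed: boolean };

const batch: Batch = { status: "PENDING", sealed: false };

// Stand-in for `updateMany({ where: { id, status: "PENDING" }, ... })`:
// the conditional update matches zero rows unless the batch is still PENDING.
function trySeal(): { count: number } {
  if (batch.status !== "PENDING") return { count: 0 };
  batch.status = "PROCESSING";
  batch.sealed = true;
  return { count: 1 };
}

function sealOrAcknowledge(caller: string): void {
  if (trySeal().count === 1) {
    console.log(caller, "won the race and sealed the batch");
    return;
  }
  // Lost the race: re-read and accept either terminal outcome, as the
  // service does (sealed by a concurrent request, or COMPLETED by the
  // BatchQueue completion path).
  const terminal =
    (batch.sealed && batch.status === "PROCESSING") || batch.status === "COMPLETED";
  console.log(caller, terminal ? "found batch already sealed/completed" : "hit an unexpected state");
}

sealOrAcknowledge("request A"); // wins the race
sealOrAcknowledge("request B"); // loses, acknowledges the terminal state
```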

apps/webapp/app/utils/sse.ts

Lines changed: 62 additions & 36 deletions

```diff
@@ -38,14 +38,27 @@ type SSEOptions = {
 // This is used to track the open connections, for debugging
 const connections: Set<string> = new Set();
 
+// Stackless sentinel reasons passed to AbortController#abort. Calling .abort()
+// with no argument produces a DOMException that captures a ~500-byte stack
+// trace; a string reason is stored verbatim with no stack. The choice of
+// reason type does not cause the retention we saw in prod (that was the
+// AbortSignal.any composite — see comment near the timeoutTimer below for the
+// Node issue refs), but naming the sentinels keeps call sites readable and
+// lets future signal.reason consumers branch on the cause.
+export const ABORT_REASON_REQUEST = "request_aborted";
+export const ABORT_REASON_TIMEOUT = "timeout";
+export const ABORT_REASON_SEND_ERROR = "send_error";
+export const ABORT_REASON_INIT_STOP = "init_requested_stop";
+export const ABORT_REASON_ITERATOR_STOP = "iterator_requested_stop";
+export const ABORT_REASON_ITERATOR_ERROR = "iterator_error";
+
 export function createSSELoader(options: SSEOptions) {
   const { timeout, interval = 500, debug = false, handler } = options;
 
   return async function loader({ request, params }: LoaderFunctionArgs) {
     const id = request.headers.get("x-request-id") || Math.random().toString(36).slice(2, 8);
 
     const internalController = new AbortController();
-    const timeoutSignal = AbortSignal.timeout(timeout);
 
     const log = (message: string) => {
       if (debug)
@@ -60,16 +73,20 @@
         if (!internalController.signal.aborted) {
           originalSend(event);
         }
-        // If controller is aborted, silently ignore the send attempt
       } catch (error) {
         if (error instanceof Error) {
           if (error.message?.includes("Controller is already closed")) {
-            // Silently handle controller closed errors
             return;
           }
           log(`Error sending event: ${error.message}`);
         }
-        throw error; // Re-throw other errors
+        // Abort before rethrowing so timer + request-abort listener are cleaned
+        // up immediately. Otherwise a send-failure in initStream leaves them
+        // alive until `timeout` fires.
+        if (!internalController.signal.aborted) {
+          internalController.abort(ABORT_REASON_SEND_ERROR);
+        }
+        throw error;
       }
     };
   };
@@ -92,51 +109,57 @@
 
     const requestAbortSignal = getRequestAbortSignal();
 
-    const combinedSignal = AbortSignal.any([
-      requestAbortSignal,
-      timeoutSignal,
-      internalController.signal,
-    ]);
-
     log("Start");
 
-    requestAbortSignal.addEventListener(
-      "abort",
-      () => {
-        log(`request signal aborted`);
-        internalController.abort("Request aborted");
-      },
-      { once: true, signal: internalController.signal }
-    );
+    // Single-signal abort chain: everything rolls up into internalController.
+    // Timeout is a plain setTimeout cleared on abort rather than an
+    // AbortSignal.timeout() combined via AbortSignal.any() — AbortSignal.any
+    // keeps its source signals in an internal Set<WeakRef> managed by a
+    // FinalizationRegistry, and under sustained request traffic those entries
+    // accumulate faster than they get cleaned up, pinning every source signal
+    // (and its listeners, and anything those listeners close over) until the
+    // parent signal is GC'd or aborts. Reproduced locally in isolation; shape
+    // matches the ChainSafe Lodestar production case described in
+    // nodejs/node#54614. See also nodejs/node#55351 (mechanism confirmed by
+    // @jasnell, narrow fix in 22.12.0 via #55354) and nodejs/node#57584
+    // (circular-dep variant, still open).
+    const timeoutTimer = setTimeout(() => {
+      if (!internalController.signal.aborted) internalController.abort(ABORT_REASON_TIMEOUT);
+    }, timeout);
+
+    const onRequestAbort = () => {
+      log("request signal aborted");
+      if (!internalController.signal.aborted) internalController.abort(ABORT_REASON_REQUEST);
+    };
 
-    combinedSignal.addEventListener(
+    internalController.signal.addEventListener(
       "abort",
       () => {
-        log(`combinedSignal aborted: ${combinedSignal.reason}`);
+        clearTimeout(timeoutTimer);
+        requestAbortSignal.removeEventListener("abort", onRequestAbort);
       },
-      { once: true, signal: internalController.signal }
+      { once: true }
     );
 
-    timeoutSignal.addEventListener(
-      "abort",
-      () => {
-        if (internalController.signal.aborted) return;
-        log(`timeoutSignal aborted: ${timeoutSignal.reason}`);
-        internalController.abort("Timeout");
-      },
-      { once: true, signal: internalController.signal }
-    );
+    // The request could have been aborted during `await handler(context)` above.
+    // AbortSignal listeners added after the signal is already aborted never fire,
+    // so invoke cleanup synchronously in that case instead of waiting for `timeout`.
+    if (requestAbortSignal.aborted) {
+      onRequestAbort();
+    } else {
+      requestAbortSignal.addEventListener("abort", onRequestAbort, { once: true });
+    }
 
     if (handlers.beforeStream) {
       const shouldContinue = await handlers.beforeStream();
       if (shouldContinue === false) {
         log("beforeStream returned false, so we'll exit before creating the stream");
-        internalController.abort("Init requested stop");
+        internalController.abort(ABORT_REASON_INIT_STOP);
         return;
       }
     }
 
-    return eventStream(combinedSignal, function setup(send) {
+    return eventStream(internalController.signal, function setup(send) {
       connections.add(id);
       const safeSend = createSafeSend(send);
 
@@ -147,14 +170,14 @@
         const shouldContinue = await handlers.initStream({ send: safeSend });
         if (shouldContinue === false) {
           log("initStream returned false, so we'll stop the stream");
-          internalController.abort("Init requested stop");
+          internalController.abort(ABORT_REASON_INIT_STOP);
           return;
         }
       }
 
       log("Starting interval");
       for await (const _ of setInterval(interval, null, {
-        signal: combinedSignal,
+        signal: internalController.signal,
       })) {
         log("PING");
 
@@ -165,13 +188,16 @@
           const shouldContinue = await handlers.iterator({ date, send: safeSend });
           if (shouldContinue === false) {
             log("iterator return false, so we'll stop the stream");
-            internalController.abort("Iterator requested stop");
+            internalController.abort(ABORT_REASON_ITERATOR_STOP);
             break;
           }
         } catch (error) {
           log("iterator threw an error, aborting stream");
           // Immediately abort to trigger cleanup
-          internalController.abort(error instanceof Error ? error.message : "Iterator error");
+          if (error instanceof Error && error.name !== "AbortError") {
+            log(`iterator error: ${error.message}`);
+          }
+          internalController.abort(ABORT_REASON_ITERATOR_ERROR);
           // No need to re-throw as we're handling it by aborting
           return; // Exit the run function immediately
         }
```
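
Stripped of the SSE specifics, the replacement is a reusable shape: one controller as the single source of truth, a plain timer, and a removable forwarding listener. A self-contained sketch (`createAbortChain` is a name coined here, not an export of sse.ts):

```ts
// One AbortController is the single source of truth; the timeout is a plain
// timer and the upstream request signal is forwarded by a removable listener,
// so nothing outlives the stream except the controller itself.
function createAbortChain(upstream: AbortSignal, timeoutMs: number): AbortController {
  const controller = new AbortController();

  const timer = setTimeout(() => {
    if (!controller.signal.aborted) controller.abort("timeout");
  }, timeoutMs);

  const onUpstreamAbort = () => {
    if (!controller.signal.aborted) controller.abort("request_aborted");
  };

  // Cleanup runs exactly once, whatever the abort cause.
  controller.signal.addEventListener(
    "abort",
    () => {
      clearTimeout(timer);
      upstream.removeEventListener("abort", onUpstreamAbort);
    },
    { once: true }
  );

  // Listeners added to an already-aborted signal never fire, so handle the
  // pre-aborted case synchronously.
  if (upstream.aborted) onUpstreamAbort();
  else upstream.addEventListener("abort", onUpstreamAbort, { once: true });

  return controller;
}

// Usage: chain a 10s timeout onto an upstream request signal.
const upstream = new AbortController();
const chain = createAbortChain(upstream.signal, 10_000);
chain.signal.addEventListener("abort", () => {
  console.log("stream ended:", chain.signal.reason);
});
upstream.abort(); // -> "stream ended: request_aborted"
```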

apps/webapp/app/v3/tracer.server.ts

Lines changed: 4 additions & 2 deletions

```diff
@@ -302,13 +302,15 @@ function setupTelemetry() {
   provider.register();
 
   let instrumentations: Instrumentation[] = [
-    new HttpInstrumentation(),
-    new ExpressInstrumentation(),
     new AwsSdkInstrumentation({
       suppressInternalInstrumentation: true,
     }),
   ];
 
+  if (!env.DISABLE_HTTP_INSTRUMENTATION) {
+    instrumentations.unshift(new HttpInstrumentation(), new ExpressInstrumentation());
+  }
+
   if (env.INTERNAL_OTEL_TRACE_INSTRUMENT_PRISMA_ENABLED === "1") {
     instrumentations.push(new PrismaInstrumentation());
   }
```
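
For reference, the same opt-out shape in isolation, reading process.env directly rather than through the webapp's zod schema (an illustration, not the file's actual registration code):

```ts
import { registerInstrumentations } from "@opentelemetry/instrumentation";
import type { Instrumentation } from "@opentelemetry/instrumentation";
import { HttpInstrumentation } from "@opentelemetry/instrumentation-http";
import { ExpressInstrumentation } from "@opentelemetry/instrumentation-express";

// Build the list conditionally, then register once. Disabling HTTP/Express
// instrumentation removes their per-request span bookkeeping entirely,
// which is presumably the motivation for the new flag in a memory-focused
// commit.
const instrumentations: Instrumentation[] = [];

if (process.env.DISABLE_HTTP_INSTRUMENTATION !== "1") {
  instrumentations.push(new HttpInstrumentation(), new ExpressInstrumentation());
}

registerInstrumentations({ instrumentations });
```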
