@@ -38,14 +38,27 @@ type SSEOptions = {
3838// This is used to track the open connections, for debugging
3939const connections : Set < string > = new Set ( ) ;
4040
41+ // Stackless sentinel reasons passed to AbortController#abort. Calling .abort()
42+ // with no argument produces a DOMException that captures a ~500-byte stack
43+ // trace; a string reason is stored verbatim with no stack. The choice of
44+ // reason type does not cause the retention we saw in prod (that was the
45+ // AbortSignal.any composite — see comment near the timeoutTimer below for the
46+ // Node issue refs), but naming the sentinels keeps call sites readable and
47+ // lets future signal.reason consumers branch on the cause.
48+ export const ABORT_REASON_REQUEST = "request_aborted" ;
49+ export const ABORT_REASON_TIMEOUT = "timeout" ;
50+ export const ABORT_REASON_SEND_ERROR = "send_error" ;
51+ export const ABORT_REASON_INIT_STOP = "init_requested_stop" ;
52+ export const ABORT_REASON_ITERATOR_STOP = "iterator_requested_stop" ;
53+ export const ABORT_REASON_ITERATOR_ERROR = "iterator_error" ;
54+
4155export function createSSELoader ( options : SSEOptions ) {
4256 const { timeout, interval = 500 , debug = false , handler } = options ;
4357
4458 return async function loader ( { request, params } : LoaderFunctionArgs ) {
4559 const id = request . headers . get ( "x-request-id" ) || Math . random ( ) . toString ( 36 ) . slice ( 2 , 8 ) ;
4660
4761 const internalController = new AbortController ( ) ;
48- const timeoutSignal = AbortSignal . timeout ( timeout ) ;
4962
5063 const log = ( message : string ) => {
5164 if ( debug )
@@ -60,16 +73,20 @@ export function createSSELoader(options: SSEOptions) {
6073 if ( ! internalController . signal . aborted ) {
6174 originalSend ( event ) ;
6275 }
63- // If controller is aborted, silently ignore the send attempt
6476 } catch ( error ) {
6577 if ( error instanceof Error ) {
6678 if ( error . message ?. includes ( "Controller is already closed" ) ) {
67- // Silently handle controller closed errors
6879 return ;
6980 }
7081 log ( `Error sending event: ${ error . message } ` ) ;
7182 }
72- throw error ; // Re-throw other errors
83+ // Abort before rethrowing so timer + request-abort listener are cleaned
84+ // up immediately. Otherwise a send-failure in initStream leaves them
85+ // alive until `timeout` fires.
86+ if ( ! internalController . signal . aborted ) {
87+ internalController . abort ( ABORT_REASON_SEND_ERROR ) ;
88+ }
89+ throw error ;
7390 }
7491 } ;
7592 } ;
@@ -92,51 +109,57 @@ export function createSSELoader(options: SSEOptions) {
92109
93110 const requestAbortSignal = getRequestAbortSignal ( ) ;
94111
95- const combinedSignal = AbortSignal . any ( [
96- requestAbortSignal ,
97- timeoutSignal ,
98- internalController . signal ,
99- ] ) ;
100-
101112 log ( "Start" ) ;
102113
103- requestAbortSignal . addEventListener (
104- "abort" ,
105- ( ) => {
106- log ( `request signal aborted` ) ;
107- internalController . abort ( "Request aborted" ) ;
108- } ,
109- { once : true , signal : internalController . signal }
110- ) ;
114+ // Single-signal abort chain: everything rolls up into internalController.
115+ // Timeout is a plain setTimeout cleared on abort rather than an
116+ // AbortSignal.timeout() combined via AbortSignal.any() — AbortSignal.any
117+ // keeps its source signals in an internal Set<WeakRef> managed by a
118+ // FinalizationRegistry, and under sustained request traffic those entries
119+ // accumulate faster than they get cleaned up, pinning every source signal
120+ // (and its listeners, and anything those listeners close over) until the
121+ // parent signal is GC'd or aborts. Reproduced locally in isolation; shape
122+ // matches the ChainSafe Lodestar production case described in
123+ // nodejs/node#54614. See also nodejs/node#55351 (mechanism confirmed by
124+ // @jasnell , narrow fix in 22.12.0 via #55354) and nodejs/node#57584
125+ // (circular-dep variant, still open).
126+ const timeoutTimer = setTimeout ( ( ) => {
127+ if ( ! internalController . signal . aborted ) internalController . abort ( ABORT_REASON_TIMEOUT ) ;
128+ } , timeout ) ;
129+
130+ const onRequestAbort = ( ) => {
131+ log ( "request signal aborted" ) ;
132+ if ( ! internalController . signal . aborted ) internalController . abort ( ABORT_REASON_REQUEST ) ;
133+ } ;
111134
112- combinedSignal . addEventListener (
135+ internalController . signal . addEventListener (
113136 "abort" ,
114137 ( ) => {
115- log ( `combinedSignal aborted: ${ combinedSignal . reason } ` ) ;
138+ clearTimeout ( timeoutTimer ) ;
139+ requestAbortSignal . removeEventListener ( "abort" , onRequestAbort ) ;
116140 } ,
117- { once : true , signal : internalController . signal }
141+ { once : true }
118142 ) ;
119143
120- timeoutSignal . addEventListener (
121- "abort" ,
122- ( ) => {
123- if ( internalController . signal . aborted ) return ;
124- log ( `timeoutSignal aborted: ${ timeoutSignal . reason } ` ) ;
125- internalController . abort ( "Timeout" ) ;
126- } ,
127- { once : true , signal : internalController . signal }
128- ) ;
144+ // The request could have been aborted during `await handler(context)` above.
145+ // AbortSignal listeners added after the signal is already aborted never fire,
146+ // so invoke cleanup synchronously in that case instead of waiting for `timeout`.
147+ if ( requestAbortSignal . aborted ) {
148+ onRequestAbort ( ) ;
149+ } else {
150+ requestAbortSignal . addEventListener ( "abort" , onRequestAbort , { once : true } ) ;
151+ }
129152
130153 if ( handlers . beforeStream ) {
131154 const shouldContinue = await handlers . beforeStream ( ) ;
132155 if ( shouldContinue === false ) {
133156 log ( "beforeStream returned false, so we'll exit before creating the stream" ) ;
134- internalController . abort ( "Init requested stop" ) ;
157+ internalController . abort ( ABORT_REASON_INIT_STOP ) ;
135158 return ;
136159 }
137160 }
138161
139- return eventStream ( combinedSignal , function setup ( send ) {
162+ return eventStream ( internalController . signal , function setup ( send ) {
140163 connections . add ( id ) ;
141164 const safeSend = createSafeSend ( send ) ;
142165
@@ -147,14 +170,14 @@ export function createSSELoader(options: SSEOptions) {
147170 const shouldContinue = await handlers . initStream ( { send : safeSend } ) ;
148171 if ( shouldContinue === false ) {
149172 log ( "initStream returned false, so we'll stop the stream" ) ;
150- internalController . abort ( "Init requested stop" ) ;
173+ internalController . abort ( ABORT_REASON_INIT_STOP ) ;
151174 return ;
152175 }
153176 }
154177
155178 log ( "Starting interval" ) ;
156179 for await ( const _ of setInterval ( interval , null , {
157- signal : combinedSignal ,
180+ signal : internalController . signal ,
158181 } ) ) {
159182 log ( "PING" ) ;
160183
@@ -165,13 +188,16 @@ export function createSSELoader(options: SSEOptions) {
165188 const shouldContinue = await handlers . iterator ( { date, send : safeSend } ) ;
166189 if ( shouldContinue === false ) {
167190 log ( "iterator return false, so we'll stop the stream" ) ;
168- internalController . abort ( "Iterator requested stop" ) ;
191+ internalController . abort ( ABORT_REASON_ITERATOR_STOP ) ;
169192 break ;
170193 }
171194 } catch ( error ) {
172195 log ( "iterator threw an error, aborting stream" ) ;
173196 // Immediately abort to trigger cleanup
174- internalController . abort ( error instanceof Error ? error . message : "Iterator error" ) ;
197+ if ( error instanceof Error && error . name !== "AbortError" ) {
198+ log ( `iterator error: ${ error . message } ` ) ;
199+ }
200+ internalController . abort ( ABORT_REASON_ITERATOR_ERROR ) ;
175201 // No need to re-throw as we're handling it by aborting
176202 return ; // Exit the run function immediately
177203 }
0 commit comments