Skip to content

Commit 0bb4b18

Browse files
committed
test(sdk,core): session-aware mockChatAgent harness + chat.test.ts mocks
Unblocks the unit tests after the chat.agent -> Sessions migration (phases B/C/D). Before: 43 passed / 43 failed (35 in chat.test.ts + 10 mockChatAgent + 2 skillsRuntime). After: 86 passed / 0 failed. Core (@trigger.dev/core/v3/test) - TestSessionStreamManager: in-memory SessionStreamManager keyed on (sessionId, io) mirroring TestInputStreamManager. Dispatch rules match production with one test-only tweak — when a record arrives and only handlers are registered (no .once waiter), it's buffered for the next once() instead of discarded. Production doesn't need this because the SSE tail naturally serializes records after the agent's turn-loop has re-registered a waiter; tests send synchronously right after turn-complete, so without the buffer the next waitWithIdleTimeout loses the message. - runInMockTaskContext installs the manager via sessionStreams.setGlobalManager, exposes drivers.sessions.in.send / .close, and tears down on exit. SDK (@trigger.dev/sdk/v3/test) - __setSessionOpenImplForTests hook in sessions.ts lets the harness override sessions.open(id) with an in-memory SessionHandle. SessionHandle constructor now accepts { in?, out? } overrides. - TestSessionOutputChannel extends SessionOutputChannel and intercepts pipe / writer / append into a shared TestSessionOutState (chunks + listener registry). Never constructs SessionStreamInstance so it avoids initializeSessionStream / StreamsWriterV2 entirely. - mockChatAgent rewritten: drops CHAT_MESSAGES_STREAM_ID / CHAT_STOP_STREAM_ID / the "chat" output stream key. sendMessage / sendRegenerate / sendAction push ChatInputChunk { kind: "message", payload } via drivers.sessions.in.send. sendStop pushes { kind: "stop" }. Turn-complete detection moves from drivers.outputs.onWrite to a TestSessionOutputChannel listener. chat.test.ts - New URL-predicate helpers at the top (isSessionCreateUrl, isTriggerTaskUrl, isSessionOutSubscribeUrl, isSessionStreamAppendUrl) + defaultSessionCreateResponse / defaultAppendResponse so every global.fetch mock speaks the same vocabulary. - Bulk-updated all 25 mock blocks: added session-create handler (transport's accessToken path now lazily upserts via POST /api/v1/sessions before trigger), swapped /realtime/v1/streams/ for /realtime/v1/sessions/ URL matchers, and replaced (streams/ && /input/) append-URL matchers with isSessionStreamAppendUrl. - Three tests updated for new semantics: onSessionChange fires twice on first message (ensureSession -> sessionId only, then triggerNewRun -> adds runId + isStreaming). Async-token call count goes 1 -> 2 on first message because ensureSession and trigger both resolve the token with purpose: "trigger". - "minimal wire payloads" test's body parsing updated — session.in append body is a raw JSON.stringify({ kind, payload }) string, not a { data } wrapper. - Replaced the vestigial "custom streamKey URL" test with a "subscribes to the backing Session's .out" assertion. streamKey option is a no-op under sessions; removal can land in a follow-up. - One test (stream closes without control chunk) legitimately needs 9s for SSE-close fallback — bumped its timeout to 15s.
1 parent 7847d78 commit 0bb4b18

7 files changed

Lines changed: 833 additions & 162 deletions

File tree

packages/core/src/v3/test/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ export {
66
export { TestInputStreamManager } from "./test-input-stream-manager.js";
77
export { TestRealtimeStreamsManager } from "./test-realtime-streams-manager.js";
88
export { TestRunMetadataManager } from "./test-run-metadata-manager.js";
9+
export { TestSessionStreamManager } from "./test-session-stream-manager.js";

packages/core/src/v3/test/mock-task-context.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { inputStreams } from "../input-streams-api.js";
22
import { realtimeStreams } from "../realtime-streams-api.js";
3+
import { sessionStreams } from "../session-streams-api.js";
34
import { localsAPI } from "../locals-api.js";
45
import { runMetadata } from "../run-metadata-api.js";
56
import { taskContext } from "../task-context-api.js";
@@ -11,9 +12,11 @@ import { NoopRuntimeManager } from "../runtime/noopRuntimeManager.js";
1112
import { unregisterGlobal } from "../utils/globals.js";
1213
import type { ServerBackgroundWorker, TaskRunContext } from "../schemas/index.js";
1314
import type { LocalsKey } from "../locals/types.js";
15+
import type { SessionChannelIO } from "../sessionStreams/types.js";
1416
import { TestInputStreamManager } from "./test-input-stream-manager.js";
1517
import { TestRealtimeStreamsManager } from "./test-realtime-streams-manager.js";
1618
import { TestRunMetadataManager } from "./test-run-metadata-manager.js";
19+
import { TestSessionStreamManager } from "./test-session-stream-manager.js";
1720

1821
/**
1922
* Shallow-partial overrides applied on top of the default mock
@@ -83,6 +86,23 @@ export type MockTaskContextDrivers = {
8386
*/
8487
set<T>(key: LocalsKey<T>, value: T): void;
8588
};
89+
/**
90+
* Session-scoped channel drivers. The `.in` side is backed by a
91+
* {@link TestSessionStreamManager} installed as the `sessionStreams`
92+
* global — so the task's `session.in.on/once/peek/waitWithIdleTimeout`
93+
* calls receive records sent through this driver.
94+
*/
95+
sessions: {
96+
in: {
97+
/**
98+
* Send a record onto `session.in` for the given session. Resolves
99+
* pending `once()` waiters and fires all `on()` handlers.
100+
*/
101+
send(sessionId: string, data: unknown, io?: SessionChannelIO): Promise<void>;
102+
/** Close pending `once()` waiters with a timeout error. */
103+
close(sessionId: string, io?: SessionChannelIO): void;
104+
};
105+
};
86106
/** The mock `TaskRunContext` assembled from defaults + user overrides. */
87107
ctx: TaskRunContext;
88108
};
@@ -198,6 +218,7 @@ export async function runInMockTaskContext<T>(
198218
const metadataManager = new TestRunMetadataManager();
199219
const inputManager = new TestInputStreamManager();
200220
const outputManager = new TestRealtimeStreamsManager();
221+
const sessionStreamManager = new TestSessionStreamManager();
201222

202223
// Unregister any previously-installed managers so `setGlobal*` wins —
203224
// `registerGlobal` returns false silently if an entry already exists.
@@ -207,6 +228,7 @@ export async function runInMockTaskContext<T>(
207228
unregisterGlobal("run-metadata");
208229
unregisterGlobal("input-streams");
209230
unregisterGlobal("realtime-streams");
231+
unregisterGlobal("session-streams");
210232
unregisterGlobal("task-context");
211233

212234
localsAPI.setGlobalLocalsManager(localsManager);
@@ -215,6 +237,7 @@ export async function runInMockTaskContext<T>(
215237
runMetadata.setGlobalManager(metadataManager);
216238
inputStreams.setGlobalManager(inputManager);
217239
realtimeStreams.setGlobalManager(outputManager);
240+
sessionStreams.setGlobalManager(sessionStreamManager);
218241
taskContext.setGlobalTaskContext({
219242
ctx,
220243
worker,
@@ -237,6 +260,14 @@ export async function runInMockTaskContext<T>(
237260
set: <TValue>(key: LocalsKey<TValue>, value: TValue) =>
238261
localsManager.setLocal(key, value),
239262
},
263+
sessions: {
264+
in: {
265+
send: (sessionId, data, io = "in") =>
266+
sessionStreamManager.__sendFromTest(sessionId, io, data),
267+
close: (sessionId, io = "in") =>
268+
sessionStreamManager.__closeFromTest(sessionId, io),
269+
},
270+
},
240271
ctx,
241272
};
242273

@@ -251,10 +282,12 @@ export async function runInMockTaskContext<T>(
251282
unregisterGlobal("task-context");
252283
unregisterGlobal("input-streams");
253284
unregisterGlobal("realtime-streams");
285+
unregisterGlobal("session-streams");
254286
unregisterGlobal("run-metadata");
255287
localsManager.reset();
256288
inputManager.reset();
257289
outputManager.reset();
290+
sessionStreamManager.reset();
258291
metadataManager.reset();
259292
}
260293
}
Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
import {
2+
InputStreamOncePromise,
3+
InputStreamOnceResult,
4+
InputStreamTimeoutError,
5+
} from "../inputStreams/types.js";
6+
import type { InputStreamOnceOptions } from "../realtimeStreams/types.js";
7+
import type {
8+
SessionChannelIO,
9+
SessionStreamManager,
10+
} from "../sessionStreams/types.js";
11+
12+
type OnceWaiter = {
13+
resolve: (value: InputStreamOnceResult<unknown>) => void;
14+
timer?: ReturnType<typeof setTimeout>;
15+
signal?: AbortSignal;
16+
abortHandler?: () => void;
17+
};
18+
19+
type Handler = (data: unknown) => void | Promise<void>;
20+
21+
function keyFor(sessionId: string, io: SessionChannelIO): string {
22+
return `${sessionId}:${io}`;
23+
}
24+
25+
/**
26+
* In-memory implementation of `SessionStreamManager` for unit tests. Same
27+
* shape as {@link TestInputStreamManager} but keyed on `(sessionId, io)`.
28+
*
29+
* Tests push data via `__sendFromTest(sessionId, io, data)` — any pending
30+
* `once()` waiters resolve immediately, and all `on()` handlers fire (awaited
31+
* if they return a promise). Records that arrive before a listener is
32+
* registered are buffered so the first `once()` picks them up.
33+
*/
34+
export class TestSessionStreamManager implements SessionStreamManager {
35+
private handlers = new Map<string, Set<Handler>>();
36+
private onceWaiters = new Map<string, OnceWaiter[]>();
37+
private buffer = new Map<string, unknown[]>();
38+
private seqNums = new Map<string, number>();
39+
40+
on(
41+
sessionId: string,
42+
io: SessionChannelIO,
43+
handler: Handler
44+
): { off: () => void } {
45+
const key = keyFor(sessionId, io);
46+
47+
let set = this.handlers.get(key);
48+
if (!set) {
49+
set = new Set();
50+
this.handlers.set(key, set);
51+
}
52+
set.add(handler);
53+
54+
const buffered = this.buffer.get(key);
55+
if (buffered && buffered.length > 0) {
56+
for (const data of buffered) {
57+
this.invoke(handler, data);
58+
}
59+
this.buffer.delete(key);
60+
}
61+
62+
return {
63+
off: () => {
64+
this.handlers.get(key)?.delete(handler);
65+
},
66+
};
67+
}
68+
69+
once(
70+
sessionId: string,
71+
io: SessionChannelIO,
72+
options?: InputStreamOnceOptions
73+
): InputStreamOncePromise<unknown> {
74+
const key = keyFor(sessionId, io);
75+
76+
return new InputStreamOncePromise<unknown>((resolve) => {
77+
if (options?.signal?.aborted) {
78+
resolve({
79+
ok: false,
80+
error: new InputStreamTimeoutError(key, options.timeoutMs ?? 0),
81+
});
82+
return;
83+
}
84+
85+
const buffered = this.buffer.get(key);
86+
if (buffered && buffered.length > 0) {
87+
const next = buffered.shift();
88+
if (buffered.length === 0) this.buffer.delete(key);
89+
resolve({ ok: true, output: next });
90+
return;
91+
}
92+
93+
const waiter: OnceWaiter = { resolve, signal: options?.signal };
94+
95+
if (options?.timeoutMs !== undefined) {
96+
waiter.timer = setTimeout(() => {
97+
this.removeWaiter(key, waiter);
98+
resolve({
99+
ok: false,
100+
error: new InputStreamTimeoutError(key, options.timeoutMs!),
101+
});
102+
}, options.timeoutMs);
103+
}
104+
105+
if (options?.signal) {
106+
const abortHandler = () => {
107+
this.removeWaiter(key, waiter);
108+
if (waiter.timer) clearTimeout(waiter.timer);
109+
resolve({
110+
ok: false,
111+
error: new InputStreamTimeoutError(key, options.timeoutMs ?? 0),
112+
});
113+
};
114+
waiter.abortHandler = abortHandler;
115+
options.signal.addEventListener("abort", abortHandler, { once: true });
116+
}
117+
118+
let waiters = this.onceWaiters.get(key);
119+
if (!waiters) {
120+
waiters = [];
121+
this.onceWaiters.set(key, waiters);
122+
}
123+
waiters.push(waiter);
124+
});
125+
}
126+
127+
peek(sessionId: string, io: SessionChannelIO): unknown | undefined {
128+
const buffered = this.buffer.get(keyFor(sessionId, io));
129+
if (buffered && buffered.length > 0) return buffered[0];
130+
return undefined;
131+
}
132+
133+
lastSeqNum(sessionId: string, io: SessionChannelIO): number | undefined {
134+
return this.seqNums.get(keyFor(sessionId, io));
135+
}
136+
137+
setLastSeqNum(sessionId: string, io: SessionChannelIO, seqNum: number): void {
138+
this.seqNums.set(keyFor(sessionId, io), seqNum);
139+
}
140+
141+
shiftBuffer(sessionId: string, io: SessionChannelIO): boolean {
142+
const key = keyFor(sessionId, io);
143+
const buffered = this.buffer.get(key);
144+
if (buffered && buffered.length > 0) {
145+
buffered.shift();
146+
if (buffered.length === 0) this.buffer.delete(key);
147+
return true;
148+
}
149+
return false;
150+
}
151+
152+
disconnectStream(_sessionId: string, _io: SessionChannelIO): void {
153+
// no-op — no real SSE tail in tests
154+
}
155+
156+
clearHandlers(): void {
157+
this.handlers.clear();
158+
}
159+
160+
reset(): void {
161+
for (const waiters of this.onceWaiters.values()) {
162+
for (const w of waiters) {
163+
if (w.timer) clearTimeout(w.timer);
164+
if (w.signal && w.abortHandler) {
165+
w.signal.removeEventListener("abort", w.abortHandler);
166+
}
167+
}
168+
}
169+
this.onceWaiters.clear();
170+
this.handlers.clear();
171+
this.buffer.clear();
172+
this.seqNums.clear();
173+
}
174+
175+
disconnect(): void {
176+
this.reset();
177+
}
178+
179+
// ── Test driver API (not part of SessionStreamManager interface) ──────
180+
181+
/**
182+
* Push a record onto the given channel.
183+
*
184+
* Dispatch rules — similar to the production manager, but with a tweak
185+
* that makes unit tests deterministic:
186+
*
187+
* 1. **Handlers always observe** (like production). A session-level `.on`
188+
* is a filter-observer — it fires every time a record arrives,
189+
* regardless of whether a `.once` waiter is also active.
190+
* 2. **First waiter consumes** the record if present (like production).
191+
* 3. **If no waiter, the record is buffered for the next `.once` call.**
192+
* Production discards records that only match handlers — but in
193+
* production the SSE tail introduces enough latency that the next
194+
* `.once` is usually registered before the next record arrives. Tests
195+
* send synchronously right after `turn-complete`, so without this
196+
* buffer the next `waitWithIdleTimeout` would race and lose the
197+
* message. The buffer is the only deviation from production semantics.
198+
*/
199+
async __sendFromTest(
200+
sessionId: string,
201+
io: SessionChannelIO,
202+
data: unknown
203+
): Promise<void> {
204+
const key = keyFor(sessionId, io);
205+
206+
const handlers = this.handlers.get(key);
207+
if (handlers && handlers.size > 0) {
208+
await Promise.all(
209+
Array.from(handlers).map((h) => Promise.resolve().then(() => h(data)))
210+
);
211+
}
212+
213+
const waiters = this.onceWaiters.get(key);
214+
if (waiters && waiters.length > 0) {
215+
const w = waiters.shift()!;
216+
if (waiters.length === 0) this.onceWaiters.delete(key);
217+
if (w.timer) clearTimeout(w.timer);
218+
if (w.signal && w.abortHandler) {
219+
w.signal.removeEventListener("abort", w.abortHandler);
220+
}
221+
w.resolve({ ok: true, output: data });
222+
return;
223+
}
224+
225+
let buffered = this.buffer.get(key);
226+
if (!buffered) {
227+
buffered = [];
228+
this.buffer.set(key, buffered);
229+
}
230+
buffered.push(data);
231+
}
232+
233+
/**
234+
* Immediately resolve every pending `once()` waiter for the given channel
235+
* with a timeout error. Simulates a closed stream (e.g. session closed).
236+
*/
237+
__closeFromTest(sessionId: string, io: SessionChannelIO): void {
238+
const key = keyFor(sessionId, io);
239+
const waiters = this.onceWaiters.get(key);
240+
if (!waiters) return;
241+
const pending = waiters.splice(0);
242+
for (const w of pending) {
243+
if (w.timer) clearTimeout(w.timer);
244+
if (w.signal && w.abortHandler) {
245+
w.signal.removeEventListener("abort", w.abortHandler);
246+
}
247+
w.resolve({
248+
ok: false,
249+
error: new InputStreamTimeoutError(key, 0),
250+
});
251+
}
252+
}
253+
254+
private invoke(handler: Handler, data: unknown): void {
255+
try {
256+
const result = handler(data);
257+
if (result && typeof result === "object" && "catch" in result) {
258+
(result as Promise<void>).catch(() => {});
259+
}
260+
} catch {
261+
// Never let a handler error break test state
262+
}
263+
}
264+
265+
private removeWaiter(key: string, waiter: OnceWaiter): void {
266+
const waiters = this.onceWaiters.get(key);
267+
if (!waiters) return;
268+
const idx = waiters.indexOf(waiter);
269+
if (idx >= 0) waiters.splice(idx, 1);
270+
}
271+
}

0 commit comments

Comments
 (0)