Commit b87084d

feat(sdk,core): ChatChunkTooLargeError for oversized chat-stream chunks
The realtime stream caps each record at ~1 MiB. Today the chat.agent path through StreamsWriterV2 surfaces a generic S2Error from deep in the batching layer when a chunk exceeds that cap, with no chunk-type context and no guidance for callers.

Add a pre-write byte check in StreamsWriterV2.initializeServerStream that fires before the chunk reaches the underlying batcher, plus a typed ChatChunkTooLargeError carrying the chunk's discriminant (type/kind), its serialized size, and the cap. Also export an isChatChunkTooLargeError guard from the SDK so callers can branch cleanly.

The threshold is 1 MiB minus 1 KiB, leaving headroom for the JSON record envelope. The error message links to the new docs pattern (Pattern: ID-reference for large tool outputs / out-of-band streams.writer for run-scoped data).
1 parent 6c624d9 commit b87084d

4 files changed: 205 additions & 2 deletions
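
Taken together, the new guard lets task code branch on the failure instead of catching an opaque S2Error. A rough caller-side sketch, not part of this commit: the @trigger.dev/sdk/v3 import path matches the export added below, while writeChunk and storeLargeOutput are hypothetical stand-ins for whatever emits chunks in your code and surfaces the writer's rejection.

import { isChatChunkTooLargeError } from "@trigger.dev/sdk/v3";

// Hypothetical helpers, declared only so the sketch type-checks.
declare function writeChunk(chunk: unknown): Promise<void>;
declare function storeLargeOutput(chunk: unknown): Promise<string>;

async function emitChunk(chunk: unknown) {
  try {
    await writeChunk(chunk);
  } catch (error) {
    if (isChatChunkTooLargeError(error)) {
      // Fall back to the ID-reference pattern: persist the payload out of
      // band and send only a small reference through the chat stream.
      const id = await storeLargeOutput(chunk);
      await writeChunk({ type: "data-output-ref", data: { id } });
    } else {
      throw error;
    }
  }
}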

File tree

packages/core/src/v3/errors.ts
packages/core/src/v3/realtimeStreams/streamsWriterV2.test.ts
packages/core/src/v3/realtimeStreams/streamsWriterV2.ts
packages/trigger-sdk/src/v3/index.ts

packages/core/src/v3/errors.ts

Lines changed: 20 additions & 0 deletions

@@ -629,6 +629,26 @@ export class GracefulExitTimeoutError extends Error {
   }
 }
 
+export class ChatChunkTooLargeError extends Error {
+  constructor(
+    public readonly chunkSize: number,
+    public readonly maxSize: number,
+    public readonly chunkType?: string
+  ) {
+    super(
+      `chat.agent chunk${chunkType ? ` of type "${chunkType}"` : ""} is ${chunkSize} bytes, ` +
+        `over the realtime stream's per-record cap of ${maxSize} bytes. ` +
+        `For oversized payloads (e.g. large tool outputs), write the value to your own store and ` +
+        `emit only an id/url through the chat stream — see https://trigger.dev/docs/ai-chat/patterns/large-payloads.`
+    );
+    this.name = "ChatChunkTooLargeError";
+  }
+}
+
+export function isChatChunkTooLargeError(error: unknown): error is ChatChunkTooLargeError {
+  return error instanceof Error && error.name === "ChatChunkTooLargeError";
+}
+
 export class MaxDurationExceededError extends Error {
   constructor(
     public readonly maxDurationInSeconds: number,
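
One design note on the guard: it matches on error.name rather than instanceof, so it still recognizes the error when a duplicate copy of @trigger.dev/core gets bundled or the error object crosses a module boundary. A minimal sketch of the difference, using an assumed foreign-copy scenario rather than code from this commit:

// An error minted by a different copy of the class carries the right name
// but fails instanceof; the name-based guard still matches it.
const foreign = Object.assign(new Error("chunk too large"), {
  name: "ChatChunkTooLargeError",
});

console.log(foreign instanceof ChatChunkTooLargeError); // false
console.log(isChatChunkTooLargeError(foreign)); // true

The usual price applies: the predicate narrows the type without guaranteeing that chunkSize, maxSize, or chunkType are actually present on the object.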
packages/core/src/v3/realtimeStreams/streamsWriterV2.test.ts

Lines changed: 150 additions & 0 deletions

@@ -0,0 +1,150 @@
+import { afterEach, describe, expect, it, vi } from "vitest";
+
+import { ChatChunkTooLargeError, isChatChunkTooLargeError } from "../errors.js";
+
+const lastAckedPosition = vi.fn(() => undefined);
+
+const appendSession = vi.fn(async () => {
+  // A WritableStream that just consumes records — we never reach S2 because
+  // the size check fires upstream of this for the oversize case, but we still
+  // need a valid writable for the small-chunk path.
+  const writable = new WritableStream<unknown>({});
+  return {
+    writable,
+    lastAckedPosition,
+  };
+});
+
+vi.mock("@s2-dev/streamstore", async (importOriginal) => {
+  const actual = await importOriginal<typeof import("@s2-dev/streamstore")>();
+  return {
+    ...actual,
+    S2: class FakeS2 {
+      basin() {
+        return {
+          stream: () => ({
+            appendSession,
+          }),
+        };
+      }
+    },
+  };
+});
+
+import { StreamsWriterV2 } from "./streamsWriterV2.js";
+
+afterEach(() => {
+  vi.clearAllMocks();
+});
+
+describe("StreamsWriterV2", () => {
+  it("rejects with ChatChunkTooLargeError when a single chunk exceeds the per-record cap", async () => {
+    const oversized = {
+      type: "tool-output-available",
+      output: { text: "x".repeat(2_000_000) },
+    };
+    const source = new ReadableStream<unknown>({
+      start(controller) {
+        controller.enqueue(oversized);
+        controller.close();
+      },
+    });
+
+    const writer = new StreamsWriterV2({
+      basin: "test",
+      stream: "test",
+      accessToken: "test",
+      source,
+    });
+
+    await expect(writer.wait()).rejects.toBeInstanceOf(ChatChunkTooLargeError);
+
+    let captured: unknown;
+    try {
+      await writer.wait();
+    } catch (err) {
+      captured = err;
+    }
+    expect(isChatChunkTooLargeError(captured)).toBe(true);
+    const e = captured as ChatChunkTooLargeError;
+    expect(e.chunkType).toBe("tool-output-available");
+    expect(e.chunkSize).toBeGreaterThan(1_000_000);
+    expect(e.maxSize).toBe(1024 * 1024 - 1024);
+    expect(e.message).toMatch(/tool-output-available/);
+    expect(e.message).toMatch(/chat\.agent chunk/);
+  });
+
+  it("uses chunk.kind when chunk.type is missing (ChatInputChunk-style)", async () => {
+    const oversized = {
+      kind: "action",
+      payload: "x".repeat(2_000_000),
+    };
+    const source = new ReadableStream<unknown>({
+      start(controller) {
+        controller.enqueue(oversized);
+        controller.close();
+      },
+    });
+
+    const writer = new StreamsWriterV2({
+      basin: "test",
+      stream: "test",
+      accessToken: "test",
+      source,
+    });
+
+    let captured: unknown;
+    try {
+      await writer.wait();
+    } catch (err) {
+      captured = err;
+    }
+    expect(isChatChunkTooLargeError(captured)).toBe(true);
+    expect((captured as ChatChunkTooLargeError).chunkType).toBe("action");
+  });
+
+  it("omits chunkType when chunk has no discriminant", async () => {
+    const oversized = "x".repeat(2_000_000);
+    const source = new ReadableStream<unknown>({
+      start(controller) {
+        controller.enqueue(oversized);
+        controller.close();
+      },
+    });
+
+    const writer = new StreamsWriterV2({
+      basin: "test",
+      stream: "test",
+      accessToken: "test",
+      source,
+    });
+
+    let captured: unknown;
+    try {
+      await writer.wait();
+    } catch (err) {
+      captured = err;
+    }
+    expect(isChatChunkTooLargeError(captured)).toBe(true);
+    expect((captured as ChatChunkTooLargeError).chunkType).toBeUndefined();
+  });
+
+  it("does not reject for chunks under the cap", async () => {
+    const small = { type: "text-delta", delta: "hello" };
+    const source = new ReadableStream<unknown>({
+      start(controller) {
+        controller.enqueue(small);
+        controller.close();
+      },
+    });
+
+    const writer = new StreamsWriterV2({
+      basin: "test",
+      stream: "test",
+      accessToken: "test",
+      source,
+    });
+
+    await expect(writer.wait()).resolves.toBeDefined();
+  });
+});
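
The magic numbers in these tests follow from the writer's envelope math. A quick sanity check, assuming the same { data, id } JSON wrapping the writer applies (with "abc1234" standing in for a 7-character nanoid):

const cap = 1024 * 1024 - 1024; // 1,047,552 bytes: 1 MiB minus 1 KiB of headroom

const body = JSON.stringify({
  data: { type: "tool-output-available", output: { text: "x".repeat(2_000_000) } },
  id: "abc1234",
});
const size = new TextEncoder().encode(body).length;

console.log(cap); // 1047552
console.log(size > cap); // true: a little over 2,000,000 ASCII bytes, roughly double the cap

So the oversized fixtures comfortably trip the check, while the small text-delta chunk stays far below it.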

packages/core/src/v3/realtimeStreams/streamsWriterV2.ts

Lines changed: 33 additions & 2 deletions
@@ -1,7 +1,16 @@
 import { S2, AppendRecord, BatchTransform } from "@s2-dev/streamstore";
+import { ChatChunkTooLargeError } from "../errors.js";
 import { StreamsWriter, StreamWriteResult } from "./types.js";
 import { nanoid } from "nanoid";
 
+// S2 caps a single record at 1 MiB of metered bytes (body + headers + 8 byte
+// overhead). We give ourselves ~1 KiB of headroom for the JSON envelope and
+// metering bytes so the check fires before the SDK's internal `BatchTransform`
+// rejects the record with an opaque `S2Error`.
+const RECORD_BODY_MAX_BYTES = 1024 * 1024 - 1024;
+
+const utf8Encoder = new TextEncoder();
+
 export type StreamsWriterV2Options<T = any> = {
   basin: string;
   stream: string;

@@ -152,8 +161,16 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
           controller.error(new Error("Stream aborted"));
           return;
         }
-        // Convert each chunk to JSON string and wrap in AppendRecord
-        controller.enqueue(AppendRecord.string({ body: JSON.stringify({ data: chunk, id: nanoid(7) }) }));
+        const body = JSON.stringify({ data: chunk, id: nanoid(7) });
+        const size = utf8Encoder.encode(body).length;
+        if (size > RECORD_BODY_MAX_BYTES) {
+          const chunkType = extractChunkType(chunk);
+          controller.error(
+            new ChatChunkTooLargeError(size, RECORD_BODY_MAX_BYTES, chunkType)
+          );
+          return;
+        }
+        controller.enqueue(AppendRecord.string({ body }));
       },
     })
   )

@@ -227,3 +244,17 @@ function safeReleaseLock(reader: ReadableStreamDefaultReader<any>) {
     reader.releaseLock();
   } catch (error) {}
 }
+
+// chat.agent emits two chunk shapes through this writer:
+// - UIMessageChunks + custom data parts: `{ type: "tool-output-available" | "data-..." | ... }`
+// - ChatInputChunks (mostly seen on `.in`, but reused as the discriminant
+//   elsewhere): `{ kind: "message" | "stop" | "action" }`
+// Surfacing whichever discriminant exists turns "chunk too large" into
+// "tool-output-available chunk too large", which is what users actually need.
+function extractChunkType(chunk: unknown): string | undefined {
+  if (!chunk || typeof chunk !== "object") return undefined;
+  const c = chunk as { type?: unknown; kind?: unknown };
+  if (typeof c.type === "string") return c.type;
+  if (typeof c.kind === "string") return c.kind;
+  return undefined;
+}
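
For reference, the out-of-band pattern the error message links to looks roughly like this on the producing side. Everything here is illustrative: uploadToObjectStore and the data-large-output-ref part name are assumptions, not APIs introduced by this commit, and the linked docs page defines the canonical shape.

// Hypothetical storage helper: persist the oversized value anywhere durable
// (object storage, a database row, a run-scoped streams.writer) and hand
// back a reference to it.
declare function uploadToObjectStore(value: unknown): Promise<{ id: string; url: string }>;

// Instead of pushing a multi-megabyte tool output through chat.agent, emit
// a small reference record that stays far under the ~1 MiB per-record cap.
async function toLargeOutputRef(output: unknown) {
  const { id, url } = await uploadToObjectStore(output);
  return { type: "data-large-output-ref", data: { id, url } };
}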

packages/trigger-sdk/src/v3/index.ts

Lines changed: 2 additions & 0 deletions
@@ -40,6 +40,8 @@ export {
   AbortTaskRunError,
   OutOfMemoryError,
   CompleteTaskWithOutput,
+  ChatChunkTooLargeError,
+  isChatChunkTooLargeError,
   logger,
   type LogLevel,
 } from "@trigger.dev/core/v3";
