Skip to content

Commit 4453a45

Browse files
committed
feat(webapp): session.out wait=0 + X-Session-Settled on settled tail
/realtime/v1/sessions/:session/:io=out now peeks the tail record in S2 at connection time. When the tail chunk is trigger:turn-complete, the agent has finished a turn and is either idle-waiting on .in or has exited — either way no more chunks will arrive without further user action. In that case the downstream S2 read switches to wait=0 so the SSE drains and closes in ~1s instead of long-polling for 60s, and the response carries X-Session-Settled: true so the client can tell the close is terminal rather than a normal 60s cycle. Mid-turn tails (streaming UIMessageChunks in flight) fall through to the existing wait=60 long-poll. Crashed-mid-turn is indistinguishable from live-streaming at this point and gets the same 60s retry loop as today — that's a separate hardening, not in scope here. The peek uses GET /records?tail_offset=1&count=1&wait=0 (single-digit ms on S2), then unwraps the agent-side envelope written by StreamsWriterV2: record.body parses to {data: <chunk>, id}, where <chunk> is the raw UIMessageChunk object. No double-parse on data. 404 / 416 from the peek (stream never written / empty stream) short- circuit to settled=false so first-connect on a freshly-created session keeps the long-poll semantics the agent's first chunks depend on. Verified end-to-end against an idle chat-agent-smoke session: caught- up reconnect (Last-Event-ID = tail) closes in 1.08s with the header; behind reconnect (Last-Event-ID < tail) drains remaining records then closes in 0.94s with the header; empty-stream reconnect keeps the 60s long-poll behavior unchanged.
1 parent b6e642f commit 4453a45

2 files changed

Lines changed: 112 additions & 1 deletion

File tree

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
area: webapp
3+
type: improvement
4+
---
5+
6+
`/realtime/v1/sessions/:session/out` now peeks the tail record in S2 at connection time. If the last chunk is `trigger:turn-complete` (agent finished a turn and is either idle-waiting on `.in` or has exited), the downstream S2 read uses `wait=0` so the SSE drains and closes immediately instead of holding the connection open for 60s. The response also carries `X-Session-Settled: true` so the client can tell the close is terminal rather than a normal long-poll cycle.
7+
8+
Lets `TriggerChatTransport.reconnectToStream` return quickly on page reloads of settled chats without requiring callers to persist an `isStreaming` flag — the server decides from the stream's own tail. Mid-turn tails still take the 60s long-poll path unchanged.

apps/webapp/app/services/realtime/s2realtimeStreams.server.ts

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,18 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
293293
/**
294294
* Serve SSE from a `Session`-primitive channel addressed by
295295
* `(friendlyId, io)`.
296+
*
297+
* For `io=out`, peek the tail record first. If it's
298+
* `trigger:turn-complete`, the agent has finished a turn and is
299+
* either idle-waiting on `.in` or has exited — either way, no more
300+
* chunks will arrive without further user action. We switch the
301+
* downstream S2 read to `wait=0` (drain whatever's left, close fast)
302+
* and set `X-Session-Settled: true` so the client knows this SSE
303+
* close is terminal instead of the normal 60s long-poll cycle.
304+
*
305+
* Mid-turn tail (streaming UIMessageChunk) falls through to the
306+
* long-poll path; a crashed-mid-turn stream is indistinguishable
307+
* here and behaves like today (client sees wait=60 close, retries).
296308
*/
297309
async streamResponseFromSessionStream(
298310
request: Request,
@@ -301,7 +313,98 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
301313
signal: AbortSignal,
302314
options?: StreamResponseOptions
303315
): Promise<Response> {
304-
return this.#streamResponseByName(this.toSessionStreamName(friendlyId, io), signal, options);
316+
const s2Stream = this.toSessionStreamName(friendlyId, io);
317+
318+
let waitSeconds = options?.timeoutInSeconds ?? this.s2WaitSeconds;
319+
let settled = false;
320+
321+
if (io === "out") {
322+
const lastChunk = await this.#peekLastChunkBody(s2Stream);
323+
if (
324+
lastChunk != null &&
325+
typeof lastChunk === "object" &&
326+
(lastChunk as { type?: unknown }).type === "trigger:turn-complete"
327+
) {
328+
settled = true;
329+
waitSeconds = 0;
330+
}
331+
}
332+
333+
const s2Response = await this.#streamResponseByName(s2Stream, signal, {
334+
...options,
335+
timeoutInSeconds: waitSeconds,
336+
});
337+
338+
if (!settled) return s2Response;
339+
340+
const headers = new Headers(s2Response.headers);
341+
headers.set("X-Session-Settled", "true");
342+
return new Response(s2Response.body, {
343+
status: s2Response.status,
344+
statusText: s2Response.statusText,
345+
headers,
346+
});
347+
}
348+
349+
async #peekLastChunkBody(s2Stream: string): Promise<unknown | null> {
350+
const qs = new URLSearchParams();
351+
// `tail_offset=1` reads one record before the next seq — i.e. the
352+
// most recently appended record. `count=1` caps it to just that
353+
// record. `wait=0` returns immediately with no long-poll.
354+
qs.set("tail_offset", "1");
355+
qs.set("count", "1");
356+
qs.set("wait", "0");
357+
358+
let res: Response;
359+
try {
360+
res = await fetch(
361+
`${this.baseUrl}/streams/${encodeURIComponent(s2Stream)}/records?${qs}`,
362+
{
363+
method: "GET",
364+
headers: {
365+
Authorization: `Bearer ${this.token}`,
366+
Accept: "application/json",
367+
"S2-Format": "raw",
368+
"S2-Basin": this.basin,
369+
},
370+
}
371+
);
372+
} catch (err) {
373+
this.logger.warn("S2 peek last record: fetch failed", { err, stream: s2Stream });
374+
return null;
375+
}
376+
377+
if (!res.ok) {
378+
// 404: stream has never been written to. 416: range not
379+
// satisfiable (empty stream). Both mean "nothing to peek."
380+
if (res.status === 404 || res.status === 416) return null;
381+
const text = await res.text().catch(() => "");
382+
this.logger.warn("S2 peek last record failed", {
383+
status: res.status,
384+
statusText: res.statusText,
385+
text,
386+
stream: s2Stream,
387+
});
388+
return null;
389+
}
390+
391+
try {
392+
const json = (await res.json()) as {
393+
records?: Array<{ body: string; seq_num: number; timestamp: number }>;
394+
};
395+
const record = json.records?.[0];
396+
if (!record) return null;
397+
// The record body is a JSON string `{data: <chunk>, id: partId}`
398+
// where `<chunk>` is the raw UIMessageChunk object (see
399+
// `StreamsWriterV2` — the agent-side writer serializes the chunk
400+
// object directly, not double-encoded). Unwrap the envelope and
401+
// return `data` as-is.
402+
const envelope = JSON.parse(record.body) as { data: unknown; id: string };
403+
return envelope.data;
404+
} catch (err) {
405+
this.logger.warn("S2 peek last record: parse failed", { err, stream: s2Stream });
406+
return null;
407+
}
305408
}
306409

307410
async #streamResponseByName(

0 commit comments

Comments
 (0)