Skip to content

Commit 6c58e37

Browse files
committed
feat(webapp,sdk): dashboard AgentView → session streams
Migrates the dashboard's Agent tab (span inspector) onto the backing Session's .out / .in channels so it stays in sync with TriggerChatTransport, the server-side AgentChat, and the MCP chat tools after the chat.agent -> Sessions migration. Webapp - SpanPresenter.server.ts extracts agentSession from the run payload: prefers the explicit sessionId that TriggerChatTransport and chat.createTriggerAction now thread through; falls back to chatId for pre-Sessions agent runs (the session resource route accepts either form via resolveSessionByIdOrExternalId). - Span route (runs.$runParam.spans.$spanParam) threads agentSession through AgentViewAuth. agentView is only minted when we have an identifiable session — runs without one render a loading spinner without subscribing. - New dashboard resource route resources.orgs.../runs.$runParam/realtime/v1/sessions/$sessionId/$io proxies S2RealtimeStreams.streamResponseFromSessionStream under dashboard session auth. The run param binds the resource hierarchy (keeps callers from subscribing to arbitrary sessions); the session identity is verified against the environment. GET-only — appends go through the public session API, not the dashboard. - AgentView.tsx: - AgentViewAuth grows `sessionId: string`; `useAgentRunMessages` threads it into the effect dep array and URL construction. - Subscription URLs collapse from two run-scoped paths (.../streams/{runId}/chat + .../streams/{runId}/input/chat-messages) to one session base (.../sessions/{sessionId}/{out|in}). - Local CHAT_STREAM_KEY / CHAT_MESSAGES_STREAM_ID constants dropped. - `.in` parser switches from raw ChatTaskWirePayload to ChatInputChunk tagged union: only kind: "message" chunks surface user messages (pulled from chunk.payload.messages); kind: "stop" is ignored. - `.out` parsing is unchanged — session v2 SSE already delivers parsed UIMessageChunk objects via record.body.data. SDK type fixes (byproducts) - TriggerChatTransportOptions.sessions.sessionId is now optional so pre-Sessions localStorage state (chatId -> {runId, token, lastEventId}) hydrates without migration. The runtime already `continue`s when sessionId is missing and lets ensureSession upsert on next send; the type just catches up. - chat.test.ts session-change accumulator shape widened to match the new runtime state (adds optional runId / sessionId fields). Smoke Opened a completed test-agent run (sessionId threaded via prior smoke test) in the dashboard. Agent tab rendered: - user message from initialMessages seed - assistant reply streamed over session.out Both SSE endpoints returned 200; no console errors. Full SDK test suite still passes (86/86).
1 parent 7719150 commit 6c58e37

8 files changed

Lines changed: 234 additions & 59 deletions

File tree

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
---
2+
area: webapp
3+
type: improvement
4+
---
5+
6+
Migrate the dashboard Agent tab (span inspector) to subscribe to the backing Session's `.out` and `.in` channels instead of the run-scoped chat output + chat-messages input streams. Pairs with the SDK + MCP migrations on the ai-chat branch.
7+
8+
- `SpanPresenter.server.ts` extracts `agentSession` from the run payload (prefers `sessionId`, falls back to `chatId` for pre-Sessions agent runs — matches `resolveSessionByIdOrExternalId`).
9+
- Span route threads `agentSession` through `AgentViewAuth` and gates `agentView` creation on having one.
10+
- New dashboard resource route `resources.orgs.../runs.$runParam/realtime/v1/sessions/$sessionId/$io` proxies `S2RealtimeStreams.streamResponseFromSessionStream` under dashboard session auth. The run param binds resource hierarchy; the session identity is verified against the environment.
11+
- `AgentView.tsx` subscribes to `/out` and `/in` URLs, drops local `CHAT_STREAM_KEY`/`CHAT_MESSAGES_STREAM_ID` constants, and parses the `.in` stream as `ChatInputChunk` (`{kind: "message", payload}` for user turns; `{kind: "stop"}` ignored). Output-stream parsing is unchanged — session v2 SSE already delivers UIMessageChunk objects from `record.body.data`.
12+
- Smoke: opened a prior `test-agent` run in the dashboard, Agent tab rendered user + assistant messages end-to-end with zero console errors. Both SSE endpoints (`/out`, `/in`) returned 200.

apps/webapp/app/components/runs/v3/agent/AgentView.tsx

Lines changed: 67 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,19 @@ import { useProject } from "~/hooks/useProject";
1212
export type AgentViewAuth = {
1313
publicAccessToken: string;
1414
apiOrigin: string;
15+
/**
16+
* Session identifier the AgentView uses to address the backing
17+
* {@link Session} when subscribing to `.in` / `.out`. Accepts either
18+
* a `session_*` friendlyId or the transport-supplied externalId
19+
* (typically the browser's `chatId`) — the dashboard resource route
20+
* resolves either form via `resolveSessionByIdOrExternalId`.
21+
*/
22+
sessionId: string;
1523
/**
1624
* User messages extracted from the run's task payload at load time.
17-
* Empty array for runs started with `trigger: "preload"` — in that case
18-
* the first user message will arrive over the chat-messages input stream
19-
* and get merged in by the AgentView subscription.
25+
* Empty array for runs started with `trigger: "preload"` — in that
26+
* case the first user message arrives over the session's `.in`
27+
* channel and is merged in by the AgentView subscription.
2028
*/
2129
initialMessages: UIMessage[];
2230
};
@@ -26,12 +34,6 @@ type AgentViewRun = {
2634
taskIdentifier: string;
2735
};
2836

29-
// Default stream IDs for Trigger.dev chat tasks — kept as literals so we
30-
// don't pull server-only constants from `@trigger.dev/core/v3/chat-client`
31-
// into a browser bundle.
32-
const CHAT_STREAM_KEY = "chat";
33-
const CHAT_MESSAGES_STREAM_ID = "chat-messages";
34-
3537
/**
3638
* Max state-update interval while assistant chunks are streaming. Matches
3739
* the `experimental_throttle: 100` we previously passed to `useChat`.
@@ -51,20 +53,23 @@ const INITIAL_PAYLOAD_TIMESTAMP = 0;
5153
/**
5254
* Renders an agent run's chat conversation as it unfolds.
5355
*
54-
* Subscribes to two separate realtime streams for the run:
55-
* - The **chat output stream** delivers assistant `UIMessageChunk`s (text
56-
* deltas, tool calls, reasoning, etc.) produced by `pipeChat` on the
57-
* task side.
58-
* - The **chat-messages input stream** delivers user messages sent to the
59-
* task via `sendInputStream` — each chunk carries a `ChatTaskWirePayload`
60-
* with the most recent `messages` array.
56+
* Subscribes to both channels of the run's backing {@link Session}:
57+
* - **`.out`** delivers assistant `UIMessageChunk`s (text deltas, tool
58+
* calls, reasoning, etc.) produced by the agent's
59+
* `chatStream.writer(...)` calls — objects, already parsed by the S2
60+
* SSE reader.
61+
* - **`.in`** delivers {@link ChatInputChunk}s sent by
62+
* {@link TriggerChatTransport} (or any other session writer). Each
63+
* chunk is a tagged union (`{kind: "message", payload}` for user
64+
* turns, `{kind: "stop"}` for stop signals) — the AgentView only
65+
* cares about `kind: "message"` and pulls `.payload.messages`.
6166
*
6267
* Both streams are read directly via `SSEStreamSubscription` through the
6368
* dashboard's session-authed resource routes — not through `useChat` or
6469
* `TriggerChatTransport`. This gives us per-chunk server-side timestamps
65-
* (Redis stream IDs) from both streams, which we use to produce a
70+
* (S2 sequence numbers) from both streams, which we use to produce a
6671
* chronologically correct merged message list that works for replays,
67-
* multi-message turns, and steering messages.
72+
* multi-message turns, cross-run session resumes, and steering messages.
6873
*
6974
* Intended to be mounted inside a scrollable container — the component
7075
* does not own its own scrollbar.
@@ -82,6 +87,7 @@ export function AgentView({
8287

8388
const messages = useAgentRunMessages({
8489
runFriendlyId: run.friendlyId,
90+
sessionId: agentView.sessionId,
8591
apiOrigin: agentView.apiOrigin,
8692
orgSlug: organization.slug,
8793
projectSlug: project.slug,
@@ -119,14 +125,24 @@ export function AgentView({
119125
// ---------------------------------------------------------------------------
120126

121127
/**
122-
* Shape of each chunk on the chat-messages input stream. Each chunk is a
123-
* `ChatTaskWirePayload` whose `messages` field holds either the latest user
124-
* message (for `submit-message`) or the full history (for
125-
* `regenerate-message`). We dedupe by ID either way.
128+
* Shape of each chunk on the session's `.in` channel. Mirrors the
129+
* `ChatInputChunk` tagged union produced by {@link TriggerChatTransport}:
130+
* - `kind: "message"` carries a `ChatTaskWirePayload` in `.payload`
131+
* (user-submitted messages or regenerate calls); we dedupe by id.
132+
* - `kind: "stop"` is a stop signal — no messages, nothing to render
133+
* here, so it's filtered.
134+
*
135+
* The server wraps records in `{data, id}` and writes `data` as a JSON
136+
* string; SSE v2 delivers the parsed string back. {@link parseChunkPayload}
137+
* re-parses to recover the object.
126138
*/
127139
type InputStreamChunk = {
128-
messages?: Array<{ id?: string; role?: string; parts?: unknown[] }>;
129-
trigger?: string;
140+
kind?: "message" | "stop";
141+
payload?: {
142+
messages?: Array<{ id?: string; role?: string; parts?: unknown[] }>;
143+
trigger?: string;
144+
};
145+
message?: string;
130146
};
131147

132148
/**
@@ -173,17 +189,17 @@ type MessageOrchestrationState = {
173189

174190
/**
175191
* `SSEStreamSubscription`'s v2 batch path delivers `parsedBody.data` as-is
176-
* — but for streams written via `sendInputStream` (which stores the user
177-
* payload as a JSON string in the record body), `data` is itself a string
178-
* that needs a second `JSON.parse` to recover the actual object. This
179-
* happens for the chat-messages input stream because the action handler
180-
* does `JSON.stringify(body.data.data)` before storing.
192+
* — but session channels diverge by direction:
181193
*
182-
* Output streams from `pipeChat` write objects directly, so the v2 path
183-
* delivers them already-parsed. Either way this helper accepts both shapes
184-
* defensively: a string is parsed; an object is returned as-is.
194+
* - `.in`: {@link TriggerChatTransport.serializeInputChunk} writes the
195+
* `ChatInputChunk` as a JSON **string**, so `data` is a string that
196+
* needs a second `JSON.parse` to recover the tagged union.
197+
* - `.out`: the agent's `chatStream.writer(...)` writes
198+
* {@link UIMessageChunk} **objects** directly; `data` arrives
199+
* already-parsed.
185200
*
186-
* Returns `null` for unparseable / unexpected payloads.
201+
* This helper accepts both shapes defensively: a string is parsed; an
202+
* object is returned as-is. Returns `null` for unparseable payloads.
187203
*/
188204
function parseChunkPayload(raw: unknown): Record<string, unknown> | null {
189205
if (raw == null) return null;
@@ -208,13 +224,15 @@ function createOrchestrationState(): MessageOrchestrationState {
208224

209225
function useAgentRunMessages({
210226
runFriendlyId,
227+
sessionId,
211228
apiOrigin,
212229
orgSlug,
213230
projectSlug,
214231
envSlug,
215232
initialMessages,
216233
}: {
217234
runFriendlyId: string;
235+
sessionId: string;
218236
apiOrigin: string;
219237
orgSlug: string;
220238
projectSlug: string;
@@ -268,13 +286,13 @@ function useAgentRunMessages({
268286
useEffect(() => {
269287
const abort = new AbortController();
270288

271-
const outputUrl =
289+
const encodedSession = encodeURIComponent(sessionId);
290+
const sessionBase =
272291
`${apiOrigin}/resources/orgs/${orgSlug}/projects/${projectSlug}/env/${envSlug}` +
273-
`/runs/${runFriendlyId}/realtime/v1/streams/${runFriendlyId}/${CHAT_STREAM_KEY}`;
292+
`/runs/${runFriendlyId}/realtime/v1/sessions/${encodedSession}`;
274293

275-
const inputUrl =
276-
`${apiOrigin}/resources/orgs/${orgSlug}/projects/${projectSlug}/env/${envSlug}` +
277-
`/runs/${runFriendlyId}/realtime/v1/streams/${runFriendlyId}/input/${CHAT_MESSAGES_STREAM_ID}`;
294+
const outputUrl = `${sessionBase}/out`;
295+
const inputUrl = `${sessionBase}/in`;
278296

279297
const commonSubOptions = {
280298
signal: abort.signal,
@@ -385,7 +403,12 @@ function useAgentRunMessages({
385403
}
386404
};
387405

388-
// ---- Input stream: user messages ---------------------------------------
406+
// ---- Input channel: user messages (`ChatInputChunk`) -------------------
407+
//
408+
// The transport appends a `{kind: "message", payload}` ChatInputChunk
409+
// for every user turn (and `{kind: "stop"}` for stop signals). We pull
410+
// user messages out of `payload.messages` for `kind: "message"` chunks
411+
// and ignore the rest.
389412
const runInput = async () => {
390413
try {
391414
const sub = new SSEStreamSubscription(inputUrl, commonSubOptions);
@@ -397,9 +420,11 @@ function useAgentRunMessages({
397420
if (done) return;
398421

399422
const chunk = parseChunkPayload(value.chunk) as InputStreamChunk | null;
400-
if (!chunk || !Array.isArray(chunk.messages)) continue;
423+
if (!chunk || chunk.kind !== "message") continue;
424+
const payload = chunk.payload;
425+
if (!payload || !Array.isArray(payload.messages)) continue;
401426

402-
const incomingUsers = chunk.messages.filter(
427+
const incomingUsers = payload.messages.filter(
403428
(m): m is UIMessage =>
404429
m != null && (m as { role?: string }).role === "user" && typeof m.id === "string"
405430
);
@@ -438,7 +463,7 @@ function useAgentRunMessages({
438463
pendingTimerRef.current = null;
439464
}
440465
};
441-
}, [runFriendlyId, apiOrigin, orgSlug, projectSlug, envSlug]);
466+
}, [runFriendlyId, sessionId, apiOrigin, orgSlug, projectSlug, envSlug]);
442467

443468
return useMemo(() => {
444469
const timestamps = timestampsRef.current;

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -262,24 +262,43 @@ export class SpanPresenter extends BasePresenter {
262262
const taskKind = RunAnnotations.safeParse(run.annotations).data?.taskKind;
263263
const isAgentRun = taskKind === "AGENT";
264264

265-
// For agent runs, extract the initial user messages that were supplied
266-
// via the task payload (from the original `triggerTask({ payload: { messages: [...] } })`
267-
// call). When the run was started with `trigger: "preload"`, this array
268-
// will be empty — in that case the first user message arrives later via
269-
// the chat-messages input stream and is picked up by the AgentView.
265+
// For agent runs, extract the initial user messages + the backing
266+
// Session handle from the task payload (from the original
267+
// `triggerTask({ payload: { messages, sessionId, chatId, ... } })`
268+
// call). When the run was started with `trigger: "preload"`,
269+
// `messages` is empty — the first user message arrives later over
270+
// the session `.in` channel and is merged in by the AgentView.
271+
//
272+
// `agentSession` is the identifier the dashboard uses to address the
273+
// backing Session when subscribing to `.out` / `.in`. Prefer the
274+
// explicit `sessionId` threaded by `TriggerChatTransport` /
275+
// `chat.createTriggerAction`; fall back to `chatId` for pre-migration
276+
// agent runs (the session resource route accepts either, matching
277+
// `resolveSessionByIdOrExternalId`).
270278
let agentInitialMessages: AgentInitialMessage[] = [];
279+
let agentSession: string | null = null;
271280
if (isAgentRun && run.payload && run.payloadType !== "application/store") {
272281
try {
273282
const parsed = await parsePacket({
274283
data: typeof run.payload === "string" ? run.payload : JSON.stringify(run.payload),
275284
dataType: run.payloadType ?? "application/json",
276285
});
277-
if (parsed && typeof parsed === "object" && Array.isArray((parsed as any).messages)) {
278-
agentInitialMessages = (parsed as any).messages as AgentInitialMessage[];
286+
if (parsed && typeof parsed === "object") {
287+
if (Array.isArray((parsed as any).messages)) {
288+
agentInitialMessages = (parsed as any).messages as AgentInitialMessage[];
289+
}
290+
const sessionId = (parsed as any).sessionId;
291+
const chatId = (parsed as any).chatId;
292+
if (typeof sessionId === "string" && sessionId.length > 0) {
293+
agentSession = sessionId;
294+
} else if (typeof chatId === "string" && chatId.length > 0) {
295+
agentSession = chatId;
296+
}
279297
}
280298
} catch {
281-
// Fall back to an empty initial message list — the AgentView will
282-
// render whatever arrives over the input/output streams.
299+
// Fall back to empty initial messages + null session — the
300+
// AgentView will show a loading spinner and surface any stream
301+
// subscription errors to the console.
283302
}
284303
}
285304

@@ -342,6 +361,7 @@ export class SpanPresenter extends BasePresenter {
342361
isError: isFailedRunStatus(run.status),
343362
isAgentRun,
344363
agentInitialMessages,
364+
agentSession,
345365
payload,
346366
payloadType: run.payloadType,
347367
output,

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.playground.$agentParam/route.tsx

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -720,7 +720,15 @@ function PlaygroundSidebar({
720720
onRegionChange: (val: string | undefined) => void;
721721
regions: Array<{ id: string; name: string; description?: string; isDefault: boolean }>;
722722
isDev: boolean;
723-
session: { runId: string; publicAccessToken: string; lastEventId?: string } | undefined;
723+
session:
724+
| {
725+
sessionId: string;
726+
runId?: string;
727+
publicAccessToken: string;
728+
lastEventId?: string;
729+
isStreaming?: boolean;
730+
}
731+
| undefined;
724732
messageCount: number;
725733
isStreaming: boolean;
726734
status: string;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
import { type LoaderFunctionArgs } from "@remix-run/server-runtime";
2+
import { z } from "zod";
3+
import { $replica } from "~/db.server";
4+
import { findProjectBySlug } from "~/models/project.server";
5+
import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server";
6+
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
7+
import { S2RealtimeStreams } from "~/services/realtime/s2realtimeStreams.server";
8+
import { resolveSessionByIdOrExternalId } from "~/services/realtime/sessions.server";
9+
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
10+
import { requireUserId } from "~/services/session.server";
11+
import { EnvironmentParamSchema } from "~/utils/pathBuilder";
12+
13+
const ParamsSchema = z.object({
14+
runParam: z.string(),
15+
sessionId: z.string(),
16+
io: z.enum(["out", "in"]),
17+
});
18+
19+
// GET: SSE stream subscription for a backing Session's `.out` / `.in`
20+
// channel. Dashboard-auth counterpart to the public API's
21+
// `/realtime/v1/sessions/:sessionId/:io` endpoint. Used by the Agent tab
22+
// in the span inspector to observe assistant chunks (`.out`) and
23+
// user-side ChatInputChunk payloads (`.in`) for a chat.agent run.
24+
//
25+
// The `:sessionId` segment accepts either the `session_*` friendlyId or
26+
// the externalId the transport registered for the chat (typically the
27+
// browser's `chatId`). Runs pre-dating the Sessions migration that have
28+
// `chatId` but no `sessionId` in the payload take the externalId path.
29+
//
30+
// Authenticated by the dashboard session — the user must have access to
31+
// the project, environment, and run. The run binds this resource
32+
// hierarchy; the session identity is verified against the environment.
33+
export async function loader({ request, params }: LoaderFunctionArgs) {
34+
const userId = await requireUserId(request);
35+
const { organizationSlug, projectParam, envParam } = EnvironmentParamSchema.parse(params);
36+
const { runParam, sessionId, io } = ParamsSchema.parse(params);
37+
38+
const project = await findProjectBySlug(organizationSlug, projectParam, userId);
39+
if (!project) {
40+
return new Response("Project not found", { status: 404 });
41+
}
42+
43+
const environment = await findEnvironmentBySlug(project.id, envParam, userId);
44+
if (!environment) {
45+
return new Response("Environment not found", { status: 404 });
46+
}
47+
48+
// Verify the run lives in this environment — keeps callers from
49+
// subscribing to arbitrary sessions via `/runs/$runParam/...`.
50+
const run = await $replica.taskRun.findFirst({
51+
where: {
52+
friendlyId: runParam,
53+
runtimeEnvironmentId: environment.id,
54+
},
55+
select: { id: true, friendlyId: true },
56+
});
57+
58+
if (!run) {
59+
return new Response("Run not found", { status: 404 });
60+
}
61+
62+
const session = await resolveSessionByIdOrExternalId(
63+
$replica,
64+
environment.id,
65+
sessionId
66+
);
67+
68+
if (!session) {
69+
return new Response("Session not found", { status: 404 });
70+
}
71+
72+
const realtimeStream = getRealtimeStreamInstance(environment, "v2");
73+
74+
if (!(realtimeStream instanceof S2RealtimeStreams)) {
75+
return new Response("Session channels require the S2 realtime backend", {
76+
status: 501,
77+
});
78+
}
79+
80+
const lastEventId = request.headers.get("Last-Event-ID") || undefined;
81+
const timeoutInSecondsRaw = request.headers.get("Timeout-Seconds") ?? undefined;
82+
const timeoutInSeconds = timeoutInSecondsRaw ? parseInt(timeoutInSecondsRaw) : undefined;
83+
84+
if (
85+
timeoutInSeconds &&
86+
(isNaN(timeoutInSeconds) || timeoutInSeconds < 1 || timeoutInSeconds > 600)
87+
) {
88+
return new Response("Invalid timeout", { status: 400 });
89+
}
90+
91+
return realtimeStream.streamResponseFromSessionStream(
92+
request,
93+
session.friendlyId,
94+
io,
95+
getRequestAbortSignal(),
96+
{ lastEventId, timeoutInSeconds }
97+
);
98+
}

0 commit comments

Comments
 (0)