Skip to content

Commit 7719150

Browse files
committed
feat(cli): MCP agentChat tool → Sessions migration
Rewires the three MCP agent-chat tools onto the Session primitive so they stay in sync with TriggerChatTransport and the server-side AgentChat after the chat.agent -> Sessions migration. Tools affected: start_agent_chat, send_agent_message, close_agent_chat. All live in packages/cli-v3/src/mcp/tools/agentChat.ts. Changes - Drop imports of CHAT_STREAM_KEY / CHAT_MESSAGES_STREAM_ID / CHAT_STOP_STREAM_ID from @trigger.dev/core/v3/chat-client. Add a local ChatInputChunk type + serializeInputChunk helper that mirrors the transport's wire format (JSON.stringify({ kind, payload })). - start_agent_chat: call apiClient.createSession({ type: "chat.agent", externalId: chatId }) before triggering. The call is idempotent on externalId so two MCP clients targeting the same chatId converge. Thread sessionId into the trigger payload so the agent's sessions.open(payload.sessionId) finds the backing session. - send_agent_message: replace sendInputStream(runId, CHAT_MESSAGES_STREAM_ID, payload) with appendToSessionStream(sessionId, "in", serializeInputChunk({ kind: "message", payload })). Fall-back path on send failure re-triggers on the same session (reuse sessionId, swap runId) instead of creating a new chat. - close_agent_chat: send { kind: "message", payload: { trigger: "close", ... } } via appendToSessionStream so the agent's turn loop exits cleanly — matches the transport's close semantics. - collectAgentResponse: subscribe URL moves from /realtime/v1/streams/{runId}/chat to /realtime/v1/sessions/{sessionId}/out. Session SSE uses v2/batch format which already delivers parsed UIMessageChunk objects via record.body.data, so the chunk-switch logic is unchanged. trigger:upgrade-required path keeps the same session and triggers a new run — previously it reused the old /streams/{newRunId}/chat URL, now the URL is stable across runs on the same session. - Scopes: write:inputStreams -> read:sessions + write:sessions. The former was the transport's old input-stream write capability; the session endpoints are the new surface. - ChatSession state grows a sessionId field (friendlyId session_*). runId stays but is now a live-run hint rather than durable identity. Known limitation: the MCP server binary was spawned by Claude Code at session start from the pre-migration bundle and stays in memory for the lifetime of the Claude session — runtime verification has to wait for the next session restart. Build passes; dist bundle contains the new createSession / appendToSessionStream / realtime/v1/sessions / Session ID references.
1 parent c2a851c commit 7719150

2 files changed

Lines changed: 80 additions & 30 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"trigger.dev": patch
3+
---
4+
5+
Migrate the MCP `start_agent_chat` / `send_agent_message` / `close_agent_chat` tools onto the Session primitive. The CLI MCP server now upserts a backing Session via `POST /api/v1/sessions` on chat start, threads `sessionId` through the run payload, sends messages to `session.in` as `ChatInputChunk { kind, payload }` JSON, and subscribes to `session.out` at `/realtime/v1/sessions/{sessionId}/out`. Scopes expanded from `write:inputStreams` to `read:sessions` + `write:sessions`. Upgrade-required re-trigger keeps the same session and swaps only `runId`.

packages/cli-v3/src/mcp/tools/agentChat.ts

Lines changed: 75 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,8 @@
11
import { z } from "zod";
22
import { ApiClient, SSEStreamSubscription } from "@trigger.dev/core/v3";
3-
import {
4-
CHAT_STREAM_KEY,
5-
CHAT_MESSAGES_STREAM_ID,
6-
CHAT_STOP_STREAM_ID,
7-
} from "@trigger.dev/core/v3/chat-client";
83
import { toolsMetadata } from "../config.js";
94
import { CommonProjectsInput } from "../schemas.js";
105
import { respondWithError, toolHandler } from "../utils.js";
11-
import type { McpContext } from "../context.js";
126

137
// ─── In-memory chat sessions ──────────────────────────────────────
148

@@ -19,6 +13,9 @@ type ChatMessage = {
1913
};
2014

2115
type ChatSession = {
16+
/** `session_*` friendlyId — durable identity for the conversation. */
17+
sessionId: string;
18+
/** Last-known live run id. Cleared when a run ends. */
2219
runId: string;
2320
chatId: string;
2421
agentId: string;
@@ -31,6 +28,24 @@ type ChatSession = {
3128

3229
const activeSessions = new Map<string, ChatSession>();
3330

31+
// ─── ChatInputChunk serialization (mirrors TriggerChatTransport) ──
32+
33+
type ChatInputChunk =
34+
| {
35+
kind: "message";
36+
payload: {
37+
messages: ChatMessage[];
38+
chatId: string;
39+
trigger: "submit-message" | "close" | "preload" | "regenerate-message" | "action";
40+
metadata?: unknown;
41+
};
42+
}
43+
| { kind: "stop"; message?: string };
44+
45+
function serializeInputChunk(chunk: ChatInputChunk): string {
46+
return JSON.stringify(chunk);
47+
}
48+
3449
// ─── Start Agent Chat ─────────────────────────────────────────────
3550

3651
const StartAgentChatInput = CommonProjectsInput.extend({
@@ -75,7 +90,12 @@ export const startAgentChatTool = {
7590
const apiClient = await ctx.getApiClient({
7691
projectRef,
7792
environment: input.environment,
78-
scopes: ["write:tasks", "read:runs", "write:inputStreams"],
93+
scopes: [
94+
"write:tasks",
95+
"read:runs",
96+
"read:sessions",
97+
"write:sessions",
98+
],
7999
branch: input.branch,
80100
});
81101

@@ -93,11 +113,20 @@ export const startAgentChatTool = {
93113
};
94114
}
95115

116+
// Create (or upsert) the backing Session. Idempotent via externalId —
117+
// two MCP clients targeting the same chatId converge to the same row.
118+
const session = await apiClient.createSession({
119+
type: "chat.agent",
120+
externalId: chatId,
121+
});
122+
96123
if (input.preload) {
97-
// Trigger a preload run
124+
// Trigger a preload run. The agent opens the session via
125+
// `sessions.open(payload.sessionId)` on startup.
98126
const payload = {
99127
messages: [],
100128
chatId,
129+
sessionId: session.id,
101130
trigger: "preload",
102131
metadata: input.clientData,
103132
};
@@ -111,6 +140,7 @@ export const startAgentChatTool = {
111140
});
112141

113142
activeSessions.set(chatId, {
143+
sessionId: session.id,
114144
runId: result.id,
115145
chatId,
116146
agentId: input.agentId,
@@ -126,6 +156,7 @@ export const startAgentChatTool = {
126156
text: [
127157
`Agent chat started and preloaded.`,
128158
`- Chat ID: ${chatId}`,
159+
`- Session ID: ${session.id}`,
129160
`- Agent: ${input.agentId}`,
130161
`- Run ID: ${result.id}`,
131162
``,
@@ -136,8 +167,9 @@ export const startAgentChatTool = {
136167
};
137168
}
138169

139-
// No preload — just register the session, first sendMessage will trigger
170+
// No preload — register the session, first sendMessage will trigger.
140171
activeSessions.set(chatId, {
172+
sessionId: session.id,
141173
runId: "",
142174
chatId,
143175
agentId: input.agentId,
@@ -153,6 +185,7 @@ export const startAgentChatTool = {
153185
text: [
154186
`Agent chat created (not yet preloaded).`,
155187
`- Chat ID: ${chatId}`,
188+
`- Session ID: ${session.id}`,
156189
`- Agent: ${input.agentId}`,
157190
``,
158191
`Use send_agent_message with chatId "${chatId}" to send the first message (this will trigger the run).`,
@@ -193,27 +226,29 @@ export const sendAgentMessageTool = {
193226
// Track the outgoing user message
194227
session.messages.push(userMessage);
195228

196-
const messagePayload = {
229+
const wirePayload = {
197230
messages: [userMessage],
198231
chatId: session.chatId,
199-
trigger: "submit-message",
232+
trigger: "submit-message" as const,
200233
metadata: session.clientData,
201234
};
202235

203-
// If we have an active run, send via input stream
236+
// If we have an active run, send via session.in. If that fails
237+
// (run ended, token expired, etc.) fall back to triggering a new
238+
// run on the same session with the full history.
204239
if (session.runId) {
205240
try {
206-
await session.apiClient.sendInputStream(
207-
session.runId,
208-
CHAT_MESSAGES_STREAM_ID,
209-
messagePayload
241+
await session.apiClient.appendToSessionStream(
242+
session.sessionId,
243+
"in",
244+
serializeInputChunk({ kind: "message", payload: wirePayload })
210245
);
211246
} catch (sendErr: any) {
212-
// Run may have ended — trigger a new one with full history
213247
const result = await session.apiClient.triggerTask(session.agentId, {
214248
payload: {
215249
messages: session.messages,
216250
chatId: session.chatId,
251+
sessionId: session.sessionId,
217252
trigger: "submit-message",
218253
metadata: session.clientData,
219254
continuation: true,
@@ -228,9 +263,12 @@ export const sendAgentMessageTool = {
228263
session.lastEventId = undefined;
229264
}
230265
} else {
231-
// No run yet — trigger one
266+
// No run yet — trigger one (agent opens the session on startup).
232267
const result = await session.apiClient.triggerTask(session.agentId, {
233-
payload: messagePayload,
268+
payload: {
269+
...wirePayload,
270+
sessionId: session.sessionId,
271+
},
234272
options: {
235273
payloadType: "application/json",
236274
tags: [`chat:${session.chatId}`],
@@ -277,14 +315,17 @@ export const closeAgentChatTool = {
277315

278316
if (session.runId) {
279317
try {
280-
await session.apiClient.sendInputStream(
281-
session.runId,
282-
CHAT_MESSAGES_STREAM_ID,
283-
{
284-
messages: [],
285-
chatId: session.chatId,
286-
trigger: "close",
287-
}
318+
await session.apiClient.appendToSessionStream(
319+
session.sessionId,
320+
"in",
321+
serializeInputChunk({
322+
kind: "message",
323+
payload: {
324+
messages: [],
325+
chatId: session.chatId,
326+
trigger: "close",
327+
},
328+
})
288329
);
289330
} catch {
290331
// Best effort — run may already be done
@@ -310,7 +351,7 @@ async function collectAgentResponse(
310351
session: ChatSession
311352
): Promise<{ text: string; toolCalls: string[]; assistantMessage: ChatMessage }> {
312353
const baseURL = session.apiClient.baseUrl;
313-
const streamUrl = `${baseURL}/realtime/v1/streams/${session.runId}/${CHAT_STREAM_KEY}`;
354+
const streamUrl = `${baseURL}/realtime/v1/sessions/${encodeURIComponent(session.sessionId)}/out`;
314355

315356
const subscription = new SSEStreamSubscription(streamUrl, {
316357
headers: {
@@ -340,6 +381,8 @@ async function collectAgentResponse(
340381
session.lastEventId = value.id;
341382
}
342383

384+
// v2 (session) SSE already parses record.body.data, so `chunk` is
385+
// the UIMessageChunk object written by the agent.
343386
if (value.chunk != null && typeof value.chunk === "object") {
344387
const chunk = value.chunk as Record<string, unknown>;
345388

@@ -348,12 +391,14 @@ async function collectAgentResponse(
348391
}
349392

350393
if (chunk.type === "trigger:upgrade-required") {
351-
// Agent requested upgrade — trigger continuation with full history
394+
// Agent requested upgrade — trigger continuation with full history.
395+
// Same session, new run — reuse sessionId, swap runId.
352396
const previousRunId = session.runId;
353397
const result = await session.apiClient.triggerTask(session.agentId, {
354398
payload: {
355399
messages: session.messages,
356400
chatId: session.chatId,
401+
sessionId: session.sessionId,
357402
trigger: "submit-message",
358403
metadata: session.clientData,
359404
continuation: true,
@@ -367,7 +412,7 @@ async function collectAgentResponse(
367412
session.runId = result.id;
368413
session.lastEventId = undefined;
369414
reader.releaseLock();
370-
// Recurse — subscribe to the new run's stream
415+
// Recurse — subscribe to the new run's stream (same session.out URL)
371416
return collectAgentResponse(session);
372417
}
373418

0 commit comments

Comments
 (0)