Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions packages/agent/src/server/agent-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1153,6 +1153,7 @@ export class AgentServer {

if (result.stopReason === "end_turn") {
await this.relayAgentResponse(payload);
await this.maybeCompleteAutomationRun(payload, taskRun);
}
} catch (error) {
this.logger.error("Failed to send initial task message", error);
Expand Down Expand Up @@ -1276,6 +1277,7 @@ export class AgentServer {

if (result.stopReason === "end_turn") {
await this.relayAgentResponse(payload);
await this.maybeCompleteAutomationRun(payload, taskRun);
}
} catch (error) {
this.logger.error("Failed to send resume message", error);
Expand Down Expand Up @@ -1744,6 +1746,37 @@ ${attributionInstructions}
}
}

private shouldAutoCompleteAutomationRun(taskRun: TaskRun | null): boolean {
const state = taskRun?.state;
return Boolean(
state &&
typeof state === "object" &&
typeof state.automation_id === "string" &&
state.automation_id.trim().length > 0,
);
}

private async maybeCompleteAutomationRun(
payload: JwtPayload,
taskRun: TaskRun | null,
): Promise<void> {
if (!this.shouldAutoCompleteAutomationRun(taskRun)) {
return;
}

try {
await this.posthogAPI.updateTaskRun(payload.task_id, payload.run_id, {
status: "completed",
});
this.logger.debug("Automation run marked completed after end_turn", {
taskId: payload.task_id,
runId: payload.run_id,
});
} catch (error) {
this.logger.error("Failed to mark automation run completed", error);
}
}

private async signalTaskComplete(
payload: JwtPayload,
stopReason: string,
Expand Down
147 changes: 147 additions & 0 deletions packages/agent/src/server/question-relay.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { type SetupServerApi, setupServer } from "msw/node";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { classifyAgentError } from "../adapters/claude/conversion/sdk-to-acp";
import type { PostHogAPIClient } from "../posthog-api";
import type { ResumeState } from "../resume";
import { createTestRepo, type TestRepo } from "../test/fixtures/api";
import { createPostHogHandlers } from "../test/mocks/msw-handlers";
import type { Task, TaskRun } from "../types";
Expand All @@ -22,9 +23,11 @@ interface TestableAgentServer {
}>;
};
questionRelayedToSlack: boolean;
resumeState: ResumeState | null;
session: unknown;
relayAgentResponse: (payload: Record<string, unknown>) => Promise<void>;
sendInitialTaskMessage: (payload: Record<string, unknown>) => Promise<void>;
syncCloudBranchMetadata: (payload: Record<string, unknown>) => Promise<void>;
}

const TEST_PAYLOAD = {
Expand Down Expand Up @@ -457,6 +460,7 @@ describe("Question relay", () => {
acpSessionId: "acp-session",
clientConnection: { prompt: promptSpy },
logWriter: {
appendRawLine: vi.fn(),
flushAll: vi.fn().mockResolvedValue(undefined),
getFullAgentResponse: vi.fn().mockReturnValue(null),
resetTurnMessages: vi.fn(),
Expand Down Expand Up @@ -501,6 +505,7 @@ describe("Question relay", () => {
acpSessionId: "acp-session",
clientConnection: { prompt: promptSpy },
logWriter: {
appendRawLine: vi.fn(),
flushAll: vi.fn().mockResolvedValue(undefined),
getFullAgentResponse: vi.fn().mockReturnValue(null),
resetTurnMessages: vi.fn(),
Expand Down Expand Up @@ -535,6 +540,7 @@ describe("Question relay", () => {
acpSessionId: "acp-session",
clientConnection: { prompt: promptSpy },
logWriter: {
appendRawLine: vi.fn(),
flushAll: vi.fn().mockResolvedValue(undefined),
getFullAgentResponse: vi.fn().mockReturnValue(null),
resetTurnMessages: vi.fn(),
Expand All @@ -551,6 +557,147 @@ describe("Question relay", () => {
});
});

it.each([
{
name: "marks automation-triggered runs completed after a successful first turn",
state: { automation_id: "automation-1" },
agentResponse: "At 2026-05-13 17:45, the waitlist count is 2827.",
expectCompleted: true,
},
{
name: "keeps non-automation runs open after a successful first turn",
state: {},
agentResponse: "done",
expectCompleted: false,
},
])("$name", async ({ state, agentResponse, expectCompleted }) => {
vi.spyOn(server.posthogAPI, "getTask").mockResolvedValue({
id: "test-task-id",
title: "t",
description: "original task description",
} as unknown as Task);
vi.spyOn(server.posthogAPI, "getTaskRun").mockResolvedValue({
id: "test-run-id",
task: "test-task-id",
state,
} as unknown as TaskRun);
vi.spyOn(server, "syncCloudBranchMetadata").mockResolvedValue(undefined);

const promptSpy = vi.fn().mockResolvedValue({ stopReason: "end_turn" });
const updateTaskRunSpy = vi
.spyOn(server.posthogAPI, "updateTaskRun")
.mockResolvedValue({} as TaskRun);
const relaySpy = vi
.spyOn(server.posthogAPI, "relayMessage")
.mockResolvedValue(undefined);
server.session = {
payload: TEST_PAYLOAD,
acpSessionId: "acp-session",
clientConnection: { prompt: promptSpy },
logWriter: {
appendRawLine: vi.fn(),
flushAll: vi.fn().mockResolvedValue(undefined),
getFullAgentResponse: vi.fn().mockReturnValue(agentResponse),
resetTurnMessages: vi.fn(),
flush: vi.fn().mockResolvedValue(undefined),
isRegistered: vi.fn().mockReturnValue(true),
},
};

await server.sendInitialTaskMessage(TEST_PAYLOAD);

expect(relaySpy).toHaveBeenCalledWith(
"test-task-id",
"test-run-id",
agentResponse,
);

if (expectCompleted) {
expect(updateTaskRunSpy).toHaveBeenCalledWith(
"test-task-id",
"test-run-id",
{
status: "completed",
},
);
} else {
expect(updateTaskRunSpy).not.toHaveBeenCalledWith(
"test-task-id",
"test-run-id",
{
status: "completed",
},
);
}
});

it("marks automation-triggered runs completed after a successful resume turn", async () => {
vi.spyOn(server.posthogAPI, "getTaskRun").mockResolvedValue({
id: "test-run-id",
task: "test-task-id",
state: { automation_id: "automation-1" },
} as unknown as TaskRun);
vi.spyOn(server, "syncCloudBranchMetadata").mockResolvedValue(undefined);

const promptSpy = vi.fn().mockResolvedValue({ stopReason: "end_turn" });
const updateTaskRunSpy = vi
.spyOn(server.posthogAPI, "updateTaskRun")
.mockResolvedValue({} as TaskRun);
const relaySpy = vi
.spyOn(server.posthogAPI, "relayMessage")
.mockResolvedValue(undefined);
server.resumeState = {
conversation: [
{
role: "user",
content: [{ type: "text", text: "Please continue from here." }],
},
],
latestGitCheckpoint: null,
interrupted: false,
logEntryCount: 1,
};
server.session = {
payload: TEST_PAYLOAD,
acpSessionId: "acp-session",
clientConnection: { prompt: promptSpy },
logWriter: {
appendRawLine: vi.fn(),
flushAll: vi.fn().mockResolvedValue(undefined),
getFullAgentResponse: vi.fn().mockReturnValue("resume reply"),
resetTurnMessages: vi.fn(),
flush: vi.fn().mockResolvedValue(undefined),
isRegistered: vi.fn().mockReturnValue(true),
},
};

await server.sendInitialTaskMessage(TEST_PAYLOAD);

expect(promptSpy).toHaveBeenCalledWith({
sessionId: "acp-session",
prompt: [
expect.objectContaining({
type: "text",
text: expect.stringContaining(
"You are resuming a previous conversation.",
),
}),
],
});
expect(relaySpy).toHaveBeenCalledWith(
"test-task-id",
"test-run-id",
"resume reply",
);
expect(updateTaskRunSpy).toHaveBeenCalledWith(
"test-task-id",
"test-run-id",
{
status: "completed",
},
);
});

it("does not replay a transient upstream termination before any session activity", async () => {
vi.spyOn(server.posthogAPI, "getTask").mockResolvedValue({
id: "test-task-id",
Comment thread
annikaschmid marked this conversation as resolved.
Expand Down
Loading