diff --git a/sdk/typescript/eslint.config.js b/sdk/typescript/eslint.config.js index 742f6fc8fd..324d06818b 100644 --- a/sdk/typescript/eslint.config.js +++ b/sdk/typescript/eslint.config.js @@ -10,5 +10,12 @@ export default defineConfig(eslint.configs.recommended, tseslint.configs.recomme rules: { "node-import/prefer-node-protocol": 2, + "@typescript-eslint/no-unused-vars": [ + "error", + { + argsIgnorePattern: "^_", + varsIgnorePattern: "^_", + }, + ], }, }); diff --git a/sdk/typescript/src/exec.ts b/sdk/typescript/src/exec.ts index 8086c92a44..be05e7c3ea 100644 --- a/sdk/typescript/src/exec.ts +++ b/sdk/typescript/src/exec.ts @@ -24,6 +24,8 @@ export type CodexExecArgs = { outputSchemaFile?: string; // --config model_reasoning_effort modelReasoningEffort?: ModelReasoningEffort; + // AbortSignal to cancel the execution + signal?: AbortSignal; // --config sandbox_workspace_write.network_access networkAccessEnabled?: boolean; // --config features.web_search_request @@ -69,7 +71,10 @@ export class CodexExec { } if (args.networkAccessEnabled !== undefined) { - commandArgs.push("--config", `sandbox_workspace_write.network_access=${args.networkAccessEnabled}`); + commandArgs.push( + "--config", + `sandbox_workspace_write.network_access=${args.networkAccessEnabled}`, + ); } if (args.webSearchEnabled !== undefined) { @@ -105,6 +110,7 @@ export class CodexExec { const child = spawn(this.executablePath, commandArgs, { env, + signal: args.signal, }); let spawnError: unknown | null = null; diff --git a/sdk/typescript/src/thread.ts b/sdk/typescript/src/thread.ts index fec63cf4d4..90c7a8face 100644 --- a/sdk/typescript/src/thread.ts +++ b/sdk/typescript/src/thread.ts @@ -86,6 +86,7 @@ export class Thread { skipGitRepoCheck: options?.skipGitRepoCheck, outputSchemaFile: schemaPath, modelReasoningEffort: options?.modelReasoningEffort, + signal: turnOptions.signal, networkAccessEnabled: options?.networkAccessEnabled, webSearchEnabled: options?.webSearchEnabled, approvalPolicy: options?.approvalPolicy, diff --git a/sdk/typescript/src/turnOptions.ts b/sdk/typescript/src/turnOptions.ts index ed602f2f92..1eda806694 100644 --- a/sdk/typescript/src/turnOptions.ts +++ b/sdk/typescript/src/turnOptions.ts @@ -1,4 +1,6 @@ export type TurnOptions = { /** JSON schema describing the expected agent output. */ outputSchema?: unknown; + /** AbortSignal to cancel the turn. */ + signal?: AbortSignal; }; diff --git a/sdk/typescript/tests/abort.test.ts b/sdk/typescript/tests/abort.test.ts new file mode 100644 index 0000000000..d79319d654 --- /dev/null +++ b/sdk/typescript/tests/abort.test.ts @@ -0,0 +1,165 @@ +import path from "node:path"; + +import { describe, expect, it } from "@jest/globals"; + +import { Codex } from "../src/codex"; + +import { + assistantMessage, + responseCompleted, + responseStarted, + shell_call as shellCall, + sse, + SseResponseBody, + startResponsesTestProxy, +} from "./responsesProxy"; + +const codexExecPath = path.join(process.cwd(), "..", "..", "codex-rs", "target", "debug", "codex"); + +function* infiniteShellCall(): Generator { + while (true) { + yield sse(responseStarted(), shellCall(), responseCompleted()); + } +} + +describe("AbortSignal support", () => { + it("aborts run() when signal is aborted", async () => { + const { url, close } = await startResponsesTestProxy({ + statusCode: 200, + responseBodies: infiniteShellCall(), + }); + + try { + const client = new Codex({ codexPathOverride: codexExecPath, baseUrl: url, apiKey: "test" }); + const thread = client.startThread(); + + // Create an abort controller and abort it immediately + const controller = new AbortController(); + controller.abort("Test abort"); + + // The operation should fail because the signal is already aborted + await expect(thread.run("Hello, world!", { signal: controller.signal })).rejects.toThrow(); + } finally { + await close(); + } + }); + + it("aborts runStreamed() when signal is aborted", async () => { + const { url, close } = await startResponsesTestProxy({ + statusCode: 200, + responseBodies: infiniteShellCall(), + }); + + try { + const client = new Codex({ codexPathOverride: codexExecPath, baseUrl: url, apiKey: "test" }); + const thread = client.startThread(); + + // Create an abort controller and abort it immediately + const controller = new AbortController(); + controller.abort("Test abort"); + + const { events } = await thread.runStreamed("Hello, world!", { signal: controller.signal }); + + // Attempting to iterate should fail + let iterationStarted = false; + try { + for await (const event of events) { + iterationStarted = true; + // Should not get here + expect(event).toBeUndefined(); + } + // If we get here, the test should fail + throw new Error( + "Expected iteration to throw due to aborted signal, but it completed successfully", + ); + } catch (error) { + // We expect an error to be thrown + expect(iterationStarted).toBe(false); // Should fail before any iteration + expect(error).toBeDefined(); + } + } finally { + await close(); + } + }); + + it("aborts run() when signal is aborted during execution", async () => { + const { url, close } = await startResponsesTestProxy({ + statusCode: 200, + responseBodies: infiniteShellCall(), + }); + + try { + const client = new Codex({ codexPathOverride: codexExecPath, baseUrl: url, apiKey: "test" }); + const thread = client.startThread(); + + const controller = new AbortController(); + + // Start the operation and abort it immediately after + const runPromise = thread.run("Hello, world!", { signal: controller.signal }); + + // Abort after a tiny delay to simulate aborting during execution + setTimeout(() => controller.abort("Aborted during execution"), 10); + + // The operation should fail + await expect(runPromise).rejects.toThrow(); + } finally { + await close(); + } + }); + + it("aborts runStreamed() when signal is aborted during iteration", async () => { + const { url, close } = await startResponsesTestProxy({ + statusCode: 200, + responseBodies: infiniteShellCall(), + }); + + try { + const client = new Codex({ codexPathOverride: codexExecPath, baseUrl: url, apiKey: "test" }); + const thread = client.startThread(); + + const controller = new AbortController(); + + const { events } = await thread.runStreamed("Hello, world!", { signal: controller.signal }); + + // Abort during iteration + let eventCount = 0; + await expect( + (async () => { + for await (const event of events) { + void event; // Consume the event + eventCount++; + // Abort after first event + if (eventCount === 5) { + controller.abort("Aborted during iteration"); + } + // Continue iterating - should eventually throw + } + })(), + ).rejects.toThrow(); + } finally { + await close(); + } + }); + + it("completes normally when signal is not aborted", async () => { + const { url, close } = await startResponsesTestProxy({ + statusCode: 200, + responseBodies: [sse(responseStarted(), assistantMessage("Hi!"), responseCompleted())], + }); + + try { + const client = new Codex({ codexPathOverride: codexExecPath, baseUrl: url, apiKey: "test" }); + const thread = client.startThread(); + + const controller = new AbortController(); + + // Don't abort - should complete successfully + const result = await thread.run("Hello, world!", { signal: controller.signal }); + + expect(result.finalResponse).toBe("Hi!"); + expect(result.items).toHaveLength(1); + } finally { + await close(); + } + }); +}); diff --git a/sdk/typescript/tests/responsesProxy.ts b/sdk/typescript/tests/responsesProxy.ts index 6b8ee9bcac..e9d3b29146 100644 --- a/sdk/typescript/tests/responsesProxy.ts +++ b/sdk/typescript/tests/responsesProxy.ts @@ -19,18 +19,18 @@ const DEFAULT_COMPLETED_USAGE: ResponseCompletedUsage = { total_tokens: 47, }; -type SseEvent = { +export type SseEvent = { type: string; [key: string]: unknown; }; -type SseResponseBody = { +export type SseResponseBody = { kind: "sse"; events: SseEvent[]; }; export type ResponsesProxyOptions = { - responseBodies: SseResponseBody[]; + responseBodies: Generator | SseResponseBody[]; statusCode?: number; }; @@ -64,11 +64,17 @@ function formatSseEvent(event: SseEvent): string { export async function startResponsesTestProxy( options: ResponsesProxyOptions, ): Promise { - const responseBodies = options.responseBodies; - if (responseBodies.length === 0) { - throw new Error("responseBodies is required"); + function* createGenerator(array: SseResponseBody[]): Generator { + for (const elem of array) { + yield elem; + } + throw new Error("not enough responses provided"); } + const responseBodies: Generator = Array.isArray(options.responseBodies) + ? createGenerator(options.responseBodies) + : options.responseBodies; + const requests: RecordedRequest[] = []; function readRequestBody(req: http.IncomingMessage): Promise { @@ -84,8 +90,6 @@ export async function startResponsesTestProxy( }); } - let responseIndex = 0; - const server = http.createServer((req, res) => { async function handle(): Promise { if (req.method === "POST" && req.url === "/responses") { @@ -97,8 +101,7 @@ export async function startResponsesTestProxy( res.statusCode = status; res.setHeader("content-type", "text/event-stream"); - const responseBody = responseBodies[Math.min(responseIndex, responseBodies.length - 1)]!; - responseIndex += 1; + const responseBody = responseBodies.next().value; for (const event of responseBody.events) { res.write(formatSseEvent(event)); } @@ -177,6 +180,22 @@ export function assistantMessage(text: string, itemId: string = DEFAULT_MESSAGE_ }; } +export function shell_call(): SseEvent { + const command = ["bash", "-lc", "echo 'Hello, world!'"]; + return { + type: "response.output_item.done", + item: { + type: "function_call", + call_id: `call_id${Math.random().toString(36).slice(2)}`, + name: "shell", + arguments: JSON.stringify({ + command, + timeout_ms: 100, + }), + }, + }; +} + export function responseFailed(errorMessage: string): SseEvent { return { type: "error", diff --git a/sdk/typescript/tests/run.test.ts b/sdk/typescript/tests/run.test.ts index f461e16667..1a85b5823e 100644 --- a/sdk/typescript/tests/run.test.ts +++ b/sdk/typescript/tests/run.test.ts @@ -14,6 +14,7 @@ import { sse, responseFailed, startResponsesTestProxy, + SseResponseBody, } from "./responsesProxy"; const codexExecPath = path.join(process.cwd(), "..", "..", "codex-rs", "target", "debug", "codex"); @@ -572,10 +573,12 @@ describe("Codex", () => { it("throws ThreadRunError on turn failures", async () => { const { url, close } = await startResponsesTestProxy({ statusCode: 200, - responseBodies: [ - sse(responseStarted("response_1")), - sse(responseFailed("rate limit exceeded")), - ], + responseBodies: (function* (): Generator { + yield sse(responseStarted("response_1")); + while (true) { + yield sse(responseFailed("rate limit exceeded")); + } + })(), }); try {