From 4950e7d8a67aaca5afac87a36987a31e83d516b6 Mon Sep 17 00:00:00 2001 From: Ruslan Nigmatullin Date: Mon, 4 May 2026 16:43:58 -0700 Subject: [PATCH] [codex] Add unsandboxed process exec API (#19040) ## Why App-server clients sometimes need argv-based local process execution while sandbox policy is controlled outside Codex. Those environments can reject sandbox-disabling paths before a command ever starts, even when the caller intentionally wants unsandboxed execution. This PR adds a distinct `process/*` API for that use case instead of extending `command/exec` with another sandbox-disabling shape. Keeping the new surface separate also makes the future removal of `command/exec` simpler: clients that need explicit process lifecycle control can move to the newer handle-based API without depending on `command/exec` business logic. ## What changed - Added v2 process lifecycle methods: `process/spawn`, `process/writeStdin`, `process/resizePty`, and `process/kill`. - Added process notifications: `process/outputDelta` for streamed stdout/stderr chunks and `process/exited` for final exit status and buffered output. - Made `process/spawn` intentionally unsandboxed and omitted sandbox-selection fields such as `sandboxPolicy` and `permissionProfile`. - Added client-supplied, connection-scoped `processHandle` values for follow-up control requests and notification routing. - Supported cwd, environment overrides, PTY mode and size, stdin streaming, stdout/stderr streaming, per-stream output caps, and timeout controls. - Killed active process sessions when the originating app-server connection closes. - Wired the implementation through the modular `request_processors/` app-server layout, with process-handle request serialization for follow-up control calls. - Updated generated JSON/TypeScript schema fixtures and documented the new API in `codex-rs/app-server/README.md`. - Added v2 app-server integration coverage in `codex-rs/app-server/tests/suite/v2/process_exec.rs` for spawn acknowledgement before exit, buffered output caps, and process termination. ## Verification - `cargo test -p codex-app-server-protocol` - `cargo test -p codex-app-server` --------- Co-authored-by: Owen Lin --- .../schema/json/ClientRequest.json | 22 + .../schema/json/ServerNotification.json | 132 ++++ .../codex_app_server_protocol.schemas.json | 158 ++++ .../codex_app_server_protocol.v2.schemas.json | 158 ++++ .../json/v2/ProcessExitedNotification.json | 41 + .../v2/ProcessOutputDeltaNotification.json | 55 ++ .../schema/typescript/ServerNotification.ts | 4 +- .../v2/ProcessExitedNotification.ts | 42 ++ .../v2/ProcessOutputDeltaNotification.ts | 26 + .../typescript/v2/ProcessOutputStream.ts | 8 + .../typescript/v2/ProcessTerminalSize.ts | 16 + .../schema/typescript/v2/index.ts | 4 + .../src/protocol/common.rs | 40 + .../app-server-protocol/src/protocol/v2.rs | 393 ++++++++++ codex-rs/app-server/README.md | 84 +++ codex-rs/app-server/src/command_exec.rs | 2 +- codex-rs/app-server/src/message_processor.rs | 27 + codex-rs/app-server/src/request_processors.rs | 2 + .../process_exec_processor.rs | 708 ++++++++++++++++++ .../app-server/src/request_serialization.rs | 8 + .../app-server/tests/common/mcp_process.rs | 40 + codex-rs/app-server/tests/suite/v2/mod.rs | 1 + .../app-server/tests/suite/v2/process_exec.rs | 250 +++++++ .../tui/src/app/app_server_event_targets.rs | 2 + codex-rs/tui/src/chatwidget.rs | 2 + 25 files changed, 2223 insertions(+), 2 deletions(-) create mode 100644 codex-rs/app-server-protocol/schema/json/v2/ProcessExitedNotification.json create mode 100644 codex-rs/app-server-protocol/schema/json/v2/ProcessOutputDeltaNotification.json create mode 100644 codex-rs/app-server-protocol/schema/typescript/v2/ProcessExitedNotification.ts create mode 100644 codex-rs/app-server-protocol/schema/typescript/v2/ProcessOutputDeltaNotification.ts create mode 100644 codex-rs/app-server-protocol/schema/typescript/v2/ProcessOutputStream.ts create mode 100644 codex-rs/app-server-protocol/schema/typescript/v2/ProcessTerminalSize.ts create mode 100644 codex-rs/app-server/src/request_processors/process_exec_processor.rs create mode 100644 codex-rs/app-server/tests/suite/v2/process_exec.rs diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index fdd4c155b0..17c782ee09 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -2265,6 +2265,28 @@ ], "type": "object" }, + "ProcessTerminalSize": { + "description": "PTY size in character cells for `process/spawn` PTY sessions.", + "properties": { + "cols": { + "description": "Terminal width in character cells.", + "format": "uint16", + "minimum": 0.0, + "type": "integer" + }, + "rows": { + "description": "Terminal height in character cells.", + "format": "uint16", + "minimum": 0.0, + "type": "integer" + } + }, + "required": [ + "cols", + "rows" + ], + "type": "object" + }, "RealtimeOutputModality": { "enum": [ "text", diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index 8b1d158313..3a1a9744a5 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -2415,6 +2415,96 @@ ], "type": "string" }, + "ProcessExitedNotification": { + "description": "Final process exit notification for `process/spawn`.", + "properties": { + "exitCode": { + "description": "Process exit code.", + "format": "int32", + "type": "integer" + }, + "processHandle": { + "description": "Client-supplied, connection-scoped `processHandle` from `process/spawn`.", + "type": "string" + }, + "stderr": { + "description": "Buffered stderr capture.\n\nEmpty when stderr was streamed via `process/outputDelta`.", + "type": "string" + }, + "stderrCapReached": { + "description": "Whether stderr reached `outputBytesCap`.\n\nIn streaming mode, stderr is empty and cap state is also reported on the final stderr `process/outputDelta` notification.", + "type": "boolean" + }, + "stdout": { + "description": "Buffered stdout capture.\n\nEmpty when stdout was streamed via `process/outputDelta`.", + "type": "string" + }, + "stdoutCapReached": { + "description": "Whether stdout reached `outputBytesCap`.\n\nIn streaming mode, stdout is empty and cap state is also reported on the final stdout `process/outputDelta` notification.", + "type": "boolean" + } + }, + "required": [ + "exitCode", + "processHandle", + "stderr", + "stderrCapReached", + "stdout", + "stdoutCapReached" + ], + "type": "object" + }, + "ProcessOutputDeltaNotification": { + "description": "Base64-encoded output chunk emitted for a streaming `process/spawn` request.", + "properties": { + "capReached": { + "description": "True on the final streamed chunk for this stream when output was truncated by `outputBytesCap`.", + "type": "boolean" + }, + "deltaBase64": { + "description": "Base64-encoded output bytes.", + "type": "string" + }, + "processHandle": { + "description": "Client-supplied, connection-scoped `processHandle` from `process/spawn`.", + "type": "string" + }, + "stream": { + "allOf": [ + { + "$ref": "#/definitions/ProcessOutputStream" + } + ], + "description": "Output stream this chunk belongs to." + } + }, + "required": [ + "capReached", + "deltaBase64", + "processHandle", + "stream" + ], + "type": "object" + }, + "ProcessOutputStream": { + "description": "Stream label for `process/outputDelta` notifications.", + "oneOf": [ + { + "description": "stdout stream. PTY mode multiplexes terminal output here.", + "enum": [ + "stdout" + ], + "type": "string" + }, + { + "description": "stderr stream.", + "enum": [ + "stderr" + ], + "type": "string" + } + ] + }, "RateLimitReachedType": { "enum": [ "rate_limit_reached", @@ -5163,6 +5253,48 @@ "title": "Command/exec/outputDeltaNotification", "type": "object" }, + { + "description": "Stream base64-encoded stdout/stderr chunks for a running `process/spawn` session.", + "properties": { + "method": { + "enum": [ + "process/outputDelta" + ], + "title": "Process/outputDeltaNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/ProcessOutputDeltaNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Process/outputDeltaNotification", + "type": "object" + }, + { + "description": "Final exit notification for a `process/spawn` session.", + "properties": { + "method": { + "enum": [ + "process/exited" + ], + "title": "Process/exitedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/ProcessExitedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Process/exitedNotification", + "type": "object" + }, { "properties": { "method": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index edcca850c2..00278cccb6 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -4248,6 +4248,48 @@ "title": "Command/exec/outputDeltaNotification", "type": "object" }, + { + "description": "Stream base64-encoded stdout/stderr chunks for a running `process/spawn` session.", + "properties": { + "method": { + "enum": [ + "process/outputDelta" + ], + "title": "Process/outputDeltaNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/v2/ProcessOutputDeltaNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Process/outputDeltaNotification", + "type": "object" + }, + { + "description": "Final exit notification for a `process/spawn` session.", + "properties": { + "method": { + "enum": [ + "process/exited" + ], + "title": "Process/exitedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/v2/ProcessExitedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Process/exitedNotification", + "type": "object" + }, { "properties": { "method": { @@ -12655,6 +12697,122 @@ ], "type": "object" }, + "ProcessExitedNotification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "description": "Final process exit notification for `process/spawn`.", + "properties": { + "exitCode": { + "description": "Process exit code.", + "format": "int32", + "type": "integer" + }, + "processHandle": { + "description": "Client-supplied, connection-scoped `processHandle` from `process/spawn`.", + "type": "string" + }, + "stderr": { + "description": "Buffered stderr capture.\n\nEmpty when stderr was streamed via `process/outputDelta`.", + "type": "string" + }, + "stderrCapReached": { + "description": "Whether stderr reached `outputBytesCap`.\n\nIn streaming mode, stderr is empty and cap state is also reported on the final stderr `process/outputDelta` notification.", + "type": "boolean" + }, + "stdout": { + "description": "Buffered stdout capture.\n\nEmpty when stdout was streamed via `process/outputDelta`.", + "type": "string" + }, + "stdoutCapReached": { + "description": "Whether stdout reached `outputBytesCap`.\n\nIn streaming mode, stdout is empty and cap state is also reported on the final stdout `process/outputDelta` notification.", + "type": "boolean" + } + }, + "required": [ + "exitCode", + "processHandle", + "stderr", + "stderrCapReached", + "stdout", + "stdoutCapReached" + ], + "title": "ProcessExitedNotification", + "type": "object" + }, + "ProcessOutputDeltaNotification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "description": "Base64-encoded output chunk emitted for a streaming `process/spawn` request.", + "properties": { + "capReached": { + "description": "True on the final streamed chunk for this stream when output was truncated by `outputBytesCap`.", + "type": "boolean" + }, + "deltaBase64": { + "description": "Base64-encoded output bytes.", + "type": "string" + }, + "processHandle": { + "description": "Client-supplied, connection-scoped `processHandle` from `process/spawn`.", + "type": "string" + }, + "stream": { + "allOf": [ + { + "$ref": "#/definitions/v2/ProcessOutputStream" + } + ], + "description": "Output stream this chunk belongs to." + } + }, + "required": [ + "capReached", + "deltaBase64", + "processHandle", + "stream" + ], + "title": "ProcessOutputDeltaNotification", + "type": "object" + }, + "ProcessOutputStream": { + "description": "Stream label for `process/outputDelta` notifications.", + "oneOf": [ + { + "description": "stdout stream. PTY mode multiplexes terminal output here.", + "enum": [ + "stdout" + ], + "type": "string" + }, + { + "description": "stderr stream.", + "enum": [ + "stderr" + ], + "type": "string" + } + ] + }, + "ProcessTerminalSize": { + "description": "PTY size in character cells for `process/spawn` PTY sessions.", + "properties": { + "cols": { + "description": "Terminal width in character cells.", + "format": "uint16", + "minimum": 0.0, + "type": "integer" + }, + "rows": { + "description": "Terminal height in character cells.", + "format": "uint16", + "minimum": 0.0, + "type": "integer" + } + }, + "required": [ + "cols", + "rows" + ], + "type": "object" + }, "ProfileV2": { "additionalProperties": true, "properties": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index e58a8451f2..597fde945e 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -9308,6 +9308,122 @@ ], "type": "object" }, + "ProcessExitedNotification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "description": "Final process exit notification for `process/spawn`.", + "properties": { + "exitCode": { + "description": "Process exit code.", + "format": "int32", + "type": "integer" + }, + "processHandle": { + "description": "Client-supplied, connection-scoped `processHandle` from `process/spawn`.", + "type": "string" + }, + "stderr": { + "description": "Buffered stderr capture.\n\nEmpty when stderr was streamed via `process/outputDelta`.", + "type": "string" + }, + "stderrCapReached": { + "description": "Whether stderr reached `outputBytesCap`.\n\nIn streaming mode, stderr is empty and cap state is also reported on the final stderr `process/outputDelta` notification.", + "type": "boolean" + }, + "stdout": { + "description": "Buffered stdout capture.\n\nEmpty when stdout was streamed via `process/outputDelta`.", + "type": "string" + }, + "stdoutCapReached": { + "description": "Whether stdout reached `outputBytesCap`.\n\nIn streaming mode, stdout is empty and cap state is also reported on the final stdout `process/outputDelta` notification.", + "type": "boolean" + } + }, + "required": [ + "exitCode", + "processHandle", + "stderr", + "stderrCapReached", + "stdout", + "stdoutCapReached" + ], + "title": "ProcessExitedNotification", + "type": "object" + }, + "ProcessOutputDeltaNotification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "description": "Base64-encoded output chunk emitted for a streaming `process/spawn` request.", + "properties": { + "capReached": { + "description": "True on the final streamed chunk for this stream when output was truncated by `outputBytesCap`.", + "type": "boolean" + }, + "deltaBase64": { + "description": "Base64-encoded output bytes.", + "type": "string" + }, + "processHandle": { + "description": "Client-supplied, connection-scoped `processHandle` from `process/spawn`.", + "type": "string" + }, + "stream": { + "allOf": [ + { + "$ref": "#/definitions/ProcessOutputStream" + } + ], + "description": "Output stream this chunk belongs to." + } + }, + "required": [ + "capReached", + "deltaBase64", + "processHandle", + "stream" + ], + "title": "ProcessOutputDeltaNotification", + "type": "object" + }, + "ProcessOutputStream": { + "description": "Stream label for `process/outputDelta` notifications.", + "oneOf": [ + { + "description": "stdout stream. PTY mode multiplexes terminal output here.", + "enum": [ + "stdout" + ], + "type": "string" + }, + { + "description": "stderr stream.", + "enum": [ + "stderr" + ], + "type": "string" + } + ] + }, + "ProcessTerminalSize": { + "description": "PTY size in character cells for `process/spawn` PTY sessions.", + "properties": { + "cols": { + "description": "Terminal width in character cells.", + "format": "uint16", + "minimum": 0.0, + "type": "integer" + }, + "rows": { + "description": "Terminal height in character cells.", + "format": "uint16", + "minimum": 0.0, + "type": "integer" + } + }, + "required": [ + "cols", + "rows" + ], + "type": "object" + }, "ProfileV2": { "additionalProperties": true, "properties": { @@ -11386,6 +11502,48 @@ "title": "Command/exec/outputDeltaNotification", "type": "object" }, + { + "description": "Stream base64-encoded stdout/stderr chunks for a running `process/spawn` session.", + "properties": { + "method": { + "enum": [ + "process/outputDelta" + ], + "title": "Process/outputDeltaNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/ProcessOutputDeltaNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Process/outputDeltaNotification", + "type": "object" + }, + { + "description": "Final exit notification for a `process/spawn` session.", + "properties": { + "method": { + "enum": [ + "process/exited" + ], + "title": "Process/exitedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/ProcessExitedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Process/exitedNotification", + "type": "object" + }, { "properties": { "method": { diff --git a/codex-rs/app-server-protocol/schema/json/v2/ProcessExitedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/ProcessExitedNotification.json new file mode 100644 index 0000000000..3a0a81d316 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/json/v2/ProcessExitedNotification.json @@ -0,0 +1,41 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "description": "Final process exit notification for `process/spawn`.", + "properties": { + "exitCode": { + "description": "Process exit code.", + "format": "int32", + "type": "integer" + }, + "processHandle": { + "description": "Client-supplied, connection-scoped `processHandle` from `process/spawn`.", + "type": "string" + }, + "stderr": { + "description": "Buffered stderr capture.\n\nEmpty when stderr was streamed via `process/outputDelta`.", + "type": "string" + }, + "stderrCapReached": { + "description": "Whether stderr reached `outputBytesCap`.\n\nIn streaming mode, stderr is empty and cap state is also reported on the final stderr `process/outputDelta` notification.", + "type": "boolean" + }, + "stdout": { + "description": "Buffered stdout capture.\n\nEmpty when stdout was streamed via `process/outputDelta`.", + "type": "string" + }, + "stdoutCapReached": { + "description": "Whether stdout reached `outputBytesCap`.\n\nIn streaming mode, stdout is empty and cap state is also reported on the final stdout `process/outputDelta` notification.", + "type": "boolean" + } + }, + "required": [ + "exitCode", + "processHandle", + "stderr", + "stderrCapReached", + "stdout", + "stdoutCapReached" + ], + "title": "ProcessExitedNotification", + "type": "object" +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/json/v2/ProcessOutputDeltaNotification.json b/codex-rs/app-server-protocol/schema/json/v2/ProcessOutputDeltaNotification.json new file mode 100644 index 0000000000..1800833f2e --- /dev/null +++ b/codex-rs/app-server-protocol/schema/json/v2/ProcessOutputDeltaNotification.json @@ -0,0 +1,55 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "definitions": { + "ProcessOutputStream": { + "description": "Stream label for `process/outputDelta` notifications.", + "oneOf": [ + { + "description": "stdout stream. PTY mode multiplexes terminal output here.", + "enum": [ + "stdout" + ], + "type": "string" + }, + { + "description": "stderr stream.", + "enum": [ + "stderr" + ], + "type": "string" + } + ] + } + }, + "description": "Base64-encoded output chunk emitted for a streaming `process/spawn` request.", + "properties": { + "capReached": { + "description": "True on the final streamed chunk for this stream when output was truncated by `outputBytesCap`.", + "type": "boolean" + }, + "deltaBase64": { + "description": "Base64-encoded output bytes.", + "type": "string" + }, + "processHandle": { + "description": "Client-supplied, connection-scoped `processHandle` from `process/spawn`.", + "type": "string" + }, + "stream": { + "allOf": [ + { + "$ref": "#/definitions/ProcessOutputStream" + } + ], + "description": "Output stream this chunk belongs to." + } + }, + "required": [ + "capReached", + "deltaBase64", + "processHandle", + "stream" + ], + "title": "ProcessOutputDeltaNotification", + "type": "object" +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts b/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts index c32618c090..f4dd0e1864 100644 --- a/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts +++ b/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts @@ -31,6 +31,8 @@ import type { McpToolCallProgressNotification } from "./v2/McpToolCallProgressNo import type { ModelReroutedNotification } from "./v2/ModelReroutedNotification"; import type { ModelVerificationNotification } from "./v2/ModelVerificationNotification"; import type { PlanDeltaNotification } from "./v2/PlanDeltaNotification"; +import type { ProcessExitedNotification } from "./v2/ProcessExitedNotification"; +import type { ProcessOutputDeltaNotification } from "./v2/ProcessOutputDeltaNotification"; import type { RawResponseItemCompletedNotification } from "./v2/RawResponseItemCompletedNotification"; import type { ReasoningSummaryPartAddedNotification } from "./v2/ReasoningSummaryPartAddedNotification"; import type { ReasoningSummaryTextDeltaNotification } from "./v2/ReasoningSummaryTextDeltaNotification"; @@ -67,4 +69,4 @@ import type { WindowsWorldWritableWarningNotification } from "./v2/WindowsWorldW /** * Notification sent from the server to the client. */ -export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/status/changed", "params": ThreadStatusChangedNotification } | { "method": "thread/archived", "params": ThreadArchivedNotification } | { "method": "thread/unarchived", "params": ThreadUnarchivedNotification } | { "method": "thread/closed", "params": ThreadClosedNotification } | { "method": "skills/changed", "params": SkillsChangedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/goal/updated", "params": ThreadGoalUpdatedNotification } | { "method": "thread/goal/cleared", "params": ThreadGoalClearedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "hook/started", "params": HookStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "hook/completed", "params": HookCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/autoApprovalReview/started", "params": ItemGuardianApprovalReviewStartedNotification } | { "method": "item/autoApprovalReview/completed", "params": ItemGuardianApprovalReviewCompletedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "command/exec/outputDelta", "params": CommandExecOutputDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/fileChange/patchUpdated", "params": FileChangePatchUpdatedNotification } | { "method": "serverRequest/resolved", "params": ServerRequestResolvedNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "mcpServer/startupStatus/updated", "params": McpServerStatusUpdatedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "remoteControl/status/changed", "params": RemoteControlStatusChangedNotification } | { "method": "externalAgentConfig/import/completed", "params": ExternalAgentConfigImportCompletedNotification } | { "method": "fs/changed", "params": FsChangedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "model/rerouted", "params": ModelReroutedNotification } | { "method": "model/verification", "params": ModelVerificationNotification } | { "method": "warning", "params": WarningNotification } | { "method": "guardianWarning", "params": GuardianWarningNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "fuzzyFileSearch/sessionUpdated", "params": FuzzyFileSearchSessionUpdatedNotification } | { "method": "fuzzyFileSearch/sessionCompleted", "params": FuzzyFileSearchSessionCompletedNotification } | { "method": "thread/realtime/started", "params": ThreadRealtimeStartedNotification } | { "method": "thread/realtime/itemAdded", "params": ThreadRealtimeItemAddedNotification } | { "method": "thread/realtime/transcript/delta", "params": ThreadRealtimeTranscriptDeltaNotification } | { "method": "thread/realtime/transcript/done", "params": ThreadRealtimeTranscriptDoneNotification } | { "method": "thread/realtime/outputAudio/delta", "params": ThreadRealtimeOutputAudioDeltaNotification } | { "method": "thread/realtime/sdp", "params": ThreadRealtimeSdpNotification } | { "method": "thread/realtime/error", "params": ThreadRealtimeErrorNotification } | { "method": "thread/realtime/closed", "params": ThreadRealtimeClosedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "windowsSandbox/setupCompleted", "params": WindowsSandboxSetupCompletedNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification }; +export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/status/changed", "params": ThreadStatusChangedNotification } | { "method": "thread/archived", "params": ThreadArchivedNotification } | { "method": "thread/unarchived", "params": ThreadUnarchivedNotification } | { "method": "thread/closed", "params": ThreadClosedNotification } | { "method": "skills/changed", "params": SkillsChangedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/goal/updated", "params": ThreadGoalUpdatedNotification } | { "method": "thread/goal/cleared", "params": ThreadGoalClearedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "hook/started", "params": HookStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "hook/completed", "params": HookCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/autoApprovalReview/started", "params": ItemGuardianApprovalReviewStartedNotification } | { "method": "item/autoApprovalReview/completed", "params": ItemGuardianApprovalReviewCompletedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "command/exec/outputDelta", "params": CommandExecOutputDeltaNotification } | { "method": "process/outputDelta", "params": ProcessOutputDeltaNotification } | { "method": "process/exited", "params": ProcessExitedNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/fileChange/patchUpdated", "params": FileChangePatchUpdatedNotification } | { "method": "serverRequest/resolved", "params": ServerRequestResolvedNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "mcpServer/startupStatus/updated", "params": McpServerStatusUpdatedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "remoteControl/status/changed", "params": RemoteControlStatusChangedNotification } | { "method": "externalAgentConfig/import/completed", "params": ExternalAgentConfigImportCompletedNotification } | { "method": "fs/changed", "params": FsChangedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "model/rerouted", "params": ModelReroutedNotification } | { "method": "model/verification", "params": ModelVerificationNotification } | { "method": "warning", "params": WarningNotification } | { "method": "guardianWarning", "params": GuardianWarningNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "fuzzyFileSearch/sessionUpdated", "params": FuzzyFileSearchSessionUpdatedNotification } | { "method": "fuzzyFileSearch/sessionCompleted", "params": FuzzyFileSearchSessionCompletedNotification } | { "method": "thread/realtime/started", "params": ThreadRealtimeStartedNotification } | { "method": "thread/realtime/itemAdded", "params": ThreadRealtimeItemAddedNotification } | { "method": "thread/realtime/transcript/delta", "params": ThreadRealtimeTranscriptDeltaNotification } | { "method": "thread/realtime/transcript/done", "params": ThreadRealtimeTranscriptDoneNotification } | { "method": "thread/realtime/outputAudio/delta", "params": ThreadRealtimeOutputAudioDeltaNotification } | { "method": "thread/realtime/sdp", "params": ThreadRealtimeSdpNotification } | { "method": "thread/realtime/error", "params": ThreadRealtimeErrorNotification } | { "method": "thread/realtime/closed", "params": ThreadRealtimeClosedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "windowsSandbox/setupCompleted", "params": WindowsSandboxSetupCompletedNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ProcessExitedNotification.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ProcessExitedNotification.ts new file mode 100644 index 0000000000..0d82633421 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ProcessExitedNotification.ts @@ -0,0 +1,42 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +/** + * Final process exit notification for `process/spawn`. + */ +export type ProcessExitedNotification = { +/** + * Client-supplied, connection-scoped `processHandle` from `process/spawn`. + */ +processHandle: string, +/** + * Process exit code. + */ +exitCode: number, +/** + * Buffered stdout capture. + * + * Empty when stdout was streamed via `process/outputDelta`. + */ +stdout: string, +/** + * Whether stdout reached `outputBytesCap`. + * + * In streaming mode, stdout is empty and cap state is also reported on the + * final stdout `process/outputDelta` notification. + */ +stdoutCapReached: boolean, +/** + * Buffered stderr capture. + * + * Empty when stderr was streamed via `process/outputDelta`. + */ +stderr: string, +/** + * Whether stderr reached `outputBytesCap`. + * + * In streaming mode, stderr is empty and cap state is also reported on the + * final stderr `process/outputDelta` notification. + */ +stderrCapReached: boolean, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ProcessOutputDeltaNotification.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ProcessOutputDeltaNotification.ts new file mode 100644 index 0000000000..46369e396a --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ProcessOutputDeltaNotification.ts @@ -0,0 +1,26 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { ProcessOutputStream } from "./ProcessOutputStream"; + +/** + * Base64-encoded output chunk emitted for a streaming `process/spawn` request. + */ +export type ProcessOutputDeltaNotification = { +/** + * Client-supplied, connection-scoped `processHandle` from `process/spawn`. + */ +processHandle: string, +/** + * Output stream this chunk belongs to. + */ +stream: ProcessOutputStream, +/** + * Base64-encoded output bytes. + */ +deltaBase64: string, +/** + * True on the final streamed chunk for this stream when output was + * truncated by `outputBytesCap`. + */ +capReached: boolean, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ProcessOutputStream.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ProcessOutputStream.ts new file mode 100644 index 0000000000..1bb550d90d --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ProcessOutputStream.ts @@ -0,0 +1,8 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +/** + * Stream label for `process/outputDelta` notifications. + */ +export type ProcessOutputStream = "stdout" | "stderr"; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ProcessTerminalSize.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ProcessTerminalSize.ts new file mode 100644 index 0000000000..1c4b467038 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ProcessTerminalSize.ts @@ -0,0 +1,16 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +/** + * PTY size in character cells for `process/spawn` PTY sessions. + */ +export type ProcessTerminalSize = { +/** + * Terminal height in character cells. + */ +rows: number, +/** + * Terminal width in character cells. + */ +cols: number, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts index d369ba3423..037534a22f 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts @@ -295,6 +295,10 @@ export type { PluginSummary } from "./PluginSummary"; export type { PluginUninstallParams } from "./PluginUninstallParams"; export type { PluginUninstallResponse } from "./PluginUninstallResponse"; export type { PluginsMigration } from "./PluginsMigration"; +export type { ProcessExitedNotification } from "./ProcessExitedNotification"; +export type { ProcessOutputDeltaNotification } from "./ProcessOutputDeltaNotification"; +export type { ProcessOutputStream } from "./ProcessOutputStream"; +export type { ProcessTerminalSize } from "./ProcessTerminalSize"; export type { ProfileV2 } from "./ProfileV2"; export type { RateLimitReachedType } from "./RateLimitReachedType"; export type { RateLimitSnapshot } from "./RateLimitSnapshot"; diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index c5a7d61f01..477d0427b2 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -80,6 +80,7 @@ pub enum ClientRequestSerializationScope { Thread { thread_id: String }, ThreadPath { path: PathBuf }, CommandExecProcess { process_id: String }, + Process { process_handle: String }, FuzzyFileSearchSession { session_id: String }, FsWatch { watch_id: String }, McpOauth { server_name: String }, @@ -127,6 +128,11 @@ macro_rules! serialization_scope_expr { process_id: $actual_params.$field.clone(), }) }; + ($actual_params:ident, process_handle($params:ident . $field:ident)) => { + Some(ClientRequestSerializationScope::Process { + process_handle: $actual_params.$field.clone(), + }) + }; ($actual_params:ident, fuzzy_session_id($params:ident . $field:ident)) => { Some(ClientRequestSerializationScope::FuzzyFileSearchSession { session_id: $actual_params.$field.clone(), @@ -900,6 +906,34 @@ client_request_definitions! { serialization: command_process_id(params.process_id), response: v2::CommandExecResizeResponse, }, + #[experimental("process/spawn")] + /// Spawn a standalone process (argv vector) without a Codex sandbox. + ProcessSpawn => "process/spawn" { + params: v2::ProcessSpawnParams, + serialization: process_handle(params.process_handle), + response: v2::ProcessSpawnResponse, + }, + #[experimental("process/writeStdin")] + /// Write stdin bytes to a running `process/spawn` session or close stdin. + ProcessWriteStdin => "process/writeStdin" { + params: v2::ProcessWriteStdinParams, + serialization: process_handle(params.process_handle), + response: v2::ProcessWriteStdinResponse, + }, + #[experimental("process/kill")] + /// Terminate a running `process/spawn` session by client-supplied `processHandle`. + ProcessKill => "process/kill" { + params: v2::ProcessKillParams, + serialization: process_handle(params.process_handle), + response: v2::ProcessKillResponse, + }, + #[experimental("process/resizePty")] + /// Resize a running PTY-backed `process/spawn` session by client-supplied `processHandle`. + ProcessResizePty => "process/resizePty" { + params: v2::ProcessResizePtyParams, + serialization: process_handle(params.process_handle), + response: v2::ProcessResizePtyResponse, + }, ConfigRead => "config/read" { params: v2::ConfigReadParams, @@ -1401,6 +1435,12 @@ server_notification_definitions! { PlanDelta => "item/plan/delta" (v2::PlanDeltaNotification), /// Stream base64-encoded stdout/stderr chunks for a running `command/exec` session. CommandExecOutputDelta => "command/exec/outputDelta" (v2::CommandExecOutputDeltaNotification), + /// Stream base64-encoded stdout/stderr chunks for a running `process/spawn` session. + #[experimental("process/outputDelta")] + ProcessOutputDelta => "process/outputDelta" (v2::ProcessOutputDeltaNotification), + /// Final exit notification for a `process/spawn` session. + #[experimental("process/exited")] + ProcessExited => "process/exited" (v2::ProcessExitedNotification), CommandExecutionOutputDelta => "item/commandExecution/outputDelta" (v2::CommandExecutionOutputDeltaNotification), TerminalInteraction => "item/commandExecution/terminalInteraction" (v2::TerminalInteractionNotification), /// Deprecated legacy apply_patch output stream notification. diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 053895c2bb..7ab25a943f 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -3558,6 +3558,204 @@ pub enum CommandExecOutputStream { Stderr, } +/// PTY size in character cells for `process/spawn` PTY sessions. +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ProcessTerminalSize { + /// Terminal height in character cells. + pub rows: u16, + /// Terminal width in character cells. + pub cols: u16, +} + +/// Spawn a standalone process (argv vector) without a Codex sandbox on the host +/// where the app server is running. +/// +/// `process/spawn` returns after the process has started and the connection-scoped +/// `processHandle` has been registered. Process output and exit are reported via +/// `process/outputDelta` and `process/exited` notifications. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ProcessSpawnParams { + /// Command argv vector. Empty arrays are rejected. + pub command: Vec, + /// Client-supplied, connection-scoped process handle. + /// + /// Duplicate active handles are rejected on the same connection. The same + /// handle can be reused after the prior process exits. + pub process_handle: String, + /// Absolute working directory for the process. + pub cwd: AbsolutePathBuf, + /// Enable PTY mode. + /// + /// This implies `streamStdin` and `streamStdoutStderr`. + #[serde(default, skip_serializing_if = "std::ops::Not::not")] + pub tty: bool, + /// Allow follow-up `process/writeStdin` requests to write stdin bytes. + #[serde(default, skip_serializing_if = "std::ops::Not::not")] + pub stream_stdin: bool, + /// Stream stdout/stderr via `process/outputDelta` notifications. + /// + /// Streamed bytes are not duplicated into the `process/exited` notification. + #[serde(default, skip_serializing_if = "std::ops::Not::not")] + pub stream_stdout_stderr: bool, + /// Optional per-stream stdout/stderr capture cap in bytes. + /// + /// When omitted, the server default applies. Set to `null` to disable the + /// cap. + #[serde( + default, + deserialize_with = "super::serde_helpers::deserialize_double_option", + serialize_with = "super::serde_helpers::serialize_double_option", + skip_serializing_if = "Option::is_none" + )] + #[ts(type = "number | null")] + #[ts(optional = nullable)] + pub output_bytes_cap: Option>, + /// Optional timeout in milliseconds. + /// + /// When omitted, the server default applies. Set to `null` to disable the + /// timeout. + #[serde( + default, + deserialize_with = "super::serde_helpers::deserialize_double_option", + serialize_with = "super::serde_helpers::serialize_double_option", + skip_serializing_if = "Option::is_none" + )] + #[ts(type = "number | null")] + #[ts(optional = nullable)] + pub timeout_ms: Option>, + /// Optional environment overrides merged into the app-server process + /// environment. + /// + /// Matching names override inherited values. Set a key to `null` to unset + /// an inherited variable. + #[ts(optional = nullable)] + pub env: Option>>, + /// Optional initial PTY size in character cells. Only valid when `tty` is + /// true. + #[ts(optional = nullable)] + pub size: Option, +} + +/// Successful response for `process/spawn`. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ProcessSpawnResponse {} + +/// Write stdin bytes to a running `process/spawn` session, close stdin, or +/// both. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ProcessWriteStdinParams { + /// Client-supplied, connection-scoped `processHandle` from `process/spawn`. + pub process_handle: String, + /// Optional base64-encoded stdin bytes to write. + #[ts(optional = nullable)] + pub delta_base64: Option, + /// Close stdin after writing `deltaBase64`, if present. + #[serde(default, skip_serializing_if = "std::ops::Not::not")] + pub close_stdin: bool, +} + +/// Empty success response for `process/writeStdin`. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ProcessWriteStdinResponse {} + +/// Terminate a running `process/spawn` session. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ProcessKillParams { + /// Client-supplied, connection-scoped `processHandle` from `process/spawn`. + pub process_handle: String, +} + +/// Empty success response for `process/kill`. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ProcessKillResponse {} + +/// Resize a running PTY-backed `process/spawn` session. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ProcessResizePtyParams { + /// Client-supplied, connection-scoped `processHandle` from `process/spawn`. + pub process_handle: String, + /// New PTY size in character cells. + pub size: ProcessTerminalSize, +} + +/// Empty success response for `process/resizePty`. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ProcessResizePtyResponse {} + +/// Stream label for `process/outputDelta` notifications. +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub enum ProcessOutputStream { + /// stdout stream. PTY mode multiplexes terminal output here. + Stdout, + /// stderr stream. + Stderr, +} + +/// Base64-encoded output chunk emitted for a streaming `process/spawn` request. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ProcessOutputDeltaNotification { + /// Client-supplied, connection-scoped `processHandle` from `process/spawn`. + pub process_handle: String, + /// Output stream this chunk belongs to. + pub stream: ProcessOutputStream, + /// Base64-encoded output bytes. + pub delta_base64: String, + /// True on the final streamed chunk for this stream when output was + /// truncated by `outputBytesCap`. + pub cap_reached: bool, +} + +/// Final process exit notification for `process/spawn`. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ProcessExitedNotification { + /// Client-supplied, connection-scoped `processHandle` from `process/spawn`. + pub process_handle: String, + /// Process exit code. + pub exit_code: i32, + /// Buffered stdout capture. + /// + /// Empty when stdout was streamed via `process/outputDelta`. + pub stdout: String, + /// Whether stdout reached `outputBytesCap`. + /// + /// In streaming mode, stdout is empty and cap state is also reported on the + /// final stdout `process/outputDelta` notification. + pub stdout_cap_reached: bool, + /// Buffered stderr capture. + /// + /// Empty when stderr was streamed via `process/outputDelta`. + pub stderr: String, + /// Whether stderr reached `outputBytesCap`. + /// + /// In streaming mode, stderr is empty and cap state is also reported on the + /// final stderr `process/outputDelta` notification. + pub stderr_cap_reached: bool, +} + // === Threads, Turns, and Items === // Thread APIs #[derive( @@ -9147,6 +9345,97 @@ mod tests { assert_eq!(decoded, params); } + #[test] + fn process_spawn_params_round_trips_without_sandbox_policy() { + let params = ProcessSpawnParams { + command: vec!["sleep".to_string(), "30".to_string()], + process_handle: "sleep-1".to_string(), + cwd: test_absolute_path(), + tty: false, + stream_stdin: false, + stream_stdout_stderr: false, + output_bytes_cap: None, + timeout_ms: None, + env: None, + size: None, + }; + + let value = serde_json::to_value(¶ms).expect("serialize process/spawn params"); + assert_eq!( + value, + json!({ + "command": ["sleep", "30"], + "processHandle": "sleep-1", + "cwd": absolute_path_string("readable"), + "env": null, + "size": null, + }) + ); + + let decoded = + serde_json::from_value::(value).expect("deserialize round-trip"); + assert_eq!(decoded, params); + } + + #[test] + fn process_spawn_params_distinguish_omitted_null_and_value_limits() { + let base = json!({ + "command": ["sleep", "30"], + "processHandle": "sleep-1", + "cwd": absolute_path_string("readable"), + }); + + let expected_omitted = ProcessSpawnParams { + command: vec!["sleep".to_string(), "30".to_string()], + process_handle: "sleep-1".to_string(), + cwd: test_absolute_path(), + tty: false, + stream_stdin: false, + stream_stdout_stderr: false, + output_bytes_cap: None, + timeout_ms: None, + env: None, + size: None, + }; + let decoded = + serde_json::from_value::(base).expect("deserialize omitted limits"); + assert_eq!(decoded, expected_omitted); + + let decoded = serde_json::from_value::(json!({ + "command": ["sleep", "30"], + "processHandle": "sleep-1", + "cwd": absolute_path_string("readable"), + "outputBytesCap": null, + "timeoutMs": null, + })) + .expect("deserialize disabled limits"); + assert_eq!( + decoded, + ProcessSpawnParams { + output_bytes_cap: Some(None), + timeout_ms: Some(None), + ..expected_omitted.clone() + } + ); + + let decoded = serde_json::from_value::(json!({ + "command": ["sleep", "30"], + "processHandle": "sleep-1", + "cwd": absolute_path_string("readable"), + "outputBytesCap": 123, + "timeoutMs": 456, + })) + .expect("deserialize explicit limits"); + assert_eq!( + decoded, + ProcessSpawnParams { + output_bytes_cap: Some(Some(123)), + timeout_ms: Some(Some(456)), + ..expected_omitted + } + ); + } + #[test] fn command_exec_params_round_trips_disable_output_cap() { let params = CommandExecParams { @@ -9379,6 +9668,110 @@ mod tests { assert_eq!(decoded, notification); } + #[test] + fn process_control_params_round_trip() { + let write = ProcessWriteStdinParams { + process_handle: "proc-7".to_string(), + delta_base64: None, + close_stdin: true, + }; + let value = serde_json::to_value(&write).expect("serialize process/writeStdin params"); + assert_eq!( + value, + json!({ + "processHandle": "proc-7", + "deltaBase64": null, + "closeStdin": true, + }) + ); + let decoded = serde_json::from_value::(value) + .expect("deserialize process/writeStdin params"); + assert_eq!(decoded, write); + + let resize = ProcessResizePtyParams { + process_handle: "proc-7".to_string(), + size: ProcessTerminalSize { + rows: 50, + cols: 160, + }, + }; + let value = serde_json::to_value(&resize).expect("serialize process/resizePty params"); + assert_eq!( + value, + json!({ + "processHandle": "proc-7", + "size": { + "rows": 50, + "cols": 160, + }, + }) + ); + let decoded = serde_json::from_value::(value) + .expect("deserialize process/resizePty params"); + assert_eq!(decoded, resize); + + let kill = ProcessKillParams { + process_handle: "proc-7".to_string(), + }; + let value = serde_json::to_value(&kill).expect("serialize process/kill params"); + assert_eq!( + value, + json!({ + "processHandle": "proc-7", + }) + ); + let decoded = + serde_json::from_value::(value).expect("deserialize process/kill"); + assert_eq!(decoded, kill); + } + + #[test] + fn process_notifications_round_trip() { + let delta = ProcessOutputDeltaNotification { + process_handle: "proc-1".to_string(), + stream: ProcessOutputStream::Stdout, + delta_base64: "AQI=".to_string(), + cap_reached: false, + }; + let value = serde_json::to_value(&delta).expect("serialize process/outputDelta"); + assert_eq!( + value, + json!({ + "processHandle": "proc-1", + "stream": "stdout", + "deltaBase64": "AQI=", + "capReached": false, + }) + ); + let decoded = serde_json::from_value::(value) + .expect("deserialize process/outputDelta"); + assert_eq!(decoded, delta); + + let exited = ProcessExitedNotification { + process_handle: "proc-1".to_string(), + exit_code: 0, + stdout: "out".to_string(), + stdout_cap_reached: false, + stderr: "err".to_string(), + stderr_cap_reached: true, + }; + let value = serde_json::to_value(&exited).expect("serialize process/exited"); + assert_eq!( + value, + json!({ + "processHandle": "proc-1", + "exitCode": 0, + "stdout": "out", + "stdoutCapReached": false, + "stderr": "err", + "stderrCapReached": true, + }) + ); + let decoded = serde_json::from_value::(value) + .expect("deserialize process/exited"); + assert_eq!(decoded, exited); + } + #[test] fn command_execution_output_delta_round_trips() { let notification = CommandExecutionOutputDeltaNotification { diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index df41d859cd..3b80234ad0 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -181,6 +181,12 @@ Example with notification opt-out: - `command/exec/resize` — resize a running PTY-backed `command/exec` session by `processId`; returns `{}`. - `command/exec/terminate` — terminate a running `command/exec` session by `processId`; returns `{}`. - `command/exec/outputDelta` — notification emitted for base64-encoded stdout/stderr chunks from a streaming `command/exec` session. +- `process/spawn` — experimental; spawn a standalone process without the Codex sandbox on the host where the app server is running; returns after the process starts and emits `process/outputDelta` and `process/exited` notifications. +- `process/writeStdin` — experimental; write base64-decoded stdin bytes to a running `process/spawn` session or close stdin; returns `{}`. +- `process/resizePty` — experimental; resize a running PTY-backed `process/spawn` session by `processHandle`; returns `{}`. +- `process/kill` — experimental; terminate a running `process/spawn` session by `processHandle`; returns `{}`. +- `process/outputDelta` — experimental; notification emitted for base64-encoded stdout/stderr chunks from a streaming `process/spawn` session. +- `process/exited` — experimental; notification emitted when a `process/spawn` session exits. - `fs/readFile` — read an absolute file path and return `{ dataBase64 }`. - `fs/writeFile` — write an absolute file path from base64-encoded `{ dataBase64 }`; returns `{}`. - `fs/createDirectory` — create an absolute directory path; `recursive` defaults to `true`. @@ -930,6 +936,7 @@ Run a standalone command (argv vector) in the server’s sandbox without creatin } } ``` +- Prefer using `process/spawn` when you want an explicitly unsandboxed process execution API with immediate spawn acknowledgement, handle-based control, output notifications, and an exit notification. - For clients that are already sandboxed externally, set the legacy `sandboxPolicy` to `{"type":"externalSandbox","networkAccess":"enabled"}` (or omit `networkAccess` to keep it restricted). Codex will not enforce its own sandbox in this mode; it tells the model it has full file-system access and passes the `networkAccess` state through `environment_context`. Notes: @@ -1001,6 +1008,83 @@ Streaming stdin/stdout uses base64 so PTY sessions can carry arbitrary bytes: - `command/exec.params.env` overrides the server-computed environment per key; set a key to `null` to unset an inherited variable. - `command/exec/resize` is only supported for PTY-backed `command/exec` sessions. +### Example: Process lifecycle execution + +Use `process/spawn` to start a standalone argv-based process without the Codex sandbox on the host where the app server is running. The `process/*` API is experimental and requires `initialize.params.capabilities.experimentalApi: true`. The spawn response means the process has started and the `processHandle` is registered; completion is reported later through `process/exited`. + +```json +{ "method": "process/spawn", "id": 40, "params": { + "command": ["cargo", "check"], + "processHandle": "cargo-check-1", + "cwd": "/Users/me/project", // required absolute path + "env": { "RUST_LOG": null }, // optional; override or unset app-server env vars + "outputBytesCap": 1048576, // optional; omit for default, null disables + "timeoutMs": 10000 // optional; omit for default, null disables +} } +{ "id": 40, "result": {} } +{ "method": "process/exited", "params": { + "processHandle": "cargo-check-1", + "exitCode": 0, + "stdout": "...", + "stdoutCapReached": false, + "stderr": "", + "stderrCapReached": false +} } +``` + +For interactive or streaming processes, set `tty: true` or `streamStdoutStderr: true` and route output notifications by `processHandle`: + +```json +{ "method": "process/spawn", "id": 41, "params": { + "command": ["bash", "-i"], + "processHandle": "bash-1", + "cwd": "/Users/me/project", + "tty": true, + "size": { "rows": 40, "cols": 120 }, + "outputBytesCap": null, + "timeoutMs": null +} } +{ "id": 41, "result": {} } +{ "method": "process/outputDelta", "params": { + "processHandle": "bash-1", + "stream": "stdout", + "deltaBase64": "YmFzaC00LjQkIA==", + "capReached": false +} } +{ "method": "process/writeStdin", "id": 42, "params": { + "processHandle": "bash-1", + "deltaBase64": "cHdkCg==" +} } +{ "id": 42, "result": {} } +{ "method": "process/resizePty", "id": 43, "params": { + "processHandle": "bash-1", + "size": { "rows": 48, "cols": 160 } +} } +{ "id": 43, "result": {} } +{ "method": "process/kill", "id": 44, "params": { + "processHandle": "bash-1" +} } +{ "id": 44, "result": {} } +{ "method": "process/exited", "params": { + "processHandle": "bash-1", + "exitCode": 137, + "stdout": "", + "stdoutCapReached": false, + "stderr": "", + "stderrCapReached": false +} } +``` + +- Empty `command` arrays and empty `processHandle` strings are rejected. +- `cwd` is required and must be absolute. +- `process/spawn` is intentionally unsandboxed and does not define sandbox-selection fields such as `sandboxPolicy` or `permissionProfile`. +- Duplicate active `processHandle` values are rejected on the same connection; the same handle can be reused after the prior process exits. +- `tty: true` implies PTY mode plus `streamStdin: true` and `streamStdoutStderr: true`. +- `process/writeStdin` accepts either `deltaBase64`, `closeStdin`, or both. +- When omitted, `timeoutMs` and `outputBytesCap` fall back to server defaults. Set either field to `null` to disable that limit for terminal-style sessions. +- `outputBytesCap` applies independently to `stdout` and `stderr`; `process/exited.stdoutCapReached` and `stderrCapReached` report whether each stream reached the cap. Streamed bytes are not duplicated into `process/exited`. +- `process/outputDelta` and `process/exited` notifications are connection-scoped. If the originating connection closes, the server terminates the process. + ### Example: Filesystem utilities These methods operate on absolute paths on the host filesystem and cover reading, writing, directory traversal, copying, removal, and change notifications. diff --git a/codex-rs/app-server/src/command_exec.rs b/codex-rs/app-server/src/command_exec.rs index 699556dd5b..443117e592 100644 --- a/codex-rs/app-server/src/command_exec.rs +++ b/codex-rs/app-server/src/command_exec.rs @@ -477,7 +477,7 @@ async fn run_command(params: RunCommandParams) { }); let stderr_handle = spawn_process_output(SpawnProcessOutputParams { connection_id: request_id.connection_id, - process_id, + process_id: process_id.clone(), output_rx: stderr_rx, stdio_timeout_rx, outgoing: Arc::clone(&outgoing), diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index b4bce010de..6054212045 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -26,6 +26,7 @@ use crate::request_processors::InitializeRequestProcessor; use crate::request_processors::MarketplaceRequestProcessor; use crate::request_processors::McpRequestProcessor; use crate::request_processors::PluginRequestProcessor; +use crate::request_processors::ProcessExecRequestProcessor; use crate::request_processors::SearchRequestProcessor; use crate::request_processors::ThreadGoalRequestProcessor; use crate::request_processors::ThreadRequestProcessor; @@ -157,6 +158,7 @@ pub(crate) struct MessageProcessor { apps_processor: AppsRequestProcessor, catalog_processor: CatalogRequestProcessor, command_exec_processor: CommandExecRequestProcessor, + process_exec_processor: ProcessExecRequestProcessor, config_processor: ConfigRequestProcessor, device_key_processor: DeviceKeyRequestProcessor, external_agent_config_processor: ExternalAgentConfigRequestProcessor, @@ -335,6 +337,7 @@ impl MessageProcessor { Arc::clone(&config), outgoing.clone(), ); + let process_exec_processor = ProcessExecRequestProcessor::new(outgoing.clone()); let feedback_processor = FeedbackRequestProcessor::new( auth_manager.clone(), Arc::clone(&thread_manager), @@ -457,6 +460,7 @@ impl MessageProcessor { apps_processor, catalog_processor, command_exec_processor, + process_exec_processor, config_processor, device_key_processor, external_agent_config_processor, @@ -675,6 +679,9 @@ impl MessageProcessor { self.command_exec_processor .connection_closed(connection_id) .await; + self.process_exec_processor + .connection_closed(connection_id) + .await; self.thread_processor.connection_closed(connection_id).await; } @@ -1240,6 +1247,26 @@ impl MessageProcessor { .command_exec_terminate(request_id.clone(), params) .await } + ClientRequest::ProcessSpawn { params, .. } => self + .process_exec_processor + .process_spawn(request_id.clone(), params) + .await + .map(|()| None), + ClientRequest::ProcessWriteStdin { params, .. } => { + self.process_exec_processor + .process_write_stdin(request_id.clone(), params) + .await + } + ClientRequest::ProcessKill { params, .. } => { + self.process_exec_processor + .process_kill(request_id.clone(), params) + .await + } + ClientRequest::ProcessResizePty { params, .. } => { + self.process_exec_processor + .process_resize_pty(request_id.clone(), params) + .await + } ClientRequest::FeedbackUpload { params, .. } => { self.feedback_processor.feedback_upload(params).await } diff --git a/codex-rs/app-server/src/request_processors.rs b/codex-rs/app-server/src/request_processors.rs index f59ea14402..55b3aeef13 100644 --- a/codex-rs/app-server/src/request_processors.rs +++ b/codex-rs/app-server/src/request_processors.rs @@ -429,6 +429,7 @@ mod initialize_processor; mod marketplace_processor; mod mcp_processor; mod plugins; +mod process_exec_processor; mod search; mod thread_processor; mod token_usage_replay; @@ -449,6 +450,7 @@ pub(crate) use initialize_processor::InitializeRequestProcessor; pub(crate) use marketplace_processor::MarketplaceRequestProcessor; pub(crate) use mcp_processor::McpRequestProcessor; pub(crate) use plugins::PluginRequestProcessor; +pub(crate) use process_exec_processor::ProcessExecRequestProcessor; pub(crate) use search::SearchRequestProcessor; pub(crate) use thread_goal_processor::ThreadGoalRequestProcessor; pub(crate) use thread_processor::ThreadRequestProcessor; diff --git a/codex-rs/app-server/src/request_processors/process_exec_processor.rs b/codex-rs/app-server/src/request_processors/process_exec_processor.rs new file mode 100644 index 0000000000..5742d0e4d5 --- /dev/null +++ b/codex-rs/app-server/src/request_processors/process_exec_processor.rs @@ -0,0 +1,708 @@ +use std::collections::HashMap; +use std::collections::hash_map::Entry; +use std::sync::Arc; +use std::time::Duration; + +use base64::Engine; +use base64::engine::general_purpose::STANDARD; +use codex_app_server_protocol::ClientResponsePayload; +use codex_app_server_protocol::JSONRPCErrorError; +use codex_app_server_protocol::ProcessExitedNotification; +use codex_app_server_protocol::ProcessKillParams; +use codex_app_server_protocol::ProcessKillResponse; +use codex_app_server_protocol::ProcessOutputDeltaNotification; +use codex_app_server_protocol::ProcessOutputStream; +use codex_app_server_protocol::ProcessResizePtyParams; +use codex_app_server_protocol::ProcessResizePtyResponse; +use codex_app_server_protocol::ProcessSpawnParams; +use codex_app_server_protocol::ProcessSpawnResponse; +use codex_app_server_protocol::ProcessTerminalSize; +use codex_app_server_protocol::ProcessWriteStdinParams; +use codex_app_server_protocol::ProcessWriteStdinResponse; +use codex_app_server_protocol::ServerNotification; +use codex_core::exec::ExecExpiration; +use codex_core::exec::ExecExpirationOutcome; +use codex_core::exec::IO_DRAIN_TIMEOUT_MS; +use codex_protocol::exec_output::bytes_to_string_smart; +use codex_utils_absolute_path::AbsolutePathBuf; +use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP; +use codex_utils_pty::ProcessHandle; +use codex_utils_pty::SpawnedProcess; +use codex_utils_pty::TerminalSize; +use tokio::sync::Mutex; +use tokio::sync::mpsc; +use tokio::sync::oneshot; +use tokio::sync::watch; +use tokio_util::sync::CancellationToken; + +use crate::error_code::internal_error; +use crate::error_code::invalid_params; +use crate::error_code::invalid_request; +use crate::outgoing_message::ConnectionId; +use crate::outgoing_message::ConnectionRequestId; +use crate::outgoing_message::OutgoingMessageSender; + +const EXEC_TIMEOUT_EXIT_CODE: i32 = 124; +const OUTPUT_CHUNK_SIZE_HINT: usize = 64 * 1024; + +#[derive(Clone)] +pub(crate) struct ProcessExecRequestProcessor { + outgoing: Arc, + process_exec_manager: ProcessExecManager, +} + +impl ProcessExecRequestProcessor { + pub(crate) fn new(outgoing: Arc) -> Self { + Self { + outgoing, + process_exec_manager: ProcessExecManager::default(), + } + } + + pub(crate) async fn process_spawn( + &self, + request_id: ConnectionRequestId, + params: ProcessSpawnParams, + ) -> Result<(), JSONRPCErrorError> { + let ProcessSpawnParams { + command, + process_handle, + cwd, + tty, + stream_stdin, + stream_stdout_stderr, + output_bytes_cap, + timeout_ms, + env: env_overrides, + size, + } = params; + let method_name = "process/spawn"; + tracing::debug!("{method_name} command: {command:?}"); + if command.is_empty() { + return Err(invalid_request("command must not be empty")); + } + if process_handle.is_empty() { + return Err(invalid_request("processHandle must not be empty")); + } + if size.is_some() && !tty { + return Err(invalid_params("process/spawn size requires tty: true")); + } + let mut env = std::env::vars().collect::>(); + if let Some(env_overrides) = env_overrides { + for (key, value) in env_overrides { + match value { + Some(value) => { + env.insert(key, value); + } + None => { + env.remove(&key); + } + } + } + } + let expiration = match timeout_ms { + Some(Some(timeout_ms)) => match u64::try_from(timeout_ms) { + Ok(timeout_ms) => timeout_ms.into(), + Err(_) => { + return Err(invalid_params(format!( + "{method_name} timeoutMs must be non-negative, got {timeout_ms}" + ))); + } + }, + Some(None) => ExecExpiration::Cancellation(CancellationToken::new()), + None => ExecExpiration::DefaultTimeout, + }; + let output_bytes_cap = output_bytes_cap.unwrap_or(Some(DEFAULT_OUTPUT_BYTES_CAP)); + let size = size.map(terminal_size_from_protocol).transpose()?; + + self.process_exec_manager + .start(StartProcessParams { + outgoing: self.outgoing.clone(), + request_id, + process_handle, + command, + cwd, + env, + expiration, + tty, + stream_stdin, + stream_stdout_stderr, + output_bytes_cap, + size, + }) + .await?; + + Ok(()) + } + + pub(crate) async fn process_write_stdin( + &self, + request_id: ConnectionRequestId, + params: ProcessWriteStdinParams, + ) -> Result, JSONRPCErrorError> { + self.process_exec_manager + .write_stdin(request_id, params) + .await + .map(|response| Some(response.into())) + } + + pub(crate) async fn process_resize_pty( + &self, + request_id: ConnectionRequestId, + params: ProcessResizePtyParams, + ) -> Result, JSONRPCErrorError> { + self.process_exec_manager + .resize_pty(request_id, params) + .await + .map(|response| Some(response.into())) + } + + pub(crate) async fn process_kill( + &self, + request_id: ConnectionRequestId, + params: ProcessKillParams, + ) -> Result, JSONRPCErrorError> { + self.process_exec_manager + .kill(request_id, params) + .await + .map(|response| Some(response.into())) + } + + pub(crate) async fn connection_closed(&self, connection_id: ConnectionId) { + self.process_exec_manager + .connection_closed(connection_id) + .await; + } +} + +#[derive(Clone, Default)] +struct ProcessExecManager { + sessions: Arc>>, +} + +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +struct ConnectionProcessHandle { + connection_id: ConnectionId, + process_handle: String, +} + +#[derive(Clone)] +struct ProcessSession { + control_tx: mpsc::Sender, +} + +enum ProcessControl { + Write { delta: Vec, close_stdin: bool }, + Resize { size: TerminalSize }, + Kill, +} + +struct ProcessControlRequest { + control: ProcessControl, + response_tx: Option>>, +} + +struct StartProcessParams { + outgoing: Arc, + request_id: ConnectionRequestId, + process_handle: String, + command: Vec, + cwd: AbsolutePathBuf, + env: HashMap, + expiration: ExecExpiration, + tty: bool, + stream_stdin: bool, + stream_stdout_stderr: bool, + output_bytes_cap: Option, + size: Option, +} + +struct RunProcessParams { + outgoing: Arc, + request_id: ConnectionRequestId, + process_handle: String, + spawned: SpawnedProcess, + control_rx: mpsc::Receiver, + stream_stdin: bool, + stream_stdout_stderr: bool, + expiration: ExecExpiration, + output_bytes_cap: Option, +} + +struct SpawnProcessOutputParams { + connection_id: ConnectionId, + process_handle: String, + output_rx: mpsc::Receiver>, + stdio_timeout_rx: watch::Receiver, + outgoing: Arc, + stream: ProcessOutputStream, + stream_output: bool, + output_bytes_cap: Option, +} + +#[derive(Default)] +struct ProcessOutputCapture { + text: String, + cap_reached: bool, +} + +impl ProcessExecManager { + async fn start(&self, params: StartProcessParams) -> Result<(), JSONRPCErrorError> { + let StartProcessParams { + outgoing, + request_id, + process_handle, + command, + cwd, + env, + expiration, + tty, + stream_stdin, + stream_stdout_stderr, + output_bytes_cap, + size, + } = params; + + let (program, args) = command + .split_first() + .ok_or_else(|| invalid_request("command must not be empty"))?; + let stream_stdin = tty || stream_stdin; + let stream_stdout_stderr = tty || stream_stdout_stderr; + let arg0 = None; + let (control_tx, control_rx) = mpsc::channel(32); + let process_key = ConnectionProcessHandle { + connection_id: request_id.connection_id, + process_handle: process_handle.clone(), + }; + + { + let mut sessions = self.sessions.lock().await; + match sessions.entry(process_key.clone()) { + Entry::Occupied(_) => { + return Err(invalid_request(format!( + "duplicate active process handle: {process_handle:?}", + ))); + } + Entry::Vacant(entry) => { + entry.insert(ProcessSession { control_tx }); + } + } + } + + let spawned = if tty { + codex_utils_pty::spawn_pty_process( + program, + args, + cwd.as_path(), + &env, + &arg0, + size.unwrap_or_default(), + ) + .await + } else if stream_stdin { + codex_utils_pty::spawn_pipe_process(program, args, cwd.as_path(), &env, &arg0).await + } else { + codex_utils_pty::spawn_pipe_process_no_stdin(program, args, cwd.as_path(), &env, &arg0) + .await + }; + let spawned = match spawned { + Ok(spawned) => spawned, + Err(err) => { + self.sessions.lock().await.remove(&process_key); + return Err(internal_error(format!("failed to spawn process: {err}"))); + } + }; + + outgoing + .send_response(request_id.clone(), ProcessSpawnResponse {}) + .await; + + let sessions = Arc::clone(&self.sessions); + tokio::spawn(async move { + run_process(RunProcessParams { + outgoing, + request_id, + process_handle, + spawned, + control_rx, + stream_stdin, + stream_stdout_stderr, + expiration, + output_bytes_cap, + }) + .await; + sessions.lock().await.remove(&process_key); + }); + + Ok(()) + } + + async fn write_stdin( + &self, + request_id: ConnectionRequestId, + params: ProcessWriteStdinParams, + ) -> Result { + if params.delta_base64.is_none() && !params.close_stdin { + return Err(invalid_params( + "process/writeStdin requires deltaBase64 or closeStdin", + )); + } + + let delta = match params.delta_base64 { + Some(delta_base64) => STANDARD + .decode(delta_base64) + .map_err(|err| invalid_params(format!("invalid deltaBase64: {err}")))?, + None => Vec::new(), + }; + + self.send_control( + request_id.connection_id, + params.process_handle, + ProcessControl::Write { + delta, + close_stdin: params.close_stdin, + }, + ) + .await?; + + Ok(ProcessWriteStdinResponse {}) + } + + async fn kill( + &self, + request_id: ConnectionRequestId, + params: ProcessKillParams, + ) -> Result { + self.send_control( + request_id.connection_id, + params.process_handle, + ProcessControl::Kill, + ) + .await?; + Ok(ProcessKillResponse {}) + } + + async fn resize_pty( + &self, + request_id: ConnectionRequestId, + params: ProcessResizePtyParams, + ) -> Result { + self.send_control( + request_id.connection_id, + params.process_handle, + ProcessControl::Resize { + size: terminal_size_from_protocol(params.size)?, + }, + ) + .await?; + Ok(ProcessResizePtyResponse {}) + } + + async fn connection_closed(&self, connection_id: ConnectionId) { + let controls = { + let mut sessions = self.sessions.lock().await; + let process_handles = sessions + .keys() + .filter(|process_handle| process_handle.connection_id == connection_id) + .cloned() + .collect::>(); + let mut controls = Vec::with_capacity(process_handles.len()); + for process_handle in process_handles { + if let Some(control) = sessions.remove(&process_handle) { + controls.push(control); + } + } + controls + }; + + for control in controls { + let _ = control + .control_tx + .send(ProcessControlRequest { + control: ProcessControl::Kill, + response_tx: None, + }) + .await; + } + } + + async fn send_control( + &self, + connection_id: ConnectionId, + process_handle: String, + control: ProcessControl, + ) -> Result<(), JSONRPCErrorError> { + let process_key = ConnectionProcessHandle { + connection_id, + process_handle, + }; + let session = self + .sessions + .lock() + .await + .get(&process_key) + .cloned() + .ok_or_else(|| no_active_process_error(&process_key.process_handle))?; + let (response_tx, response_rx) = oneshot::channel(); + session + .control_tx + .send(ProcessControlRequest { + control, + response_tx: Some(response_tx), + }) + .await + .map_err(|_| process_no_longer_running_error(&process_key.process_handle))?; + response_rx + .await + .map_err(|_| process_no_longer_running_error(&process_key.process_handle))? + } +} + +async fn run_process(params: RunProcessParams) { + let RunProcessParams { + outgoing, + request_id, + process_handle, + spawned, + control_rx, + stream_stdin, + stream_stdout_stderr, + expiration, + output_bytes_cap, + } = params; + let mut control_rx = control_rx; + let mut control_open = true; + let expiration = expiration.wait_with_outcome(); + tokio::pin!(expiration); + let SpawnedProcess { + session, + stdout_rx, + stderr_rx, + exit_rx, + } = spawned; + tokio::pin!(exit_rx); + let mut expiration_outcome = None; + let (stdio_timeout_tx, stdio_timeout_rx) = watch::channel(false); + + let stdout_handle = collect_spawn_process_output(SpawnProcessOutputParams { + connection_id: request_id.connection_id, + process_handle: process_handle.clone(), + output_rx: stdout_rx, + stdio_timeout_rx: stdio_timeout_rx.clone(), + outgoing: Arc::clone(&outgoing), + stream: ProcessOutputStream::Stdout, + stream_output: stream_stdout_stderr, + output_bytes_cap, + }); + let stderr_handle = collect_spawn_process_output(SpawnProcessOutputParams { + connection_id: request_id.connection_id, + process_handle: process_handle.clone(), + output_rx: stderr_rx, + stdio_timeout_rx, + outgoing: Arc::clone(&outgoing), + stream: ProcessOutputStream::Stderr, + stream_output: stream_stdout_stderr, + output_bytes_cap, + }); + + let exit_code = loop { + tokio::select! { + control = control_rx.recv(), if control_open => { + match control { + Some(ProcessControlRequest { control, response_tx }) => { + let result = match control { + ProcessControl::Write { delta, close_stdin } => { + handle_process_write( + &session, + stream_stdin, + delta, + close_stdin, + ).await + } + ProcessControl::Resize { size } => { + handle_process_resize(&session, size) + } + ProcessControl::Kill => { + session.request_terminate(); + Ok(()) + } + }; + if let Some(response_tx) = response_tx + && response_tx.send(result).is_err() + { + tracing::debug!( + process_handle = %process_handle, + "process control response receiver dropped" + ); + } + }, + None => { + control_open = false; + session.request_terminate(); + } + } + } + outcome = &mut expiration, if expiration_outcome.is_none() => { + expiration_outcome = Some(outcome); + session.request_terminate(); + } + exit = &mut exit_rx => { + if matches!(expiration_outcome, Some(ExecExpirationOutcome::TimedOut)) { + break EXEC_TIMEOUT_EXIT_CODE; + } else { + break exit.unwrap_or(-1); + } + } + } + }; + + // Give stdout/stderr readers a bounded grace period to drain after process exit. + let timeout_handle = tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(IO_DRAIN_TIMEOUT_MS)).await; + let _ = stdio_timeout_tx.send(true); + }); + + let stdout = stdout_handle.await.unwrap_or_default(); + let stderr = stderr_handle.await.unwrap_or_default(); + timeout_handle.abort(); + + outgoing + .send_server_notification_to_connection_and_wait( + request_id.connection_id, + ServerNotification::ProcessExited(ProcessExitedNotification { + process_handle, + exit_code, + stdout: stdout.text, + stdout_cap_reached: stdout.cap_reached, + stderr: stderr.text, + stderr_cap_reached: stderr.cap_reached, + }), + ) + .await; +} + +fn collect_spawn_process_output( + params: SpawnProcessOutputParams, +) -> tokio::task::JoinHandle { + let SpawnProcessOutputParams { + connection_id, + process_handle, + mut output_rx, + mut stdio_timeout_rx, + outgoing, + stream, + stream_output, + output_bytes_cap, + } = params; + tokio::spawn(async move { + let mut buffer: Vec = Vec::new(); + let mut observed_num_bytes = 0usize; + let mut cap_reached = false; + loop { + let mut chunk = tokio::select! { + chunk = output_rx.recv() => match chunk { + Some(chunk) => chunk, + None => break, + }, + _ = stdio_timeout_rx.wait_for(|&v| v) => break, + }; + while chunk.len() < OUTPUT_CHUNK_SIZE_HINT + && let Ok(next_chunk) = output_rx.try_recv() + { + chunk.extend_from_slice(&next_chunk); + } + let capped_chunk = match output_bytes_cap { + Some(output_bytes_cap) => { + let capped_chunk_len = output_bytes_cap + .saturating_sub(observed_num_bytes) + .min(chunk.len()); + observed_num_bytes += capped_chunk_len; + &chunk[0..capped_chunk_len] + } + None => chunk.as_slice(), + }; + cap_reached = Some(observed_num_bytes) == output_bytes_cap; + if stream_output { + outgoing + .send_server_notification_to_connection_and_wait( + connection_id, + ServerNotification::ProcessOutputDelta(ProcessOutputDeltaNotification { + process_handle: process_handle.clone(), + stream, + delta_base64: STANDARD.encode(capped_chunk), + cap_reached, + }), + ) + .await; + } else { + buffer.extend_from_slice(capped_chunk); + } + if cap_reached { + break; + } + } + ProcessOutputCapture { + text: bytes_to_string_smart(&buffer), + cap_reached, + } + }) +} + +async fn handle_process_write( + session: &ProcessHandle, + stream_stdin: bool, + delta: Vec, + close_stdin: bool, +) -> Result<(), JSONRPCErrorError> { + if !stream_stdin { + return Err(invalid_request( + "stdin streaming is not enabled for this process", + )); + } + if !delta.is_empty() { + session + .writer_sender() + .send(delta) + .await + .map_err(|_| invalid_request("stdin is already closed"))?; + } + if close_stdin { + // Closing drops our sender; the writer task still drains any bytes + // accepted above before its receiver observes EOF and closes stdin. + session.close_stdin(); + } + Ok(()) +} + +fn handle_process_resize( + session: &ProcessHandle, + size: TerminalSize, +) -> Result<(), JSONRPCErrorError> { + session + .resize(size) + .map_err(|err| invalid_request(format!("failed to resize PTY: {err}"))) +} + +fn terminal_size_from_protocol( + size: ProcessTerminalSize, +) -> Result { + if size.rows == 0 || size.cols == 0 { + return Err(invalid_params( + "process size rows and cols must be greater than 0", + )); + } + Ok(TerminalSize { + rows: size.rows, + cols: size.cols, + }) +} + +fn no_active_process_error(process_handle: &str) -> JSONRPCErrorError { + invalid_request(format!( + "no active process for process handle {process_handle:?}" + )) +} + +fn process_no_longer_running_error(process_handle: &str) -> JSONRPCErrorError { + invalid_request(format!("process {process_handle:?} is no longer running")) +} diff --git a/codex-rs/app-server/src/request_serialization.rs b/codex-rs/app-server/src/request_serialization.rs index c3e21d134e..0eb509e098 100644 --- a/codex-rs/app-server/src/request_serialization.rs +++ b/codex-rs/app-server/src/request_serialization.rs @@ -27,6 +27,10 @@ pub(crate) enum RequestSerializationQueueKey { connection_id: ConnectionId, process_id: String, }, + Process { + connection_id: ConnectionId, + process_handle: String, + }, FuzzyFileSearchSession { session_id: String, }, @@ -54,6 +58,10 @@ impl RequestSerializationQueueKey { process_id, } } + ClientRequestSerializationScope::Process { process_handle } => Self::Process { + connection_id, + process_handle, + }, ClientRequestSerializationScope::FuzzyFileSearchSession { session_id } => { Self::FuzzyFileSearchSession { session_id } } diff --git a/codex-rs/app-server/tests/common/mcp_process.rs b/codex-rs/app-server/tests/common/mcp_process.rs index 2abdbd8f7c..c2a49d8fa6 100644 --- a/codex-rs/app-server/tests/common/mcp_process.rs +++ b/codex-rs/app-server/tests/common/mcp_process.rs @@ -61,6 +61,10 @@ use codex_app_server_protocol::PluginListParams; use codex_app_server_protocol::PluginReadParams; use codex_app_server_protocol::PluginSkillReadParams; use codex_app_server_protocol::PluginUninstallParams; +use codex_app_server_protocol::ProcessKillParams; +use codex_app_server_protocol::ProcessResizePtyParams; +use codex_app_server_protocol::ProcessSpawnParams; +use codex_app_server_protocol::ProcessWriteStdinParams; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ReviewStartParams; use codex_app_server_protocol::SendAddCreditsNudgeEmailParams; @@ -741,6 +745,42 @@ impl McpProcess { self.send_request("command/exec", params).await } + /// Send a `process/spawn` JSON-RPC request (v2). + pub async fn send_process_spawn_request( + &mut self, + params: ProcessSpawnParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("process/spawn", params).await + } + + /// Send a `process/writeStdin` JSON-RPC request (v2). + pub async fn send_process_write_stdin_request( + &mut self, + params: ProcessWriteStdinParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("process/writeStdin", params).await + } + + /// Send a `process/resizePty` JSON-RPC request (v2). + pub async fn send_process_resize_pty_request( + &mut self, + params: ProcessResizePtyParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("process/resizePty", params).await + } + + /// Send a `process/kill` JSON-RPC request (v2). + pub async fn send_process_kill_request( + &mut self, + params: ProcessKillParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("process/kill", params).await + } + /// Send a `command/exec/write` JSON-RPC request (v2). pub async fn send_command_exec_write_request( &mut self, diff --git a/codex-rs/app-server/tests/suite/v2/mod.rs b/codex-rs/app-server/tests/suite/v2/mod.rs index a951257cc2..be5f12a535 100644 --- a/codex-rs/app-server/tests/suite/v2/mod.rs +++ b/codex-rs/app-server/tests/suite/v2/mod.rs @@ -35,6 +35,7 @@ mod plugin_list; mod plugin_read; mod plugin_share; mod plugin_uninstall; +mod process_exec; mod rate_limits; mod realtime_conversation; #[cfg(debug_assertions)] diff --git a/codex-rs/app-server/tests/suite/v2/process_exec.rs b/codex-rs/app-server/tests/suite/v2/process_exec.rs new file mode 100644 index 0000000000..5dd3e84b4c --- /dev/null +++ b/codex-rs/app-server/tests/suite/v2/process_exec.rs @@ -0,0 +1,250 @@ +use anyhow::Context; +use anyhow::Result; +use app_test_support::McpProcess; +use app_test_support::create_mock_responses_server_sequence_unchecked; +use codex_app_server_protocol::ProcessExitedNotification; +use codex_app_server_protocol::ProcessKillParams; +use codex_app_server_protocol::ProcessSpawnParams; +use codex_app_server_protocol::RequestId; +use codex_utils_absolute_path::AbsolutePathBuf; +use pretty_assertions::assert_eq; +use std::collections::HashMap; +use std::path::Path; +use tempfile::TempDir; +use tokio::time::Duration; +use tokio::time::sleep; +use tokio::time::timeout; +use wiremock::MockServer; + +use super::connection_handling_websocket::DEFAULT_READ_TIMEOUT; +use super::connection_handling_websocket::create_config_toml; + +#[tokio::test] +async fn process_spawn_returns_before_exit_and_emits_exit_notification() -> Result<()> { + let codex_home = TempDir::new()?; + let (_server, mut mcp) = initialized_mcp(codex_home.path()).await?; + + let process_handle = "one-shot-1".to_string(); + let probe_file = codex_home.path().join("process-created"); + let release_file = codex_home.path().join("process-release"); + // Use a probe/release handshake instead of asserting on wall-clock timing: + // the child proves it started by writing the probe file, then waits for the + // test to create the release file before it can emit output and exit. + let command = if cfg!(windows) { + vec![ + "powershell.exe".to_string(), + "-NoProfile".to_string(), + "-NonInteractive".to_string(), + "-Command".to_string(), + concat!( + "[IO.File]::WriteAllText($env:CODEX_PROCESS_EXEC_PROBE_FILE, 'process'); ", + "while (!(Test-Path -LiteralPath $env:CODEX_PROCESS_EXEC_RELEASE_FILE)) { ", + "Start-Sleep -Milliseconds 20 ", + "}; ", + "[Console]::Out.Write('process-out'); ", + "[Console]::Error.Write('process-err')", + ) + .to_string(), + ] + } else { + vec![ + "sh".to_string(), + "-c".to_string(), + concat!( + "printf process > \"$CODEX_PROCESS_EXEC_PROBE_FILE\"; ", + "while [ ! -e \"$CODEX_PROCESS_EXEC_RELEASE_FILE\" ]; do sleep 0.05; done; ", + "printf process-out; ", + "printf process-err >&2", + ) + .to_string(), + ] + }; + let env = HashMap::from([ + ( + "CODEX_PROCESS_EXEC_PROBE_FILE".to_string(), + Some(probe_file.display().to_string()), + ), + ( + "CODEX_PROCESS_EXEC_RELEASE_FILE".to_string(), + Some(release_file.display().to_string()), + ), + ]); + let spawn_request_id = mcp + .send_process_spawn_request(ProcessSpawnParams { + env: Some(env), + output_bytes_cap: Some(None), + timeout_ms: Some(None), + ..process_spawn_params(process_handle.clone(), codex_home.path(), command)? + }) + .await?; + + let response = mcp + .read_stream_until_response_message(RequestId::Integer(spawn_request_id)) + .await?; + assert_eq!(response.result, serde_json::json!({})); + + wait_for_file(&probe_file).await?; + assert_eq!(std::fs::read_to_string(&probe_file)?, "process"); + std::fs::write(&release_file, "release")?; + + let exited = read_process_exited(&mut mcp).await?; + assert_eq!( + exited, + ProcessExitedNotification { + process_handle, + exit_code: 0, + stdout: "process-out".to_string(), + stdout_cap_reached: false, + stderr: "process-err".to_string(), + stderr_cap_reached: false, + } + ); + Ok(()) +} + +#[tokio::test] +async fn process_spawn_reports_buffered_output_cap_reached() -> Result<()> { + let codex_home = TempDir::new()?; + let (_server, mut mcp) = initialized_mcp(codex_home.path()).await?; + + let process_handle = "capped-one-shot-1".to_string(); + let command = if cfg!(windows) { + vec![ + "powershell.exe".to_string(), + "-NoProfile".to_string(), + "-NonInteractive".to_string(), + "-Command".to_string(), + "[Console]::Out.Write('abcde'); [Console]::Error.Write('12345')".to_string(), + ] + } else { + vec![ + "sh".to_string(), + "-lc".to_string(), + "printf abcde; printf 12345 >&2".to_string(), + ] + }; + let spawn_request_id = mcp + .send_process_spawn_request(ProcessSpawnParams { + output_bytes_cap: Some(Some(3)), + ..process_spawn_params(process_handle.clone(), codex_home.path(), command)? + }) + .await?; + + let response = mcp + .read_stream_until_response_message(RequestId::Integer(spawn_request_id)) + .await?; + assert_eq!(response.result, serde_json::json!({})); + + let exited = read_process_exited(&mut mcp).await?; + assert_eq!( + exited, + ProcessExitedNotification { + process_handle, + exit_code: 0, + stdout: "abc".to_string(), + stdout_cap_reached: true, + stderr: "123".to_string(), + stderr_cap_reached: true, + } + ); + + Ok(()) +} + +#[tokio::test] +async fn process_kill_terminates_running_process() -> Result<()> { + let codex_home = TempDir::new()?; + let (_server, mut mcp) = initialized_mcp(codex_home.path()).await?; + + let process_handle = "sleep-process-1".to_string(); + let command = if cfg!(windows) { + vec![ + "powershell.exe".to_string(), + "-NoProfile".to_string(), + "-NonInteractive".to_string(), + "-Command".to_string(), + "Start-Sleep -Seconds 30".to_string(), + ] + } else { + vec!["sh".to_string(), "-lc".to_string(), "sleep 30".to_string()] + }; + let spawn_request_id = mcp + .send_process_spawn_request(process_spawn_params( + process_handle.clone(), + codex_home.path(), + command, + )?) + .await?; + + let response = mcp + .read_stream_until_response_message(RequestId::Integer(spawn_request_id)) + .await?; + assert_eq!(response.result, serde_json::json!({})); + + let kill_request_id = mcp + .send_process_kill_request(ProcessKillParams { + process_handle: process_handle.clone(), + }) + .await?; + let kill_response = mcp + .read_stream_until_response_message(RequestId::Integer(kill_request_id)) + .await?; + assert_eq!(kill_response.result, serde_json::json!({})); + + let exited = read_process_exited(&mut mcp).await?; + assert_eq!(exited.process_handle, process_handle); + assert_ne!(exited.exit_code, 0); + assert_eq!(exited.stdout, ""); + assert!(!exited.stdout_cap_reached); + assert_eq!(exited.stderr, ""); + assert!(!exited.stderr_cap_reached); + + Ok(()) +} + +async fn initialized_mcp(codex_home: &Path) -> Result<(MockServer, McpProcess)> { + let server = create_mock_responses_server_sequence_unchecked(Vec::new()).await; + create_config_toml(codex_home, &server.uri(), "never")?; + let mut mcp = McpProcess::new(codex_home).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + Ok((server, mcp)) +} + +fn process_spawn_params( + process_handle: String, + cwd: &Path, + command: Vec, +) -> Result { + Ok(ProcessSpawnParams { + command, + process_handle, + cwd: AbsolutePathBuf::try_from(cwd)?, + tty: false, + stream_stdin: false, + stream_stdout_stderr: false, + output_bytes_cap: None, + timeout_ms: None, + env: None, + size: None, + }) +} + +async fn read_process_exited(mcp: &mut McpProcess) -> Result { + let notification = mcp + .read_stream_until_notification_message("process/exited") + .await?; + let params = notification + .params + .context("process/exited notification should include params")?; + serde_json::from_value(params).context("deserialize process/exited notification") +} + +async fn wait_for_file(path: &Path) -> Result<()> { + timeout(DEFAULT_READ_TIMEOUT, async { + while !path.exists() { + sleep(Duration::from_millis(20)).await; + } + }) + .await + .context("timed out waiting for process probe file") +} diff --git a/codex-rs/tui/src/app/app_server_event_targets.rs b/codex-rs/tui/src/app/app_server_event_targets.rs index bc0567df51..382a82a19f 100644 --- a/codex-rs/tui/src/app/app_server_event_targets.rs +++ b/codex-rs/tui/src/app/app_server_event_targets.rs @@ -153,6 +153,8 @@ pub(super) fn server_notification_thread_target( | ServerNotification::FuzzyFileSearchSessionUpdated(_) | ServerNotification::FuzzyFileSearchSessionCompleted(_) | ServerNotification::CommandExecOutputDelta(_) + | ServerNotification::ProcessOutputDelta(_) + | ServerNotification::ProcessExited(_) | ServerNotification::FsChanged(_) | ServerNotification::WindowsWorldWritableWarning(_) | ServerNotification::WindowsSandboxSetupCompleted(_) diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 8a61e79fe0..2b6166ff2d 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -6390,6 +6390,8 @@ impl ChatWidget { | ServerNotification::ThreadUnarchived(_) | ServerNotification::RawResponseItemCompleted(_) | ServerNotification::CommandExecOutputDelta(_) + | ServerNotification::ProcessOutputDelta(_) + | ServerNotification::ProcessExited(_) | ServerNotification::FileChangePatchUpdated(_) | ServerNotification::McpToolCallProgress(_) | ServerNotification::McpServerOauthLoginCompleted(_)