Compare commits

...

6 Commits

Author SHA1 Message Date
Michael Bolin
f375477147 core: resolve host_executable() rules during preflight 2026-02-27 13:43:36 -08:00
Michael Bolin
acf683899c core: adopt host_executable() rules in zsh-fork 2026-02-27 13:43:36 -08:00
Michael Bolin
b148d98e0e execpolicy: add host_executable() path mappings (#12964)
## Why

`execpolicy` currently keys `prefix_rule()` matching off the literal
first token. That works for rules like `["/usr/bin/git"]`, but it means
shared basename rules such as `["git"]` do not help when a caller passes
an absolute executable path like `/usr/bin/git`.

This PR lays the groundwork for basename-aware matching without changing
existing callers yet. It adds typed host-executable metadata and an
opt-in resolution path in `codex-execpolicy`, so a follow-up PR can
adopt the new behavior in `unix_escalation.rs` and other call sites
without having to redesign the policy layer first.

## What Changed

- added `host_executable(name = ..., paths = [...])` to the execpolicy
parser and validated it with `AbsolutePathBuf`
- stored host executable mappings separately from prefix rules inside
`Policy`
- added `MatchOptions` and opt-in `*_with_options()` APIs that preserve
existing behavior by default
- implemented exact-first matching with optional basename fallback,
gated by `host_executable()` allowlists when present
- normalized executable names for cross-platform matching so Windows
executables like `git.exe` can satisfy `host_executable(name = "git", ...)`
- updated `match` / `not_match` example validation to exercise the
host-executable resolution path instead of only raw prefix-rule matching
- preserved source locations for deferred example-validation errors so
policy load failures still point at the right file and line
- surfaced `resolvedProgram` on `RuleMatch` so callers can tell when a
basename rule matched an absolute executable path
- preserved host executable metadata when requirements policies overlay
file-based policies in `core/src/exec_policy.rs`
- documented the new rule shape and CLI behavior in
`execpolicy/README.md`
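
As an illustration of the new rule shape (the general syntax is documented in the updated `execpolicy/README.md`; the specific rule bodies below are hypothetical):

```python
# Hypothetical policy fragment. host_executable() declares which absolute
# paths (validated as AbsolutePathBuf) may stand in for the basename "git".
host_executable(
    name = "git",
    paths = [
        "/usr/bin/git",
        "/opt/homebrew/bin/git",
    ],
)

# A basename rule. With the opt-in basename fallback, this can now match a
# caller-supplied /usr/bin/git, gated by the allowlist above.
prefix_rule(pattern = ["git", "status"])
```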

## Verification

- `cargo test -p codex-execpolicy`
- added coverage in `execpolicy/tests/basic.rs` for parsing, precedence,
empty allowlists, basename fallback, exact-match precedence, and
host-executable-backed `match` / `not_match` examples
- added a regression test in `core/src/exec_policy.rs` to verify
requirements overlays preserve `host_executable()` metadata
- verified `cargo test -p codex-core --lib`, including source-rendering
coverage for deferred validation errors
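
The exact-first matching with gated basename fallback described above can be sketched roughly as follows; the types and names here are illustrative stand-ins, not the real `codex-execpolicy` API:

```rust
use std::collections::HashMap;
use std::path::Path;

// Hypothetical simplification of the policy's matching state.
struct Policy {
    // Rules keyed by their literal first token, e.g. "/usr/bin/git" or "git".
    prefix_rules: HashMap<String, String>,
    // host_executable(name = ..., paths = [...]) allowlists, keyed by name.
    host_executables: HashMap<String, Vec<String>>,
}

impl Policy {
    /// Exact-first matching with basename fallback, gated by the
    /// host_executable() allowlist when one exists for that basename.
    fn lookup(&self, program: &str) -> Option<&str> {
        // 1. An exact match on the literal program string always wins.
        if let Some(rule) = self.prefix_rules.get(program) {
            return Some(rule.as_str());
        }
        // 2. Fall back to the normalized basename ("git.exe" -> "git").
        let base = normalized_name(program);
        let rule = self.prefix_rules.get(&base)?;
        // 3. If an allowlist exists for this name, the absolute path must be on it.
        match self.host_executables.get(&base) {
            Some(paths) if paths.iter().any(|p| p == program) => Some(rule.as_str()),
            Some(_) => None,
            None => Some(rule.as_str()),
        }
    }
}

/// Strip the directory and a trailing ".exe" so "/usr/bin/git" and
/// "git.exe" both normalize to "git". (Real Windows handling would also
/// need case-insensitivity; this sketch ignores that.)
fn normalized_name(program: &str) -> String {
    let base = Path::new(program)
        .file_name()
        .map(|n| n.to_string_lossy().into_owned())
        .unwrap_or_else(|| program.to_string());
    match base.strip_suffix(".exe") {
        Some(stem) => stem.to_string(),
        None => base,
    }
}
```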
2026-02-27 12:59:24 -08:00
Michael Bolin
6e0f1e9469 fix: disable Bazel builds in CI on ubuntu-24.04-arm until we can stabilize them (#13055)
The other three Bazel builds have been largely stable in my experience,
whereas I find myself re-running the `ubuntu-24.04-arm` jobs often to
shake out flakes. Disabling them for now.
2026-02-27 12:49:13 -08:00
Ruslan Nigmatullin
69d7a456bb app-server: Replay pending item requests on thread/resume (#12560)
Replay pending client requests after `thread/resume` and emit resolved
notifications when those requests clear so approval/input UI state stays
in sync after reconnects and across subscribed clients.

Affected RPCs:
- `item/commandExecution/requestApproval`
- `item/fileChange/requestApproval`
- `item/tool/requestUserInput`

Motivation:
- Resumed clients need to see pending approval/input requests that were
already outstanding before the reconnect.
- Clients also need an explicit signal when a pending request resolves
or is cleared so stale UI can be removed on turn start, completion, or
interruption.

Implementation notes:
- Read pending client requests from `OutgoingMessageSender` and replay
them after `thread/resume` attaches the connection, preserving the
original request ids.
- Emit `serverRequest/resolved` when pending requests are answered
or cleared by lifecycle cleanup.
- Update the app-server protocol schema, generated TypeScript bindings,
and README docs for the replay/resolution flow.
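
The replay/resolution bookkeeping can be sketched as below; these are illustrative stand-ins, not the real app-server types:

```rust
use std::collections::HashMap;

// Mirrors the protocol's RequestId, which is either a string or an integer.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum RequestId {
    String(String),
    Integer(i64),
}

// Track outstanding server->client requests so they can be replayed on
// `thread/resume` and cleared when `serverRequest/resolved` is emitted.
#[derive(Default)]
struct PendingRequests {
    pending: HashMap<RequestId, String>, // request id -> serialized params
}

impl PendingRequests {
    /// Record an outstanding approval/input request.
    fn track(&mut self, id: RequestId, params: String) {
        self.pending.insert(id, params);
    }

    /// Replay every outstanding request (e.g. after `thread/resume`
    /// attaches a connection), reusing the original request ids.
    fn replay(&self) -> Vec<(&RequestId, &String)> {
        self.pending.iter().collect()
    }

    /// Handle resolution (answered or cleared by lifecycle cleanup); the
    /// return value says whether stale UI for this id should be removed.
    fn resolve(&mut self, id: &RequestId) -> bool {
        self.pending.remove(id).is_some()
    }
}
```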

High-level test plan:
- Added automated coverage for replaying pending command execution and
file change approval requests on `thread/resume`.
- Added automated coverage for resolved notifications in command
approval, file change approval, request_user_input, turn start, and turn
interrupt flows.
- Verified schema/docs updates in the relevant protocol and app-server
tests.

Manual testing:
- Tested reconnect/resume with multiple connections.
- Confirmed state stayed in sync between connections.
2026-02-27 12:45:59 -08:00
Michael Bolin
66b0adb34c app-server: deflake running thread resume tests (#13047)
## Why

CI has been intermittently failing in
`suite::v2::thread_resume::thread_resume_rejoins_running_thread_even_with_override_mismatch`
because these running-thread resume tests treated `turn/started` as
proof that the thread was already active.

That signal is too early for this path. `turn/started` is emitted
optimistically from
[`turn_start`](1103d0037e/codex-rs/app-server/src/codex_message_processor.rs (L5757-L5767)).
In `single_client_mode`, the listener skips `current_turn_history`
tracking in
[`codex_message_processor.rs`](1103d0037e/codex-rs/app-server/src/codex_message_processor.rs (L6461-L6465)),
so running-thread resume still depends on `ThreadWatchManager` observing
the core `TurnStarted` event in
[`bespoke_event_handling.rs`](1103d0037e/codex-rs/app-server/src/bespoke_event_handling.rs (L152-L156)).
If `thread/resume` lands in that window, the thread can still look
`Idle` and the assertion flakes.

## What

- Add a helper in `codex-rs/app-server/tests/suite/v2/thread_resume.rs`
that waits for `thread/status/changed` to report `Active` for the target
thread.
- Use that public v2 notification as the synchronization barrier in the
four running-thread resume tests instead of relying on `turn/started`.
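
The barrier those tests now wait on can be sketched as follows; the notification types are stand-ins, not the real test-harness API:

```rust
// Treat a thread as ready only once `thread/status/changed` reports
// Active for it, ignoring the optimistic `turn/started` signal.
#[derive(Debug, Clone, PartialEq)]
enum Notification {
    TurnStarted { thread_id: String },
    ThreadStatusChanged { thread_id: String, status: String },
}

/// Returns true once the target thread has been reported Active.
fn saw_active(notifications: &[Notification], thread_id: &str) -> bool {
    notifications.iter().any(|n| match n {
        Notification::ThreadStatusChanged { thread_id: t, status } => {
            t == thread_id && status == "Active"
        }
        _ => false,
    })
}
```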

## Follow-up

This PR keeps the fix at the test layer so we can remove the flake
without changing server behavior. A broader runtime fix should still be
considered separately, for example:

- make `turn/start` eagerly transition the thread to `Active` so
`turn/started` and `thread/status/changed` are coherent
- or revisit the `single_client_mode` guard that skips current-turn
tracking for running-thread resume

## Testing

- `cargo test -p codex-app-server thread_resume -- --nocapture`
- `for i in $(seq 1 10); do cargo test -p codex-app-server
'suite::v2::thread_resume::thread_resume_rejoins_running_thread_even_with_override_mismatch'
-- --exact --nocapture; done`
2026-02-27 19:47:30 +00:00
39 changed files with 2666 additions and 274 deletions


@@ -28,14 +28,17 @@ jobs:
target: x86_64-apple-darwin
# Linux
- os: ubuntu-24.04-arm
target: aarch64-unknown-linux-gnu
- os: ubuntu-24.04
target: x86_64-unknown-linux-gnu
- os: ubuntu-24.04-arm
target: aarch64-unknown-linux-musl
- os: ubuntu-24.04
target: x86_64-unknown-linux-musl
# 2026-02-27 Bazel tests have been flaky on arm in CI.
# Disable until we can investigate and stabilize them.
# - os: ubuntu-24.04-arm
# target: aarch64-unknown-linux-musl
# - os: ubuntu-24.04-arm
# target: aarch64-unknown-linux-gnu
# TODO: Enable Windows once we fix the toolchain issues there.
#- os: windows-latest
# target: x86_64-pc-windows-gnullvm

codex-rs/Cargo.lock generated

@@ -1900,6 +1900,7 @@ version = "0.0.0"
dependencies = [
"anyhow",
"clap",
"codex-utils-absolute-path",
"multimap",
"pretty_assertions",
"serde",


@@ -1422,6 +1422,32 @@
],
"type": "object"
},
"RequestId": {
"anyOf": [
{
"type": "string"
},
{
"format": "int64",
"type": "integer"
}
]
},
"ServerRequestResolvedNotification": {
"properties": {
"requestId": {
"$ref": "#/definitions/RequestId"
},
"threadId": {
"type": "string"
}
},
"required": [
"requestId",
"threadId"
],
"type": "object"
},
"SessionSource": {
"oneOf": [
{
@@ -3422,6 +3448,26 @@
"title": "Item/fileChange/outputDeltaNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"serverRequest/resolved"
],
"title": "ServerRequest/resolvedNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/ServerRequestResolvedNotification"
}
},
"required": [
"method",
"params"
],
"title": "ServerRequest/resolvedNotification",
"type": "object"
},
{
"properties": {
"method": {


@@ -357,7 +357,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -382,7 +382,7 @@
"description": "NEW APIs",
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -406,7 +406,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -430,7 +430,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -454,7 +454,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -478,7 +478,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -502,7 +502,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -526,7 +526,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -550,7 +550,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -574,7 +574,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -598,7 +598,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -622,7 +622,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -646,7 +646,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -670,7 +670,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -694,7 +694,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -718,7 +718,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -742,7 +742,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -766,7 +766,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -790,7 +790,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -814,7 +814,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -838,7 +838,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -862,7 +862,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -886,7 +886,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -910,7 +910,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -934,7 +934,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -958,7 +958,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -981,7 +981,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -1005,7 +1005,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -1029,7 +1029,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -1053,7 +1053,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -1077,7 +1077,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -1100,7 +1100,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -1123,7 +1123,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -1148,7 +1148,7 @@
"description": "Execute a command (argv vector) under the server's sandbox.",
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -1172,7 +1172,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -1196,7 +1196,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -1220,7 +1220,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -1244,7 +1244,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -1268,7 +1268,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -1292,7 +1292,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -1315,7 +1315,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -1339,7 +1339,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -3065,7 +3065,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"message": {
"type": "string"
@@ -4943,7 +4943,7 @@
"$ref": "#/definitions/JSONRPCErrorError"
},
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
}
},
"required": [
@@ -5011,7 +5011,7 @@
"description": "A request that expects a response.",
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"type": "string"
@@ -5030,7 +5030,7 @@
"description": "A successful (non-error) response to a request.",
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"result": true
},
@@ -5544,6 +5544,7 @@
"type": "object"
},
"RequestId": {
"$schema": "http://json-schema.org/draft-07/schema#",
"anyOf": [
{
"type": "string"
@@ -5553,7 +5554,7 @@
"type": "integer"
}
],
"description": "ID of a request, which can be either a string or an integer."
"title": "RequestId"
},
"RequestUserInputQuestion": {
"properties": {
@@ -6194,6 +6195,26 @@
"title": "Item/fileChange/outputDeltaNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"serverRequest/resolved"
],
"title": "ServerRequest/resolvedNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/v2/ServerRequestResolvedNotification"
}
},
"required": [
"method",
"params"
],
"title": "ServerRequest/resolvedNotification",
"type": "object"
},
{
"properties": {
"method": {
@@ -6647,7 +6668,7 @@
"description": "NEW APIs Sent when approval is requested for a specific command execution. This request is used for Turns started via turn/start.",
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -6672,7 +6693,7 @@
"description": "Sent when approval is requested for a specific file change. This request is used for Turns started via turn/start.",
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -6697,7 +6718,7 @@
"description": "EXPERIMENTAL - Request input from the user for a tool call.",
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -6722,7 +6743,7 @@
"description": "Execute a dynamic tool call on the client.",
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -6746,7 +6767,7 @@
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -6771,7 +6792,7 @@
"description": "DEPRECATED APIs below Request to approve a patch. This request is used for Turns started via the legacy APIs (i.e. SendUserTurn, SendUserMessage).",
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -6796,7 +6817,7 @@
"description": "Request to exec a command. This request is used for Turns started via the legacy APIs (i.e. SendUserTurn, SendUserMessage).",
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
@@ -11110,6 +11131,17 @@
],
"type": "object"
},
"RequestId": {
"anyOf": [
{
"type": "string"
},
{
"format": "int64",
"type": "integer"
}
]
},
"ResidencyRequirement": {
"enum": [
"us"
@@ -11849,6 +11881,23 @@
},
"type": "object"
},
"ServerRequestResolvedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"requestId": {
"$ref": "#/definitions/v2/RequestId"
},
"threadId": {
"type": "string"
}
},
"required": [
"requestId",
"threadId"
],
"title": "ServerRequestResolvedNotification",
"type": "object"
},
"SessionSource": {
"oneOf": [
{


@@ -0,0 +1,30 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"RequestId": {
"anyOf": [
{
"type": "string"
},
{
"format": "int64",
"type": "integer"
}
]
}
},
"properties": {
"requestId": {
"$ref": "#/definitions/RequestId"
},
"threadId": {
"type": "string"
}
},
"required": [
"requestId",
"threadId"
],
"title": "ServerRequestResolvedNotification",
"type": "object"
}


@@ -27,6 +27,7 @@ import type { RawResponseItemCompletedNotification } from "./v2/RawResponseItemC
import type { ReasoningSummaryPartAddedNotification } from "./v2/ReasoningSummaryPartAddedNotification";
import type { ReasoningSummaryTextDeltaNotification } from "./v2/ReasoningSummaryTextDeltaNotification";
import type { ReasoningTextDeltaNotification } from "./v2/ReasoningTextDeltaNotification";
import type { ServerRequestResolvedNotification } from "./v2/ServerRequestResolvedNotification";
import type { TerminalInteractionNotification } from "./v2/TerminalInteractionNotification";
import type { ThreadArchivedNotification } from "./v2/ThreadArchivedNotification";
import type { ThreadClosedNotification } from "./v2/ThreadClosedNotification";
@@ -50,4 +51,4 @@ import type { WindowsWorldWritableWarningNotification } from "./v2/WindowsWorldW
/**
* Notification sent from the server to the client.
*/
export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/status/changed", "params": ThreadStatusChangedNotification } | { "method": "thread/archived", "params": ThreadArchivedNotification } | { "method": "thread/unarchived", "params": ThreadUnarchivedNotification } | { "method": "thread/closed", "params": ThreadClosedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": 
AppListUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "model/rerouted", "params": ModelReroutedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "fuzzyFileSearch/sessionUpdated", "params": FuzzyFileSearchSessionUpdatedNotification } | { "method": "fuzzyFileSearch/sessionCompleted", "params": FuzzyFileSearchSessionCompletedNotification } | { "method": "thread/realtime/started", "params": ThreadRealtimeStartedNotification } | { "method": "thread/realtime/itemAdded", "params": ThreadRealtimeItemAddedNotification } | { "method": "thread/realtime/outputAudio/delta", "params": ThreadRealtimeOutputAudioDeltaNotification } | { "method": "thread/realtime/error", "params": ThreadRealtimeErrorNotification } | { "method": "thread/realtime/closed", "params": ThreadRealtimeClosedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "windowsSandbox/setupCompleted", "params": WindowsSandboxSetupCompletedNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification };
export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/status/changed", "params": ThreadStatusChangedNotification } | { "method": "thread/archived", "params": ThreadArchivedNotification } | { "method": "thread/unarchived", "params": ThreadUnarchivedNotification } | { "method": "thread/closed", "params": ThreadClosedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "serverRequest/resolved", "params": ServerRequestResolvedNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": 
AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "model/rerouted", "params": ModelReroutedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "fuzzyFileSearch/sessionUpdated", "params": FuzzyFileSearchSessionUpdatedNotification } | { "method": "fuzzyFileSearch/sessionCompleted", "params": FuzzyFileSearchSessionCompletedNotification } | { "method": "thread/realtime/started", "params": ThreadRealtimeStartedNotification } | { "method": "thread/realtime/itemAdded", "params": ThreadRealtimeItemAddedNotification } | { "method": "thread/realtime/outputAudio/delta", "params": ThreadRealtimeOutputAudioDeltaNotification } | { "method": "thread/realtime/error", "params": ThreadRealtimeErrorNotification } | { "method": "thread/realtime/closed", "params": ThreadRealtimeClosedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "windowsSandbox/setupCompleted", "params": WindowsSandboxSetupCompletedNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification };


@@ -0,0 +1,6 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { RequestId } from "../RequestId";
export type ServerRequestResolvedNotification = { threadId: string, requestId: RequestId, };


@@ -142,6 +142,7 @@ export type { ReviewTarget } from "./ReviewTarget";
export type { SandboxMode } from "./SandboxMode";
export type { SandboxPolicy } from "./SandboxPolicy";
export type { SandboxWorkspaceWrite } from "./SandboxWorkspaceWrite";
export type { ServerRequestResolvedNotification } from "./ServerRequestResolvedNotification";
export type { SessionSource } from "./SessionSource";
export type { SkillDependencies } from "./SkillDependencies";
export type { SkillErrorInfo } from "./SkillErrorInfo";


@@ -8,7 +8,9 @@ use ts_rs::TS;
pub const JSONRPC_VERSION: &str = "2.0";
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize, Hash, Eq, JsonSchema, TS)]
#[derive(
Debug, Clone, PartialEq, PartialOrd, Ord, Deserialize, Serialize, Hash, Eq, JsonSchema, TS,
)]
#[serde(untagged)]
pub enum RequestId {
String(String),


@@ -548,6 +548,14 @@ macro_rules! server_request_definitions {
)*
}
impl ServerRequest {
pub fn id(&self) -> &RequestId {
match self {
$(Self::$variant { request_id, .. } => request_id,)*
}
}
}
#[derive(Debug, Clone, PartialEq, JsonSchema)]
#[allow(clippy::large_enum_variant)]
pub enum ServerRequestPayload {
@@ -838,6 +846,7 @@ server_notification_definitions! {
CommandExecutionOutputDelta => "item/commandExecution/outputDelta" (v2::CommandExecutionOutputDeltaNotification),
TerminalInteraction => "item/commandExecution/terminalInteraction" (v2::TerminalInteractionNotification),
FileChangeOutputDelta => "item/fileChange/outputDelta" (v2::FileChangeOutputDeltaNotification),
ServerRequestResolved => "serverRequest/resolved" (v2::ServerRequestResolvedNotification),
McpToolCallProgress => "item/mcpToolCall/progress" (v2::McpToolCallProgressNotification),
McpServerOauthLoginCompleted => "mcpServer/oauthLogin/completed" (v2::McpServerOauthLoginCompletedNotification),
AccountUpdated => "account/updated" (v2::AccountUpdatedNotification),
@@ -1106,6 +1115,7 @@ mod tests {
);
let payload = ServerRequestPayload::ExecCommandApproval(params);
assert_eq!(request.id(), &RequestId::Integer(7));
assert_eq!(payload.request_with_id(RequestId::Integer(7)), request);
Ok(())
}


@@ -22,6 +22,7 @@ use codex_protocol::models::MessagePhase;
use codex_protocol::protocol::AgentReasoningEvent;
use codex_protocol::protocol::AgentReasoningRawContentEvent;
use codex_protocol::protocol::AgentStatus;
use codex_protocol::protocol::ApplyPatchApprovalRequestEvent;
use codex_protocol::protocol::CompactedItem;
use codex_protocol::protocol::ContextCompactedEvent;
use codex_protocol::protocol::DynamicToolCallResponseEvent;
@@ -126,6 +127,9 @@ impl ThreadHistoryBuilder {
EventMsg::WebSearchEnd(payload) => self.handle_web_search_end(payload),
EventMsg::ExecCommandBegin(payload) => self.handle_exec_command_begin(payload),
EventMsg::ExecCommandEnd(payload) => self.handle_exec_command_end(payload),
EventMsg::ApplyPatchApprovalRequest(payload) => {
self.handle_apply_patch_approval_request(payload)
}
EventMsg::PatchApplyBegin(payload) => self.handle_patch_apply_begin(payload),
EventMsg::PatchApplyEnd(payload) => self.handle_patch_apply_end(payload),
EventMsg::DynamicToolCallRequest(payload) => {
@@ -364,6 +368,19 @@ impl ThreadHistoryBuilder {
self.upsert_item_in_turn_id(&payload.turn_id, item);
}
fn handle_apply_patch_approval_request(&mut self, payload: &ApplyPatchApprovalRequestEvent) {
let item = ThreadItem::FileChange {
id: payload.call_id.clone(),
changes: convert_patch_changes(&payload.changes),
status: PatchApplyStatus::InProgress,
};
if payload.turn_id.is_empty() {
self.upsert_item_in_current_turn(item);
} else {
self.upsert_item_in_turn_id(&payload.turn_id, item);
}
}
fn handle_patch_apply_begin(&mut self, payload: &PatchApplyBeginEvent) {
let item = ThreadItem::FileChange {
id: payload.call_id.clone(),
@@ -1080,6 +1097,7 @@ mod tests {
use codex_protocol::protocol::AgentMessageEvent;
use codex_protocol::protocol::AgentReasoningEvent;
use codex_protocol::protocol::AgentReasoningRawContentEvent;
use codex_protocol::protocol::ApplyPatchApprovalRequestEvent;
use codex_protocol::protocol::CodexErrorInfo;
use codex_protocol::protocol::CompactedItem;
use codex_protocol::protocol::DynamicToolCallResponseEvent;
@@ -1088,6 +1106,7 @@ mod tests {
use codex_protocol::protocol::ItemStartedEvent;
use codex_protocol::protocol::McpInvocation;
use codex_protocol::protocol::McpToolCallEndEvent;
use codex_protocol::protocol::PatchApplyBeginEvent;
use codex_protocol::protocol::ThreadRolledBackEvent;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
@@ -1980,6 +1999,133 @@ mod tests {
);
}
#[test]
fn patch_apply_begin_updates_active_turn_snapshot_with_file_change() {
let turn_id = "turn-1";
let mut builder = ThreadHistoryBuilder::new();
let events = vec![
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: turn_id.to_string(),
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
EventMsg::UserMessage(UserMessageEvent {
message: "apply patch".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
}),
EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id: "patch-call".into(),
turn_id: turn_id.to_string(),
auto_approved: false,
changes: [(
PathBuf::from("README.md"),
codex_protocol::protocol::FileChange::Add {
content: "hello\n".into(),
},
)]
.into_iter()
.collect(),
}),
];
for event in &events {
builder.handle_event(event);
}
let snapshot = builder
.active_turn_snapshot()
.expect("active turn snapshot");
assert_eq!(snapshot.id, turn_id);
assert_eq!(snapshot.status, TurnStatus::InProgress);
assert_eq!(
snapshot.items,
vec![
ThreadItem::UserMessage {
id: "item-1".into(),
content: vec![UserInput::Text {
text: "apply patch".into(),
text_elements: Vec::new(),
}],
},
ThreadItem::FileChange {
id: "patch-call".into(),
changes: vec![FileUpdateChange {
path: "README.md".into(),
kind: PatchChangeKind::Add,
diff: "hello\n".into(),
}],
status: PatchApplyStatus::InProgress,
},
]
);
}
#[test]
fn apply_patch_approval_request_updates_active_turn_snapshot_with_file_change() {
let turn_id = "turn-1";
let mut builder = ThreadHistoryBuilder::new();
let events = vec![
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: turn_id.to_string(),
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
EventMsg::UserMessage(UserMessageEvent {
message: "apply patch".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
}),
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "patch-call".into(),
turn_id: turn_id.to_string(),
changes: [(
PathBuf::from("README.md"),
codex_protocol::protocol::FileChange::Add {
content: "hello\n".into(),
},
)]
.into_iter()
.collect(),
reason: None,
grant_root: None,
}),
];
for event in &events {
builder.handle_event(event);
}
let snapshot = builder
.active_turn_snapshot()
.expect("active turn snapshot");
assert_eq!(snapshot.id, turn_id);
assert_eq!(snapshot.status, TurnStatus::InProgress);
assert_eq!(
snapshot.items,
vec![
ThreadItem::UserMessage {
id: "item-1".into(),
content: vec![UserInput::Text {
text: "apply patch".into(),
text_elements: Vec::new(),
}],
},
ThreadItem::FileChange {
id: "patch-call".into(),
changes: vec![FileUpdateChange {
path: "README.md".into(),
kind: PatchChangeKind::Add,
diff: "hello\n".into(),
}],
status: PatchApplyStatus::InProgress,
},
]
);
}
#[test]
fn late_turn_complete_does_not_close_active_turn() {
let events = vec![

View File

@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::path::PathBuf;
use crate::RequestId;
use crate::protocol::common::AuthMode;
use codex_experimental_api_macros::ExperimentalApi;
use codex_protocol::account::PlanType;
@@ -3745,6 +3746,14 @@ pub struct FileChangeOutputDeltaNotification {
pub delta: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ServerRequestResolvedNotification {
pub thread_id: String,
pub request_id: RequestId,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]

View File

@@ -712,7 +712,8 @@ Order of messages:
1. `item/started` — shows the pending `commandExecution` item with `command`, `cwd`, and other fields so you can render the proposed action.
2. `item/commandExecution/requestApproval` (request) — carries the same `itemId`, `threadId`, `turnId`, optionally `approvalId` (for subcommand callbacks), and `reason`. For normal command approvals, it also includes `command`, `cwd`, and `commandActions` for friendly display. When `initialize.params.capabilities.experimentalApi = true`, it may also include experimental `additionalPermissions` describing requested per-command sandbox access; any filesystem paths in that payload are absolute on the wire. For network-only approvals, those command fields may be omitted and `networkApprovalContext` is provided instead. Optional persistence hints may also be included via `proposedExecpolicyAmendment` and `proposedNetworkPolicyAmendments`. Clients can prefer `availableDecisions` when present to render the exact set of choices the server wants to expose, while still falling back to the older heuristics if it is omitted.
3. Client response — for example `{ "decision": "accept" }`, `{ "decision": "acceptForSession" }`, `{ "decision": { "acceptWithExecpolicyAmendment": { "execpolicy_amendment": [...] } } }`, `{ "decision": { "applyNetworkPolicyAmendment": { "network_policy_amendment": { "host": "example.com", "action": "allow" } } } }`, `{ "decision": "decline" }`, or `{ "decision": "cancel" }`.
4. `item/completed` — final `commandExecution` item with `status: "completed" | "failed" | "declined"` and execution output. Render this as the authoritative result.
4. `serverRequest/resolved` — `{ threadId, requestId }` confirms the pending request has been resolved or cleared, including lifecycle cleanup on turn start/complete/interrupt.
5. `item/completed` — final `commandExecution` item with `status: "completed" | "failed" | "declined"` and execution output. Render this as the authoritative result.
### File change approvals
@@ -721,10 +722,15 @@ Order of messages:
1. `item/started` — emits a `fileChange` item with `changes` (diff chunk summaries) and `status: "inProgress"`. Show the proposed edits and paths to the user.
2. `item/fileChange/requestApproval` (request) — includes `itemId`, `threadId`, `turnId`, and an optional `reason`.
3. Client response — `{ "decision": "accept" }` or `{ "decision": "decline" }`.
4. `item/completed` — returns the same `fileChange` item with `status` updated to `completed`, `failed`, or `declined` after the patch attempt. Rely on this to show success/failure and finalize the diff state in your UI.
4. `serverRequest/resolved` — `{ threadId, requestId }` confirms the pending request has been resolved or cleared, including lifecycle cleanup on turn start/complete/interrupt.
5. `item/completed` — returns the same `fileChange` item with `status` updated to `completed`, `failed`, or `declined` after the patch attempt. Rely on this to show success/failure and finalize the diff state in your UI.
UI guidance for IDEs: surface an approval dialog as soon as the request arrives. The turn will proceed after the server receives a response to the approval request. The terminal `item/completed` notification will be sent with the appropriate status.
### request_user_input
When the client responds to `item/tool/requestUserInput`, the server emits `serverRequest/resolved` with `{ threadId, requestId }`. If the pending request is cleared by turn start, turn completion, or turn interruption before the client answers, the server emits the same notification for that cleanup.
### Dynamic tool calls (experimental)
`dynamicTools` on `thread/start` and the corresponding `item/tool/call` request/response flow are experimental APIs. To enable them, set `initialize.params.capabilities.experimentalApi = true`.
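To make the `serverRequest/resolved` lifecycle above concrete, here is a minimal sketch of the client-side bookkeeping it enables: track pending approval requests by request id, and drop the entry when the resolution notification arrives, whether the client answered or a turn transition cleared the request server-side. All names (`PendingApprovals`, the `i64` request id, the item-id payload) are illustrative, not part of the protocol crates.

```rust
use std::collections::HashMap;

/// Illustrative client-side bookkeeping for pending server requests,
/// keyed by request id. Not part of the protocol crates.
struct PendingApprovals {
    // request id -> item id awaiting a decision
    by_request_id: HashMap<i64, String>,
}

impl PendingApprovals {
    fn new() -> Self {
        Self { by_request_id: HashMap::new() }
    }

    /// Record an incoming approval request
    /// (e.g. `item/fileChange/requestApproval`).
    fn on_request(&mut self, request_id: i64, item_id: &str) {
        self.by_request_id.insert(request_id, item_id.to_string());
    }

    /// Handle `serverRequest/resolved`: drop the pending entry and return
    /// the item id if the request was still outstanding.
    fn on_resolved(&mut self, request_id: i64) -> Option<String> {
        self.by_request_id.remove(&request_id)
    }
}

fn main() {
    let mut pending = PendingApprovals::new();
    pending.on_request(42, "patch-call");
    // A second resolution for the same id is a harmless no-op, so clients
    // need not special-case cleanup notifications racing their own responses.
    assert_eq!(pending.on_resolved(42).as_deref(), Some("patch-call"));
    assert_eq!(pending.on_resolved(42), None);
    println!("resolved cleanly");
}
```

This is why the docs say to treat `serverRequest/resolved` as authoritative cleanup: idempotent removal keeps the UI consistent even when the response and the cleanup notification arrive in either order.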

View File

@@ -6,6 +6,8 @@ use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::outgoing_message::ClientRequestResult;
use crate::outgoing_message::ThreadScopedOutgoingMessageSender;
use crate::server_request_error::is_turn_transition_server_request_error;
use crate::thread_state::ThreadListenerCommand;
use crate::thread_state::ThreadState;
use crate::thread_state::TurnSummary;
use crate::thread_status::ThreadWatchActiveGuard;
@@ -56,6 +58,7 @@ use codex_app_server_protocol::RawResponseItemCompletedNotification;
use codex_app_server_protocol::ReasoningSummaryPartAddedNotification;
use codex_app_server_protocol::ReasoningSummaryTextDeltaNotification;
use codex_app_server_protocol::ReasoningTextDeltaNotification;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequestPayload;
use codex_app_server_protocol::TerminalInteractionNotification;
@@ -132,6 +135,38 @@ struct CommandExecutionCompletionItem {
command_actions: Vec<V2ParsedCommand>,
}
async fn resolve_server_request_on_thread_listener(
thread_state: &Arc<Mutex<ThreadState>>,
request_id: RequestId,
) {
let (completion_tx, completion_rx) = oneshot::channel();
let listener_command_tx = {
let state = thread_state.lock().await;
state.listener_command_tx()
};
let Some(listener_command_tx) = listener_command_tx else {
error!("failed to remove pending client request: thread listener is not running");
return;
};
if listener_command_tx
.send(ThreadListenerCommand::ResolveServerRequest {
request_id,
completion_tx,
})
.is_err()
{
error!(
"failed to remove pending client request: thread listener command channel is closed"
);
return;
}
if let Err(err) = completion_rx.await {
error!("failed to remove pending client request: {err}");
}
}
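The helper above uses a command-plus-ack handshake: the command carries its own `completion_tx`, and the caller awaits `completion_rx` so it only proceeds once the listener has actually removed the pending request. A std-only sketch of the same pattern, with `std::sync::mpsc` channels and a thread standing in for tokio's channels and the thread listener task (the names here are illustrative):

```rust
use std::sync::mpsc;
use std::thread;

// The command carries its own ack sender, so the caller can block until
// the listener has finished the work, not merely received the message.
enum ListenerCommand {
    ResolveServerRequest {
        request_id: u64,
        completion_tx: mpsc::Sender<()>,
    },
}

fn main() {
    let (cmd_tx, cmd_rx) = mpsc::channel::<ListenerCommand>();

    // Listener loop: perform the work for each command, then ack it.
    let listener = thread::spawn(move || {
        for cmd in cmd_rx {
            match cmd {
                ListenerCommand::ResolveServerRequest { request_id, completion_tx } => {
                    println!("resolved request {request_id}");
                    let _ = completion_tx.send(()); // ack only after the work is done
                }
            }
        }
    });

    // Caller side: send the command, then wait for the ack.
    let (completion_tx, completion_rx) = mpsc::channel();
    cmd_tx
        .send(ListenerCommand::ResolveServerRequest { request_id: 7, completion_tx })
        .expect("listener is running");
    completion_rx.recv().expect("listener acks after resolving");

    drop(cmd_tx); // close the channel so the listener loop exits
    listener.join().unwrap();
}
```

The ack matters for ordering: the real handler awaits it before continuing, so the `serverRequest/resolved` notification cannot race ahead of the pending-state removal.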
#[allow(clippy::too_many_arguments)]
pub(crate) async fn apply_bespoke_event_handling(
event: Event,
@@ -151,11 +186,15 @@ pub(crate) async fn apply_bespoke_event_handling(
} = event;
match msg {
EventMsg::TurnStarted(_) => {
// While not technically necessary, as it was already done on TurnComplete, be extra cautious and abort any pending server requests.
outgoing.abort_pending_server_requests().await;
thread_watch_manager
.note_turn_started(&conversation_id.to_string())
.await;
}
EventMsg::TurnComplete(_ev) => {
// All per-thread requests are bound to a turn, so abort them.
outgoing.abort_pending_server_requests().await;
let turn_failed = thread_state.lock().await.turn_summary.last_error.is_some();
thread_watch_manager
.note_turn_completed(&conversation_id.to_string(), turn_failed)
@@ -263,7 +302,7 @@ pub(crate) async fn apply_bespoke_event_handling(
reason,
grant_root,
};
let rx = outgoing
let (_pending_request_id, rx) = outgoing
.send_request(ServerRequestPayload::ApplyPatchApproval(params))
.await;
tokio::spawn(async move {
@@ -307,7 +346,7 @@ pub(crate) async fn apply_bespoke_event_handling(
reason,
grant_root,
};
let rx = outgoing
let (pending_request_id, rx) = outgoing
.send_request(ServerRequestPayload::FileChangeRequestApproval(params))
.await;
tokio::spawn(async move {
@@ -316,6 +355,7 @@ pub(crate) async fn apply_bespoke_event_handling(
conversation_id,
item_id,
patch_changes,
pending_request_id,
rx,
conversation,
outgoing,
@@ -362,7 +402,7 @@ pub(crate) async fn apply_bespoke_event_handling(
reason,
parsed_cmd,
};
let rx = outgoing
let (_pending_request_id, rx) = outgoing
.send_request(ServerRequestPayload::ExecCommandApproval(params))
.await;
tokio::spawn(async move {
@@ -435,7 +475,7 @@ pub(crate) async fn apply_bespoke_event_handling(
proposed_network_policy_amendments: proposed_network_policy_amendments_v2,
available_decisions: Some(available_decisions),
};
let rx = outgoing
let (pending_request_id, rx) = outgoing
.send_request(ServerRequestPayload::CommandExecutionRequestApproval(
params,
))
@@ -447,6 +487,7 @@ pub(crate) async fn apply_bespoke_event_handling(
approval_id,
call_id,
completion_item,
pending_request_id,
rx,
conversation,
outgoing,
@@ -489,14 +530,16 @@ pub(crate) async fn apply_bespoke_event_handling(
item_id: request.call_id,
questions,
};
let rx = outgoing
let (pending_request_id, rx) = outgoing
.send_request(ServerRequestPayload::ToolRequestUserInput(params))
.await;
tokio::spawn(async move {
on_request_user_input_response(
event_turn_id,
pending_request_id,
rx,
conversation,
thread_state,
user_input_guard,
)
.await;
@@ -550,7 +593,7 @@ pub(crate) async fn apply_bespoke_event_handling(
tool: tool.clone(),
arguments: arguments.clone(),
};
let rx = outgoing
let (_pending_request_id, rx) = outgoing
.send_request(ServerRequestPayload::DynamicToolCall(params))
.await;
tokio::spawn(async move {
@@ -1136,6 +1179,7 @@ pub(crate) async fn apply_bespoke_event_handling(
// Until we migrate the core to be aware of a first class FileChangeItem
// and emit the corresponding EventMsg, we repurpose the call_id as the item_id.
let item_id = patch_begin_event.call_id.clone();
let changes = convert_patch_changes(&patch_begin_event.changes);
let first_start = {
let mut state = thread_state.lock().await;
@@ -1147,7 +1191,7 @@ pub(crate) async fn apply_bespoke_event_handling(
if first_start {
let item = ThreadItem::FileChange {
id: item_id.clone(),
changes: convert_patch_changes(&patch_begin_event.changes),
changes,
status: PatchApplyStatus::InProgress,
};
let notification = ItemStartedNotification {
@@ -1329,6 +1373,8 @@ pub(crate) async fn apply_bespoke_event_handling(
}
// If this is a TurnAborted, reply to any pending interrupt requests.
EventMsg::TurnAborted(turn_aborted_event) => {
// All per-thread requests are bound to a turn, so abort them.
outgoing.abort_pending_server_requests().await;
let pending = {
let mut state = thread_state.lock().await;
std::mem::take(&mut state.pending_interrupts)
@@ -1725,6 +1771,7 @@ async fn on_patch_approval_response(
let response = receiver.await;
let value = match response {
Ok(Ok(value)) => value,
Ok(Err(err)) if is_turn_transition_server_request_error(&err) => return,
Ok(Err(err)) => {
error!("request failed with client error: {err:?}");
if let Err(submit_err) = codex
@@ -1781,6 +1828,7 @@ async fn on_exec_approval_response(
let response = receiver.await;
let value = match response {
Ok(Ok(value)) => value,
Ok(Err(err)) if is_turn_transition_server_request_error(&err) => return,
Ok(Err(err)) => {
error!("request failed with client error: {err:?}");
return;
@@ -1816,14 +1864,18 @@ async fn on_exec_approval_response(
async fn on_request_user_input_response(
event_turn_id: String,
pending_request_id: RequestId,
receiver: oneshot::Receiver<ClientRequestResult>,
conversation: Arc<CodexThread>,
thread_state: Arc<Mutex<ThreadState>>,
user_input_guard: ThreadWatchActiveGuard,
) {
let response = receiver.await;
resolve_server_request_on_thread_listener(&thread_state, pending_request_id).await;
drop(user_input_guard);
let value = match response {
Ok(Ok(value)) => value,
Ok(Err(err)) if is_turn_transition_server_request_error(&err) => return,
Ok(Err(err)) => {
error!("request failed with client error: {err:?}");
let empty = CoreRequestUserInputResponse {
@@ -1934,6 +1986,7 @@ async fn on_file_change_request_approval_response(
conversation_id: ThreadId,
item_id: String,
changes: Vec<FileUpdateChange>,
pending_request_id: RequestId,
receiver: oneshot::Receiver<ClientRequestResult>,
codex: Arc<CodexThread>,
outgoing: ThreadScopedOutgoingMessageSender,
@@ -1941,6 +1994,7 @@ async fn on_file_change_request_approval_response(
permission_guard: ThreadWatchActiveGuard,
) {
let response = receiver.await;
resolve_server_request_on_thread_listener(&thread_state, pending_request_id).await;
drop(permission_guard);
let (decision, completion_status) = match response {
Ok(Ok(value)) => {
@@ -1958,6 +2012,7 @@ async fn on_file_change_request_approval_response(
// Only short-circuit on declines/cancels/failures.
(decision, completion_status)
}
Ok(Err(err)) if is_turn_transition_server_request_error(&err) => return,
Ok(Err(err)) => {
error!("request failed with client error: {err:?}");
(ReviewDecision::Denied, Some(PatchApplyStatus::Failed))
@@ -1999,6 +2054,7 @@ async fn on_command_execution_request_approval_response(
approval_id: Option<String>,
item_id: String,
completion_item: Option<CommandExecutionCompletionItem>,
pending_request_id: RequestId,
receiver: oneshot::Receiver<ClientRequestResult>,
conversation: Arc<CodexThread>,
outgoing: ThreadScopedOutgoingMessageSender,
@@ -2006,6 +2062,7 @@ async fn on_command_execution_request_approval_response(
permission_guard: ThreadWatchActiveGuard,
) {
let response = receiver.await;
resolve_server_request_on_thread_listener(&thread_state, pending_request_id).await;
drop(permission_guard);
let (decision, completion_status) = match response {
Ok(Ok(value)) => {
@@ -2057,6 +2114,7 @@ async fn on_command_execution_request_approval_response(
};
(decision, completion_status)
}
Ok(Err(err)) if is_turn_transition_server_request_error(&err) => return,
Ok(Err(err)) => {
error!("request failed with client error: {err:?}");
(ReviewDecision::Denied, Some(CommandExecutionStatus::Failed))

View File

@@ -100,6 +100,7 @@ use codex_app_server_protocol::NewConversationResponse;
use codex_app_server_protocol::ProductSurface as ApiProductSurface;
use codex_app_server_protocol::RemoveConversationListenerParams;
use codex_app_server_protocol::RemoveConversationSubscriptionResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ResumeConversationParams;
use codex_app_server_protocol::ResumeConversationResponse;
use codex_app_server_protocol::ReviewDelivery as ApiReviewDelivery;
@@ -112,6 +113,7 @@ use codex_app_server_protocol::SendUserMessageResponse;
use codex_app_server_protocol::SendUserTurnParams;
use codex_app_server_protocol::SendUserTurnResponse;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequestResolvedNotification;
use codex_app_server_protocol::SessionConfiguredNotification;
use codex_app_server_protocol::SetDefaultModelParams;
use codex_app_server_protocol::SetDefaultModelResponse;
@@ -297,8 +299,12 @@ use tracing::info;
use tracing::warn;
use uuid::Uuid;
#[cfg(test)]
use codex_app_server_protocol::ServerRequest;
use crate::filters::compute_source_filters;
use crate::filters::source_kind_matches;
use crate::thread_state::ThreadListenerCommand;
use crate::thread_state::ThreadState;
use crate::thread_state::ThreadStateManager;
@@ -3221,11 +3227,11 @@ impl CodexMessageProcessor {
};
let command = crate::thread_state::ThreadListenerCommand::SendThreadResumeResponse(
crate::thread_state::PendingThreadResumeRequest {
Box::new(crate::thread_state::PendingThreadResumeRequest {
request_id: request_id.clone(),
rollout_path,
config_snapshot,
},
}),
);
if listener_command_tx.send(command).is_err() {
let err = JSONRPCErrorError {
@@ -4844,7 +4850,9 @@ impl CodexMessageProcessor {
async fn finalize_thread_teardown(&mut self, thread_id: ThreadId) {
self.pending_thread_unloads.lock().await.remove(&thread_id);
self.outgoing.cancel_requests_for_thread(thread_id).await;
self.outgoing
.cancel_requests_for_thread(thread_id, None)
.await;
self.thread_state_manager
.remove_thread_state(thread_id)
.await;
@@ -4905,7 +4913,9 @@ impl CodexMessageProcessor {
self.pending_thread_unloads.lock().await.insert(thread_id);
// Any pending app-server -> client requests for this thread can no longer be
// answered; cancel their callbacks before shutdown/unload.
self.outgoing.cancel_requests_for_thread(thread_id).await;
self.outgoing
.cancel_requests_for_thread(thread_id, None)
.await;
self.thread_state_manager
.remove_thread_state(thread_id)
.await;
@@ -6507,21 +6517,15 @@ impl CodexMessageProcessor {
let Some(listener_command) = listener_command else {
break;
};
match listener_command {
crate::thread_state::ThreadListenerCommand::SendThreadResumeResponse(
resume_request,
) => {
handle_pending_thread_resume_request(
conversation_id,
codex_home.as_path(),
&thread_state,
&thread_watch_manager,
&outgoing_for_task,
resume_request,
)
.await;
}
}
handle_thread_listener_command(
conversation_id,
codex_home.as_path(),
&thread_state,
&thread_watch_manager,
&outgoing_for_task,
listener_command,
)
.await;
}
}
}
@@ -6830,6 +6834,37 @@ impl CodexMessageProcessor {
}
}
async fn handle_thread_listener_command(
conversation_id: ThreadId,
codex_home: &Path,
thread_state: &Arc<Mutex<ThreadState>>,
thread_watch_manager: &ThreadWatchManager,
outgoing: &Arc<OutgoingMessageSender>,
listener_command: ThreadListenerCommand,
) {
match listener_command {
ThreadListenerCommand::SendThreadResumeResponse(resume_request) => {
handle_pending_thread_resume_request(
conversation_id,
codex_home,
thread_state,
thread_watch_manager,
outgoing,
*resume_request,
)
.await;
}
ThreadListenerCommand::ResolveServerRequest {
request_id,
completion_tx,
} => {
resolve_pending_server_request(conversation_id, thread_state, outgoing, request_id)
.await;
let _ = completion_tx.send(());
}
}
}
async fn handle_pending_thread_resume_request(
conversation_id: ThreadId,
codex_home: &Path,
@@ -6918,9 +6953,36 @@ async fn handle_pending_thread_resume_request(
reasoning_effort,
};
outgoing.send_response(request_id, response).await;
outgoing
.replay_requests_to_connection_for_thread(connection_id, conversation_id)
.await;
thread_state.lock().await.add_connection(connection_id);
}
async fn resolve_pending_server_request(
conversation_id: ThreadId,
thread_state: &Arc<Mutex<ThreadState>>,
outgoing: &Arc<OutgoingMessageSender>,
request_id: RequestId,
) {
let thread_id = conversation_id.to_string();
let subscribed_connection_ids = thread_state.lock().await.subscribed_connection_ids();
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing.clone(),
subscribed_connection_ids,
conversation_id,
);
outgoing
.send_server_notification(ServerNotification::ServerRequestResolved(
ServerRequestResolvedNotification {
thread_id,
request_id,
},
))
.await;
}
async fn load_thread_for_running_resume_response(
conversation_id: ThreadId,
rollout_path: &Path,
@@ -7668,7 +7730,11 @@ pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread {
#[cfg(test)]
mod tests {
use super::*;
use crate::outgoing_message::OutgoingEnvelope;
use crate::outgoing_message::OutgoingMessage;
use anyhow::Result;
use codex_app_server_protocol::ServerRequestPayload;
use codex_app_server_protocol::ToolRequestUserInputParams;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use pretty_assertions::assert_eq;
@@ -7862,6 +7928,67 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn aborting_pending_request_clears_pending_state() -> Result<()> {
let thread_id = ThreadId::from_string("bfd12a78-5900-467b-9bc5-d3d35df08191")?;
let thread_state = Arc::new(Mutex::new(ThreadState::default()));
let connection_id = ConnectionId(7);
thread_state.lock().await.add_connection(connection_id);
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(8);
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing.clone(),
vec![connection_id],
thread_id,
);
let (request_id, client_request_rx) = thread_outgoing
.send_request(ServerRequestPayload::ToolRequestUserInput(
ToolRequestUserInputParams {
thread_id: thread_id.to_string(),
turn_id: "turn-1".to_string(),
item_id: "call-1".to_string(),
questions: vec![],
},
))
.await;
thread_outgoing.abort_pending_server_requests().await;
let request_message = outgoing_rx.recv().await.expect("request should be sent");
let OutgoingEnvelope::ToConnection {
connection_id: request_connection_id,
message:
OutgoingMessage::Request(ServerRequest::ToolRequestUserInput {
request_id: sent_request_id,
..
}),
} = request_message
else {
panic!("expected tool request to be sent to the subscribed connection");
};
assert_eq!(request_connection_id, connection_id);
assert_eq!(sent_request_id, request_id);
let response = client_request_rx
.await
.expect("callback should be resolved");
let error = response.expect_err("request should be aborted during cleanup");
assert_eq!(
error.message,
"client request resolved because the turn state was changed"
);
assert_eq!(error.data, Some(json!({ "reason": "turnTransition" })));
assert!(
outgoing
.pending_requests_for_thread(thread_id)
.await
.is_empty()
);
assert!(outgoing_rx.try_recv().is_err());
Ok(())
}
#[test]
fn summary_from_state_db_metadata_preserves_agent_nickname() -> Result<()> {
let conversation_id = ThreadId::from_string("bfd12a78-5900-467b-9bc5-d3d35df08191")?;

View File

@@ -9,6 +9,7 @@ use tokio::sync::oneshot;
use tracing::error;
use crate::outgoing_message::ClientRequestResult;
use crate::server_request_error::is_turn_transition_server_request_error;
pub(crate) async fn on_call_response(
call_id: String,
@@ -18,6 +19,7 @@ pub(crate) async fn on_call_response(
let response = receiver.await;
let (response, _error) = match response {
Ok(Ok(value)) => decode_response(value),
Ok(Err(err)) if is_turn_transition_server_request_error(&err) => return,
Ok(Err(err)) => {
error!("request failed with client error: {err:?}");
fallback_response("dynamic tool request failed")

View File

@@ -63,6 +63,7 @@ mod fuzzy_file_search;
mod message_processor;
mod models;
mod outgoing_message;
mod server_request_error;
mod thread_state;
mod thread_status;
mod transport;

View File

@@ -17,6 +17,7 @@ use tokio::sync::oneshot;
use tracing::warn;
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::server_request_error::TURN_TRANSITION_PENDING_REQUEST_ERROR_REASON;
#[cfg(test)]
use codex_protocol::account::PlanType;
@@ -62,6 +63,7 @@ pub(crate) struct ThreadScopedOutgoingMessageSender {
struct PendingCallbackEntry {
callback: oneshot::Sender<ClientRequestResult>,
thread_id: Option<ThreadId>,
request: ServerRequest,
}
impl ThreadScopedOutgoingMessageSender {
@@ -80,12 +82,12 @@ impl ThreadScopedOutgoingMessageSender {
pub(crate) async fn send_request(
&self,
payload: ServerRequestPayload,
) -> oneshot::Receiver<ClientRequestResult> {
) -> (RequestId, oneshot::Receiver<ClientRequestResult>) {
self.outgoing
.send_request_to_thread_connections(
self.thread_id,
self.connection_ids.as_slice(),
.send_request_to_connections(
Some(self.connection_ids.as_slice()),
payload,
Some(self.thread_id),
)
.await
}
@@ -99,6 +101,20 @@ impl ThreadScopedOutgoingMessageSender {
.await;
}
pub(crate) async fn abort_pending_server_requests(&self) {
self.outgoing
.cancel_requests_for_thread(
self.thread_id,
Some(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: "client request resolved because the turn state was changed"
.to_string(),
data: Some(serde_json::json!({ "reason": TURN_TRANSITION_PENDING_REQUEST_ERROR_REASON })),
}),
)
.await
}
pub(crate) async fn send_response<T: Serialize>(
&self,
request_id: ConnectionRequestId,
@@ -129,38 +145,23 @@ impl OutgoingMessageSender {
&self,
request: ServerRequestPayload,
) -> (RequestId, oneshot::Receiver<ClientRequestResult>) {
self.send_request_with_id_to_connections(&[], request, None)
.await
}
async fn send_request_to_thread_connections(
&self,
thread_id: ThreadId,
connection_ids: &[ConnectionId],
request: ServerRequestPayload,
) -> oneshot::Receiver<ClientRequestResult> {
if connection_ids.is_empty() {
let (_tx, rx) = oneshot::channel();
return rx;
}
let (_request_id, receiver) = self
.send_request_with_id_to_connections(connection_ids, request, Some(thread_id))
.await;
receiver
self.send_request_to_connections(None, request, None).await
}
fn next_request_id(&self) -> RequestId {
RequestId::Integer(self.next_server_request_id.fetch_add(1, Ordering::Relaxed))
}
async fn send_request_with_id_to_connections(
async fn send_request_to_connections(
&self,
connection_ids: &[ConnectionId],
connection_ids: Option<&[ConnectionId]>,
request: ServerRequestPayload,
thread_id: Option<ThreadId>,
) -> (RequestId, oneshot::Receiver<ClientRequestResult>) {
let id = self.next_request_id();
let outgoing_message_id = id.clone();
let request = request.request_with_id(outgoing_message_id.clone());
let (tx_approve, rx_approve) = oneshot::channel();
{
let mut request_id_to_callback = self.request_id_to_callback.lock().await;
@@ -169,36 +170,39 @@ impl OutgoingMessageSender {
PendingCallbackEntry {
callback: tx_approve,
thread_id,
request: request.clone(),
},
);
}
let outgoing_message =
OutgoingMessage::Request(request.request_with_id(outgoing_message_id.clone()));
let send_result = if connection_ids.is_empty() {
self.sender
.send(OutgoingEnvelope::Broadcast {
message: outgoing_message,
})
.await
} else {
let mut send_error = None;
for connection_id in connection_ids {
if let Err(err) = self
.sender
.send(OutgoingEnvelope::ToConnection {
connection_id: *connection_id,
message: outgoing_message.clone(),
let outgoing_message = OutgoingMessage::Request(request);
let send_result = match connection_ids {
None => {
self.sender
.send(OutgoingEnvelope::Broadcast {
message: outgoing_message,
})
.await
{
send_error = Some(err);
break;
}
}
match send_error {
Some(err) => Err(err),
None => Ok(()),
Some(connection_ids) => {
let mut send_error = None;
for connection_id in connection_ids {
if let Err(err) = self
.sender
.send(OutgoingEnvelope::ToConnection {
connection_id: *connection_id,
message: outgoing_message.clone(),
})
.await
{
send_error = Some(err);
break;
}
}
match send_error {
Some(err) => Err(err),
None => Ok(()),
}
}
};
@@ -210,11 +214,28 @@ impl OutgoingMessageSender {
(outgoing_message_id, rx_approve)
}
pub(crate) async fn replay_requests_to_connection_for_thread(
&self,
connection_id: ConnectionId,
thread_id: ThreadId,
) {
let requests = self.pending_requests_for_thread(thread_id).await;
for request in requests {
if let Err(err) = self
.sender
.send(OutgoingEnvelope::ToConnection {
connection_id,
message: OutgoingMessage::Request(request),
})
.await
{
warn!("failed to resend request to client: {err:?}");
}
}
}
pub(crate) async fn notify_client_response(&self, id: RequestId, result: Result) {
let entry = {
let mut request_id_to_callback = self.request_id_to_callback.lock().await;
request_id_to_callback.remove_entry(&id)
};
let entry = self.take_request_callback(&id).await;
match entry {
Some((id, entry)) => {
@@ -229,10 +250,7 @@ impl OutgoingMessageSender {
}
pub(crate) async fn notify_client_error(&self, id: RequestId, error: JSONRPCErrorError) {
let entry = {
let mut request_id_to_callback = self.request_id_to_callback.lock().await;
request_id_to_callback.remove_entry(&id)
};
let entry = self.take_request_callback(&id).await;
match entry {
Some((id, entry)) => {
@@ -248,23 +266,62 @@ impl OutgoingMessageSender {
}
pub(crate) async fn cancel_request(&self, id: &RequestId) -> bool {
let entry = {
let mut request_id_to_callback = self.request_id_to_callback.lock().await;
request_id_to_callback.remove_entry(id)
};
entry.is_some()
self.take_request_callback(id).await.is_some()
}
pub(crate) async fn cancel_requests_for_thread(&self, thread_id: ThreadId) {
async fn take_request_callback(
&self,
id: &RequestId,
) -> Option<(RequestId, PendingCallbackEntry)> {
let mut request_id_to_callback = self.request_id_to_callback.lock().await;
let request_ids = request_id_to_callback
request_id_to_callback.remove_entry(id)
}
pub(crate) async fn pending_requests_for_thread(
&self,
thread_id: ThreadId,
) -> Vec<ServerRequest> {
let request_id_to_callback = self.request_id_to_callback.lock().await;
let mut requests = request_id_to_callback
.iter()
.filter_map(|(request_id, entry)| {
(entry.thread_id == Some(thread_id)).then_some(request_id.clone())
.filter_map(|(_, entry)| {
(entry.thread_id == Some(thread_id)).then_some(entry.request.clone())
})
.collect::<Vec<_>>();
for request_id in request_ids {
request_id_to_callback.remove(&request_id);
requests.sort_by(|left, right| left.id().cmp(right.id()));
requests
}
pub(crate) async fn cancel_requests_for_thread(
&self,
thread_id: ThreadId,
error: Option<JSONRPCErrorError>,
) {
let entries = {
let mut request_id_to_callback = self.request_id_to_callback.lock().await;
let request_ids = request_id_to_callback
.iter()
.filter_map(|(request_id, entry)| {
(entry.thread_id == Some(thread_id)).then_some(request_id.clone())
})
.collect::<Vec<_>>();
let mut entries = Vec::with_capacity(request_ids.len());
for request_id in request_ids {
if let Some(entry) = request_id_to_callback.remove(&request_id) {
entries.push(entry);
}
}
entries
};
if let Some(error) = error {
for entry in entries {
if let Err(err) = entry.callback.send(Err(error.clone())) {
let request_id = entry.request.id();
warn!("could not notify callback for {request_id:?} due to: {err:?}");
}
}
}
}
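The cancellation flow above follows a common lock-hygiene pattern: remove every matching entry while the map lock is held, then notify callbacks only after the lock is released. A minimal std-only sketch (hypothetical names; the real code uses tokio locks and oneshot callbacks):

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::sync::mpsc;

// Simplified stand-in for PendingCallbackEntry.
struct PendingEntry {
    thread_id: u64,
    callback: mpsc::Sender<Result<String, String>>,
}

fn cancel_requests_for_thread(
    map: &Mutex<HashMap<u64, PendingEntry>>,
    thread_id: u64,
    error: Option<String>,
) {
    let entries: Vec<PendingEntry> = {
        let mut guard = map.lock().expect("lock poisoned");
        let ids: Vec<u64> = guard
            .iter()
            .filter_map(|(id, entry)| (entry.thread_id == thread_id).then_some(*id))
            .collect();
        ids.into_iter().filter_map(|id| guard.remove(&id)).collect()
    }; // lock dropped here, before any callback is notified
    if let Some(error) = error {
        for entry in entries {
            // A receiver may already be gone; ignore send failures,
            // mirroring the warn-and-continue path.
            let _ = entry.callback.send(Err(error.clone()));
        }
    }
}

fn main() {
    let (tx1, rx1) = mpsc::channel();
    let (tx2, rx2) = mpsc::channel();
    let map = Mutex::new(HashMap::from([
        (1_u64, PendingEntry { thread_id: 7, callback: tx1 }),
        (2_u64, PendingEntry { thread_id: 9, callback: tx2 }),
    ]));
    cancel_requests_for_thread(&map, 7, Some("cancelled".to_string()));
    assert_eq!(rx1.recv().unwrap(), Err("cancelled".to_string()));
    assert!(rx2.try_recv().is_err()); // thread 9 is untouched
    assert_eq!(map.lock().unwrap().len(), 1);
    println!("ok");
}
```

Taking the entries out of the map first also makes `cancel_request` reusable: removal and notification are decoupled, so a single-request cancel can share the same removal helper.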
@@ -441,14 +498,18 @@ mod tests {
use codex_app_server_protocol::ApplyPatchApprovalParams;
use codex_app_server_protocol::AuthMode;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::DynamicToolCallParams;
use codex_app_server_protocol::FileChangeRequestApprovalParams;
use codex_app_server_protocol::LoginChatGptCompleteNotification;
use codex_app_server_protocol::ModelRerouteReason;
use codex_app_server_protocol::ModelReroutedNotification;
use codex_app_server_protocol::RateLimitSnapshot;
use codex_app_server_protocol::RateLimitWindow;
use codex_app_server_protocol::ToolRequestUserInputParams;
use codex_protocol::ThreadId;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::sync::Arc;
use tokio::time::timeout;
use uuid::Uuid;
@@ -723,4 +784,121 @@ mod tests {
.expect("waiter should receive a callback");
assert_eq!(result, Err(error));
}
#[tokio::test]
async fn pending_requests_for_thread_returns_thread_requests_in_request_id_order() {
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(8);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let thread_id = ThreadId::new();
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing.clone(),
vec![ConnectionId(1)],
thread_id,
);
let (dynamic_tool_request_id, _dynamic_tool_waiter) = thread_outgoing
.send_request(ServerRequestPayload::DynamicToolCall(
DynamicToolCallParams {
thread_id: thread_id.to_string(),
turn_id: "turn-1".to_string(),
call_id: "call-0".to_string(),
tool: "tool".to_string(),
arguments: json!({}),
},
))
.await;
let (first_request_id, _first_waiter) = thread_outgoing
.send_request(ServerRequestPayload::ToolRequestUserInput(
ToolRequestUserInputParams {
thread_id: thread_id.to_string(),
turn_id: "turn-1".to_string(),
item_id: "call-1".to_string(),
questions: vec![],
},
))
.await;
let (second_request_id, _second_waiter) = thread_outgoing
.send_request(ServerRequestPayload::FileChangeRequestApproval(
FileChangeRequestApprovalParams {
thread_id: thread_id.to_string(),
turn_id: "turn-1".to_string(),
item_id: "call-2".to_string(),
reason: None,
grant_root: None,
},
))
.await;
let pending_requests = outgoing.pending_requests_for_thread(thread_id).await;
assert_eq!(
pending_requests
.iter()
.map(ServerRequest::id)
.collect::<Vec<_>>(),
vec![
&dynamic_tool_request_id,
&first_request_id,
&second_request_id
]
);
}
#[tokio::test]
async fn cancel_requests_for_thread_cancels_all_thread_requests() {
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(8);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let thread_id = ThreadId::new();
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing.clone(),
vec![ConnectionId(1)],
thread_id,
);
let (_dynamic_tool_request_id, dynamic_tool_waiter) = thread_outgoing
.send_request(ServerRequestPayload::DynamicToolCall(
DynamicToolCallParams {
thread_id: thread_id.to_string(),
turn_id: "turn-1".to_string(),
call_id: "call-0".to_string(),
tool: "tool".to_string(),
arguments: json!({}),
},
))
.await;
let (_request_id, user_input_waiter) = thread_outgoing
.send_request(ServerRequestPayload::ToolRequestUserInput(
ToolRequestUserInputParams {
thread_id: thread_id.to_string(),
turn_id: "turn-1".to_string(),
item_id: "call-1".to_string(),
questions: vec![],
},
))
.await;
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: "tracked request cancelled".to_string(),
data: None,
};
outgoing
.cancel_requests_for_thread(thread_id, Some(error.clone()))
.await;
let dynamic_tool_result = timeout(Duration::from_secs(1), dynamic_tool_waiter)
.await
.expect("dynamic tool waiter should resolve")
.expect("dynamic tool waiter should receive a callback");
let user_input_result = timeout(Duration::from_secs(1), user_input_waiter)
.await
.expect("user input waiter should resolve")
.expect("user input waiter should receive a callback");
assert_eq!(dynamic_tool_result, Err(error.clone()));
assert_eq!(user_input_result, Err(error));
assert!(
outgoing
.pending_requests_for_thread(thread_id)
.await
.is_empty()
);
}
}

View File

@@ -0,0 +1,42 @@
use codex_app_server_protocol::JSONRPCErrorError;
pub(crate) const TURN_TRANSITION_PENDING_REQUEST_ERROR_REASON: &str = "turnTransition";
pub(crate) fn is_turn_transition_server_request_error(error: &JSONRPCErrorError) -> bool {
error
.data
.as_ref()
.and_then(|data| data.get("reason"))
.and_then(serde_json::Value::as_str)
== Some(TURN_TRANSITION_PENDING_REQUEST_ERROR_REASON)
}
#[cfg(test)]
mod tests {
use super::is_turn_transition_server_request_error;
use codex_app_server_protocol::JSONRPCErrorError;
use pretty_assertions::assert_eq;
use serde_json::json;
#[test]
fn turn_transition_error_is_detected() {
let error = JSONRPCErrorError {
code: -1,
message: "client request resolved because the turn state was changed".to_string(),
data: Some(json!({ "reason": "turnTransition" })),
};
assert_eq!(is_turn_transition_server_request_error(&error), true);
}
#[test]
fn unrelated_error_is_not_detected() {
let error = JSONRPCErrorError {
code: -1,
message: "boom".to_string(),
data: Some(json!({ "reason": "other" })),
};
assert_eq!(is_turn_transition_server_request_error(&error), false);
}
}
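The helper above keys off a `"reason"` tag inside the error's JSON `data` payload. A std-only sketch of the same check, using a string map as a simplified stand-in for the `serde_json::Value` payload (names here are illustrative, not the real protocol types):

```rust
use std::collections::HashMap;

const TURN_TRANSITION_REASON: &str = "turnTransition";

// Simplified stand-in for JSONRPCErrorError's optional data payload.
struct RpcError {
    data: Option<HashMap<String, String>>,
}

// True only when data exists, has a "reason" key, and it matches the tag.
fn is_turn_transition(error: &RpcError) -> bool {
    error
        .data
        .as_ref()
        .and_then(|data| data.get("reason"))
        .map(String::as_str)
        == Some(TURN_TRANSITION_REASON)
}

fn main() {
    let tagged = RpcError {
        data: Some(HashMap::from([(
            "reason".to_string(),
            "turnTransition".to_string(),
        )])),
    };
    let untagged = RpcError { data: None };
    assert!(is_turn_transition(&tagged));
    assert!(!is_turn_transition(&untagged));
    println!("ok");
}
```

Comparing the whole `Option` chain against `Some(...)` means a missing payload, a missing key, or a non-matching value all fall through to `false` without separate branches.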

View File

@@ -1,5 +1,6 @@
use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::ConnectionRequestId;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadHistoryBuilder;
use codex_app_server_protocol::Turn;
use codex_app_server_protocol::TurnError;
@@ -28,8 +29,16 @@ pub(crate) struct PendingThreadResumeRequest {
pub(crate) config_snapshot: ThreadConfigSnapshot,
}
// ThreadListenerCommand is used to perform operations in the context of the thread listener so that they are serialized with its event stream.
pub(crate) enum ThreadListenerCommand {
-SendThreadResumeResponse(PendingThreadResumeRequest),
+// SendThreadResumeResponse is used to resume an already running thread by sending the thread's history to the client and atomically subscribing for new updates.
+SendThreadResumeResponse(Box<PendingThreadResumeRequest>),
// ResolveServerRequest is used to notify the client that the request has been resolved.
// It is executed in the thread listener's context to ensure that the resolved notification is ordered with regard to the request itself.
ResolveServerRequest {
request_id: RequestId,
completion_tx: oneshot::Sender<()>,
},
}
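The ordering guarantee noted for `ResolveServerRequest` comes from funneling work through one consumer loop: a "resolved" message can never overtake the request it refers to. A hypothetical std-only sketch of that pattern (the real code is async and uses oneshot channels):

```rust
use std::sync::mpsc;

// Illustrative command enum; only the shape matters here.
enum ListenerCommand {
    Event(String),
    ResolveServerRequest {
        request_id: u64,
        completion_tx: mpsc::Sender<()>,
    },
}

// One consumer processes commands strictly in arrival order.
fn run_listener(rx: mpsc::Receiver<ListenerCommand>, log: &mut Vec<String>) {
    for command in rx {
        match command {
            ListenerCommand::Event(event) => log.push(event),
            ListenerCommand::ResolveServerRequest { request_id, completion_tx } => {
                log.push(format!("resolved:{request_id}"));
                let _ = completion_tx.send(()); // caller can await completion
            }
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let (done_tx, done_rx) = mpsc::channel();
    tx.send(ListenerCommand::Event("request:7".to_string())).unwrap();
    tx.send(ListenerCommand::ResolveServerRequest { request_id: 7, completion_tx: done_tx }).unwrap();
    drop(tx); // close the channel so the loop terminates
    let mut log = Vec::new();
    run_listener(rx, &mut log);
    assert_eq!(log, vec!["request:7".to_string(), "resolved:7".to_string()]);
    assert!(done_rx.recv().is_ok());
    println!("ok");
}
```

The `completion_tx` mirrors the real enum's field: the sender of the command can block until the listener has actually emitted the resolution, without ever touching the listener's internal state directly.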
/// Per-conversation accumulation of the latest states e.g. error message while a turn runs.

View File

@@ -4,9 +4,11 @@ use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_sequence;
use app_test_support::create_request_user_input_sse_response;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ServerRequestResolvedNotification;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
@@ -86,6 +88,7 @@ async fn request_user_input_round_trip() -> Result<()> {
assert_eq!(params.turn_id, turn.id);
assert_eq!(params.item_id, "call1");
assert_eq!(params.questions.len(), 1);
let resolved_request_id = request_id.clone();
mcp.send_response(
request_id,
@@ -96,17 +99,31 @@ async fn request_user_input_round_trip() -> Result<()> {
}),
)
.await?;
-timeout(
-DEFAULT_READ_TIMEOUT,
-mcp.read_stream_until_notification_message("codex/event/task_complete"),
-)
-.await??;
-timeout(
-DEFAULT_READ_TIMEOUT,
-mcp.read_stream_until_notification_message("turn/completed"),
-)
-.await??;
let mut saw_resolved = false;
loop {
let message = timeout(DEFAULT_READ_TIMEOUT, mcp.read_next_message()).await??;
let JSONRPCMessage::Notification(notification) = message else {
continue;
};
match notification.method.as_str() {
"serverRequest/resolved" => {
let resolved: ServerRequestResolvedNotification = serde_json::from_value(
notification
.params
.clone()
.expect("serverRequest/resolved params"),
)?;
assert_eq!(resolved.thread_id, thread.id);
assert_eq!(resolved.request_id, resolved_request_id);
saw_resolved = true;
}
"turn/completed" => {
assert!(saw_resolved, "serverRequest/resolved should arrive first");
break;
}
_ => {}
}
}
Ok(())
}

View File

@@ -1,13 +1,28 @@
use anyhow::Context;
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_apply_patch_sse_response;
use app_test_support::create_fake_rollout_with_text_elements;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::create_mock_responses_server_sequence;
use app_test_support::create_shell_command_sse_response;
use app_test_support::rollout_path;
use app_test_support::to_response;
use chrono::Utc;
use codex_app_server_protocol::AskForApproval;
use codex_app_server_protocol::CommandExecutionApprovalDecision;
use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
use codex_app_server_protocol::FileChangeApprovalDecision;
use codex_app_server_protocol::FileChangeRequestApprovalResponse;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::PatchApplyStatus;
use codex_app_server_protocol::PatchChangeKind;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::SessionSource;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadResumeParams;
@@ -15,6 +30,7 @@ use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ThreadStatusChangedNotification;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStatus;
@@ -277,7 +293,7 @@ async fn thread_resume_keeps_in_flight_turn_streaming() -> Result<()> {
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
-primary.read_stream_until_notification_message("turn/started"),
+wait_for_thread_status_active(&mut primary, &thread.id),
)
.await??;
@@ -384,7 +400,7 @@ async fn thread_resume_rejects_history_when_thread_is_running() -> Result<()> {
to_response::<TurnStartResponse>(running_turn_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
-primary.read_stream_until_notification_message("turn/started"),
+wait_for_thread_status_active(&mut primary, &thread_id),
)
.await??;
@@ -500,7 +516,7 @@ async fn thread_resume_rejects_mismatched_path_when_thread_is_running() -> Resul
to_response::<TurnStartResponse>(running_turn_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
-primary.read_stream_until_notification_message("turn/started"),
+wait_for_thread_status_active(&mut primary, &thread_id),
)
.await??;
@@ -603,7 +619,7 @@ async fn thread_resume_rejoins_running_thread_even_with_override_mismatch() -> R
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
-primary.read_stream_until_notification_message("turn/started"),
+wait_for_thread_status_active(&mut primary, &thread.id),
)
.await??;
@@ -639,6 +655,306 @@ async fn thread_resume_rejoins_running_thread_even_with_override_mismatch() -> R
Ok(())
}
#[tokio::test]
async fn thread_resume_replays_pending_command_execution_request_approval() -> Result<()> {
let responses = vec![
create_final_assistant_message_sse_response("seeded")?,
create_shell_command_sse_response(
vec![
"python3".to_string(),
"-c".to_string(),
"print(42)".to_string(),
],
None,
Some(5000),
"call-1",
)?,
create_final_assistant_message_sse_response("done")?,
];
let server = create_mock_responses_server_sequence(responses).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut primary = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??;
let start_id = primary
.send_thread_start_request(ThreadStartParams {
model: Some("gpt-5.1-codex-max".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let seed_turn_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![UserInput::Text {
text: "seed history".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_response_message(RequestId::Integer(seed_turn_id)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_notification_message("turn/completed"),
)
.await??;
primary.clear_message_buffer();
let running_turn_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![UserInput::Text {
text: "run command".to_string(),
text_elements: Vec::new(),
}],
approval_policy: Some(AskForApproval::UnlessTrusted),
..Default::default()
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_response_message(RequestId::Integer(running_turn_id)),
)
.await??;
let original_request = timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_request_message(),
)
.await??;
let ServerRequest::CommandExecutionRequestApproval { .. } = &original_request else {
panic!("expected CommandExecutionRequestApproval request, got {original_request:?}");
};
let resume_id = primary
.send_thread_resume_request(ThreadResumeParams {
thread_id: thread.id.clone(),
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse {
thread: resumed_thread,
..
} = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(resumed_thread.id, thread.id);
assert!(
resumed_thread
.turns
.iter()
.any(|turn| matches!(turn.status, TurnStatus::InProgress))
);
let replayed_request = timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_request_message(),
)
.await??;
pretty_assertions::assert_eq!(replayed_request, original_request);
let ServerRequest::CommandExecutionRequestApproval { request_id, .. } = replayed_request else {
panic!("expected CommandExecutionRequestApproval request");
};
primary
.send_response(
request_id,
serde_json::to_value(CommandExecutionRequestApprovalResponse {
decision: CommandExecutionApprovalDecision::Accept,
})?,
)
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_notification_message("turn/completed"),
)
.await??;
Ok(())
}
#[tokio::test]
async fn thread_resume_replays_pending_file_change_request_approval() -> Result<()> {
let tmp = TempDir::new()?;
let codex_home = tmp.path().join("codex_home");
std::fs::create_dir(&codex_home)?;
let workspace = tmp.path().join("workspace");
std::fs::create_dir(&workspace)?;
let patch = r#"*** Begin Patch
*** Add File: README.md
+new line
*** End Patch
"#;
let responses = vec![
create_final_assistant_message_sse_response("seeded")?,
create_apply_patch_sse_response(patch, "patch-call")?,
create_final_assistant_message_sse_response("done")?,
];
let server = create_mock_responses_server_sequence(responses).await;
create_config_toml(&codex_home, &server.uri())?;
let mut primary = McpProcess::new(&codex_home).await?;
timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??;
let start_id = primary
.send_thread_start_request(ThreadStartParams {
model: Some("gpt-5.1-codex-max".to_string()),
cwd: Some(workspace.to_string_lossy().into_owned()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let seed_turn_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![UserInput::Text {
text: "seed history".to_string(),
text_elements: Vec::new(),
}],
cwd: Some(workspace.clone()),
..Default::default()
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_response_message(RequestId::Integer(seed_turn_id)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_notification_message("turn/completed"),
)
.await??;
primary.clear_message_buffer();
let running_turn_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![UserInput::Text {
text: "apply patch".to_string(),
text_elements: Vec::new(),
}],
cwd: Some(workspace.clone()),
approval_policy: Some(AskForApproval::UnlessTrusted),
..Default::default()
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_response_message(RequestId::Integer(running_turn_id)),
)
.await??;
let original_started = timeout(DEFAULT_READ_TIMEOUT, async {
loop {
let notification = primary
.read_stream_until_notification_message("item/started")
.await?;
let started: ItemStartedNotification =
serde_json::from_value(notification.params.clone().expect("item/started params"))?;
if let ThreadItem::FileChange { .. } = started.item {
return Ok::<ThreadItem, anyhow::Error>(started.item);
}
}
})
.await??;
let expected_readme_path = workspace.join("README.md");
let expected_file_change = ThreadItem::FileChange {
id: "patch-call".to_string(),
changes: vec![codex_app_server_protocol::FileUpdateChange {
path: expected_readme_path.to_string_lossy().into_owned(),
kind: PatchChangeKind::Add,
diff: "new line\n".to_string(),
}],
status: PatchApplyStatus::InProgress,
};
assert_eq!(original_started, expected_file_change);
let original_request = timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_request_message(),
)
.await??;
let ServerRequest::FileChangeRequestApproval { .. } = &original_request else {
panic!("expected FileChangeRequestApproval request, got {original_request:?}");
};
primary.clear_message_buffer();
let resume_id = primary
.send_thread_resume_request(ThreadResumeParams {
thread_id: thread.id.clone(),
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse {
thread: resumed_thread,
..
} = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(resumed_thread.id, thread.id);
assert!(
resumed_thread
.turns
.iter()
.any(|turn| matches!(turn.status, TurnStatus::InProgress))
);
let replayed_request = timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_request_message(),
)
.await??;
assert_eq!(replayed_request, original_request);
let ServerRequest::FileChangeRequestApproval { request_id, .. } = replayed_request else {
panic!("expected FileChangeRequestApproval request");
};
primary
.send_response(
request_id,
serde_json::to_value(FileChangeRequestApprovalResponse {
decision: FileChangeApprovalDecision::Accept,
})?,
)
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_notification_message("turn/completed"),
)
.await??;
Ok(())
}
#[tokio::test]
async fn thread_resume_with_overrides_defers_updated_at_until_turn_start() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
@@ -1103,6 +1419,30 @@ required = true
)
}
async fn wait_for_thread_status_active(
mcp: &mut McpProcess,
thread_id: &str,
) -> Result<ThreadStatusChangedNotification> {
loop {
let status_changed_notif: JSONRPCNotification = mcp
.read_stream_until_notification_message("thread/status/changed")
.await?;
let status_changed_params = status_changed_notif
.params
.context("thread/status/changed params must be present")?;
let status_changed: ThreadStatusChangedNotification =
serde_json::from_value(status_changed_params)?;
if status_changed.thread_id == thread_id
&& status_changed.status
== (ThreadStatus::Active {
active_flags: Vec::new(),
})
{
return Ok(status_changed);
}
}
}
#[allow(dead_code)]
fn set_rollout_mtime(path: &Path, updated_at_rfc3339: &str) -> Result<()> {
let parsed = chrono::DateTime::parse_from_rfc3339(updated_at_rfc3339)?.with_timezone(&Utc);

View File

@@ -8,6 +8,8 @@ use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ServerRequestResolvedNotification;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnCompletedNotification;
@@ -48,7 +50,7 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> {
"call_sleep",
)?])
.await;
-create_config_toml(&codex_home, &server.uri())?;
+create_config_toml(&codex_home, &server.uri(), "never")?;
let mut mcp = McpProcess::new(&codex_home).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
@@ -120,15 +122,134 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn turn_interrupt_resolves_pending_command_approval_request() -> Result<()> {
#[cfg(target_os = "windows")]
let shell_command = vec![
"powershell".to_string(),
"-Command".to_string(),
"Start-Sleep -Seconds 10".to_string(),
];
#[cfg(not(target_os = "windows"))]
let shell_command = vec!["sleep".to_string(), "10".to_string()];
let tmp = TempDir::new()?;
let codex_home = tmp.path().join("codex_home");
std::fs::create_dir(&codex_home)?;
let working_directory = tmp.path().join("workdir");
std::fs::create_dir(&working_directory)?;
let server = create_mock_responses_server_sequence(vec![create_shell_command_sse_response(
shell_command.clone(),
Some(&working_directory),
Some(10_000),
"call_sleep_approval",
)?])
.await;
create_config_toml(&codex_home, &server.uri(), "untrusted")?;
let mut mcp = McpProcess::new(&codex_home).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: "run sleep".to_string(),
text_elements: Vec::new(),
}],
cwd: Some(working_directory),
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
let request = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_request_message(),
)
.await??;
let ServerRequest::CommandExecutionRequestApproval { request_id, params } = request else {
panic!("expected CommandExecutionRequestApproval request");
};
assert_eq!(params.item_id, "call_sleep_approval");
assert_eq!(params.thread_id, thread.id);
assert_eq!(params.turn_id, turn.id);
let interrupt_id = mcp
.send_turn_interrupt_request(TurnInterruptParams {
thread_id: thread.id.clone(),
turn_id: turn.id.clone(),
})
.await?;
let interrupt_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(interrupt_id)),
)
.await??;
let _resp: TurnInterruptResponse = to_response::<TurnInterruptResponse>(interrupt_resp)?;
let resolved_notification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("serverRequest/resolved"),
)
.await??;
let resolved: ServerRequestResolvedNotification = serde_json::from_value(
resolved_notification
.params
.clone()
.expect("serverRequest/resolved params must be present"),
)?;
assert_eq!(resolved.thread_id, thread.id);
assert_eq!(resolved.request_id, request_id);
let completed_notif: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let completed: TurnCompletedNotification = serde_json::from_value(
completed_notif
.params
.expect("turn/completed params must be present"),
)?;
assert_eq!(completed.thread_id, thread.id);
assert_eq!(completed.turn.status, TurnStatus::Interrupted);
Ok(())
}
// Helper to create a config.toml pointing at the mock model server.
-fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> {
+fn create_config_toml(
+codex_home: &std::path::Path,
+server_uri: &str,
+approval_policy: &str,
+) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
format!(
r#"
model = "mock-model"
-approval_policy = "never"
+approval_policy = "{approval_policy}"
sandbox_mode = "danger-full-access"
model_provider = "mock_provider"

View File

@@ -22,12 +22,14 @@ use codex_app_server_protocol::FileChangeRequestApprovalResponse;
use codex_app_server_protocol::ItemCompletedNotification;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::PatchApplyStatus;
use codex_app_server_protocol::PatchChangeKind;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ServerRequestResolvedNotification;
use codex_app_server_protocol::TextElement;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadStartParams;
@@ -1071,6 +1073,7 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> {
panic!("expected CommandExecutionRequestApproval request");
};
assert_eq!(params.item_id, "call1");
let resolved_request_id = request_id.clone();
// Approve and wait for task completion
mcp.send_response(
@@ -1080,16 +1083,31 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> {
})?,
)
.await?;
-timeout(
-DEFAULT_READ_TIMEOUT,
-mcp.read_stream_until_notification_message("codex/event/task_complete"),
-)
-.await??;
-timeout(
-DEFAULT_READ_TIMEOUT,
-mcp.read_stream_until_notification_message("turn/completed"),
-)
-.await??;
let mut saw_resolved = false;
loop {
let message = timeout(DEFAULT_READ_TIMEOUT, mcp.read_next_message()).await??;
let JSONRPCMessage::Notification(notification) = message else {
continue;
};
match notification.method.as_str() {
"serverRequest/resolved" => {
let resolved: ServerRequestResolvedNotification = serde_json::from_value(
notification
.params
.clone()
.expect("serverRequest/resolved params"),
)?;
assert_eq!(resolved.thread_id, thread.id);
assert_eq!(resolved.request_id, resolved_request_id);
saw_resolved = true;
}
"turn/completed" => {
assert!(saw_resolved, "serverRequest/resolved should arrive first");
break;
}
_ => {}
}
}
// Second turn with approval_policy=never should not elicit approval
let second_turn_id = mcp
@@ -1527,6 +1545,7 @@ async fn turn_start_file_change_approval_v2() -> Result<()> {
assert_eq!(params.item_id, "patch-call");
assert_eq!(params.thread_id, thread.id);
assert_eq!(params.turn_id, turn.id);
let resolved_request_id = request_id.clone();
let expected_readme_path = workspace.join("README.md");
let expected_readme_path = expected_readme_path.to_string_lossy().into_owned();
pretty_assertions::assert_eq!(
@@ -1545,18 +1564,49 @@ async fn turn_start_file_change_approval_v2() -> Result<()> {
})?,
)
.await?;
-let output_delta_notif = timeout(
-DEFAULT_READ_TIMEOUT,
-mcp.read_stream_until_notification_message("item/fileChange/outputDelta"),
-)
-.await??;
-let output_delta: FileChangeOutputDeltaNotification = serde_json::from_value(
-output_delta_notif
-.params
-.clone()
-.expect("item/fileChange/outputDelta params"),
-)?;
let mut saw_resolved = false;
let mut output_delta: Option<FileChangeOutputDeltaNotification> = None;
let mut completed_file_change: Option<ThreadItem> = None;
while !(output_delta.is_some() && completed_file_change.is_some()) {
let message = timeout(DEFAULT_READ_TIMEOUT, mcp.read_next_message()).await??;
let JSONRPCMessage::Notification(notification) = message else {
continue;
};
match notification.method.as_str() {
"serverRequest/resolved" => {
let resolved: ServerRequestResolvedNotification = serde_json::from_value(
notification
.params
.clone()
.expect("serverRequest/resolved params"),
)?;
assert_eq!(resolved.thread_id, thread.id);
assert_eq!(resolved.request_id, resolved_request_id);
saw_resolved = true;
}
"item/fileChange/outputDelta" => {
assert!(saw_resolved, "serverRequest/resolved should arrive first");
let notification: FileChangeOutputDeltaNotification = serde_json::from_value(
notification
.params
.clone()
.expect("item/fileChange/outputDelta params"),
)?;
output_delta = Some(notification);
}
"item/completed" => {
let completed: ItemCompletedNotification = serde_json::from_value(
notification.params.clone().expect("item/completed params"),
)?;
if let ThreadItem::FileChange { .. } = completed.item {
assert!(saw_resolved, "serverRequest/resolved should arrive first");
completed_file_change = Some(completed.item);
}
}
_ => {}
}
}
let output_delta = output_delta.expect("file change output delta should be observed");
assert_eq!(output_delta.thread_id, thread.id);
assert_eq!(output_delta.turn_id, turn.id);
assert_eq!(output_delta.item_id, "patch-call");
@@ -1566,38 +1616,23 @@ async fn turn_start_file_change_approval_v2() -> Result<()> {
output_delta.delta
);
-let completed_file_change = timeout(DEFAULT_READ_TIMEOUT, async {
-loop {
-let completed_notif = mcp
-.read_stream_until_notification_message("item/completed")
-.await?;
-let completed: ItemCompletedNotification = serde_json::from_value(
-completed_notif
-.params
-.clone()
-.expect("item/completed params"),
-)?;
-if let ThreadItem::FileChange { .. } = completed.item {
-return Ok::<ThreadItem, anyhow::Error>(completed.item);
-}
-}
-})
-.await??;
+let completed_file_change =
+completed_file_change.expect("file change completion should be observed");
let ThreadItem::FileChange { ref id, status, .. } = completed_file_change else {
unreachable!("loop ensures we break on file change items");
};
assert_eq!(id, "patch-call");
assert_eq!(status, PatchApplyStatus::Completed);
+let readme_contents = std::fs::read_to_string(expected_readme_path)?;
+assert_eq!(readme_contents, "new line\n");
-timeout(
-DEFAULT_READ_TIMEOUT,
-mcp.read_stream_until_notification_message("codex/event/task_complete"),
-)
-.await??;
-let readme_contents = std::fs::read_to_string(expected_readme_path)?;
-assert_eq!(readme_contents, "new line\n");
Ok(())
}

View File

@@ -1147,6 +1147,7 @@ mod tests {
matched_rules: vec![RuleMatch::PrefixRuleMatch {
matched_prefix: tokens(&["rm"]),
decision: Decision::Forbidden,
resolved_program: None,
justification: None,
}],
}

View File

@@ -1390,6 +1390,7 @@ prefix_rules = [
matched_rules: vec![RuleMatch::PrefixRuleMatch {
matched_prefix: tokens(&["rm"]),
decision: Decision::Forbidden,
resolved_program: None,
justification: None,
}],
}
@@ -1415,6 +1416,7 @@ prefix_rules = [
matched_rules: vec![RuleMatch::PrefixRuleMatch {
matched_prefix: tokens(&["git", "status"]),
decision: Decision::Prompt,
resolved_program: None,
justification: None,
}],
}
@@ -1426,6 +1428,7 @@ prefix_rules = [
matched_rules: vec![RuleMatch::PrefixRuleMatch {
matched_prefix: tokens(&["hg", "status"]),
decision: Decision::Prompt,
resolved_program: None,
justification: None,
}],
}
@@ -1509,6 +1512,7 @@ prefix_rules = []
matched_rules: vec![RuleMatch::PrefixRuleMatch {
matched_prefix: vec!["rm".to_string()],
decision: Decision::Forbidden,
resolved_program: None,
justification: None,
}],
}
@@ -1547,6 +1551,7 @@ prefix_rules = []
matched_rules: vec![RuleMatch::PrefixRuleMatch {
matched_prefix: vec!["rm".to_string()],
decision: Decision::Forbidden,
resolved_program: None,
justification: None,
}],
}
@@ -1561,6 +1566,7 @@ prefix_rules = []
matched_rules: vec![RuleMatch::PrefixRuleMatch {
matched_prefix: vec!["git".to_string(), "push".to_string()],
decision: Decision::Prompt,
resolved_program: None,
justification: None,
}],
}

View File

@@ -13,6 +13,7 @@ use codex_execpolicy::AmendError;
use codex_execpolicy::Decision;
use codex_execpolicy::Error as ExecPolicyRuleError;
use codex_execpolicy::Evaluation;
use codex_execpolicy::MatchOptions;
use codex_execpolicy::NetworkRuleProtocol;
use codex_execpolicy::Policy;
use codex_execpolicy::PolicyParser;
@@ -221,7 +222,14 @@ impl ExecPolicyManager {
used_complex_parsing,
)
};
let evaluation = exec_policy.check_multiple(commands.iter(), &exec_policy_fallback);
let match_options = MatchOptions {
resolve_host_executables: true,
};
let evaluation = exec_policy.check_multiple_with_options(
commands.iter(),
&exec_policy_fallback,
&match_options,
);
let requested_amendment = derive_requested_execpolicy_amendment_from_prefix_rule(
prefix_rule.as_ref(),
@@ -229,6 +237,7 @@ impl ExecPolicyManager {
exec_policy.as_ref(),
&commands,
&exec_policy_fallback,
&match_options,
);
match evaluation.decision {
@@ -472,17 +481,7 @@ pub async fn load_exec_policy(config_stack: &ConfigLayerStack) -> Result<Policy,
return Ok(policy);
};
let mut combined_rules = policy.rules().clone();
for (program, rules) in requirements_policy.as_ref().rules().iter_all() {
for rule in rules {
combined_rules.insert(program.clone(), rule.clone());
}
}
let mut combined_network_rules = policy.network_rules().to_vec();
combined_network_rules.extend(requirements_policy.as_ref().network_rules().iter().cloned());
Ok(Policy::from_parts(combined_rules, combined_network_rules))
Ok(policy.merge_overlay(requirements_policy.as_ref()))
}
/// If a command is not matched by any execpolicy rule, derive a [`Decision`].
@@ -640,6 +639,7 @@ fn derive_requested_execpolicy_amendment_from_prefix_rule(
exec_policy: &Policy,
commands: &[Vec<String>],
exec_policy_fallback: &impl Fn(&[String]) -> Decision,
match_options: &MatchOptions,
) -> Option<ExecPolicyAmendment> {
let prefix_rule = prefix_rule?;
if prefix_rule.is_empty() {
@@ -666,6 +666,7 @@ fn derive_requested_execpolicy_amendment_from_prefix_rule(
&amendment.command,
commands,
exec_policy_fallback,
match_options,
) {
Some(amendment)
} else {
@@ -678,6 +679,7 @@ fn prefix_rule_would_approve_all_commands(
prefix_rule: &[String],
commands: &[Vec<String>],
exec_policy_fallback: &impl Fn(&[String]) -> Decision,
match_options: &MatchOptions,
) -> bool {
let mut policy_with_prefix_rule = exec_policy.clone();
if policy_with_prefix_rule
@@ -689,7 +691,7 @@ fn prefix_rule_would_approve_all_commands(
commands.iter().all(|command| {
policy_with_prefix_rule
.check(command, exec_policy_fallback)
.check_with_options(command, exec_policy_fallback, match_options)
.decision
== Decision::Allow
})
@@ -827,6 +829,7 @@ mod tests {
use pretty_assertions::assert_eq;
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use tempfile::tempdir;
use toml::Value as TomlValue;
@@ -846,6 +849,31 @@ mod tests {
.expect("ConfigLayerStack")
}
fn host_absolute_path(segments: &[&str]) -> String {
let mut path = if cfg!(windows) {
PathBuf::from(r"C:\")
} else {
PathBuf::from("/")
};
for segment in segments {
path.push(segment);
}
path.to_string_lossy().into_owned()
}
fn host_program_path(name: &str) -> String {
let executable_name = if cfg!(windows) {
format!("{name}.exe")
} else {
name.to_string()
};
host_absolute_path(&["usr", "bin", &executable_name])
}
fn starlark_string(value: &str) -> String {
value.replace('\\', "\\\\").replace('"', "\\\"")
}
#[tokio::test]
async fn returns_empty_policy_when_no_policy_files_exist() {
let temp_dir = tempdir().expect("create temp dir");
@@ -949,6 +977,7 @@ mod tests {
matched_rules: vec![RuleMatch::PrefixRuleMatch {
matched_prefix: vec!["rm".to_string()],
decision: Decision::Forbidden,
resolved_program: None,
justification: None,
}],
},
@@ -991,6 +1020,59 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn preserves_host_executables_when_requirements_overlay_is_present() -> anyhow::Result<()>
{
let temp_dir = tempdir()?;
let policy_dir = temp_dir.path().join(RULES_DIR_NAME);
fs::create_dir_all(&policy_dir)?;
let git_path = host_absolute_path(&["usr", "bin", "git"]);
let git_path_literal = starlark_string(&git_path);
fs::write(
policy_dir.join("host.rules"),
format!(
r#"
host_executable(name = "git", paths = ["{git_path_literal}"])
"#
),
)?;
let mut requirements_exec_policy = Policy::empty();
requirements_exec_policy.add_network_rule(
"blocked.example.com",
codex_execpolicy::NetworkRuleProtocol::Https,
Decision::Forbidden,
None,
)?;
let requirements = ConfigRequirements {
exec_policy: Some(codex_config::Sourced::new(
codex_config::RequirementsExecPolicy::new(requirements_exec_policy),
codex_config::RequirementSource::Unknown,
)),
..ConfigRequirements::default()
};
let dot_codex_folder = AbsolutePathBuf::from_absolute_path(temp_dir.path())?;
let layer = ConfigLayerEntry::new(
ConfigLayerSource::Project { dot_codex_folder },
TomlValue::Table(Default::default()),
);
let config_stack =
ConfigLayerStack::new(vec![layer], requirements, ConfigRequirementsToml::default())?;
let policy = load_exec_policy(&config_stack).await?;
assert_eq!(
policy
.host_executables()
.get("git")
.expect("missing git host executable")
.as_ref(),
[AbsolutePathBuf::try_from(git_path)?]
);
Ok(())
}
#[tokio::test]
async fn ignores_policies_outside_policy_dir() {
let temp_dir = tempdir().expect("create temp dir");
@@ -1106,6 +1188,7 @@ mod tests {
matched_rules: vec![RuleMatch::PrefixRuleMatch {
matched_prefix: vec!["rm".to_string()],
decision: Decision::Forbidden,
resolved_program: None,
justification: None,
}],
},
@@ -1117,6 +1200,7 @@ mod tests {
matched_rules: vec![RuleMatch::PrefixRuleMatch {
matched_prefix: vec!["ls".to_string()],
decision: Decision::Prompt,
resolved_program: None,
justification: None,
}],
},
@@ -1335,6 +1419,115 @@ prefix_rule(
);
}
#[tokio::test]
async fn absolute_path_exec_approval_requirement_matches_host_executable_rules() {
let git_path = host_program_path("git");
let git_path_literal = starlark_string(&git_path);
let policy_src = format!(
r#"
host_executable(name = "git", paths = ["{git_path_literal}"])
prefix_rule(pattern=["git"], decision="allow")
"#
);
let mut parser = PolicyParser::new();
parser
.parse("test.rules", &policy_src)
.expect("parse policy");
let manager = ExecPolicyManager::new(Arc::new(parser.build()));
let command = vec![git_path, "status".to_string()];
let requirement = manager
.create_exec_approval_requirement_for_command(ExecApprovalRequest {
command: &command,
approval_policy: AskForApproval::UnlessTrusted,
sandbox_policy: &SandboxPolicy::new_read_only_policy(),
sandbox_permissions: SandboxPermissions::UseDefault,
prefix_rule: None,
})
.await;
assert_eq!(
requirement,
ExecApprovalRequirement::Skip {
bypass_sandbox: true,
proposed_execpolicy_amendment: None,
}
);
}
#[tokio::test]
async fn absolute_path_exec_approval_requirement_ignores_disallowed_host_executable_paths() {
let allowed_git_path = host_program_path("git");
let disallowed_git_path = host_absolute_path(&[
"opt",
"homebrew",
"bin",
if cfg!(windows) { "git.exe" } else { "git" },
]);
let allowed_git_path_literal = starlark_string(&allowed_git_path);
let policy_src = format!(
r#"
host_executable(name = "git", paths = ["{allowed_git_path_literal}"])
prefix_rule(pattern=["git"], decision="prompt")
"#
);
let mut parser = PolicyParser::new();
parser
.parse("test.rules", &policy_src)
.expect("parse policy");
let manager = ExecPolicyManager::new(Arc::new(parser.build()));
let command = vec![disallowed_git_path, "status".to_string()];
let requirement = manager
.create_exec_approval_requirement_for_command(ExecApprovalRequest {
command: &command,
approval_policy: AskForApproval::UnlessTrusted,
sandbox_policy: &SandboxPolicy::new_read_only_policy(),
sandbox_permissions: SandboxPermissions::UseDefault,
prefix_rule: None,
})
.await;
assert_eq!(
requirement,
ExecApprovalRequirement::Skip {
bypass_sandbox: false,
proposed_execpolicy_amendment: Some(ExecPolicyAmendment::new(command)),
}
);
}
#[tokio::test]
async fn requested_prefix_rule_can_approve_absolute_path_commands() {
let command = vec![
host_program_path("cargo"),
"install".to_string(),
"cargo-insta".to_string(),
];
let manager = ExecPolicyManager::default();
let requirement = manager
.create_exec_approval_requirement_for_command(ExecApprovalRequest {
command: &command,
approval_policy: AskForApproval::UnlessTrusted,
sandbox_policy: &SandboxPolicy::new_read_only_policy(),
sandbox_permissions: SandboxPermissions::UseDefault,
prefix_rule: Some(vec!["cargo".to_string(), "install".to_string()]),
})
.await;
assert_eq!(
requirement,
ExecApprovalRequirement::NeedsApproval {
reason: None,
proposed_execpolicy_amendment: Some(ExecPolicyAmendment::new(vec![
"cargo".to_string(),
"install".to_string(),
])),
}
);
}
#[tokio::test]
async fn exec_approval_requirement_respects_approval_policy() {
let policy_src = r#"prefix_rule(pattern=["rm"], decision="prompt")"#;
@@ -1889,6 +2082,7 @@ prefix_rule(
&Policy::empty(),
&commands,
&|_: &[String]| Decision::Allow,
&MatchOptions::default(),
)
}
@@ -1983,6 +2177,7 @@ prefix_rule(
let matched_rules_prompt = vec![RuleMatch::PrefixRuleMatch {
matched_prefix: vec!["cargo".to_string()],
decision: Decision::Prompt,
resolved_program: None,
justification: None,
}];
assert_eq!(
@@ -1996,6 +2191,7 @@ prefix_rule(
let matched_rules_allow = vec![RuleMatch::PrefixRuleMatch {
matched_prefix: vec!["cargo".to_string()],
decision: Decision::Allow,
resolved_program: None,
justification: None,
}];
assert_eq!(
@@ -2009,6 +2205,7 @@ prefix_rule(
let matched_rules_forbidden = vec![RuleMatch::PrefixRuleMatch {
matched_prefix: vec!["cargo".to_string()],
decision: Decision::Forbidden,
resolved_program: None,
justification: None,
}];
assert_eq!(

View File

@@ -16,6 +16,8 @@ use crate::tools::sandboxing::SandboxablePreference;
use crate::tools::sandboxing::ToolCtx;
use crate::tools::sandboxing::ToolError;
use codex_execpolicy::Decision;
use codex_execpolicy::Evaluation;
use codex_execpolicy::MatchOptions;
use codex_execpolicy::Policy;
use codex_execpolicy::RuleMatch;
use codex_protocol::config_types::WindowsSandboxLevel;
@@ -493,28 +495,16 @@ impl EscalationPolicy for CoreShellActionProvider {
.await;
}
let command = join_program_and_argv(program, argv);
let (commands, used_complex_parsing) =
if let Some(commands) = parse_shell_lc_plain_commands(&command) {
(commands, false)
} else if let Some(single_command) = parse_shell_lc_single_command_prefix(&command) {
(vec![single_command], true)
} else {
(vec![command.clone()], false)
};
let fallback = |cmd: &[String]| {
crate::exec_policy::render_decision_for_unmatched_command(
self.approval_policy,
&self.sandbox_policy,
cmd,
self.sandbox_permissions,
used_complex_parsing,
)
};
let evaluation = {
let policy = self.policy.read().await;
policy.check_multiple(commands.iter(), &fallback)
evaluate_intercepted_exec_policy(
&policy,
program,
argv,
self.approval_policy,
&self.sandbox_policy,
self.sandbox_permissions,
)
};
// When true, the Evaluation came from *.rules rather than the
// fallback function.
@@ -552,6 +542,56 @@ impl EscalationPolicy for CoreShellActionProvider {
}
}
fn evaluate_intercepted_exec_policy(
policy: &Policy,
program: &AbsolutePathBuf,
argv: &[String],
approval_policy: AskForApproval,
sandbox_policy: &SandboxPolicy,
sandbox_permissions: SandboxPermissions,
) -> Evaluation {
let (commands, used_complex_parsing) = commands_for_intercepted_exec_policy(program, argv);
let fallback = |cmd: &[String]| {
crate::exec_policy::render_decision_for_unmatched_command(
approval_policy,
sandbox_policy,
cmd,
sandbox_permissions,
used_complex_parsing,
)
};
policy.check_multiple_with_options(
commands.iter(),
&fallback,
&MatchOptions {
resolve_host_executables: true,
},
)
}
fn commands_for_intercepted_exec_policy(
program: &AbsolutePathBuf,
argv: &[String],
) -> (Vec<Vec<String>>, bool) {
if let [_, flag, script] = argv {
let shell_command = [
program.to_string_lossy().to_string(),
flag.clone(),
script.clone(),
];
if let Some(commands) = parse_shell_lc_plain_commands(&shell_command) {
return (commands, false);
}
if let Some(single_command) = parse_shell_lc_single_command_prefix(&shell_command) {
return (vec![single_command], true);
}
}
(vec![join_program_and_argv(program, argv)], false)
}
struct CoreShellCommandExecutor {
command: Vec<String>,
cwd: PathBuf,

View File

@@ -2,6 +2,8 @@ use super::CoreShellActionProvider;
#[cfg(target_os = "macos")]
use super::CoreShellCommandExecutor;
use super::ParsedShellCommand;
use super::commands_for_intercepted_exec_policy;
use super::evaluate_intercepted_exec_policy;
use super::extract_shell_script;
use super::join_program_and_argv;
use super::map_exec_result;
@@ -12,14 +14,16 @@ use crate::config::Permissions;
#[cfg(target_os = "macos")]
use crate::config::types::ShellEnvironmentPolicy;
use crate::exec::SandboxType;
#[cfg(target_os = "macos")]
use crate::protocol::AskForApproval;
use crate::protocol::ReadOnlyAccess;
use crate::protocol::SandboxPolicy;
#[cfg(target_os = "macos")]
use crate::sandboxing::SandboxPermissions;
#[cfg(target_os = "macos")]
use crate::seatbelt::MACOS_PATH_TO_SEATBELT_EXECUTABLE;
use codex_execpolicy::Decision;
use codex_execpolicy::Evaluation;
use codex_execpolicy::PolicyParser;
use codex_execpolicy::RuleMatch;
#[cfg(target_os = "macos")]
use codex_protocol::config_types::WindowsSandboxLevel;
use codex_protocol::models::FileSystemPermissions;
@@ -36,8 +40,25 @@ use codex_utils_absolute_path::AbsolutePathBuf;
use pretty_assertions::assert_eq;
#[cfg(target_os = "macos")]
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::Duration;
fn host_absolute_path(segments: &[&str]) -> String {
let mut path = if cfg!(windows) {
PathBuf::from(r"C:\")
} else {
PathBuf::from("/")
};
for segment in segments {
path.push(segment);
}
path.to_string_lossy().into_owned()
}
fn starlark_string(value: &str) -> String {
value.replace('\\', "\\\\").replace('"', "\\\"")
}
#[test]
fn extract_shell_script_preserves_login_flag() {
assert_eq!(
@@ -126,6 +147,24 @@ fn join_program_and_argv_replaces_original_argv_zero() {
);
}
#[test]
fn commands_for_intercepted_exec_policy_uses_program_path_for_shell_wrapper_parsing() {
let program = AbsolutePathBuf::try_from(host_absolute_path(&["bin", "bash"])).unwrap();
assert_eq!(
commands_for_intercepted_exec_policy(
&program,
&["not-bash".into(), "-lc".into(), "git status && pwd".into()],
),
(
vec![
vec!["git".to_string(), "status".to_string()],
vec!["pwd".to_string()],
],
false,
)
);
}
#[test]
fn map_exec_result_preserves_stdout_and_stderr() {
let out = map_exec_result(
@@ -203,6 +242,84 @@ fn shell_request_escalation_execution_is_explicit() {
);
}
#[test]
fn intercepted_exec_policy_uses_host_executable_mappings() {
let git_path = host_absolute_path(&["usr", "bin", "git"]);
let git_path_literal = starlark_string(&git_path);
let policy_src = format!(
r#"
prefix_rule(pattern = ["git", "status"], decision = "prompt")
host_executable(name = "git", paths = ["{git_path_literal}"])
"#
);
let mut parser = PolicyParser::new();
parser.parse("test.rules", &policy_src).unwrap();
let policy = parser.build();
let program = AbsolutePathBuf::try_from(git_path).unwrap();
let evaluation = evaluate_intercepted_exec_policy(
&policy,
&program,
&["git".to_string(), "status".to_string()],
AskForApproval::OnRequest,
&SandboxPolicy::new_read_only_policy(),
SandboxPermissions::UseDefault,
);
assert_eq!(
evaluation,
Evaluation {
decision: Decision::Prompt,
matched_rules: vec![RuleMatch::PrefixRuleMatch {
matched_prefix: vec!["git".to_string(), "status".to_string()],
decision: Decision::Prompt,
resolved_program: Some(program),
justification: None,
}],
}
);
assert!(CoreShellActionProvider::decision_driven_by_policy(
&evaluation.matched_rules,
evaluation.decision
));
}
#[test]
fn intercepted_exec_policy_rejects_disallowed_host_executable_mapping() {
let allowed_git = host_absolute_path(&["usr", "bin", "git"]);
let other_git = host_absolute_path(&["opt", "homebrew", "bin", "git"]);
let allowed_git_literal = starlark_string(&allowed_git);
let policy_src = format!(
r#"
prefix_rule(pattern = ["git", "status"], decision = "prompt")
host_executable(name = "git", paths = ["{allowed_git_literal}"])
"#
);
let mut parser = PolicyParser::new();
parser.parse("test.rules", &policy_src).unwrap();
let policy = parser.build();
let program = AbsolutePathBuf::try_from(other_git.clone()).unwrap();
let evaluation = evaluate_intercepted_exec_policy(
&policy,
&program,
&["git".to_string(), "status".to_string()],
AskForApproval::OnRequest,
&SandboxPolicy::new_read_only_policy(),
SandboxPermissions::UseDefault,
);
assert!(matches!(
evaluation.matched_rules.as_slice(),
[RuleMatch::HeuristicsRuleMatch { command, .. }]
if command == &vec![other_git, "status".to_string()]
));
assert!(!CoreShellActionProvider::decision_driven_by_policy(
&evaluation.matched_rules,
evaluation.decision
));
}
#[cfg(target_os = "macos")]
#[tokio::test]
async fn prepare_escalated_exec_turn_default_preserves_macos_seatbelt_extensions() {

View File

@@ -19,6 +19,7 @@ workspace = true
[dependencies]
anyhow = { workspace = true }
clap = { workspace = true, features = ["derive"] }
codex-utils-absolute-path = { workspace = true }
multimap = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }

View File

@@ -2,8 +2,8 @@
## Overview
- Policy engine and CLI built around `prefix_rule(pattern=[...], decision?, justification?, match?, not_match?)`.
- This release covers the prefix-rule subset of the execpolicy language; a richer language will follow.
- Policy engine and CLI built around `prefix_rule(pattern=[...], decision?, justification?, match?, not_match?)` plus `host_executable(name=..., paths=[...])`.
- This release covers the prefix-rule subset of the execpolicy language plus host executable metadata; a richer language will follow.
- Tokens are matched in order; any `pattern` element may be a list to denote alternatives. `decision` defaults to `allow`; valid values: `allow`, `prompt`, `forbidden`.
- `justification` is an optional human-readable rationale for why a rule exists. It can be provided for any `decision` and may be surfaced in different contexts (for example, in approval prompts or rejection messages). When `decision = "forbidden"` is used, include a recommended alternative in the `justification`, when appropriate (e.g., ``"Use `jj` instead of `git`."``).
- `match` / `not_match` supply example invocations that are validated at load time (think of them as unit tests); examples can be token arrays or strings (strings are tokenized with `shlex`).
@@ -24,6 +24,26 @@ prefix_rule(
)
```
- Host executable metadata can optionally constrain which absolute paths may
resolve through basename rules:
```starlark
host_executable(
name = "git",
paths = [
"/opt/homebrew/bin/git",
"/usr/bin/git",
],
)
```
- Matching semantics:
- execpolicy always tries exact first-token matches first.
- With host-executable resolution disabled, `/usr/bin/git status` only matches a rule whose first token is `/usr/bin/git`.
- With host-executable resolution enabled, if no exact rule matches, execpolicy may fall back from `/usr/bin/git` to basename rules for `git`.
- If `host_executable(name="git", ...)` exists, basename fallback is only allowed for listed absolute paths.
- If no `host_executable()` entry exists for a basename, basename fallback is allowed.
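The lookup order in the list above can be modeled in isolation. The sketch below is illustrative only — it is not the crate's actual implementation, and `rule_key`, `rules`, and `host_execs` are hypothetical names standing in for the policy's prefix-rule keys and `host_executable()` allowlists:

```rust
use std::collections::HashMap;
use std::path::Path;

/// Sketch of the matching order: exact first-token match first, then optional
/// basename fallback for absolute paths, gated by a host-executable allowlist.
fn rule_key(
    program: &str,
    rules: &[&str],                        // first tokens of known prefix rules
    host_execs: &HashMap<&str, Vec<&str>>, // basename -> allowed absolute paths
    resolve_host_executables: bool,
) -> Option<String> {
    // 1. An exact first-token match always wins.
    if rules.contains(&program) {
        return Some(program.to_string());
    }
    // 2. Basename fallback is opt-in and applies to absolute paths only.
    if !resolve_host_executables || !Path::new(program).is_absolute() {
        return None;
    }
    let basename = Path::new(program).file_name()?.to_str()?;
    if !rules.contains(&basename) {
        return None;
    }
    // 3. A host_executable() allowlist for this basename restricts which
    //    absolute paths may fall back; no allowlist means fallback is allowed.
    match host_execs.get(basename) {
        Some(allowed) => allowed.contains(&program).then(|| basename.to_string()),
        None => Some(basename.to_string()),
    }
}
```

With `rules = ["git"]` and `host_execs = {"git": ["/usr/bin/git"]}`, `/usr/bin/git` resolves to the `git` rule while `/opt/homebrew/bin/git` does not.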
## CLI
- From the Codex CLI, run the `codex execpolicy check` subcommand with one or more policy files (for example `src/default.rules`) to check a command:
@@ -32,6 +52,15 @@ prefix_rule(
codex execpolicy check --rules path/to/policy.rules git status
```
- To opt into basename fallback for absolute program paths, pass `--resolve-host-executables`:
```bash
codex execpolicy check \
--rules path/to/policy.rules \
--resolve-host-executables \
/usr/bin/git status
```
- Pass multiple `--rules` flags to merge rules (files are evaluated in the order provided), and use `--pretty` for formatted JSON.
- You can also run the standalone dev binary directly during development:
@@ -52,6 +81,7 @@ cargo run -p codex-execpolicy -- check --rules path/to/policy.rules git status
"prefixRuleMatch": {
"matchedPrefix": ["<token>", "..."],
"decision": "allow|prompt|forbidden",
"resolvedProgram": "/absolute/path/to/program",
"justification": "..."
}
}
@@ -62,6 +92,7 @@ cargo run -p codex-execpolicy -- check --rules path/to/policy.rules git status
- When no rules match, `matchedRules` is an empty array and `decision` is omitted.
- `matchedRules` lists every rule whose prefix matched the command; `matchedPrefix` is the exact prefix that matched.
- `resolvedProgram` is omitted unless an absolute executable path matched via basename fallback.
- The effective `decision` is the strictest severity across all matches (`forbidden` > `prompt` > `allow`).
Note: `execpolicy` commands are still in preview. The API may have breaking changes in the future.
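The "strictest severity" rule can be sketched with an ordered enum. This is illustrative only, not the crate's actual `Decision` type — deriving `Ord` with variants in increasing severity lets `max` pick the effective decision:

```rust
/// Illustrative ordering: later variants are stricter, so `max` over all
/// matched decisions yields forbidden > prompt > allow.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Decision {
    Allow,
    Prompt,
    Forbidden,
}

/// Returns None when no rules matched (mirroring the omitted `decision`).
fn effective_decision(matches: &[Decision]) -> Option<Decision> {
    matches.iter().copied().max()
}
```

For example, a command matching both an `allow` rule and a `prompt` rule is prompted.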

View File

@@ -38,16 +38,47 @@ pub enum Error {
ExampleDidNotMatch {
rules: Vec<String>,
examples: Vec<String>,
location: Option<ErrorLocation>,
},
#[error("expected example to not match rule `{rule}`: {example}")]
ExampleDidMatch { rule: String, example: String },
ExampleDidMatch {
rule: String,
example: String,
location: Option<ErrorLocation>,
},
#[error("starlark error: {0}")]
Starlark(StarlarkError),
}
impl Error {
pub fn with_location(self, location: ErrorLocation) -> Self {
match self {
Error::ExampleDidNotMatch {
rules,
examples,
location: None,
} => Error::ExampleDidNotMatch {
rules,
examples,
location: Some(location),
},
Error::ExampleDidMatch {
rule,
example,
location: None,
} => Error::ExampleDidMatch {
rule,
example,
location: Some(location),
},
other => other,
}
}
pub fn location(&self) -> Option<ErrorLocation> {
match self {
Error::ExampleDidNotMatch { location, .. }
| Error::ExampleDidMatch { location, .. } => location.clone(),
Error::Starlark(err) => err.span().map(|span| {
let resolved = span.resolve_span();
ErrorLocation {

View File

@@ -7,6 +7,7 @@ use clap::Parser;
use serde::Serialize;
use crate::Decision;
use crate::MatchOptions;
use crate::Policy;
use crate::PolicyParser;
use crate::RuleMatch;
@@ -22,6 +23,11 @@ pub struct ExecPolicyCheckCommand {
#[arg(long)]
pub pretty: bool,
/// Resolve absolute program paths against basename rules, gated by any
/// `host_executable()` definitions in the loaded policy files.
#[arg(long)]
pub resolve_host_executables: bool,
/// Command tokens to check against the policy.
#[arg(
value_name = "COMMAND",
@@ -36,7 +42,13 @@ impl ExecPolicyCheckCommand {
/// Load the policies for this command, evaluate the command, and render JSON output.
pub fn run(&self) -> Result<()> {
let policy = load_policies(&self.rules)?;
let matched_rules = policy.matches_for_command(&self.command, None);
let matched_rules = policy.matches_for_command_with_options(
&self.command,
None,
&MatchOptions {
resolve_host_executables: self.resolve_host_executables,
},
);
let json = format_matches_json(&matched_rules, self.pretty)?;
println!("{json}");

View File

@@ -0,0 +1,29 @@
use std::path::Path;
#[cfg(windows)]
const WINDOWS_EXECUTABLE_SUFFIXES: [&str; 4] = [".exe", ".cmd", ".bat", ".com"];
pub(crate) fn executable_lookup_key(raw: &str) -> String {
#[cfg(windows)]
{
let raw = raw.to_ascii_lowercase();
for suffix in WINDOWS_EXECUTABLE_SUFFIXES {
if raw.ends_with(suffix) {
let stripped_len = raw.len() - suffix.len();
return raw[..stripped_len].to_string();
}
}
raw
}
#[cfg(not(windows))]
{
raw.to_string()
}
}
pub(crate) fn executable_path_lookup_key(path: &Path) -> Option<String> {
path.file_name()
.and_then(|name| name.to_str())
.map(executable_lookup_key)
}
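The normalization above only applies its suffix-stripping under `cfg(windows)`. A stand-alone sketch that applies it unconditionally (for illustration only; `lookup_key` is a hypothetical name, not the crate's API) shows how `Git.EXE` and `git` end up with the same key:

```rust
/// Sketch of Windows-style executable-name normalization: lowercase the name
/// and strip a known executable suffix so `Git.EXE` compares equal to `git`.
fn lookup_key(raw: &str) -> String {
    const SUFFIXES: [&str; 4] = [".exe", ".cmd", ".bat", ".com"];
    let lowered = raw.to_ascii_lowercase();
    for suffix in SUFFIXES {
        if let Some(stripped) = lowered.strip_suffix(suffix) {
            return stripped.to_string();
        }
    }
    lowered
}
```

This is what lets a `host_executable(name = "git", ...)` entry match `C:\Program Files\Git\cmd\git.exe` on Windows.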

View File

@@ -2,6 +2,7 @@ pub mod amend;
pub mod decision;
pub mod error;
pub mod execpolicycheck;
mod executable_name;
pub mod parser;
pub mod policy;
pub mod rule;
@@ -18,6 +19,7 @@ pub use error::TextRange;
pub use execpolicycheck::ExecPolicyCheckCommand;
pub use parser::PolicyParser;
pub use policy::Evaluation;
pub use policy::MatchOptions;
pub use policy::Policy;
pub use rule::NetworkRuleProtocol;
pub use rule::Rule;

View File

@@ -1,6 +1,8 @@
use codex_utils_absolute_path::AbsolutePathBuf;
use multimap::MultiMap;
use shlex;
use starlark::any::ProvidesStaticType;
use starlark::codemap::FileSpan;
use starlark::environment::GlobalsBuilder;
use starlark::environment::Module;
use starlark::eval::Evaluator;
@@ -13,11 +15,18 @@ use starlark::values::list::UnpackList;
use starlark::values::none::NoneType;
use std::cell::RefCell;
use std::cell::RefMut;
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use crate::decision::Decision;
use crate::error::Error;
use crate::error::ErrorLocation;
use crate::error::Result;
use crate::error::TextPosition;
use crate::error::TextRange;
use crate::executable_name::executable_lookup_key;
use crate::executable_name::executable_path_lookup_key;
use crate::rule::NetworkRule;
use crate::rule::NetworkRuleProtocol;
use crate::rule::PatternToken;
@@ -47,6 +56,7 @@ impl PolicyParser {
/// Parses a policy, tagging parser errors with `policy_identifier` so failures include the
/// identifier alongside line numbers.
pub fn parse(&mut self, policy_identifier: &str, policy_file_contents: &str) -> Result<()> {
let pending_validation_count = self.builder.borrow().pending_example_validations.len();
let mut dialect = Dialect::Extended.clone();
dialect.enable_f_strings = true;
let ast = AstModule::parse(
@@ -62,6 +72,9 @@ impl PolicyParser {
eval.extra = Some(&self.builder);
eval.eval_module(ast, &globals).map_err(Error::Starlark)?;
}
self.builder
.borrow()
.validate_pending_examples_from(pending_validation_count)?;
Ok(())
}
@@ -74,6 +87,8 @@ impl PolicyParser {
struct PolicyBuilder {
rules_by_program: MultiMap<String, RuleRef>,
network_rules: Vec<NetworkRule>,
host_executables_by_name: HashMap<String, Arc<[AbsolutePathBuf]>>,
pending_example_validations: Vec<PendingExampleValidation>,
}
impl PolicyBuilder {
@@ -81,6 +96,8 @@ impl PolicyBuilder {
Self {
rules_by_program: MultiMap::new(),
network_rules: Vec::new(),
host_executables_by_name: HashMap::new(),
pending_example_validations: Vec::new(),
}
}
@@ -93,9 +110,62 @@ impl PolicyBuilder {
self.network_rules.push(rule);
}
fn build(self) -> crate::policy::Policy {
crate::policy::Policy::from_parts(self.rules_by_program, self.network_rules)
fn add_host_executable(&mut self, name: String, paths: Vec<AbsolutePathBuf>) {
self.host_executables_by_name.insert(name, paths.into());
}
fn add_pending_example_validation(
&mut self,
rules: Vec<RuleRef>,
matches: Vec<Vec<String>>,
not_matches: Vec<Vec<String>>,
location: Option<ErrorLocation>,
) {
self.pending_example_validations
.push(PendingExampleValidation {
rules,
matches,
not_matches,
location,
});
}
fn validate_pending_examples_from(&self, start: usize) -> Result<()> {
for validation in &self.pending_example_validations[start..] {
let mut rules_by_program = MultiMap::new();
for rule in &validation.rules {
rules_by_program.insert(rule.program().to_string(), rule.clone());
}
let policy = crate::policy::Policy::from_parts(
rules_by_program,
Vec::new(),
self.host_executables_by_name.clone(),
);
validate_not_match_examples(&policy, &validation.rules, &validation.not_matches)
.map_err(|error| attach_validation_location(error, validation.location.clone()))?;
validate_match_examples(&policy, &validation.rules, &validation.matches)
.map_err(|error| attach_validation_location(error, validation.location.clone()))?;
}
Ok(())
}
fn build(self) -> crate::policy::Policy {
crate::policy::Policy::from_parts(
self.rules_by_program,
self.network_rules,
self.host_executables_by_name,
)
}
}
#[derive(Debug)]
struct PendingExampleValidation {
rules: Vec<RuleRef>,
matches: Vec<Vec<String>>,
not_matches: Vec<Vec<String>>,
location: Option<ErrorLocation>,
}
fn parse_pattern<'v>(pattern: UnpackList<Value<'v>>) -> Result<Vec<PatternToken>> {
@@ -150,6 +220,36 @@ fn parse_examples<'v>(examples: UnpackList<Value<'v>>) -> Result<Vec<Vec<String>
examples.items.into_iter().map(parse_example).collect()
}
fn parse_literal_absolute_path(raw: &str) -> Result<AbsolutePathBuf> {
if !Path::new(raw).is_absolute() {
return Err(Error::InvalidRule(format!(
"host_executable paths must be absolute (got {raw})"
)));
}
AbsolutePathBuf::try_from(raw.to_string())
.map_err(|error| Error::InvalidRule(format!("invalid absolute path `{raw}`: {error}")))
}
fn validate_host_executable_name(name: &str) -> Result<()> {
if name.is_empty() {
return Err(Error::InvalidRule(
"host_executable name cannot be empty".to_string(),
));
}
let path = Path::new(name);
if path.components().count() != 1
|| path.file_name().and_then(|value| value.to_str()) != Some(name)
{
return Err(Error::InvalidRule(format!(
"host_executable name must be a bare executable name (got {name})"
)));
}
Ok(())
}
fn parse_network_rule_decision(raw: &str) -> Result<Decision> {
match raw {
"deny" => Ok(Decision::Forbidden),
@@ -157,6 +257,30 @@ fn parse_network_rule_decision(raw: &str) -> Result<Decision> {
}
}
fn error_location_from_file_span(span: FileSpan) -> ErrorLocation {
let resolved = span.resolve_span();
ErrorLocation {
path: span.filename().to_string(),
range: TextRange {
start: TextPosition {
line: resolved.begin.line + 1,
column: resolved.begin.column + 1,
},
end: TextPosition {
line: resolved.end.line + 1,
column: resolved.end.column + 1,
},
},
}
}
fn attach_validation_location(error: Error, location: Option<ErrorLocation>) -> Error {
match location {
Some(location) => error.with_location(location),
None => error,
}
}
fn parse_example<'v>(value: Value<'v>) -> Result<Vec<String>> {
if let Some(raw) = value.unpack_str() {
parse_string_example(raw)
@@ -251,6 +375,9 @@ fn policy_builtins(builder: &mut GlobalsBuilder) {
.map(parse_examples)
.transpose()?
.unwrap_or_default();
let location = eval
.call_stack_top_location()
.map(error_location_from_file_span);
let mut builder = policy_builder(eval);
@@ -275,9 +402,7 @@ fn policy_builtins(builder: &mut GlobalsBuilder) {
})
.collect();
validate_not_match_examples(&rules, &not_matches)?;
validate_match_examples(&rules, &matches)?;
builder.add_pending_example_validation(rules.clone(), matches, not_matches, location);
rules.into_iter().for_each(|rule| builder.add_rule(rule));
Ok(NoneType)
}
@@ -308,4 +433,41 @@ fn policy_builtins(builder: &mut GlobalsBuilder) {
});
Ok(NoneType)
}
fn host_executable<'v>(
name: &'v str,
paths: UnpackList<Value<'v>>,
eval: &mut Evaluator<'v, '_, '_>,
) -> anyhow::Result<NoneType> {
validate_host_executable_name(name)?;
let mut parsed_paths = Vec::new();
for value in paths.items {
let raw = value.unpack_str().ok_or_else(|| {
Error::InvalidRule(format!(
"host_executable paths must be strings (got {})",
value.get_type()
))
})?;
let path = parse_literal_absolute_path(raw)?;
let Some(path_name) = executable_path_lookup_key(path.as_path()) else {
return Err(Error::InvalidRule(format!(
"host_executable path `{raw}` must have basename `{name}`"
))
.into());
};
if path_name != executable_lookup_key(name) {
return Err(Error::InvalidRule(format!(
"host_executable path `{raw}` must have basename `{name}`"
))
.into());
}
if !parsed_paths.iter().any(|existing| existing == &path) {
parsed_paths.push(path);
}
}
policy_builder(eval).add_host_executable(executable_lookup_key(name), parsed_paths);
Ok(NoneType)
}
}

View File

@@ -1,6 +1,7 @@
use crate::decision::Decision;
use crate::error::Error;
use crate::error::Result;
use crate::executable_name::executable_path_lookup_key;
use crate::rule::NetworkRule;
use crate::rule::NetworkRuleProtocol;
use crate::rule::PatternToken;
@@ -9,31 +10,41 @@ use crate::rule::PrefixRule;
use crate::rule::RuleMatch;
use crate::rule::RuleRef;
use crate::rule::normalize_network_rule_host;
use codex_utils_absolute_path::AbsolutePathBuf;
use multimap::MultiMap;
use serde::Deserialize;
use serde::Serialize;
use std::collections::HashMap;
use std::sync::Arc;
type HeuristicsFallback<'a> = Option<&'a dyn Fn(&[String]) -> Decision>;
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct MatchOptions {
pub resolve_host_executables: bool,
}
#[derive(Clone, Debug)]
pub struct Policy {
rules_by_program: MultiMap<String, RuleRef>,
network_rules: Vec<NetworkRule>,
host_executables_by_name: HashMap<String, Arc<[AbsolutePathBuf]>>,
}
impl Policy {
pub fn new(rules_by_program: MultiMap<String, RuleRef>) -> Self {
Self::from_parts(rules_by_program, Vec::new())
Self::from_parts(rules_by_program, Vec::new(), HashMap::new())
}
pub fn from_parts(
rules_by_program: MultiMap<String, RuleRef>,
network_rules: Vec<NetworkRule>,
host_executables_by_name: HashMap<String, Arc<[AbsolutePathBuf]>>,
) -> Self {
Self {
rules_by_program,
network_rules,
host_executables_by_name,
}
}
@@ -49,6 +60,10 @@ impl Policy {
&self.network_rules
}
pub fn host_executables(&self) -> &HashMap<String, Arc<[AbsolutePathBuf]>> {
&self.host_executables_by_name
}
pub fn get_allowed_prefixes(&self) -> Vec<Vec<String>> {
let mut prefixes = Vec::new();
@@ -119,6 +134,36 @@ impl Policy {
Ok(())
}
pub fn set_host_executable_paths(&mut self, name: String, paths: Vec<AbsolutePathBuf>) {
self.host_executables_by_name.insert(name, paths.into());
}
pub fn merge_overlay(&self, overlay: &Policy) -> Policy {
let mut combined_rules = self.rules_by_program.clone();
for (program, rules) in overlay.rules_by_program.iter_all() {
for rule in rules {
combined_rules.insert(program.clone(), rule.clone());
}
}
let mut combined_network_rules = self.network_rules.clone();
combined_network_rules.extend(overlay.network_rules.iter().cloned());
let mut host_executables_by_name = self.host_executables_by_name.clone();
host_executables_by_name.extend(
overlay
.host_executables_by_name
.iter()
.map(|(name, paths)| (name.clone(), paths.clone())),
);
Policy::from_parts(
combined_rules,
combined_network_rules,
host_executables_by_name,
)
}
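`merge_overlay` combines the host executable maps with `HashMap::extend`, so an overlay entry replaces the base entry for the same name wholesale rather than unioning the two path lists. A simplified sketch of just that merge, with types reduced to `String`/`Vec<String>`:

```rust
use std::collections::HashMap;

// Sketch of the overlay-merge semantics for the host executable map:
// overlay entries replace same-named base entries; base-only entries survive.
fn merge_host_executables(
    base: &HashMap<String, Vec<String>>,
    overlay: &HashMap<String, Vec<String>>,
) -> HashMap<String, Vec<String>> {
    let mut combined = base.clone();
    combined.extend(overlay.iter().map(|(name, paths)| (name.clone(), paths.clone())));
    combined
}
```

This is why a requirements policy overlaid on a file-based policy keeps the file-based mappings only for names the overlay does not redefine.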
pub fn compiled_network_domains(&self) -> (Vec<String>, Vec<String>) {
let mut allowed = Vec::new();
let mut denied = Vec::new();
@@ -144,7 +189,25 @@ impl Policy {
where
F: Fn(&[String]) -> Decision,
{
let matched_rules = self.matches_for_command(cmd, Some(heuristics_fallback));
let matched_rules = self.matches_for_command_with_options(
cmd,
Some(heuristics_fallback),
&MatchOptions::default(),
);
Evaluation::from_matches(matched_rules)
}
pub fn check_with_options<F>(
&self,
cmd: &[String],
heuristics_fallback: &F,
options: &MatchOptions,
) -> Evaluation
where
F: Fn(&[String]) -> Decision,
{
let matched_rules =
self.matches_for_command_with_options(cmd, Some(heuristics_fallback), options);
Evaluation::from_matches(matched_rules)
}
@@ -154,6 +217,20 @@ impl Policy {
commands: Commands,
heuristics_fallback: &F,
) -> Evaluation
where
Commands: IntoIterator,
Commands::Item: AsRef<[String]>,
F: Fn(&[String]) -> Decision,
{
self.check_multiple_with_options(commands, heuristics_fallback, &MatchOptions::default())
}
pub fn check_multiple_with_options<Commands, F>(
&self,
commands: Commands,
heuristics_fallback: &F,
options: &MatchOptions,
) -> Evaluation
where
Commands: IntoIterator,
Commands::Item: AsRef<[String]>,
@@ -162,7 +239,11 @@ impl Policy {
let matched_rules: Vec<RuleMatch> = commands
.into_iter()
.flat_map(|command| {
self.matches_for_command(command.as_ref(), Some(heuristics_fallback))
self.matches_for_command_with_options(
command.as_ref(),
Some(heuristics_fallback),
options,
)
})
.collect();
@@ -181,14 +262,25 @@ impl Policy {
cmd: &[String],
heuristics_fallback: HeuristicsFallback<'_>,
) -> Vec<RuleMatch> {
let matched_rules: Vec<RuleMatch> = match cmd.first() {
Some(first) => self
.rules_by_program
.get_vec(first)
.map(|rules| rules.iter().filter_map(|rule| rule.matches(cmd)).collect())
.unwrap_or_default(),
None => Vec::new(),
};
self.matches_for_command_with_options(cmd, heuristics_fallback, &MatchOptions::default())
}
pub fn matches_for_command_with_options(
&self,
cmd: &[String],
heuristics_fallback: HeuristicsFallback<'_>,
options: &MatchOptions,
) -> Vec<RuleMatch> {
let matched_rules = self
.match_exact_rules(cmd)
.filter(|matched_rules| !matched_rules.is_empty())
.or_else(|| {
options
.resolve_host_executables
.then(|| self.match_host_executable_rules(cmd))
.filter(|matched_rules| !matched_rules.is_empty())
})
.unwrap_or_default();
if matched_rules.is_empty()
&& let Some(heuristics_fallback) = heuristics_fallback
@@ -201,6 +293,45 @@ impl Policy {
matched_rules
}
}
fn match_exact_rules(&self, cmd: &[String]) -> Option<Vec<RuleMatch>> {
let first = cmd.first()?;
Some(
self.rules_by_program
.get_vec(first)
.map(|rules| rules.iter().filter_map(|rule| rule.matches(cmd)).collect())
.unwrap_or_default(),
)
}
fn match_host_executable_rules(&self, cmd: &[String]) -> Vec<RuleMatch> {
let Some(first) = cmd.first() else {
return Vec::new();
};
let Ok(program) = AbsolutePathBuf::try_from(first.clone()) else {
return Vec::new();
};
let Some(basename) = executable_path_lookup_key(program.as_path()) else {
return Vec::new();
};
let Some(rules) = self.rules_by_program.get_vec(&basename) else {
return Vec::new();
};
if let Some(paths) = self.host_executables_by_name.get(&basename)
&& !paths.iter().any(|path| path == &program)
{
return Vec::new();
}
let basename_command = std::iter::once(basename)
.chain(cmd.iter().skip(1).cloned())
.collect::<Vec<_>>();
rules
.iter()
.filter_map(|rule| rule.matches(&basename_command))
.map(|rule_match| rule_match.with_resolved_program(&program))
.collect()
}
}
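The exact-first ordering of `match_exact_rules` and `match_host_executable_rules` can be condensed into a small standalone sketch. The `resolve_rule_key` helper below is hypothetical — the real code returns `Vec<RuleMatch>` and normalizes via `executable_path_lookup_key`, whose `.exe` stripping here is an assumption — but the control flow follows the diff: exact match wins, basename fallback applies only to absolute paths, and an allowlist (when present) gates the fallback, so an explicit empty list blocks it while a missing mapping permits it.

```rust
use std::collections::HashMap;
use std::path::Path;

// Simplified sketch of the match order:
// 1. try the literal first token against the rule table;
// 2. only for absolute paths, fall back to the basename;
// 3. if a host_executable() allowlist exists for that basename, the
//    absolute path must appear in it for the fallback to apply.
fn resolve_rule_key(
    rules: &HashMap<String, String>,
    allowlist: &HashMap<String, Vec<String>>,
    program: &str,
) -> Option<String> {
    if rules.contains_key(program) {
        return Some(program.to_string());
    }
    let path = Path::new(program);
    if !path.is_absolute() {
        return None;
    }
    // Assumed normalization: strip a trailing `.exe` from the basename.
    let basename = path.file_name()?.to_str()?.trim_end_matches(".exe").to_string();
    if !rules.contains_key(&basename) {
        return None;
    }
    if let Some(paths) = allowlist.get(&basename) {
        if !paths.iter().any(|p| p == program) {
            return None;
        }
    }
    Some(basename)
}
```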
fn upsert_domain(entries: &mut Vec<String>, host: &str) {


@@ -1,6 +1,9 @@
use crate::decision::Decision;
use crate::error::Error;
use crate::error::Result;
use crate::policy::MatchOptions;
use crate::policy::Policy;
use codex_utils_absolute_path::AbsolutePathBuf;
use serde::Deserialize;
use serde::Serialize;
use shlex::try_join;
@@ -63,6 +66,8 @@ pub enum RuleMatch {
#[serde(rename = "matchedPrefix")]
matched_prefix: Vec<String>,
decision: Decision,
#[serde(rename = "resolvedProgram", skip_serializing_if = "Option::is_none")]
resolved_program: Option<AbsolutePathBuf>,
/// Optional rationale for why this rule exists.
///
/// This can be supplied for any decision and may be surfaced in different contexts
@@ -83,6 +88,23 @@ impl RuleMatch {
Self::HeuristicsRuleMatch { decision, .. } => *decision,
}
}
pub fn with_resolved_program(self, resolved_program: &AbsolutePathBuf) -> Self {
match self {
Self::PrefixRuleMatch {
matched_prefix,
decision,
justification,
..
} => Self::PrefixRuleMatch {
matched_prefix,
decision,
resolved_program: Some(resolved_program.clone()),
justification,
},
other => other,
}
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
@@ -210,6 +232,7 @@ impl Rule for PrefixRule {
.map(|matched_prefix| RuleMatch::PrefixRuleMatch {
matched_prefix,
decision: self.decision,
resolved_program: None,
justification: self.justification.clone(),
})
}
@@ -220,11 +243,21 @@ impl Rule for PrefixRule {
}
/// Count how many rules match each provided example and error if any example is unmatched.
pub(crate) fn validate_match_examples(rules: &[RuleRef], matches: &[Vec<String>]) -> Result<()> {
pub(crate) fn validate_match_examples(
policy: &Policy,
rules: &[RuleRef],
matches: &[Vec<String>],
) -> Result<()> {
let mut unmatched_examples = Vec::new();
let options = MatchOptions {
resolve_host_executables: true,
};
for example in matches {
if rules.iter().any(|rule| rule.matches(example).is_some()) {
if !policy
.matches_for_command_with_options(example, None, &options)
.is_empty()
{
continue;
}
@@ -240,21 +273,31 @@ pub(crate) fn validate_match_examples(rules: &[RuleRef], matches: &[Vec<String>]
Err(Error::ExampleDidNotMatch {
rules: rules.iter().map(|rule| format!("{rule:?}")).collect(),
examples: unmatched_examples,
location: None,
})
}
}
/// Ensure that no rule matches any provided negative example.
pub(crate) fn validate_not_match_examples(
rules: &[RuleRef],
policy: &Policy,
_rules: &[RuleRef],
not_matches: &[Vec<String>],
) -> Result<()> {
let options = MatchOptions {
resolve_host_executables: true,
};
for example in not_matches {
if let Some(rule) = rules.iter().find(|rule| rule.matches(example).is_some()) {
if let Some(rule) = policy
.matches_for_command_with_options(example, None, &options)
.first()
{
return Err(Error::ExampleDidMatch {
rule: format!("{rule:?}"),
example: try_join(example.iter().map(String::as_str))
.unwrap_or_else(|_| "unable to render example".to_string()),
location: None,
});
}
}


@@ -1,5 +1,6 @@
use std::any::Any;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::Context;
@@ -7,6 +8,7 @@ use anyhow::Result;
use codex_execpolicy::Decision;
use codex_execpolicy::Error;
use codex_execpolicy::Evaluation;
use codex_execpolicy::MatchOptions;
use codex_execpolicy::NetworkRuleProtocol;
use codex_execpolicy::Policy;
use codex_execpolicy::PolicyParser;
@@ -16,6 +18,7 @@ use codex_execpolicy::blocking_append_allow_prefix_rule;
use codex_execpolicy::rule::PatternToken;
use codex_execpolicy::rule::PrefixPattern;
use codex_execpolicy::rule::PrefixRule;
use codex_utils_absolute_path::AbsolutePathBuf;
use pretty_assertions::assert_eq;
use tempfile::tempdir;
@@ -31,6 +34,35 @@ fn prompt_all(_: &[String]) -> Decision {
Decision::Prompt
}
fn absolute_path(path: &str) -> AbsolutePathBuf {
AbsolutePathBuf::try_from(path.to_string())
.unwrap_or_else(|error| panic!("expected absolute path `{path}`: {error}"))
}
fn host_absolute_path(segments: &[&str]) -> String {
let mut path = if cfg!(windows) {
PathBuf::from(r"C:\")
} else {
PathBuf::from("/")
};
for segment in segments {
path.push(segment);
}
path.to_string_lossy().into_owned()
}
fn host_executable_name(name: &str) -> String {
if cfg!(windows) {
format!("{name}.exe")
} else {
name.to_string()
}
}
fn starlark_string(value: &str) -> String {
value.replace('\\', "\\\\").replace('"', "\\\"")
}
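The `host_executable_name` helper above appends `.exe` on Windows so the tests exercise cross-platform name normalization. A hedged sketch of what such a lookup-key normalizer might look like — `lookup_key` is hypothetical, and both the case folding and suffix stripping are assumptions about `executable_lookup_key`:

```rust
// Hypothetical normalizer: fold case and drop a trailing `.exe` so that
// Windows-style names like `GIT.EXE` can satisfy host_executable(name = "git").
fn lookup_key(name: &str) -> String {
    let lower = name.to_ascii_lowercase();
    match lower.strip_suffix(".exe") {
        Some(stem) => stem.to_string(),
        None => lower,
    }
}
```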
#[derive(Clone, Debug, Eq, PartialEq)]
enum RuleSnapshot {
Prefix(PrefixRule),
@@ -125,6 +157,7 @@ prefix_rule(
matched_rules: vec![RuleMatch::PrefixRuleMatch {
matched_prefix: tokens(&["git", "status"]),
decision: Decision::Allow,
resolved_program: None,
justification: None,
}],
},
@@ -156,6 +189,7 @@ prefix_rule(
matched_rules: vec![RuleMatch::PrefixRuleMatch {
matched_prefix: tokens(&["rm"]),
decision: Decision::Forbidden,
resolved_program: None,
justification: Some("destructive command".to_string()),
}],
},
@@ -184,6 +218,7 @@ prefix_rule(
matched_rules: vec![RuleMatch::PrefixRuleMatch {
matched_prefix: tokens(&["ls"]),
decision: Decision::Allow,
resolved_program: None,
justification: Some("safe and commonly used".to_string()),
}],
},
@@ -236,6 +271,7 @@ fn add_prefix_rule_extends_policy() -> Result<()> {
matched_rules: vec![RuleMatch::PrefixRuleMatch {
matched_prefix: tokens(&["ls", "-l"]),
decision: Decision::Prompt,
resolved_program: None,
justification: None,
}],
},
@@ -305,6 +341,7 @@ prefix_rule(
matched_rules: vec![RuleMatch::PrefixRuleMatch {
matched_prefix: tokens(&["git"]),
decision: Decision::Prompt,
resolved_program: None,
justification: None,
}],
},
@@ -319,11 +356,13 @@ prefix_rule(
RuleMatch::PrefixRuleMatch {
matched_prefix: tokens(&["git"]),
decision: Decision::Prompt,
resolved_program: None,
justification: None,
},
RuleMatch::PrefixRuleMatch {
matched_prefix: tokens(&["git", "commit"]),
decision: Decision::Forbidden,
resolved_program: None,
justification: None,
},
],
@@ -381,6 +420,7 @@ prefix_rule(
matched_rules: vec![RuleMatch::PrefixRuleMatch {
matched_prefix: tokens(&["bash", "-c"]),
decision: Decision::Allow,
resolved_program: None,
justification: None,
}],
},
@@ -394,6 +434,7 @@ prefix_rule(
matched_rules: vec![RuleMatch::PrefixRuleMatch {
matched_prefix: tokens(&["sh", "-l"]),
decision: Decision::Allow,
resolved_program: None,
justification: None,
}],
},
@@ -440,6 +481,7 @@ prefix_rule(
matched_rules: vec![RuleMatch::PrefixRuleMatch {
matched_prefix: tokens(&["npm", "i", "--legacy-peer-deps"]),
decision: Decision::Allow,
resolved_program: None,
justification: None,
}],
},
@@ -456,6 +498,7 @@ prefix_rule(
matched_rules: vec![RuleMatch::PrefixRuleMatch {
matched_prefix: tokens(&["npm", "install", "--no-save"]),
decision: Decision::Allow,
resolved_program: None,
justification: None,
}],
},
@@ -486,6 +529,7 @@ prefix_rule(
matched_rules: vec![RuleMatch::PrefixRuleMatch {
matched_prefix: tokens(&["git", "status"]),
decision: Decision::Allow,
resolved_program: None,
justification: None,
}],
},
@@ -533,11 +577,13 @@ prefix_rule(
RuleMatch::PrefixRuleMatch {
matched_prefix: tokens(&["git"]),
decision: Decision::Prompt,
resolved_program: None,
justification: None,
},
RuleMatch::PrefixRuleMatch {
matched_prefix: tokens(&["git", "commit"]),
decision: Decision::Forbidden,
resolved_program: None,
justification: None,
},
],
@@ -576,16 +622,19 @@ prefix_rule(
RuleMatch::PrefixRuleMatch {
matched_prefix: tokens(&["git"]),
decision: Decision::Prompt,
resolved_program: None,
justification: None,
},
RuleMatch::PrefixRuleMatch {
matched_prefix: tokens(&["git"]),
decision: Decision::Prompt,
resolved_program: None,
justification: None,
},
RuleMatch::PrefixRuleMatch {
matched_prefix: tokens(&["git", "commit"]),
decision: Decision::Forbidden,
resolved_program: None,
justification: None,
},
],
@@ -612,3 +661,303 @@ fn heuristics_match_is_returned_when_no_policy_matches() {
evaluation
);
}
#[test]
fn parses_host_executable_paths() -> Result<()> {
let homebrew_git = host_absolute_path(&["opt", "homebrew", "bin", "git"]);
let usr_git = host_absolute_path(&["usr", "bin", "git"]);
let homebrew_git_literal = starlark_string(&homebrew_git);
let usr_git_literal = starlark_string(&usr_git);
let policy_src = format!(
r#"
host_executable(
name = "git",
paths = [
"{homebrew_git_literal}",
"{usr_git_literal}",
"{usr_git_literal}",
],
)
"#
);
let mut parser = PolicyParser::new();
parser.parse("test.rules", &policy_src)?;
let policy = parser.build();
assert_eq!(
policy
.host_executables()
.get("git")
.expect("missing git host executable")
.as_ref(),
[absolute_path(&homebrew_git), absolute_path(&usr_git)]
);
Ok(())
}
#[test]
fn host_executable_rejects_non_absolute_path() {
let policy_src = r#"
host_executable(name = "git", paths = ["git"])
"#;
let mut parser = PolicyParser::new();
let err = parser
.parse("test.rules", policy_src)
.expect_err("expected parse error");
assert!(
err.to_string()
.contains("host_executable paths must be absolute")
);
}
#[test]
fn host_executable_rejects_name_with_path_separator() {
let git_path = host_absolute_path(&["usr", "bin", "git"]);
let git_path_literal = starlark_string(&git_path);
let policy_src =
format!(r#"host_executable(name = "{git_path_literal}", paths = ["{git_path_literal}"])"#);
let mut parser = PolicyParser::new();
let err = parser
.parse("test.rules", &policy_src)
.expect_err("expected parse error");
assert!(
err.to_string()
.contains("host_executable name must be a bare executable name")
);
}
#[test]
fn host_executable_rejects_path_with_wrong_basename() {
let rg_path = host_absolute_path(&["usr", "bin", "rg"]);
let rg_path_literal = starlark_string(&rg_path);
let policy_src = format!(r#"host_executable(name = "git", paths = ["{rg_path_literal}"])"#);
let mut parser = PolicyParser::new();
let err = parser
.parse("test.rules", &policy_src)
.expect_err("expected parse error");
assert!(err.to_string().contains("must have basename `git`"));
}
#[test]
fn host_executable_last_definition_wins() -> Result<()> {
let usr_git = host_absolute_path(&["usr", "bin", "git"]);
let homebrew_git = host_absolute_path(&["opt", "homebrew", "bin", "git"]);
let usr_git_literal = starlark_string(&usr_git);
let homebrew_git_literal = starlark_string(&homebrew_git);
let mut parser = PolicyParser::new();
parser.parse(
"shared.rules",
&format!(r#"host_executable(name = "git", paths = ["{usr_git_literal}"])"#),
)?;
parser.parse(
"user.rules",
&format!(r#"host_executable(name = "git", paths = ["{homebrew_git_literal}"])"#),
)?;
let policy = parser.build();
assert_eq!(
policy
.host_executables()
.get("git")
.expect("missing git host executable")
.as_ref(),
[absolute_path(&homebrew_git)]
);
Ok(())
}
#[test]
fn host_executable_resolution_uses_basename_rule_when_allowed() -> Result<()> {
let git_name = host_executable_name("git");
let git_path = host_absolute_path(&["usr", "bin", &git_name]);
let git_path_literal = starlark_string(&git_path);
let policy_src = format!(
r#"
prefix_rule(pattern = ["git", "status"], decision = "prompt")
host_executable(name = "git", paths = ["{git_path_literal}"])
"#
);
let mut parser = PolicyParser::new();
parser.parse("test.rules", &policy_src)?;
let policy = parser.build();
let evaluation = policy.check_with_options(
&[git_path.clone(), "status".to_string()],
&allow_all,
&MatchOptions {
resolve_host_executables: true,
},
);
assert_eq!(
evaluation,
Evaluation {
decision: Decision::Prompt,
matched_rules: vec![RuleMatch::PrefixRuleMatch {
matched_prefix: tokens(&["git", "status"]),
decision: Decision::Prompt,
resolved_program: Some(absolute_path(&git_path)),
justification: None,
}],
}
);
Ok(())
}
#[test]
fn prefix_rule_examples_honor_host_executable_resolution() -> Result<()> {
let allowed_git_name = host_executable_name("git");
let allowed_git = host_absolute_path(&["usr", "bin", &allowed_git_name]);
let other_git = host_absolute_path(&["opt", "homebrew", "bin", &allowed_git_name]);
let allowed_git_literal = starlark_string(&allowed_git);
let other_git_literal = starlark_string(&other_git);
let policy_src = format!(
r#"
prefix_rule(
pattern = ["git", "status"],
match = [["{allowed_git_literal}", "status"]],
not_match = [["{other_git_literal}", "status"]],
)
host_executable(name = "git", paths = ["{allowed_git_literal}"])
"#
);
let mut parser = PolicyParser::new();
parser.parse("test.rules", &policy_src)?;
Ok(())
}
#[test]
fn host_executable_resolution_respects_explicit_empty_allowlist() -> Result<()> {
let policy_src = r#"
prefix_rule(pattern = ["git"], decision = "prompt")
host_executable(name = "git", paths = [])
"#;
let mut parser = PolicyParser::new();
parser.parse("test.rules", policy_src)?;
let policy = parser.build();
let git_path = host_absolute_path(&["usr", "bin", "git"]);
let evaluation = policy.check_with_options(
&[git_path.clone(), "status".to_string()],
&allow_all,
&MatchOptions {
resolve_host_executables: true,
},
);
assert_eq!(
evaluation,
Evaluation {
decision: Decision::Allow,
matched_rules: vec![RuleMatch::HeuristicsRuleMatch {
command: vec![git_path, "status".to_string()],
decision: Decision::Allow,
}],
}
);
Ok(())
}
#[test]
fn host_executable_resolution_ignores_path_not_in_allowlist() -> Result<()> {
let allowed_git = host_absolute_path(&["usr", "bin", "git"]);
let other_git = host_absolute_path(&["opt", "homebrew", "bin", "git"]);
let allowed_git_literal = starlark_string(&allowed_git);
let policy_src = format!(
r#"
prefix_rule(pattern = ["git"], decision = "prompt")
host_executable(name = "git", paths = ["{allowed_git_literal}"])
"#
);
let mut parser = PolicyParser::new();
parser.parse("test.rules", &policy_src)?;
let policy = parser.build();
let evaluation = policy.check_with_options(
&[other_git.clone(), "status".to_string()],
&allow_all,
&MatchOptions {
resolve_host_executables: true,
},
);
assert_eq!(
evaluation,
Evaluation {
decision: Decision::Allow,
matched_rules: vec![RuleMatch::HeuristicsRuleMatch {
command: vec![other_git, "status".to_string()],
decision: Decision::Allow,
}],
}
);
Ok(())
}
#[test]
fn host_executable_resolution_falls_back_without_mapping() -> Result<()> {
let policy_src = r#"
prefix_rule(pattern = ["git"], decision = "prompt")
"#;
let mut parser = PolicyParser::new();
parser.parse("test.rules", policy_src)?;
let policy = parser.build();
let git_path = host_absolute_path(&["usr", "bin", "git"]);
let evaluation = policy.check_with_options(
&[git_path.clone(), "status".to_string()],
&allow_all,
&MatchOptions {
resolve_host_executables: true,
},
);
assert_eq!(
evaluation,
Evaluation {
decision: Decision::Prompt,
matched_rules: vec![RuleMatch::PrefixRuleMatch {
matched_prefix: tokens(&["git"]),
decision: Decision::Prompt,
resolved_program: Some(absolute_path(&git_path)),
justification: None,
}],
}
);
Ok(())
}
#[test]
fn host_executable_resolution_does_not_override_exact_match() -> Result<()> {
let git_path = host_absolute_path(&["usr", "bin", "git"]);
let git_path_literal = starlark_string(&git_path);
let policy_src = format!(
r#"
prefix_rule(pattern = ["{git_path_literal}"], decision = "allow")
prefix_rule(pattern = ["git"], decision = "prompt")
host_executable(name = "git", paths = ["{git_path_literal}"])
"#
);
let mut parser = PolicyParser::new();
parser.parse("test.rules", &policy_src)?;
let policy = parser.build();
let evaluation = policy.check_with_options(
&[git_path.clone(), "status".to_string()],
&allow_all,
&MatchOptions {
resolve_host_executables: true,
},
);
assert_eq!(
evaluation,
Evaluation {
decision: Decision::Allow,
matched_rules: vec![RuleMatch::PrefixRuleMatch {
matched_prefix: vec![git_path],
decision: Decision::Allow,
resolved_program: None,
justification: None,
}],
}
);
Ok(())
}