Compare commits

...

31 Commits

Author SHA1 Message Date
jif-oai
63a3f3941a Merge branch 'main' into jif/fork 2025-11-20 12:36:40 +00:00
hanson-openai
b5dd189067 Allow unified_exec to early exit (if the process terminates before yield_time_ms) (#6867)
Thread through an `exit_notify` tokio `Notify` through to the
`UnifiedExecSession` so that we can return early if the command
terminates before `yield_time_ms`.

As Codex review correctly pointed out below 🙌 we also need a
`exit_signaled` flag so that commands which finish before we start
waiting can also exit early.

Since the default `yield_time_ms` is now 10s, this means that we don't
have to wait 10s for trivial commands like ls, sed, etc (which are the
majority of agent commands 😅)

---------

Co-authored-by: jif-oai <jif@openai.com>
2025-11-20 13:34:41 +01:00
Michael Bolin
54e6e4ac32 fix: when displaying execv, show file instead of arg0 (#6966)
After merging https://github.com/openai/codex/pull/6958, I realized that
the `command` I was displaying was not quite right. Since we know it, we
should show the _exact_ program being executed (the first arg to
`execve(3)`) rather than `arg0` to be more precise.

Below is the same command I used to test
https://github.com/openai/codex/pull/6958, but now you can see it shows
`/Users/mbolin/.openai/bin/git` instead of just `git`.

<img width="1526" height="1444" alt="image"
src="https://github.com/user-attachments/assets/428128d1-c658-456e-a64e-fc6a0009cb34"
/>
2025-11-19 22:42:58 -08:00
Michael Bolin
e8af41de8a fix: clean up elicitation used by exec-server (#6958)
Using appropriate message/title fields, I think this looks better now:

<img width="3370" height="3208" alt="image"
src="https://github.com/user-attachments/assets/e9bbf906-4ba8-4563-affc-62cdc6c97342"
/>

Though note that in the current version of the Inspector (`0.17.2`), you
cannot hit **Submit** until you fill out the field. I believe this is a
bug in the Inspector, as it does not properly handle the case when all
fields are optional. I put up a fix:

https://github.com/modelcontextprotocol/inspector/pull/926
2025-11-20 04:59:17 +00:00
Owen Lin
d6c30ed25e [app-server] feat: v2 apply_patch approval flow (#6760)
This PR adds the API V2 version of the apply_patch approval flow, which
centers around `ThreadItem::FileChange`.

This PR wires the new RPC (`item/fileChange/requestApproval`, V2 only)
and related events (`item/started`, `item/completed` for
`ThreadItem::FileChange`, which are emitted in both V1 and V2) through
the app-server
protocol. The new approval RPC is only sent when the user initiates a
turn with the new `turn/start` API so we don't break backwards
compatibility with VSCE.

Similar to https://github.com/openai/codex/pull/6758, the approach I
took was to make as few changes to the Codex core as possible,
leveraging existing `EventMsg` core events, and translating those in
app-server. I did have to add a few additional fields to
`EventMsg::PatchApplyBegin` and `EventMsg::PatchApplyEnd`, but those
were fairly lightweight.

However, the `EventMsg`s emitted by core are the following:
```
1) Auto-approved (no request for approval)

- EventMsg::PatchApplyBegin
- EventMsg::PatchApplyEnd

2) Approved by user
- EventMsg::ApplyPatchApprovalRequest
- EventMsg::PatchApplyBegin
- EventMsg::PatchApplyEnd

3) Declined by user
- EventMsg::ApplyPatchApprovalRequest
- EventMsg::PatchApplyBegin
- EventMsg::PatchApplyEnd
```

For a request triggering an approval, this would result in:
```
item/fileChange/requestApproval
item/started
item/completed
```

which is different from the `ThreadItem::CommandExecution` flow
introduced in https://github.com/openai/codex/pull/6758, which does the
below and is preferable:
```
item/started
item/commandExecution/requestApproval
item/completed
```

To fix this, we leverage `TurnSummaryStore` on codex_message_processor
to store a little bit of state, allowing us to fire `item/started` and
`item/fileChange/requestApproval` whenever we receive the underlying
`EventMsg::ApplyPatchApprovalRequest`, and no-oping when we receive the
`EventMsg::PatchApplyBegin` later.

This is much less invasive than modifying the order of EventMsg within
core (I tried).

The resulting payloads:
```
{
  "method": "item/started",
  "params": {
    "item": {
      "changes": [
        {
          "diff": "Hello from Codex!\n",
          "kind": "add",
          "path": "/Users/owen/repos/codex/codex-rs/APPROVAL_DEMO.txt"
        }
      ],
      "id": "call_Nxnwj7B3YXigfV6Mwh03d686",
      "status": "inProgress",
      "type": "fileChange"
    }
  }
}
```

```
{
  "id": 0,
  "method": "item/fileChange/requestApproval",
  "params": {
    "grantRoot": null,
    "itemId": "call_Nxnwj7B3YXigfV6Mwh03d686",
    "reason": null,
    "threadId": "019a9e11-8295-7883-a283-779e06502c6f",
    "turnId": "1"
  }
}
```

```
{
  "id": 0,
  "result": {
    "decision": "accept"
  }
}
```

```
{
  "method": "item/completed",
  "params": {
    "item": {
      "changes": [
        {
          "diff": "Hello from Codex!\n",
          "kind": "add",
          "path": "/Users/owen/repos/codex/codex-rs/APPROVAL_DEMO.txt"
        }
      ],
      "id": "call_Nxnwj7B3YXigfV6Mwh03d686",
      "status": "completed",
      "type": "fileChange"
    }
  }
}
```
2025-11-19 20:13:31 -08:00
zhao-oai
fb9849e1e3 migrating execpolicy -> execpolicy-legacy and execpolicy2 -> execpolicy (#6956) 2025-11-19 19:14:10 -08:00
Celia Chen
72a1453ac5 Revert "[core] add optional status_code to error events (#6865)" (#6955)
This reverts commit c2ec477d93.

# External (non-OpenAI) Pull Request Requirements

Before opening this Pull Request, please read the dedicated
"Contributing" markdown file or your PR may be closed:
https://github.com/openai/codex/blob/main/docs/contributing.md

If your PR conforms to our contribution guidelines, replace this text
with a detailed and high quality description of your changes.

Include a link to a bug report or enhancement request.
2025-11-20 01:26:14 +00:00
Ahmed Ibrahim
6d67b8b283 stop model migration screen after first time. (#6954)
it got serialized wrong.
2025-11-19 17:17:04 -08:00
zhao-oai
74a75679d9 update execpolicy quickstart readme (#6952) 2025-11-19 16:57:27 -08:00
pakrym-oai
92e3046733 Single pass truncation (#6914) 2025-11-19 16:56:37 -08:00
zhao-oai
65c13f1ae7 execpolicy2 core integration (#6641)
This PR threads execpolicy2 into codex-core.

activated via feature flag: exec_policy (on by default)

reads and parses all .codexpolicy files in `codex_home/codex`

refactored tool runtime API to integrate execpolicy logic

---------

Co-authored-by: Michael Bolin <mbolin@openai.com>
2025-11-19 16:50:43 -08:00
Dylan Hurd
b00a7cf40d fix(shell) fallback shells (#6948)
## Summary
Add fallbacks when user_shell_path does not resolve to a known shell
type

## Testing
- [x] Tests still pass
2025-11-19 16:41:38 -08:00
Michael Bolin
13d378f2ce chore: refactor exec-server to prepare it for standalone MCP use (#6944)
This PR reorganizes things slightly so that:

- Instead of a single multitool executable, `codex-exec-server`, we now
have two executables:
  - `codex-exec-mcp-server` to launch the MCP server
- `codex-execve-wrapper` is the `execve(2)` wrapper to use with the
`BASH_EXEC_WRAPPER` environment variable
- `BASH_EXEC_WRAPPER` must be a single executable: it cannot be a
command string composed of an executable with args (i.e., it no longer
adds the `escalate` subcommand, as before)
- `codex-exec-mcp-server` takes `--bash` and `--execve` as options.
Though if `--execve` is not specified, the MCP server will check the
directory containing `std::env::current_exe()` and attempt to use the
file named `codex-execve-wrapper` within it. In development, this works
out since these executables are side-by-side in the `target/debug`
folder.

With respect to testing, this also fixes an important bug in
`dummy_exec_policy()`, as I was using `ends_with()` as if it applied to
a `String`, but in this case, it is used with a `&Path`, so the
semantics are slightly different.

Putting this all together, I was able to test this by running the
following:

```
~/code/codex/codex-rs$ npx @modelcontextprotocol/inspector \
    ./target/debug/codex-exec-mcp-server --bash ~/code/bash/bash
```

If I try to run `git status` in `/Users/mbolin/code/codex` via the
`shell` tool from the MCP server:

<img width="1589" height="1335" alt="image"
src="https://github.com/user-attachments/assets/9db6aea8-7fbc-4675-8b1f-ec446685d6c4"
/>

then I get prompted with the following elicitation, as expected:

<img width="1589" height="1335" alt="image"
src="https://github.com/user-attachments/assets/21b68fe0-494d-4562-9bad-0ddc55fc846d"
/>

Though a current limitation is that the `shell` tool defaults to a
timeout of 10s, which means I only have 10s to respond to the
elicitation. Ideally, the time spent waiting for a response from a human
should not count against the timeout for the command execution. I will
address this in a subsequent PR.

---

Note `~/code/bash/bash` was created by doing:

```
cd ~/code
git clone https://github.com/bminor/bash
cd bash
git checkout a8a1c2fac029404d3f42cd39f5a20f24b6e4fe4b
<apply the patch below>
./configure
make
```

The patch:

```
diff --git a/execute_cmd.c b/execute_cmd.c
index 070f5119..d20ad2b9 100644
--- a/execute_cmd.c
+++ b/execute_cmd.c
@@ -6129,6 +6129,19 @@ shell_execve (char *command, char **args, char **env)
   char sample[HASH_BANG_BUFSIZ];
   size_t larray;

+  char* exec_wrapper = getenv("BASH_EXEC_WRAPPER");
+  if (exec_wrapper && *exec_wrapper && !whitespace (*exec_wrapper))
+    {
+      char *orig_command = command;
+
+      larray = strvec_len (args);
+
+      memmove (args + 2, args, (++larray) * sizeof (char *));
+      args[0] = exec_wrapper;
+      args[1] = orig_command;
+      command = exec_wrapper;
+    }
+
```
2025-11-19 16:38:14 -08:00
Lionel Cheng
a6597a9958 Fix/correct reasoning display (#6749)
This closes #6748 by implementing fallback to
`model_family.default_reasoning_effort` in `reasoning_effort` display of
`/status` when no `model_reasoning_effort` is set in the configuration.

## common/src/config_summary.rs

- `create_config_summary_entries` now fills the "reasoning effort" entry
with the explicit `config.model_reasoning_effort` when present and falls
back to `config.model_family.default_reasoning_effort` when it is
`None`, instead of emitting the literal string `none`.
- This ensures downstream consumers such as `tui/src/status/helpers.rs`
continue to work unchanged while automatically picking up model-family
defaults when the user has not selected a reasoning effort.

## tui/src/status/helpers.rs / core/src/model_family.rs

`ModelFamily::default_reasoning_effort` metadata is set to `medium` for
both `gpt-5*-codex` and `gpt-5` models following the default behaviour
of the API and recommendation of the codebase:
- per https://platform.openai.com/docs/api-reference/responses/create
`gpt-5` defaults to `medium` reasoning when no preset is passed
- there is no mention of the preset for `gpt-5.1-codex` in the API docs
but `medium` is the default setting for `gpt-5.1-codex` as per
`codex-rs/tui/src/chatwidget/snapshots/codex_tui__chatwidget__tests__model_reasoning_selection_popup.snap`

---------

Signed-off-by: lionelchg <lionel.cheng@hotmail.fr>
Co-authored-by: Eric Traut <etraut@openai.com>
2025-11-19 15:52:24 -08:00
Beehive Innovations
692989c277 fix(context left after review): review footer context after /review (#5610)
## Summary
- show live review token usage while `/review` runs and restore the main
session indicator afterward
  - add regression coverage for the footer behavior

## Testing
  - just fmt
  - cargo test -p codex-tui

Fixes #5604

---------

Signed-off-by: Fahad <fahad@2doapp.com>
2025-11-19 22:50:07 +00:00
iceweasel-oai
2fde03b4a0 stop over-reporting world-writable directories (#6936)
Fix world-writable audit false positives by expanding generic
permissions with MapGenericMask and then checking only concrete write
bits. The earlier check looked for FILE_GENERIC_WRITE/generic masks
directly, which shares bits with read permissions and could flag an
Everyone read ACE as writable.
2025-11-19 13:59:17 -08:00
Michael Bolin
056c8f8279 fix: prepare ExecPolicy in exec-server for execpolicy2 cutover (#6888)
This PR introduces an extra layer of abstraction to prepare us for the
migration to execpolicy2:

- introduces a new trait, `EscalationPolicy`, whose `determine_action()`
method is responsible for producing the `EscalateAction`
- the existing `ExecPolicy` typedef is changed to return an intermediate
`ExecPolicyOutcome` instead of `EscalateAction`
- the default implementation of `EscalationPolicy`,
`McpEscalationPolicy`, composes `ExecPolicy`
- the `ExecPolicyOutcome` includes `codex_execpolicy2::Decision`, which
has a `Prompt` variant
- when `McpEscalationPolicy` gets `Decision::Prompt` back from
`ExecPolicy`, it prompts the user via an MCP elicitation and maps the
result into an `ElicitationAction`
- now that the end user can reply to an elicitation with `Decline` or
`Cancel`, we introduce a new variant, `EscalateAction::Deny`, which the
client handles by returning exit code `1` without running anything

Note the way the elicitation is created is still not quite right, but I
will fix that once we have things running end-to-end for real in a
follow-up PR.
2025-11-19 13:55:29 -08:00
jif-oai
7ceabac707 nit fix 2025-11-19 20:40:35 +00:00
jif-oai
2a24ae36c2 Merge remote-tracking branch 'origin/main' into jif/fork
# Conflicts:
#	codex-rs/core/src/codex.rs
2025-11-19 20:21:19 +00:00
jif-oai
5071cc8fff Add a few tests 2025-11-19 15:30:04 +00:00
jif-oai
9c765f1217 Compilation nits 2025-11-19 15:21:58 +00:00
jif-oai
7116d2a6a4 More tests 2025-11-19 15:16:12 +00:00
jif-oai
94f1a61df5 Clippy 2025-11-19 15:06:51 +00:00
jif-oai
7ec1311aff Decoupling of TUI 2025-11-19 14:53:17 +00:00
jif-oai
b9f260057c Optional command 2025-11-19 14:21:04 +00:00
jif-oai
72af9e3092 Clippy 2025-11-19 12:39:28 +00:00
jif-oai
8f0d83eb11 Clean errors 2025-11-19 12:31:27 +00:00
jif-oai
26d667e152 Add tests 2025-11-19 12:13:36 +00:00
jif-oai
d1cf2b967c One fix 2025-11-19 11:50:42 +00:00
jif-oai
78afe914e1 Merge remote-tracking branch 'origin/main' into jif/fork
# Conflicts:
#	codex-rs/cli/src/main.rs
#	codex-rs/tui/src/cli.rs
2025-11-19 11:41:17 +00:00
jif-oai
1a8c1a4d9a V1 2025-11-18 16:50:45 +00:00
120 changed files with 4502 additions and 1478 deletions

View File

@@ -69,6 +69,37 @@ Codex can access MCP servers. To configure them, refer to the [config docs](./do
Codex CLI supports a rich set of configuration options, with preferences stored in `~/.codex/config.toml`. For full configuration options, see [Configuration](./docs/config.md).
### Execpolicy Quickstart
Codex can enforce your own rules-based execution policy before it runs shell commands.
1. Create a policy directory: `mkdir -p ~/.codex/policy`.
2. Create one or more `.codexpolicy` files in that folder. Codex automatically loads every `.codexpolicy` file in there on startup.
3. Write `prefix_rule` entries to describe the commands you want to allow, prompt, or block:
```starlark
prefix_rule(
pattern = ["git", ["push", "fetch"]],
decision = "prompt", # allow | prompt | forbidden
match = [["git", "push", "origin", "main"]], # examples that must match
not_match = [["git", "status"]], # examples that must not match
)
```
- `pattern` is a list of shell tokens, evaluated from left to right; wrap tokens in a nested list to express alternatives (e.g., match both `push` and `fetch`).
- `decision` sets the severity; Codex picks the strictest decision when multiple rules match (forbidden > prompt > allow).
- `match` and `not_match` act as (optional) unit tests. Codex validates them when it loads your policy, so you get feedback if an example has unexpected behavior.
In this example rule, if Codex wants to run commands with the prefix `git push` or `git fetch`, it will first ask for user approval.
Use [`execpolicy2` CLI](./codex-rs/execpolicy2/README.md) to preview decisions for policy files:
```shell
cargo run -p codex-execpolicy2 -- check --policy ~/.codex/policy/default.codexpolicy git push origin main
```
Pass multiple `--policy` flags to test how several files combine. See the [`codex-rs/execpolicy2` README](./codex-rs/execpolicy2/README.md) for a more detailed walkthrough of the available syntax.
---
### Docs & FAQ

33
codex-rs/Cargo.lock generated
View File

@@ -1086,6 +1086,7 @@ dependencies = [
"codex-apply-patch",
"codex-arg0",
"codex-async-utils",
"codex-execpolicy",
"codex-file-search",
"codex-git",
"codex-keyring-store",
@@ -1188,6 +1189,7 @@ name = "codex-exec-server"
version = "0.0.0"
dependencies = [
"anyhow",
"async-trait",
"clap",
"codex-core",
"libc",
@@ -1196,6 +1198,7 @@ dependencies = [
"rmcp",
"serde",
"serde_json",
"shlex",
"socket2 0.6.0",
"tempfile",
"tokio",
@@ -1206,6 +1209,21 @@ dependencies = [
[[package]]
name = "codex-execpolicy"
version = "0.0.0"
dependencies = [
"anyhow",
"clap",
"multimap",
"pretty_assertions",
"serde",
"serde_json",
"shlex",
"starlark",
"thiserror 2.0.17",
]
[[package]]
name = "codex-execpolicy-legacy"
version = "0.0.0"
dependencies = [
"allocative",
"anyhow",
@@ -1223,21 +1241,6 @@ dependencies = [
"tempfile",
]
[[package]]
name = "codex-execpolicy2"
version = "0.0.0"
dependencies = [
"anyhow",
"clap",
"multimap",
"pretty_assertions",
"serde",
"serde_json",
"shlex",
"starlark",
"thiserror 2.0.17",
]
[[package]]
name = "codex-feedback"
version = "0.0.0"

View File

@@ -18,7 +18,7 @@ members = [
"exec",
"exec-server",
"execpolicy",
"execpolicy2",
"execpolicy-legacy",
"keyring-store",
"file-search",
"linux-sandbox",
@@ -67,6 +67,7 @@ codex-chatgpt = { path = "chatgpt" }
codex-common = { path = "common" }
codex-core = { path = "core" }
codex-exec = { path = "exec" }
codex-execpolicy = { path = "execpolicy" }
codex-feedback = { path = "feedback" }
codex-file-search = { path = "file-search" }
codex-git = { path = "utils/git" }

View File

@@ -438,6 +438,13 @@ server_request_definitions! {
response: v2::CommandExecutionRequestApprovalResponse,
},
/// Sent when approval is requested for a specific file change.
/// This request is used for Turns started via turn/start.
FileChangeRequestApproval => "item/fileChange/requestApproval" {
params: v2::FileChangeRequestApprovalParams,
response: v2::FileChangeRequestApprovalResponse,
},
/// DEPRECATED APIs below
/// Request to approve a patch.
/// This request is used for Turns started via the legacy APIs (i.e. SendUserTurn, SendUserMessage).

View File

@@ -794,20 +794,23 @@ pub struct FileUpdateChange {
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[serde(tag = "type", rename_all = "camelCase")]
#[ts(tag = "type")]
#[ts(export_to = "v2/")]
pub enum PatchChangeKind {
Add,
Delete,
Update,
Update { move_path: Option<PathBuf> },
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub enum PatchApplyStatus {
InProgress,
Completed,
Failed,
Declined,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
@@ -982,6 +985,26 @@ pub struct CommandExecutionRequestApprovalResponse {
pub accept_settings: Option<CommandExecutionRequestAcceptSettings>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct FileChangeRequestApprovalParams {
pub thread_id: String,
pub turn_id: String,
pub item_id: String,
/// Optional explanatory reason (e.g. request for extra write access).
pub reason: Option<String>,
/// [UNSTABLE] When set, the agent is asking the user to allow writes under this root
/// for the remainder of the session (unclear if this is honored today).
pub grant_root: Option<PathBuf>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[ts(export_to = "v2/")]
pub struct FileChangeRequestApprovalResponse {
pub decision: ApprovalDecision,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]

View File

@@ -24,6 +24,8 @@ use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::CommandExecutionRequestAcceptSettings;
use codex_app_server_protocol::CommandExecutionRequestApprovalParams;
use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
use codex_app_server_protocol::FileChangeRequestApprovalParams;
use codex_app_server_protocol::FileChangeRequestApprovalResponse;
use codex_app_server_protocol::GetAccountRateLimitsResponse;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::InitializeResponse;
@@ -677,6 +679,9 @@ impl CodexClient {
ServerRequest::CommandExecutionRequestApproval { request_id, params } => {
self.handle_command_execution_request_approval(request_id, params)?;
}
ServerRequest::FileChangeRequestApproval { request_id, params } => {
self.approve_file_change_request(request_id, params)?;
}
other => {
bail!("received unsupported server request: {other:?}");
}
@@ -717,6 +722,37 @@ impl CodexClient {
Ok(())
}
fn approve_file_change_request(
&mut self,
request_id: RequestId,
params: FileChangeRequestApprovalParams,
) -> Result<()> {
let FileChangeRequestApprovalParams {
thread_id,
turn_id,
item_id,
reason,
grant_root,
} = params;
println!(
"\n< fileChange approval requested for thread {thread_id}, turn {turn_id}, item {item_id}"
);
if let Some(reason) = reason.as_deref() {
println!("< reason: {reason}");
}
if let Some(grant_root) = grant_root.as_deref() {
println!("< grant root: {}", grant_root.display());
}
let response = FileChangeRequestApprovalResponse {
decision: ApprovalDecision::Accept,
};
self.send_server_request_response(request_id, &response)?;
println!("< approved fileChange request for item {item_id}");
Ok(())
}
fn send_server_request_response<T>(&mut self, request_id: RequestId, response: &T) -> Result<()>
where
T: Serialize,

View File

@@ -15,12 +15,17 @@ use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
use codex_app_server_protocol::CommandExecutionStatus;
use codex_app_server_protocol::ExecCommandApprovalParams;
use codex_app_server_protocol::ExecCommandApprovalResponse;
use codex_app_server_protocol::FileChangeRequestApprovalParams;
use codex_app_server_protocol::FileChangeRequestApprovalResponse;
use codex_app_server_protocol::FileUpdateChange;
use codex_app_server_protocol::InterruptConversationResponse;
use codex_app_server_protocol::ItemCompletedNotification;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::McpToolCallError;
use codex_app_server_protocol::McpToolCallResult;
use codex_app_server_protocol::McpToolCallStatus;
use codex_app_server_protocol::PatchApplyStatus;
use codex_app_server_protocol::PatchChangeKind as V2PatchChangeKind;
use codex_app_server_protocol::ReasoningSummaryPartAddedNotification;
use codex_app_server_protocol::ReasoningSummaryTextDeltaNotification;
use codex_app_server_protocol::ReasoningTextDeltaNotification;
@@ -40,6 +45,7 @@ use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ExecApprovalRequestEvent;
use codex_core::protocol::ExecCommandEndEvent;
use codex_core::protocol::FileChange as CoreFileChange;
use codex_core::protocol::McpToolCallBeginEvent;
use codex_core::protocol::McpToolCallEndEvent;
use codex_core::protocol::Op;
@@ -47,7 +53,9 @@ use codex_core::protocol::ReviewDecision;
use codex_core::review_format::format_review_findings_block;
use codex_protocol::ConversationId;
use codex_protocol::protocol::ReviewOutputEvent;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::oneshot;
use tracing::error;
@@ -70,24 +78,74 @@ pub(crate) async fn apply_bespoke_event_handling(
}
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id,
turn_id,
changes,
reason,
grant_root,
}) => {
let params = ApplyPatchApprovalParams {
conversation_id,
call_id,
file_changes: changes,
reason,
grant_root,
};
let rx = outgoing
.send_request(ServerRequestPayload::ApplyPatchApproval(params))
.await;
tokio::spawn(async move {
on_patch_approval_response(event_id, rx, conversation).await;
});
}
}) => match api_version {
ApiVersion::V1 => {
let params = ApplyPatchApprovalParams {
conversation_id,
call_id,
file_changes: changes.clone(),
reason,
grant_root,
};
let rx = outgoing
.send_request(ServerRequestPayload::ApplyPatchApproval(params))
.await;
tokio::spawn(async move {
on_patch_approval_response(event_id, rx, conversation).await;
});
}
ApiVersion::V2 => {
// Until we migrate the core to be aware of a first class FileChangeItem
// and emit the corresponding EventMsg, we repurpose the call_id as the item_id.
let item_id = call_id.clone();
let patch_changes = convert_patch_changes(&changes);
let first_start = {
let mut map = turn_summary_store.lock().await;
let summary = map.entry(conversation_id).or_default();
summary.file_change_started.insert(item_id.clone())
};
if first_start {
let item = ThreadItem::FileChange {
id: item_id.clone(),
changes: patch_changes.clone(),
status: PatchApplyStatus::InProgress,
};
let notification = ItemStartedNotification { item };
outgoing
.send_server_notification(ServerNotification::ItemStarted(notification))
.await;
}
let params = FileChangeRequestApprovalParams {
thread_id: conversation_id.to_string(),
turn_id: turn_id.clone(),
item_id: item_id.clone(),
reason,
grant_root,
};
let rx = outgoing
.send_request(ServerRequestPayload::FileChangeRequestApproval(params))
.await;
tokio::spawn(async move {
on_file_change_request_approval_response(
event_id,
conversation_id,
item_id,
patch_changes,
rx,
conversation,
outgoing,
turn_summary_store,
)
.await;
});
}
},
EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
call_id,
turn_id,
@@ -244,6 +302,49 @@ pub(crate) async fn apply_bespoke_event_handling(
.send_server_notification(ServerNotification::ItemCompleted(notification))
.await;
}
EventMsg::PatchApplyBegin(patch_begin_event) => {
// Until we migrate the core to be aware of a first class FileChangeItem
// and emit the corresponding EventMsg, we repurpose the call_id as the item_id.
let item_id = patch_begin_event.call_id.clone();
let first_start = {
let mut map = turn_summary_store.lock().await;
let summary = map.entry(conversation_id).or_default();
summary.file_change_started.insert(item_id.clone())
};
if first_start {
let item = ThreadItem::FileChange {
id: item_id.clone(),
changes: convert_patch_changes(&patch_begin_event.changes),
status: PatchApplyStatus::InProgress,
};
let notification = ItemStartedNotification { item };
outgoing
.send_server_notification(ServerNotification::ItemStarted(notification))
.await;
}
}
EventMsg::PatchApplyEnd(patch_end_event) => {
// Until we migrate the core to be aware of a first class FileChangeItem
// and emit the corresponding EventMsg, we repurpose the call_id as the item_id.
let item_id = patch_end_event.call_id.clone();
let status = if patch_end_event.success {
PatchApplyStatus::Completed
} else {
PatchApplyStatus::Failed
};
let changes = convert_patch_changes(&patch_end_event.changes);
complete_file_change_item(
conversation_id,
item_id,
changes,
status,
outgoing.as_ref(),
&turn_summary_store,
)
.await;
}
EventMsg::ExecCommandBegin(exec_command_begin_event) => {
let item = ThreadItem::CommandExecution {
id: exec_command_begin_event.call_id.clone(),
@@ -365,6 +466,32 @@ async fn emit_turn_completed_with_status(
.await;
}
async fn complete_file_change_item(
conversation_id: ConversationId,
item_id: String,
changes: Vec<FileUpdateChange>,
status: PatchApplyStatus,
outgoing: &OutgoingMessageSender,
turn_summary_store: &TurnSummaryStore,
) {
{
let mut map = turn_summary_store.lock().await;
if let Some(summary) = map.get_mut(&conversation_id) {
summary.file_change_started.remove(&item_id);
}
}
let item = ThreadItem::FileChange {
id: item_id,
changes,
status,
};
let notification = ItemCompletedNotification { item };
outgoing
.send_server_notification(ServerNotification::ItemCompleted(notification))
.await;
}
async fn find_and_remove_turn_summary(
conversation_id: ConversationId,
turn_summary_store: &TurnSummaryStore,
@@ -512,6 +639,110 @@ fn render_review_output_text(output: &ReviewOutputEvent) -> String {
}
}
fn convert_patch_changes(changes: &HashMap<PathBuf, CoreFileChange>) -> Vec<FileUpdateChange> {
let mut converted: Vec<FileUpdateChange> = changes
.iter()
.map(|(path, change)| FileUpdateChange {
path: path.to_string_lossy().into_owned(),
kind: map_patch_change_kind(change),
diff: format_file_change_diff(change),
})
.collect();
converted.sort_by(|a, b| a.path.cmp(&b.path));
converted
}
fn map_patch_change_kind(change: &CoreFileChange) -> V2PatchChangeKind {
match change {
CoreFileChange::Add { .. } => V2PatchChangeKind::Add,
CoreFileChange::Delete { .. } => V2PatchChangeKind::Delete,
CoreFileChange::Update { move_path, .. } => V2PatchChangeKind::Update {
move_path: move_path.clone(),
},
}
}
fn format_file_change_diff(change: &CoreFileChange) -> String {
match change {
CoreFileChange::Add { content } => content.clone(),
CoreFileChange::Delete { content } => content.clone(),
CoreFileChange::Update {
unified_diff,
move_path,
} => {
if let Some(path) = move_path {
format!("{unified_diff}\n\nMoved to: {}", path.display())
} else {
unified_diff.clone()
}
}
}
}
#[allow(clippy::too_many_arguments)]
async fn on_file_change_request_approval_response(
event_id: String,
conversation_id: ConversationId,
item_id: String,
changes: Vec<FileUpdateChange>,
receiver: oneshot::Receiver<JsonValue>,
codex: Arc<CodexConversation>,
outgoing: Arc<OutgoingMessageSender>,
turn_summary_store: TurnSummaryStore,
) {
let response = receiver.await;
let (decision, completion_status) = match response {
Ok(value) => {
let response = serde_json::from_value::<FileChangeRequestApprovalResponse>(value)
.unwrap_or_else(|err| {
error!("failed to deserialize FileChangeRequestApprovalResponse: {err}");
FileChangeRequestApprovalResponse {
decision: ApprovalDecision::Decline,
}
});
let (decision, completion_status) = match response.decision {
ApprovalDecision::Accept => (ReviewDecision::Approved, None),
ApprovalDecision::Decline => {
(ReviewDecision::Denied, Some(PatchApplyStatus::Declined))
}
ApprovalDecision::Cancel => {
(ReviewDecision::Abort, Some(PatchApplyStatus::Declined))
}
};
// Allow EventMsg::PatchApplyEnd to emit ItemCompleted for accepted patches.
// Only short-circuit on declines/cancels/failures.
(decision, completion_status)
}
Err(err) => {
error!("request failed: {err:?}");
(ReviewDecision::Denied, Some(PatchApplyStatus::Failed))
}
};
if let Some(status) = completion_status {
complete_file_change_item(
conversation_id,
item_id,
changes,
status,
outgoing.as_ref(),
&turn_summary_store,
)
.await;
}
if let Err(err) = codex
.submit(Op::PatchApproval {
id: event_id,
decision,
})
.await
{
error!("failed to submit PatchApproval: {err}");
}
}
async fn on_command_execution_request_approval_response(
event_id: String,
receiver: oneshot::Receiver<JsonValue>,

View File

@@ -139,6 +139,7 @@ use codex_protocol::protocol::USER_MESSAGE_BEGIN;
use codex_protocol::user_input::UserInput as CoreInputItem;
use codex_utils_json_to_toml::json_to_toml;
use std::collections::HashMap;
use std::collections::HashSet;
use std::ffi::OsStr;
use std::io::Error as IoError;
use std::path::Path;
@@ -147,7 +148,6 @@ use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use tokio::select;
use tokio::sync::Mutex;
use tokio::sync::oneshot;
use tracing::error;
@@ -162,6 +162,7 @@ pub(crate) type PendingInterrupts = Arc<Mutex<HashMap<ConversationId, PendingInt
#[derive(Default, Clone)]
pub(crate) struct TurnSummary {
pub(crate) last_error_message: Option<String>,
pub(crate) file_change_started: HashSet<String>,
}
pub(crate) type TurnSummaryStore = Arc<Mutex<HashMap<ConversationId, TurnSummary>>>;
@@ -2237,51 +2238,25 @@ impl CodexMessageProcessor {
.await
{
info!("conversation {conversation_id} was active; shutting down");
let conversation_clone = conversation.clone();
let notify = Arc::new(tokio::sync::Notify::new());
let notify_clone = notify.clone();
// Do not wait on conversation.next_event(); the listener task already consumes
// the stream. Request shutdown and ensure the rollout file is flushed before moving it.
if let Err(err) = conversation.submit(Op::Shutdown).await {
error!("failed to submit Shutdown to conversation {conversation_id}: {err}");
}
// Establish the listener for ShutdownComplete before submitting
// Shutdown so it is not missed.
let is_shutdown = tokio::spawn(async move {
// Create the notified future outside the loop to avoid losing notifications.
let notified = notify_clone.notified();
tokio::pin!(notified);
loop {
select! {
_ = &mut notified => { break; }
event = conversation_clone.next_event() => {
match event {
Ok(event) => {
if matches!(event.msg, EventMsg::ShutdownComplete) { break; }
}
// Break on errors to avoid tight loops when the agent loop has exited.
Err(_) => { break; }
}
}
}
let flush_result =
tokio::time::timeout(Duration::from_secs(5), conversation.flush_rollout()).await;
match flush_result {
Ok(Ok(())) => {}
Ok(Err(err)) => {
warn!(
"conversation {conversation_id} rollout flush failed before archive: {err}"
);
}
});
// Request shutdown.
match conversation.submit(Op::Shutdown).await {
Ok(_) => {
// Successfully submitted Shutdown; wait before proceeding.
select! {
_ = is_shutdown => {
// Normal shutdown: proceed with archive.
}
_ = tokio::time::sleep(Duration::from_secs(10)) => {
warn!("conversation {conversation_id} shutdown timed out; proceeding with archive");
// Wake any waiter; use notify_waiters to avoid missing the signal.
notify.notify_waiters();
// Perhaps we lost a shutdown race, so let's continue to
// clean up the .jsonl file.
}
}
}
Err(err) => {
error!("failed to submit Shutdown to conversation {conversation_id}: {err}");
notify.notify_waiters();
Err(_) => {
warn!(
"conversation {conversation_id} rollout flush timed out; proceeding with archive"
);
}
}
}
@@ -2293,7 +2268,8 @@ impl CodexMessageProcessor {
.codex_home
.join(codex_core::ARCHIVED_SESSIONS_SUBDIR);
tokio::fs::create_dir_all(&archive_folder).await?;
tokio::fs::rename(&canonical_rollout_path, &archive_folder.join(&file_name)).await?;
let destination = archive_folder.join(&file_name);
tokio::fs::rename(&canonical_rollout_path, &destination).await?;
Ok(())
}
.await;

View File

@@ -46,6 +46,7 @@ pub fn create_fake_rollout(
instructions: None,
source: SessionSource::Cli,
model_provider: model_provider.map(str::to_string),
name: None,
})?;
let lines = [

View File

@@ -1,14 +1,20 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_apply_patch_sse_response;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_chat_completions_server;
use app_test_support::create_mock_chat_completions_server_unchecked;
use app_test_support::create_shell_sse_response;
use app_test_support::to_response;
use codex_app_server_protocol::ApprovalDecision;
use codex_app_server_protocol::CommandExecutionStatus;
use codex_app_server_protocol::FileChangeRequestApprovalResponse;
use codex_app_server_protocol::ItemCompletedNotification;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::PatchApplyStatus;
use codex_app_server_protocol::PatchChangeKind;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ThreadItem;
@@ -471,6 +477,300 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn turn_start_file_change_approval_v2() -> Result<()> {
skip_if_no_network!(Ok(()));
let tmp = TempDir::new()?;
let codex_home = tmp.path().join("codex_home");
std::fs::create_dir(&codex_home)?;
let workspace = tmp.path().join("workspace");
std::fs::create_dir(&workspace)?;
let patch = r#"*** Begin Patch
*** Add File: README.md
+new line
*** End Patch
"#;
let responses = vec![
create_apply_patch_sse_response(patch, "patch-call")?,
create_final_assistant_message_sse_response("patch applied")?,
];
let server = create_mock_chat_completions_server(responses).await;
create_config_toml(&codex_home, &server.uri(), "untrusted")?;
let mut mcp = McpProcess::new(&codex_home).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let start_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
cwd: Some(workspace.to_string_lossy().into_owned()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: "apply patch".into(),
}],
cwd: Some(workspace.clone()),
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
let started_file_change = timeout(DEFAULT_READ_TIMEOUT, async {
loop {
let started_notif = mcp
.read_stream_until_notification_message("item/started")
.await?;
let started: ItemStartedNotification =
serde_json::from_value(started_notif.params.clone().expect("item/started params"))?;
if let ThreadItem::FileChange { .. } = started.item {
return Ok::<ThreadItem, anyhow::Error>(started.item);
}
}
})
.await??;
let ThreadItem::FileChange {
ref id,
status,
ref changes,
} = started_file_change
else {
unreachable!("loop ensures we break on file change items");
};
assert_eq!(id, "patch-call");
assert_eq!(status, PatchApplyStatus::InProgress);
let started_changes = changes.clone();
let server_req = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_request_message(),
)
.await??;
let ServerRequest::FileChangeRequestApproval { request_id, params } = server_req else {
panic!("expected FileChangeRequestApproval request")
};
assert_eq!(params.item_id, "patch-call");
assert_eq!(params.thread_id, thread.id);
assert_eq!(params.turn_id, turn.id);
let expected_readme_path = workspace.join("README.md");
let expected_readme_path = expected_readme_path.to_string_lossy().into_owned();
pretty_assertions::assert_eq!(
started_changes,
vec![codex_app_server_protocol::FileUpdateChange {
path: expected_readme_path.clone(),
kind: PatchChangeKind::Add,
diff: "new line\n".to_string(),
}]
);
mcp.send_response(
request_id,
serde_json::to_value(FileChangeRequestApprovalResponse {
decision: ApprovalDecision::Accept,
})?,
)
.await?;
let completed_file_change = timeout(DEFAULT_READ_TIMEOUT, async {
loop {
let completed_notif = mcp
.read_stream_until_notification_message("item/completed")
.await?;
let completed: ItemCompletedNotification = serde_json::from_value(
completed_notif
.params
.clone()
.expect("item/completed params"),
)?;
if let ThreadItem::FileChange { .. } = completed.item {
return Ok::<ThreadItem, anyhow::Error>(completed.item);
}
}
})
.await??;
let ThreadItem::FileChange { ref id, status, .. } = completed_file_change else {
unreachable!("loop ensures we break on file change items");
};
assert_eq!(id, "patch-call");
assert_eq!(status, PatchApplyStatus::Completed);
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"),
)
.await??;
let readme_contents = std::fs::read_to_string(expected_readme_path)?;
assert_eq!(readme_contents, "new line\n");
Ok(())
}
#[tokio::test]
async fn turn_start_file_change_approval_decline_v2() -> Result<()> {
skip_if_no_network!(Ok(()));
let tmp = TempDir::new()?;
let codex_home = tmp.path().join("codex_home");
std::fs::create_dir(&codex_home)?;
let workspace = tmp.path().join("workspace");
std::fs::create_dir(&workspace)?;
let patch = r#"*** Begin Patch
*** Add File: README.md
+new line
*** End Patch
"#;
let responses = vec![
create_apply_patch_sse_response(patch, "patch-call")?,
create_final_assistant_message_sse_response("patch declined")?,
];
let server = create_mock_chat_completions_server(responses).await;
create_config_toml(&codex_home, &server.uri(), "untrusted")?;
let mut mcp = McpProcess::new(&codex_home).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let start_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
cwd: Some(workspace.to_string_lossy().into_owned()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: "apply patch".into(),
}],
cwd: Some(workspace.clone()),
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
let started_file_change = timeout(DEFAULT_READ_TIMEOUT, async {
loop {
let started_notif = mcp
.read_stream_until_notification_message("item/started")
.await?;
let started: ItemStartedNotification =
serde_json::from_value(started_notif.params.clone().expect("item/started params"))?;
if let ThreadItem::FileChange { .. } = started.item {
return Ok::<ThreadItem, anyhow::Error>(started.item);
}
}
})
.await??;
let ThreadItem::FileChange {
ref id,
status,
ref changes,
} = started_file_change
else {
unreachable!("loop ensures we break on file change items");
};
assert_eq!(id, "patch-call");
assert_eq!(status, PatchApplyStatus::InProgress);
let started_changes = changes.clone();
let server_req = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_request_message(),
)
.await??;
let ServerRequest::FileChangeRequestApproval { request_id, params } = server_req else {
panic!("expected FileChangeRequestApproval request")
};
assert_eq!(params.item_id, "patch-call");
assert_eq!(params.thread_id, thread.id);
assert_eq!(params.turn_id, turn.id);
let expected_readme_path = workspace.join("README.md");
let expected_readme_path_str = expected_readme_path.to_string_lossy().into_owned();
pretty_assertions::assert_eq!(
started_changes,
vec![codex_app_server_protocol::FileUpdateChange {
path: expected_readme_path_str.clone(),
kind: PatchChangeKind::Add,
diff: "new line\n".to_string(),
}]
);
mcp.send_response(
request_id,
serde_json::to_value(FileChangeRequestApprovalResponse {
decision: ApprovalDecision::Decline,
})?,
)
.await?;
let completed_file_change = timeout(DEFAULT_READ_TIMEOUT, async {
loop {
let completed_notif = mcp
.read_stream_until_notification_message("item/completed")
.await?;
let completed: ItemCompletedNotification = serde_json::from_value(
completed_notif
.params
.clone()
.expect("item/completed params"),
)?;
if let ThreadItem::FileChange { .. } = completed.item {
return Ok::<ThreadItem, anyhow::Error>(completed.item);
}
}
})
.await??;
let ThreadItem::FileChange { ref id, status, .. } = completed_file_change else {
unreachable!("loop ensures we break on file change items");
};
assert_eq!(id, "patch-call");
assert_eq!(status, PatchApplyStatus::Declined);
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"),
)
.await??;
assert!(
!expected_readme_path.exists(),
"declined patch should not be applied"
);
Ok(())
}
// Helper to create a config.toml pointing at the mock model server.
fn create_config_toml(
codex_home: &Path,

View File

@@ -100,6 +100,9 @@ enum Subcommand {
/// Resume a previous interactive session (picker by default; use --last to continue the most recent).
Resume(ResumeCommand),
/// Fork an existing session into a new conversation.
Fork(ForkCommand),
/// [EXPERIMENTAL] Browse tasks from Codex Cloud and apply changes locally.
#[clap(name = "cloud", alias = "cloud-tasks")]
Cloud(CloudTasksCli),
@@ -142,6 +145,16 @@ struct ResumeCommand {
config_overrides: TuiCli,
}
#[derive(Debug, Parser)]
struct ForkCommand {
/// Resume from a saved session name or rollout id, but start a new conversation.
#[arg(value_name = "ID|NAME")]
target: String,
#[clap(flatten)]
config_overrides: TuiCli,
}
#[derive(Debug, Parser)]
struct SandboxArgs {
#[command(subcommand)]
@@ -466,6 +479,19 @@ async fn cli_main(codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()
let exit_info = codex_tui::run_main(interactive, codex_linux_sandbox_exe).await?;
handle_app_exit(exit_info)?;
}
Some(Subcommand::Fork(ForkCommand {
target,
config_overrides,
})) => {
interactive = finalize_fork_interactive(
interactive,
root_config_overrides.clone(),
target,
config_overrides,
);
let exit_info = codex_tui::run_main(interactive, codex_linux_sandbox_exe).await?;
handle_app_exit(exit_info)?;
}
Some(Subcommand::Login(mut login_cli)) => {
prepend_config_flags(
&mut login_cli.config_overrides,
@@ -627,6 +653,7 @@ fn finalize_resume_interactive(
interactive.resume_last = last;
interactive.resume_session_id = resume_session_id;
interactive.resume_show_all = show_all;
interactive.fork_source = None;
// Merge resume-scoped flags and overrides with highest precedence.
merge_resume_cli_flags(&mut interactive, resume_cli);
@@ -637,6 +664,21 @@ fn finalize_resume_interactive(
interactive
}
fn finalize_fork_interactive(
mut interactive: TuiCli,
root_config_overrides: CliConfigOverrides,
target: String,
fork_cli: TuiCli,
) -> TuiCli {
interactive.resume_picker = false;
interactive.resume_last = false;
interactive.resume_session_id = None;
interactive.fork_source = Some(target);
merge_resume_cli_flags(&mut interactive, fork_cli);
prepend_config_flags(&mut interactive.config_overrides, root_config_overrides);
interactive
}
/// Merge flags provided to `codex resume` so they take precedence over any
/// root-level flags. Only overrides fields explicitly set on the resume-scoped
/// CLI. Also appends `-c key=value` overrides with highest precedence.
@@ -727,6 +769,26 @@ mod tests {
)
}
fn fork_from_args(args: &[&str]) -> TuiCli {
let cli = MultitoolCli::try_parse_from(args).expect("parse");
let MultitoolCli {
interactive,
config_overrides: root_overrides,
subcommand,
feature_toggles: _,
} = cli;
let Subcommand::Fork(ForkCommand {
target,
config_overrides,
}) = subcommand.expect("fork present")
else {
unreachable!()
};
finalize_fork_interactive(interactive, root_overrides, target, config_overrides)
}
fn sample_exit_info(conversation: Option<&str>) -> AppExitInfo {
let token_usage = TokenUsage {
output_tokens: 2,
@@ -819,6 +881,15 @@ mod tests {
assert!(interactive.resume_show_all);
}
#[test]
fn fork_sets_target_and_disables_resume_controls() {
let interactive = fork_from_args(["codex", "fork", "saved"].as_ref());
assert_eq!(interactive.fork_source.as_deref(), Some("saved"));
assert!(!interactive.resume_picker);
assert!(!interactive.resume_last);
assert!(interactive.resume_session_id.is_none());
}
#[test]
fn resume_merges_option_flags_and_full_auto() {
let interactive = finalize_from_args(

View File

@@ -15,13 +15,12 @@ pub fn create_config_summary_entries(config: &Config) -> Vec<(&'static str, Stri
if config.model_provider.wire_api == WireApi::Responses
&& config.model_family.supports_reasoning_summaries
{
entries.push((
"reasoning effort",
config
.model_reasoning_effort
.map(|effort| effort.to_string())
.unwrap_or_else(|| "none".to_string()),
));
let reasoning_effort = config
.model_reasoning_effort
.or(config.model_family.default_reasoning_effort)
.map(|effort| effort.to_string())
.unwrap_or_else(|| "none".to_string());
entries.push(("reasoning effort", reasoning_effort));
entries.push((
"reasoning summaries",
config.model_reasoning_summary.to_string(),

View File

@@ -22,6 +22,7 @@ chrono = { workspace = true, features = ["serde"] }
codex-app-server-protocol = { workspace = true }
codex-apply-patch = { workspace = true }
codex-async-utils = { workspace = true }
codex-execpolicy = { workspace = true }
codex-file-search = { workspace = true }
codex-git = { workspace = true }
codex-keyring-store = { workspace = true }

View File

@@ -66,7 +66,6 @@ use crate::context_manager::ContextManager;
use crate::environment_context::EnvironmentContext;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::error::http_status_code_value;
#[cfg(test)]
use crate::exec::StreamOutput;
use crate::mcp::auth::compute_auth_statuses;
@@ -80,6 +79,7 @@ use crate::protocol::ApplyPatchApprovalRequestEvent;
use crate::protocol::AskForApproval;
use crate::protocol::BackgroundEventEvent;
use crate::protocol::DeprecationNoticeEvent;
use crate::protocol::ErrorEvent;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::ExecApprovalRequestEvent;
@@ -100,6 +100,8 @@ use crate::protocol::TurnDiffEvent;
use crate::protocol::WarningEvent;
use crate::rollout::RolloutRecorder;
use crate::rollout::RolloutRecorderParams;
use crate::saved_sessions::build_saved_session_entry;
use crate::saved_sessions::upsert_saved_session;
use crate::shell;
use crate::state::ActiveTurn;
use crate::state::SessionServices;
@@ -121,6 +123,7 @@ use crate::user_instructions::UserInstructions;
use crate::user_notification::UserNotification;
use crate::util::backoff;
use codex_async_utils::OrCancelExt;
use codex_execpolicy::Policy as ExecPolicy;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
@@ -134,6 +137,7 @@ use codex_utils_readiness::Readiness;
use codex_utils_readiness::ReadinessFlag;
use codex_utils_tokenizer::warm_model_cache;
use reqwest::StatusCode;
use std::path::Path;
/// The high-level interface to the Codex system.
/// It operates as a queue pair where you send submissions and receive events.
@@ -149,6 +153,7 @@ pub struct Codex {
pub struct CodexSpawnOk {
pub codex: Codex,
pub conversation_id: ConversationId,
pub(crate) session: Arc<Session>,
}
pub(crate) const INITIAL_SUBMIT_ID: &str = "";
@@ -167,6 +172,10 @@ impl Codex {
let user_instructions = get_user_instructions(&config).await;
let exec_policy = crate::exec_policy::exec_policy_for(&config.features, &config.codex_home)
.await
.map_err(|err| CodexErr::Fatal(format!("failed to load execpolicy: {err}")))?;
let config = Arc::new(config);
let session_configuration = SessionConfiguration {
@@ -183,6 +192,7 @@ impl Codex {
cwd: config.cwd.clone(),
original_config_do_not_use: Arc::clone(&config),
features: config.features.clone(),
exec_policy,
session_source,
};
@@ -204,7 +214,8 @@ impl Codex {
let conversation_id = session.conversation_id;
// This task will run until Op::Shutdown is received.
tokio::spawn(submission_loop(session, config, rx_sub));
let submission_session = Arc::clone(&session);
tokio::spawn(submission_loop(submission_session, config, rx_sub));
let codex = Codex {
next_id: AtomicU64::new(0),
tx_sub,
@@ -214,6 +225,7 @@ impl Codex {
Ok(CodexSpawnOk {
codex,
conversation_id,
session,
})
}
@@ -280,6 +292,7 @@ pub(crate) struct TurnContext {
pub(crate) final_output_json_schema: Option<Value>,
pub(crate) codex_linux_sandbox_exe: Option<PathBuf>,
pub(crate) tool_call_gate: Arc<ReadinessFlag>,
pub(crate) exec_policy: Arc<ExecPolicy>,
pub(crate) truncation_policy: TruncationPolicy,
}
@@ -336,6 +349,8 @@ pub(crate) struct SessionConfiguration {
/// Set of feature flags for this session
features: Features,
/// Execpolicy policy, applied only when enabled by feature flag.
exec_policy: Arc<ExecPolicy>,
// TODO(pakrym): Remove config from here
original_config_do_not_use: Arc<Config>,
@@ -436,6 +451,7 @@ impl Session {
final_output_json_schema: None,
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
tool_call_gate: Arc::new(ReadinessFlag::new()),
exec_policy: session_configuration.exec_policy.clone(),
truncation_policy: TruncationPolicy::new(&per_turn_config),
}
}
@@ -623,18 +639,68 @@ impl Session {
}
/// Ensure all rollout writes are durably flushed.
pub(crate) async fn flush_rollout(&self) {
pub(crate) async fn flush_rollout(&self) -> std::io::Result<()> {
let recorder = {
let guard = self.services.rollout.lock().await;
guard.clone()
};
if let Some(rec) = recorder
&& let Err(e) = rec.flush().await
{
warn!("failed to flush rollout recorder: {e}");
if let Some(rec) = recorder {
rec.flush().await
} else {
Ok(())
}
}
pub(crate) async fn set_session_name(&self, name: Option<String>) -> std::io::Result<()> {
let recorder = {
let guard = self.services.rollout.lock().await;
guard.clone()
};
if let Some(rec) = recorder {
rec.set_session_name(name).await
} else {
Ok(())
}
}
pub(crate) async fn rollout_path(&self) -> CodexResult<PathBuf> {
let guard = self.services.rollout.lock().await;
let Some(rec) = guard.as_ref() else {
return Err(CodexErr::Fatal(
"Rollout recorder is not initialized; cannot save session.".to_string(),
));
};
Ok(rec.rollout_path.clone())
}
pub(crate) async fn model(&self) -> String {
let state = self.state.lock().await;
state.session_configuration.model.clone()
}
pub(crate) async fn save_session(
&self,
codex_home: &Path,
name: &str,
) -> CodexResult<crate::SavedSessionEntry> {
let trimmed = name.trim();
if trimmed.is_empty() {
return Err(CodexErr::Fatal("Usage: /save <name>".to_string()));
}
let rollout_path = self.rollout_path().await?;
self.flush_rollout()
.await
.map_err(|e| CodexErr::Fatal(format!("failed to flush rollout recorder: {e}")))?;
self.set_session_name(Some(trimmed.to_string()))
.await
.map_err(|e| CodexErr::Fatal(format!("failed to write session name: {e}")))?;
let entry =
build_saved_session_entry(trimmed.to_string(), rollout_path, self.model().await)
.await?;
upsert_saved_session(codex_home, entry.clone()).await?;
Ok(entry)
}
fn next_internal_sub_id(&self) -> String {
let id = self
.next_internal_sub_id
@@ -650,7 +716,9 @@ impl Session {
let items = self.build_initial_context(&turn_context);
self.record_conversation_items(&turn_context, &items).await;
// Ensure initial items are visible to immediate readers (e.g., tests, forks).
self.flush_rollout().await;
if let Err(e) = self.flush_rollout().await {
warn!("failed to flush rollout recorder: {e}");
}
}
InitialHistory::Resumed(_) | InitialHistory::Forked(_) => {
let rollout_items = conversation_history.get_rollout_items();
@@ -694,10 +762,18 @@ impl Session {
// If persisting, persist all rollout items as-is (recorder filters)
if persist && !rollout_items.is_empty() {
self.persist_rollout_items(&rollout_items).await;
// Drop legacy SessionMeta lines from the source rollout so the forked
// session only contains its own SessionMeta written by the recorder.
let filtered =
InitialHistory::Forked(rollout_items.clone()).without_session_meta();
if !filtered.is_empty() {
self.persist_rollout_items(&filtered).await;
}
}
// Flush after seeding history and any persisted rollout copy.
self.flush_rollout().await;
if let Err(e) = self.flush_rollout().await {
warn!("failed to flush rollout recorder: {e}");
}
}
}
}
@@ -910,6 +986,7 @@ impl Session {
let event = EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id,
turn_id: turn_context.sub_id.clone(),
changes,
reason,
grant_root,
@@ -1187,11 +1264,9 @@ impl Session {
&self,
turn_context: &TurnContext,
message: impl Into<String>,
http_status_code: Option<StatusCode>,
) {
let event = EventMsg::StreamError(StreamErrorEvent {
message: message.into(),
http_status_code: http_status_code_value(http_status_code),
});
self.send_event(turn_context, event).await;
}
@@ -1417,6 +1492,9 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
Op::Review { review_request } => {
handlers::review(&sess, &config, sub.id.clone(), review_request).await;
}
Op::SaveSession { name } => {
handlers::save_session(&sess, &config, sub.id.clone(), name).await;
}
_ => {} // Ignore unknown ops; enum is non_exhaustive to allow extensions.
}
}
@@ -1444,6 +1522,7 @@ mod handlers {
use codex_protocol::protocol::Op;
use codex_protocol::protocol::ReviewDecision;
use codex_protocol::protocol::ReviewRequest;
use codex_protocol::protocol::SaveSessionResponseEvent;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::user_input::UserInput;
@@ -1665,6 +1744,38 @@ mod handlers {
.await;
}
pub async fn save_session(
sess: &Arc<Session>,
config: &Arc<crate::config::Config>,
sub_id: String,
name: String,
) {
match sess.save_session(&config.codex_home, &name).await {
Ok(entry) => {
let event = Event {
id: sub_id,
msg: EventMsg::SaveSessionResponse(SaveSessionResponseEvent {
name: entry.name,
rollout_path: entry.rollout_path,
conversation_id: entry.conversation_id,
}),
};
sess.send_event_raw(event).await;
}
Err(err) => {
let message = format!("Failed to save session '{name}': {err}");
let event = Event {
id: sub_id,
msg: EventMsg::Error(ErrorEvent {
message,
http_status_code: None,
}),
};
sess.send_event_raw(event).await;
}
}
}
pub async fn shutdown(sess: &Arc<Session>, sub_id: String) -> bool {
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
info!("Shutting down Codex instance");
@@ -1683,7 +1794,6 @@ mod handlers {
id: sub_id.clone(),
msg: EventMsg::Error(ErrorEvent {
message: "Failed to shutdown rollout recorder".to_string(),
http_status_code: None,
}),
};
sess.send_event_raw(event).await;
@@ -1789,6 +1899,7 @@ async fn spawn_review_thread(
final_output_json_schema: None,
codex_linux_sandbox_exe: parent_turn_context.codex_linux_sandbox_exe.clone(),
tool_call_gate: Arc::new(ReadinessFlag::new()),
exec_policy: parent_turn_context.exec_policy.clone(),
truncation_policy: TruncationPolicy::new(&per_turn_config),
};
@@ -1937,8 +2048,10 @@ pub(crate) async fn run_task(
}
Err(e) => {
info!("Turn error: {e:#}");
sess.send_event(&turn_context, EventMsg::Error(e.to_error_event(None)))
.await;
let event = EventMsg::Error(ErrorEvent {
message: e.to_string(),
});
sess.send_event(&turn_context, event).await;
// let the user continue the conversation
break;
}
@@ -2062,7 +2175,6 @@ async fn run_turn(
sess.notify_stream_error(
&turn_context,
format!("Reconnecting... {retries}/{max_retries}"),
e.http_status_code(),
)
.await;
@@ -2608,6 +2720,7 @@ mod tests {
cwd: config.cwd.clone(),
original_config_do_not_use: Arc::clone(&config),
features: Features::default(),
exec_policy: Arc::new(ExecPolicy::empty()),
session_source: SessionSource::Exec,
};
@@ -2685,6 +2798,7 @@ mod tests {
cwd: config.cwd.clone(),
original_config_do_not_use: Arc::clone(&config),
features: Features::default(),
exec_policy: Arc::new(ExecPolicy::empty()),
session_source: SessionSource::Exec,
};

View File

@@ -1,22 +1,26 @@
use crate::codex::Codex;
use crate::codex::Session;
use crate::error::Result as CodexResult;
use crate::protocol::Event;
use crate::protocol::Op;
use crate::protocol::Submission;
use std::path::PathBuf;
use std::sync::Arc;
pub struct CodexConversation {
codex: Codex,
rollout_path: PathBuf,
session: Arc<Session>,
}
/// Conduit for the bidirectional stream of messages that compose a conversation
/// in Codex.
impl CodexConversation {
pub(crate) fn new(codex: Codex, rollout_path: PathBuf) -> Self {
pub(crate) fn new(codex: Codex, rollout_path: PathBuf, session: Arc<Session>) -> Self {
Self {
codex,
rollout_path,
session,
}
}
@@ -36,4 +40,24 @@ impl CodexConversation {
pub fn rollout_path(&self) -> PathBuf {
self.rollout_path.clone()
}
pub async fn flush_rollout(&self) -> CodexResult<()> {
Ok(self.session.flush_rollout().await?)
}
pub async fn set_session_name(&self, name: Option<String>) -> CodexResult<()> {
Ok(self.session.set_session_name(name).await?)
}
pub async fn model(&self) -> String {
self.session.model().await
}
pub async fn save_session(
&self,
codex_home: &std::path::Path,
name: &str,
) -> CodexResult<crate::SavedSessionEntry> {
self.session.save_session(codex_home, name).await
}
}

View File

@@ -1,6 +1,8 @@
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::SandboxPolicy;
use crate::sandboxing::SandboxPermissions;
use crate::bash::parse_shell_lc_plain_commands;
use crate::is_safe_command::is_known_safe_command;
@@ -8,7 +10,7 @@ pub fn requires_initial_appoval(
policy: AskForApproval,
sandbox_policy: &SandboxPolicy,
command: &[String],
with_escalated_permissions: bool,
sandbox_permissions: SandboxPermissions,
) -> bool {
if is_known_safe_command(command) {
return false;
@@ -24,8 +26,7 @@ pub fn requires_initial_appoval(
// In restricted sandboxes (ReadOnly/WorkspaceWrite), do not prompt for
// nonescalated, nondangerous commands — let the sandbox enforce
// restrictions (e.g., block network/write) without a user prompt.
let wants_escalation: bool = with_escalated_permissions;
if wants_escalation {
if sandbox_permissions.requires_escalated_permissions() {
return true;
}
command_might_be_dangerous(command)

View File

@@ -10,6 +10,7 @@ use crate::error::Result as CodexResult;
use crate::features::Feature;
use crate::protocol::AgentMessageEvent;
use crate::protocol::CompactedItem;
use crate::protocol::ErrorEvent;
use crate::protocol::EventMsg;
use crate::protocol::TaskStartedEvent;
use crate::protocol::TurnContextItem;
@@ -127,8 +128,10 @@ async fn run_compact_task_inner(
continue;
}
sess.set_total_tokens_full(turn_context.as_ref()).await;
sess.send_event(&turn_context, EventMsg::Error(e.to_error_event(None)))
.await;
let event = EventMsg::Error(ErrorEvent {
message: e.to_string(),
});
sess.send_event(&turn_context, event).await;
return;
}
Err(e) => {
@@ -138,14 +141,15 @@ async fn run_compact_task_inner(
sess.notify_stream_error(
turn_context.as_ref(),
format!("Reconnecting... {retries}/{max_retries}"),
e.http_status_code(),
)
.await;
tokio::time::sleep(delay).await;
continue;
} else {
sess.send_event(&turn_context, EventMsg::Error(e.to_error_event(None)))
.await;
let event = EventMsg::Error(ErrorEvent {
message: e.to_string(),
});
sess.send_event(&turn_context, event).await;
return;
}
}

View File

@@ -6,6 +6,7 @@ use crate::codex::TurnContext;
use crate::error::Result as CodexResult;
use crate::protocol::AgentMessageEvent;
use crate::protocol::CompactedItem;
use crate::protocol::ErrorEvent;
use crate::protocol::EventMsg;
use crate::protocol::RolloutItem;
use crate::protocol::TaskStartedEvent;
@@ -29,8 +30,10 @@ pub(crate) async fn run_remote_compact_task(sess: Arc<Session>, turn_context: Ar
async fn run_remote_compact_task_inner(sess: &Arc<Session>, turn_context: &Arc<TurnContext>) {
if let Err(err) = run_remote_compact_task_inner_impl(sess, turn_context).await {
let event = err.to_error_event(Some("Error running remote compact task".to_string()));
sess.send_event(turn_context, EventMsg::Error(event)).await;
let event = EventMsg::Error(ErrorEvent {
message: format!("Error running remote compact task: {err}"),
});
sess.send_event(turn_context, event).await;
}
}

View File

@@ -379,6 +379,7 @@ pub struct Notice {
/// Tracks whether the user has seen the model migration prompt
pub hide_gpt5_1_migration_prompt: Option<bool>,
/// Tracks whether the user has seen the gpt-5.1-codex-max migration prompt
#[serde(rename = "hide_gpt-5.1-codex-max_migration_prompt")]
pub hide_gpt_5_1_codex_max_migration_prompt: Option<bool>,
}

View File

@@ -3,6 +3,7 @@ use crate::CodexAuth;
use crate::codex::Codex;
use crate::codex::CodexSpawnOk;
use crate::codex::INITIAL_SUBMIT_ID;
use crate::codex::Session;
use crate::codex_conversation::CodexConversation;
use crate::config::Config;
use crate::error::CodexErr;
@@ -11,6 +12,7 @@ use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::SessionConfiguredEvent;
use crate::rollout::RolloutRecorder;
use crate::saved_sessions::resolve_rollout_path;
use codex_protocol::ConversationId;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseItem;
@@ -18,6 +20,7 @@ use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
use std::collections::HashMap;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
@@ -56,6 +59,7 @@ impl ConversationManager {
)
}
/// Start a brand new conversation with default initial history.
pub async fn new_conversation(&self, config: Config) -> CodexResult<NewConversation> {
self.spawn_conversation(config, self.auth_manager.clone())
.await
@@ -69,6 +73,7 @@ impl ConversationManager {
let CodexSpawnOk {
codex,
conversation_id,
session,
} = Codex::spawn(
config,
auth_manager,
@@ -76,13 +81,14 @@ impl ConversationManager {
self.session_source.clone(),
)
.await?;
self.finalize_spawn(codex, conversation_id).await
self.finalize_spawn(codex, conversation_id, session).await
}
async fn finalize_spawn(
&self,
codex: Codex,
conversation_id: ConversationId,
session: Arc<Session>,
) -> CodexResult<NewConversation> {
// The first event must be `SessionInitialized`. Validate and forward it
// to the caller so that they can display it in the conversation
@@ -101,6 +107,7 @@ impl ConversationManager {
let conversation = Arc::new(CodexConversation::new(
codex,
session_configured.rollout_path.clone(),
session,
));
self.conversations
.write()
@@ -125,6 +132,7 @@ impl ConversationManager {
.ok_or_else(|| CodexErr::ConversationNotFound(conversation_id))
}
/// Resume a conversation from an on-disk rollout file.
pub async fn resume_conversation_from_rollout(
&self,
config: Config,
@@ -136,6 +144,23 @@ impl ConversationManager {
.await
}
/// Resume a conversation by saved-session name or rollout id string.
pub async fn resume_conversation_from_identifier(
&self,
config: Config,
identifier: &str,
auth_manager: Arc<AuthManager>,
) -> CodexResult<NewConversation> {
let Some(path) = resolve_rollout_path(&config.codex_home, identifier).await? else {
return Err(CodexErr::Fatal(format!(
"No saved session or rollout found for '{identifier}'"
)));
};
self.resume_conversation_from_rollout(config, path, auth_manager)
.await
}
/// Resume a conversation from provided rollout history items.
pub async fn resume_conversation_with_history(
&self,
config: Config,
@@ -145,6 +170,7 @@ impl ConversationManager {
let CodexSpawnOk {
codex,
conversation_id,
session,
} = Codex::spawn(
config,
auth_manager,
@@ -152,7 +178,54 @@ impl ConversationManager {
self.session_source.clone(),
)
.await?;
self.finalize_spawn(codex, conversation_id).await
self.finalize_spawn(codex, conversation_id, session).await
}
/// Fork a new conversation from the given rollout path.
pub async fn fork_from_rollout(
&self,
config: Config,
path: PathBuf,
auth_manager: Arc<AuthManager>,
) -> CodexResult<NewConversation> {
let initial_history = RolloutRecorder::get_rollout_history(&path).await?;
let forked = match initial_history {
InitialHistory::Resumed(resumed) => InitialHistory::Forked(resumed.history),
InitialHistory::Forked(items) => InitialHistory::Forked(items),
InitialHistory::New => InitialHistory::New,
};
self.resume_conversation_with_history(config, forked, auth_manager)
.await
}
/// Fork a new conversation from a saved-session name or rollout id string.
pub async fn fork_from_identifier(
&self,
config: Config,
identifier: &str,
auth_manager: Arc<AuthManager>,
) -> CodexResult<NewConversation> {
let Some(path) = resolve_rollout_path(&config.codex_home, identifier).await? else {
return Err(CodexErr::Fatal(format!(
"No saved session or rollout found for '{identifier}'"
)));
};
self.fork_from_rollout(config, path, auth_manager).await
}
/// Persist a human-friendly session name and record it in saved_sessions.json.
pub async fn save_session(
&self,
conversation_id: ConversationId,
codex_home: &Path,
name: &str,
) -> CodexResult<crate::SavedSessionEntry> {
let trimmed = name.trim();
if trimmed.is_empty() {
return Err(CodexErr::Fatal("Usage: /save <name>".to_string()));
}
let conversation = self.get_conversation(conversation_id).await?;
conversation.save_session(codex_home, trimmed).await
}
/// Removes the conversation from the manager's internal map, though the
@@ -185,9 +258,10 @@ impl ConversationManager {
let CodexSpawnOk {
codex,
conversation_id,
session,
} = Codex::spawn(config, auth_manager, history, self.session_source.clone()).await?;
self.finalize_spawn(codex, conversation_id).await
self.finalize_spawn(codex, conversation_id, session).await
}
}

View File

@@ -10,7 +10,6 @@ use chrono::Local;
use chrono::Utc;
use codex_async_utils::CancelErr;
use codex_protocol::ConversationId;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::RateLimitSnapshot;
use reqwest::StatusCode;
use serde_json;
@@ -431,37 +430,6 @@ impl CodexErr {
pub fn downcast_ref<T: std::any::Any>(&self) -> Option<&T> {
(self as &dyn std::any::Any).downcast_ref::<T>()
}
pub fn http_status_code(&self) -> Option<StatusCode> {
match self {
CodexErr::UnexpectedStatus(err) => Some(err.status),
CodexErr::RetryLimit(err) => Some(err.status),
CodexErr::UsageLimitReached(_) | CodexErr::UsageNotIncluded => {
Some(StatusCode::TOO_MANY_REQUESTS)
}
CodexErr::InternalServerError => Some(StatusCode::INTERNAL_SERVER_ERROR),
CodexErr::ResponseStreamFailed(err) => err.source.status(),
CodexErr::ConnectionFailed(err) => err.source.status(),
_ => None,
}
}
pub fn to_error_event(&self, message_prefix: Option<String>) -> ErrorEvent {
let error_message = self.to_string();
let message: String = match message_prefix {
Some(prefix) => format!("{prefix}: {error_message}"),
None => error_message,
};
ErrorEvent {
message,
http_status_code: http_status_code_value(self.http_status_code()),
}
}
}
pub fn http_status_code_value(http_status_code: Option<StatusCode>) -> Option<u16> {
http_status_code.as_ref().map(StatusCode::as_u16)
}
pub fn get_error_message_ui(e: &CodexErr) -> String {
@@ -807,43 +775,4 @@ mod tests {
assert_eq!(err.to_string(), expected);
});
}
#[test]
fn error_event_includes_http_status_code_when_available() {
let err = CodexErr::UnexpectedStatus(UnexpectedResponseError {
status: StatusCode::BAD_REQUEST,
body: "oops".to_string(),
request_id: Some("req-1".to_string()),
});
let event = err.to_error_event(None);
assert_eq!(
event.message,
"unexpected status 400 Bad Request: oops, request id: req-1"
);
assert_eq!(
event.http_status_code,
Some(StatusCode::BAD_REQUEST.as_u16())
);
}
#[test]
fn error_event_omits_http_status_code_when_unknown() {
let event = CodexErr::Fatal("boom".to_string()).to_error_event(None);
assert_eq!(event.message, "Fatal error: boom");
assert_eq!(event.http_status_code, None);
}
#[test]
fn error_event_applies_message_wrapper() {
let event = CodexErr::Fatal("boom".to_string())
.to_error_event(Some("Error running remote compact task".to_string()));
assert_eq!(
event.message,
"Error running remote compact task: Fatal error: boom"
);
assert_eq!(event.http_status_code, None);
}
}

View File

@@ -0,0 +1,365 @@
use std::io::ErrorKind;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use crate::command_safety::is_dangerous_command::requires_initial_appoval;
use codex_execpolicy::Decision;
use codex_execpolicy::Evaluation;
use codex_execpolicy::Policy;
use codex_execpolicy::PolicyParser;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::SandboxPolicy;
use thiserror::Error;
use tokio::fs;
use crate::bash::parse_shell_lc_plain_commands;
use crate::features::Feature;
use crate::features::Features;
use crate::sandboxing::SandboxPermissions;
use crate::tools::sandboxing::ApprovalRequirement;
const FORBIDDEN_REASON: &str = "execpolicy forbids this command";
const PROMPT_REASON: &str = "execpolicy requires approval for this command";
const POLICY_DIR_NAME: &str = "policy";
const POLICY_EXTENSION: &str = "codexpolicy";
#[derive(Debug, Error)]
pub enum ExecPolicyError {
#[error("failed to read execpolicy files from {dir}: {source}")]
ReadDir {
dir: PathBuf,
source: std::io::Error,
},
#[error("failed to read execpolicy file {path}: {source}")]
ReadFile {
path: PathBuf,
source: std::io::Error,
},
#[error("failed to parse execpolicy file {path}: {source}")]
ParsePolicy {
path: String,
source: codex_execpolicy::Error,
},
}
pub(crate) async fn exec_policy_for(
features: &Features,
codex_home: &Path,
) -> Result<Arc<Policy>, ExecPolicyError> {
if !features.enabled(Feature::ExecPolicy) {
return Ok(Arc::new(Policy::empty()));
}
let policy_dir = codex_home.join(POLICY_DIR_NAME);
let policy_paths = collect_policy_files(&policy_dir).await?;
let mut parser = PolicyParser::new();
for policy_path in &policy_paths {
let contents =
fs::read_to_string(policy_path)
.await
.map_err(|source| ExecPolicyError::ReadFile {
path: policy_path.clone(),
source,
})?;
let identifier = policy_path.to_string_lossy().to_string();
parser
.parse(&identifier, &contents)
.map_err(|source| ExecPolicyError::ParsePolicy {
path: identifier,
source,
})?;
}
let policy = Arc::new(parser.build());
tracing::debug!(
"loaded execpolicy from {} files in {}",
policy_paths.len(),
policy_dir.display()
);
Ok(policy)
}
fn evaluate_with_policy(
policy: &Policy,
command: &[String],
approval_policy: AskForApproval,
) -> Option<ApprovalRequirement> {
let commands = parse_shell_lc_plain_commands(command).unwrap_or_else(|| vec![command.to_vec()]);
let evaluation = policy.check_multiple(commands.iter());
match evaluation {
Evaluation::Match { decision, .. } => match decision {
Decision::Forbidden => Some(ApprovalRequirement::Forbidden {
reason: FORBIDDEN_REASON.to_string(),
}),
Decision::Prompt => {
let reason = PROMPT_REASON.to_string();
if matches!(approval_policy, AskForApproval::Never) {
Some(ApprovalRequirement::Forbidden { reason })
} else {
Some(ApprovalRequirement::NeedsApproval {
reason: Some(reason),
})
}
}
Decision::Allow => Some(ApprovalRequirement::Skip),
},
Evaluation::NoMatch => None,
}
}
pub(crate) fn create_approval_requirement_for_command(
policy: &Policy,
command: &[String],
approval_policy: AskForApproval,
sandbox_policy: &SandboxPolicy,
sandbox_permissions: SandboxPermissions,
) -> ApprovalRequirement {
if let Some(requirement) = evaluate_with_policy(policy, command, approval_policy) {
return requirement;
}
if requires_initial_appoval(
approval_policy,
sandbox_policy,
command,
sandbox_permissions,
) {
ApprovalRequirement::NeedsApproval { reason: None }
} else {
ApprovalRequirement::Skip
}
}
async fn collect_policy_files(dir: &Path) -> Result<Vec<PathBuf>, ExecPolicyError> {
let mut read_dir = match fs::read_dir(dir).await {
Ok(read_dir) => read_dir,
Err(err) if err.kind() == ErrorKind::NotFound => return Ok(Vec::new()),
Err(source) => {
return Err(ExecPolicyError::ReadDir {
dir: dir.to_path_buf(),
source,
});
}
};
let mut policy_paths = Vec::new();
while let Some(entry) =
read_dir
.next_entry()
.await
.map_err(|source| ExecPolicyError::ReadDir {
dir: dir.to_path_buf(),
source,
})?
{
let path = entry.path();
let file_type = entry
.file_type()
.await
.map_err(|source| ExecPolicyError::ReadDir {
dir: dir.to_path_buf(),
source,
})?;
if path
.extension()
.and_then(|ext| ext.to_str())
.is_some_and(|ext| ext == POLICY_EXTENSION)
&& file_type.is_file()
{
policy_paths.push(path);
}
}
policy_paths.sort();
Ok(policy_paths)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::features::Feature;
use crate::features::Features;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::SandboxPolicy;
use pretty_assertions::assert_eq;
use std::fs;
use tempfile::tempdir;
#[tokio::test]
async fn returns_empty_policy_when_feature_disabled() {
let mut features = Features::with_defaults();
features.disable(Feature::ExecPolicy);
let temp_dir = tempdir().expect("create temp dir");
let policy = exec_policy_for(&features, temp_dir.path())
.await
.expect("policy result");
let commands = [vec!["rm".to_string()]];
assert!(matches!(
policy.check_multiple(commands.iter()),
Evaluation::NoMatch
));
assert!(!temp_dir.path().join(POLICY_DIR_NAME).exists());
}
#[tokio::test]
async fn collect_policy_files_returns_empty_when_dir_missing() {
let temp_dir = tempdir().expect("create temp dir");
let policy_dir = temp_dir.path().join(POLICY_DIR_NAME);
let files = collect_policy_files(&policy_dir)
.await
.expect("collect policy files");
assert!(files.is_empty());
}
#[tokio::test]
async fn loads_policies_from_policy_subdirectory() {
let temp_dir = tempdir().expect("create temp dir");
let policy_dir = temp_dir.path().join(POLICY_DIR_NAME);
fs::create_dir_all(&policy_dir).expect("create policy dir");
fs::write(
policy_dir.join("deny.codexpolicy"),
r#"prefix_rule(pattern=["rm"], decision="forbidden")"#,
)
.expect("write policy file");
let policy = exec_policy_for(&Features::with_defaults(), temp_dir.path())
.await
.expect("policy result");
let command = [vec!["rm".to_string()]];
assert!(matches!(
policy.check_multiple(command.iter()),
Evaluation::Match { .. }
));
}
#[tokio::test]
async fn ignores_policies_outside_policy_dir() {
let temp_dir = tempdir().expect("create temp dir");
fs::write(
temp_dir.path().join("root.codexpolicy"),
r#"prefix_rule(pattern=["ls"], decision="prompt")"#,
)
.expect("write policy file");
let policy = exec_policy_for(&Features::with_defaults(), temp_dir.path())
.await
.expect("policy result");
let command = [vec!["ls".to_string()]];
assert!(matches!(
policy.check_multiple(command.iter()),
Evaluation::NoMatch
));
}
#[test]
fn evaluates_bash_lc_inner_commands() {
let policy_src = r#"
prefix_rule(pattern=["rm"], decision="forbidden")
"#;
let mut parser = PolicyParser::new();
parser
.parse("test.codexpolicy", policy_src)
.expect("parse policy");
let policy = parser.build();
let forbidden_script = vec![
"bash".to_string(),
"-lc".to_string(),
"rm -rf /tmp".to_string(),
];
let requirement =
evaluate_with_policy(&policy, &forbidden_script, AskForApproval::OnRequest)
.expect("expected match for forbidden command");
assert_eq!(
requirement,
ApprovalRequirement::Forbidden {
reason: FORBIDDEN_REASON.to_string()
}
);
}
#[test]
fn approval_requirement_prefers_execpolicy_match() {
let policy_src = r#"prefix_rule(pattern=["rm"], decision="prompt")"#;
let mut parser = PolicyParser::new();
parser
.parse("test.codexpolicy", policy_src)
.expect("parse policy");
let policy = parser.build();
let command = vec!["rm".to_string()];
let requirement = create_approval_requirement_for_command(
&policy,
&command,
AskForApproval::OnRequest,
&SandboxPolicy::DangerFullAccess,
SandboxPermissions::UseDefault,
);
assert_eq!(
requirement,
ApprovalRequirement::NeedsApproval {
reason: Some(PROMPT_REASON.to_string())
}
);
}
#[test]
fn approval_requirement_respects_approval_policy() {
let policy_src = r#"prefix_rule(pattern=["rm"], decision="prompt")"#;
let mut parser = PolicyParser::new();
parser
.parse("test.codexpolicy", policy_src)
.expect("parse policy");
let policy = parser.build();
let command = vec!["rm".to_string()];
let requirement = create_approval_requirement_for_command(
&policy,
&command,
AskForApproval::Never,
&SandboxPolicy::DangerFullAccess,
SandboxPermissions::UseDefault,
);
assert_eq!(
requirement,
ApprovalRequirement::Forbidden {
reason: PROMPT_REASON.to_string()
}
);
}
#[test]
fn approval_requirement_falls_back_to_heuristics() {
let command = vec!["python".to_string()];
let empty_policy = Policy::empty();
let requirement = create_approval_requirement_for_command(
&empty_policy,
&command,
AskForApproval::UnlessTrusted,
&SandboxPolicy::ReadOnly,
SandboxPermissions::UseDefault,
);
assert_eq!(
requirement,
ApprovalRequirement::NeedsApproval { reason: None }
);
}
}

View File

@@ -42,6 +42,8 @@ pub enum Feature {
ViewImageTool,
/// Allow the model to request web searches.
WebSearchRequest,
/// Gate the execpolicy enforcement for shell/unified exec.
ExecPolicy,
/// Enable the model-based risk assessments for sandboxed commands.
SandboxCommandAssessment,
/// Enable Windows sandbox (restricted token) on Windows.
@@ -297,6 +299,12 @@ pub const FEATURES: &[FeatureSpec] = &[
stage: Stage::Stable,
default_enabled: false,
},
FeatureSpec {
id: Feature::ExecPolicy,
key: "exec_policy",
stage: Stage::Experimental,
default_enabled: true,
},
FeatureSpec {
id: Feature::SandboxCommandAssessment,
key: "experimental_sandbox_command_assessment",

View File

@@ -25,6 +25,7 @@ mod environment_context;
pub mod error;
pub mod exec;
pub mod exec_env;
mod exec_policy;
pub mod features;
mod flags;
pub mod git_info;
@@ -65,6 +66,7 @@ mod openai_model_info;
pub mod project_doc;
mod rollout;
pub(crate) mod safety;
pub mod saved_sessions;
pub mod seatbelt;
pub mod shell;
pub mod spawn;
@@ -82,6 +84,12 @@ pub use rollout::list::ConversationsPage;
pub use rollout::list::Cursor;
pub use rollout::list::parse_cursor;
pub use rollout::list::read_head_for_summary;
pub use saved_sessions::SavedSessionEntry;
pub use saved_sessions::build_saved_session_entry;
pub use saved_sessions::list_saved_sessions;
pub use saved_sessions::resolve_rollout_path;
pub use saved_sessions::resolve_saved_session;
pub use saved_sessions::upsert_saved_session;
mod function_tool;
mod state;
mod tasks;

View File

@@ -66,6 +66,7 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::ExecApprovalRequest(_)
| EventMsg::ApplyPatchApprovalRequest(_)
| EventMsg::BackgroundEvent(_)
| EventMsg::SaveSessionResponse(_)
| EventMsg::StreamError(_)
| EventMsg::PatchApplyBegin(_)
| EventMsg::PatchApplyEnd(_)

View File

@@ -11,6 +11,8 @@ use serde_json::Value;
use time::OffsetDateTime;
use time::format_description::FormatItem;
use time::macros::format_description;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeekExt;
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::{self};
@@ -70,6 +72,10 @@ enum RolloutCmd {
Shutdown {
ack: oneshot::Sender<()>,
},
SetName {
name: Option<String>,
ack: oneshot::Sender<std::io::Result<()>>,
},
}
impl RolloutRecorderParams {
@@ -148,11 +154,14 @@ impl RolloutRecorder {
instructions,
source,
model_provider: Some(config.model_provider_id.clone()),
name: None,
}),
)
}
RolloutRecorderParams::Resume { path } => (
tokio::fs::OpenOptions::new()
.read(true)
.write(true)
.append(true)
.open(&path)
.await?,
@@ -196,6 +205,21 @@ impl RolloutRecorder {
.map_err(|e| IoError::other(format!("failed to queue rollout items: {e}")))
}
/// Update the session name stored in the rollout's SessionMeta line.
pub async fn set_session_name(&self, name: Option<String>) -> std::io::Result<()> {
let (tx, rx) = oneshot::channel();
self.tx
.send(RolloutCmd::SetName { name, ack: tx })
.await
.map_err(|e| IoError::other(format!("failed to queue session name update: {e}")))?;
match rx.await {
Ok(result) => result,
Err(e) => Err(IoError::other(format!(
"failed waiting for session name update: {e}"
))),
}
}
/// Flush all queued writes and wait until they are committed by the writer task.
pub async fn flush(&self) -> std::io::Result<()> {
let (tx, rx) = oneshot::channel();
@@ -334,6 +358,7 @@ fn create_log_file(
let path = dir.join(filename);
let file = std::fs::OpenOptions::new()
.read(true)
.append(true)
.create(true)
.open(&path)?;
@@ -389,6 +414,10 @@ async fn rollout_writer(
RolloutCmd::Shutdown { ack } => {
let _ = ack.send(());
}
RolloutCmd::SetName { name, ack } => {
let result = rewrite_session_meta_name(&mut writer.file, name).await;
let _ = ack.send(result);
}
}
}
@@ -422,3 +451,232 @@ impl JsonlWriter {
Ok(())
}
}
async fn rewrite_session_meta_name(
file: &mut tokio::fs::File,
name: Option<String>,
) -> std::io::Result<()> {
use std::io::SeekFrom;
file.flush().await?;
file.seek(SeekFrom::Start(0)).await?;
let mut contents = Vec::new();
file.read_to_end(&mut contents).await?;
if contents.is_empty() {
return Err(IoError::other("empty rollout file"));
}
let newline_idx = contents
.iter()
.position(|&b| b == b'\n')
.ok_or_else(|| IoError::other("rollout missing newline after SessionMeta"))?;
let first_line = &contents[..newline_idx];
let mut rollout_line: RolloutLine = serde_json::from_slice(first_line)
.map_err(|e| IoError::other(format!("failed to parse SessionMeta: {e}")))?;
let RolloutItem::SessionMeta(ref mut session_meta_line) = rollout_line.item else {
return Err(IoError::other("first rollout item is not SessionMeta"));
};
session_meta_line.meta.name = name;
let mut updated = serde_json::to_vec(&rollout_line)?;
updated.push(b'\n');
updated.extend_from_slice(&contents[newline_idx + 1..]);
file.set_len(0).await?;
file.seek(SeekFrom::Start(0)).await?;
file.write_all(&updated).await?;
file.flush().await?;
file.seek(SeekFrom::End(0)).await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::rewrite_session_meta_name;
use codex_protocol::ConversationId;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use tempfile::NamedTempFile;
use tokio::fs::OpenOptions;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeekExt;
use tokio::io::AsyncWriteExt;
fn sample_meta(name: Option<&str>) -> RolloutItem {
RolloutItem::SessionMeta(SessionMetaLine {
meta: SessionMeta {
id: ConversationId::from_string("00000000-0000-4000-8000-000000000001")
.expect("conversation id"),
timestamp: "2025-01-01T00:00:00.000Z".to_string(),
cwd: "/tmp".into(),
originator: "tester".to_string(),
cli_version: "1.0.0".to_string(),
instructions: None,
source: codex_protocol::protocol::SessionSource::Cli,
model_provider: Some("provider".to_string()),
name: name.map(str::to_string),
},
git: None,
})
}
fn sample_line() -> RolloutLine {
RolloutLine {
timestamp: "2025-01-01T00:00:00.000Z".to_string(),
item: sample_meta(None),
}
}
async fn write_rollout(lines: &[RolloutLine]) -> (NamedTempFile, tokio::fs::File) {
let temp = NamedTempFile::new().expect("temp file");
let mut file = OpenOptions::new()
.read(true)
.write(true)
.truncate(true)
.create(true)
.open(temp.path())
.await
.expect("open temp file");
for line in lines {
let mut json = serde_json::to_vec(line).expect("serialize line");
json.push(b'\n');
file.write_all(&json).await.expect("write line");
}
file.seek(std::io::SeekFrom::Start(0))
.await
.expect("rewind");
(temp, file)
}
async fn read_first_line(path: &std::path::Path) -> RolloutLine {
let mut contents = String::new();
let mut file = OpenOptions::new()
.read(true)
.open(path)
.await
.expect("open for read");
file.read_to_string(&mut contents).await.expect("read file");
let first = contents.lines().next().expect("first line");
serde_json::from_str(first).expect("parse first line")
}
#[tokio::test]
async fn updates_meta_name_and_preserves_rest() {
let events = vec![
sample_line(),
RolloutLine {
timestamp: "2025-01-01T00:00:01.000Z".to_string(),
item: RolloutItem::ResponseItem(ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: "hello".to_string(),
}],
}),
},
];
let (temp, mut file) = write_rollout(&events).await;
rewrite_session_meta_name(&mut file, Some("renamed".to_string()))
.await
.expect("rewrite ok");
let first = read_first_line(temp.path()).await;
let RolloutItem::SessionMeta(meta_line) = first.item else {
panic!("expected SessionMeta line");
};
assert_eq!(meta_line.meta.name.as_deref(), Some("renamed"));
let contents = tokio::fs::read_to_string(temp.path())
.await
.expect("read file");
let lines: Vec<_> = contents.lines().collect();
assert_eq!(lines.len(), 2);
let parsed: RolloutLine = serde_json::from_str(lines[1]).expect("parse second line");
let RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) = parsed.item
else {
panic!("expected response item");
};
assert_eq!(role, "assistant");
assert_eq!(
content,
vec![ContentItem::OutputText {
text: "hello".to_string()
}]
);
}
#[tokio::test]
async fn clearing_name_sets_none() {
let mut first = sample_line();
first.item = sample_meta(Some("existing"));
let (temp, mut file) = write_rollout(&[first]).await;
rewrite_session_meta_name(&mut file, None)
.await
.expect("rewrite ok");
let first = read_first_line(temp.path()).await;
let RolloutItem::SessionMeta(meta_line) = first.item else {
panic!("expected SessionMeta line");
};
assert_eq!(meta_line.meta.name, None);
}
#[tokio::test]
async fn errors_on_empty_file() {
let temp = NamedTempFile::new().expect("temp file");
let mut file = OpenOptions::new()
.read(true)
.write(true)
.open(temp.path())
.await
.expect("open temp file");
let err = rewrite_session_meta_name(&mut file, Some("x".to_string()))
.await
.expect_err("expected error");
assert!(format!("{err}").contains("empty rollout file"));
}
#[tokio::test]
async fn errors_when_first_line_not_session_meta() {
let wrong = RolloutLine {
timestamp: "t".to_string(),
item: RolloutItem::ResponseItem(ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: "hello".to_string(),
}],
}),
};
let (_temp, mut file) = write_rollout(&[wrong]).await;
let err = rewrite_session_meta_name(&mut file, Some("x".to_string()))
.await
.expect_err("expected error");
assert!(format!("{err}").contains("first rollout item is not SessionMeta"));
// ensure file pointer is rewound to end after failure paths
let pos = file
.seek(std::io::SeekFrom::Current(0))
.await
.expect("seek");
assert!(pos > 0);
}
#[tokio::test]
async fn errors_when_missing_newline() {
let temp = NamedTempFile::new().expect("temp file");
let mut file = OpenOptions::new()
.read(true)
.write(true)
.open(temp.path())
.await
.expect("open temp file");
file.write_all(b"no newline").await.expect("write");
let err = rewrite_session_meta_name(&mut file, Some("x".to_string()))
.await
.expect_err("expected error");
assert!(format!("{err}").contains("rollout missing newline after SessionMeta"));
}
}

View File

@@ -594,6 +594,7 @@ async fn test_tail_includes_last_response_items() -> Result<()> {
cli_version: "test_version".into(),
source: SessionSource::VSCode,
model_provider: Some("test-provider".into()),
name: None,
},
git: None,
}),
@@ -687,6 +688,7 @@ async fn test_tail_handles_short_sessions() -> Result<()> {
cli_version: "test_version".into(),
source: SessionSource::VSCode,
model_provider: Some("test-provider".into()),
name: None,
},
git: None,
}),
@@ -781,6 +783,7 @@ async fn test_tail_skips_trailing_non_responses() -> Result<()> {
cli_version: "test_version".into(),
source: SessionSource::VSCode,
model_provider: Some("test-provider".into()),
name: None,
},
git: None,
}),

View File

@@ -26,6 +26,28 @@ use std::collections::HashMap;
use std::path::Path;
use std::path::PathBuf;
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SandboxPermissions {
UseDefault,
RequireEscalated,
}
impl SandboxPermissions {
pub fn requires_escalated_permissions(self) -> bool {
matches!(self, SandboxPermissions::RequireEscalated)
}
}
impl From<bool> for SandboxPermissions {
fn from(with_escalated_permissions: bool) -> Self {
if with_escalated_permissions {
SandboxPermissions::RequireEscalated
} else {
SandboxPermissions::UseDefault
}
}
}
#[derive(Clone, Debug)]
pub struct CommandSpec {
pub program: String,

View File

@@ -0,0 +1,144 @@
use crate::error::Result;
use crate::find_conversation_path_by_id_str;
use crate::rollout::list::read_head_for_summary;
use codex_protocol::ConversationId;
use codex_protocol::protocol::SessionMetaLine;
use serde::Deserialize;
use serde::Serialize;
use std::collections::BTreeMap;
use std::io::Error as IoError;
use std::io::ErrorKind;
use std::path::Path;
use std::path::PathBuf;
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
use tracing::warn;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct SavedSessionEntry {
pub name: String,
pub conversation_id: ConversationId,
pub rollout_path: PathBuf,
pub cwd: PathBuf,
pub model: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub model_provider: Option<String>,
pub saved_at: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub created_at: Option<String>,
}
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct SavedSessionsFile {
#[serde(default)]
entries: BTreeMap<String, SavedSessionEntry>,
}
fn saved_sessions_path(codex_home: &Path) -> PathBuf {
codex_home.join("saved_sessions.json")
}
async fn load_saved_sessions_file(path: &Path) -> Result<SavedSessionsFile> {
match tokio::fs::read_to_string(path).await {
Ok(text) => {
let parsed = serde_json::from_str(&text)
.map_err(|e| IoError::other(format!("failed to parse saved sessions: {e}")))?;
Ok(parsed)
}
Err(err) if err.kind() == ErrorKind::NotFound => Ok(SavedSessionsFile::default()),
Err(err) => Err(err.into()),
}
}
async fn write_saved_sessions_file(path: &Path, file: &SavedSessionsFile) -> Result<()> {
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let json = serde_json::to_string_pretty(file)
.map_err(|e| IoError::other(format!("failed to serialize saved sessions: {e}")))?;
let tmp_path = path.with_extension("json.tmp");
tokio::fs::write(&tmp_path, json).await?;
tokio::fs::rename(tmp_path, path).await?;
Ok(())
}
/// Create a new entry from the rollout's SessionMeta line.
pub async fn build_saved_session_entry(
name: String,
rollout_path: PathBuf,
model: String,
) -> Result<SavedSessionEntry> {
let head = read_head_for_summary(&rollout_path).await?;
let first = head.first().ok_or_else(|| {
IoError::other(format!(
"rollout at {} has no SessionMeta",
rollout_path.display()
))
})?;
let SessionMetaLine { mut meta, .. } = serde_json::from_value::<SessionMetaLine>(first.clone())
.map_err(|e| IoError::other(format!("failed to parse SessionMeta: {e}")))?;
meta.name = Some(name.clone());
let saved_at = OffsetDateTime::now_utc()
.format(&Rfc3339)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
let created_at = if meta.timestamp.is_empty() {
None
} else {
Some(meta.timestamp.clone())
};
Ok(SavedSessionEntry {
name,
conversation_id: meta.id,
rollout_path,
cwd: meta.cwd,
model,
model_provider: meta.model_provider,
saved_at,
created_at,
})
}
/// Insert or replace a saved session entry in `saved_sessions.json`.
pub async fn upsert_saved_session(codex_home: &Path, entry: SavedSessionEntry) -> Result<()> {
let path = saved_sessions_path(codex_home);
let mut file = load_saved_sessions_file(&path).await?;
file.entries.insert(entry.name.clone(), entry);
write_saved_sessions_file(&path, &file).await
}
/// Lookup a saved session by name, if present.
pub async fn resolve_saved_session(
codex_home: &Path,
name: &str,
) -> Result<Option<SavedSessionEntry>> {
let path = saved_sessions_path(codex_home);
let file = load_saved_sessions_file(&path).await?;
Ok(file.entries.get(name).cloned())
}
/// Return all saved sessions ordered by newest `saved_at` first.
pub async fn list_saved_sessions(codex_home: &Path) -> Result<Vec<SavedSessionEntry>> {
let path = saved_sessions_path(codex_home);
let file = load_saved_sessions_file(&path).await?;
let mut entries: Vec<SavedSessionEntry> = file.entries.values().cloned().collect();
entries.sort_by(|a, b| b.saved_at.cmp(&a.saved_at));
Ok(entries)
}
/// Resolve a rollout path from either a saved-session name or rollout id string.
/// Returns `Ok(None)` when nothing matches.
pub async fn resolve_rollout_path(codex_home: &Path, identifier: &str) -> Result<Option<PathBuf>> {
if let Some(entry) = resolve_saved_session(codex_home, identifier).await? {
if entry.rollout_path.exists() {
return Ok(Some(entry.rollout_path));
}
warn!(
"saved session '{}' points to missing rollout at {}",
identifier,
entry.rollout_path.display()
);
}
Ok(find_conversation_path_by_id_str(codex_home, identifier).await?)
}

View File

@@ -204,10 +204,21 @@ pub async fn default_user_shell() -> Shell {
if cfg!(windows) {
get_shell(ShellType::PowerShell, None).unwrap_or(Shell::Unknown)
} else {
get_user_shell_path()
let user_default_shell = get_user_shell_path()
.and_then(|shell| detect_shell_type(&shell))
.and_then(|shell_type| get_shell(shell_type, None))
.unwrap_or(Shell::Unknown)
.and_then(|shell_type| get_shell(shell_type, None));
let shell_with_fallback = if cfg!(target_os = "macos") {
user_default_shell
.or_else(|| get_shell(ShellType::Zsh, None))
.or_else(|| get_shell(ShellType::Bash, None))
} else {
user_default_shell
.or_else(|| get_shell(ShellType::Bash, None))
.or_else(|| get_shell(ShellType::Zsh, None))
};
shell_with_fallback.unwrap_or(Shell::Unknown)
}
}

View File

@@ -128,7 +128,9 @@ impl Session {
task_cancellation_token.child_token(),
)
.await;
session_ctx.clone_session().flush_rollout().await;
if let Err(e) = session_ctx.clone_session().flush_rollout().await {
tracing::warn!("failed to flush rollout recorder: {e}");
}
if !task_cancellation_token.is_cancelled() {
// Emit completion uniformly from spawn site so all tasks share the same lifecycle.
let sess = session_ctx.clone_session();

View File

@@ -179,15 +179,17 @@ impl ToolEmitter {
ctx.turn,
EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id: ctx.call_id.to_string(),
turn_id: ctx.turn.sub_id.clone(),
auto_approved: *auto_approved,
changes: changes.clone(),
}),
)
.await;
}
(Self::ApplyPatch { .. }, ToolEventStage::Success(output)) => {
(Self::ApplyPatch { changes, .. }, ToolEventStage::Success(output)) => {
emit_patch_end(
ctx,
changes.clone(),
output.stdout.text.clone(),
output.stderr.text.clone(),
output.exit_code == 0,
@@ -195,11 +197,12 @@ impl ToolEmitter {
.await;
}
(
Self::ApplyPatch { .. },
Self::ApplyPatch { changes, .. },
ToolEventStage::Failure(ToolEventFailure::Output(output)),
) => {
emit_patch_end(
ctx,
changes.clone(),
output.stdout.text.clone(),
output.stderr.text.clone(),
output.exit_code == 0,
@@ -207,10 +210,17 @@ impl ToolEmitter {
.await;
}
(
Self::ApplyPatch { .. },
Self::ApplyPatch { changes, .. },
ToolEventStage::Failure(ToolEventFailure::Message(message)),
) => {
emit_patch_end(ctx, String::new(), (*message).to_string(), false).await;
emit_patch_end(
ctx,
changes.clone(),
String::new(),
(*message).to_string(),
false,
)
.await;
}
(
Self::UnifiedExec {
@@ -409,15 +419,23 @@ async fn emit_exec_end(
.await;
}
async fn emit_patch_end(ctx: ToolEventCtx<'_>, stdout: String, stderr: String, success: bool) {
async fn emit_patch_end(
ctx: ToolEventCtx<'_>,
changes: HashMap<PathBuf, FileChange>,
stdout: String,
stderr: String,
success: bool,
) {
ctx.session
.send_event(
ctx.turn,
EventMsg::PatchApplyEnd(PatchApplyEndEvent {
call_id: ctx.call_id.to_string(),
turn_id: ctx.turn.sub_id.clone(),
stdout,
stderr,
success,
changes,
}),
)
.await;

View File

@@ -9,9 +9,11 @@ use crate::apply_patch::convert_apply_patch_to_protocol;
use crate::codex::TurnContext;
use crate::exec::ExecParams;
use crate::exec_env::create_env;
use crate::exec_policy::create_approval_requirement_for_command;
use crate::function_tool::FunctionCallError;
use crate::is_safe_command::is_known_safe_command;
use crate::protocol::ExecCommandSource;
use crate::sandboxing::SandboxPermissions;
use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolOutput;
use crate::tools::context::ToolPayload;
@@ -302,6 +304,13 @@ impl ShellHandler {
env: exec_params.env.clone(),
with_escalated_permissions: exec_params.with_escalated_permissions,
justification: exec_params.justification.clone(),
approval_requirement: create_approval_requirement_for_command(
&turn.exec_policy,
&exec_params.command,
turn.approval_policy,
&turn.sandbox_policy,
SandboxPermissions::from(exec_params.with_escalated_permissions.unwrap_or(false)),
),
};
let mut orchestrator = ToolOrchestrator::new();
let mut runtime = ShellRuntime::new();

View File

@@ -11,11 +11,13 @@ use crate::error::get_error_message_ui;
use crate::exec::ExecToolCallOutput;
use crate::sandboxing::SandboxManager;
use crate::tools::sandboxing::ApprovalCtx;
use crate::tools::sandboxing::ApprovalRequirement;
use crate::tools::sandboxing::ProvidesSandboxRetryData;
use crate::tools::sandboxing::SandboxAttempt;
use crate::tools::sandboxing::ToolCtx;
use crate::tools::sandboxing::ToolError;
use crate::tools::sandboxing::ToolRuntime;
use crate::tools::sandboxing::default_approval_requirement;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::ReviewDecision;
@@ -49,40 +51,52 @@ impl ToolOrchestrator {
let otel_cfg = codex_otel::otel_event_manager::ToolDecisionSource::Config;
// 1) Approval
let needs_initial_approval =
tool.wants_initial_approval(req, approval_policy, &turn_ctx.sandbox_policy);
let mut already_approved = false;
if needs_initial_approval {
let mut risk = None;
if let Some(metadata) = req.sandbox_retry_data() {
risk = tool_ctx
.session
.assess_sandbox_command(turn_ctx, &tool_ctx.call_id, &metadata.command, None)
.await;
let requirement = tool.approval_requirement(req).unwrap_or_else(|| {
default_approval_requirement(approval_policy, &turn_ctx.sandbox_policy)
});
match requirement {
ApprovalRequirement::Skip => {
otel.tool_decision(otel_tn, otel_ci, ReviewDecision::Approved, otel_cfg);
}
ApprovalRequirement::Forbidden { reason } => {
return Err(ToolError::Rejected(reason));
}
ApprovalRequirement::NeedsApproval { reason } => {
let mut risk = None;
let approval_ctx = ApprovalCtx {
session: tool_ctx.session,
turn: turn_ctx,
call_id: &tool_ctx.call_id,
retry_reason: None,
risk,
};
let decision = tool.start_approval_async(req, approval_ctx).await;
otel.tool_decision(otel_tn, otel_ci, decision, otel_user.clone());
match decision {
ReviewDecision::Denied | ReviewDecision::Abort => {
return Err(ToolError::Rejected("rejected by user".to_string()));
if let Some(metadata) = req.sandbox_retry_data() {
risk = tool_ctx
.session
.assess_sandbox_command(
turn_ctx,
&tool_ctx.call_id,
&metadata.command,
None,
)
.await;
}
ReviewDecision::Approved | ReviewDecision::ApprovedForSession => {}
let approval_ctx = ApprovalCtx {
session: tool_ctx.session,
turn: turn_ctx,
call_id: &tool_ctx.call_id,
retry_reason: reason,
risk,
};
let decision = tool.start_approval_async(req, approval_ctx).await;
otel.tool_decision(otel_tn, otel_ci, decision, otel_user.clone());
match decision {
ReviewDecision::Denied | ReviewDecision::Abort => {
return Err(ToolError::Rejected("rejected by user".to_string()));
}
ReviewDecision::Approved | ReviewDecision::ApprovedForSession => {}
}
already_approved = true;
}
already_approved = true;
} else {
otel.tool_decision(otel_tn, otel_ci, ReviewDecision::Approved, otel_cfg);
}
// 2) First attempt under the selected sandbox.

View File

@@ -4,13 +4,12 @@ Runtime: shell
Executes shell requests under the orchestrator: asks for approval when needed,
builds a CommandSpec, and runs it under the current SandboxAttempt.
*/
use crate::command_safety::is_dangerous_command::requires_initial_appoval;
use crate::exec::ExecToolCallOutput;
use crate::protocol::SandboxPolicy;
use crate::sandboxing::execute_env;
use crate::tools::runtimes::build_command_spec;
use crate::tools::sandboxing::Approvable;
use crate::tools::sandboxing::ApprovalCtx;
use crate::tools::sandboxing::ApprovalRequirement;
use crate::tools::sandboxing::ProvidesSandboxRetryData;
use crate::tools::sandboxing::SandboxAttempt;
use crate::tools::sandboxing::SandboxRetryData;
@@ -20,7 +19,6 @@ use crate::tools::sandboxing::ToolCtx;
use crate::tools::sandboxing::ToolError;
use crate::tools::sandboxing::ToolRuntime;
use crate::tools::sandboxing::with_cached_approval;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::ReviewDecision;
use futures::future::BoxFuture;
use std::path::PathBuf;
@@ -33,6 +31,7 @@ pub struct ShellRequest {
pub env: std::collections::HashMap<String, String>,
pub with_escalated_permissions: Option<bool>,
pub justification: Option<String>,
pub approval_requirement: ApprovalRequirement,
}
impl ProvidesSandboxRetryData for ShellRequest {
@@ -114,18 +113,8 @@ impl Approvable<ShellRequest> for ShellRuntime {
})
}
fn wants_initial_approval(
&self,
req: &ShellRequest,
policy: AskForApproval,
sandbox_policy: &SandboxPolicy,
) -> bool {
requires_initial_appoval(
policy,
sandbox_policy,
&req.command,
req.with_escalated_permissions.unwrap_or(false),
)
fn approval_requirement(&self, req: &ShellRequest) -> Option<ApprovalRequirement> {
Some(req.approval_requirement.clone())
}
fn wants_escalated_first_attempt(&self, req: &ShellRequest) -> bool {

View File

@@ -1,4 +1,3 @@
use crate::command_safety::is_dangerous_command::requires_initial_appoval;
/*
Runtime: unified exec
@@ -10,6 +9,7 @@ use crate::error::SandboxErr;
use crate::tools::runtimes::build_command_spec;
use crate::tools::sandboxing::Approvable;
use crate::tools::sandboxing::ApprovalCtx;
use crate::tools::sandboxing::ApprovalRequirement;
use crate::tools::sandboxing::ProvidesSandboxRetryData;
use crate::tools::sandboxing::SandboxAttempt;
use crate::tools::sandboxing::SandboxRetryData;
@@ -22,9 +22,7 @@ use crate::tools::sandboxing::with_cached_approval;
use crate::unified_exec::UnifiedExecError;
use crate::unified_exec::UnifiedExecSession;
use crate::unified_exec::UnifiedExecSessionManager;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::ReviewDecision;
use codex_protocol::protocol::SandboxPolicy;
use futures::future::BoxFuture;
use std::collections::HashMap;
use std::path::PathBuf;
@@ -36,6 +34,7 @@ pub struct UnifiedExecRequest {
pub env: HashMap<String, String>,
pub with_escalated_permissions: Option<bool>,
pub justification: Option<String>,
pub approval_requirement: ApprovalRequirement,
}
impl ProvidesSandboxRetryData for UnifiedExecRequest {
@@ -65,6 +64,7 @@ impl UnifiedExecRequest {
env: HashMap<String, String>,
with_escalated_permissions: Option<bool>,
justification: Option<String>,
approval_requirement: ApprovalRequirement,
) -> Self {
Self {
command,
@@ -72,6 +72,7 @@ impl UnifiedExecRequest {
env,
with_escalated_permissions,
justification,
approval_requirement,
}
}
}
@@ -129,18 +130,8 @@ impl Approvable<UnifiedExecRequest> for UnifiedExecRuntime<'_> {
})
}
fn wants_initial_approval(
&self,
req: &UnifiedExecRequest,
policy: AskForApproval,
sandbox_policy: &SandboxPolicy,
) -> bool {
requires_initial_appoval(
policy,
sandbox_policy,
&req.command,
req.with_escalated_permissions.unwrap_or(false),
)
fn approval_requirement(&self, req: &UnifiedExecRequest) -> Option<ApprovalRequirement> {
Some(req.approval_requirement.clone())
}
fn wants_escalated_first_attempt(&self, req: &UnifiedExecRequest) -> bool {

View File

@@ -86,6 +86,37 @@ pub(crate) struct ApprovalCtx<'a> {
pub risk: Option<SandboxCommandAssessment>,
}
// Specifies what tool orchestrator should do with a given tool call.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum ApprovalRequirement {
/// No approval required for this tool call
Skip,
/// Approval required for this tool call
NeedsApproval { reason: Option<String> },
/// Execution forbidden for this tool call
Forbidden { reason: String },
}
/// - Never, OnFailure: do not ask
/// - OnRequest: ask unless sandbox policy is DangerFullAccess
/// - UnlessTrusted: always ask
pub(crate) fn default_approval_requirement(
policy: AskForApproval,
sandbox_policy: &SandboxPolicy,
) -> ApprovalRequirement {
let needs_approval = match policy {
AskForApproval::Never | AskForApproval::OnFailure => false,
AskForApproval::OnRequest => !matches!(sandbox_policy, SandboxPolicy::DangerFullAccess),
AskForApproval::UnlessTrusted => true,
};
if needs_approval {
ApprovalRequirement::NeedsApproval { reason: None }
} else {
ApprovalRequirement::Skip
}
}
pub(crate) trait Approvable<Req> {
type ApprovalKey: Hash + Eq + Clone + Debug + Serialize;
@@ -106,22 +137,11 @@ pub(crate) trait Approvable<Req> {
matches!(policy, AskForApproval::Never)
}
/// Decide whether an initial user approval should be requested before the
/// first attempt. Defaults to the orchestrator's behavior (prerefactor):
/// - Never, OnFailure: do not ask
/// - OnRequest: ask unless sandbox policy is DangerFullAccess
/// - UnlessTrusted: always ask
fn wants_initial_approval(
&self,
_req: &Req,
policy: AskForApproval,
sandbox_policy: &SandboxPolicy,
) -> bool {
match policy {
AskForApproval::Never | AskForApproval::OnFailure => false,
AskForApproval::OnRequest => !matches!(sandbox_policy, SandboxPolicy::DangerFullAccess),
AskForApproval::UnlessTrusted => true,
}
/// Override the default approval requirement. Return `Some(_)` to specify
/// a custom requirement, or `None` to fall back to
/// policy-based default.
fn approval_requirement(&self, _req: &Req) -> Option<ApprovalRequirement> {
None
}
/// Decide we can request an approval for no-sandbox execution.

View File

@@ -185,6 +185,7 @@ fn truncate_with_byte_estimate(s: &str, policy: TruncationPolicy) -> String {
if s.is_empty() {
return String::new();
}
let total_chars = s.chars().count();
let max_bytes = policy.byte_budget();
@@ -204,24 +205,55 @@ fn truncate_with_byte_estimate(s: &str, policy: TruncationPolicy) -> String {
let total_bytes = s.len();
let (left_budget, right_budget) = split_budget(max_bytes);
let prefix_end = pick_prefix_end(s, left_budget);
let mut suffix_start = pick_suffix_start(s, right_budget);
if suffix_start < prefix_end {
suffix_start = prefix_end;
}
let left_chars = s[..prefix_end].chars().count();
let right_chars = s[suffix_start..].chars().count();
let removed_chars = total_chars
.saturating_sub(left_chars)
.saturating_sub(right_chars);
let (removed_chars, left, right) = split_string(s, left_budget, right_budget);
let marker = format_truncation_marker(
policy,
removed_units_for_source(policy, total_bytes.saturating_sub(max_bytes), removed_chars),
);
assemble_truncated_output(&s[..prefix_end], &s[suffix_start..], &marker)
assemble_truncated_output(left, right, &marker)
}
fn split_string(s: &str, beginning_bytes: usize, end_bytes: usize) -> (usize, &str, &str) {
if s.is_empty() {
return (0, "", "");
}
let len = s.len();
let tail_start_target = len.saturating_sub(end_bytes);
let mut prefix_end = 0usize;
let mut suffix_start = len;
let mut removed_chars = 0usize;
let mut suffix_started = false;
for (idx, ch) in s.char_indices() {
let char_end = idx + ch.len_utf8();
if char_end <= beginning_bytes {
prefix_end = char_end;
continue;
}
if idx >= tail_start_target {
if !suffix_started {
suffix_start = idx;
suffix_started = true;
}
continue;
}
removed_chars = removed_chars.saturating_add(1);
}
if suffix_start < prefix_end {
suffix_start = prefix_end;
}
let before = &s[..prefix_end];
let after = &s[suffix_start..];
(removed_chars, before, after)
}
fn format_truncation_marker(policy: TruncationPolicy, removed_count: u64) -> String {
@@ -270,42 +302,54 @@ fn approx_tokens_from_byte_count(bytes: usize) -> u64 {
/ (APPROX_BYTES_PER_TOKEN as u64)
}
fn truncate_on_boundary(input: &str, max_len: usize) -> &str {
if input.len() <= max_len {
return input;
}
let mut end = max_len;
while end > 0 && !input.is_char_boundary(end) {
end -= 1;
}
&input[..end]
}
fn pick_prefix_end(s: &str, left_budget: usize) -> usize {
truncate_on_boundary(s, left_budget).len()
}
fn pick_suffix_start(s: &str, right_budget: usize) -> usize {
let start_tail = s.len().saturating_sub(right_budget);
let mut idx = start_tail.min(s.len());
while idx < s.len() && !s.is_char_boundary(idx) {
idx += 1;
}
idx
}
#[cfg(test)]
mod tests {
use super::TruncationPolicy;
use super::approx_token_count;
use super::formatted_truncate_text;
use super::split_string;
use super::truncate_function_output_items_with_policy;
use super::truncate_text;
use super::truncate_with_token_budget;
use codex_protocol::models::FunctionCallOutputContentItem;
use pretty_assertions::assert_eq;
#[test]
fn split_string_works() {
assert_eq!(split_string("hello world", 5, 5), (1, "hello", "world"));
assert_eq!(split_string("abc", 0, 0), (3, "", ""));
}
#[test]
fn split_string_handles_empty_string() {
assert_eq!(split_string("", 4, 4), (0, "", ""));
}
#[test]
fn split_string_only_keeps_prefix_when_tail_budget_is_zero() {
assert_eq!(split_string("abcdef", 3, 0), (3, "abc", ""));
}
#[test]
fn split_string_only_keeps_suffix_when_prefix_budget_is_zero() {
assert_eq!(split_string("abcdef", 0, 3), (3, "", "def"));
}
#[test]
fn split_string_handles_overlapping_budgets_without_removal() {
assert_eq!(split_string("abcdef", 4, 4), (0, "abcd", "ef"));
}
#[test]
fn split_string_respects_utf8_boundaries() {
assert_eq!(split_string("😀abc😀", 5, 5), (1, "😀a", "c😀"));
assert_eq!(split_string("😀😀😀😀😀", 1, 1), (5, "", ""));
assert_eq!(split_string("😀😀😀😀😀", 7, 7), (3, "😀", "😀"));
assert_eq!(split_string("😀😀😀😀😀", 8, 8), (1, "😀😀", "😀😀"));
}
#[test]
fn truncate_bytes_less_than_placeholder_returns_placeholder() {
let content = "example output";

View File

@@ -2,13 +2,13 @@
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::Notify;
use tokio::sync::mpsc;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use tokio_util::sync::CancellationToken;
use crate::exec::ExecToolCallOutput;
use crate::exec::SandboxType;
@@ -67,13 +67,18 @@ impl OutputBufferState {
}
pub(crate) type OutputBuffer = Arc<Mutex<OutputBufferState>>;
pub(crate) type OutputHandles = (OutputBuffer, Arc<Notify>);
pub(crate) struct OutputHandles {
pub(crate) output_buffer: OutputBuffer,
pub(crate) output_notify: Arc<Notify>,
pub(crate) cancellation_token: CancellationToken,
}
#[derive(Debug)]
pub(crate) struct UnifiedExecSession {
session: ExecCommandSession,
output_buffer: OutputBuffer,
output_notify: Arc<Notify>,
cancellation_token: CancellationToken,
output_task: JoinHandle<()>,
sandbox_type: SandboxType,
}
@@ -86,9 +91,11 @@ impl UnifiedExecSession {
) -> Self {
let output_buffer = Arc::new(Mutex::new(OutputBufferState::default()));
let output_notify = Arc::new(Notify::new());
let cancellation_token = CancellationToken::new();
let mut receiver = initial_output_rx;
let buffer_clone = Arc::clone(&output_buffer);
let notify_clone = Arc::clone(&output_notify);
let cancellation_token_clone = cancellation_token.clone();
let output_task = tokio::spawn(async move {
loop {
match receiver.recv().await {
@@ -99,7 +106,10 @@ impl UnifiedExecSession {
notify_clone.notify_waiters();
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
cancellation_token_clone.cancel();
break;
}
}
}
});
@@ -108,6 +118,7 @@ impl UnifiedExecSession {
session,
output_buffer,
output_notify,
cancellation_token,
output_task,
sandbox_type,
}
@@ -118,10 +129,11 @@ impl UnifiedExecSession {
}
pub(super) fn output_handles(&self) -> OutputHandles {
(
Arc::clone(&self.output_buffer),
Arc::clone(&self.output_notify),
)
OutputHandles {
output_buffer: Arc::clone(&self.output_buffer),
output_notify: Arc::clone(&self.output_notify),
cancellation_token: self.cancellation_token.clone(),
}
}
pub(super) fn has_exited(&self) -> bool {
@@ -199,20 +211,34 @@ impl UnifiedExecSession {
};
if exit_ready {
managed.signal_exit();
managed.check_for_sandbox_denial().await?;
return Ok(managed);
}
tokio::pin!(exit_rx);
if tokio::time::timeout(Duration::from_millis(50), &mut exit_rx)
.await
.is_ok()
{
managed.signal_exit();
managed.check_for_sandbox_denial().await?;
return Ok(managed);
}
tokio::spawn({
let cancellation_token = managed.cancellation_token.clone();
async move {
let _ = exit_rx.await;
cancellation_token.cancel();
}
});
Ok(managed)
}
fn signal_exit(&self) {
self.cancellation_token.cancel();
}
}
impl Drop for UnifiedExecSession {

View File

@@ -5,16 +5,19 @@ use tokio::sync::Notify;
use tokio::sync::mpsc;
use tokio::time::Duration;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::exec::ExecToolCallOutput;
use crate::exec::StreamOutput;
use crate::exec_env::create_env;
use crate::exec_policy::create_approval_requirement_for_command;
use crate::protocol::BackgroundEventEvent;
use crate::protocol::EventMsg;
use crate::protocol::ExecCommandSource;
use crate::sandboxing::ExecEnv;
use crate::sandboxing::SandboxPermissions;
use crate::tools::events::ToolEmitter;
use crate::tools::events::ToolEventCtx;
use crate::tools::events::ToolEventFailure;
@@ -38,8 +41,20 @@ use super::clamp_yield_time;
use super::generate_chunk_id;
use super::resolve_max_tokens;
use super::session::OutputBuffer;
use super::session::OutputHandles;
use super::session::UnifiedExecSession;
struct PreparedSessionHandles {
writer_tx: mpsc::Sender<Vec<u8>>,
output_buffer: OutputBuffer,
output_notify: Arc<Notify>,
cancellation_token: CancellationToken,
session_ref: Arc<Session>,
turn_ref: Arc<TurnContext>,
command: Vec<String>,
cwd: PathBuf,
}
impl UnifiedExecSessionManager {
pub(crate) async fn exec_command(
&self,
@@ -65,10 +80,19 @@ impl UnifiedExecSessionManager {
let yield_time_ms = clamp_yield_time(request.yield_time_ms);
let start = Instant::now();
let (output_buffer, output_notify) = session.output_handles();
let OutputHandles {
output_buffer,
output_notify,
cancellation_token,
} = session.output_handles();
let deadline = start + Duration::from_millis(yield_time_ms);
let collected =
Self::collect_output_until_deadline(&output_buffer, &output_notify, deadline).await;
let collected = Self::collect_output_until_deadline(
&output_buffer,
&output_notify,
&cancellation_token,
deadline,
)
.await;
let wall_time = Instant::now().saturating_duration_since(start);
let text = String::from_utf8_lossy(&collected).to_string();
@@ -127,15 +151,16 @@ impl UnifiedExecSessionManager {
) -> Result<UnifiedExecResponse, UnifiedExecError> {
let session_id = request.session_id;
let (
let PreparedSessionHandles {
writer_tx,
output_buffer,
output_notify,
cancellation_token,
session_ref,
turn_ref,
session_command,
session_cwd,
) = self.prepare_session_handles(session_id).await?;
command: session_command,
cwd: session_cwd,
} = self.prepare_session_handles(session_id).await?;
let interaction_emitter = ToolEmitter::unified_exec(
&session_command,
@@ -174,8 +199,13 @@ impl UnifiedExecSessionManager {
let yield_time_ms = clamp_yield_time(request.yield_time_ms);
let start = Instant::now();
let deadline = start + Duration::from_millis(yield_time_ms);
let collected =
Self::collect_output_until_deadline(&output_buffer, &output_notify, deadline).await;
let collected = Self::collect_output_until_deadline(
&output_buffer,
&output_notify,
&cancellation_token,
deadline,
)
.await;
let wall_time = Instant::now().saturating_duration_since(start);
let text = String::from_utf8_lossy(&collected).to_string();
@@ -263,44 +293,27 @@ impl UnifiedExecSessionManager {
async fn prepare_session_handles(
&self,
session_id: i32,
) -> Result<
(
mpsc::Sender<Vec<u8>>,
OutputBuffer,
Arc<Notify>,
Arc<Session>,
Arc<TurnContext>,
Vec<String>,
PathBuf,
),
UnifiedExecError,
> {
) -> Result<PreparedSessionHandles, UnifiedExecError> {
let sessions = self.sessions.lock().await;
let (output_buffer, output_notify, writer_tx, session, turn, command, cwd) =
if let Some(entry) = sessions.get(&session_id) {
let (buffer, notify) = entry.session.output_handles();
(
buffer,
notify,
entry.session.writer_sender(),
Arc::clone(&entry.session_ref),
Arc::clone(&entry.turn_ref),
entry.command.clone(),
entry.cwd.clone(),
)
} else {
return Err(UnifiedExecError::UnknownSessionId { session_id });
};
Ok((
writer_tx,
let entry = sessions
.get(&session_id)
.ok_or(UnifiedExecError::UnknownSessionId { session_id })?;
let OutputHandles {
output_buffer,
output_notify,
session,
turn,
command,
cwd,
))
cancellation_token,
} = entry.session.output_handles();
Ok(PreparedSessionHandles {
writer_tx: entry.session.writer_sender(),
output_buffer,
output_notify,
cancellation_token,
session_ref: Arc::clone(&entry.session_ref),
turn_ref: Arc::clone(&entry.turn_ref),
command: entry.command.clone(),
cwd: entry.cwd.clone(),
})
}
async fn send_input(
@@ -449,6 +462,13 @@ impl UnifiedExecSessionManager {
create_env(&context.turn.shell_environment_policy),
with_escalated_permissions,
justification,
create_approval_requirement_for_command(
&context.turn.exec_policy,
command,
context.turn.approval_policy,
&context.turn.sandbox_policy,
SandboxPermissions::from(with_escalated_permissions.unwrap_or(false)),
),
);
let tool_ctx = ToolCtx {
session: context.session.as_ref(),
@@ -471,9 +491,13 @@ impl UnifiedExecSessionManager {
pub(super) async fn collect_output_until_deadline(
output_buffer: &OutputBuffer,
output_notify: &Arc<Notify>,
cancellation_token: &CancellationToken,
deadline: Instant,
) -> Vec<u8> {
const POST_EXIT_OUTPUT_GRACE: Duration = Duration::from_millis(25);
let mut collected: Vec<u8> = Vec::with_capacity(4096);
let mut exit_signal_received = cancellation_token.is_cancelled();
loop {
let drained_chunks;
let mut wait_for_output = None;
@@ -486,15 +510,27 @@ impl UnifiedExecSessionManager {
}
if drained_chunks.is_empty() {
exit_signal_received |= cancellation_token.is_cancelled();
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining == Duration::ZERO {
break;
}
let notified = wait_for_output.unwrap_or_else(|| output_notify.notified());
if exit_signal_received {
let grace = remaining.min(POST_EXIT_OUTPUT_GRACE);
if tokio::time::timeout(grace, notified).await.is_err() {
break;
}
continue;
}
tokio::pin!(notified);
let exit_notified = cancellation_token.cancelled();
tokio::pin!(exit_notified);
tokio::select! {
_ = &mut notified => {}
_ = &mut exit_notified => exit_signal_received = true,
_ = tokio::time::sleep(remaining) => break,
}
continue;
@@ -504,6 +540,7 @@ impl UnifiedExecSessionManager {
collected.extend_from_slice(&chunk);
}
exit_signal_received |= cancellation_token.is_cancelled();
if Instant::now() >= deadline {
break;
}

View File

@@ -0,0 +1,101 @@
#![allow(clippy::unwrap_used, clippy::expect_used)]
use anyhow::Result;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use codex_core::protocol::SandboxPolicy;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::user_input::UserInput;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use serde_json::json;
use std::fs;
#[tokio::test]
async fn execpolicy_blocks_shell_invocation() -> Result<()> {
let mut builder = test_codex().with_config(|config| {
let policy_path = config.codex_home.join("policy").join("policy.codexpolicy");
fs::create_dir_all(
policy_path
.parent()
.expect("policy directory must have a parent"),
)
.expect("create policy directory");
fs::write(
&policy_path,
r#"prefix_rule(pattern=["echo"], decision="forbidden")"#,
)
.expect("write policy file");
});
let server = start_mock_server().await;
let test = builder.build(&server).await?;
let call_id = "shell-forbidden";
let args = json!({
"command": ["echo", "blocked"],
"timeout_ms": 1_000,
});
mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "shell", &serde_json::to_string(&args)?),
ev_completed("resp-1"),
]),
)
.await;
mount_sse_once(
&server,
sse(vec![
ev_assistant_message("msg-1", "done"),
ev_completed("resp-2"),
]),
)
.await;
let session_model = test.session_configured.model.clone();
test.codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "run shell command".into(),
}],
final_output_json_schema: None,
cwd: test.cwd_path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: session_model,
effort: None,
summary: ReasoningSummary::Auto,
})
.await?;
let EventMsg::ExecCommandEnd(end) = wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::ExecCommandEnd(_))
})
.await
else {
unreachable!()
};
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TaskComplete(_))
})
.await;
assert!(
end.aggregated_output
.contains("execpolicy forbids this command"),
"unexpected output: {}",
end.aggregated_output
);
Ok(())
}

View File

@@ -28,6 +28,7 @@ mod compact_remote;
mod compact_resume_fork;
mod deprecation_notice;
mod exec;
mod exec_policy;
mod fork_conversation;
mod grep_files;
mod items;
@@ -44,6 +45,7 @@ mod resume;
mod review;
mod rmcp_client;
mod rollout_list_find;
mod saved_sessions;
mod seatbelt;
mod shell_serialization;
mod stream_error_allows_next_turn;

View File

@@ -0,0 +1,457 @@
#![allow(clippy::expect_used)]
use anyhow::Result;
use codex_core::AuthManager;
use codex_core::CodexAuth;
use codex_core::CodexConversation;
use codex_core::ConversationManager;
use codex_core::SavedSessionEntry;
use codex_core::build_saved_session_entry;
use codex_core::config::Config;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use codex_core::protocol::RolloutItem;
use codex_core::protocol::RolloutLine;
use codex_core::protocol::SaveSessionResponseEvent;
use codex_core::protocol::SessionSource;
use codex_core::resolve_saved_session;
use codex_core::upsert_saved_session;
use codex_protocol::user_input::UserInput;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_match;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::path::Path;
use std::sync::Arc;
fn completion_body(idx: usize, message: &str) -> String {
let resp_id = format!("resp-{idx}");
let msg_id = format!("msg-{idx}");
sse(vec![
ev_response_created(&resp_id),
ev_assistant_message(&msg_id, message),
ev_completed(&resp_id),
])
}
fn rollout_lines(path: &Path) -> Vec<RolloutLine> {
let text = std::fs::read_to_string(path).expect("read rollout");
text.lines()
.filter_map(|line| {
if line.trim().is_empty() {
return None;
}
let value: serde_json::Value = serde_json::from_str(line).expect("rollout line json");
Some(serde_json::from_value::<RolloutLine>(value).expect("rollout line"))
})
.collect()
}
fn rollout_items_without_meta(path: &Path) -> Vec<RolloutItem> {
rollout_lines(path)
.into_iter()
.filter_map(|line| match line.item {
RolloutItem::SessionMeta(_) => None,
other => Some(other),
})
.collect()
}
fn session_meta_count(path: &Path) -> usize {
rollout_lines(path)
.iter()
.filter(|line| matches!(line.item, RolloutItem::SessionMeta(_)))
.count()
}
async fn submit_text(codex: &Arc<CodexConversation>, text: &str) -> Result<()> {
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: text.to_string(),
}],
})
.await?;
let _ = wait_for_event(codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
Ok(())
}
async fn save_session(
name: &str,
codex: &Arc<CodexConversation>,
config: &Config,
) -> Result<SavedSessionEntry> {
codex.flush_rollout().await?;
codex.set_session_name(Some(name.to_string())).await?;
let entry =
build_saved_session_entry(name.to_string(), codex.rollout_path(), codex.model().await)
.await?;
upsert_saved_session(&config.codex_home, entry.clone()).await?;
Ok(entry)
}
async fn save_session_via_op(
codex: &Arc<CodexConversation>,
name: &str,
) -> Result<SaveSessionResponseEvent> {
codex
.submit(Op::SaveSession {
name: name.to_string(),
})
.await?;
let response: SaveSessionResponseEvent = wait_for_event_match(codex, |ev| match ev {
EventMsg::SaveSessionResponse(resp) => Some(resp.clone()),
_ => None,
})
.await;
Ok(response)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn save_and_resume_by_name() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
mount_sse_sequence(&server, vec![completion_body(1, "initial")]).await;
let mut builder = test_codex();
let initial = builder.build(&server).await?;
submit_text(&initial.codex, "first turn").await?;
let name = "alpha";
let entry = save_session(name, &initial.codex, &initial.config).await?;
let resolved = resolve_saved_session(&initial.config.codex_home, name)
.await?
.expect("saved session");
assert_eq!(entry, resolved);
assert_eq!(session_meta_count(&entry.rollout_path), 1);
let saved_items = rollout_items_without_meta(&entry.rollout_path);
let resumed = builder
.resume(&server, initial.home.clone(), entry.rollout_path.clone())
.await?;
assert_eq!(resumed.session_configured.session_id, entry.conversation_id);
let resumed_items = rollout_items_without_meta(&resumed.session_configured.rollout_path);
assert_eq!(
serde_json::to_value(saved_items)?,
serde_json::to_value(resumed_items)?
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn save_session_op_persists_and_emits_response() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
mount_sse_sequence(&server, vec![completion_body(1, "initial")]).await;
let mut builder = test_codex();
let initial = builder.build(&server).await?;
submit_text(&initial.codex, "first turn").await?;
let name = "via-op";
let response = save_session_via_op(&initial.codex, name).await?;
assert_eq!(response.name, name);
assert_eq!(
response.conversation_id,
initial.session_configured.session_id
);
assert!(response.rollout_path.exists());
let resolved = resolve_saved_session(&initial.config.codex_home, name)
.await?
.expect("saved session");
assert_eq!(resolved.rollout_path, response.rollout_path);
assert_eq!(resolved.conversation_id, response.conversation_id);
assert_eq!(session_meta_count(&resolved.rollout_path), 1);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn fork_from_identifier_after_save_op() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
mount_sse_sequence(
&server,
vec![
completion_body(1, "seed"),
completion_body(2, "fork-extra-1"),
completion_body(3, "fork-extra-2"),
],
)
.await;
let mut builder = test_codex();
let initial = builder.build(&server).await?;
submit_text(&initial.codex, "seeded").await?;
let name = "forkable-op";
let response = save_session_via_op(&initial.codex, name).await?;
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy"));
let conversation_manager = ConversationManager::new(auth_manager.clone(), SessionSource::Exec);
let forked = conversation_manager
.fork_from_identifier(initial.config.clone(), name, auth_manager)
.await?;
assert_ne!(
forked.session_configured.session_id,
response.conversation_id
);
// Record the baseline rollout for the saved session.
let base_items = rollout_items_without_meta(&response.rollout_path);
// Send additional turns to the forked conversation and flush.
submit_text(&forked.conversation, "fork one").await?;
submit_text(&forked.conversation, "fork two").await?;
forked.conversation.flush_rollout().await?;
// Re-read both rollouts: source should remain unchanged.
let base_after = rollout_items_without_meta(&response.rollout_path);
assert_eq!(
serde_json::to_value(&base_items)?,
serde_json::to_value(&base_after)?
);
// Forked rollout should extend the baseline.
let fork_items = rollout_items_without_meta(&forked.conversation.rollout_path());
assert!(
fork_items.len() > base_items.len(),
"expected forked rollout to contain additional items"
);
let fork_prefix: Vec<_> = fork_items.iter().take(base_items.len()).cloned().collect();
assert_eq!(
serde_json::to_value(&base_items)?,
serde_json::to_value(&fork_prefix)?,
"forked rollout should extend the baseline history"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn save_and_fork_by_name() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
mount_sse_sequence(&server, vec![completion_body(1, "base")]).await;
let mut builder = test_codex();
let initial = builder.build(&server).await?;
submit_text(&initial.codex, "original").await?;
let entry = save_session("forkable", &initial.codex, &initial.config).await?;
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy"));
let conversation_manager = ConversationManager::new(auth_manager.clone(), SessionSource::Exec);
let forked = conversation_manager
.fork_from_rollout(
initial.config.clone(),
entry.rollout_path.clone(),
auth_manager,
)
.await?;
assert_ne!(forked.session_configured.session_id, entry.conversation_id);
assert_ne!(forked.conversation.rollout_path(), entry.rollout_path);
assert_eq!(session_meta_count(&forked.conversation.rollout_path()), 1);
let base_items = rollout_items_without_meta(&entry.rollout_path);
let fork_items = rollout_items_without_meta(&forked.conversation.rollout_path());
assert_eq!(
serde_json::to_value(base_items)?,
serde_json::to_value(fork_items)?
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn forked_messages_do_not_touch_original() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
mount_sse_sequence(
&server,
vec![
completion_body(1, "base"),
completion_body(2, "fork-1"),
completion_body(3, "fork-2"),
],
)
.await;
let mut builder = test_codex();
let initial = builder.build(&server).await?;
submit_text(&initial.codex, "first").await?;
let entry = save_session("branch", &initial.codex, &initial.config).await?;
let baseline_items = rollout_items_without_meta(&entry.rollout_path);
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy"));
let conversation_manager = ConversationManager::new(auth_manager.clone(), SessionSource::Exec);
let forked = conversation_manager
.fork_from_rollout(
initial.config.clone(),
entry.rollout_path.clone(),
auth_manager.clone(),
)
.await?;
submit_text(&forked.conversation, "fork message one").await?;
submit_text(&forked.conversation, "fork message two").await?;
let resumed = builder
.resume(&server, initial.home.clone(), entry.rollout_path.clone())
.await?;
let resumed_items = rollout_items_without_meta(&resumed.session_configured.rollout_path);
assert_eq!(
serde_json::to_value(baseline_items.clone())?,
serde_json::to_value(resumed_items)?
);
assert_eq!(
serde_json::to_value(baseline_items)?,
serde_json::to_value(rollout_items_without_meta(&entry.rollout_path))?
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn resumed_messages_are_present_in_new_fork() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
mount_sse_sequence(
&server,
vec![
completion_body(1, "original"),
completion_body(2, "fork-extra"),
completion_body(3, "resumed-extra"),
],
)
.await;
let mut builder = test_codex();
let initial = builder.build(&server).await?;
submit_text(&initial.codex, "start").await?;
let entry = save_session("seed", &initial.codex, &initial.config).await?;
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy"));
let conversation_manager = ConversationManager::new(auth_manager.clone(), SessionSource::Exec);
let forked = conversation_manager
.fork_from_rollout(
initial.config.clone(),
entry.rollout_path.clone(),
auth_manager.clone(),
)
.await?;
submit_text(&forked.conversation, "fork only").await?;
let resumed = builder
.resume(&server, initial.home.clone(), entry.rollout_path.clone())
.await?;
submit_text(&resumed.codex, "resumed addition").await?;
resumed.codex.flush_rollout().await?;
let updated_base_items = rollout_items_without_meta(&entry.rollout_path);
let fork_again = conversation_manager
.fork_from_rollout(
initial.config.clone(),
entry.rollout_path.clone(),
auth_manager,
)
.await?;
let fork_again_items = rollout_items_without_meta(&fork_again.conversation.rollout_path());
assert_eq!(
serde_json::to_value(updated_base_items)?,
serde_json::to_value(fork_again_items)?
);
assert_eq!(
session_meta_count(&fork_again.conversation.rollout_path()),
1
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn duplicate_name_overwrites_entry() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
mount_sse_sequence(
&server,
vec![completion_body(1, "one"), completion_body(2, "two")],
)
.await;
let mut builder = test_codex();
let first = builder.build(&server).await?;
submit_text(&first.codex, "first session").await?;
let name = "shared";
let entry_one = save_session(name, &first.codex, &first.config).await?;
let second = builder.build(&server).await?;
submit_text(&second.codex, "second session").await?;
let entry_two = save_session(name, &second.codex, &second.config).await?;
let resolved = resolve_saved_session(&second.config.codex_home, name)
.await?
.expect("latest entry present");
assert_eq!(resolved, entry_two);
assert_ne!(resolved.conversation_id, entry_one.conversation_id);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn same_session_multiple_names() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
mount_sse_sequence(&server, vec![completion_body(1, "hello")]).await;
let mut builder = test_codex();
let session = builder.build(&server).await?;
submit_text(&session.codex, "save twice").await?;
let entry_first = save_session("first", &session.codex, &session.config).await?;
let entry_second = save_session("second", &session.codex, &session.config).await?;
let resolved_first = resolve_saved_session(&session.config.codex_home, "first")
.await?
.expect("first entry");
let resolved_second = resolve_saved_session(&session.config.codex_home, "second")
.await?
.expect("second entry");
assert_eq!(entry_first.conversation_id, entry_second.conversation_id);
assert_eq!(
resolved_first.conversation_id,
resolved_second.conversation_id
);
assert_eq!(resolved_first.rollout_path, resolved_second.rollout_path);
let names: serde_json::Value = json!([entry_first.name, entry_second.name]);
assert_eq!(names, json!(["first", "second"]));
Ok(())
}

View File

@@ -904,6 +904,98 @@ async fn exec_command_reports_chunk_and_exit_metadata() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_respects_early_exit_notifications() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.features.enable(Feature::UnifiedExec);
});
let TestCodex {
codex,
cwd,
session_configured,
..
} = builder.build(&server).await?;
let call_id = "uexec-early-exit";
let args = serde_json::json!({
"cmd": "sleep 0.05",
"yield_time_ms": 31415,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
ev_completed("resp-1"),
]),
sse(vec![
ev_assistant_message("msg-1", "done"),
ev_completed("resp-2"),
]),
];
mount_sse_sequence(&server, responses).await;
let session_model = session_configured.model.clone();
codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "watch early exit timing".into(),
}],
final_output_json_schema: None,
cwd: cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: session_model,
effort: None,
summary: ReasoningSummary::Auto,
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
let requests = server.received_requests().await.expect("recorded requests");
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = requests
.iter()
.map(|req| req.body_json::<Value>().expect("request json"))
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
let output = outputs
.get(call_id)
.expect("missing early exit unified_exec output");
assert!(
output.session_id.is_none(),
"short-lived process should not keep a session alive"
);
assert_eq!(
output.exit_code,
Some(0),
"short-lived process should exit successfully"
);
let wall_time = output.wall_time_seconds;
assert!(
wall_time < 0.75,
"wall_time should reflect early exit rather than the full yield time; got {wall_time}"
);
assert!(
output.output.is_empty(),
"sleep command should not emit output, got {:?}",
output.output
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> {
skip_if_no_network!(Ok(()));

View File

@@ -72,7 +72,7 @@ For complete documentation of the `Op` and `EventMsg` variants, refer to [protoc
- `EventMsg::AgentMessage` Messages from the `Model`
- `EventMsg::ExecApprovalRequest` Request approval from user to execute a command
- `EventMsg::TaskComplete` A task completed successfully
- `EventMsg::Error` A task stopped with an error (includes an optional `http_status_code` when available)
- `EventMsg::Error` A task stopped with an error
- `EventMsg::Warning` A non-fatal warning that the client should surface to the user
- `EventMsg::TurnComplete` Contains a `response_id` bookmark for last `response_id` executed by the task. This can be used to continue the task at a later point in time, perhaps with additional user input.

View File

@@ -4,14 +4,23 @@ name = "codex-exec-server"
version = { workspace = true }
[[bin]]
name = "codex-exec-server"
path = "src/main.rs"
name = "codex-execve-wrapper"
path = "src/bin/main_execve_wrapper.rs"
[[bin]]
name = "codex-exec-mcp-server"
path = "src/bin/main_mcp_server.rs"
[lib]
name = "codex_exec_server"
path = "src/lib.rs"
[lints]
workspace = true
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
clap = { workspace = true, features = ["derive"] }
codex-core = { workspace = true }
libc = { workspace = true }
@@ -31,6 +40,7 @@ rmcp = { workspace = true, default-features = false, features = [
] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
shlex = { workspace = true }
socket2 = { workspace = true }
tokio = { workspace = true, features = [
"io-std",

View File

@@ -0,0 +1,8 @@
#[cfg(not(unix))]
fn main() {
eprintln!("codex-execve-wrapper is only implemented for UNIX");
std::process::exit(1);
}
#[cfg(unix)]
pub use codex_exec_server::main_execve_wrapper as main;

View File

@@ -0,0 +1,8 @@
#[cfg(not(unix))]
fn main() {
eprintln!("codex-exec-mcp-server is only implemented for UNIX");
std::process::exit(1);
}
#[cfg(unix)]
pub use codex_exec_server::main_mcp_server as main;

View File

@@ -0,0 +1,8 @@
#[cfg(unix)]
mod posix;
#[cfg(unix)]
pub use posix::main_execve_wrapper;
#[cfg(unix)]
pub use posix::main_mcp_server;

View File

@@ -1,11 +0,0 @@
#[cfg(target_os = "windows")]
fn main() {
eprintln!("codex-exec-server is not implemented on Windows targets");
std::process::exit(1);
}
#[cfg(not(target_os = "windows"))]
mod posix;
#[cfg(not(target_os = "windows"))]
pub use posix::main;

View File

@@ -56,109 +56,114 @@
//! o<-----x
//!
use std::path::Path;
use std::path::PathBuf;
use clap::Parser;
use clap::Subcommand;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::{self};
use crate::posix::escalate_protocol::EscalateAction;
use crate::posix::escalate_server::EscalateServer;
use crate::posix::mcp_escalation_policy::ExecPolicyOutcome;
mod escalate_client;
mod escalate_protocol;
mod escalate_server;
mod escalation_policy;
mod mcp;
mod mcp_escalation_policy;
mod socket;
fn dummy_exec_policy(file: &Path, argv: &[String], _workdir: &Path) -> EscalateAction {
// TODO: execpolicy
if file == Path::new("/opt/homebrew/bin/gh")
&& let [_, arg1, arg2, ..] = argv
&& arg1 == "issue"
&& arg2 == "list"
{
return EscalateAction::Escalate;
}
EscalateAction::Run
}
/// Default value of --execve option relative to the current executable.
/// Note this must match the name of the binary as specified in Cargo.toml.
const CODEX_EXECVE_WRAPPER_EXE_NAME: &str = "codex-execve-wrapper";
#[derive(Parser)]
#[command(version)]
pub struct Cli {
#[command(subcommand)]
subcommand: Option<Commands>,
}
struct McpServerCli {
/// Executable to delegate execve(2) calls to in Bash.
#[arg(long = "execve")]
execve_wrapper: Option<PathBuf>,
#[derive(Subcommand)]
enum Commands {
Escalate(EscalateArgs),
ShellExec(ShellExecArgs),
}
/// Invoked from within the sandbox to (potentially) escalate permissions.
#[derive(Parser, Debug)]
struct EscalateArgs {
file: String,
#[arg(trailing_var_arg = true)]
argv: Vec<String>,
}
impl EscalateArgs {
/// This is the escalate client. It talks to the escalate server to determine whether to exec()
/// the command directly or to proxy to the escalate server.
async fn run(self) -> anyhow::Result<i32> {
let EscalateArgs { file, argv } = self;
escalate_client::run(file, argv).await
}
}
/// Debugging command to emulate an MCP "shell" tool call.
#[derive(Parser, Debug)]
struct ShellExecArgs {
command: String,
/// Path to Bash that has been patched to support execve() wrapping.
#[arg(long = "bash")]
bash_path: Option<PathBuf>,
}
#[tokio::main]
pub async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
pub async fn main_mcp_server() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.with_writer(std::io::stderr)
.with_ansi(false)
.init();
match cli.subcommand {
Some(Commands::Escalate(args)) => {
std::process::exit(args.run().await?);
}
Some(Commands::ShellExec(args)) => {
let bash_path = mcp::get_bash_path()?;
let escalate_server = EscalateServer::new(bash_path, dummy_exec_policy);
let result = escalate_server
.exec(
args.command.clone(),
std::env::vars().collect(),
std::env::current_dir()?,
None,
)
.await?;
println!("{result:?}");
std::process::exit(result.exit_code);
}
let cli = McpServerCli::parse();
let execve_wrapper = match cli.execve_wrapper {
Some(path) => path,
None => {
let bash_path = mcp::get_bash_path()?;
let cwd = std::env::current_exe()?;
cwd.parent()
.map(|p| p.join(CODEX_EXECVE_WRAPPER_EXE_NAME))
.ok_or_else(|| {
anyhow::anyhow!("failed to determine execve wrapper path from current exe")
})?
}
};
let bash_path = match cli.bash_path {
Some(path) => path,
None => mcp::get_bash_path()?,
};
tracing::info!("Starting MCP server");
let service = mcp::serve(bash_path, dummy_exec_policy)
.await
.inspect_err(|e| {
tracing::error!("serving error: {:?}", e);
})?;
tracing::info!("Starting MCP server");
let service = mcp::serve(bash_path, execve_wrapper, dummy_exec_policy)
.await
.inspect_err(|e| {
tracing::error!("serving error: {:?}", e);
})?;
service.waiting().await?;
Ok(())
service.waiting().await?;
Ok(())
}
#[derive(Parser)]
pub struct ExecveWrapperCli {
file: String,
#[arg(trailing_var_arg = true)]
argv: Vec<String>,
}
#[tokio::main]
pub async fn main_execve_wrapper() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.with_writer(std::io::stderr)
.with_ansi(false)
.init();
let ExecveWrapperCli { file, argv } = ExecveWrapperCli::parse();
let exit_code = escalate_client::run(file, argv).await?;
std::process::exit(exit_code);
}
// TODO: replace with execpolicy
fn dummy_exec_policy(file: &Path, argv: &[String], _workdir: &Path) -> ExecPolicyOutcome {
if file.ends_with("rm") {
ExecPolicyOutcome::Forbidden
} else if file.ends_with("git") {
ExecPolicyOutcome::Prompt {
run_with_escalated_permissions: false,
}
} else if file == Path::new("/opt/homebrew/bin/gh")
&& let [_, arg1, arg2, ..] = argv
&& arg1 == "issue"
&& arg2 == "list"
{
ExecPolicyOutcome::Allow {
run_with_escalated_permissions: true,
}
} else {
ExecPolicyOutcome::Allow {
run_with_escalated_permissions: false,
}
}
}

View File

@@ -98,5 +98,12 @@ pub(crate) async fn run(file: String, argv: Vec<String>) -> anyhow::Result<i32>
Err(err.into())
}
EscalateAction::Deny { reason } => {
match reason {
Some(reason) => eprintln!("Execution denied: {reason}"),
None => eprintln!("Execution denied"),
}
Ok(1)
}
}
}

View File

@@ -34,6 +34,8 @@ pub(super) enum EscalateAction {
Run,
/// The command should be escalated to the server for execution.
Escalate,
/// The command should not be executed.
Deny { reason: Option<String> },
}
/// The client sends this to the server to forward its open FDs.

View File

@@ -1,8 +1,8 @@
use std::collections::HashMap;
use std::os::fd::AsRawFd;
use std::path::Path;
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context as _;
@@ -21,25 +21,26 @@ use crate::posix::escalate_protocol::EscalateRequest;
use crate::posix::escalate_protocol::EscalateResponse;
use crate::posix::escalate_protocol::SuperExecMessage;
use crate::posix::escalate_protocol::SuperExecResult;
use crate::posix::escalation_policy::EscalationPolicy;
use crate::posix::socket::AsyncDatagramSocket;
use crate::posix::socket::AsyncSocket;
/// This is the policy which decides how to handle an exec() call.
///
/// `file` is the absolute, canonical path to the executable to run, i.e. the first arg to exec.
/// `argv` is the argv, including the program name (`argv[0]`).
/// `workdir` is the absolute, canonical path to the working directory in which to execute the
/// command.
pub(crate) type ExecPolicy = fn(file: &Path, argv: &[String], workdir: &Path) -> EscalateAction;
pub(crate) struct EscalateServer {
bash_path: PathBuf,
policy: ExecPolicy,
execve_wrapper: PathBuf,
policy: Arc<dyn EscalationPolicy>,
}
impl EscalateServer {
pub fn new(bash_path: PathBuf, policy: ExecPolicy) -> Self {
Self { bash_path, policy }
pub fn new<P>(bash_path: PathBuf, execve_wrapper: PathBuf, policy: P) -> Self
where
P: EscalationPolicy + Send + Sync + 'static,
{
Self {
bash_path,
execve_wrapper,
policy: Arc::new(policy),
}
}
pub async fn exec(
@@ -53,7 +54,7 @@ impl EscalateServer {
let client_socket = escalate_client.into_inner();
client_socket.set_cloexec(false)?;
let escalate_task = tokio::spawn(escalate_task(escalate_server, self.policy));
let escalate_task = tokio::spawn(escalate_task(escalate_server, self.policy.clone()));
let mut env = env.clone();
env.insert(
ESCALATE_SOCKET_ENV_VAR.to_string(),
@@ -61,8 +62,15 @@ impl EscalateServer {
);
env.insert(
BASH_EXEC_WRAPPER_ENV_VAR.to_string(),
format!("{} escalate", std::env::current_exe()?.to_string_lossy()),
self.execve_wrapper.to_string_lossy().to_string(),
);
// TODO: use the sandbox policy and cwd from the calling client.
// Note that sandbox_cwd is ignored for ReadOnly, but needs to be legit
// for `SandboxPolicy::WorkspaceWrite`.
let sandbox_policy = SandboxPolicy::ReadOnly;
let sandbox_cwd = PathBuf::from("/__NONEXISTENT__");
let result = process_exec_tool_call(
codex_core::exec::ExecParams {
command: vec![
@@ -78,9 +86,8 @@ impl EscalateServer {
arg0: None,
},
get_platform_sandbox().unwrap_or(SandboxType::None),
// TODO: use the sandbox policy and cwd from the calling client
&SandboxPolicy::ReadOnly,
&PathBuf::from("/__NONEXISTENT__"), // This is ignored for ReadOnly
&sandbox_policy,
&sandbox_cwd,
&None,
None,
)
@@ -96,7 +103,10 @@ impl EscalateServer {
}
}
async fn escalate_task(socket: AsyncDatagramSocket, policy: ExecPolicy) -> anyhow::Result<()> {
async fn escalate_task(
socket: AsyncDatagramSocket,
policy: Arc<dyn EscalationPolicy>,
) -> anyhow::Result<()> {
loop {
let (_, mut fds) = socket.receive_with_fds().await?;
if fds.len() != 1 {
@@ -104,6 +114,7 @@ async fn escalate_task(socket: AsyncDatagramSocket, policy: ExecPolicy) -> anyho
continue;
}
let stream_socket = AsyncSocket::from_fd(fds.remove(0))?;
let policy = policy.clone();
tokio::spawn(async move {
if let Err(err) = handle_escalate_session_with_policy(stream_socket, policy).await {
tracing::error!("escalate session failed: {err:?}");
@@ -122,7 +133,7 @@ pub(crate) struct ExecResult {
async fn handle_escalate_session_with_policy(
socket: AsyncSocket,
policy: ExecPolicy,
policy: Arc<dyn EscalationPolicy>,
) -> anyhow::Result<()> {
let EscalateRequest {
file,
@@ -132,8 +143,12 @@ async fn handle_escalate_session_with_policy(
} = socket.receive::<EscalateRequest>().await?;
let file = PathBuf::from(&file).absolutize()?.into_owned();
let workdir = PathBuf::from(&workdir).absolutize()?.into_owned();
let action = policy(file.as_path(), &argv, &workdir);
let action = policy
.determine_action(file.as_path(), &argv, &workdir)
.await?;
tracing::debug!("decided {action:?} for {file:?} {argv:?} {workdir:?}");
match action {
EscalateAction::Run => {
socket
@@ -195,6 +210,13 @@ async fn handle_escalate_session_with_policy(
})
.await?;
}
EscalateAction::Deny { reason } => {
socket
.send(EscalateResponse {
action: EscalateAction::Deny { reason },
})
.await?;
}
}
Ok(())
}
@@ -204,14 +226,33 @@ mod tests {
use super::*;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use std::path::Path;
use std::path::PathBuf;
struct DeterministicEscalationPolicy {
action: EscalateAction,
}
#[async_trait::async_trait]
impl EscalationPolicy for DeterministicEscalationPolicy {
async fn determine_action(
&self,
_file: &Path,
_argv: &[String],
_workdir: &Path,
) -> Result<EscalateAction, rmcp::ErrorData> {
Ok(self.action.clone())
}
}
#[tokio::test]
async fn handle_escalate_session_respects_run_in_sandbox_decision() -> anyhow::Result<()> {
let (server, client) = AsyncSocket::pair()?;
let server_task = tokio::spawn(handle_escalate_session_with_policy(
server,
|_file, _argv, _workdir| EscalateAction::Run,
Arc::new(DeterministicEscalationPolicy {
action: EscalateAction::Run,
}),
));
client
@@ -238,7 +279,9 @@ mod tests {
let (server, client) = AsyncSocket::pair()?;
let server_task = tokio::spawn(handle_escalate_session_with_policy(
server,
|_file, _argv, _workdir| EscalateAction::Escalate,
Arc::new(DeterministicEscalationPolicy {
action: EscalateAction::Escalate,
}),
));
client

View File

@@ -0,0 +1,14 @@
use std::path::Path;
use crate::posix::escalate_protocol::EscalateAction;
/// Decides what action to take in response to an execve request from a client.
#[async_trait::async_trait]
pub(crate) trait EscalationPolicy: Send + Sync {
async fn determine_action(
&self,
file: &Path,
argv: &[String],
workdir: &Path,
) -> Result<EscalateAction, rmcp::ErrorData>;
}

View File

@@ -18,9 +18,10 @@ use rmcp::tool_handler;
use rmcp::tool_router;
use rmcp::transport::stdio;
use crate::posix::escalate_server;
use crate::posix::escalate_server::EscalateServer;
use crate::posix::escalate_server::ExecPolicy;
use crate::posix::escalate_server::{self};
use crate::posix::mcp_escalation_policy::ExecPolicy;
use crate::posix::mcp_escalation_policy::McpEscalationPolicy;
/// Path to our patched bash.
const CODEX_BASH_PATH_ENV_VAR: &str = "CODEX_BASH_PATH";
@@ -64,15 +65,17 @@ impl From<escalate_server::ExecResult> for ExecResult {
pub struct ExecTool {
tool_router: ToolRouter<ExecTool>,
bash_path: PathBuf,
execve_wrapper: PathBuf,
policy: ExecPolicy,
}
#[tool_router]
impl ExecTool {
pub fn new(bash_path: PathBuf, policy: ExecPolicy) -> Self {
pub fn new(bash_path: PathBuf, execve_wrapper: PathBuf, policy: ExecPolicy) -> Self {
Self {
tool_router: Self::tool_router(),
bash_path,
execve_wrapper,
policy,
}
}
@@ -81,10 +84,14 @@ impl ExecTool {
#[tool]
async fn shell(
&self,
_context: RequestContext<RoleServer>,
context: RequestContext<RoleServer>,
Parameters(params): Parameters<ExecParams>,
) -> Result<CallToolResult, McpError> {
let escalate_server = EscalateServer::new(self.bash_path.clone(), self.policy);
let escalate_server = EscalateServer::new(
self.bash_path.clone(),
self.execve_wrapper.clone(),
McpEscalationPolicy::new(self.policy, context),
);
let result = escalate_server
.exec(
params.command,
@@ -99,27 +106,6 @@ impl ExecTool {
ExecResult::from(result),
)?]))
}
#[allow(dead_code)]
async fn prompt(
&self,
command: String,
workdir: String,
context: RequestContext<RoleServer>,
) -> Result<CreateElicitationResult, McpError> {
context
.peer
.create_elicitation(CreateElicitationRequestParam {
message: format!("Allow Codex to run `{command:?}` in `{workdir:?}`?"),
#[allow(clippy::expect_used)]
requested_schema: ElicitationSchema::builder()
.property("dummy", PrimitiveSchema::String(StringSchema::new()))
.build()
.expect("failed to build elicitation schema"),
})
.await
.map_err(|e| McpError::internal_error(e.to_string(), None))
}
}
#[tool_handler]
@@ -147,8 +133,9 @@ impl ServerHandler for ExecTool {
pub(crate) async fn serve(
bash_path: PathBuf,
execve_wrapper: PathBuf,
policy: ExecPolicy,
) -> Result<RunningService<RoleServer, ExecTool>, rmcp::service::ServerInitializeError> {
let tool = ExecTool::new(bash_path, policy);
let tool = ExecTool::new(bash_path, execve_wrapper, policy);
tool.serve(stdio()).await
}

View File

@@ -0,0 +1,127 @@
use std::path::Path;
use rmcp::ErrorData as McpError;
use rmcp::RoleServer;
use rmcp::model::CreateElicitationRequestParam;
use rmcp::model::CreateElicitationResult;
use rmcp::model::ElicitationAction;
use rmcp::model::ElicitationSchema;
use rmcp::service::RequestContext;
use crate::posix::escalate_protocol::EscalateAction;
use crate::posix::escalation_policy::EscalationPolicy;
/// This is the policy which decides how to handle an exec() call.
///
/// `file` is the absolute, canonical path to the executable to run, i.e. the first arg to exec.
/// `argv` is the argv, including the program name (`argv[0]`).
/// `workdir` is the absolute, canonical path to the working directory in which to execute the
/// command.
pub(crate) type ExecPolicy = fn(file: &Path, argv: &[String], workdir: &Path) -> ExecPolicyOutcome;
pub(crate) enum ExecPolicyOutcome {
Allow {
run_with_escalated_permissions: bool,
},
Prompt {
run_with_escalated_permissions: bool,
},
Forbidden,
}
/// ExecPolicy with access to the MCP RequestContext so that it can leverage
/// elicitations.
pub(crate) struct McpEscalationPolicy {
policy: ExecPolicy,
context: RequestContext<RoleServer>,
}
impl McpEscalationPolicy {
pub(crate) fn new(policy: ExecPolicy, context: RequestContext<RoleServer>) -> Self {
Self { policy, context }
}
async fn prompt(
&self,
file: &Path,
argv: &[String],
workdir: &Path,
context: RequestContext<RoleServer>,
) -> Result<CreateElicitationResult, McpError> {
let args = shlex::try_join(argv.iter().skip(1).map(String::as_str)).unwrap_or_default();
let command = if args.is_empty() {
file.display().to_string()
} else {
format!("{} {}", file.display(), args)
};
context
.peer
.create_elicitation(CreateElicitationRequestParam {
message: format!("Allow agent to run `{command}` in `{}`?", workdir.display()),
requested_schema: ElicitationSchema::builder()
.title("Execution Permission Request")
.optional_string_with("reason", |schema| {
schema.description("Optional reason for allowing or denying execution")
})
.build()
.map_err(|e| {
McpError::internal_error(
format!("failed to build elicitation schema: {e}"),
None,
)
})?,
})
.await
.map_err(|e| McpError::internal_error(e.to_string(), None))
}
}
#[async_trait::async_trait]
impl EscalationPolicy for McpEscalationPolicy {
async fn determine_action(
&self,
file: &Path,
argv: &[String],
workdir: &Path,
) -> Result<EscalateAction, rmcp::ErrorData> {
let outcome = (self.policy)(file, argv, workdir);
let action = match outcome {
ExecPolicyOutcome::Allow {
run_with_escalated_permissions,
} => {
if run_with_escalated_permissions {
EscalateAction::Escalate
} else {
EscalateAction::Run
}
}
ExecPolicyOutcome::Prompt {
run_with_escalated_permissions,
} => {
let result = self
.prompt(file, argv, workdir, self.context.clone())
.await?;
// TODO: Extract reason from `result.content`.
match result.action {
ElicitationAction::Accept => {
if run_with_escalated_permissions {
EscalateAction::Escalate
} else {
EscalateAction::Run
}
}
ElicitationAction::Decline => EscalateAction::Deny {
reason: Some("User declined execution".to_string()),
},
ElicitationAction::Cancel => EscalateAction::Deny {
reason: Some("User cancelled execution".to_string()),
},
}
}
ExecPolicyOutcome::Forbidden => EscalateAction::Deny {
reason: Some("Execution forbidden by policy".to_string()),
},
};
Ok(action)
}
}

View File

@@ -161,7 +161,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
fn process_event(&mut self, event: Event) -> CodexStatus {
let Event { id: _, msg } = event;
match msg {
EventMsg::Error(ErrorEvent { message, .. }) => {
EventMsg::Error(ErrorEvent { message }) => {
let prefix = "ERROR:".style(self.red);
ts_msg!(self, "{prefix} {message}");
}
@@ -221,7 +221,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => {
ts_msg!(self, "{}", message.style(self.dimmed));
}
EventMsg::StreamError(StreamErrorEvent { message, .. }) => {
EventMsg::StreamError(StreamErrorEvent { message }) => {
ts_msg!(self, "{}", message.style(self.dimmed));
}
EventMsg::TaskStarted(_) => {
@@ -346,6 +346,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
call_id,
auto_approved,
changes,
..
}) => {
// Store metadata so we can calculate duration later when we
// receive the corresponding PatchApplyEnd event.
@@ -566,6 +567,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
| EventMsg::ReasoningContentDelta(_)
| EventMsg::ReasoningRawContentDelta(_)
| EventMsg::UndoCompleted(_)
| EventMsg::SaveSessionResponse(_)
| EventMsg::UndoStarted(_) => {}
}
CodexStatus::Running

View File

@@ -166,6 +166,7 @@ pub struct FileUpdateChange {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
#[serde(rename_all = "snake_case")]
pub enum PatchApplyStatus {
InProgress,
Completed,
Failed,
}

View File

@@ -539,7 +539,6 @@ fn error_event_produces_error() {
"e1",
EventMsg::Error(codex_core::protocol::ErrorEvent {
message: "boom".to_string(),
http_status_code: Some(500),
}),
));
assert_eq!(
@@ -579,7 +578,6 @@ fn stream_error_event_produces_error() {
"e1",
EventMsg::StreamError(codex_core::protocol::StreamErrorEvent {
message: "retrying".to_string(),
http_status_code: Some(500),
}),
));
assert_eq!(
@@ -598,7 +596,6 @@ fn error_followed_by_task_complete_produces_turn_failed() {
"e1",
EventMsg::Error(ErrorEvent {
message: "boom".to_string(),
http_status_code: Some(500),
}),
);
assert_eq!(
@@ -825,6 +822,7 @@ fn patch_apply_success_produces_item_completed_patchapply() {
"p1",
EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id: "call-1".to_string(),
turn_id: "turn-1".to_string(),
auto_approved: true,
changes: changes.clone(),
}),
@@ -837,9 +835,11 @@ fn patch_apply_success_produces_item_completed_patchapply() {
"p2",
EventMsg::PatchApplyEnd(PatchApplyEndEvent {
call_id: "call-1".to_string(),
turn_id: "turn-1".to_string(),
stdout: "applied 3 changes".to_string(),
stderr: String::new(),
success: true,
changes: changes.clone(),
}),
);
let out_end = ep.collect_thread_events(&end);
@@ -894,6 +894,7 @@ fn patch_apply_failure_produces_item_completed_patchapply_failed() {
"p1",
EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id: "call-2".to_string(),
turn_id: "turn-2".to_string(),
auto_approved: false,
changes: changes.clone(),
}),
@@ -905,9 +906,11 @@ fn patch_apply_failure_produces_item_completed_patchapply_failed() {
"p2",
EventMsg::PatchApplyEnd(PatchApplyEndEvent {
call_id: "call-2".to_string(),
turn_id: "turn-2".to_string(),
stdout: String::new(),
stderr: "failed to apply".to_string(),
success: false,
changes: changes.clone(),
}),
);
let out_end = ep.collect_thread_events(&end);

View File

@@ -0,0 +1,34 @@
[package]
edition = "2024"
name = "codex-execpolicy-legacy"
description = "Legacy exec policy engine for validating proposed exec calls."
version = { workspace = true }
[[bin]]
name = "codex-execpolicy-legacy"
path = "src/main.rs"
[lib]
name = "codex_execpolicy_legacy"
path = "src/lib.rs"
[lints]
workspace = true
[dependencies]
allocative = { workspace = true }
anyhow = { workspace = true }
clap = { workspace = true, features = ["derive"] }
derive_more = { workspace = true, features = ["display"] }
env_logger = { workspace = true }
log = { workspace = true }
multimap = { workspace = true }
path-absolutize = { workspace = true }
regex-lite = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
serde_with = { workspace = true, features = ["macros"] }
starlark = { workspace = true }
[dev-dependencies]
tempfile = { workspace = true }

View File

@@ -0,0 +1,183 @@
# codex-execpolicy-legacy
This crate hosts the original execpolicy implementation. The newer prefix-rule
engine lives in `codex-execpolicy`.
The goal of this library is to classify a proposed [`execv(3)`](https://linux.die.net/man/3/execv) command into one of the following states:
- `safe` The command is safe to run (\*).
- `match` The command matched a rule in the policy, but the caller should decide whether it is safe to run based on the files it will write.
- `forbidden` The command is not allowed to be run.
- `unverified` The safety cannot be determined: make the user decide.
(\*) Whether an `execv(3)` call should be considered "safe" often requires additional context beyond the arguments to `execv()` itself. For example, if you trust an autonomous software agent to write files in your source tree, then deciding whether `/bin/cp foo bar` is "safe" depends on `getcwd(3)` for the calling process as well as the `realpath` of `foo` and `bar` when resolved against `getcwd()`.
To that end, rather than returning a boolean, the validator returns a structured result that the client is expected to use to determine the "safety" of the proposed `execv()` call.
For example, to check the command `ls -l foo`, the checker would be invoked as follows:
```shell
cargo run -p codex-execpolicy-legacy -- check ls -l foo | jq
```
It will exit with `0` and print the following to stdout:
```json
{
"result": "safe",
"match": {
"program": "ls",
"flags": [
{
"name": "-l"
}
],
"opts": [],
"args": [
{
"index": 1,
"type": "ReadableFile",
"value": "foo"
}
],
"system_path": ["/bin/ls", "/usr/bin/ls"]
}
}
```
Of note:
- `foo` is tagged as a `ReadableFile`, so the caller should resolve `foo` relative to `getcwd()` and `realpath` it (as it may be a symlink) to determine whether `foo` is safe to read.
- While the specified executable is `ls`, `"system_path"` offers `/bin/ls` and `/usr/bin/ls` as viable alternatives to avoid using whatever `ls` happens to appear first on the user's `$PATH`. If either exists on the host, it is recommended to use it as the first argument to `execv(3)` instead of `ls`.
Further, "safety" in this system is not a guarantee that the command will execute successfully. As an example, `cat /Users/mbolin/code/codex/README.md` may be considered "safe" if the system has decided the agent is allowed to read anything under `/Users/mbolin/code/codex`, but it will fail at runtime if `README.md` does not exist. (Though this is "safe" in that the agent did not read any files that it was not authorized to read.)
## Policy
Currently, the default policy is defined in [`default.policy`](./src/default.policy) within the crate.
The system uses [Starlark](https://bazel.build/rules/language) as the file format because, unlike something like JSON or YAML, it supports "macros" without compromising on safety or reproducibility. (Under the hood, we use [`starlark-rust`](https://github.com/facebook/starlark-rust) as the specific Starlark implementation.)
This policy contains "rules" such as:
```python
define_program(
program="cp",
options=[
flag("-r"),
flag("-R"),
flag("--recursive"),
],
args=[ARG_RFILES, ARG_WFILE],
system_path=["/bin/cp", "/usr/bin/cp"],
should_match=[
["foo", "bar"],
],
should_not_match=[
["foo"],
],
)
```
This rule means that:
- `cp` can be used with any of the following flags (where "flag" means "an option that does not take an argument"): `-r`, `-R`, `--recursive`.
- The initial `ARG_RFILES` passed to `args` means that it expects one or more arguments that correspond to "readable files"
- The final `ARG_WFILE` passed to `args` means that it expects exactly one argument that corresponds to a "writeable file."
- As a means of a lightweight way of including a unit test alongside the definition, the `should_match` list is a list of examples of `execv(3)` args that should match the rule and `should_not_match` is a list of examples that should not match. These examples are verified when the `.policy` file is loaded.
Note that the language of the `.policy` file is still evolving, as we have to continue to expand it so it is sufficiently expressive to accept all commands we want to consider "safe" without allowing unsafe commands to pass through.
The integrity of `default.policy` is verified [via unit tests](./tests).
Further, the CLI supports a `--policy` option to specify a custom `.policy` file for ad-hoc testing.
## Output Type: `match`
Going back to the `cp` example, because the rule matches an `ARG_WFILE`, it will return `match` instead of `safe`:
```shell
cargo run -p codex-execpolicy-legacy -- check cp src1 src2 dest | jq
```
If the caller wants to consider allowing this command, it should parse the JSON to pick out the `WriteableFile` arguments and decide whether they are safe to write:
```json
{
"result": "match",
"match": {
"program": "cp",
"flags": [],
"opts": [],
"args": [
{
"index": 0,
"type": "ReadableFile",
"value": "src1"
},
{
"index": 1,
"type": "ReadableFile",
"value": "src2"
},
{
"index": 2,
"type": "WriteableFile",
"value": "dest"
}
],
"system_path": ["/bin/cp", "/usr/bin/cp"]
}
}
```
Note the exit code is still `0` for a `match` unless the `--require-safe` flag is specified, in which case the exit code is `12`.
## Output Type: `forbidden`
It is also possible to define a rule that, if it matches a command, should flag it as _forbidden_. For example, we do not want agents to be able to run `applied deploy` _ever_, so we define the following rule:
```python
define_program(
program="applied",
args=["deploy"],
forbidden="Infrastructure Risk: command contains 'applied deploy'",
should_match=[
["deploy"],
],
should_not_match=[
["lint"],
],
)
```
Note that for a rule to be forbidden, the `forbidden` keyword arg must be specified as the reason the command is forbidden. This will be included in the output:
```shell
cargo run -p codex-execpolicy-legacy -- check applied deploy | jq
```
```json
{
"result": "forbidden",
"reason": "Infrastructure Risk: command contains 'applied deploy'",
"cause": {
"Exec": {
"exec": {
"program": "applied",
"flags": [],
"opts": [],
"args": [
{
"index": 0,
"type": {
"Literal": "deploy"
},
"value": "deploy"
}
],
"system_path": []
}
}
}
}
```

View File

@@ -0,0 +1,96 @@
use std::path::PathBuf;
use serde::Serialize;
use crate::arg_matcher::ArgMatcher;
use crate::arg_resolver::PositionalArg;
use serde_with::DisplayFromStr;
use serde_with::serde_as;
pub type Result<T> = std::result::Result<T, Error>;
#[serde_as]
#[derive(Debug, Eq, PartialEq, Serialize)]
#[serde(tag = "type")]
pub enum Error {
NoSpecForProgram {
program: String,
},
OptionMissingValue {
program: String,
option: String,
},
OptionFollowedByOptionInsteadOfValue {
program: String,
option: String,
value: String,
},
UnknownOption {
program: String,
option: String,
},
UnexpectedArguments {
program: String,
args: Vec<PositionalArg>,
},
DoubleDashNotSupportedYet {
program: String,
},
MultipleVarargPatterns {
program: String,
first: ArgMatcher,
second: ArgMatcher,
},
RangeStartExceedsEnd {
start: usize,
end: usize,
},
RangeEndOutOfBounds {
end: usize,
len: usize,
},
PrefixOverlapsSuffix {},
NotEnoughArgs {
program: String,
args: Vec<PositionalArg>,
arg_patterns: Vec<ArgMatcher>,
},
InternalInvariantViolation {
message: String,
},
VarargMatcherDidNotMatchAnything {
program: String,
matcher: ArgMatcher,
},
EmptyFileName {},
LiteralValueDidNotMatch {
expected: String,
actual: String,
},
InvalidPositiveInteger {
value: String,
},
MissingRequiredOptions {
program: String,
options: Vec<String>,
},
SedCommandNotProvablySafe {
command: String,
},
ReadablePathNotInReadableFolders {
file: PathBuf,
folders: Vec<PathBuf>,
},
WriteablePathNotInWriteableFolders {
file: PathBuf,
folders: Vec<PathBuf>,
},
CannotCheckRelativePath {
file: PathBuf,
},
CannotCanonicalizePath {
file: String,
#[serde_as(as = "DisplayFromStr")]
error: std::io::ErrorKind,
},
}

View File

@@ -0,0 +1,45 @@
#![allow(clippy::type_complexity)]
#![allow(clippy::too_many_arguments)]
#[macro_use]
extern crate starlark;
mod arg_matcher;
mod arg_resolver;
mod arg_type;
mod error;
mod exec_call;
mod execv_checker;
mod opt;
mod policy;
mod policy_parser;
mod program;
mod sed_command;
mod valid_exec;
pub use arg_matcher::ArgMatcher;
pub use arg_resolver::PositionalArg;
pub use arg_type::ArgType;
pub use error::Error;
pub use error::Result;
pub use exec_call::ExecCall;
pub use execv_checker::ExecvChecker;
pub use opt::Opt;
pub use policy::Policy;
pub use policy_parser::PolicyParser;
pub use program::Forbidden;
pub use program::MatchedExec;
pub use program::NegativeExamplePassedCheck;
pub use program::PositiveExampleFailedCheck;
pub use program::ProgramSpec;
pub use sed_command::parse_sed_command;
pub use valid_exec::MatchedArg;
pub use valid_exec::MatchedFlag;
pub use valid_exec::MatchedOpt;
pub use valid_exec::ValidExec;
const DEFAULT_POLICY: &str = include_str!("default.policy");
pub fn get_default_policy() -> starlark::Result<Policy> {
let parser = PolicyParser::new("#default", DEFAULT_POLICY);
parser.parse()
}

View File

@@ -0,0 +1,169 @@
use anyhow::Result;
use clap::Parser;
use clap::Subcommand;
use codex_execpolicy_legacy::ExecCall;
use codex_execpolicy_legacy::MatchedExec;
use codex_execpolicy_legacy::Policy;
use codex_execpolicy_legacy::PolicyParser;
use codex_execpolicy_legacy::ValidExec;
use codex_execpolicy_legacy::get_default_policy;
use serde::Deserialize;
use serde::Serialize;
use serde::de;
use starlark::Error as StarlarkError;
use std::path::PathBuf;
use std::str::FromStr;
const MATCHED_BUT_WRITES_FILES_EXIT_CODE: i32 = 12;
const MIGHT_BE_SAFE_EXIT_CODE: i32 = 13;
const FORBIDDEN_EXIT_CODE: i32 = 14;
#[derive(Parser, Deserialize, Debug)]
#[command(version, about, long_about = None)]
pub struct Args {
/// If the command fails the policy, exit with 13, but print parseable JSON
/// to stdout.
#[clap(long)]
pub require_safe: bool,
/// Path to the policy file.
#[clap(long, short = 'p')]
pub policy: Option<PathBuf>,
#[command(subcommand)]
pub command: Command,
}
#[derive(Clone, Debug, Deserialize, Subcommand)]
pub enum Command {
/// Checks the command as if the arguments were the inputs to execv(3).
Check {
#[arg(trailing_var_arg = true)]
command: Vec<String>,
},
/// Checks the command encoded as a JSON object.
#[clap(name = "check-json")]
CheckJson {
/// JSON object with "program" (str) and "args" (list[str]) fields.
#[serde(deserialize_with = "deserialize_from_json")]
exec: ExecArg,
},
}
#[derive(Clone, Debug, Deserialize)]
pub struct ExecArg {
pub program: String,
#[serde(default)]
pub args: Vec<String>,
}
fn main() -> Result<()> {
env_logger::init();
let args = Args::parse();
let policy = match args.policy {
Some(policy) => {
let policy_source = policy.to_string_lossy().to_string();
let unparsed_policy = std::fs::read_to_string(policy)?;
let parser = PolicyParser::new(&policy_source, &unparsed_policy);
parser.parse()
}
None => get_default_policy(),
};
let policy = policy.map_err(StarlarkError::into_anyhow)?;
let exec = match args.command {
Command::Check { command } => match command.split_first() {
Some((first, rest)) => ExecArg {
program: first.to_string(),
args: rest.to_vec(),
},
None => {
eprintln!("no command provided");
std::process::exit(1);
}
},
Command::CheckJson { exec } => exec,
};
let (output, exit_code) = check_command(&policy, exec, args.require_safe);
let json = serde_json::to_string(&output)?;
println!("{json}");
std::process::exit(exit_code);
}
fn check_command(
policy: &Policy,
ExecArg { program, args }: ExecArg,
check: bool,
) -> (Output, i32) {
let exec_call = ExecCall { program, args };
match policy.check(&exec_call) {
Ok(MatchedExec::Match { exec }) => {
if exec.might_write_files() {
let exit_code = if check {
MATCHED_BUT_WRITES_FILES_EXIT_CODE
} else {
0
};
(Output::Match { r#match: exec }, exit_code)
} else {
(Output::Safe { r#match: exec }, 0)
}
}
Ok(MatchedExec::Forbidden { reason, cause }) => {
let exit_code = if check { FORBIDDEN_EXIT_CODE } else { 0 };
(Output::Forbidden { reason, cause }, exit_code)
}
Err(err) => {
let exit_code = if check { MIGHT_BE_SAFE_EXIT_CODE } else { 0 };
(Output::Unverified { error: err }, exit_code)
}
}
}
#[derive(Debug, Serialize)]
#[serde(tag = "result")]
pub enum Output {
/// The command is verified as safe.
#[serde(rename = "safe")]
Safe { r#match: ValidExec },
/// The command has matched a rule in the policy, but the caller should
/// decide whether it is "safe" given the files it wants to write.
#[serde(rename = "match")]
Match { r#match: ValidExec },
/// The user is forbidden from running the command.
#[serde(rename = "forbidden")]
Forbidden {
reason: String,
cause: codex_execpolicy_legacy::Forbidden,
},
/// The safety of the command could not be verified.
#[serde(rename = "unverified")]
Unverified {
error: codex_execpolicy_legacy::Error,
},
}
fn deserialize_from_json<'de, D>(deserializer: D) -> Result<ExecArg, D::Error>
where
D: de::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
let decoded = serde_json::from_str(&s)
.map_err(|e| serde::de::Error::custom(format!("JSON parse error: {e}")))?;
Ok(decoded)
}
impl FromStr for ExecArg {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
serde_json::from_str(s).map_err(Into::into)
}
}

View File

@@ -0,0 +1,103 @@
use multimap::MultiMap;
use regex_lite::Error as RegexError;
use regex_lite::Regex;
use crate::ExecCall;
use crate::Forbidden;
use crate::MatchedExec;
use crate::NegativeExamplePassedCheck;
use crate::ProgramSpec;
use crate::error::Error;
use crate::error::Result;
use crate::policy_parser::ForbiddenProgramRegex;
use crate::program::PositiveExampleFailedCheck;
pub struct Policy {
programs: MultiMap<String, ProgramSpec>,
forbidden_program_regexes: Vec<ForbiddenProgramRegex>,
forbidden_substrings_pattern: Option<Regex>,
}
impl Policy {
pub fn new(
programs: MultiMap<String, ProgramSpec>,
forbidden_program_regexes: Vec<ForbiddenProgramRegex>,
forbidden_substrings: Vec<String>,
) -> std::result::Result<Self, RegexError> {
let forbidden_substrings_pattern = if forbidden_substrings.is_empty() {
None
} else {
let escaped_substrings = forbidden_substrings
.iter()
.map(|s| regex_lite::escape(s))
.collect::<Vec<_>>()
.join("|");
Some(Regex::new(&format!("({escaped_substrings})"))?)
};
Ok(Self {
programs,
forbidden_program_regexes,
forbidden_substrings_pattern,
})
}
pub fn check(&self, exec_call: &ExecCall) -> Result<MatchedExec> {
let ExecCall { program, args } = &exec_call;
for ForbiddenProgramRegex { regex, reason } in &self.forbidden_program_regexes {
if regex.is_match(program) {
return Ok(MatchedExec::Forbidden {
cause: Forbidden::Program {
program: program.clone(),
exec_call: exec_call.clone(),
},
reason: reason.clone(),
});
}
}
for arg in args {
if let Some(regex) = &self.forbidden_substrings_pattern
&& regex.is_match(arg)
{
return Ok(MatchedExec::Forbidden {
cause: Forbidden::Arg {
arg: arg.clone(),
exec_call: exec_call.clone(),
},
reason: format!("arg `{arg}` contains forbidden substring"),
});
}
}
let mut last_err = Err(Error::NoSpecForProgram {
program: program.clone(),
});
if let Some(spec_list) = self.programs.get_vec(program) {
for spec in spec_list {
match spec.check(exec_call) {
Ok(matched_exec) => return Ok(matched_exec),
Err(err) => {
last_err = Err(err);
}
}
}
}
last_err
}
pub fn check_each_good_list_individually(&self) -> Vec<PositiveExampleFailedCheck> {
let mut violations = Vec::new();
for (_program, spec) in self.programs.flat_iter() {
violations.extend(spec.verify_should_match_list());
}
violations
}
pub fn check_each_bad_list_individually(&self) -> Vec<NegativeExamplePassedCheck> {
let mut violations = Vec::new();
for (_program, spec) in self.programs.flat_iter() {
violations.extend(spec.verify_should_not_match_list());
}
violations
}
}

View File

@@ -1,5 +1,5 @@
use codex_execpolicy::NegativeExamplePassedCheck;
use codex_execpolicy::get_default_policy;
use codex_execpolicy_legacy::NegativeExamplePassedCheck;
use codex_execpolicy_legacy::get_default_policy;
#[test]
fn verify_everything_in_bad_list_is_rejected() {

View File

@@ -1,15 +1,15 @@
extern crate codex_execpolicy;
extern crate codex_execpolicy_legacy;
use codex_execpolicy::ArgMatcher;
use codex_execpolicy::ArgType;
use codex_execpolicy::Error;
use codex_execpolicy::ExecCall;
use codex_execpolicy::MatchedArg;
use codex_execpolicy::MatchedExec;
use codex_execpolicy::Policy;
use codex_execpolicy::Result;
use codex_execpolicy::ValidExec;
use codex_execpolicy::get_default_policy;
use codex_execpolicy_legacy::ArgMatcher;
use codex_execpolicy_legacy::ArgType;
use codex_execpolicy_legacy::Error;
use codex_execpolicy_legacy::ExecCall;
use codex_execpolicy_legacy::MatchedArg;
use codex_execpolicy_legacy::MatchedExec;
use codex_execpolicy_legacy::Policy;
use codex_execpolicy_legacy::Result;
use codex_execpolicy_legacy::ValidExec;
use codex_execpolicy_legacy::get_default_policy;
#[expect(clippy::expect_used)]
fn setup() -> Policy {

View File

@@ -1,5 +1,5 @@
use codex_execpolicy::PositiveExampleFailedCheck;
use codex_execpolicy::get_default_policy;
use codex_execpolicy_legacy::PositiveExampleFailedCheck;
use codex_execpolicy_legacy::get_default_policy;
#[test]
fn verify_everything_in_good_list_is_allowed() {

View File

@@ -1,16 +1,16 @@
use codex_execpolicy::ArgMatcher;
use codex_execpolicy::ArgType;
use codex_execpolicy::Error;
use codex_execpolicy::ExecCall;
use codex_execpolicy::MatchedArg;
use codex_execpolicy::MatchedExec;
use codex_execpolicy::MatchedOpt;
use codex_execpolicy::Policy;
use codex_execpolicy::Result;
use codex_execpolicy::ValidExec;
use codex_execpolicy::get_default_policy;
use codex_execpolicy_legacy::ArgMatcher;
use codex_execpolicy_legacy::ArgType;
use codex_execpolicy_legacy::Error;
use codex_execpolicy_legacy::ExecCall;
use codex_execpolicy_legacy::MatchedArg;
use codex_execpolicy_legacy::MatchedExec;
use codex_execpolicy_legacy::MatchedOpt;
use codex_execpolicy_legacy::Policy;
use codex_execpolicy_legacy::Result;
use codex_execpolicy_legacy::ValidExec;
use codex_execpolicy_legacy::get_default_policy;
extern crate codex_execpolicy;
extern crate codex_execpolicy_legacy;
#[expect(clippy::expect_used)]
fn setup() -> Policy {

View File

@@ -1,13 +1,13 @@
use codex_execpolicy::ArgType;
use codex_execpolicy::Error;
use codex_execpolicy::ExecCall;
use codex_execpolicy::MatchedArg;
use codex_execpolicy::MatchedExec;
use codex_execpolicy::PolicyParser;
use codex_execpolicy::Result;
use codex_execpolicy::ValidExec;
use codex_execpolicy_legacy::ArgType;
use codex_execpolicy_legacy::Error;
use codex_execpolicy_legacy::ExecCall;
use codex_execpolicy_legacy::MatchedArg;
use codex_execpolicy_legacy::MatchedExec;
use codex_execpolicy_legacy::PolicyParser;
use codex_execpolicy_legacy::Result;
use codex_execpolicy_legacy::ValidExec;
extern crate codex_execpolicy;
extern crate codex_execpolicy_legacy;
#[test]
fn test_invalid_subcommand() -> Result<()> {

View File

@@ -1,15 +1,15 @@
extern crate codex_execpolicy;
extern crate codex_execpolicy_legacy;
use codex_execpolicy::ArgType;
use codex_execpolicy::Error;
use codex_execpolicy::ExecCall;
use codex_execpolicy::MatchedArg;
use codex_execpolicy::MatchedExec;
use codex_execpolicy::MatchedFlag;
use codex_execpolicy::Policy;
use codex_execpolicy::Result;
use codex_execpolicy::ValidExec;
use codex_execpolicy::get_default_policy;
use codex_execpolicy_legacy::ArgType;
use codex_execpolicy_legacy::Error;
use codex_execpolicy_legacy::ExecCall;
use codex_execpolicy_legacy::MatchedArg;
use codex_execpolicy_legacy::MatchedExec;
use codex_execpolicy_legacy::MatchedFlag;
use codex_execpolicy_legacy::Policy;
use codex_execpolicy_legacy::Result;
use codex_execpolicy_legacy::ValidExec;
use codex_execpolicy_legacy::get_default_policy;
#[expect(clippy::expect_used)]
fn setup() -> Policy {

View File

@@ -1,5 +1,5 @@
use codex_execpolicy::Error;
use codex_execpolicy::parse_sed_command;
use codex_execpolicy_legacy::Error;
use codex_execpolicy_legacy::parse_sed_command;
#[test]
fn parses_simple_print_command() {

View File

@@ -1,15 +1,15 @@
extern crate codex_execpolicy;
extern crate codex_execpolicy_legacy;
use std::vec;
use codex_execpolicy::Error;
use codex_execpolicy::ExecCall;
use codex_execpolicy::MatchedExec;
use codex_execpolicy::MatchedFlag;
use codex_execpolicy::Policy;
use codex_execpolicy::PositionalArg;
use codex_execpolicy::ValidExec;
use codex_execpolicy::get_default_policy;
use codex_execpolicy_legacy::Error;
use codex_execpolicy_legacy::ExecCall;
use codex_execpolicy_legacy::MatchedExec;
use codex_execpolicy_legacy::MatchedFlag;
use codex_execpolicy_legacy::Policy;
use codex_execpolicy_legacy::PositionalArg;
use codex_execpolicy_legacy::ValidExec;
use codex_execpolicy_legacy::get_default_policy;
#[expect(clippy::expect_used)]
fn setup() -> Policy {

View File

@@ -1,16 +1,16 @@
extern crate codex_execpolicy;
extern crate codex_execpolicy_legacy;
use codex_execpolicy::ArgType;
use codex_execpolicy::Error;
use codex_execpolicy::ExecCall;
use codex_execpolicy::MatchedArg;
use codex_execpolicy::MatchedExec;
use codex_execpolicy::MatchedFlag;
use codex_execpolicy::MatchedOpt;
use codex_execpolicy::Policy;
use codex_execpolicy::Result;
use codex_execpolicy::ValidExec;
use codex_execpolicy::get_default_policy;
use codex_execpolicy_legacy::ArgType;
use codex_execpolicy_legacy::Error;
use codex_execpolicy_legacy::ExecCall;
use codex_execpolicy_legacy::MatchedArg;
use codex_execpolicy_legacy::MatchedExec;
use codex_execpolicy_legacy::MatchedFlag;
use codex_execpolicy_legacy::MatchedOpt;
use codex_execpolicy_legacy::Policy;
use codex_execpolicy_legacy::Result;
use codex_execpolicy_legacy::ValidExec;
use codex_execpolicy_legacy::get_default_policy;
#[expect(clippy::expect_used)]
fn setup() -> Policy {

View File

@@ -1,33 +1,29 @@
[package]
edition = "2024"
name = "codex-execpolicy"
version = { workspace = true }
[[bin]]
name = "codex-execpolicy"
path = "src/main.rs"
edition = "2024"
description = "Codex exec policy: prefix-based Starlark rules for command decisions."
[lib]
name = "codex_execpolicy"
path = "src/lib.rs"
[[bin]]
name = "codex-execpolicy"
path = "src/main.rs"
[lints]
workspace = true
[dependencies]
allocative = { workspace = true }
anyhow = { workspace = true }
clap = { workspace = true, features = ["derive"] }
derive_more = { workspace = true, features = ["display"] }
env_logger = { workspace = true }
log = { workspace = true }
multimap = { workspace = true }
path-absolutize = { workspace = true }
regex-lite = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
serde_with = { workspace = true, features = ["macros"] }
shlex = { workspace = true }
starlark = { workspace = true }
thiserror = { workspace = true }
[dev-dependencies]
tempfile = { workspace = true }
pretty_assertions = { workspace = true }

View File

@@ -1,180 +1,60 @@
# codex_execpolicy
# codex-execpolicy
The goal of this library is to classify a proposed [`execv(3)`](https://linux.die.net/man/3/execv) command into one of the following states:
## Overview
- Policy engine and CLI built around `prefix_rule(pattern=[...], decision?, match?, not_match?)`.
- This release covers the prefix-rule subset of the execpolicy language; a richer language will follow.
- Tokens are matched in order; any `pattern` element may be a list to denote alternatives. `decision` defaults to `allow`; valid values: `allow`, `prompt`, `forbidden`.
- `match` / `not_match` supply example invocations that are validated at load time (think of them as unit tests); examples can be token arrays or strings (strings are tokenized with `shlex`).
- The CLI always prints the JSON serialization of the evaluation result.
- The legacy rule matcher lives in `codex-execpolicy-legacy`.
- `safe` The command is safe to run (\*).
- `match` The command matched a rule in the policy, but the caller should decide whether it is safe to run based on the files it will write.
- `forbidden` The command is not allowed to be run.
- `unverified` The safety cannot be determined: make the user decide.
(\*) Whether an `execv(3)` call should be considered "safe" often requires additional context beyond the arguments to `execv()` itself. For example, if you trust an autonomous software agent to write files in your source tree, then deciding whether `/bin/cp foo bar` is "safe" depends on `getcwd(3)` for the calling process as well as the `realpath` of `foo` and `bar` when resolved against `getcwd()`.
To that end, rather than returning a boolean, the validator returns a structured result that the client is expected to use to determine the "safety" of the proposed `execv()` call.
For example, to check the command `ls -l foo`, the checker would be invoked as follows:
```shell
cargo run -- check ls -l foo | jq
```
It will exit with `0` and print the following to stdout:
```json
{
"result": "safe",
"match": {
"program": "ls",
"flags": [
{
"name": "-l"
}
],
"opts": [],
"args": [
{
"index": 1,
"type": "ReadableFile",
"value": "foo"
}
],
"system_path": ["/bin/ls", "/usr/bin/ls"]
}
}
```
Of note:
- `foo` is tagged as a `ReadableFile`, so the caller should resolve `foo` relative to `getcwd()` and `realpath` it (as it may be a symlink) to determine whether `foo` is safe to read.
- While the specified executable is `ls`, `"system_path"` offers `/bin/ls` and `/usr/bin/ls` as viable alternatives to avoid using whatever `ls` happens to appear first on the user's `$PATH`. If either exists on the host, it is recommended to use it as the first argument to `execv(3)` instead of `ls`.
Further, "safety" in this system is not a guarantee that the command will execute successfully. As an example, `cat /Users/mbolin/code/codex/README.md` may be considered "safe" if the system has decided the agent is allowed to read anything under `/Users/mbolin/code/codex`, but it will fail at runtime if `README.md` does not exist. (Though this is "safe" in that the agent did not read any files that it was not authorized to read.)
## Policy
Currently, the default policy is defined in [`default.policy`](./src/default.policy) within the crate.
The system uses [Starlark](https://bazel.build/rules/language) as the file format because, unlike something like JSON or YAML, it supports "macros" without compromising on safety or reproducibility. (Under the hood, we use [`starlark-rust`](https://github.com/facebook/starlark-rust) as the specific Starlark implementation.)
This policy contains "rules" such as:
```python
define_program(
program="cp",
options=[
flag("-r"),
flag("-R"),
flag("--recursive"),
],
args=[ARG_RFILES, ARG_WFILE],
system_path=["/bin/cp", "/usr/bin/cp"],
should_match=[
["foo", "bar"],
],
should_not_match=[
["foo"],
],
## Policy shapes
- Prefix rules use Starlark syntax:
```starlark
prefix_rule(
pattern = ["cmd", ["alt1", "alt2"]], # ordered tokens; list entries denote alternatives
decision = "prompt", # allow | prompt | forbidden; defaults to allow
match = [["cmd", "alt1"], "cmd alt2"], # examples that must match this rule
not_match = [["cmd", "oops"], "cmd alt3"], # examples that must not match this rule
)
```
This rule means that:
- `cp` can be used with any of the following flags (where "flag" means "an option that does not take an argument"): `-r`, `-R`, `--recursive`.
- The initial `ARG_RFILES` passed to `args` means that it expects one or more arguments that correspond to "readable files"
- The final `ARG_WFILE` passed to `args` means that it expects exactly one argument that corresponds to a "writeable file."
- As a means of a lightweight way of including a unit test alongside the definition, the `should_match` list is a list of examples of `execv(3)` args that should match the rule and `should_not_match` is a list of examples that should not match. These examples are verified when the `.policy` file is loaded.
Note that the language of the `.policy` file is still evolving, as we have to continue to expand it so it is sufficiently expressive to accept all commands we want to consider "safe" without allowing unsafe commands to pass through.
The integrity of `default.policy` is verified [via unit tests](./tests).
Further, the CLI supports a `--policy` option to specify a custom `.policy` file for ad-hoc testing.
## Output Type: `match`
Going back to the `cp` example, because the rule matches an `ARG_WFILE`, it will return `match` instead of `safe`:
```shell
cargo run -- check cp src1 src2 dest | jq
## CLI
- Provide one or more policy files (for example `src/default.codexpolicy`) to check a command:
```bash
cargo run -p codex-execpolicy -- check --policy path/to/policy.codexpolicy git status
```
- Pass multiple `--policy` flags to merge rules, evaluated in the order provided:
```bash
cargo run -p codex-execpolicy -- check --policy base.codexpolicy --policy overrides.codexpolicy git status
```
- Output is JSON by default; pass `--pretty` for pretty-printed JSON
- Example outcomes:
- Match: `{"match": { ... "decision": "allow" ... }}`
- No match: `"noMatch"`
If the caller wants to consider allowing this command, it should parse the JSON to pick out the `WriteableFile` arguments and decide whether they are safe to write:
## Response shapes
- Match:
```json
{
"result": "match",
"match": {
"program": "cp",
"flags": [],
"opts": [],
"args": [
"decision": "allow|prompt|forbidden",
"matchedRules": [
{
"index": 0,
"type": "ReadableFile",
"value": "src1"
},
{
"index": 1,
"type": "ReadableFile",
"value": "src2"
},
{
"index": 2,
"type": "WriteableFile",
"value": "dest"
"prefixRuleMatch": {
"matchedPrefix": ["<token>", "..."],
"decision": "allow|prompt|forbidden"
}
}
],
"system_path": ["/bin/cp", "/usr/bin/cp"]
]
}
}
```
Note the exit code is still `0` for a `match` unless the `--require-safe` flag is specified, in which case the exit code is `12`.
## Output Type: `forbidden`
It is also possible to define a rule that, if it matches a command, should flag it as _forbidden_. For example, we do not want agents to be able to run `applied deploy` _ever_, so we define the following rule:
```python
define_program(
program="applied",
args=["deploy"],
forbidden="Infrastructure Risk: command contains 'applied deploy'",
should_match=[
["deploy"],
],
should_not_match=[
["lint"],
],
)
```
Note that for a rule to be forbidden, the `forbidden` keyword arg must be specified as the reason the command is forbidden. This will be included in the output:
```shell
cargo run -- check applied deploy | jq
```
- No match:
```json
{
"result": "forbidden",
"reason": "Infrastructure Risk: command contains 'applied deploy'",
"cause": {
"Exec": {
"exec": {
"program": "applied",
"flags": [],
"opts": [],
"args": [
{
"index": 0,
"type": {
"Literal": "deploy"
},
"value": "deploy"
}
],
"system_path": []
}
}
}
}
"noMatch"
```
- `matchedRules` lists every rule whose prefix matched the command; `matchedPrefix` is the exact prefix that matched.
- The effective `decision` is the strictest severity across all matches (`forbidden` > `prompt` > `allow`).

View File

@@ -1,96 +1,26 @@
use std::path::PathBuf;
use serde::Serialize;
use crate::arg_matcher::ArgMatcher;
use crate::arg_resolver::PositionalArg;
use serde_with::DisplayFromStr;
use serde_with::serde_as;
use starlark::Error as StarlarkError;
use thiserror::Error;
pub type Result<T> = std::result::Result<T, Error>;
#[serde_as]
#[derive(Debug, Eq, PartialEq, Serialize)]
#[serde(tag = "type")]
#[derive(Debug, Error)]
pub enum Error {
NoSpecForProgram {
program: String,
},
OptionMissingValue {
program: String,
option: String,
},
OptionFollowedByOptionInsteadOfValue {
program: String,
option: String,
value: String,
},
UnknownOption {
program: String,
option: String,
},
UnexpectedArguments {
program: String,
args: Vec<PositionalArg>,
},
DoubleDashNotSupportedYet {
program: String,
},
MultipleVarargPatterns {
program: String,
first: ArgMatcher,
second: ArgMatcher,
},
RangeStartExceedsEnd {
start: usize,
end: usize,
},
RangeEndOutOfBounds {
end: usize,
len: usize,
},
PrefixOverlapsSuffix {},
NotEnoughArgs {
program: String,
args: Vec<PositionalArg>,
arg_patterns: Vec<ArgMatcher>,
},
InternalInvariantViolation {
message: String,
},
VarargMatcherDidNotMatchAnything {
program: String,
matcher: ArgMatcher,
},
EmptyFileName {},
LiteralValueDidNotMatch {
expected: String,
actual: String,
},
InvalidPositiveInteger {
value: String,
},
MissingRequiredOptions {
program: String,
options: Vec<String>,
},
SedCommandNotProvablySafe {
command: String,
},
ReadablePathNotInReadableFolders {
file: PathBuf,
folders: Vec<PathBuf>,
},
WriteablePathNotInWriteableFolders {
file: PathBuf,
folders: Vec<PathBuf>,
},
CannotCheckRelativePath {
file: PathBuf,
},
CannotCanonicalizePath {
file: String,
#[serde_as(as = "DisplayFromStr")]
error: std::io::ErrorKind,
#[error("invalid decision: {0}")]
InvalidDecision(String),
#[error("invalid pattern element: {0}")]
InvalidPattern(String),
#[error("invalid example: {0}")]
InvalidExample(String),
#[error(
"expected every example to match at least one rule. rules: {rules:?}; unmatched examples: \
{examples:?}"
)]
ExampleDidNotMatch {
rules: Vec<String>,
examples: Vec<String>,
},
#[error("expected example to not match rule `{rule}`: {example}")]
ExampleDidMatch { rule: String, example: String },
#[error("starlark error: {0}")]
Starlark(StarlarkError),
}

View File

@@ -1,45 +1,15 @@
#![allow(clippy::type_complexity)]
#![allow(clippy::too_many_arguments)]
#[macro_use]
extern crate starlark;
pub mod decision;
pub mod error;
pub mod parser;
pub mod policy;
pub mod rule;
mod arg_matcher;
mod arg_resolver;
mod arg_type;
mod error;
mod exec_call;
mod execv_checker;
mod opt;
mod policy;
mod policy_parser;
mod program;
mod sed_command;
mod valid_exec;
pub use arg_matcher::ArgMatcher;
pub use arg_resolver::PositionalArg;
pub use arg_type::ArgType;
pub use decision::Decision;
pub use error::Error;
pub use error::Result;
pub use exec_call::ExecCall;
pub use execv_checker::ExecvChecker;
pub use opt::Opt;
pub use parser::PolicyParser;
pub use policy::Evaluation;
pub use policy::Policy;
pub use policy_parser::PolicyParser;
pub use program::Forbidden;
pub use program::MatchedExec;
pub use program::NegativeExamplePassedCheck;
pub use program::PositiveExampleFailedCheck;
pub use program::ProgramSpec;
pub use sed_command::parse_sed_command;
pub use valid_exec::MatchedArg;
pub use valid_exec::MatchedFlag;
pub use valid_exec::MatchedOpt;
pub use valid_exec::ValidExec;
const DEFAULT_POLICY: &str = include_str!("default.policy");
pub fn get_default_policy() -> starlark::Result<Policy> {
let parser = PolicyParser::new("#default", DEFAULT_POLICY);
parser.parse()
}
pub use rule::Rule;
pub use rule::RuleMatch;
pub use rule::RuleRef;

View File

@@ -1,167 +1,66 @@
use std::fs;
use std::path::PathBuf;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use clap::Subcommand;
use codex_execpolicy::ExecCall;
use codex_execpolicy::MatchedExec;
use codex_execpolicy::Policy;
use codex_execpolicy::PolicyParser;
use codex_execpolicy::ValidExec;
use codex_execpolicy::get_default_policy;
use serde::Deserialize;
use serde::Serialize;
use serde::de;
use starlark::Error as StarlarkError;
use std::path::PathBuf;
use std::str::FromStr;
const MATCHED_BUT_WRITES_FILES_EXIT_CODE: i32 = 12;
const MIGHT_BE_SAFE_EXIT_CODE: i32 = 13;
const FORBIDDEN_EXIT_CODE: i32 = 14;
#[derive(Parser, Deserialize, Debug)]
#[command(version, about, long_about = None)]
pub struct Args {
/// If the command fails the policy, exit with 13, but print parseable JSON
/// to stdout.
#[clap(long)]
pub require_safe: bool,
/// Path to the policy file.
#[clap(long, short = 'p')]
pub policy: Option<PathBuf>,
#[command(subcommand)]
pub command: Command,
}
#[derive(Clone, Debug, Deserialize, Subcommand)]
pub enum Command {
/// Checks the command as if the arguments were the inputs to execv(3).
/// CLI for evaluating exec policies
#[derive(Parser)]
#[command(name = "codex-execpolicy")]
enum Cli {
/// Evaluate a command against a policy.
Check {
#[arg(trailing_var_arg = true)]
#[arg(short, long = "policy", value_name = "PATH", required = true)]
policies: Vec<PathBuf>,
/// Pretty-print the JSON output.
#[arg(long)]
pretty: bool,
/// Command tokens to check.
#[arg(
value_name = "COMMAND",
required = true,
trailing_var_arg = true,
allow_hyphen_values = true
)]
command: Vec<String>,
},
/// Checks the command encoded as a JSON object.
#[clap(name = "check-json")]
CheckJson {
/// JSON object with "program" (str) and "args" (list[str]) fields.
#[serde(deserialize_with = "deserialize_from_json")]
exec: ExecArg,
},
}
#[derive(Clone, Debug, Deserialize)]
pub struct ExecArg {
pub program: String,
#[serde(default)]
pub args: Vec<String>,
}
fn main() -> Result<()> {
env_logger::init();
let cli = Cli::parse();
match cli {
Cli::Check {
policies,
command,
pretty,
} => cmd_check(policies, command, pretty),
}
}
let args = Args::parse();
let policy = match args.policy {
Some(policy) => {
let policy_source = policy.to_string_lossy().to_string();
let unparsed_policy = std::fs::read_to_string(policy)?;
let parser = PolicyParser::new(&policy_source, &unparsed_policy);
parser.parse()
}
None => get_default_policy(),
fn cmd_check(policy_paths: Vec<PathBuf>, args: Vec<String>, pretty: bool) -> Result<()> {
let policy = load_policies(&policy_paths)?;
let eval = policy.check(&args);
let json = if pretty {
serde_json::to_string_pretty(&eval)?
} else {
serde_json::to_string(&eval)?
};
let policy = policy.map_err(StarlarkError::into_anyhow)?;
let exec = match args.command {
Command::Check { command } => match command.split_first() {
Some((first, rest)) => ExecArg {
program: first.to_string(),
args: rest.to_vec(),
},
None => {
eprintln!("no command provided");
std::process::exit(1);
}
},
Command::CheckJson { exec } => exec,
};
let (output, exit_code) = check_command(&policy, exec, args.require_safe);
let json = serde_json::to_string(&output)?;
println!("{json}");
std::process::exit(exit_code);
Ok(())
}
fn check_command(
policy: &Policy,
ExecArg { program, args }: ExecArg,
check: bool,
) -> (Output, i32) {
let exec_call = ExecCall { program, args };
match policy.check(&exec_call) {
Ok(MatchedExec::Match { exec }) => {
if exec.might_write_files() {
let exit_code = if check {
MATCHED_BUT_WRITES_FILES_EXIT_CODE
} else {
0
};
(Output::Match { r#match: exec }, exit_code)
} else {
(Output::Safe { r#match: exec }, 0)
}
}
Ok(MatchedExec::Forbidden { reason, cause }) => {
let exit_code = if check { FORBIDDEN_EXIT_CODE } else { 0 };
(Output::Forbidden { reason, cause }, exit_code)
}
Err(err) => {
let exit_code = if check { MIGHT_BE_SAFE_EXIT_CODE } else { 0 };
(Output::Unverified { error: err }, exit_code)
}
}
}
#[derive(Debug, Serialize)]
#[serde(tag = "result")]
pub enum Output {
/// The command is verified as safe.
#[serde(rename = "safe")]
Safe { r#match: ValidExec },
/// The command has matched a rule in the policy, but the caller should
/// decide whether it is "safe" given the files it wants to write.
#[serde(rename = "match")]
Match { r#match: ValidExec },
/// The user is forbidden from running the command.
#[serde(rename = "forbidden")]
Forbidden {
reason: String,
cause: codex_execpolicy::Forbidden,
},
/// The safety of the command could not be verified.
#[serde(rename = "unverified")]
Unverified { error: codex_execpolicy::Error },
}
fn deserialize_from_json<'de, D>(deserializer: D) -> Result<ExecArg, D::Error>
where
D: de::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
let decoded = serde_json::from_str(&s)
.map_err(|e| serde::de::Error::custom(format!("JSON parse error: {e}")))?;
Ok(decoded)
}
impl FromStr for ExecArg {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
serde_json::from_str(s).map_err(Into::into)
fn load_policies(policy_paths: &[PathBuf]) -> Result<codex_execpolicy::Policy> {
let mut parser = PolicyParser::new();
for policy_path in policy_paths {
let policy_file_contents = fs::read_to_string(policy_path)
.with_context(|| format!("failed to read policy at {}", policy_path.display()))?;
let policy_identifier = policy_path.to_string_lossy().to_string();
parser.parse(&policy_identifier, &policy_file_contents)?;
}
Ok(parser.build())
}

View File

@@ -1,103 +1,84 @@
use crate::decision::Decision;
use crate::rule::RuleMatch;
use crate::rule::RuleRef;
use multimap::MultiMap;
use regex_lite::Error as RegexError;
use regex_lite::Regex;
use crate::ExecCall;
use crate::Forbidden;
use crate::MatchedExec;
use crate::NegativeExamplePassedCheck;
use crate::ProgramSpec;
use crate::error::Error;
use crate::error::Result;
use crate::policy_parser::ForbiddenProgramRegex;
use crate::program::PositiveExampleFailedCheck;
use serde::Deserialize;
use serde::Serialize;
#[derive(Clone, Debug)]
pub struct Policy {
programs: MultiMap<String, ProgramSpec>,
forbidden_program_regexes: Vec<ForbiddenProgramRegex>,
forbidden_substrings_pattern: Option<Regex>,
rules_by_program: MultiMap<String, RuleRef>,
}
impl Policy {
pub fn new(
programs: MultiMap<String, ProgramSpec>,
forbidden_program_regexes: Vec<ForbiddenProgramRegex>,
forbidden_substrings: Vec<String>,
) -> std::result::Result<Self, RegexError> {
let forbidden_substrings_pattern = if forbidden_substrings.is_empty() {
None
} else {
let escaped_substrings = forbidden_substrings
.iter()
.map(|s| regex_lite::escape(s))
.collect::<Vec<_>>()
.join("|");
Some(Regex::new(&format!("({escaped_substrings})"))?)
pub fn new(rules_by_program: MultiMap<String, RuleRef>) -> Self {
Self { rules_by_program }
}
pub fn empty() -> Self {
Self::new(MultiMap::new())
}
pub fn rules(&self) -> &MultiMap<String, RuleRef> {
&self.rules_by_program
}
pub fn check(&self, cmd: &[String]) -> Evaluation {
let rules = match cmd.first() {
Some(first) => match self.rules_by_program.get_vec(first) {
Some(rules) => rules,
None => return Evaluation::NoMatch,
},
None => return Evaluation::NoMatch,
};
Ok(Self {
programs,
forbidden_program_regexes,
forbidden_substrings_pattern,
})
let matched_rules: Vec<RuleMatch> =
rules.iter().filter_map(|rule| rule.matches(cmd)).collect();
match matched_rules.iter().map(RuleMatch::decision).max() {
Some(decision) => Evaluation::Match {
decision,
matched_rules,
},
None => Evaluation::NoMatch,
}
}
pub fn check(&self, exec_call: &ExecCall) -> Result<MatchedExec> {
let ExecCall { program, args } = &exec_call;
for ForbiddenProgramRegex { regex, reason } in &self.forbidden_program_regexes {
if regex.is_match(program) {
return Ok(MatchedExec::Forbidden {
cause: Forbidden::Program {
program: program.clone(),
exec_call: exec_call.clone(),
},
reason: reason.clone(),
});
}
}
pub fn check_multiple<Commands>(&self, commands: Commands) -> Evaluation
where
Commands: IntoIterator,
Commands::Item: AsRef<[String]>,
{
let matched_rules: Vec<RuleMatch> = commands
.into_iter()
.flat_map(|command| match self.check(command.as_ref()) {
Evaluation::Match { matched_rules, .. } => matched_rules,
Evaluation::NoMatch => Vec::new(),
})
.collect();
for arg in args {
if let Some(regex) = &self.forbidden_substrings_pattern
&& regex.is_match(arg)
{
return Ok(MatchedExec::Forbidden {
cause: Forbidden::Arg {
arg: arg.clone(),
exec_call: exec_call.clone(),
},
reason: format!("arg `{arg}` contains forbidden substring"),
});
}
match matched_rules.iter().map(RuleMatch::decision).max() {
Some(decision) => Evaluation::Match {
decision,
matched_rules,
},
None => Evaluation::NoMatch,
}
let mut last_err = Err(Error::NoSpecForProgram {
program: program.clone(),
});
if let Some(spec_list) = self.programs.get_vec(program) {
for spec in spec_list {
match spec.check(exec_call) {
Ok(matched_exec) => return Ok(matched_exec),
Err(err) => {
last_err = Err(err);
}
}
}
}
last_err
}
pub fn check_each_good_list_individually(&self) -> Vec<PositiveExampleFailedCheck> {
let mut violations = Vec::new();
for (_program, spec) in self.programs.flat_iter() {
violations.extend(spec.verify_should_match_list());
}
violations
}
pub fn check_each_bad_list_individually(&self) -> Vec<NegativeExamplePassedCheck> {
let mut violations = Vec::new();
for (_program, spec) in self.programs.flat_iter() {
violations.extend(spec.verify_should_not_match_list());
}
violations
}
}
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum Evaluation {
NoMatch,
Match {
decision: Decision,
#[serde(rename = "matchedRules")]
matched_rules: Vec<RuleMatch>,
},
}
impl Evaluation {
pub fn is_match(&self) -> bool {
matches!(self, Self::Match { .. })
}
}

View File

@@ -1,14 +1,14 @@
use std::any::Any;
use std::sync::Arc;
use codex_execpolicy2::Decision;
use codex_execpolicy2::Evaluation;
use codex_execpolicy2::PolicyParser;
use codex_execpolicy2::RuleMatch;
use codex_execpolicy2::RuleRef;
use codex_execpolicy2::rule::PatternToken;
use codex_execpolicy2::rule::PrefixPattern;
use codex_execpolicy2::rule::PrefixRule;
use codex_execpolicy::Decision;
use codex_execpolicy::Evaluation;
use codex_execpolicy::PolicyParser;
use codex_execpolicy::RuleMatch;
use codex_execpolicy::RuleRef;
use codex_execpolicy::rule::PatternToken;
use codex_execpolicy::rule::PrefixPattern;
use codex_execpolicy::rule::PrefixRule;
use pretty_assertions::assert_eq;
fn tokens(cmd: &[&str]) -> Vec<String> {

Some files were not shown because too many files have changed in this diff Show More