Compare commits

...

7 Commits

Author SHA1 Message Date
Yaroslav Volovich
baf0a90cae refactor: simplify thread history synthetic-id detection 2026-02-25 21:22:16 +00:00
Yaroslav Volovich
d7602c3081 fix: make resume cwd filtering and thread renames deterministic 2026-02-25 20:45:27 +00:00
Yaroslav Volovich
5d91f2b2c2 test: skip sandbox-sensitive core tests on gha linux arm 2026-02-25 20:37:45 +00:00
Yaroslav Volovich
d52f1a5d37 test: stabilize bazel arm flakes 2026-02-25 20:37:45 +00:00
Yaroslav Volovich
e0ab8c645c docs: clarify turn-based thread fork contracts 2026-02-25 20:37:45 +00:00
Yaroslav Volovich
d2f8b95bbf codex: address PR review feedback (#12577) 2026-02-25 20:37:45 +00:00
Yaroslav Volovich
8b57fdd973 app-server: support turn-based thread forks 2026-02-25 20:37:45 +00:00
15 changed files with 1002 additions and 37 deletions

View File

@@ -19,6 +19,15 @@ export type ThreadForkParams = {threadId: string, /**
* If specified, the thread_id param will be ignored.
*/
path?: string | null, /**
* [UNSTABLE] Fork after the specified historical turn (inclusive).
* When omitted, the full thread history is copied.
*
* The value should come from `thread/read(includeTurns=true)` for the same source thread.
* Requests are rejected for legacy histories whose replayed turn ids are not stable, for
* in-progress turns, and for turns that do not yet contain both a user message and an agent
* response.
*/
forkAfterTurnId?: string | null, /**
* Configuration overrides for the forked thread, if any.
*/
model?: string | null, modelProvider?: string | null, cwd?: string | null, approvalPolicy?: AskForApproval | null, sandbox?: SandboxMode | null, config?: { [key in string]?: JsonValue } | null, baseInstructions?: string | null, developerInstructions?: string | null, /**

View File

@@ -1,3 +1,16 @@
//! Reconstruct app-server turn history from persisted rollout items.
//!
//! This module translates the low-level rollout log into the higher-level `Turn` / `ThreadItem`
//! structures returned by app-server APIs. The same reducer is used both for replaying stored
//! rollouts and for incrementally rebuilding the current turn from live `EventMsg` values.
//! Callers must feed items in recorded order; reordering lifecycle events can silently attach work
//! to the wrong turn while still producing a superficially valid transcript.
//!
//! Modern rollouts persist explicit turn lifecycle events, which means replay can preserve stable
//! turn ids. Legacy rollouts may only contain message/tool events, so replay has to synthesize
//! turn ids while rebuilding them. Consumers that need durable turn anchors, such as turn-based
//! `thread/fork`, should reject those synthesized ids rather than treating them as stable.
use crate::protocol::v2::CollabAgentState;
use crate::protocol::v2::CollabAgentTool;
use crate::protocol::v2::CollabAgentToolCallStatus;
@@ -57,7 +70,10 @@ use codex_protocol::protocol::PatchApplyStatus as CorePatchApplyStatus;
/// Convert persisted [`RolloutItem`] entries into a sequence of [`Turn`] values.
///
/// When available, this uses `TurnContext.turn_id` as the canonical turn id so
/// resumed/rebuilt thread history preserves the original turn identifiers.
/// resumed/rebuilt thread history preserves the original turn identifiers. Callers that also need
/// to distinguish between stable and replay-synthesized turn ids should drive
/// [`ThreadHistoryBuilder`] directly and inspect [`ThreadHistoryBuilder::has_synthetic_turn_ids`]
/// before consuming the final turn list.
pub fn build_turns_from_rollout_items(items: &[RolloutItem]) -> Vec<Turn> {
let mut builder = ThreadHistoryBuilder::new();
for item in items {
@@ -66,10 +82,19 @@ pub fn build_turns_from_rollout_items(items: &[RolloutItem]) -> Vec<Turn> {
builder.finish()
}
/// Stateful reducer that groups rollout lifecycle items into app-server turns.
///
/// The builder owns the in-progress turn boundary while replay is in flight, then emits the
/// finalized `Turn` list once all items have been applied. It also tracks whether replay had to
/// synthesize any turn ids so app-server can share the same reduction rules for stored history
/// and live notifications without exposing that metadata in every convenience return type.
/// Reusing a builder across unrelated transcripts without calling [`Self::reset`] can leak late
/// events from one transcript into the next.
pub struct ThreadHistoryBuilder {
turns: Vec<Turn>,
current_turn: Option<PendingTurn>,
next_item_index: i64,
has_synthetic_turn_ids: bool,
}
impl Default for ThreadHistoryBuilder {
@@ -79,23 +104,47 @@ impl Default for ThreadHistoryBuilder {
}
impl ThreadHistoryBuilder {
/// Create an empty reducer with no active turn.
///
/// The builder expects to receive the rollout from the beginning of a transcript, in order.
/// Starting midway through a rollout can still produce `Turn` values, but turn boundaries and
/// synthetic-id detection may no longer describe the actual persisted history.
pub fn new() -> Self {
Self {
turns: Vec::new(),
current_turn: None,
next_item_index: 1,
has_synthetic_turn_ids: false,
}
}
/// Clear all accumulated turns and return the builder to its initial state.
pub fn reset(&mut self) {
*self = Self::new();
}
pub fn finish(mut self) -> Vec<Turn> {
self.finish_current_turn();
self.turns
/// Finalize replay and return only the reconstructed turns.
pub fn finish(self) -> Vec<Turn> {
let mut this = self;
this.finish_current_turn();
this.turns
}
/// Return whether replay had to synthesize any turn ids so far.
///
/// This becomes true when replay encounters legacy history without explicit turn lifecycle
/// events and must mint replacement ids while rebuilding turns. Callers that want to use turn
/// ids as stable anchors should check this before trusting replayed ids.
pub fn has_synthetic_turn_ids(&self) -> bool {
self.has_synthetic_turn_ids
}
/// Return the current turn view without consuming the builder.
///
/// When a turn is actively streaming, this returns that in-progress turn. Otherwise it falls
/// back to the most recently completed turn. This is useful for live UI updates, but a caller
/// that treats the result as committed persisted history can accidentally display incomplete
/// state after a partially applied event stream.
pub fn active_turn_snapshot(&self) -> Option<Turn> {
self.current_turn
.as_ref()
@@ -103,6 +152,7 @@ impl ThreadHistoryBuilder {
.or_else(|| self.turns.last().cloned())
}
/// Return whether replay is currently inside an unfinished turn boundary.
pub fn has_active_turn(&self) -> bool {
self.current_turn.is_some()
}
@@ -169,6 +219,12 @@ impl ThreadHistoryBuilder {
}
}
/// Apply one persisted rollout line to the reducer.
///
/// Unlike [`Self::handle_event`], this accepts the outer `RolloutItem` wrapper so replay can
/// preserve rollback markers and explicit turn boundaries exactly as they were recorded on
/// disk. Feed items in file order; replaying a suffix or shuffling lines can still produce
/// syntactically valid turns while silently attaching items to the wrong exchange.
pub fn handle_rollout_item(&mut self, item: &RolloutItem) {
match item {
RolloutItem::EventMsg(event) => self.handle_event(event),
@@ -861,8 +917,15 @@ impl ThreadHistoryBuilder {
}
fn new_turn(&mut self, id: Option<String>) -> PendingTurn {
let id = match id {
Some(id) => id,
None => {
self.has_synthetic_turn_ids = true;
Uuid::now_v7().to_string()
}
};
PendingTurn {
id: id.unwrap_or_else(|| Uuid::now_v7().to_string()),
id,
items: Vec::new(),
error: None,
status: TurnStatus::Completed,
@@ -950,6 +1013,10 @@ fn render_review_output_text(output: &ReviewOutputEvent) -> String {
}
}
/// Convert core patch-apply change records into the app-server wire format.
///
/// The returned list is sorted by path so repeated replays of the same patch event stay stable for
/// clients and snapshot tests.
pub fn convert_patch_changes(
changes: &HashMap<std::path::PathBuf, codex_protocol::protocol::FileChange>,
) -> Vec<FileUpdateChange> {
@@ -1198,6 +1265,66 @@ mod tests {
);
}
#[test]
fn reports_synthetic_turn_ids_for_legacy_history() {
    // Legacy transcripts carry only message events, with no explicit turn
    // lifecycle markers, so replay has to mint turn ids itself.
    let mut builder = ThreadHistoryBuilder::new();
    builder.handle_rollout_item(&RolloutItem::EventMsg(EventMsg::UserMessage(
        UserMessageEvent {
            message: "legacy".into(),
            images: None,
            text_elements: Vec::new(),
            local_images: Vec::new(),
        },
    )));
    builder.handle_rollout_item(&RolloutItem::EventMsg(EventMsg::AgentMessage(
        AgentMessageEvent {
            message: "reply".into(),
            phase: None,
        },
    )));
    // The exchange still reduces to one turn, but its id must be flagged synthetic.
    assert!(builder.has_synthetic_turn_ids());
    assert_eq!(builder.finish().len(), 1);
}
#[test]
fn reports_no_synthetic_turn_ids_when_turn_boundaries_are_explicit() {
    let turn_id = "turn-explicit";
    // Modern transcripts bracket each exchange with TurnStarted/TurnComplete,
    // letting replay keep the recorded turn id verbatim.
    let started = RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
        turn_id: turn_id.to_string(),
        model_context_window: Some(128_000),
        collaboration_mode_kind: Default::default(),
    }));
    let user = RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
        message: "modern".into(),
        images: None,
        text_elements: Vec::new(),
        local_images: Vec::new(),
    }));
    let agent = RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
        message: "reply".into(),
        phase: None,
    }));
    let completed = RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
        turn_id: turn_id.to_string(),
        last_agent_message: Some("reply".into()),
    }));
    let mut builder = ThreadHistoryBuilder::new();
    for item in [&started, &user, &agent, &completed] {
        builder.handle_rollout_item(item);
    }
    assert!(!builder.has_synthetic_turn_ids());
    let turns = builder.finish();
    assert_eq!(turns.len(), 1);
    // The replayed turn keeps the explicit id rather than a minted one.
    assert_eq!(turns[0].id, turn_id);
}
#[test]
fn ignores_non_plan_item_lifecycle_events() {
let turn_id = "turn-1";

View File

@@ -1869,6 +1869,17 @@ pub struct ThreadForkParams {
#[ts(optional = nullable)]
pub path: Option<PathBuf>,
/// [UNSTABLE] Fork after the specified historical turn (inclusive).
/// When omitted, the full thread history is copied.
///
/// The value should come from `thread/read(includeTurns=true)` for the same source thread.
/// Requests are rejected for legacy histories whose replayed turn ids are not stable, for
/// in-progress turns, and for turns that do not yet contain both a user message and an agent
/// response.
#[experimental("thread/fork.forkAfterTurnId")]
#[ts(optional = nullable)]
pub fork_after_turn_id: Option<String>,
/// Configuration overrides for the forked thread, if any.
#[ts(optional = nullable)]
pub model: Option<String>,

View File

@@ -121,7 +121,7 @@ Example with notification opt-out:
- `thread/start` — create a new thread; emits `thread/started` and auto-subscribes you to turn/item events for that thread.
- `thread/resume` — reopen an existing thread by id so subsequent `turn/start` calls append to it.
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; emits `thread/started` and auto-subscribes you to turn/item events for the new thread.
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history. Experimental `forkAfterTurnId` lets clients fork from a selected historical turn (modern turn-id-stable histories only); emits `thread/started` and auto-subscribes you to turn/item events for the new thread.
- `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, `cwd`, and `searchTerm` filters. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
- `thread/loaded/list` — list the thread ids currently loaded in memory.
- `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`. The returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
@@ -215,7 +215,7 @@ To continue a stored session, call `thread/resume` with the `thread.id` you prev
{ "id": 11, "result": { "thread": { "id": "thr_123", } } }
```
To branch from a stored session, call `thread/fork` with the `thread.id`. This creates a new thread id and emits a `thread/started` notification for it:
To branch from a stored session, call `thread/fork` with the `thread.id`. This creates a new thread id and emits a `thread/started` notification for it. To fork from a selected prior turn instead of the full history, pass experimental `forkAfterTurnId`:
```json
{ "method": "thread/fork", "id": 12, "params": { "threadId": "thr_123" } }
@@ -223,6 +223,19 @@ To branch from a stored session, call `thread/fork` with the `thread.id`. This c
{ "method": "thread/started", "params": { "thread": { } } }
```
```json
{ "method": "thread/fork", "id": 13, "params": {
"threadId": "thr_123",
"forkAfterTurnId": "turn_abc",
"cwd": "/Users/me/project-worktree"
} }
```
The `forkAfterTurnId` value should come from `thread/read(includeTurns=true)` for that source
thread. Anchored forks are rejected for legacy histories that only have replay-synthesized turn
ids, for turns that are still in progress, and for turns that do not yet include both the user's
input and the model's response.
Experimental API: `thread/start`, `thread/resume`, and `thread/fork` accept `persistExtendedHistory: true` to persist a richer subset of ThreadItems for non-lossy history when calling `thread/read`, `thread/resume`, and `thread/fork` later. This does not backfill events that were not persisted previously.
### Example: List threads (with pagination & filters)

View File

@@ -131,6 +131,7 @@ use codex_app_server_protocol::ThreadCompactStartParams;
use codex_app_server_protocol::ThreadCompactStartResponse;
use codex_app_server_protocol::ThreadForkParams;
use codex_app_server_protocol::ThreadForkResponse;
use codex_app_server_protocol::ThreadHistoryBuilder;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
@@ -2237,7 +2238,7 @@ impl CodexMessageProcessor {
return;
};
let (_, thread) = match self.load_thread(&thread_id).await {
let (thread_id, thread) = match self.load_thread(&thread_id).await {
Ok(v) => v,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
@@ -2245,17 +2246,54 @@ impl CodexMessageProcessor {
}
};
let expected_name = name.clone();
if let Err(err) = thread.submit(Op::SetThreadName { name }).await {
self.send_internal_error(request_id, format!("failed to set thread name: {err}"))
.await;
return;
}
if let Err(err) = self
.wait_for_thread_name_persisted(thread_id, &expected_name)
.await
{
self.send_internal_error(
request_id,
format!("failed to observe persisted thread name: {err}"),
)
.await;
return;
}
self.outgoing
.send_response(request_id, ThreadSetNameResponse {})
.await;
}
/// Poll persisted thread metadata until the expected name is observable, or time out.
///
/// `SetThreadName` is submitted asynchronously, so this keeps the RPC response ordered after the
/// rename is actually readable via `find_thread_name_by_id`. Polls every 10ms for up to 2s and
/// maps expiry to a `TimedOut` I/O error naming the value we were waiting for.
async fn wait_for_thread_name_persisted(
    &self,
    thread_id: ThreadId,
    expected_name: &str,
) -> std::io::Result<()> {
    let codex_home = self.config.codex_home.clone();
    let poll = async {
        loop {
            let observed = find_thread_name_by_id(&codex_home, &thread_id).await?;
            if observed.as_deref() == Some(expected_name) {
                return Ok(());
            }
            // Not persisted yet (or an older name is still on disk); retry shortly.
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    };
    match tokio::time::timeout(Duration::from_secs(2), poll).await {
        Ok(result) => result,
        Err(_elapsed) => Err(std::io::Error::new(
            std::io::ErrorKind::TimedOut,
            format!("timed out waiting for thread name {expected_name:?}"),
        )),
    }
}
async fn thread_unarchive(
&mut self,
request_id: ConnectionRequestId,
@@ -3324,6 +3362,7 @@ impl CodexMessageProcessor {
let ThreadForkParams {
thread_id,
path,
fork_after_turn_id,
model,
model_provider,
cwd,
@@ -3437,21 +3476,71 @@ impl CodexMessageProcessor {
};
let fallback_model_provider = config.model_provider_id.clone();
let anchored_fork_history = if let Some(fork_after_turn_id) = fork_after_turn_id.as_deref()
{
let source_items = match read_rollout_items_from_rollout(rollout_path.as_path()).await {
Ok(items) => items,
Err(err) => {
let (code, message) = match err.kind() {
std::io::ErrorKind::NotFound => (
INVALID_REQUEST_ERROR_CODE,
format!("failed to load rollout `{}`: {err}", rollout_path.display()),
),
_ => (
INTERNAL_ERROR_CODE,
format!(
"failed to read rollout `{}` for thread fork: {err}",
rollout_path.display()
),
),
};
let error = JSONRPCErrorError {
code,
message,
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
match resolve_thread_fork_history_from_anchor(
source_items.as_slice(),
fork_after_turn_id,
) {
Ok(history_items) => Some(InitialHistory::Forked(history_items)),
Err(message) => {
self.send_invalid_request_error(request_id, message).await;
return;
}
}
} else {
None
};
let NewThread {
thread_id,
session_configured,
..
} = match self
.thread_manager
.fork_thread(
usize::MAX,
config,
rollout_path.clone(),
persist_extended_history,
)
.await
{
} = match if let Some(initial_history) = anchored_fork_history {
self.thread_manager
.resume_thread_with_history(
config,
initial_history,
self.auth_manager.clone(),
persist_extended_history,
)
.await
} else {
self.thread_manager
.fork_thread(
usize::MAX,
config,
rollout_path.clone(),
persist_extended_history,
)
.await
} {
Ok(thread) => thread,
Err(err) => {
let (code, message) = match err {
@@ -6869,6 +6958,112 @@ async fn sync_default_client_residency_requirement(
}
}
/// Return the exact rollout prefix that should seed a turn-anchored `thread/fork`.
///
/// The selected turn stays in the returned history, while every later turn is removed. The helper
/// validates that the anchor is a stable, already-materialized exchange: legacy replay-synthesized
/// ids are rejected, the turn must not still be in progress, and the turn must include both a
/// user message and an agent response. The cutoff uses both the next explicit `TurnStarted` event
/// and the next user-message boundary so the fork cannot accidentally retain trailing non-user
/// events that conceptually belong to a later turn.
fn resolve_thread_fork_history_from_anchor(
    rollout_items: &[RolloutItem],
    fork_after_turn_id: &str,
) -> Result<Vec<RolloutItem>, String> {
    // Replay the whole rollout so we see turn boundaries exactly as thread/read would.
    let mut history_builder = ThreadHistoryBuilder::new();
    for item in rollout_items {
        history_builder.handle_rollout_item(item);
    }
    // Synthesized ids are not stable across replays, so an anchor against them is meaningless.
    if history_builder.has_synthetic_turn_ids() {
        return Err(
            "turn-based forking is not supported for legacy thread history; use full thread/fork"
                .to_string(),
        );
    }
    let turns = history_builder.finish();
    let Some(target_turn_idx) = turns.iter().position(|turn| turn.id == fork_after_turn_id) else {
        return Err(format!(
            "fork turn not found in source thread history: {fork_after_turn_id}"
        ));
    };
    let target_turn = &turns[target_turn_idx];
    let has_user_message = target_turn
        .items
        .iter()
        .any(|item| matches!(item, ThreadItem::UserMessage { .. }));
    let has_agent_message = target_turn
        .items
        .iter()
        .any(|item| matches!(item, ThreadItem::AgentMessage { .. }));
    // Reject anchors that do not yet describe a complete user/agent exchange.
    if matches!(target_turn.status, TurnStatus::InProgress) {
        return Err("fork turn must be completed/interrupted/failed, not in progress".to_string());
    }
    if !has_user_message {
        return Err("fork turn must contain a user message".to_string());
    }
    if !has_agent_message {
        return Err("fork turn must contain an agent message".to_string());
    }
    // Anchoring on the last turn keeps the entire rollout; nothing to truncate.
    let Some(next_turn) = turns.get(target_turn_idx.saturating_add(1)) else {
        return Ok(rollout_items.to_vec());
    };
    let next_turn_id = next_turn.id.as_str();
    // Primary cutoff: the explicit TurnStarted event of the following turn. Since synthetic ids
    // were rejected above, every replayed turn should have one; missing it is a hard error.
    let next_turn_started_idx = rollout_items
        .iter()
        .position(|item| {
            matches!(
                item,
                RolloutItem::EventMsg(EventMsg::TurnStarted(payload))
                    if payload.turn_id == next_turn_id
            )
        })
        .ok_or_else(|| {
            format!("failed to locate boundary after fork turn: missing next turn `{next_turn_id}`")
        })?;
    // Locate the anchor turn's terminal event (completion or abort) so we can look for stray
    // user-message lines recorded after it but before the next TurnStarted.
    let target_terminal_idx = rollout_items.iter().rposition(|item| {
        matches!(
            item,
            RolloutItem::EventMsg(EventMsg::TurnComplete(payload))
                if payload.turn_id == fork_after_turn_id
        ) || matches!(
            item,
            RolloutItem::EventMsg(EventMsg::TurnAborted(payload))
                if payload.turn_id.as_deref() == Some(fork_after_turn_id)
        )
    });
    // Secondary cutoff: the first user-turn boundary after the terminal event. usize::MAX means
    // "no such boundary", so min() below falls back to the TurnStarted cutoff.
    let next_user_boundary_idx = target_terminal_idx
        .and_then(|terminal_idx| {
            rollout_items
                .iter()
                .enumerate()
                .skip(terminal_idx.saturating_add(1))
                .find_map(|(idx, item)| is_user_turn_rollout_boundary(item).then_some(idx))
        })
        .unwrap_or(usize::MAX);
    // Cut at whichever boundary comes first; the item at cut_idx is excluded from the fork.
    let cut_idx = next_turn_started_idx.min(next_user_boundary_idx);
    Ok(rollout_items[..cut_idx].to_vec())
}
/// Return whether this rollout item starts a new user turn for truncation purposes.
///
/// Anchored forks fall back to this boundary when explicit turn lifecycle events are present but
/// a later turn also emitted standalone user-message markers that should not leak into the
/// forked prefix. Both the persisted user-role `ResponseItem` and the `UserMessage` event count.
fn is_user_turn_rollout_boundary(item: &RolloutItem) -> bool {
    if let RolloutItem::ResponseItem(ResponseItem::Message { role, .. }) = item {
        return role == "user";
    }
    matches!(item, RolloutItem::EventMsg(EventMsg::UserMessage(_)))
}
/// Derive the effective [`Config`] by layering three override sources.
///
/// Precedence (lowest to highest):

View File

@@ -372,8 +372,36 @@ async fn list_apps_emits_updates_and_returns_after_both_lists_load() -> Result<(
is_enabled: true,
}];
let first_update = read_app_list_updated_notification(&mut mcp).await?;
assert_eq!(first_update.data, expected_accessible);
let expected_directory = vec![
AppInfo {
id: "alpha".to_string(),
name: "Alpha".to_string(),
description: Some("Alpha connector".to_string()),
logo_url: Some("https://example.com/alpha.png".to_string()),
logo_url_dark: None,
distribution_channel: None,
branding: alpha_branding.clone(),
app_metadata: alpha_app_metadata.clone(),
labels: alpha_labels.clone(),
install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()),
is_accessible: false,
is_enabled: true,
},
AppInfo {
id: "beta".to_string(),
name: "beta".to_string(),
description: None,
logo_url: None,
logo_url_dark: None,
distribution_channel: None,
branding: None,
app_metadata: None,
labels: None,
install_url: Some("https://chatgpt.com/apps/beta/beta".to_string()),
is_accessible: false,
is_enabled: true,
},
];
let expected_merged = vec![
AppInfo {
@@ -406,8 +434,26 @@ async fn list_apps_emits_updates_and_returns_after_both_lists_load() -> Result<(
},
];
let second_update = read_app_list_updated_notification(&mut mcp).await?;
assert_eq!(second_update.data, expected_merged);
let mut saw_merged_update = false;
for update_idx in 0..2 {
let update = read_app_list_updated_notification(&mut mcp).await?;
if update.data == expected_accessible || update.data == expected_directory {
continue;
}
if update.data == expected_merged {
saw_merged_update = true;
continue;
}
panic!(
"unexpected app/list/updated payload at update {update_idx}: {:?}",
update.data
);
}
assert!(
saw_merged_update,
"expected a merged app/list/updated payload before the response"
);
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,

View File

@@ -2,6 +2,7 @@ use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_fake_rollout;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::rollout_path;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCNotification;
@@ -17,11 +18,26 @@ use codex_app_server_protocol::ThreadStartedNotification;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput;
use codex_protocol::ThreadId;
use codex_protocol::models::MessagePhase;
use codex_protocol::protocol::AgentMessageEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::TurnCompleteEvent;
use codex_protocol::protocol::TurnStartedEvent;
use codex_protocol::protocol::UserMessageEvent;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use std::fs;
use std::fs::FileTimes;
use std::path::Path;
use std::path::PathBuf;
use tempfile::TempDir;
use tokio::time::timeout;
use uuid::Uuid;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
@@ -191,6 +207,235 @@ async fn thread_fork_rejects_unmaterialized_thread() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn thread_fork_can_fork_after_selected_turn() -> Result<()> {
    // End-to-end happy path for the experimental `forkAfterTurnId` anchor: fork at turn-2 of a
    // three-turn rollout and verify the third turn is truncated from the forked thread.
    let server = create_mock_responses_server_repeating_assistant("Done").await;
    let codex_home = TempDir::new()?;
    create_config_toml(codex_home.path(), &server.uri())?;
    let conversation_id = create_fake_rollout_with_explicit_turns(
        codex_home.path(),
        "2025-01-05T12-00-01",
        "2025-01-05T12:00:01Z",
        &[
            ExplicitTurnFixture {
                turn_id: "turn-1",
                user_text: Some("u1"),
                agent_text: Some("a1"),
                state: FixtureTurnState::Completed,
            },
            ExplicitTurnFixture {
                turn_id: "turn-2",
                user_text: Some("u2"),
                agent_text: Some("a2"),
                state: FixtureTurnState::Completed,
            },
            ExplicitTurnFixture {
                // Explicit turn with no user message exercises the exact cut-after-turn logic.
                turn_id: "turn-3-empty",
                user_text: None,
                agent_text: None,
                state: FixtureTurnState::Completed,
            },
        ],
    )?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    // Pass a cwd override alongside the anchor to check both are honored together.
    let fork_cwd = codex_home.path().join("fork-worktree");
    fs::create_dir_all(&fork_cwd)?;
    let fork_id = mcp
        .send_thread_fork_request(ThreadForkParams {
            thread_id: conversation_id,
            fork_after_turn_id: Some("turn-2".to_string()),
            cwd: Some(fork_cwd.display().to_string()),
            ..Default::default()
        })
        .await?;
    let fork_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(fork_id)),
    )
    .await??;
    let ThreadForkResponse { thread, cwd, .. } = to_response::<ThreadForkResponse>(fork_resp)?;
    assert_eq!(cwd, fork_cwd);
    assert_eq!(thread.cwd, fork_cwd);
    // The anchor turn is kept (inclusive); only turn-3-empty should be gone.
    assert_eq!(thread.turns.len(), 2, "later turns should be truncated");
    assert_eq!(thread.turns[0].id, "turn-1");
    assert_eq!(thread.turns[1].id, "turn-2");
    assert_turn_user_text(&thread.turns[0].items, "u1");
    assert_turn_user_text(&thread.turns[1].items, "u2");
    Ok(())
}
#[tokio::test]
async fn thread_fork_rejects_unknown_turn_anchor() -> Result<()> {
    // Forking after a turn id that never existed in the source thread must fail cleanly.
    let server = create_mock_responses_server_repeating_assistant("Done").await;
    let codex_home = TempDir::new()?;
    create_config_toml(codex_home.path(), &server.uri())?;
    let only_turn = ExplicitTurnFixture {
        turn_id: "turn-1",
        user_text: Some("u1"),
        agent_text: Some("a1"),
        state: FixtureTurnState::Completed,
    };
    let conversation_id = create_fake_rollout_with_explicit_turns(
        codex_home.path(),
        "2025-01-05T12-00-02",
        "2025-01-05T12:00:02Z",
        &[only_turn],
    )?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    let params = ThreadForkParams {
        thread_id: conversation_id,
        fork_after_turn_id: Some("missing-turn".to_string()),
        ..Default::default()
    };
    let fork_id = mcp.send_thread_fork_request(params).await?;
    let fork_err: JSONRPCError = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_error_message(RequestId::Integer(fork_id)),
    )
    .await??;
    assert!(
        fork_err.error.message.contains("fork turn not found"),
        "unexpected fork error: {}",
        fork_err.error.message
    );
    Ok(())
}
#[tokio::test]
async fn thread_fork_rejects_legacy_turn_anchor() -> Result<()> {
    // Anchored forks require stable, recorded turn ids. This fixture uses the shared
    // `create_fake_rollout` helper, which (presumably — see the "legacy thread history" error
    // asserted below) writes history without explicit turn lifecycle events, so replay must
    // synthesize turn ids and the server should refuse the anchor.
    let server = create_mock_responses_server_repeating_assistant("Done").await;
    let codex_home = TempDir::new()?;
    create_config_toml(codex_home.path(), &server.uri())?;
    let conversation_id = create_fake_rollout(
        codex_home.path(),
        "2025-01-05T12-00-03",
        "2025-01-05T12:00:03Z",
        "legacy preview",
        Some("mock_provider"),
        None,
    )?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    let fork_id = mcp
        .send_thread_fork_request(ThreadForkParams {
            thread_id: conversation_id,
            // Any anchor id is rejected before lookup because the whole history is legacy.
            fork_after_turn_id: Some("legacy-turn".to_string()),
            ..Default::default()
        })
        .await?;
    let fork_err: JSONRPCError = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_error_message(RequestId::Integer(fork_id)),
    )
    .await??;
    assert!(
        fork_err.error.message.contains("legacy thread history"),
        "unexpected fork error: {}",
        fork_err.error.message
    );
    Ok(())
}
#[tokio::test]
async fn thread_fork_rejects_in_progress_turn_anchor() -> Result<()> {
    // A turn whose rollout has a TurnStarted but no terminal event replays as in-progress;
    // anchoring a fork on it must be rejected.
    let server = create_mock_responses_server_repeating_assistant("Done").await;
    let codex_home = TempDir::new()?;
    create_config_toml(codex_home.path(), &server.uri())?;
    let conversation_id = create_fake_rollout_with_explicit_turns(
        codex_home.path(),
        "2025-01-05T12-00-04",
        "2025-01-05T12:00:04Z",
        &[ExplicitTurnFixture {
            turn_id: "turn-in-progress",
            user_text: Some("u1"),
            agent_text: Some("a1"),
            // InProgress suppresses the TurnComplete line in the fixture rollout.
            state: FixtureTurnState::InProgress,
        }],
    )?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    let fork_id = mcp
        .send_thread_fork_request(ThreadForkParams {
            thread_id: conversation_id,
            fork_after_turn_id: Some("turn-in-progress".to_string()),
            ..Default::default()
        })
        .await?;
    let fork_err: JSONRPCError = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_error_message(RequestId::Integer(fork_id)),
    )
    .await??;
    assert!(
        fork_err.error.message.contains("not in progress"),
        "unexpected fork error: {}",
        fork_err.error.message
    );
    Ok(())
}
#[tokio::test]
async fn thread_fork_rejects_turn_anchor_without_agent_message() -> Result<()> {
    // An anchor turn must contain both sides of the exchange; a completed turn with only a
    // user message (no agent reply) is rejected.
    let server = create_mock_responses_server_repeating_assistant("Done").await;
    let codex_home = TempDir::new()?;
    create_config_toml(codex_home.path(), &server.uri())?;
    let conversation_id = create_fake_rollout_with_explicit_turns(
        codex_home.path(),
        "2025-01-05T12-00-05",
        "2025-01-05T12:00:05Z",
        &[ExplicitTurnFixture {
            turn_id: "turn-no-agent",
            user_text: Some("u1"),
            // No agent reply recorded for this turn — the condition under test.
            agent_text: None,
            state: FixtureTurnState::Completed,
        }],
    )?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    let fork_id = mcp
        .send_thread_fork_request(ThreadForkParams {
            thread_id: conversation_id,
            fork_after_turn_id: Some("turn-no-agent".to_string()),
            ..Default::default()
        })
        .await?;
    let fork_err: JSONRPCError = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_error_message(RequestId::Integer(fork_id)),
    )
    .await??;
    assert!(
        fork_err.error.message.contains("agent message"),
        "unexpected fork error: {}",
        fork_err.error.message
    );
    Ok(())
}
// Helper to create a config.toml pointing at the mock model server.
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
@@ -214,3 +459,161 @@ stream_max_retries = 0
),
)
}
/// Whether a fixture turn is closed on disk with a terminal lifecycle event.
#[derive(Clone, Copy)]
enum FixtureTurnState {
    // A `TurnComplete` rollout line is written for the turn.
    Completed,
    // No terminal event is written, so replay sees the turn as still in progress.
    InProgress,
}
/// Declarative description of one turn consumed by `create_fake_rollout_with_explicit_turns`.
#[derive(Clone, Copy)]
struct ExplicitTurnFixture<'a> {
    // Explicit turn id recorded in the `TurnStarted`/`TurnComplete` lifecycle events.
    turn_id: &'a str,
    // Optional user message text; `None` yields a turn without user input.
    user_text: Option<&'a str>,
    // Optional agent reply text; `None` yields a turn without an agent message.
    agent_text: Option<&'a str>,
    // Controls whether a terminal `TurnComplete` line is appended for this turn.
    state: FixtureTurnState,
}
/// Assert that a turn's first item is a user message carrying exactly `expected` as plain text.
fn assert_turn_user_text(items: &[ThreadItem], expected: &str) {
    let other = items.first();
    let Some(ThreadItem::UserMessage { content, .. }) = other else {
        panic!("expected first turn item to be a user message, got {other:?}");
    };
    let expected_content = vec![UserInput::Text {
        text: expected.to_string(),
        text_elements: Vec::new(),
    }];
    assert_eq!(content, &expected_content);
}
/// Write a fake rollout file containing explicit turn lifecycle events for each fixture turn.
///
/// Returns the new thread's uuid as a string. `filename_ts` feeds the rollout file name while
/// `meta_rfc3339` is stamped on every line and also applied as the file's mtime so listing order
/// is deterministic. Each fixture turn is serialized as: optional user `ResponseItem`,
/// `TurnStarted`, optional `UserMessage`/`AgentMessage` events, and (for completed turns) a
/// `TurnComplete` line; the first turn additionally gets an assistant `ResponseItem` echoing its
/// agent text — presumably mirroring how the live recorder persists the model reply (TODO
/// confirm against the recorder).
fn create_fake_rollout_with_explicit_turns(
    codex_home: &Path,
    filename_ts: &str,
    meta_rfc3339: &str,
    turns: &[ExplicitTurnFixture<'_>],
) -> Result<String> {
    let uuid = Uuid::new_v4();
    let uuid_str = uuid.to_string();
    let conversation_id = ThreadId::from_string(&uuid_str)?;
    let file_path = rollout_path(codex_home, filename_ts, &uuid_str);
    let dir = file_path
        .parent()
        .ok_or_else(|| anyhow::anyhow!("missing rollout parent directory"))?;
    fs::create_dir_all(dir)?;
    // Minimal session metadata; only provider/timestamp matter to the tests using this fixture.
    let meta = SessionMeta {
        id: conversation_id,
        forked_from_id: None,
        timestamp: meta_rfc3339.to_string(),
        cwd: PathBuf::from("/"),
        originator: "codex".to_string(),
        cli_version: "0.0.0".to_string(),
        source: codex_protocol::protocol::SessionSource::Cli,
        agent_nickname: None,
        agent_role: None,
        model_provider: Some("mock_provider".to_string()),
        base_instructions: None,
        dynamic_tools: None,
    };
    let mut lines = vec![rollout_line(
        meta_rfc3339,
        RolloutItem::SessionMeta(SessionMetaLine { meta, git: None }),
    )?];
    for (idx, turn) in turns.iter().enumerate() {
        // Raw user response_item precedes TurnStarted, matching a user-initiated boundary.
        if let Some(user_text) = turn.user_text {
            lines.push(
                json!({
                    "timestamp": meta_rfc3339,
                    "type":"response_item",
                    "payload": {
                        "type":"message",
                        "role":"user",
                        "content":[{"type":"input_text","text": user_text}]
                    }
                })
                .to_string(),
            );
        }
        // Explicit lifecycle start gives replay a stable turn id.
        lines.push(rollout_line(
            meta_rfc3339,
            RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
                turn_id: turn.turn_id.to_string(),
                model_context_window: Some(128_000),
                collaboration_mode_kind: Default::default(),
            })),
        )?);
        if let Some(user_text) = turn.user_text {
            lines.push(rollout_line(
                meta_rfc3339,
                RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
                    message: user_text.to_string(),
                    images: None,
                    local_images: Vec::new(),
                    text_elements: Vec::new(),
                })),
            )?);
        }
        if let Some(agent_text) = turn.agent_text {
            lines.push(rollout_line(
                meta_rfc3339,
                RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
                    message: agent_text.to_string(),
                    phase: Some(MessagePhase::FinalAnswer),
                })),
            )?);
        }
        // InProgress turns deliberately omit the terminal event.
        if matches!(turn.state, FixtureTurnState::Completed) {
            let last_agent_message = turn.agent_text.map(str::to_string);
            lines.push(rollout_line(
                meta_rfc3339,
                RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
                    turn_id: turn.turn_id.to_string(),
                    last_agent_message,
                })),
            )?);
        }
        // Only the first turn persists an assistant response_item; `if let` replaces the
        // original `is_some()` + `unwrap_or("")` pair.
        if idx == 0 {
            if let Some(agent_text) = turn.agent_text {
                lines.push(
                    json!({
                        "timestamp": meta_rfc3339,
                        "type":"response_item",
                        "payload": {
                            "type":"message",
                            "role":"assistant",
                            "content":[{"type":"output_text","text": agent_text}]
                        }
                    })
                    .to_string(),
                );
            }
        }
    }
    fs::write(&file_path, lines.join("\n") + "\n")?;
    // Back-date mtime so thread listing/sorting by recency is deterministic in tests.
    let parsed = chrono::DateTime::parse_from_rfc3339(meta_rfc3339)?.with_timezone(&chrono::Utc);
    let times = FileTimes::new().set_modified(parsed.into());
    fs::OpenOptions::new()
        .append(true)
        .open(&file_path)?
        .set_times(times)?;
    Ok(uuid_str)
}
fn rollout_line(timestamp: &str, item: RolloutItem) -> Result<String> {
let mut line = serde_json::Map::new();
line.insert(
"timestamp".to_string(),
Value::String(timestamp.to_string()),
);
let item_value = serde_json::to_value(item)?;
let Value::Object(item_map) = item_value else {
anyhow::bail!("rollout item did not serialize as an object");
};
line.extend(item_map);
Ok(Value::Object(line).to_string())
}

View File

@@ -348,7 +348,7 @@ impl RolloutRecorder {
default_provider,
)
.await?;
if let Some(path) = select_resume_path(&page, filter_cwd) {
if let Some(path) = select_resume_path(&page, filter_cwd).await {
return Ok(Some(path));
}
cursor = page.next_cursor;
@@ -961,23 +961,36 @@ impl From<codex_state::ThreadsPage> for ThreadsPage {
}
}
fn select_resume_path(page: &ThreadsPage, filter_cwd: Option<&Path>) -> Option<PathBuf> {
async fn select_resume_path(page: &ThreadsPage, filter_cwd: Option<&Path>) -> Option<PathBuf> {
match filter_cwd {
Some(cwd) => page.items.iter().find_map(|item| {
if item
.cwd
.as_ref()
.is_some_and(|session_cwd| cwd_matches(session_cwd, cwd))
{
Some(item.path.clone())
} else {
None
Some(cwd) => {
for item in &page.items {
let effective_cwd = latest_turn_context_cwd(item.path.as_path())
.await
.or_else(|| item.cwd.clone());
if effective_cwd
.as_ref()
.is_some_and(|session_cwd| cwd_matches(session_cwd, cwd))
{
return Some(item.path.clone());
}
}
}),
None
}
None => page.items.first().map(|item| item.path.clone()),
}
}
/// Returns the cwd recorded by the most recent `TurnContext` item in the rollout
/// file, or `None` when the file cannot be loaded or holds no turn context.
async fn latest_turn_context_cwd(rollout_path: &Path) -> Option<PathBuf> {
    let loaded = RolloutRecorder::load_rollout_items(rollout_path).await.ok()?;
    let (items, _thread_id, _parse_errors) = loaded;
    // Scan from the tail so the newest turn context wins.
    for item in items.iter().rev() {
        if let RolloutItem::TurnContext(turn_context) = item {
            return Some(turn_context.cwd.clone());
        }
    }
    None
}
fn select_resume_path_from_db_page(
page: &codex_state::ThreadsPage,
filter_cwd: Option<&Path>,
@@ -1010,8 +1023,13 @@ mod tests {
use crate::config::ConfigBuilder;
use crate::features::Feature;
use chrono::TimeZone;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::protocol::AgentMessageEvent;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::TurnContextItem;
use codex_protocol::protocol::UserMessageEvent;
use pretty_assertions::assert_eq;
use std::fs::File;
@@ -1054,6 +1072,35 @@ mod tests {
Ok(path)
}
fn append_turn_context(path: &Path, ts: &str, cwd: &Path) -> std::io::Result<()> {
let mut file = fs::OpenOptions::new().append(true).open(path)?;
let line = RolloutLine {
timestamp: ts.to_string(),
item: RolloutItem::TurnContext(TurnContextItem {
turn_id: None,
cwd: cwd.to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
network: None,
model: "test-model".to_string(),
personality: None,
collaboration_mode: None,
effort: None,
summary: ReasoningSummary::Auto,
user_instructions: None,
developer_instructions: None,
final_output_json_schema: None,
truncation_policy: None,
}),
};
writeln!(
file,
"{}",
serde_json::to_string(&line).map_err(std::io::Error::other)?
)?;
Ok(())
}
#[tokio::test]
async fn recorder_materializes_only_after_explicit_persist() -> std::io::Result<()> {
let home = TempDir::new().expect("temp dir");
@@ -1320,4 +1367,35 @@ mod tests {
assert_eq!(repaired_path, Some(real_path));
Ok(())
}
/// With sqlite disabled, `find_latest_thread_path` must match the cwd filter against
/// the cwd recorded by the latest `TurnContext` line appended to the session file.
#[tokio::test]
async fn find_latest_thread_path_uses_latest_turn_context_cwd_without_sqlite()
-> std::io::Result<()> {
    let home = TempDir::new().expect("temp dir");
    let latest_cwd = home.path().join("latest-cwd");

    // Disable sqlite so the filesystem scan path is exercised.
    let mut config = ConfigBuilder::default()
        .codex_home(home.path().to_path_buf())
        .build()
        .await?;
    config.features.disable(Feature::Sqlite);

    // Write a session file, then append a newer TurnContext pointing at latest_cwd.
    let rollout_path =
        write_session_file(home.path(), "2025-01-03T12-00-00", Uuid::from_u128(9012))?;
    append_turn_context(&rollout_path, "2025-01-03T12-00-01", latest_cwd.as_path())?;

    let found = RolloutRecorder::find_latest_thread_path(
        &config,
        1,
        None,
        ThreadSortKey::UpdatedAt,
        &[],
        None,
        config.model_provider_id.as_str(),
        Some(latest_cwd.as_path()),
    )
    .await?;
    assert_eq!(found, Some(rollout_path));
    Ok(())
}
}

View File

@@ -483,3 +483,28 @@ macro_rules! skip_if_windows {
}
}};
}
/// Returns true when running on a GitHub Actions Linux aarch64 runner, where the
/// default Linux sandbox is unavailable.
///
/// Detection combines the compile-time target (linux + aarch64) with the presence
/// of the `GITHUB_ACTIONS` environment variable; its value is not inspected.
pub fn is_github_actions_linux_aarch64() -> bool {
    let target_matches = cfg!(all(target_os = "linux", target_arch = "aarch64"));
    let on_github_actions = ::std::env::var("GITHUB_ACTIONS").is_ok();
    target_matches && on_github_actions
}
/// Skips the current test when running on a GitHub Actions Linux aarch64 runner,
/// where the default Linux sandbox is unavailable (see
/// `is_github_actions_linux_aarch64`).
///
/// The zero-argument arm is for tests returning `()`; the one-argument arm takes
/// the value to return from tests with a non-unit return type (e.g. `Ok(())`).
#[macro_export]
macro_rules! skip_if_github_actions_linux_aarch64 {
    () => {{
        if $crate::is_github_actions_linux_aarch64() {
            println!(
                "Skipping test because the default Linux sandbox is unavailable on GitHub Actions linux-aarch64."
            );
            return;
        }
    }};
    // Variant for tests that must return a value; accepts a trailing comma.
    ($return_value:expr $(,)?) => {{
        if $crate::is_github_actions_linux_aarch64() {
            println!(
                "Skipping test because the default Linux sandbox is unavailable on GitHub Actions linux-aarch64."
            );
            return $return_value;
        }
    }};
}

View File

@@ -12,6 +12,7 @@ use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_github_actions_linux_aarch64;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use regex_lite::Regex;
@@ -70,6 +71,8 @@ async fn interrupt_long_running_tool_emits_turn_aborted() {
/// responses server, and ensures the model receives the synthesized abort.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn interrupt_tool_records_history_entries() {
skip_if_github_actions_linux_aarch64!();
let command = "sleep 60";
let call_id = "call-history";
@@ -143,7 +146,7 @@ async fn interrupt_tool_records_history_entries() {
let output = response_mock
.function_call_output_text(call_id)
.expect("missing function_call_output text");
let re = Regex::new(r"^Wall time: ([0-9]+(?:\.[0-9])?) seconds\naborted by user$")
let re = Regex::new(r"Wall time: ([0-9]+(?:\.[0-9]+)?) seconds\r?\naborted by user")
.expect("compile regex");
let captures = re.captures(&output);
assert_matches!(

View File

@@ -31,6 +31,7 @@ use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_github_actions_linux_aarch64;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
@@ -1573,6 +1574,7 @@ fn scenarios() -> Vec<ScenarioSpec> {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn approval_matrix_covers_all_modes() -> Result<()> {
skip_if_github_actions_linux_aarch64!(Ok(()));
skip_if_no_network!(Ok(()));
for scenario in scenarios() {

View File

@@ -20,6 +20,7 @@ use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_github_actions_linux_aarch64;
use core_test_support::skip_if_no_network;
use core_test_support::streaming_sse::StreamingSseChunk;
use core_test_support::streaming_sse::start_streaming_sse_server;
@@ -141,6 +142,7 @@ async fn read_file_tools_run_in_parallel() -> anyhow::Result<()> {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn shell_tools_run_in_parallel() -> anyhow::Result<()> {
skip_if_github_actions_linux_aarch64!(Ok(()));
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;

View File

@@ -21,6 +21,7 @@ use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_github_actions_linux_aarch64;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::test_codex;
use regex_lite::Regex;
@@ -190,6 +191,7 @@ async fn shell_escalated_permissions_rejected_then_ok() -> Result<()> {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn sandbox_denied_shell_returns_original_output() -> Result<()> {
skip_if_github_actions_linux_aarch64!(Ok(()));
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;

View File

@@ -24,6 +24,7 @@ use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_github_actions_linux_aarch64;
use core_test_support::skip_if_no_network;
use core_test_support::skip_if_sandbox;
use core_test_support::skip_if_windows;
@@ -2493,6 +2494,7 @@ PY
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_runs_under_sandbox() -> Result<()> {
skip_if_github_actions_linux_aarch64!(Ok(()));
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));

View File

@@ -122,6 +122,31 @@ fn is_bwrap_unavailable_output(output: &codex_core::exec::ExecToolCallOutput) ->
|| output.stderr.text.contains("Invalid argument")))
}
/// True when stderr from the sandboxed command contains a Landlock restriction error.
fn is_landlock_restrict_output(output: &codex_core::exec::ExecToolCallOutput) -> bool {
    let stderr_text = &output.stderr.text;
    stderr_text.contains("Sandbox(LandlockRestrict)")
}
/// Probes the workspace-write sandbox with a trivial command and reports whether
/// the suite should be skipped because Landlock restrictions are unavailable.
async fn should_skip_workspace_write_tests() -> bool {
    let probe = run_cmd_result_with_writable_roots(
        &["bash", "-lc", "true"],
        &[],
        LONG_TIMEOUT_MS,
        false,
        false,
    )
    .await;
    match probe {
        // The sandbox works; run the suite normally.
        Ok(_) => false,
        // Skip only when the denial is specifically a Landlock restriction error.
        Err(CodexErr::Sandbox(SandboxErr::Denied { output, .. })) => {
            is_landlock_restrict_output(&output)
        }
        // A hung probe does not give us actionable signal; skip rather than
        // fail a suite that is already running in a degraded sandbox.
        Err(CodexErr::Sandbox(SandboxErr::Timeout { .. })) => true,
        Err(err) => panic!("workspace-write sandbox probe failed unexpectedly: {err:?}"),
    }
}
async fn should_skip_bwrap_tests() -> bool {
match run_cmd_result_with_writable_roots(
&["bash", "-lc", "true"],
@@ -159,6 +184,10 @@ fn expect_denied(
/// Smoke test: listing /bin must succeed under the workspace-write sandbox.
#[tokio::test]
async fn test_root_read() {
    let skip = should_skip_workspace_write_tests().await;
    if skip {
        eprintln!("skipping workspace-write test: landlock restrictions are unavailable");
        return;
    }
    run_cmd(&["ls", "-l", "/bin"], &[], SHORT_TIMEOUT_MS).await;
}
@@ -265,6 +294,11 @@ async fn bwrap_preserves_writable_dev_shm_bind_mount() {
#[tokio::test]
async fn test_writable_root() {
if should_skip_workspace_write_tests().await {
eprintln!("skipping workspace-write test: landlock restrictions are unavailable");
return;
}
let tmpdir = tempfile::tempdir().unwrap();
let file_path = tmpdir.path().join("test");
run_cmd(
@@ -283,6 +317,11 @@ async fn test_writable_root() {
#[tokio::test]
async fn test_no_new_privs_is_enabled() {
if should_skip_workspace_write_tests().await {
eprintln!("skipping workspace-write test: landlock restrictions are unavailable");
return;
}
let output = run_cmd_output(
&["bash", "-lc", "grep '^NoNewPrivs:' /proc/self/status"],
&[],
@@ -301,9 +340,17 @@ async fn test_no_new_privs_is_enabled() {
}
#[tokio::test]
#[should_panic(expected = "Sandbox(Timeout")]
async fn test_timeout() {
run_cmd(&["sleep", "2"], &[], 50).await;
if should_skip_workspace_write_tests().await {
eprintln!("skipping workspace-write test: landlock restrictions are unavailable");
return;
}
let result = run_cmd_result_with_writable_roots(&["sleep", "2"], &[], 50, false, false).await;
assert!(
matches!(result, Err(CodexErr::Sandbox(SandboxErr::Timeout { .. }))),
"expected sandbox timeout, got {result:?}"
);
}
/// Helper that runs `cmd` under the Linux sandbox and asserts that the command