Compare commits

...

8 Commits

Author SHA1 Message Date
Ahmed Ibrahim
61dbfabc13 Bound Windows PowerShell parser hangs
Fail closed when the PowerShell AST helper stalls on startup, preserve a 5s parser cap, add a stuck-child regression test, and close stdin so safe wrapper invocations do not time out waiting on inherited input.

Co-authored-by: Codex <noreply@openai.com>
2026-03-17 17:09:07 +00:00
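The bounded-wait behavior this commit describes (poll the child against a deadline, then fail closed) can be sketched with a std-only helper; the helper name and poll interval below are illustrative, not the repository's actual API.

```rust
use std::time::{Duration, Instant};

/// Poll `is_done` until it reports completion or `timeout` elapses.
/// Returns `true` if the work finished in time, `false` if we gave up
/// (the real code then kills and reaps the stuck parser child).
fn wait_with_deadline(mut is_done: impl FnMut() -> bool, timeout: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if is_done() {
            return true;
        }
        if Instant::now() >= deadline {
            // Fail closed: a stuck child is treated as a parse failure.
            return false;
        }
        std::thread::sleep(Duration::from_millis(10));
    }
}

fn main() {
    // A child that "finishes" after three polls completes within the cap.
    let mut polls = 0;
    assert!(wait_with_deadline(
        || {
            polls += 1;
            polls >= 3
        },
        Duration::from_secs(5),
    ));
    // A child that never finishes trips the deadline instead of hanging.
    assert!(!wait_with_deadline(|| false, Duration::from_millis(50)));
    println!("ok");
}
```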
jif-oai
e8add54e5d feat: show effective model in spawn agent event (#14944)
Show the effective model for the sub-agent after full config layering
2026-03-17 16:58:58 +00:00
daveaitel-openai
ef36d39199 Fix agent jobs finalization race and reduce status polling churn (#14843)
## Summary
- make `report_agent_job_result` atomically transition an item from
running to completed while storing `result_json`
- remove brittle finalization grace-sleep logic and make finished-item
cleanup idempotent
- replace blind fixed-interval waiting with status-subscription-based
waiting for active worker threads
- add state runtime tests for atomic completion and late-report
rejection

## Why
This addresses the race and polling concerns in #13948 by removing
timing-based correctness assumptions and reducing unnecessary status
polling churn.

## Validation
- `cd codex-rs && just fmt`
- `cd codex-rs && cargo test -p codex-state`
- `cd codex-rs && cargo test -p codex-core --test all suite::agent_jobs`
- `cd codex-rs && cargo test`
- fails in an unrelated app-server tracing test:
`message_processor::tracing_tests::thread_start_jsonrpc_span_exports_server_span_and_parents_children`
timed out waiting for response

## Notes
- This PR supersedes #14129 with the same agent-jobs fix on a clean
branch from `main`.
- The earlier PR branch was stacked on unrelated history, which made the
review diff include unrelated commits.

Fixes #13948
2026-03-17 10:40:14 -04:00
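The status-subscription waiting described in this PR (block on a change notification with a fallback timeout, instead of sleeping on a fixed polling interval) can be sketched with a std-library `Condvar` analogue of the tokio watch channel the diff actually uses; all names here are illustrative.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Workers publish a status under a mutex and notify a condvar; the job
/// loop blocks on the condvar rather than polling on a fixed interval.
#[derive(Clone)]
struct StatusCell(Arc<(Mutex<String>, Condvar)>);

impl StatusCell {
    fn new(initial: &str) -> Self {
        StatusCell(Arc::new((Mutex::new(initial.to_string()), Condvar::new())))
    }

    fn set(&self, status: &str) {
        let (lock, cvar) = &*self.0;
        *lock.lock().unwrap() = status.to_string();
        cvar.notify_all();
    }

    /// Wait until the status leaves `current`, or until `fallback` elapses
    /// (the fallback plays the role of the bounded poll interval).
    fn wait_for_change(&self, current: &str, fallback: Duration) -> String {
        let (lock, cvar) = &*self.0;
        let mut status = lock.lock().unwrap();
        while *status == current {
            let (guard, result) = cvar.wait_timeout(status, fallback).unwrap();
            status = guard;
            if result.timed_out() {
                break; // re-check state instead of trusting timing alone
            }
        }
        status.clone()
    }
}

fn main() {
    let cell = StatusCell::new("running");
    let worker = {
        let cell = cell.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(20));
            // The real fix also stores `result_json` in the same atomic
            // transition; here we only model the status flip.
            cell.set("completed");
        })
    };
    let observed = cell.wait_for_change("running", Duration::from_secs(1));
    worker.join().unwrap();
    assert_eq!(observed, "completed");
    println!("ok");
}
```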
jif-oai
4ed19b0766 feat: rename close agent result field to be more explicit (#14935)
https://github.com/openai/codex/issues/14907
2026-03-17 14:37:20 +00:00
jif-oai
31648563c8 feat: centralize package manager version (#14920)
2026-03-17 12:03:07 +00:00
viyatb-oai
603b6493a9 fix(linux-sandbox): ignore missing writable roots (#14890)
## Summary
- skip nonexistent `workspace-write` writable roots in the Linux
bubblewrap mount builder instead of aborting sandbox startup
- keep existing writable roots mounted normally so mixed Windows/WSL
configs continue to work
- add unit and Linux integration regression coverage for the
missing-root case

## Context
This addresses regression A from #14875. Regression B will be handled in
a separate PR.

The old bubblewrap integration added `ensure_mount_targets_exist` as a
preflight guard because bubblewrap bind targets must exist, and failing
early let Codex return a clearer error than a lower-level mount failure.

That policy turned out to be too strict once bubblewrap became the
default Linux sandbox: shared Windows/WSL or mixed-platform configs can
legitimately contain a well-formed writable root that does not exist on
the current machine. This PR keeps bubblewrap's existing-target
requirement, but changes Codex to skip missing writable roots instead of
treating them as fatal configuration errors.
2026-03-17 00:21:00 -07:00
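The skip-missing-roots policy above amounts to filtering the writable-root list by existence before building bind mounts; a minimal std-only sketch (function name is illustrative):

```rust
use std::path::PathBuf;

/// Keep only writable roots that exist on this machine. Bubblewrap bind
/// targets must exist, so missing roots are skipped instead of aborting
/// sandbox startup, which lets mixed Windows/WSL configs carry paths
/// that are valid only on other platforms.
fn existing_writable_roots(roots: &[PathBuf]) -> Vec<PathBuf> {
    roots
        .iter()
        .filter(|root| root.exists())
        .cloned()
        .collect()
}

fn main() {
    let dir = std::env::temp_dir(); // exists on every platform
    let missing = dir.join("codex-definitely-missing-writable-root");
    let kept = existing_writable_roots(&[dir.clone(), missing.clone()]);
    assert_eq!(kept, vec![dir]);
    assert!(!kept.contains(&missing));
    println!("ok");
}
```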
Eric Traut
d37dcca7e0 Revert tui code so it does not rely on in-process app server (#14899)
PR https://github.com/openai/codex/pull/14512 added an in-process app
server and started to wire up the tui to use it. We were originally
planning to modify the `tui` code in place, converting it to use the app
server a bit at a time using a hybrid adapter. We've since decided to
create an entirely new parallel `tui_app_server` implementation and do
the conversion all at once but retain the existing `tui` while we work
the bugs out of the new implementation.

This PR undoes the changes to the `tui` made in the PR #14512 and
restores the old initialization to its previous state. This allows us to
modify the `tui_app_server` without the risk of regressing the old `tui`
code. For example, we can start to remove support for all legacy core
events, like the ones that PR https://github.com/openai/codex/pull/14892
needed to ignore.

Testing:
* I manually verified that the old `tui` starts and shuts down without a
problem.
2026-03-17 00:56:32 -06:00
Eric Traut
57f865c069 Fix tui_app_server: ignore duplicate legacy stream events (#14892)
The in-process app-server currently emits both typed
`ServerNotification`s and legacy `codex/event/*` notifications for the
same live turn updates. `tui_app_server` was consuming both paths, so
message deltas and completed items could be enqueued twice and rendered
as duplicated output in the transcript.

Ignore legacy notifications for event types that already have typed (app
server) notification handling, while keeping legacy fallback behavior
for events that still only arrive on the old path. This preserves
compatibility without duplicating streamed commentary or final agent
output.

We will remove all of the legacy event handlers over time; they're here
only during the short window where we're moving the tui to use the app
server.
2026-03-17 00:50:25 -06:00
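The dedup rule in this commit reduces to a membership check: drop a legacy `codex/event/*` notification when its event type already has a typed handler, keep it otherwise. A minimal sketch, with illustrative event-type names:

```rust
use std::collections::HashSet;

/// Returns whether a legacy notification should still be handled.
/// Event types with typed `ServerNotification` handling are skipped so
/// the same turn update is not rendered twice in the transcript.
fn should_handle_legacy(typed_handled: &HashSet<&str>, event_type: &str) -> bool {
    !typed_handled.contains(event_type)
}

fn main() {
    let typed_handled: HashSet<&str> =
        ["agent_message_delta", "item_completed"].into_iter().collect();

    // Already covered by the typed path: drop the legacy duplicate.
    assert!(!should_handle_legacy(&typed_handled, "agent_message_delta"));
    // Still legacy-only: keep the fallback handler during the migration.
    assert!(should_handle_legacy(&typed_handled, "session_configured"));
    println!("ok");
}
```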
23 changed files with 646 additions and 333 deletions

codex-rs/Cargo.lock generated
View File

@@ -2500,7 +2500,6 @@ dependencies = [
"chrono",
"clap",
"codex-ansi-escape",
"codex-app-server-client",
"codex-app-server-protocol",
"codex-arg0",
"codex-backend-client",

View File

@@ -1857,6 +1857,187 @@ async fn turn_start_emits_spawn_agent_item_with_model_metadata_v2() -> Result<()
Ok(())
}
#[tokio::test]
async fn turn_start_emits_spawn_agent_item_with_effective_role_model_metadata_v2() -> Result<()> {
skip_if_no_network!(Ok(()));
const CHILD_PROMPT: &str = "child: do work";
const PARENT_PROMPT: &str = "spawn a child and continue";
const SPAWN_CALL_ID: &str = "spawn-call-1";
const REQUESTED_MODEL: &str = "gpt-5.1";
const REQUESTED_REASONING_EFFORT: ReasoningEffort = ReasoningEffort::Low;
const ROLE_MODEL: &str = "gpt-5.1-codex-max";
const ROLE_REASONING_EFFORT: ReasoningEffort = ReasoningEffort::High;
let server = responses::start_mock_server().await;
let spawn_args = serde_json::to_string(&json!({
"message": CHILD_PROMPT,
"agent_type": "custom",
"model": REQUESTED_MODEL,
"reasoning_effort": REQUESTED_REASONING_EFFORT,
}))?;
let _parent_turn = responses::mount_sse_once_match(
&server,
|req: &wiremock::Request| body_contains(req, PARENT_PROMPT),
responses::sse(vec![
responses::ev_response_created("resp-turn1-1"),
responses::ev_function_call(SPAWN_CALL_ID, "spawn_agent", &spawn_args),
responses::ev_completed("resp-turn1-1"),
]),
)
.await;
let _child_turn = responses::mount_sse_once_match(
&server,
|req: &wiremock::Request| {
body_contains(req, CHILD_PROMPT) && !body_contains(req, SPAWN_CALL_ID)
},
responses::sse(vec![
responses::ev_response_created("resp-child-1"),
responses::ev_assistant_message("msg-child-1", "child done"),
responses::ev_completed("resp-child-1"),
]),
)
.await;
let _parent_follow_up = responses::mount_sse_once_match(
&server,
|req: &wiremock::Request| body_contains(req, SPAWN_CALL_ID),
responses::sse(vec![
responses::ev_response_created("resp-turn1-2"),
responses::ev_assistant_message("msg-turn1-2", "parent done"),
responses::ev_completed("resp-turn1-2"),
]),
)
.await;
let codex_home = TempDir::new()?;
create_config_toml(
codex_home.path(),
&server.uri(),
"never",
&BTreeMap::from([(Feature::Collab, true)]),
)?;
std::fs::write(
codex_home.path().join("custom-role.toml"),
format!("model = \"{ROLE_MODEL}\"\nmodel_reasoning_effort = \"{ROLE_REASONING_EFFORT}\"\n",),
)?;
let config_path = codex_home.path().join("config.toml");
let base_config = std::fs::read_to_string(&config_path)?;
std::fs::write(
&config_path,
format!(
r#"{base_config}
[agents.custom]
description = "Custom role"
config_file = "./custom-role.toml"
"#
),
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("gpt-5.2-codex".to_string()),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: PARENT_PROMPT.to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let turn: TurnStartResponse = to_response::<TurnStartResponse>(turn_resp)?;
let spawn_completed = timeout(DEFAULT_READ_TIMEOUT, async {
loop {
let completed_notif = mcp
.read_stream_until_notification_message("item/completed")
.await?;
let completed: ItemCompletedNotification =
serde_json::from_value(completed_notif.params.expect("item/completed params"))?;
if let ThreadItem::CollabAgentToolCall { id, .. } = &completed.item
&& id == SPAWN_CALL_ID
{
return Ok::<ThreadItem, anyhow::Error>(completed.item);
}
}
})
.await??;
let ThreadItem::CollabAgentToolCall {
id,
tool,
status,
sender_thread_id,
receiver_thread_ids,
prompt,
model,
reasoning_effort,
agents_states,
} = spawn_completed
else {
unreachable!("loop ensures we break on collab agent tool call items");
};
let receiver_thread_id = receiver_thread_ids
.first()
.cloned()
.expect("spawn completion should include child thread id");
assert_eq!(id, SPAWN_CALL_ID);
assert_eq!(tool, CollabAgentTool::SpawnAgent);
assert_eq!(status, CollabAgentToolCallStatus::Completed);
assert_eq!(sender_thread_id, thread.id);
assert_eq!(receiver_thread_ids, vec![receiver_thread_id.clone()]);
assert_eq!(prompt, Some(CHILD_PROMPT.to_string()));
assert_eq!(model, Some(ROLE_MODEL.to_string()));
assert_eq!(reasoning_effort, Some(ROLE_REASONING_EFFORT));
assert_eq!(
agents_states,
HashMap::from([(
receiver_thread_id,
CollabAgentState {
status: CollabAgentStatus::PendingInit,
message: None,
},
)])
);
let turn_completed = timeout(DEFAULT_READ_TIMEOUT, async {
loop {
let turn_completed_notif = mcp
.read_stream_until_notification_message("turn/completed")
.await?;
let turn_completed: TurnCompletedNotification = serde_json::from_value(
turn_completed_notif.params.expect("turn/completed params"),
)?;
if turn_completed.thread_id == thread.id && turn_completed.turn.id == turn.turn.id {
return Ok::<TurnCompletedNotification, anyhow::Error>(turn_completed);
}
}
})
.await??;
assert_eq!(turn_completed.thread_id, thread.id);
Ok(())
}
#[tokio::test]
async fn turn_start_file_change_approval_accept_for_session_persists_v2() -> Result<()> {
skip_if_no_network!(Ok(()));

View File

@@ -3,6 +3,7 @@ use crate::agent::guards::Guards;
use crate::agent::role::DEFAULT_ROLE_NAME;
use crate::agent::role::resolve_role_config;
use crate::agent::status::is_final;
use crate::codex_thread::ThreadConfigSnapshot;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::find_thread_path_by_id_str;
@@ -360,6 +361,19 @@ impl AgentControl {
))
}
pub(crate) async fn get_agent_config_snapshot(
&self,
agent_id: ThreadId,
) -> Option<ThreadConfigSnapshot> {
let Ok(state) = self.upgrade() else {
return None;
};
let Ok(thread) = state.get_thread(agent_id).await else {
return None;
};
Some(thread.config_snapshot().await)
}
/// Subscribe to status updates for `agent_id`, yielding the latest value and changes.
pub(crate) async fn subscribe_status(
&self,

View File

@@ -52,6 +52,7 @@ pub mod models_manager;
mod network_policy_decision;
pub mod network_proxy_loader;
mod original_image_detail;
mod packages;
pub use mcp_connection_manager::MCP_SANDBOX_STATE_CAPABILITY;
pub use mcp_connection_manager::MCP_SANDBOX_STATE_METHOD;
pub use mcp_connection_manager::SandboxState;

View File

@@ -0,0 +1 @@
pub(crate) mod versions;

View File

@@ -0,0 +1,2 @@
/// Pinned versions for package-manager-backed installs.
pub(crate) const ARTIFACT_RUNTIME: &str = "2.4.0";

View File

@@ -15,9 +15,12 @@ use crate::tools::registry::ToolHandler;
use crate::tools::registry::ToolKind;
use async_trait::async_trait;
use codex_protocol::ThreadId;
use codex_protocol::protocol::AgentStatus;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::user_input::UserInput;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
@@ -26,8 +29,10 @@ use std::collections::HashSet;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::watch::Receiver;
use tokio::time::Duration;
use tokio::time::Instant;
use tokio::time::timeout;
use uuid::Uuid;
pub struct BatchJobHandler;
@@ -103,6 +108,7 @@ struct JobRunnerOptions {
struct ActiveJobItem {
item_id: String,
started_at: Instant,
status_rx: Option<Receiver<AgentStatus>>,
}
struct JobProgressEmitter {
@@ -676,6 +682,12 @@ async fn run_agent_job_loop(
ActiveJobItem {
item_id: item.item_id.clone(),
started_at: Instant::now(),
status_rx: session
.services
.agent_control
.subscribe_status(thread_id)
.await
.ok(),
},
);
progressed = true;
@@ -708,7 +720,7 @@ async fn run_agent_job_loop(
break;
}
if !progressed {
tokio::time::sleep(STATUS_POLL_INTERVAL).await;
wait_for_status_change(&active_items).await;
}
continue;
}
@@ -863,6 +875,12 @@ async fn recover_running_items(
ActiveJobItem {
item_id: item.item_id.clone(),
started_at: started_at_from_item(&item),
status_rx: session
.services
.agent_control
.subscribe_status(thread_id)
.await
.ok(),
},
);
}
@@ -876,13 +894,44 @@ async fn find_finished_threads(
) -> Vec<(ThreadId, String)> {
let mut finished = Vec::new();
for (thread_id, item) in active_items {
if is_final(&session.services.agent_control.get_status(*thread_id).await) {
let status = active_item_status(session.as_ref(), *thread_id, item).await;
if is_final(&status) {
finished.push((*thread_id, item.item_id.clone()));
}
}
finished
}
async fn active_item_status(
session: &Session,
thread_id: ThreadId,
item: &ActiveJobItem,
) -> AgentStatus {
if let Some(status_rx) = item.status_rx.as_ref()
&& status_rx.has_changed().is_ok()
{
return status_rx.borrow().clone();
}
session.services.agent_control.get_status(thread_id).await
}
async fn wait_for_status_change(active_items: &HashMap<ThreadId, ActiveJobItem>) {
let mut waiters = FuturesUnordered::new();
for item in active_items.values() {
if let Some(status_rx) = item.status_rx.as_ref() {
let mut status_rx = status_rx.clone();
waiters.push(async move {
let _ = status_rx.changed().await;
});
}
}
if waiters.is_empty() {
tokio::time::sleep(STATUS_POLL_INTERVAL).await;
return;
}
let _ = timeout(STATUS_POLL_INTERVAL, waiters.next()).await;
}
async fn reap_stale_active_items(
session: Arc<Session>,
db: Arc<codex_state::StateRuntime>,
@@ -920,37 +969,24 @@ async fn finalize_finished_item(
item_id: &str,
thread_id: ThreadId,
) -> anyhow::Result<()> {
let mut item = db
let item = db
.get_agent_job_item(job_id, item_id)
.await?
.ok_or_else(|| {
anyhow::anyhow!("job item not found for finalization: {job_id}/{item_id}")
})?;
if item.result_json.is_none() {
tokio::time::sleep(Duration::from_millis(250)).await;
item = db
.get_agent_job_item(job_id, item_id)
.await?
.ok_or_else(|| {
anyhow::anyhow!("job item not found after grace period: {job_id}/{item_id}")
})?;
}
if item.result_json.is_some() {
if !db.mark_agent_job_item_completed(job_id, item_id).await? {
db.mark_agent_job_item_failed(
job_id,
item_id,
"worker reported result but item could not transition to completed",
)
.await?;
if matches!(item.status, codex_state::AgentJobItemStatus::Running) {
if item.result_json.is_some() {
let _ = db.mark_agent_job_item_completed(job_id, item_id).await?;
} else {
let _ = db
.mark_agent_job_item_failed(
job_id,
item_id,
"worker finished without calling report_agent_job_result",
)
.await?;
}
} else {
db.mark_agent_job_item_failed(
job_id,
item_id,
"worker finished without calling report_agent_job_result",
)
.await?;
}
let _ = session
.services

View File

@@ -15,6 +15,7 @@ use crate::exec::ExecToolCallOutput;
use crate::exec::StreamOutput;
use crate::features::Feature;
use crate::function_tool::FunctionCallError;
use crate::packages::versions;
use crate::protocol::ExecCommandSource;
use crate::tools::context::FunctionToolOutput;
use crate::tools::context::ToolInvocation;
@@ -28,7 +29,6 @@ use crate::tools::registry::ToolKind;
const ARTIFACTS_TOOL_NAME: &str = "artifacts";
const ARTIFACTS_PRAGMA_PREFIXES: [&str; 2] = ["// codex-artifacts:", "// codex-artifact-tool:"];
pub(crate) const PINNED_ARTIFACT_RUNTIME_VERSION: &str = "2.4.0";
const DEFAULT_EXECUTION_TIMEOUT: Duration = Duration::from_secs(30);
pub struct ArtifactsHandler;
@@ -216,7 +216,7 @@ fn parse_pragma_prefix(line: &str) -> Option<&str> {
fn default_runtime_manager(codex_home: std::path::PathBuf) -> ArtifactRuntimeManager {
ArtifactRuntimeManager::new(ArtifactRuntimeManagerConfig::with_default_release(
codex_home,
PINNED_ARTIFACT_RUNTIME_VERSION,
versions::ARTIFACT_RUNTIME,
))
}

View File

@@ -1,4 +1,5 @@
use super::*;
use crate::packages::versions;
use codex_artifacts::RuntimeEntrypoints;
use codex_artifacts::RuntimePathEntry;
use tempfile::TempDir;
@@ -46,7 +47,7 @@ fn default_runtime_manager_uses_openai_codex_release_base() {
);
assert_eq!(
manager.config().release().runtime_version(),
PINNED_ARTIFACT_RUNTIME_VERSION
versions::ARTIFACT_RUNTIME
);
}
@@ -59,14 +60,14 @@ fn load_cached_runtime_reads_pinned_cache_path() {
.path()
.join("packages")
.join("artifacts")
.join(PINNED_ARTIFACT_RUNTIME_VERSION)
.join(versions::ARTIFACT_RUNTIME)
.join(platform.as_str());
std::fs::create_dir_all(&install_dir).expect("create install dir");
std::fs::write(
install_dir.join("manifest.json"),
serde_json::json!({
"schema_version": 1,
"runtime_version": PINNED_ARTIFACT_RUNTIME_VERSION,
"runtime_version": versions::ARTIFACT_RUNTIME,
"node": { "relative_path": "node/bin/node" },
"entrypoints": {
"build_js": { "relative_path": "artifact-tool/dist/artifact_tool.mjs" },
@@ -95,10 +96,10 @@ fn load_cached_runtime_reads_pinned_cache_path() {
&codex_home
.path()
.join(codex_artifacts::DEFAULT_CACHE_ROOT_RELATIVE),
PINNED_ARTIFACT_RUNTIME_VERSION,
versions::ARTIFACT_RUNTIME,
)
.expect("resolve runtime");
assert_eq!(runtime.runtime_version(), PINNED_ARTIFACT_RUNTIME_VERSION);
assert_eq!(runtime.runtime_version(), versions::ARTIFACT_RUNTIME);
assert_eq!(
runtime.manifest().entrypoints,
RuntimeEntrypoints {

View File

@@ -95,13 +95,15 @@ impl ToolHandler for Handler {
.await;
result?;
Ok(CloseAgentResult { status })
Ok(CloseAgentResult {
previous_status: status,
})
}
}
#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct CloseAgentResult {
pub(crate) status: AgentStatus,
pub(crate) previous_status: AgentStatus,
}
impl ToolOutput for CloseAgentResult {

View File

@@ -98,15 +98,37 @@ impl ToolHandler for Handler {
),
Err(_) => (None, AgentStatus::NotFound),
};
let (new_agent_nickname, new_agent_role) = match new_thread_id {
Some(thread_id) => session
let agent_snapshot = match new_thread_id {
Some(thread_id) => {
session
.services
.agent_control
.get_agent_config_snapshot(thread_id)
.await
}
None => None,
};
let (new_agent_nickname, new_agent_role) = match (&agent_snapshot, new_thread_id) {
(Some(snapshot), _) => (
snapshot.session_source.get_nickname(),
snapshot.session_source.get_agent_role(),
),
(None, Some(thread_id)) => session
.services
.agent_control
.get_agent_nickname_and_role(thread_id)
.await
.unwrap_or((None, None)),
None => (None, None),
(None, None) => (None, None),
};
let effective_model = agent_snapshot
.as_ref()
.map(|snapshot| snapshot.model.clone())
.unwrap_or_else(|| args.model.clone().unwrap_or_default());
let effective_reasoning_effort = agent_snapshot
.as_ref()
.and_then(|snapshot| snapshot.reasoning_effort)
.unwrap_or(args.reasoning_effort.unwrap_or_default());
let nickname = new_agent_nickname.clone();
session
.send_event(
@@ -118,8 +140,8 @@ impl ToolHandler for Handler {
new_agent_nickname,
new_agent_role,
prompt,
model: args.model.clone().unwrap_or_default(),
reasoning_effort: args.reasoning_effort.unwrap_or_default(),
model: effective_model,
reasoning_effort: effective_reasoning_effort,
status,
}
.into(),

View File

@@ -971,7 +971,7 @@ async fn wait_agent_returns_final_status_without_timeout() {
}
#[tokio::test]
async fn close_agent_submits_shutdown_and_returns_status() {
async fn close_agent_submits_shutdown_and_returns_previous_status() {
let (mut session, turn) = make_session_and_context().await;
let manager = thread_manager();
session.services.agent_control = manager.agent_control();
@@ -993,7 +993,7 @@ async fn close_agent_submits_shutdown_and_returns_status() {
let (content, success) = expect_text_output(output);
let result: close_agent::CloseAgentResult =
serde_json::from_str(&content).expect("close_agent result should be json");
assert_eq!(result.status, status_before);
assert_eq!(result.previous_status, status_before);
assert_eq!(success, Some(true));
let ops = manager.captured_ops();

View File

@@ -195,9 +195,12 @@ fn close_agent_output_schema() -> JsonValue {
json!({
"type": "object",
"properties": {
"status": agent_status_output_schema()
"previous_status": {
"description": "The agent status observed before shutdown was requested.",
"allOf": [agent_status_output_schema()]
}
},
"required": ["status"],
"required": ["previous_status"],
"additionalProperties": false
})
}
@@ -1523,7 +1526,7 @@ fn create_close_agent_tool() -> ToolSpec {
ToolSpec::Function(ResponsesApiTool {
name: "close_agent".to_string(),
description: "Close an agent when it is no longer needed and return its last known status. Don't keep agents open for too long if they are not needed anymore.".to_string(),
description: "Close an agent when it is no longer needed and return its previous status before shutdown was requested. Don't keep agents open for too long if they are not needed anymore.".to_string(),
strict: false,
defer_loading: None,
parameters: JsonSchema::Object {

View File

@@ -16,10 +16,8 @@ use std::os::fd::AsRawFd;
use std::path::Path;
use std::path::PathBuf;
use codex_core::error::CodexErr;
use codex_core::error::Result;
use codex_protocol::protocol::FileSystemSandboxPolicy;
use codex_protocol::protocol::WritableRoot;
use codex_utils_absolute_path::AbsolutePathBuf;
/// Linux "platform defaults" that keep common system binaries and dynamic
@@ -41,10 +39,10 @@ const LINUX_PLATFORM_DEFAULT_READ_ROOTS: &[&str] = &[
/// Options that control how bubblewrap is invoked.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct BwrapOptions {
/// Whether to mount a fresh `/proc` inside the PID namespace.
/// Whether to mount a fresh `/proc` inside the sandbox.
///
/// This is the secure default, but some restrictive container environments
/// deny `--proc /proc` even when PID namespaces are available.
/// deny `--proc /proc`.
pub mount_proc: bool,
/// How networking should be configured inside the bubblewrap sandbox.
pub network_mode: BwrapNetworkMode,
@@ -167,7 +165,6 @@ fn create_bwrap_flags(
// Request a user namespace explicitly rather than relying on bubblewrap's
// auto-enable behavior, which is skipped when the caller runs as uid 0.
args.push("--unshare-user".to_string());
// Isolate the PID namespace.
args.push("--unshare-pid".to_string());
if options.network_mode.should_unshare_network() {
args.push("--unshare-net".to_string());
@@ -213,9 +210,15 @@ fn create_filesystem_args(
file_system_sandbox_policy: &FileSystemSandboxPolicy,
cwd: &Path,
) -> Result<BwrapArgs> {
let writable_roots = file_system_sandbox_policy.get_writable_roots_with_cwd(cwd);
// Bubblewrap requires bind mount targets to exist. Skip missing writable
// roots so mixed-platform configs can keep harmless paths for other
// environments without breaking Linux command startup.
let writable_roots = file_system_sandbox_policy
.get_writable_roots_with_cwd(cwd)
.into_iter()
.filter(|writable_root| writable_root.root.as_path().exists())
.collect::<Vec<_>>();
let unreadable_roots = file_system_sandbox_policy.get_unreadable_roots_with_cwd(cwd);
ensure_mount_targets_exist(&writable_roots)?;
let mut args = if file_system_sandbox_policy.has_full_disk_read_access() {
// Read-only root, then mount a minimal device tree.
@@ -385,23 +388,6 @@ fn create_filesystem_args(
})
}
/// Validate that writable roots exist before constructing mounts.
///
/// Bubblewrap requires bind mount targets to exist. We fail fast with a clear
/// error so callers can present an actionable message.
fn ensure_mount_targets_exist(writable_roots: &[WritableRoot]) -> Result<()> {
for writable_root in writable_roots {
let root = writable_root.root.as_path();
if !root.exists() {
return Err(CodexErr::UnsupportedOperation(format!(
"Sandbox expected writable root {root}, but it does not exist.",
root = root.display()
)));
}
}
Ok(())
}
fn path_to_string(path: &Path) -> String {
path.to_string_lossy().to_string()
}
@@ -731,6 +717,41 @@ mod tests {
);
}
#[test]
fn ignores_missing_writable_roots() {
let temp_dir = TempDir::new().expect("temp dir");
let existing_root = temp_dir.path().join("existing");
let missing_root = temp_dir.path().join("missing");
std::fs::create_dir(&existing_root).expect("create existing root");
let policy = SandboxPolicy::WorkspaceWrite {
writable_roots: vec![
AbsolutePathBuf::try_from(existing_root.as_path()).expect("absolute existing root"),
AbsolutePathBuf::try_from(missing_root.as_path()).expect("absolute missing root"),
],
read_only_access: Default::default(),
network_access: false,
exclude_tmpdir_env_var: true,
exclude_slash_tmp: true,
};
let args = create_filesystem_args(&FileSystemSandboxPolicy::from(&policy), temp_dir.path())
.expect("filesystem args");
let existing_root = path_to_string(&existing_root);
let missing_root = path_to_string(&missing_root);
assert!(
args.args.windows(3).any(|window| {
window == ["--bind", existing_root.as_str(), existing_root.as_str()]
}),
"existing writable root should be rebound writable",
);
assert!(
!args.args.iter().any(|arg| arg == &missing_root),
"missing writable root should be skipped",
);
}
#[test]
fn mounts_dev_before_writable_dev_binds() {
let sandbox_policy = SandboxPolicy::WorkspaceWrite {

View File

@@ -310,6 +310,32 @@ async fn test_writable_root() {
.await;
}
#[tokio::test]
async fn sandbox_ignores_missing_writable_roots_under_bwrap() {
if should_skip_bwrap_tests().await {
eprintln!("skipping bwrap test: bwrap sandbox prerequisites are unavailable");
return;
}
let tempdir = tempfile::tempdir().expect("tempdir");
let existing_root = tempdir.path().join("existing");
let missing_root = tempdir.path().join("missing");
std::fs::create_dir(&existing_root).expect("create existing root");
let output = run_cmd_result_with_writable_roots(
&["bash", "-lc", "printf sandbox-ok"],
&[existing_root, missing_root],
LONG_TIMEOUT_MS,
false,
true,
)
.await
.expect("sandboxed command should execute");
assert_eq!(output.exit_code, 0);
assert_eq!(output.stdout.text, "sandbox-ok");
}
#[tokio::test]
async fn test_no_new_privs_is_enabled() {
let output = run_cmd_output(

View File

@@ -3263,9 +3263,9 @@ pub struct CollabAgentSpawnEndEvent {
/// Initial prompt sent to the agent. Can be empty to prevent CoT leaking at the
/// beginning.
pub prompt: String,
/// Model requested for the spawned agent.
/// Effective model used by the spawned agent after inheritance and role overrides.
pub model: String,
/// Reasoning effort requested for the spawned agent.
/// Effective reasoning effort used by the spawned agent after inheritance and role overrides.
pub reasoning_effort: ReasoningEffortConfig,
/// Last known status of the new agent reported to the sender agent.
pub status: AgentStatus,

View File

@@ -1,11 +1,19 @@
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use serde::Deserialize;
use std::io::Read;
use std::path::Path;
use std::process::Command;
use std::process::Output;
use std::process::Stdio;
use std::sync::LazyLock;
use std::thread;
use std::time::Duration;
use std::time::Instant;
const POWERSHELL_PARSER_SCRIPT: &str = include_str!("powershell_parser.ps1");
const POWERSHELL_PARSER_TIMEOUT: Duration = Duration::from_secs(5);
const POWERSHELL_PARSER_POLL_INTERVAL: Duration = Duration::from_millis(10);
/// On Windows, we conservatively allow only clearly read-only PowerShell invocations
/// that match a small safelist. Anything else (including direct CMD commands) is unsafe.
@@ -127,7 +135,7 @@ fn is_powershell_executable(exe: &str) -> bool {
fn parse_with_powershell_ast(executable: &str, script: &str) -> PowershellParseOutcome {
let encoded_script = encode_powershell_base64(script);
let encoded_parser_script = encoded_parser_script();
match Command::new(executable)
let mut child = match Command::new(executable)
.args([
"-NoLogo",
"-NoProfile",
@@ -136,18 +144,68 @@ fn parse_with_powershell_ast(executable: &str, script: &str) -> PowershellParseO
encoded_parser_script,
])
.env("CODEX_POWERSHELL_PAYLOAD", &encoded_script)
.output()
// Match `Command::output()` so PowerShell does not stay alive waiting on inherited stdin
// after the parser script has already finished.
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Ok(output) if output.status.success() => {
if let Ok(result) =
serde_json::from_slice::<PowershellParserOutput>(output.stdout.as_slice())
{
Ok(child) => child,
Err(_) => return PowershellParseOutcome::Failed,
};
let deadline = Instant::now() + POWERSHELL_PARSER_TIMEOUT;
let output = loop {
match child.try_wait() {
Ok(Some(status)) => {
let mut stdout = Vec::new();
let mut stderr = Vec::new();
if let Some(mut reader) = child.stdout.take()
&& reader.read_to_end(&mut stdout).is_err()
{
return PowershellParseOutcome::Failed;
}
if let Some(mut reader) = child.stderr.take()
&& reader.read_to_end(&mut stderr).is_err()
{
return PowershellParseOutcome::Failed;
}
break Output {
status,
stdout,
stderr,
};
}
Ok(None) => {
if Instant::now() >= deadline {
let _ = child.kill();
let _ = child.wait();
return PowershellParseOutcome::Failed;
}
thread::sleep(POWERSHELL_PARSER_POLL_INTERVAL);
}
Err(_) => {
let _ = child.kill();
let _ = child.wait();
return PowershellParseOutcome::Failed;
}
}
};
match output.status.success() {
true => {
if let Ok(result) = serde_json::from_slice::<PowershellParserOutput>(&output.stdout) {
result.into_outcome()
} else {
PowershellParseOutcome::Failed
}
}
_ => PowershellParseOutcome::Failed,
false => PowershellParseOutcome::Failed,
}
}
@@ -348,7 +406,11 @@ fn is_safe_git_command(words: &[String]) -> bool {
mod tests {
use super::*;
use crate::powershell::try_find_pwsh_executable_blocking;
use std::fs;
use std::string::ToString;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
/// Converts a slice of string literals into owned `String`s for the tests.
fn vec_str(args: &[&str]) -> Vec<String> {
@@ -620,4 +682,26 @@ mod tests {
);
}
}
#[test]
fn powershell_ast_parser_times_out_for_stuck_child() {
let unique = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("system time")
.as_nanos();
let script_path =
std::env::temp_dir().join(format!("codex-powershell-parser-timeout-{unique}.cmd"));
fs::write(&script_path, "@echo off\r\ntimeout /t 10 /nobreak >nul\r\n")
.expect("write fake powershell");
let started = Instant::now();
let outcome = parse_with_powershell_ast(
script_path.to_str().expect("utf8 temp path"),
"Write-Output ok",
);
let _ = fs::remove_file(&script_path);
assert!(matches!(outcome, PowershellParseOutcome::Failed));
assert!(started.elapsed() < POWERSHELL_PARSER_TIMEOUT + Duration::from_secs(1));
}
}

View File

@@ -435,10 +435,13 @@ WHERE job_id = ? AND item_id = ? AND status = ?
r#"
UPDATE agent_job_items
SET
status = ?,
result_json = ?,
reported_at = ?,
completed_at = ?,
updated_at = ?,
last_error = NULL
last_error = NULL,
assigned_thread_id = NULL
WHERE
job_id = ?
AND item_id = ?
@@ -446,9 +449,11 @@ WHERE
AND assigned_thread_id = ?
"#,
)
.bind(AgentJobItemStatus::Completed.as_str())
.bind(serialized)
.bind(now)
.bind(now)
.bind(now)
.bind(job_id)
.bind(item_id)
.bind(AgentJobItemStatus::Running.as_str())
@@ -560,3 +565,120 @@ WHERE job_id = ?
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::runtime::test_support::unique_temp_dir;
use pretty_assertions::assert_eq;
use serde_json::json;
async fn create_running_single_item_job(
runtime: &StateRuntime,
) -> anyhow::Result<(String, String, String)> {
let job_id = "job-1".to_string();
let item_id = "item-1".to_string();
let thread_id = "thread-1".to_string();
runtime
.create_agent_job(
&AgentJobCreateParams {
id: job_id.clone(),
name: "test-job".to_string(),
instruction: "Return a result".to_string(),
auto_export: true,
max_runtime_seconds: None,
output_schema_json: None,
input_headers: vec!["path".to_string()],
input_csv_path: "/tmp/in.csv".to_string(),
output_csv_path: "/tmp/out.csv".to_string(),
},
&[AgentJobItemCreateParams {
item_id: item_id.clone(),
row_index: 0,
source_id: None,
row_json: json!({"path":"file-1"}),
}],
)
.await?;
runtime.mark_agent_job_running(job_id.as_str()).await?;
let marked_running = runtime
.mark_agent_job_item_running_with_thread(
job_id.as_str(),
item_id.as_str(),
thread_id.as_str(),
)
.await?;
assert!(marked_running);
Ok((job_id, item_id, thread_id))
}
#[tokio::test]
async fn report_agent_job_item_result_completes_item_atomically() -> anyhow::Result<()> {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home, "test-provider".to_string()).await?;
let (job_id, item_id, thread_id) = create_running_single_item_job(runtime.as_ref()).await?;
let accepted = runtime
.report_agent_job_item_result(
job_id.as_str(),
item_id.as_str(),
thread_id.as_str(),
&json!({"ok": true}),
)
.await?;
assert!(accepted);
let item = runtime
.get_agent_job_item(job_id.as_str(), item_id.as_str())
.await?
.expect("job item should exist");
assert_eq!(item.status, AgentJobItemStatus::Completed);
assert_eq!(item.result_json, Some(json!({"ok": true})));
assert_eq!(item.assigned_thread_id, None);
assert_eq!(item.last_error, None);
assert!(item.reported_at.is_some());
assert!(item.completed_at.is_some());
let progress = runtime.get_agent_job_progress(job_id.as_str()).await?;
assert_eq!(
progress,
AgentJobProgress {
total_items: 1,
pending_items: 0,
running_items: 0,
completed_items: 1,
failed_items: 0,
}
);
Ok(())
}
#[tokio::test]
async fn report_agent_job_item_result_rejects_late_reports() -> anyhow::Result<()> {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home, "test-provider".to_string()).await?;
let (job_id, item_id, thread_id) = create_running_single_item_job(runtime.as_ref()).await?;
let marked_failed = runtime
.mark_agent_job_item_failed(job_id.as_str(), item_id.as_str(), "missing report")
.await?;
assert!(marked_failed);
let accepted = runtime
.report_agent_job_item_result(
job_id.as_str(),
item_id.as_str(),
thread_id.as_str(),
&json!({"late": true}),
)
.await?;
assert!(!accepted);
let item = runtime
.get_agent_job_item(job_id.as_str(), item_id.as_str())
.await?
.expect("job item should exist");
assert_eq!(item.status, AgentJobItemStatus::Failed);
assert_eq!(item.result_json, None);
assert_eq!(item.last_error, Some("missing report".to_string()));
Ok(())
}
}

View File

@@ -29,7 +29,6 @@ base64 = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
clap = { workspace = true, features = ["derive"] }
codex-ansi-escape = { workspace = true }
codex-app-server-client = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-arg0 = { workspace = true }
codex-backend-client = { workspace = true }

View File

@@ -39,7 +39,6 @@ use crate::tui::TuiEvent;
use crate::update_action::UpdateAction;
use crate::version::CODEX_CLI_VERSION;
use codex_ansi_escape::ansi_escape_line;
use codex_app_server_client::InProcessAppServerClient;
use codex_app_server_protocol::ConfigLayerSource;
use codex_core::AuthManager;
use codex_core::CodexAuth;
@@ -53,6 +52,7 @@ use codex_core::config::types::ApprovalsReviewer;
use codex_core::config::types::ModelAvailabilityNuxConfig;
use codex_core::config_loader::ConfigLayerStackOrdering;
use codex_core::features::Feature;
use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig;
use codex_core::models_manager::manager::RefreshStrategy;
use codex_core::models_manager::model_presets::HIDE_GPT_5_1_CODEX_MAX_MIGRATION_PROMPT_CONFIG;
use codex_core::models_manager::model_presets::HIDE_GPT5_1_MIGRATION_PROMPT_CONFIG;
@@ -113,7 +113,6 @@ use tokio::task::JoinHandle;
use toml::Value as TomlValue;
mod agent_navigation;
mod app_server_adapter;
mod pending_interactive_replay;
use self::agent_navigation::AgentNavigationDirection;
@@ -1948,7 +1947,7 @@ impl App {
#[allow(clippy::too_many_arguments)]
pub async fn run(
tui: &mut tui::Tui,
mut app_server: InProcessAppServerClient,
auth_manager: Arc<AuthManager>,
mut config: Config,
cli_kv_overrides: Vec<(String, TomlValue)>,
harness_overrides: ConfigOverrides,
@@ -1968,8 +1967,20 @@ impl App {
let harness_overrides =
normalize_harness_overrides_for_cwd(harness_overrides, &config.cwd)?;
let auth_manager = app_server.auth_manager();
let thread_manager = app_server.thread_manager();
let thread_manager = Arc::new(ThreadManager::new(
&config,
auth_manager.clone(),
SessionSource::Cli,
CollaborationModesConfig {
default_mode_request_user_input: config
.features
.enabled(Feature::DefaultModeRequestUserInput),
},
));
// TODO(xl): Move into PluginManager once this no longer depends on config feature gating.
thread_manager
.plugins_manager()
.maybe_start_curated_repo_sync_for_config(&config);
let mut model = thread_manager
.get_models_manager()
.get_default_model(&config.model, RefreshStrategy::Offline)
@@ -1987,13 +1998,6 @@ impl App {
)
.await;
if let Some(exit_info) = exit_info {
app_server
.shutdown()
.await
.inspect_err(|err| {
tracing::warn!("app-server shutdown failed: {err}");
})
.ok();
return Ok(exit_info);
}
if let Some(updated_model) = config.model.clone() {
@@ -2225,7 +2229,6 @@ impl App {
let mut thread_created_rx = thread_manager.subscribe_thread_created();
let mut listen_for_threads = true;
let mut listen_for_app_server_events = true;
let mut waiting_for_initial_session_configured = wait_for_initial_session_configured;
#[cfg(not(debug_assertions))]
@@ -2285,16 +2288,6 @@ impl App {
Err(err) => break Err(err),
}
}
app_server_event = app_server.next_event(), if listen_for_app_server_events => {
match app_server_event {
Some(event) => app.handle_app_server_event(&app_server, event).await,
None => {
listen_for_app_server_events = false;
tracing::warn!("app-server event stream closed");
}
}
AppRunControl::Continue
}
// Listen on new thread creation due to collab tools.
created = thread_created_rx.recv(), if listen_for_threads => {
match created {
@@ -2325,9 +2318,6 @@ impl App {
}
}
};
if let Err(err) = app_server.shutdown().await {
tracing::warn!(error = %err, "failed to shut down embedded app server");
}
let clear_result = tui.terminal.clear();
let exit_reason = match exit_reason_result {
Ok(exit_reason) => {

View File

@@ -1,72 +0,0 @@
/*
This module holds the temporary adapter layer between the TUI and the app
server during the hybrid migration period.
For now, the TUI still owns its existing direct-core behavior, but startup
allocates a local in-process app server and drains its event stream. Keeping
the app-server-specific wiring here keeps that transitional logic out of the
main `app.rs` orchestration path.
As more TUI flows move onto the app-server surface directly, this adapter
should shrink and eventually disappear.
*/
use super::App;
use codex_app_server_client::InProcessAppServerClient;
use codex_app_server_client::InProcessServerEvent;
use codex_app_server_protocol::JSONRPCErrorError;
impl App {
pub(super) async fn handle_app_server_event(
&mut self,
app_server_client: &InProcessAppServerClient,
event: InProcessServerEvent,
) {
match event {
InProcessServerEvent::Lagged { skipped } => {
tracing::warn!(
skipped,
"app-server event consumer lagged; dropping ignored events"
);
}
InProcessServerEvent::ServerNotification(_) => {}
InProcessServerEvent::LegacyNotification(_) => {}
InProcessServerEvent::ServerRequest(request) => {
let request_id = request.id().clone();
tracing::warn!(
?request_id,
"rejecting app-server request while TUI still uses direct core APIs"
);
if let Err(err) = self
.reject_app_server_request(
app_server_client,
request_id,
"TUI client does not yet handle this app-server server request".to_string(),
)
.await
{
tracing::warn!("{err}");
}
}
}
}
async fn reject_app_server_request(
&self,
app_server_client: &InProcessAppServerClient,
request_id: codex_app_server_protocol::RequestId,
reason: String,
) -> std::result::Result<(), String> {
app_server_client
.reject_server_request(
request_id,
JSONRPCErrorError {
code: -32000,
message: reason,
data: None,
},
)
.await
.map_err(|err| format!("failed to reject app-server request: {err}"))
}
}

View File

@@ -7,10 +7,6 @@ use additional_dirs::add_dir_warning_message;
use app::App;
pub use app::AppExitInfo;
pub use app::ExitReason;
use codex_app_server_client::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY;
use codex_app_server_client::InProcessAppServerClient;
use codex_app_server_client::InProcessClientStartArgs;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_cloud_requirements::cloud_requirements_loader;
use codex_core::AuthManager;
use codex_core::CodexAuth;
@@ -50,15 +46,12 @@ use codex_state::log_db;
use codex_utils_absolute_path::AbsolutePathBuf;
use codex_utils_oss::ensure_oss_provider_ready;
use codex_utils_oss::get_default_model_for_oss_provider;
use color_eyre::eyre::WrapErr;
use cwd_prompt::CwdPromptAction;
use cwd_prompt::CwdPromptOutcome;
use cwd_prompt::CwdSelection;
use std::fs::OpenOptions;
use std::future::Future;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use tracing::error;
use tracing_appender::non_blocking;
use tracing_subscriber::EnvFilter;
@@ -246,74 +239,10 @@ pub use public_widgets::composer_input::ComposerAction;
pub use public_widgets::composer_input::ComposerInput;
// (tests access modules directly within the crate)
async fn start_embedded_app_server(
arg0_paths: Arg0DispatchPaths,
config: Config,
cli_kv_overrides: Vec<(String, toml::Value)>,
loader_overrides: LoaderOverrides,
cloud_requirements: CloudRequirementsLoader,
feedback: codex_feedback::CodexFeedback,
) -> color_eyre::Result<InProcessAppServerClient> {
start_embedded_app_server_with(
arg0_paths,
config,
cli_kv_overrides,
loader_overrides,
cloud_requirements,
feedback,
InProcessAppServerClient::start,
)
.await
}
async fn start_embedded_app_server_with<F, Fut>(
arg0_paths: Arg0DispatchPaths,
config: Config,
cli_kv_overrides: Vec<(String, toml::Value)>,
loader_overrides: LoaderOverrides,
cloud_requirements: CloudRequirementsLoader,
feedback: codex_feedback::CodexFeedback,
start_client: F,
) -> color_eyre::Result<InProcessAppServerClient>
where
F: FnOnce(InProcessClientStartArgs) -> Fut,
Fut: Future<Output = std::io::Result<InProcessAppServerClient>>,
{
let config_warnings = config
.startup_warnings
.iter()
.map(|warning| ConfigWarningNotification {
summary: warning.clone(),
details: None,
path: None,
range: None,
})
.collect();
let client = start_client(InProcessClientStartArgs {
arg0_paths,
config: Arc::new(config),
cli_overrides: cli_kv_overrides,
loader_overrides,
cloud_requirements,
feedback,
config_warnings,
session_source: codex_protocol::protocol::SessionSource::Cli,
enable_codex_api_key_env: false,
client_name: "codex-tui".to_string(),
client_version: env!("CARGO_PKG_VERSION").to_string(),
experimental_api: true,
opt_out_notification_methods: Vec::new(),
channel_capacity: DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
})
.await
.wrap_err("failed to start embedded app server")?;
Ok(client)
}
pub async fn run_main(
mut cli: Cli,
arg0_paths: Arg0DispatchPaths,
loader_overrides: LoaderOverrides,
_loader_overrides: LoaderOverrides,
) -> std::io::Result<AppExitInfo> {
let (sandbox_mode, approval_policy) = if cli.full_auto {
(
@@ -611,8 +540,6 @@ pub async fn run_main(
run_ratatui_app(
cli,
arg0_paths,
loader_overrides,
config,
overrides,
cli_kv_overrides,
@@ -626,8 +553,6 @@ pub async fn run_main(
#[allow(clippy::too_many_arguments)]
async fn run_ratatui_app(
cli: Cli,
arg0_paths: Arg0DispatchPaths,
loader_overrides: LoaderOverrides,
initial_config: Config,
overrides: ConfigOverrides,
cli_kv_overrides: Vec<(String, toml::Value)>,
@@ -1025,27 +950,10 @@ async fn run_ratatui_app(
let use_alt_screen = determine_alt_screen_mode(no_alt_screen, config.tui_alternate_screen);
tui.set_alt_screen_enabled(use_alt_screen);
let app_server = match start_embedded_app_server(
arg0_paths,
config.clone(),
cli_kv_overrides.clone(),
loader_overrides,
cloud_requirements.clone(),
feedback.clone(),
)
.await
{
Ok(app_server) => app_server,
Err(err) => {
restore();
session_log::log_session_end();
return Err(err);
}
};
let app_result = App::run(
&mut tui,
app_server,
auth_manager,
config,
cli_kv_overrides.clone(),
overrides.clone(),
@@ -1328,20 +1236,6 @@ mod tests {
.await
}
async fn start_test_embedded_app_server(
config: Config,
) -> color_eyre::Result<InProcessAppServerClient> {
start_embedded_app_server(
Arg0DispatchPaths::default(),
config,
Vec::new(),
LoaderOverrides::default(),
CloudRequirementsLoader::default(),
codex_feedback::CodexFeedback::new(),
)
.await
}
#[tokio::test]
#[serial]
async fn windows_shows_trust_prompt_without_sandbox() -> std::io::Result<()> {
@@ -1358,51 +1252,6 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn embedded_app_server_exposes_client_manager_accessors() -> color_eyre::Result<()> {
let temp_dir = TempDir::new()?;
let config = build_config(&temp_dir).await?;
let app_server = start_test_embedded_app_server(config).await?;
assert!(Arc::ptr_eq(
&app_server.auth_manager(),
&app_server.auth_manager()
));
assert!(Arc::ptr_eq(
&app_server.thread_manager(),
&app_server.thread_manager()
));
app_server.shutdown().await?;
Ok(())
}
#[tokio::test]
async fn embedded_app_server_start_failure_is_returned() -> color_eyre::Result<()> {
let temp_dir = TempDir::new()?;
let config = build_config(&temp_dir).await?;
let result = start_embedded_app_server_with(
Arg0DispatchPaths::default(),
config,
Vec::new(),
LoaderOverrides::default(),
CloudRequirementsLoader::default(),
codex_feedback::CodexFeedback::new(),
|_args| async { Err(std::io::Error::other("boom")) },
)
.await;
let err = match result {
Ok(_) => panic!("startup failure should be returned"),
Err(err) => err,
};
assert!(
err.to_string()
.contains("failed to start embedded app server"),
"error should preserve the embedded app server startup context"
);
Ok(())
}
#[tokio::test]
#[serial]
async fn windows_shows_trust_prompt_with_sandbox() -> std::io::Result<()> {

View File

@@ -89,6 +89,17 @@ impl App {
);
}
notification => {
if !app_server_client.is_remote()
&& matches!(
notification,
ServerNotification::TurnCompleted(_)
| ServerNotification::ThreadRealtimeItemAdded(_)
| ServerNotification::ThreadRealtimeOutputAudioDelta(_)
| ServerNotification::ThreadRealtimeError(_)
)
{
return;
}
if let Some((thread_id, events)) =
server_notification_thread_events(notification)
{
@@ -116,6 +127,9 @@ impl App {
AppServerEvent::LegacyNotification(notification) => {
if let Some((thread_id, event)) = legacy_thread_event(notification.params) {
self.pending_app_server_requests.note_legacy_event(&event);
if legacy_event_is_shadowed_by_server_notification(&event.msg) {
return;
}
if self.primary_thread_id.is_none()
|| matches!(event.msg, EventMsg::SessionConfigured(_))
&& self.primary_thread_id == Some(thread_id)
@@ -198,6 +212,24 @@ fn legacy_thread_event(params: Option<Value>) -> Option<(ThreadId, Event)> {
Some((thread_id, event))
}
fn legacy_event_is_shadowed_by_server_notification(msg: &EventMsg) -> bool {
matches!(
msg,
EventMsg::TokenCount(_)
| EventMsg::Error(_)
| EventMsg::ThreadNameUpdated(_)
| EventMsg::TurnStarted(_)
| EventMsg::ItemStarted(_)
| EventMsg::ItemCompleted(_)
| EventMsg::AgentMessageDelta(_)
| EventMsg::PlanDelta(_)
| EventMsg::AgentReasoningDelta(_)
| EventMsg::AgentReasoningRawContentDelta(_)
| EventMsg::RealtimeConversationStarted(_)
| EventMsg::RealtimeConversationClosed(_)
)
}
fn server_notification_thread_events(
notification: ServerNotification,
) -> Option<(ThreadId, Vec<Event>)> {