mirror of
https://github.com/openai/codex.git
synced 2026-03-17 20:23:48 +00:00
Compare commits
8 Commits
latest-alp
...
codex/wind
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
61dbfabc13 | ||
|
|
e8add54e5d | ||
|
|
ef36d39199 | ||
|
|
4ed19b0766 | ||
|
|
31648563c8 | ||
|
|
603b6493a9 | ||
|
|
d37dcca7e0 | ||
|
|
57f865c069 |
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -2500,7 +2500,6 @@ dependencies = [
|
||||
"chrono",
|
||||
"clap",
|
||||
"codex-ansi-escape",
|
||||
"codex-app-server-client",
|
||||
"codex-app-server-protocol",
|
||||
"codex-arg0",
|
||||
"codex-backend-client",
|
||||
|
||||
@@ -1857,6 +1857,187 @@ async fn turn_start_emits_spawn_agent_item_with_model_metadata_v2() -> Result<()
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_emits_spawn_agent_item_with_effective_role_model_metadata_v2() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
const CHILD_PROMPT: &str = "child: do work";
|
||||
const PARENT_PROMPT: &str = "spawn a child and continue";
|
||||
const SPAWN_CALL_ID: &str = "spawn-call-1";
|
||||
const REQUESTED_MODEL: &str = "gpt-5.1";
|
||||
const REQUESTED_REASONING_EFFORT: ReasoningEffort = ReasoningEffort::Low;
|
||||
const ROLE_MODEL: &str = "gpt-5.1-codex-max";
|
||||
const ROLE_REASONING_EFFORT: ReasoningEffort = ReasoningEffort::High;
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let spawn_args = serde_json::to_string(&json!({
|
||||
"message": CHILD_PROMPT,
|
||||
"agent_type": "custom",
|
||||
"model": REQUESTED_MODEL,
|
||||
"reasoning_effort": REQUESTED_REASONING_EFFORT,
|
||||
}))?;
|
||||
let _parent_turn = responses::mount_sse_once_match(
|
||||
&server,
|
||||
|req: &wiremock::Request| body_contains(req, PARENT_PROMPT),
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-turn1-1"),
|
||||
responses::ev_function_call(SPAWN_CALL_ID, "spawn_agent", &spawn_args),
|
||||
responses::ev_completed("resp-turn1-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let _child_turn = responses::mount_sse_once_match(
|
||||
&server,
|
||||
|req: &wiremock::Request| {
|
||||
body_contains(req, CHILD_PROMPT) && !body_contains(req, SPAWN_CALL_ID)
|
||||
},
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-child-1"),
|
||||
responses::ev_assistant_message("msg-child-1", "child done"),
|
||||
responses::ev_completed("resp-child-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let _parent_follow_up = responses::mount_sse_once_match(
|
||||
&server,
|
||||
|req: &wiremock::Request| body_contains(req, SPAWN_CALL_ID),
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-turn1-2"),
|
||||
responses::ev_assistant_message("msg-turn1-2", "parent done"),
|
||||
responses::ev_completed("resp-turn1-2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(
|
||||
codex_home.path(),
|
||||
&server.uri(),
|
||||
"never",
|
||||
&BTreeMap::from([(Feature::Collab, true)]),
|
||||
)?;
|
||||
std::fs::write(
|
||||
codex_home.path().join("custom-role.toml"),
|
||||
format!("model = \"{ROLE_MODEL}\"\nmodel_reasoning_effort = \"{ROLE_REASONING_EFFORT}\"\n",),
|
||||
)?;
|
||||
let config_path = codex_home.path().join("config.toml");
|
||||
let base_config = std::fs::read_to_string(&config_path)?;
|
||||
std::fs::write(
|
||||
&config_path,
|
||||
format!(
|
||||
r#"{base_config}
|
||||
|
||||
[agents.custom]
|
||||
description = "Custom role"
|
||||
config_file = "./custom-role.toml"
|
||||
"#
|
||||
),
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let thread_req = mcp
|
||||
.send_thread_start_request(ThreadStartParams {
|
||||
model: Some("gpt-5.2-codex".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let thread_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
|
||||
|
||||
let turn_req = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: PARENT_PROMPT.to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let turn_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
|
||||
)
|
||||
.await??;
|
||||
let turn: TurnStartResponse = to_response::<TurnStartResponse>(turn_resp)?;
|
||||
|
||||
let spawn_completed = timeout(DEFAULT_READ_TIMEOUT, async {
|
||||
loop {
|
||||
let completed_notif = mcp
|
||||
.read_stream_until_notification_message("item/completed")
|
||||
.await?;
|
||||
let completed: ItemCompletedNotification =
|
||||
serde_json::from_value(completed_notif.params.expect("item/completed params"))?;
|
||||
if let ThreadItem::CollabAgentToolCall { id, .. } = &completed.item
|
||||
&& id == SPAWN_CALL_ID
|
||||
{
|
||||
return Ok::<ThreadItem, anyhow::Error>(completed.item);
|
||||
}
|
||||
}
|
||||
})
|
||||
.await??;
|
||||
let ThreadItem::CollabAgentToolCall {
|
||||
id,
|
||||
tool,
|
||||
status,
|
||||
sender_thread_id,
|
||||
receiver_thread_ids,
|
||||
prompt,
|
||||
model,
|
||||
reasoning_effort,
|
||||
agents_states,
|
||||
} = spawn_completed
|
||||
else {
|
||||
unreachable!("loop ensures we break on collab agent tool call items");
|
||||
};
|
||||
let receiver_thread_id = receiver_thread_ids
|
||||
.first()
|
||||
.cloned()
|
||||
.expect("spawn completion should include child thread id");
|
||||
assert_eq!(id, SPAWN_CALL_ID);
|
||||
assert_eq!(tool, CollabAgentTool::SpawnAgent);
|
||||
assert_eq!(status, CollabAgentToolCallStatus::Completed);
|
||||
assert_eq!(sender_thread_id, thread.id);
|
||||
assert_eq!(receiver_thread_ids, vec![receiver_thread_id.clone()]);
|
||||
assert_eq!(prompt, Some(CHILD_PROMPT.to_string()));
|
||||
assert_eq!(model, Some(ROLE_MODEL.to_string()));
|
||||
assert_eq!(reasoning_effort, Some(ROLE_REASONING_EFFORT));
|
||||
assert_eq!(
|
||||
agents_states,
|
||||
HashMap::from([(
|
||||
receiver_thread_id,
|
||||
CollabAgentState {
|
||||
status: CollabAgentStatus::PendingInit,
|
||||
message: None,
|
||||
},
|
||||
)])
|
||||
);
|
||||
|
||||
let turn_completed = timeout(DEFAULT_READ_TIMEOUT, async {
|
||||
loop {
|
||||
let turn_completed_notif = mcp
|
||||
.read_stream_until_notification_message("turn/completed")
|
||||
.await?;
|
||||
let turn_completed: TurnCompletedNotification = serde_json::from_value(
|
||||
turn_completed_notif.params.expect("turn/completed params"),
|
||||
)?;
|
||||
if turn_completed.thread_id == thread.id && turn_completed.turn.id == turn.turn.id {
|
||||
return Ok::<TurnCompletedNotification, anyhow::Error>(turn_completed);
|
||||
}
|
||||
}
|
||||
})
|
||||
.await??;
|
||||
assert_eq!(turn_completed.thread_id, thread.id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_file_change_approval_accept_for_session_persists_v2() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
@@ -3,6 +3,7 @@ use crate::agent::guards::Guards;
|
||||
use crate::agent::role::DEFAULT_ROLE_NAME;
|
||||
use crate::agent::role::resolve_role_config;
|
||||
use crate::agent::status::is_final;
|
||||
use crate::codex_thread::ThreadConfigSnapshot;
|
||||
use crate::error::CodexErr;
|
||||
use crate::error::Result as CodexResult;
|
||||
use crate::find_thread_path_by_id_str;
|
||||
@@ -360,6 +361,19 @@ impl AgentControl {
|
||||
))
|
||||
}
|
||||
|
||||
pub(crate) async fn get_agent_config_snapshot(
|
||||
&self,
|
||||
agent_id: ThreadId,
|
||||
) -> Option<ThreadConfigSnapshot> {
|
||||
let Ok(state) = self.upgrade() else {
|
||||
return None;
|
||||
};
|
||||
let Ok(thread) = state.get_thread(agent_id).await else {
|
||||
return None;
|
||||
};
|
||||
Some(thread.config_snapshot().await)
|
||||
}
|
||||
|
||||
/// Subscribe to status updates for `agent_id`, yielding the latest value and changes.
|
||||
pub(crate) async fn subscribe_status(
|
||||
&self,
|
||||
|
||||
@@ -52,6 +52,7 @@ pub mod models_manager;
|
||||
mod network_policy_decision;
|
||||
pub mod network_proxy_loader;
|
||||
mod original_image_detail;
|
||||
mod packages;
|
||||
pub use mcp_connection_manager::MCP_SANDBOX_STATE_CAPABILITY;
|
||||
pub use mcp_connection_manager::MCP_SANDBOX_STATE_METHOD;
|
||||
pub use mcp_connection_manager::SandboxState;
|
||||
|
||||
1
codex-rs/core/src/packages/mod.rs
Normal file
1
codex-rs/core/src/packages/mod.rs
Normal file
@@ -0,0 +1 @@
|
||||
pub(crate) mod versions;
|
||||
2
codex-rs/core/src/packages/versions.rs
Normal file
2
codex-rs/core/src/packages/versions.rs
Normal file
@@ -0,0 +1,2 @@
|
||||
/// Pinned versions for package-manager-backed installs.
|
||||
pub(crate) const ARTIFACT_RUNTIME: &str = "2.4.0";
|
||||
@@ -15,9 +15,12 @@ use crate::tools::registry::ToolHandler;
|
||||
use crate::tools::registry::ToolKind;
|
||||
use async_trait::async_trait;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::AgentStatus;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use futures::StreamExt;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
@@ -26,8 +29,10 @@ use std::collections::HashSet;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::watch::Receiver;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::Instant;
|
||||
use tokio::time::timeout;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub struct BatchJobHandler;
|
||||
@@ -103,6 +108,7 @@ struct JobRunnerOptions {
|
||||
struct ActiveJobItem {
|
||||
item_id: String,
|
||||
started_at: Instant,
|
||||
status_rx: Option<Receiver<AgentStatus>>,
|
||||
}
|
||||
|
||||
struct JobProgressEmitter {
|
||||
@@ -676,6 +682,12 @@ async fn run_agent_job_loop(
|
||||
ActiveJobItem {
|
||||
item_id: item.item_id.clone(),
|
||||
started_at: Instant::now(),
|
||||
status_rx: session
|
||||
.services
|
||||
.agent_control
|
||||
.subscribe_status(thread_id)
|
||||
.await
|
||||
.ok(),
|
||||
},
|
||||
);
|
||||
progressed = true;
|
||||
@@ -708,7 +720,7 @@ async fn run_agent_job_loop(
|
||||
break;
|
||||
}
|
||||
if !progressed {
|
||||
tokio::time::sleep(STATUS_POLL_INTERVAL).await;
|
||||
wait_for_status_change(&active_items).await;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@@ -863,6 +875,12 @@ async fn recover_running_items(
|
||||
ActiveJobItem {
|
||||
item_id: item.item_id.clone(),
|
||||
started_at: started_at_from_item(&item),
|
||||
status_rx: session
|
||||
.services
|
||||
.agent_control
|
||||
.subscribe_status(thread_id)
|
||||
.await
|
||||
.ok(),
|
||||
},
|
||||
);
|
||||
}
|
||||
@@ -876,13 +894,44 @@ async fn find_finished_threads(
|
||||
) -> Vec<(ThreadId, String)> {
|
||||
let mut finished = Vec::new();
|
||||
for (thread_id, item) in active_items {
|
||||
if is_final(&session.services.agent_control.get_status(*thread_id).await) {
|
||||
let status = active_item_status(session.as_ref(), *thread_id, item).await;
|
||||
if is_final(&status) {
|
||||
finished.push((*thread_id, item.item_id.clone()));
|
||||
}
|
||||
}
|
||||
finished
|
||||
}
|
||||
|
||||
async fn active_item_status(
|
||||
session: &Session,
|
||||
thread_id: ThreadId,
|
||||
item: &ActiveJobItem,
|
||||
) -> AgentStatus {
|
||||
if let Some(status_rx) = item.status_rx.as_ref()
|
||||
&& status_rx.has_changed().is_ok()
|
||||
{
|
||||
return status_rx.borrow().clone();
|
||||
}
|
||||
session.services.agent_control.get_status(thread_id).await
|
||||
}
|
||||
|
||||
async fn wait_for_status_change(active_items: &HashMap<ThreadId, ActiveJobItem>) {
|
||||
let mut waiters = FuturesUnordered::new();
|
||||
for item in active_items.values() {
|
||||
if let Some(status_rx) = item.status_rx.as_ref() {
|
||||
let mut status_rx = status_rx.clone();
|
||||
waiters.push(async move {
|
||||
let _ = status_rx.changed().await;
|
||||
});
|
||||
}
|
||||
}
|
||||
if waiters.is_empty() {
|
||||
tokio::time::sleep(STATUS_POLL_INTERVAL).await;
|
||||
return;
|
||||
}
|
||||
let _ = timeout(STATUS_POLL_INTERVAL, waiters.next()).await;
|
||||
}
|
||||
|
||||
async fn reap_stale_active_items(
|
||||
session: Arc<Session>,
|
||||
db: Arc<codex_state::StateRuntime>,
|
||||
@@ -920,37 +969,24 @@ async fn finalize_finished_item(
|
||||
item_id: &str,
|
||||
thread_id: ThreadId,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut item = db
|
||||
let item = db
|
||||
.get_agent_job_item(job_id, item_id)
|
||||
.await?
|
||||
.ok_or_else(|| {
|
||||
anyhow::anyhow!("job item not found for finalization: {job_id}/{item_id}")
|
||||
})?;
|
||||
if item.result_json.is_none() {
|
||||
tokio::time::sleep(Duration::from_millis(250)).await;
|
||||
item = db
|
||||
.get_agent_job_item(job_id, item_id)
|
||||
.await?
|
||||
.ok_or_else(|| {
|
||||
anyhow::anyhow!("job item not found after grace period: {job_id}/{item_id}")
|
||||
})?;
|
||||
}
|
||||
if item.result_json.is_some() {
|
||||
if !db.mark_agent_job_item_completed(job_id, item_id).await? {
|
||||
db.mark_agent_job_item_failed(
|
||||
job_id,
|
||||
item_id,
|
||||
"worker reported result but item could not transition to completed",
|
||||
)
|
||||
.await?;
|
||||
if matches!(item.status, codex_state::AgentJobItemStatus::Running) {
|
||||
if item.result_json.is_some() {
|
||||
let _ = db.mark_agent_job_item_completed(job_id, item_id).await?;
|
||||
} else {
|
||||
let _ = db
|
||||
.mark_agent_job_item_failed(
|
||||
job_id,
|
||||
item_id,
|
||||
"worker finished without calling report_agent_job_result",
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
} else {
|
||||
db.mark_agent_job_item_failed(
|
||||
job_id,
|
||||
item_id,
|
||||
"worker finished without calling report_agent_job_result",
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
let _ = session
|
||||
.services
|
||||
|
||||
@@ -15,6 +15,7 @@ use crate::exec::ExecToolCallOutput;
|
||||
use crate::exec::StreamOutput;
|
||||
use crate::features::Feature;
|
||||
use crate::function_tool::FunctionCallError;
|
||||
use crate::packages::versions;
|
||||
use crate::protocol::ExecCommandSource;
|
||||
use crate::tools::context::FunctionToolOutput;
|
||||
use crate::tools::context::ToolInvocation;
|
||||
@@ -28,7 +29,6 @@ use crate::tools::registry::ToolKind;
|
||||
|
||||
const ARTIFACTS_TOOL_NAME: &str = "artifacts";
|
||||
const ARTIFACTS_PRAGMA_PREFIXES: [&str; 2] = ["// codex-artifacts:", "// codex-artifact-tool:"];
|
||||
pub(crate) const PINNED_ARTIFACT_RUNTIME_VERSION: &str = "2.4.0";
|
||||
const DEFAULT_EXECUTION_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
|
||||
pub struct ArtifactsHandler;
|
||||
@@ -216,7 +216,7 @@ fn parse_pragma_prefix(line: &str) -> Option<&str> {
|
||||
fn default_runtime_manager(codex_home: std::path::PathBuf) -> ArtifactRuntimeManager {
|
||||
ArtifactRuntimeManager::new(ArtifactRuntimeManagerConfig::with_default_release(
|
||||
codex_home,
|
||||
PINNED_ARTIFACT_RUNTIME_VERSION,
|
||||
versions::ARTIFACT_RUNTIME,
|
||||
))
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use super::*;
|
||||
use crate::packages::versions;
|
||||
use codex_artifacts::RuntimeEntrypoints;
|
||||
use codex_artifacts::RuntimePathEntry;
|
||||
use tempfile::TempDir;
|
||||
@@ -46,7 +47,7 @@ fn default_runtime_manager_uses_openai_codex_release_base() {
|
||||
);
|
||||
assert_eq!(
|
||||
manager.config().release().runtime_version(),
|
||||
PINNED_ARTIFACT_RUNTIME_VERSION
|
||||
versions::ARTIFACT_RUNTIME
|
||||
);
|
||||
}
|
||||
|
||||
@@ -59,14 +60,14 @@ fn load_cached_runtime_reads_pinned_cache_path() {
|
||||
.path()
|
||||
.join("packages")
|
||||
.join("artifacts")
|
||||
.join(PINNED_ARTIFACT_RUNTIME_VERSION)
|
||||
.join(versions::ARTIFACT_RUNTIME)
|
||||
.join(platform.as_str());
|
||||
std::fs::create_dir_all(&install_dir).expect("create install dir");
|
||||
std::fs::write(
|
||||
install_dir.join("manifest.json"),
|
||||
serde_json::json!({
|
||||
"schema_version": 1,
|
||||
"runtime_version": PINNED_ARTIFACT_RUNTIME_VERSION,
|
||||
"runtime_version": versions::ARTIFACT_RUNTIME,
|
||||
"node": { "relative_path": "node/bin/node" },
|
||||
"entrypoints": {
|
||||
"build_js": { "relative_path": "artifact-tool/dist/artifact_tool.mjs" },
|
||||
@@ -95,10 +96,10 @@ fn load_cached_runtime_reads_pinned_cache_path() {
|
||||
&codex_home
|
||||
.path()
|
||||
.join(codex_artifacts::DEFAULT_CACHE_ROOT_RELATIVE),
|
||||
PINNED_ARTIFACT_RUNTIME_VERSION,
|
||||
versions::ARTIFACT_RUNTIME,
|
||||
)
|
||||
.expect("resolve runtime");
|
||||
assert_eq!(runtime.runtime_version(), PINNED_ARTIFACT_RUNTIME_VERSION);
|
||||
assert_eq!(runtime.runtime_version(), versions::ARTIFACT_RUNTIME);
|
||||
assert_eq!(
|
||||
runtime.manifest().entrypoints,
|
||||
RuntimeEntrypoints {
|
||||
|
||||
@@ -95,13 +95,15 @@ impl ToolHandler for Handler {
|
||||
.await;
|
||||
result?;
|
||||
|
||||
Ok(CloseAgentResult { status })
|
||||
Ok(CloseAgentResult {
|
||||
previous_status: status,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
pub(crate) struct CloseAgentResult {
|
||||
pub(crate) status: AgentStatus,
|
||||
pub(crate) previous_status: AgentStatus,
|
||||
}
|
||||
|
||||
impl ToolOutput for CloseAgentResult {
|
||||
|
||||
@@ -98,15 +98,37 @@ impl ToolHandler for Handler {
|
||||
),
|
||||
Err(_) => (None, AgentStatus::NotFound),
|
||||
};
|
||||
let (new_agent_nickname, new_agent_role) = match new_thread_id {
|
||||
Some(thread_id) => session
|
||||
let agent_snapshot = match new_thread_id {
|
||||
Some(thread_id) => {
|
||||
session
|
||||
.services
|
||||
.agent_control
|
||||
.get_agent_config_snapshot(thread_id)
|
||||
.await
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
let (new_agent_nickname, new_agent_role) = match (&agent_snapshot, new_thread_id) {
|
||||
(Some(snapshot), _) => (
|
||||
snapshot.session_source.get_nickname(),
|
||||
snapshot.session_source.get_agent_role(),
|
||||
),
|
||||
(None, Some(thread_id)) => session
|
||||
.services
|
||||
.agent_control
|
||||
.get_agent_nickname_and_role(thread_id)
|
||||
.await
|
||||
.unwrap_or((None, None)),
|
||||
None => (None, None),
|
||||
(None, None) => (None, None),
|
||||
};
|
||||
let effective_model = agent_snapshot
|
||||
.as_ref()
|
||||
.map(|snapshot| snapshot.model.clone())
|
||||
.unwrap_or_else(|| args.model.clone().unwrap_or_default());
|
||||
let effective_reasoning_effort = agent_snapshot
|
||||
.as_ref()
|
||||
.and_then(|snapshot| snapshot.reasoning_effort)
|
||||
.unwrap_or(args.reasoning_effort.unwrap_or_default());
|
||||
let nickname = new_agent_nickname.clone();
|
||||
session
|
||||
.send_event(
|
||||
@@ -118,8 +140,8 @@ impl ToolHandler for Handler {
|
||||
new_agent_nickname,
|
||||
new_agent_role,
|
||||
prompt,
|
||||
model: args.model.clone().unwrap_or_default(),
|
||||
reasoning_effort: args.reasoning_effort.unwrap_or_default(),
|
||||
model: effective_model,
|
||||
reasoning_effort: effective_reasoning_effort,
|
||||
status,
|
||||
}
|
||||
.into(),
|
||||
|
||||
@@ -971,7 +971,7 @@ async fn wait_agent_returns_final_status_without_timeout() {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn close_agent_submits_shutdown_and_returns_status() {
|
||||
async fn close_agent_submits_shutdown_and_returns_previous_status() {
|
||||
let (mut session, turn) = make_session_and_context().await;
|
||||
let manager = thread_manager();
|
||||
session.services.agent_control = manager.agent_control();
|
||||
@@ -993,7 +993,7 @@ async fn close_agent_submits_shutdown_and_returns_status() {
|
||||
let (content, success) = expect_text_output(output);
|
||||
let result: close_agent::CloseAgentResult =
|
||||
serde_json::from_str(&content).expect("close_agent result should be json");
|
||||
assert_eq!(result.status, status_before);
|
||||
assert_eq!(result.previous_status, status_before);
|
||||
assert_eq!(success, Some(true));
|
||||
|
||||
let ops = manager.captured_ops();
|
||||
|
||||
@@ -195,9 +195,12 @@ fn close_agent_output_schema() -> JsonValue {
|
||||
json!({
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"status": agent_status_output_schema()
|
||||
"previous_status": {
|
||||
"description": "The agent status observed before shutdown was requested.",
|
||||
"allOf": [agent_status_output_schema()]
|
||||
}
|
||||
},
|
||||
"required": ["status"],
|
||||
"required": ["previous_status"],
|
||||
"additionalProperties": false
|
||||
})
|
||||
}
|
||||
@@ -1523,7 +1526,7 @@ fn create_close_agent_tool() -> ToolSpec {
|
||||
|
||||
ToolSpec::Function(ResponsesApiTool {
|
||||
name: "close_agent".to_string(),
|
||||
description: "Close an agent when it is no longer needed and return its last known status. Don't keep agents open for too long if they are not needed anymore.".to_string(),
|
||||
description: "Close an agent when it is no longer needed and return its previous status before shutdown was requested. Don't keep agents open for too long if they are not needed anymore.".to_string(),
|
||||
strict: false,
|
||||
defer_loading: None,
|
||||
parameters: JsonSchema::Object {
|
||||
|
||||
@@ -16,10 +16,8 @@ use std::os::fd::AsRawFd;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use codex_core::error::CodexErr;
|
||||
use codex_core::error::Result;
|
||||
use codex_protocol::protocol::FileSystemSandboxPolicy;
|
||||
use codex_protocol::protocol::WritableRoot;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
|
||||
/// Linux "platform defaults" that keep common system binaries and dynamic
|
||||
@@ -41,10 +39,10 @@ const LINUX_PLATFORM_DEFAULT_READ_ROOTS: &[&str] = &[
|
||||
/// Options that control how bubblewrap is invoked.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub(crate) struct BwrapOptions {
|
||||
/// Whether to mount a fresh `/proc` inside the PID namespace.
|
||||
/// Whether to mount a fresh `/proc` inside the sandbox.
|
||||
///
|
||||
/// This is the secure default, but some restrictive container environments
|
||||
/// deny `--proc /proc` even when PID namespaces are available.
|
||||
/// deny `--proc /proc`.
|
||||
pub mount_proc: bool,
|
||||
/// How networking should be configured inside the bubblewrap sandbox.
|
||||
pub network_mode: BwrapNetworkMode,
|
||||
@@ -167,7 +165,6 @@ fn create_bwrap_flags(
|
||||
// Request a user namespace explicitly rather than relying on bubblewrap's
|
||||
// auto-enable behavior, which is skipped when the caller runs as uid 0.
|
||||
args.push("--unshare-user".to_string());
|
||||
// Isolate the PID namespace.
|
||||
args.push("--unshare-pid".to_string());
|
||||
if options.network_mode.should_unshare_network() {
|
||||
args.push("--unshare-net".to_string());
|
||||
@@ -213,9 +210,15 @@ fn create_filesystem_args(
|
||||
file_system_sandbox_policy: &FileSystemSandboxPolicy,
|
||||
cwd: &Path,
|
||||
) -> Result<BwrapArgs> {
|
||||
let writable_roots = file_system_sandbox_policy.get_writable_roots_with_cwd(cwd);
|
||||
// Bubblewrap requires bind mount targets to exist. Skip missing writable
|
||||
// roots so mixed-platform configs can keep harmless paths for other
|
||||
// environments without breaking Linux command startup.
|
||||
let writable_roots = file_system_sandbox_policy
|
||||
.get_writable_roots_with_cwd(cwd)
|
||||
.into_iter()
|
||||
.filter(|writable_root| writable_root.root.as_path().exists())
|
||||
.collect::<Vec<_>>();
|
||||
let unreadable_roots = file_system_sandbox_policy.get_unreadable_roots_with_cwd(cwd);
|
||||
ensure_mount_targets_exist(&writable_roots)?;
|
||||
|
||||
let mut args = if file_system_sandbox_policy.has_full_disk_read_access() {
|
||||
// Read-only root, then mount a minimal device tree.
|
||||
@@ -385,23 +388,6 @@ fn create_filesystem_args(
|
||||
})
|
||||
}
|
||||
|
||||
/// Validate that writable roots exist before constructing mounts.
|
||||
///
|
||||
/// Bubblewrap requires bind mount targets to exist. We fail fast with a clear
|
||||
/// error so callers can present an actionable message.
|
||||
fn ensure_mount_targets_exist(writable_roots: &[WritableRoot]) -> Result<()> {
|
||||
for writable_root in writable_roots {
|
||||
let root = writable_root.root.as_path();
|
||||
if !root.exists() {
|
||||
return Err(CodexErr::UnsupportedOperation(format!(
|
||||
"Sandbox expected writable root {root}, but it does not exist.",
|
||||
root = root.display()
|
||||
)));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Renders `path` as an owned `String`, using a lossy conversion for non-UTF-8 paths.
fn path_to_string(path: &Path) -> String {
    let rendered = path.to_string_lossy();
    rendered.into_owned()
}
|
||||
@@ -731,6 +717,41 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
/// A writable root that does not exist on disk is left out of the generated bubblewrap
/// arguments, while an existing writable root is still bound read-write.
#[test]
fn ignores_missing_writable_roots() {
    let workspace = TempDir::new().expect("temp dir");
    let present_dir = workspace.path().join("existing");
    let absent_dir = workspace.path().join("missing");
    // Only one of the two roots is actually created.
    std::fs::create_dir(&present_dir).expect("create existing root");

    let writable_roots = vec![
        AbsolutePathBuf::try_from(present_dir.as_path()).expect("absolute existing root"),
        AbsolutePathBuf::try_from(absent_dir.as_path()).expect("absolute missing root"),
    ];
    let sandbox_policy = SandboxPolicy::WorkspaceWrite {
        writable_roots,
        read_only_access: Default::default(),
        network_access: false,
        exclude_tmpdir_env_var: true,
        exclude_slash_tmp: true,
    };
    let fs_policy = FileSystemSandboxPolicy::from(&sandbox_policy);

    let built = create_filesystem_args(&fs_policy, workspace.path()).expect("filesystem args");
    let present = path_to_string(&present_dir);
    let absent = path_to_string(&absent_dir);

    let binds_present_root = built
        .args
        .windows(3)
        .any(|triple| triple == ["--bind", present.as_str(), present.as_str()]);
    assert!(
        binds_present_root,
        "existing writable root should be rebound writable",
    );

    let mentions_absent_root = built.args.iter().any(|arg| arg == &absent);
    assert!(
        !mentions_absent_root,
        "missing writable root should be skipped",
    );
}
|
||||
|
||||
#[test]
|
||||
fn mounts_dev_before_writable_dev_binds() {
|
||||
let sandbox_policy = SandboxPolicy::WorkspaceWrite {
|
||||
|
||||
@@ -310,6 +310,32 @@ async fn test_writable_root() {
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sandbox_ignores_missing_writable_roots_under_bwrap() {
|
||||
if should_skip_bwrap_tests().await {
|
||||
eprintln!("skipping bwrap test: bwrap sandbox prerequisites are unavailable");
|
||||
return;
|
||||
}
|
||||
|
||||
let tempdir = tempfile::tempdir().expect("tempdir");
|
||||
let existing_root = tempdir.path().join("existing");
|
||||
let missing_root = tempdir.path().join("missing");
|
||||
std::fs::create_dir(&existing_root).expect("create existing root");
|
||||
|
||||
let output = run_cmd_result_with_writable_roots(
|
||||
&["bash", "-lc", "printf sandbox-ok"],
|
||||
&[existing_root, missing_root],
|
||||
LONG_TIMEOUT_MS,
|
||||
false,
|
||||
true,
|
||||
)
|
||||
.await
|
||||
.expect("sandboxed command should execute");
|
||||
|
||||
assert_eq!(output.exit_code, 0);
|
||||
assert_eq!(output.stdout.text, "sandbox-ok");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_no_new_privs_is_enabled() {
|
||||
let output = run_cmd_output(
|
||||
|
||||
@@ -3263,9 +3263,9 @@ pub struct CollabAgentSpawnEndEvent {
|
||||
/// Initial prompt sent to the agent. Can be empty to prevent CoT leaking at the
|
||||
/// beginning.
|
||||
pub prompt: String,
|
||||
/// Model requested for the spawned agent.
|
||||
/// Effective model used by the spawned agent after inheritance and role overrides.
|
||||
pub model: String,
|
||||
/// Reasoning effort requested for the spawned agent.
|
||||
/// Effective reasoning effort used by the spawned agent after inheritance and role overrides.
|
||||
pub reasoning_effort: ReasoningEffortConfig,
|
||||
/// Last known status of the new agent reported to the sender agent.
|
||||
pub status: AgentStatus,
|
||||
|
||||
@@ -1,11 +1,19 @@
|
||||
use base64::Engine;
|
||||
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
|
||||
use serde::Deserialize;
|
||||
use std::io::Read;
|
||||
use std::path::Path;
|
||||
use std::process::Command;
|
||||
use std::process::Output;
|
||||
use std::process::Stdio;
|
||||
use std::sync::LazyLock;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
const POWERSHELL_PARSER_SCRIPT: &str = include_str!("powershell_parser.ps1");
|
||||
const POWERSHELL_PARSER_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
const POWERSHELL_PARSER_POLL_INTERVAL: Duration = Duration::from_millis(10);
|
||||
|
||||
/// On Windows, we conservatively allow only clearly read-only PowerShell invocations
|
||||
/// that match a small safelist. Anything else (including direct CMD commands) is unsafe.
|
||||
@@ -127,7 +135,7 @@ fn is_powershell_executable(exe: &str) -> bool {
|
||||
fn parse_with_powershell_ast(executable: &str, script: &str) -> PowershellParseOutcome {
|
||||
let encoded_script = encode_powershell_base64(script);
|
||||
let encoded_parser_script = encoded_parser_script();
|
||||
match Command::new(executable)
|
||||
let mut child = match Command::new(executable)
|
||||
.args([
|
||||
"-NoLogo",
|
||||
"-NoProfile",
|
||||
@@ -136,18 +144,68 @@ fn parse_with_powershell_ast(executable: &str, script: &str) -> PowershellParseO
|
||||
encoded_parser_script,
|
||||
])
|
||||
.env("CODEX_POWERSHELL_PAYLOAD", &encoded_script)
|
||||
.output()
|
||||
// Match `Command::output()` so PowerShell does not stay alive waiting on inherited stdin
|
||||
// after the parser script has already finished.
|
||||
.stdin(Stdio::null())
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()
|
||||
{
|
||||
Ok(output) if output.status.success() => {
|
||||
if let Ok(result) =
|
||||
serde_json::from_slice::<PowershellParserOutput>(output.stdout.as_slice())
|
||||
{
|
||||
Ok(child) => child,
|
||||
Err(_) => return PowershellParseOutcome::Failed,
|
||||
};
|
||||
|
||||
let deadline = Instant::now() + POWERSHELL_PARSER_TIMEOUT;
|
||||
let output = loop {
|
||||
match child.try_wait() {
|
||||
Ok(Some(status)) => {
|
||||
let mut stdout = Vec::new();
|
||||
let mut stderr = Vec::new();
|
||||
|
||||
if let Some(mut reader) = child.stdout.take()
|
||||
&& reader.read_to_end(&mut stdout).is_err()
|
||||
{
|
||||
return PowershellParseOutcome::Failed;
|
||||
}
|
||||
|
||||
if let Some(mut reader) = child.stderr.take()
|
||||
&& reader.read_to_end(&mut stderr).is_err()
|
||||
{
|
||||
return PowershellParseOutcome::Failed;
|
||||
}
|
||||
|
||||
break Output {
|
||||
status,
|
||||
stdout,
|
||||
stderr,
|
||||
};
|
||||
}
|
||||
Ok(None) => {
|
||||
if Instant::now() >= deadline {
|
||||
let _ = child.kill();
|
||||
let _ = child.wait();
|
||||
return PowershellParseOutcome::Failed;
|
||||
}
|
||||
|
||||
thread::sleep(POWERSHELL_PARSER_POLL_INTERVAL);
|
||||
}
|
||||
Err(_) => {
|
||||
let _ = child.kill();
|
||||
let _ = child.wait();
|
||||
return PowershellParseOutcome::Failed;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
match output.status.success() {
|
||||
true => {
|
||||
if let Ok(result) = serde_json::from_slice::<PowershellParserOutput>(&output.stdout) {
|
||||
result.into_outcome()
|
||||
} else {
|
||||
PowershellParseOutcome::Failed
|
||||
}
|
||||
}
|
||||
_ => PowershellParseOutcome::Failed,
|
||||
false => PowershellParseOutcome::Failed,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -348,7 +406,11 @@ fn is_safe_git_command(words: &[String]) -> bool {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::powershell::try_find_pwsh_executable_blocking;
|
||||
use std::fs;
|
||||
use std::string::ToString;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use std::time::SystemTime;
|
||||
|
||||
/// Converts a slice of string literals into owned `String`s for the tests.
|
||||
fn vec_str(args: &[&str]) -> Vec<String> {
|
||||
@@ -620,4 +682,26 @@ mod tests {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks that `parse_with_powershell_ast` gives up on a child process that
/// does not exit, returning `PowershellParseOutcome::Failed` once
/// `POWERSHELL_PARSER_TIMEOUT` has elapsed instead of blocking.
///
/// A throwaway `.cmd` file stands in for the PowerShell executable. It stalls
/// for roughly ten seconds, so the test assumes `POWERSHELL_PARSER_TIMEOUT` is
/// comfortably shorter than that.
#[test]
fn powershell_ast_parser_times_out_for_stuck_child() {
    // Nanosecond timestamp keeps concurrently running tests from sharing a script.
    let unique = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("system time")
        .as_nanos();
    let script_path =
        std::env::temp_dir().join(format!("codex-powershell-parser-timeout-{unique}.cmd"));

    // The parser launches its child with stdin set to `Stdio::null()`.
    // `timeout.exe` refuses to run with redirected input and exits immediately,
    // which would let this test pass without ever reaching the deadline. `ping`
    // has no such restriction: 11 echo requests to loopback take about 10 seconds.
    fs::write(&script_path, "@echo off\r\nping -n 11 127.0.0.1 >nul\r\n")
        .expect("write fake powershell");

    let started = Instant::now();
    let outcome = parse_with_powershell_ast(
        script_path.to_str().expect("utf8 temp path"),
        "Write-Output ok",
    );
    let elapsed = started.elapsed();
    let _ = fs::remove_file(&script_path);

    assert!(matches!(outcome, PowershellParseOutcome::Failed));
    // A failure reported before the deadline means the child exited (or never
    // started) on its own, so the timeout path was not exercised.
    assert!(
        elapsed >= POWERSHELL_PARSER_TIMEOUT,
        "parser failed after {elapsed:?}, before the timeout could fire"
    );
    assert!(elapsed < POWERSHELL_PARSER_TIMEOUT + Duration::from_secs(1));
}
|
||||
}
|
||||
|
||||
@@ -435,10 +435,13 @@ WHERE job_id = ? AND item_id = ? AND status = ?
|
||||
r#"
|
||||
UPDATE agent_job_items
|
||||
SET
|
||||
status = ?,
|
||||
result_json = ?,
|
||||
reported_at = ?,
|
||||
completed_at = ?,
|
||||
updated_at = ?,
|
||||
last_error = NULL
|
||||
last_error = NULL,
|
||||
assigned_thread_id = NULL
|
||||
WHERE
|
||||
job_id = ?
|
||||
AND item_id = ?
|
||||
@@ -446,9 +449,11 @@ WHERE
|
||||
AND assigned_thread_id = ?
|
||||
"#,
|
||||
)
|
||||
.bind(AgentJobItemStatus::Completed.as_str())
|
||||
.bind(serialized)
|
||||
.bind(now)
|
||||
.bind(now)
|
||||
.bind(now)
|
||||
.bind(job_id)
|
||||
.bind(item_id)
|
||||
.bind(AgentJobItemStatus::Running.as_str())
|
||||
@@ -560,3 +565,120 @@ WHERE job_id = ?
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::test_support::unique_temp_dir;
    use pretty_assertions::assert_eq;
    use serde_json::json;

    /// Shared fixture: creates a job with a single item, marks the job as
    /// running, and marks the item as running on a thread.
    ///
    /// Returns `(job_id, item_id, thread_id)` for use in follow-up calls.
    async fn create_running_single_item_job(
        runtime: &StateRuntime,
    ) -> anyhow::Result<(String, String, String)> {
        let job_id = String::from("job-1");
        let item_id = String::from("item-1");
        let thread_id = String::from("thread-1");

        let job_params = AgentJobCreateParams {
            id: job_id.clone(),
            name: "test-job".to_string(),
            instruction: "Return a result".to_string(),
            auto_export: true,
            max_runtime_seconds: None,
            output_schema_json: None,
            input_headers: vec!["path".to_string()],
            input_csv_path: "/tmp/in.csv".to_string(),
            output_csv_path: "/tmp/out.csv".to_string(),
        };
        let item_params = [AgentJobItemCreateParams {
            item_id: item_id.clone(),
            row_index: 0,
            source_id: None,
            row_json: json!({"path":"file-1"}),
        }];
        runtime.create_agent_job(&job_params, &item_params).await?;

        runtime.mark_agent_job_running(job_id.as_str()).await?;

        let marked_running = runtime
            .mark_agent_job_item_running_with_thread(
                job_id.as_str(),
                item_id.as_str(),
                thread_id.as_str(),
            )
            .await?;
        assert!(marked_running);

        Ok((job_id, item_id, thread_id))
    }

    /// A report from the assigned thread on a running item is accepted. The
    /// stored item then shows the completed status, the reported result, no
    /// assigned thread, no error, and both timestamps set.
    #[tokio::test]
    async fn report_agent_job_item_result_completes_item_atomically() -> anyhow::Result<()> {
        let runtime = StateRuntime::init(unique_temp_dir(), "test-provider".to_string()).await?;
        let (job_id, item_id, thread_id) = create_running_single_item_job(runtime.as_ref()).await?;

        let reported = json!({"ok": true});
        let accepted = runtime
            .report_agent_job_item_result(
                job_id.as_str(),
                item_id.as_str(),
                thread_id.as_str(),
                &reported,
            )
            .await?;
        assert!(accepted);

        let stored = runtime
            .get_agent_job_item(job_id.as_str(), item_id.as_str())
            .await?
            .expect("job item should exist");
        assert_eq!(stored.status, AgentJobItemStatus::Completed);
        assert_eq!(stored.result_json, Some(reported));
        assert_eq!(stored.assigned_thread_id, None);
        assert_eq!(stored.last_error, None);
        assert!(stored.reported_at.is_some());
        assert!(stored.completed_at.is_some());

        // The single item must be counted as completed and nothing else.
        let expected_progress = AgentJobProgress {
            total_items: 1,
            pending_items: 0,
            running_items: 0,
            completed_items: 1,
            failed_items: 0,
        };
        let progress = runtime.get_agent_job_progress(job_id.as_str()).await?;
        assert_eq!(progress, expected_progress);
        Ok(())
    }

    /// Once an item has been marked failed, a later report for it is rejected
    /// and the failed status, empty result, and error message stay in place.
    #[tokio::test]
    async fn report_agent_job_item_result_rejects_late_reports() -> anyhow::Result<()> {
        let runtime = StateRuntime::init(unique_temp_dir(), "test-provider".to_string()).await?;
        let (job_id, item_id, thread_id) = create_running_single_item_job(runtime.as_ref()).await?;

        let marked_failed = runtime
            .mark_agent_job_item_failed(job_id.as_str(), item_id.as_str(), "missing report")
            .await?;
        assert!(marked_failed);

        let late_report = json!({"late": true});
        let accepted = runtime
            .report_agent_job_item_result(
                job_id.as_str(),
                item_id.as_str(),
                thread_id.as_str(),
                &late_report,
            )
            .await?;
        assert!(!accepted);

        let stored = runtime
            .get_agent_job_item(job_id.as_str(), item_id.as_str())
            .await?
            .expect("job item should exist");
        assert_eq!(stored.status, AgentJobItemStatus::Failed);
        assert_eq!(stored.result_json, None);
        assert_eq!(stored.last_error, Some("missing report".to_string()));
        Ok(())
    }
}
|
||||
|
||||
@@ -29,7 +29,6 @@ base64 = { workspace = true }
|
||||
chrono = { workspace = true, features = ["serde"] }
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
codex-ansi-escape = { workspace = true }
|
||||
codex-app-server-client = { workspace = true }
|
||||
codex-app-server-protocol = { workspace = true }
|
||||
codex-arg0 = { workspace = true }
|
||||
codex-backend-client = { workspace = true }
|
||||
|
||||
@@ -39,7 +39,6 @@ use crate::tui::TuiEvent;
|
||||
use crate::update_action::UpdateAction;
|
||||
use crate::version::CODEX_CLI_VERSION;
|
||||
use codex_ansi_escape::ansi_escape_line;
|
||||
use codex_app_server_client::InProcessAppServerClient;
|
||||
use codex_app_server_protocol::ConfigLayerSource;
|
||||
use codex_core::AuthManager;
|
||||
use codex_core::CodexAuth;
|
||||
@@ -53,6 +52,7 @@ use codex_core::config::types::ApprovalsReviewer;
|
||||
use codex_core::config::types::ModelAvailabilityNuxConfig;
|
||||
use codex_core::config_loader::ConfigLayerStackOrdering;
|
||||
use codex_core::features::Feature;
|
||||
use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig;
|
||||
use codex_core::models_manager::manager::RefreshStrategy;
|
||||
use codex_core::models_manager::model_presets::HIDE_GPT_5_1_CODEX_MAX_MIGRATION_PROMPT_CONFIG;
|
||||
use codex_core::models_manager::model_presets::HIDE_GPT5_1_MIGRATION_PROMPT_CONFIG;
|
||||
@@ -113,7 +113,6 @@ use tokio::task::JoinHandle;
|
||||
use toml::Value as TomlValue;
|
||||
|
||||
mod agent_navigation;
|
||||
mod app_server_adapter;
|
||||
mod pending_interactive_replay;
|
||||
|
||||
use self::agent_navigation::AgentNavigationDirection;
|
||||
@@ -1948,7 +1947,7 @@ impl App {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn run(
|
||||
tui: &mut tui::Tui,
|
||||
mut app_server: InProcessAppServerClient,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
mut config: Config,
|
||||
cli_kv_overrides: Vec<(String, TomlValue)>,
|
||||
harness_overrides: ConfigOverrides,
|
||||
@@ -1968,8 +1967,20 @@ impl App {
|
||||
|
||||
let harness_overrides =
|
||||
normalize_harness_overrides_for_cwd(harness_overrides, &config.cwd)?;
|
||||
let auth_manager = app_server.auth_manager();
|
||||
let thread_manager = app_server.thread_manager();
|
||||
let thread_manager = Arc::new(ThreadManager::new(
|
||||
&config,
|
||||
auth_manager.clone(),
|
||||
SessionSource::Cli,
|
||||
CollaborationModesConfig {
|
||||
default_mode_request_user_input: config
|
||||
.features
|
||||
.enabled(Feature::DefaultModeRequestUserInput),
|
||||
},
|
||||
));
|
||||
// TODO(xl): Move into PluginManager once this no longer depends on config feature gating.
|
||||
thread_manager
|
||||
.plugins_manager()
|
||||
.maybe_start_curated_repo_sync_for_config(&config);
|
||||
let mut model = thread_manager
|
||||
.get_models_manager()
|
||||
.get_default_model(&config.model, RefreshStrategy::Offline)
|
||||
@@ -1987,13 +1998,6 @@ impl App {
|
||||
)
|
||||
.await;
|
||||
if let Some(exit_info) = exit_info {
|
||||
app_server
|
||||
.shutdown()
|
||||
.await
|
||||
.inspect_err(|err| {
|
||||
tracing::warn!("app-server shutdown failed: {err}");
|
||||
})
|
||||
.ok();
|
||||
return Ok(exit_info);
|
||||
}
|
||||
if let Some(updated_model) = config.model.clone() {
|
||||
@@ -2225,7 +2229,6 @@ impl App {
|
||||
|
||||
let mut thread_created_rx = thread_manager.subscribe_thread_created();
|
||||
let mut listen_for_threads = true;
|
||||
let mut listen_for_app_server_events = true;
|
||||
let mut waiting_for_initial_session_configured = wait_for_initial_session_configured;
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
@@ -2285,16 +2288,6 @@ impl App {
|
||||
Err(err) => break Err(err),
|
||||
}
|
||||
}
|
||||
app_server_event = app_server.next_event(), if listen_for_app_server_events => {
|
||||
match app_server_event {
|
||||
Some(event) => app.handle_app_server_event(&app_server, event).await,
|
||||
None => {
|
||||
listen_for_app_server_events = false;
|
||||
tracing::warn!("app-server event stream closed");
|
||||
}
|
||||
}
|
||||
AppRunControl::Continue
|
||||
}
|
||||
// Listen on new thread creation due to collab tools.
|
||||
created = thread_created_rx.recv(), if listen_for_threads => {
|
||||
match created {
|
||||
@@ -2325,9 +2318,6 @@ impl App {
|
||||
}
|
||||
}
|
||||
};
|
||||
if let Err(err) = app_server.shutdown().await {
|
||||
tracing::warn!(error = %err, "failed to shut down embedded app server");
|
||||
}
|
||||
let clear_result = tui.terminal.clear();
|
||||
let exit_reason = match exit_reason_result {
|
||||
Ok(exit_reason) => {
|
||||
|
||||
@@ -1,72 +0,0 @@
|
||||
/*
|
||||
This module holds the temporary adapter layer between the TUI and the app
|
||||
server during the hybrid migration period.
|
||||
|
||||
For now, the TUI still owns its existing direct-core behavior, but startup
|
||||
allocates a local in-process app server and drains its event stream. Keeping
|
||||
the app-server-specific wiring here keeps that transitional logic out of the
|
||||
main `app.rs` orchestration path.
|
||||
|
||||
As more TUI flows move onto the app-server surface directly, this adapter
|
||||
should shrink and eventually disappear.
|
||||
*/
|
||||
|
||||
use super::App;
|
||||
use codex_app_server_client::InProcessAppServerClient;
|
||||
use codex_app_server_client::InProcessServerEvent;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
|
||||
impl App {
|
||||
pub(super) async fn handle_app_server_event(
|
||||
&mut self,
|
||||
app_server_client: &InProcessAppServerClient,
|
||||
event: InProcessServerEvent,
|
||||
) {
|
||||
match event {
|
||||
InProcessServerEvent::Lagged { skipped } => {
|
||||
tracing::warn!(
|
||||
skipped,
|
||||
"app-server event consumer lagged; dropping ignored events"
|
||||
);
|
||||
}
|
||||
InProcessServerEvent::ServerNotification(_) => {}
|
||||
InProcessServerEvent::LegacyNotification(_) => {}
|
||||
InProcessServerEvent::ServerRequest(request) => {
|
||||
let request_id = request.id().clone();
|
||||
tracing::warn!(
|
||||
?request_id,
|
||||
"rejecting app-server request while TUI still uses direct core APIs"
|
||||
);
|
||||
if let Err(err) = self
|
||||
.reject_app_server_request(
|
||||
app_server_client,
|
||||
request_id,
|
||||
"TUI client does not yet handle this app-server server request".to_string(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::warn!("{err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn reject_app_server_request(
|
||||
&self,
|
||||
app_server_client: &InProcessAppServerClient,
|
||||
request_id: codex_app_server_protocol::RequestId,
|
||||
reason: String,
|
||||
) -> std::result::Result<(), String> {
|
||||
app_server_client
|
||||
.reject_server_request(
|
||||
request_id,
|
||||
JSONRPCErrorError {
|
||||
code: -32000,
|
||||
message: reason,
|
||||
data: None,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.map_err(|err| format!("failed to reject app-server request: {err}"))
|
||||
}
|
||||
}
|
||||
@@ -7,10 +7,6 @@ use additional_dirs::add_dir_warning_message;
|
||||
use app::App;
|
||||
pub use app::AppExitInfo;
|
||||
pub use app::ExitReason;
|
||||
use codex_app_server_client::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY;
|
||||
use codex_app_server_client::InProcessAppServerClient;
|
||||
use codex_app_server_client::InProcessClientStartArgs;
|
||||
use codex_app_server_protocol::ConfigWarningNotification;
|
||||
use codex_cloud_requirements::cloud_requirements_loader;
|
||||
use codex_core::AuthManager;
|
||||
use codex_core::CodexAuth;
|
||||
@@ -50,15 +46,12 @@ use codex_state::log_db;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use codex_utils_oss::ensure_oss_provider_ready;
|
||||
use codex_utils_oss::get_default_model_for_oss_provider;
|
||||
use color_eyre::eyre::WrapErr;
|
||||
use cwd_prompt::CwdPromptAction;
|
||||
use cwd_prompt::CwdPromptOutcome;
|
||||
use cwd_prompt::CwdSelection;
|
||||
use std::fs::OpenOptions;
|
||||
use std::future::Future;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use tracing::error;
|
||||
use tracing_appender::non_blocking;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
@@ -246,74 +239,10 @@ pub use public_widgets::composer_input::ComposerAction;
|
||||
pub use public_widgets::composer_input::ComposerInput;
|
||||
// (tests access modules directly within the crate)
|
||||
|
||||
async fn start_embedded_app_server(
|
||||
arg0_paths: Arg0DispatchPaths,
|
||||
config: Config,
|
||||
cli_kv_overrides: Vec<(String, toml::Value)>,
|
||||
loader_overrides: LoaderOverrides,
|
||||
cloud_requirements: CloudRequirementsLoader,
|
||||
feedback: codex_feedback::CodexFeedback,
|
||||
) -> color_eyre::Result<InProcessAppServerClient> {
|
||||
start_embedded_app_server_with(
|
||||
arg0_paths,
|
||||
config,
|
||||
cli_kv_overrides,
|
||||
loader_overrides,
|
||||
cloud_requirements,
|
||||
feedback,
|
||||
InProcessAppServerClient::start,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn start_embedded_app_server_with<F, Fut>(
|
||||
arg0_paths: Arg0DispatchPaths,
|
||||
config: Config,
|
||||
cli_kv_overrides: Vec<(String, toml::Value)>,
|
||||
loader_overrides: LoaderOverrides,
|
||||
cloud_requirements: CloudRequirementsLoader,
|
||||
feedback: codex_feedback::CodexFeedback,
|
||||
start_client: F,
|
||||
) -> color_eyre::Result<InProcessAppServerClient>
|
||||
where
|
||||
F: FnOnce(InProcessClientStartArgs) -> Fut,
|
||||
Fut: Future<Output = std::io::Result<InProcessAppServerClient>>,
|
||||
{
|
||||
let config_warnings = config
|
||||
.startup_warnings
|
||||
.iter()
|
||||
.map(|warning| ConfigWarningNotification {
|
||||
summary: warning.clone(),
|
||||
details: None,
|
||||
path: None,
|
||||
range: None,
|
||||
})
|
||||
.collect();
|
||||
let client = start_client(InProcessClientStartArgs {
|
||||
arg0_paths,
|
||||
config: Arc::new(config),
|
||||
cli_overrides: cli_kv_overrides,
|
||||
loader_overrides,
|
||||
cloud_requirements,
|
||||
feedback,
|
||||
config_warnings,
|
||||
session_source: codex_protocol::protocol::SessionSource::Cli,
|
||||
enable_codex_api_key_env: false,
|
||||
client_name: "codex-tui".to_string(),
|
||||
client_version: env!("CARGO_PKG_VERSION").to_string(),
|
||||
experimental_api: true,
|
||||
opt_out_notification_methods: Vec::new(),
|
||||
channel_capacity: DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
|
||||
})
|
||||
.await
|
||||
.wrap_err("failed to start embedded app server")?;
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
pub async fn run_main(
|
||||
mut cli: Cli,
|
||||
arg0_paths: Arg0DispatchPaths,
|
||||
loader_overrides: LoaderOverrides,
|
||||
_loader_overrides: LoaderOverrides,
|
||||
) -> std::io::Result<AppExitInfo> {
|
||||
let (sandbox_mode, approval_policy) = if cli.full_auto {
|
||||
(
|
||||
@@ -611,8 +540,6 @@ pub async fn run_main(
|
||||
|
||||
run_ratatui_app(
|
||||
cli,
|
||||
arg0_paths,
|
||||
loader_overrides,
|
||||
config,
|
||||
overrides,
|
||||
cli_kv_overrides,
|
||||
@@ -626,8 +553,6 @@ pub async fn run_main(
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn run_ratatui_app(
|
||||
cli: Cli,
|
||||
arg0_paths: Arg0DispatchPaths,
|
||||
loader_overrides: LoaderOverrides,
|
||||
initial_config: Config,
|
||||
overrides: ConfigOverrides,
|
||||
cli_kv_overrides: Vec<(String, toml::Value)>,
|
||||
@@ -1025,27 +950,10 @@ async fn run_ratatui_app(
|
||||
|
||||
let use_alt_screen = determine_alt_screen_mode(no_alt_screen, config.tui_alternate_screen);
|
||||
tui.set_alt_screen_enabled(use_alt_screen);
|
||||
let app_server = match start_embedded_app_server(
|
||||
arg0_paths,
|
||||
config.clone(),
|
||||
cli_kv_overrides.clone(),
|
||||
loader_overrides,
|
||||
cloud_requirements.clone(),
|
||||
feedback.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(app_server) => app_server,
|
||||
Err(err) => {
|
||||
restore();
|
||||
session_log::log_session_end();
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
let app_result = App::run(
|
||||
&mut tui,
|
||||
app_server,
|
||||
auth_manager,
|
||||
config,
|
||||
cli_kv_overrides.clone(),
|
||||
overrides.clone(),
|
||||
@@ -1328,20 +1236,6 @@ mod tests {
|
||||
.await
|
||||
}
|
||||
|
||||
async fn start_test_embedded_app_server(
|
||||
config: Config,
|
||||
) -> color_eyre::Result<InProcessAppServerClient> {
|
||||
start_embedded_app_server(
|
||||
Arg0DispatchPaths::default(),
|
||||
config,
|
||||
Vec::new(),
|
||||
LoaderOverrides::default(),
|
||||
CloudRequirementsLoader::default(),
|
||||
codex_feedback::CodexFeedback::new(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn windows_shows_trust_prompt_without_sandbox() -> std::io::Result<()> {
|
||||
@@ -1358,51 +1252,6 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn embedded_app_server_exposes_client_manager_accessors() -> color_eyre::Result<()> {
|
||||
let temp_dir = TempDir::new()?;
|
||||
let config = build_config(&temp_dir).await?;
|
||||
let app_server = start_test_embedded_app_server(config).await?;
|
||||
|
||||
assert!(Arc::ptr_eq(
|
||||
&app_server.auth_manager(),
|
||||
&app_server.auth_manager()
|
||||
));
|
||||
assert!(Arc::ptr_eq(
|
||||
&app_server.thread_manager(),
|
||||
&app_server.thread_manager()
|
||||
));
|
||||
|
||||
app_server.shutdown().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn embedded_app_server_start_failure_is_returned() -> color_eyre::Result<()> {
|
||||
let temp_dir = TempDir::new()?;
|
||||
let config = build_config(&temp_dir).await?;
|
||||
let result = start_embedded_app_server_with(
|
||||
Arg0DispatchPaths::default(),
|
||||
config,
|
||||
Vec::new(),
|
||||
LoaderOverrides::default(),
|
||||
CloudRequirementsLoader::default(),
|
||||
codex_feedback::CodexFeedback::new(),
|
||||
|_args| async { Err(std::io::Error::other("boom")) },
|
||||
)
|
||||
.await;
|
||||
let err = match result {
|
||||
Ok(_) => panic!("startup failure should be returned"),
|
||||
Err(err) => err,
|
||||
};
|
||||
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("failed to start embedded app server"),
|
||||
"error should preserve the embedded app server startup context"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn windows_shows_trust_prompt_with_sandbox() -> std::io::Result<()> {
|
||||
|
||||
@@ -89,6 +89,17 @@ impl App {
|
||||
);
|
||||
}
|
||||
notification => {
|
||||
if !app_server_client.is_remote()
|
||||
&& matches!(
|
||||
notification,
|
||||
ServerNotification::TurnCompleted(_)
|
||||
| ServerNotification::ThreadRealtimeItemAdded(_)
|
||||
| ServerNotification::ThreadRealtimeOutputAudioDelta(_)
|
||||
| ServerNotification::ThreadRealtimeError(_)
|
||||
)
|
||||
{
|
||||
return;
|
||||
}
|
||||
if let Some((thread_id, events)) =
|
||||
server_notification_thread_events(notification)
|
||||
{
|
||||
@@ -116,6 +127,9 @@ impl App {
|
||||
AppServerEvent::LegacyNotification(notification) => {
|
||||
if let Some((thread_id, event)) = legacy_thread_event(notification.params) {
|
||||
self.pending_app_server_requests.note_legacy_event(&event);
|
||||
if legacy_event_is_shadowed_by_server_notification(&event.msg) {
|
||||
return;
|
||||
}
|
||||
if self.primary_thread_id.is_none()
|
||||
|| matches!(event.msg, EventMsg::SessionConfigured(_))
|
||||
&& self.primary_thread_id == Some(thread_id)
|
||||
@@ -198,6 +212,24 @@ fn legacy_thread_event(params: Option<Value>) -> Option<(ThreadId, Event)> {
|
||||
Some((thread_id, event))
|
||||
}
|
||||
|
||||
fn legacy_event_is_shadowed_by_server_notification(msg: &EventMsg) -> bool {
|
||||
matches!(
|
||||
msg,
|
||||
EventMsg::TokenCount(_)
|
||||
| EventMsg::Error(_)
|
||||
| EventMsg::ThreadNameUpdated(_)
|
||||
| EventMsg::TurnStarted(_)
|
||||
| EventMsg::ItemStarted(_)
|
||||
| EventMsg::ItemCompleted(_)
|
||||
| EventMsg::AgentMessageDelta(_)
|
||||
| EventMsg::PlanDelta(_)
|
||||
| EventMsg::AgentReasoningDelta(_)
|
||||
| EventMsg::AgentReasoningRawContentDelta(_)
|
||||
| EventMsg::RealtimeConversationStarted(_)
|
||||
| EventMsg::RealtimeConversationClosed(_)
|
||||
)
|
||||
}
|
||||
|
||||
fn server_notification_thread_events(
|
||||
notification: ServerNotification,
|
||||
) -> Option<(ThreadId, Vec<Event>)> {
|
||||
|
||||
Reference in New Issue
Block a user