Merge branch 'main' into rhan/surface-updates

This commit is contained in:
rhan-oai
2026-03-19 11:23:44 -07:00
committed by GitHub
489 changed files with 37167 additions and 8086 deletions

View File

@@ -150,7 +150,7 @@ async fn auto_compaction_remote_emits_started_and_completed_items() -> Result<()
&BTreeMap::default(),
REMOTE_AUTO_COMPACT_LIMIT,
Some(true),
"openai",
"mock_provider",
COMPACT_PROMPT,
)?;
write_chatgpt_auth(

View File

@@ -29,7 +29,11 @@ use tokio::time::timeout;
use tokio_tungstenite::MaybeTlsStream;
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Error as WebSocketError;
use tokio_tungstenite::tungstenite::Message as WebSocketMessage;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tokio_tungstenite::tungstenite::http::HeaderValue;
use tokio_tungstenite::tungstenite::http::header::ORIGIN;
pub(super) const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(5);
@@ -107,6 +111,55 @@ async fn websocket_transport_serves_health_endpoints_on_same_listener() -> Resul
Ok(())
}
#[tokio::test]
async fn websocket_transport_rejects_requests_with_origin_header() -> Result<()> {
let server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri(), "never")?;
let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
let client = reqwest::Client::new();
let deadline = Instant::now() + Duration::from_secs(10);
let healthz = loop {
match client
.get(format!("http://{bind_addr}/healthz"))
.header(ORIGIN.as_str(), "https://example.com")
.send()
.await
.with_context(|| format!("failed to GET http://{bind_addr}/healthz with Origin header"))
{
Ok(response) => break response,
Err(err) => {
if Instant::now() >= deadline {
bail!("failed to GET http://{bind_addr}/healthz with Origin header: {err}");
}
sleep(Duration::from_millis(50)).await;
}
}
};
assert_eq!(healthz.status(), StatusCode::FORBIDDEN);
let url = format!("ws://{bind_addr}");
let mut request = url.into_client_request()?;
request
.headers_mut()
.insert(ORIGIN, HeaderValue::from_static("https://example.com"));
match connect_async(request).await {
Err(WebSocketError::Http(response)) => {
assert_eq!(response.status(), StatusCode::FORBIDDEN);
}
Ok(_) => bail!("expected websocket handshake with Origin header to be rejected"),
Err(err) => bail!("expected HTTP rejection for Origin header, got {err}"),
}
process
.kill()
.await
.context("failed to stop websocket app-server process")?;
Ok(())
}
pub(super) async fn spawn_websocket_server(codex_home: &Path) -> Result<(Child, SocketAddr)> {
let program = codex_utils_cargo_bin::cargo_bin("codex-app-server")
.context("should find app-server binary")?;

View File

@@ -38,6 +38,7 @@ mod thread_name_websocket;
mod thread_read;
mod thread_resume;
mod thread_rollback;
mod thread_shell_command;
mod thread_start;
mod thread_status;
mod thread_unarchive;

View File

@@ -44,6 +44,12 @@ use tempfile::TempDir;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;
use tokio::time::timeout;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::header;
use wiremock::matchers::method;
use wiremock::matchers::path;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
@@ -86,6 +92,7 @@ async fn plugin_install_returns_invalid_request_for_missing_marketplace_file() -
codex_home.path().join("missing-marketplace.json"),
)?,
plugin_name: "missing-plugin".to_string(),
force_remote_sync: false,
})
.await?;
@@ -124,6 +131,7 @@ async fn plugin_install_returns_invalid_request_for_not_available_plugin() -> Re
.send_plugin_install_request(PluginInstallParams {
marketplace_path,
plugin_name: "sample-plugin".to_string(),
force_remote_sync: false,
})
.await?;
@@ -138,6 +146,126 @@ async fn plugin_install_returns_invalid_request_for_not_available_plugin() -> Re
Ok(())
}
#[tokio::test]
async fn plugin_install_returns_invalid_request_for_disallowed_product_plugin() -> Result<()> {
let codex_home = TempDir::new()?;
let repo_root = TempDir::new()?;
std::fs::create_dir_all(repo_root.path().join(".agents/plugins"))?;
std::fs::write(
repo_root.path().join(".agents/plugins/marketplace.json"),
r#"{
"name": "debug",
"plugins": [
{
"name": "sample-plugin",
"source": {
"source": "local",
"path": "./sample-plugin"
},
"policy": {
"products": ["CHATGPT"]
}
}
]
}"#,
)?;
write_plugin_source(repo_root.path(), "sample-plugin", &[])?;
let marketplace_path =
AbsolutePathBuf::try_from(repo_root.path().join(".agents/plugins/marketplace.json"))?;
let mut mcp =
McpProcess::new_with_args(codex_home.path(), &["--session-source", "atlas"]).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path,
plugin_name: "sample-plugin".to_string(),
force_remote_sync: false,
})
.await?;
let err = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, -32600);
assert!(err.error.message.contains("not available for install"));
Ok(())
}
#[tokio::test]
async fn plugin_install_force_remote_sync_enables_remote_plugin_before_local_install() -> Result<()>
{
let server = MockServer::start().await;
let codex_home = TempDir::new()?;
write_plugin_remote_sync_config(codex_home.path(), &format!("{}/backend-api/", server.uri()))?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
let repo_root = TempDir::new()?;
write_plugin_marketplace(
repo_root.path(),
"debug",
"sample-plugin",
"./sample-plugin",
None,
None,
)?;
write_plugin_source(repo_root.path(), "sample-plugin", &[])?;
let marketplace_path =
AbsolutePathBuf::try_from(repo_root.path().join(".agents/plugins/marketplace.json"))?;
Mock::given(method("POST"))
.and(path("/backend-api/plugins/sample-plugin@debug/enable"))
.and(header("authorization", "Bearer chatgpt-token"))
.and(header("chatgpt-account-id", "account-123"))
.respond_with(
ResponseTemplate::new(200)
.set_body_string(r#"{"id":"sample-plugin@debug","enabled":true}"#),
)
.expect(1)
.mount(&server)
.await;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path,
plugin_name: "sample-plugin".to_string(),
force_remote_sync: true,
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginInstallResponse = to_response(response)?;
assert_eq!(response.apps_needing_auth, Vec::<AppSummary>::new());
assert!(
codex_home
.path()
.join("plugins/cache/debug/sample-plugin/local/.codex-plugin/plugin.json")
.is_file()
);
let config = std::fs::read_to_string(codex_home.path().join("config.toml"))?;
assert!(config.contains(r#"[plugins."sample-plugin@debug"]"#));
assert!(config.contains("enabled = true"));
Ok(())
}
#[tokio::test]
async fn plugin_install_tracks_analytics_event() -> Result<()> {
let analytics_server = start_analytics_events_server().await?;
@@ -172,6 +300,7 @@ async fn plugin_install_tracks_analytics_event() -> Result<()> {
.send_plugin_install_request(PluginInstallParams {
marketplace_path,
plugin_name: "sample-plugin".to_string(),
force_remote_sync: false,
})
.await?;
let response: JSONRPCResponse = timeout(
@@ -182,21 +311,22 @@ async fn plugin_install_tracks_analytics_event() -> Result<()> {
let response: PluginInstallResponse = to_response(response)?;
assert_eq!(response.apps_needing_auth, Vec::<AppSummary>::new());
let payloads = timeout(DEFAULT_TIMEOUT, async {
let payload = timeout(DEFAULT_TIMEOUT, async {
loop {
let Some(requests) = analytics_server.received_requests().await else {
tokio::time::sleep(Duration::from_millis(25)).await;
continue;
};
if !requests.is_empty() {
break requests;
if let Some(request) = requests.iter().find(|request| {
request.method == "POST" && request.url.path() == "/codex/analytics-events/events"
}) {
break request.body.clone();
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
})
.await?;
let payload: serde_json::Value =
serde_json::from_slice(&payloads[0].body).expect("analytics payload");
let payload: serde_json::Value = serde_json::from_slice(&payload).expect("analytics payload");
assert_eq!(
payload,
json!({
@@ -285,6 +415,7 @@ async fn plugin_install_returns_apps_needing_auth() -> Result<()> {
.send_plugin_install_request(PluginInstallParams {
marketplace_path,
plugin_name: "sample-plugin".to_string(),
force_remote_sync: false,
})
.await?;
@@ -367,6 +498,7 @@ async fn plugin_install_filters_disallowed_apps_needing_auth() -> Result<()> {
.send_plugin_install_request(PluginInstallParams {
marketplace_path,
plugin_name: "sample-plugin".to_string(),
force_remote_sync: false,
})
.await?;
@@ -549,6 +681,23 @@ fn write_analytics_config(codex_home: &std::path::Path, base_url: &str) -> std::
)
}
fn write_plugin_remote_sync_config(
codex_home: &std::path::Path,
base_url: &str,
) -> std::io::Result<()> {
std::fs::write(
codex_home.join("config.toml"),
format!(
r#"
chatgpt_base_url = "{base_url}"
[features]
plugins = true
"#
),
)
}
fn write_plugin_marketplace(
repo_root: &std::path::Path,
marketplace_name: &str,
@@ -557,12 +706,24 @@ fn write_plugin_marketplace(
install_policy: Option<&str>,
auth_policy: Option<&str>,
) -> std::io::Result<()> {
let install_policy = install_policy
.map(|install_policy| format!(",\n \"installPolicy\": \"{install_policy}\""))
.unwrap_or_default();
let auth_policy = auth_policy
.map(|auth_policy| format!(",\n \"authPolicy\": \"{auth_policy}\""))
.unwrap_or_default();
let policy = if install_policy.is_some() || auth_policy.is_some() {
let installation = install_policy
.map(|installation| format!("\n \"installation\": \"{installation}\""))
.unwrap_or_default();
let separator = if install_policy.is_some() && auth_policy.is_some() {
","
} else {
""
};
let authentication = auth_policy
.map(|authentication| {
format!("{separator}\n \"authentication\": \"{authentication}\"")
})
.unwrap_or_default();
format!(",\n \"policy\": {{{installation}{authentication}\n }}")
} else {
String::new()
};
std::fs::create_dir_all(repo_root.join(".git"))?;
std::fs::create_dir_all(repo_root.join(".agents/plugins"))?;
std::fs::write(
@@ -576,7 +737,7 @@ fn write_plugin_marketplace(
"source": {{
"source": "local",
"path": "{source_path}"
}}{install_policy}{auth_policy}
}}{policy}
}}
]
}}"#

View File

@@ -1,6 +1,7 @@
use std::time::Duration;
use anyhow::Result;
use anyhow::bail;
use app_test_support::ChatGptAuthFixture;
use app_test_support::McpProcess;
use app_test_support::to_response;
@@ -28,12 +29,22 @@ use wiremock::matchers::path;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
const TEST_CURATED_PLUGIN_SHA: &str = "0123456789abcdef0123456789abcdef01234567";
fn write_plugins_enabled_config(codex_home: &std::path::Path) -> std::io::Result<()> {
std::fs::write(
codex_home.join("config.toml"),
r#"[features]
plugins = true
"#,
)
}
#[tokio::test]
async fn plugin_list_returns_invalid_request_for_invalid_marketplace_file() -> Result<()> {
async fn plugin_list_skips_invalid_marketplace_file() -> Result<()> {
let codex_home = TempDir::new()?;
let repo_root = TempDir::new()?;
std::fs::create_dir_all(repo_root.path().join(".git"))?;
std::fs::create_dir_all(repo_root.path().join(".agents/plugins"))?;
write_plugins_enabled_config(codex_home.path())?;
std::fs::write(
repo_root.path().join(".agents/plugins/marketplace.json"),
"{not json",
@@ -57,14 +68,23 @@ async fn plugin_list_returns_invalid_request_for_invalid_marketplace_file() -> R
})
.await?;
let err = timeout(
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginListResponse = to_response(response)?;
assert_eq!(err.error.code, -32600);
assert!(err.error.message.contains("invalid marketplace file"));
assert!(
response.marketplaces.iter().all(|marketplace| {
marketplace.path
!= AbsolutePathBuf::try_from(
repo_root.path().join(".agents/plugins/marketplace.json"),
)
.expect("absolute marketplace path")
}),
"invalid marketplace should be skipped"
);
Ok(())
}
@@ -98,6 +118,7 @@ async fn plugin_list_rejects_relative_cwds() -> Result<()> {
async fn plugin_list_accepts_omitted_cwds() -> Result<()> {
let codex_home = TempDir::new()?;
std::fs::create_dir_all(codex_home.path().join(".agents/plugins"))?;
write_plugins_enabled_config(codex_home.path())?;
std::fs::write(
codex_home.path().join(".agents/plugins/marketplace.json"),
r#"{
@@ -152,6 +173,9 @@ async fn plugin_list_includes_install_and_enabled_state_from_config() -> Result<
repo_root.path().join(".agents/plugins/marketplace.json"),
r#"{
"name": "codex-curated",
"interface": {
"displayName": "ChatGPT Official"
},
"plugins": [
{
"name": "enabled-plugin",
@@ -220,6 +244,13 @@ enabled = false
.expect("expected repo marketplace entry");
assert_eq!(marketplace.name, "codex-curated");
assert_eq!(
marketplace
.interface
.as_ref()
.and_then(|interface| interface.display_name.as_deref()),
Some("ChatGPT Official")
);
assert_eq!(marketplace.plugins.len(), 3);
assert_eq!(marketplace.plugins[0].id, "enabled-plugin@codex-curated");
assert_eq!(marketplace.plugins[0].name, "enabled-plugin");
@@ -375,6 +406,7 @@ async fn plugin_list_returns_plugin_interface_with_absolute_asset_paths() -> Res
std::fs::create_dir_all(repo_root.path().join(".git"))?;
std::fs::create_dir_all(repo_root.path().join(".agents/plugins"))?;
std::fs::create_dir_all(plugin_root.join(".codex-plugin"))?;
write_plugins_enabled_config(codex_home.path())?;
std::fs::write(
repo_root.path().join(".agents/plugins/marketplace.json"),
r#"{
@@ -386,8 +418,10 @@ async fn plugin_list_returns_plugin_interface_with_absolute_asset_paths() -> Res
"source": "local",
"path": "./plugins/demo-plugin"
},
"installPolicy": "AVAILABLE",
"authPolicy": "ON_INSTALL",
"policy": {
"installation": "AVAILABLE",
"authentication": "ON_INSTALL"
},
"category": "Design"
}
]
@@ -506,6 +540,7 @@ async fn plugin_list_accepts_legacy_string_default_prompt() -> Result<()> {
std::fs::create_dir_all(repo_root.path().join(".git"))?;
std::fs::create_dir_all(repo_root.path().join(".agents/plugins"))?;
std::fs::create_dir_all(plugin_root.join(".codex-plugin"))?;
write_plugins_enabled_config(codex_home.path())?;
std::fs::write(
repo_root.path().join(".agents/plugins/marketplace.json"),
r#"{
@@ -625,6 +660,7 @@ async fn plugin_list_force_remote_sync_reconciles_curated_plugin_state() -> Resu
)?;
write_openai_curated_marketplace(codex_home.path(), &["linear", "gmail", "calendar"])?;
write_installed_plugin(&codex_home, "openai-curated", "linear")?;
write_installed_plugin(&codex_home, "openai-curated", "gmail")?;
write_installed_plugin(&codex_home, "openai-curated", "calendar")?;
Mock::given(method("GET"))
@@ -639,6 +675,16 @@ async fn plugin_list_force_remote_sync_reconciles_curated_plugin_state() -> Resu
))
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path("/backend-api/plugins/featured"))
.and(header("authorization", "Bearer chatgpt-token"))
.and(header("chatgpt-account-id", "account-123"))
.respond_with(
ResponseTemplate::new(200)
.set_body_string(r#"["linear@openai-curated","calendar@openai-curated"]"#),
)
.mount(&server)
.await;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
@@ -657,6 +703,13 @@ async fn plugin_list_force_remote_sync_reconciles_curated_plugin_state() -> Resu
.await??;
let response: PluginListResponse = to_response(response)?;
assert_eq!(response.remote_sync_error, None);
assert_eq!(
response.featured_plugin_ids,
vec![
"linear@openai-curated".to_string(),
"calendar@openai-curated".to_string(),
]
);
let curated_marketplace = response
.marketplaces
@@ -671,14 +724,14 @@ async fn plugin_list_force_remote_sync_reconciles_curated_plugin_state() -> Resu
.collect::<Vec<_>>(),
vec![
("linear@openai-curated".to_string(), true, true),
("gmail@openai-curated".to_string(), true, false),
("gmail@openai-curated".to_string(), false, false),
("calendar@openai-curated".to_string(), false, false),
]
);
let config = std::fs::read_to_string(codex_home.path().join("config.toml"))?;
assert!(config.contains(r#"[plugins."linear@openai-curated"]"#));
assert!(config.contains(r#"[plugins."gmail@openai-curated"]"#));
assert!(!config.contains(r#"[plugins."gmail@openai-curated"]"#));
assert!(!config.contains(r#"[plugins."calendar@openai-curated"]"#));
assert!(
@@ -688,12 +741,10 @@ async fn plugin_list_force_remote_sync_reconciles_curated_plugin_state() -> Resu
.is_dir()
);
assert!(
codex_home
!codex_home
.path()
.join(format!(
"plugins/cache/openai-curated/gmail/{TEST_CURATED_PLUGIN_SHA}"
))
.is_dir()
.join("plugins/cache/openai-curated/gmail")
.exists()
);
assert!(
!codex_home
@@ -704,6 +755,114 @@ async fn plugin_list_force_remote_sync_reconciles_curated_plugin_state() -> Resu
Ok(())
}
#[tokio::test]
async fn plugin_list_fetches_featured_plugin_ids_without_chatgpt_auth() -> Result<()> {
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
write_plugin_sync_config(codex_home.path(), &format!("{}/backend-api/", server.uri()))?;
write_openai_curated_marketplace(codex_home.path(), &["linear", "gmail"])?;
Mock::given(method("GET"))
.and(path("/backend-api/plugins/featured"))
.respond_with(ResponseTemplate::new(200).set_body_string(r#"["linear@openai-curated"]"#))
.mount(&server)
.await;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_list_request(PluginListParams {
cwds: None,
force_remote_sync: false,
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginListResponse = to_response(response)?;
assert_eq!(
response.featured_plugin_ids,
vec!["linear@openai-curated".to_string()]
);
assert_eq!(response.remote_sync_error, None);
Ok(())
}
#[tokio::test]
async fn plugin_list_uses_warmed_featured_plugin_ids_cache_on_first_request() -> Result<()> {
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
write_plugin_sync_config(codex_home.path(), &format!("{}/backend-api/", server.uri()))?;
write_openai_curated_marketplace(codex_home.path(), &["linear", "gmail"])?;
Mock::given(method("GET"))
.and(path("/backend-api/plugins/featured"))
.respond_with(ResponseTemplate::new(200).set_body_string(r#"["linear@openai-curated"]"#))
.expect(1)
.mount(&server)
.await;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
wait_for_featured_plugin_request_count(&server, 1).await?;
let request_id = mcp
.send_plugin_list_request(PluginListParams {
cwds: None,
force_remote_sync: false,
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginListResponse = to_response(response)?;
assert_eq!(
response.featured_plugin_ids,
vec!["linear@openai-curated".to_string()]
);
assert_eq!(response.remote_sync_error, None);
Ok(())
}
async fn wait_for_featured_plugin_request_count(
server: &MockServer,
expected_count: usize,
) -> Result<()> {
timeout(DEFAULT_TIMEOUT, async {
loop {
let Some(requests) = server.received_requests().await else {
bail!("wiremock did not record requests");
};
let featured_request_count = requests
.iter()
.filter(|request| {
request.method == "GET" && request.url.path().ends_with("/plugins/featured")
})
.count();
if featured_request_count == expected_count {
return Ok::<(), anyhow::Error>(());
}
if featured_request_count > expected_count {
bail!(
"expected exactly {expected_count} /plugins/featured requests, got {featured_request_count}"
);
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await??;
Ok(())
}
fn write_installed_plugin(
codex_home: &TempDir,
marketplace_name: &str,
@@ -736,6 +895,9 @@ plugins = true
[plugins."linear@openai-curated"]
enabled = false
[plugins."gmail@openai-curated"]
enabled = false
[plugins."calendar@openai-curated"]
enabled = true
"#

View File

@@ -25,6 +25,7 @@ async fn plugin_read_returns_plugin_details_with_bundle_contents() -> Result<()>
std::fs::create_dir_all(repo_root.path().join(".agents/plugins"))?;
std::fs::create_dir_all(plugin_root.join(".codex-plugin"))?;
std::fs::create_dir_all(plugin_root.join("skills/thread-summarizer"))?;
std::fs::create_dir_all(plugin_root.join("skills/chatgpt-only"))?;
std::fs::write(
repo_root.path().join(".agents/plugins/marketplace.json"),
r#"{
@@ -36,8 +37,10 @@ async fn plugin_read_returns_plugin_details_with_bundle_contents() -> Result<()>
"source": "local",
"path": "./plugins/demo-plugin"
},
"installPolicy": "AVAILABLE",
"authPolicy": "ON_INSTALL",
"policy": {
"installation": "AVAILABLE",
"authentication": "ON_INSTALL"
},
"category": "Design"
}
]
@@ -77,6 +80,32 @@ description: Summarize email threads
---
# Thread Summarizer
"#,
)?;
std::fs::write(
plugin_root.join("skills/chatgpt-only/SKILL.md"),
r#"---
name: chatgpt-only
description: Visible only for ChatGPT
---
# ChatGPT Only
"#,
)?;
std::fs::create_dir_all(plugin_root.join("skills/thread-summarizer/agents"))?;
std::fs::write(
plugin_root.join("skills/thread-summarizer/agents/openai.yaml"),
r#"policy:
products:
- CODEX
"#,
)?;
std::fs::create_dir_all(plugin_root.join("skills/chatgpt-only/agents"))?;
std::fs::write(
plugin_root.join("skills/chatgpt-only/agents/openai.yaml"),
r#"policy:
products:
- CHATGPT
"#,
)?;
std::fs::write(
@@ -230,6 +259,7 @@ async fn plugin_read_accepts_legacy_string_default_prompt() -> Result<()> {
}
}"##,
)?;
write_plugins_enabled_config(&codex_home)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
@@ -283,6 +313,7 @@ async fn plugin_read_returns_invalid_request_when_plugin_is_missing() -> Result<
]
}"#,
)?;
write_plugins_enabled_config(&codex_home)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
@@ -334,6 +365,7 @@ async fn plugin_read_returns_invalid_request_when_plugin_manifest_is_missing() -
]
}"#,
)?;
write_plugins_enabled_config(&codex_home)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
@@ -380,3 +412,13 @@ fn write_installed_plugin(
)?;
Ok(())
}
fn write_plugins_enabled_config(codex_home: &TempDir) -> Result<()> {
std::fs::write(
codex_home.path().join("config.toml"),
r#"[features]
plugins = true
"#,
)?;
Ok(())
}

View File

@@ -16,6 +16,12 @@ use pretty_assertions::assert_eq;
use serde_json::json;
use tempfile::TempDir;
use tokio::time::timeout;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::header;
use wiremock::matchers::method;
use wiremock::matchers::path;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
@@ -38,6 +44,7 @@ enabled = true
let params = PluginUninstallParams {
plugin_id: "sample-plugin@debug".to_string(),
force_remote_sync: false,
};
let request_id = mcp.send_plugin_uninstall_request(params.clone()).await?;
@@ -70,6 +77,74 @@ enabled = true
Ok(())
}
#[tokio::test]
async fn plugin_uninstall_force_remote_sync_calls_remote_uninstall_first() -> Result<()> {
let server = MockServer::start().await;
let codex_home = TempDir::new()?;
write_installed_plugin(&codex_home, "debug", "sample-plugin")?;
std::fs::write(
codex_home.path().join("config.toml"),
format!(
r#"chatgpt_base_url = "{}/backend-api/"
[features]
plugins = true
[plugins."sample-plugin@debug"]
enabled = true
"#,
server.uri()
),
)?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
Mock::given(method("POST"))
.and(path("/backend-api/plugins/sample-plugin@debug/uninstall"))
.and(header("authorization", "Bearer chatgpt-token"))
.and(header("chatgpt-account-id", "account-123"))
.respond_with(
ResponseTemplate::new(200)
.set_body_string(r#"{"id":"sample-plugin@debug","enabled":false}"#),
)
.expect(1)
.mount(&server)
.await;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_uninstall_request(PluginUninstallParams {
plugin_id: "sample-plugin@debug".to_string(),
force_remote_sync: true,
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginUninstallResponse = to_response(response)?;
assert_eq!(response, PluginUninstallResponse {});
assert!(
!codex_home
.path()
.join("plugins/cache/debug/sample-plugin")
.exists()
);
let config = std::fs::read_to_string(codex_home.path().join("config.toml"))?;
assert!(!config.contains(r#"[plugins."sample-plugin@debug"]"#));
Ok(())
}
#[tokio::test]
async fn plugin_uninstall_tracks_analytics_event() -> Result<()> {
let analytics_server = start_analytics_events_server().await?;
@@ -97,6 +172,7 @@ async fn plugin_uninstall_tracks_analytics_event() -> Result<()> {
let request_id = mcp
.send_plugin_uninstall_request(PluginUninstallParams {
plugin_id: "sample-plugin@debug".to_string(),
force_remote_sync: false,
})
.await?;
let response: JSONRPCResponse = timeout(
@@ -107,21 +183,22 @@ async fn plugin_uninstall_tracks_analytics_event() -> Result<()> {
let response: PluginUninstallResponse = to_response(response)?;
assert_eq!(response, PluginUninstallResponse {});
let payloads = timeout(DEFAULT_TIMEOUT, async {
let payload = timeout(DEFAULT_TIMEOUT, async {
loop {
let Some(requests) = analytics_server.received_requests().await else {
tokio::time::sleep(Duration::from_millis(25)).await;
continue;
};
if !requests.is_empty() {
break requests;
if let Some(request) = requests.iter().find(|request| {
request.method == "POST" && request.url.path() == "/codex/analytics-events/events"
}) {
break request.body.clone();
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
})
.await?;
let payload: serde_json::Value =
serde_json::from_slice(&payloads[0].body).expect("analytics payload");
let payload: serde_json::Value = serde_json::from_slice(&payload).expect("analytics payload");
assert_eq!(
payload,
json!({

View File

@@ -25,6 +25,7 @@ use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_core::features::FEATURES;
use codex_core::features::Feature;
use codex_protocol::protocol::RealtimeConversationVersion;
use core_test_support::responses::start_websocket_server;
use core_test_support::skip_if_no_network;
use pretty_assertions::assert_eq;
@@ -70,6 +71,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
"message": "upstream boom"
}),
],
vec![],
]])
.await;
@@ -114,6 +116,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
.await?;
assert_eq!(started.thread_id, thread_start.thread.id);
assert!(started.session_id.is_some());
assert_eq!(started.version, RealtimeConversationVersion::V2);
let startup_context_request = realtime_server.wait_for_request(0, 0).await;
assert_eq!(
@@ -135,6 +138,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
sample_rate: 24_000,
num_channels: 1,
samples_per_channel: Some(480),
item_id: None,
},
})
.await?;
@@ -186,12 +190,12 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
read_notification::<ThreadRealtimeClosedNotification>(&mut mcp, "thread/realtime/closed")
.await?;
assert_eq!(closed.thread_id, output_audio.thread_id);
assert_eq!(closed.reason.as_deref(), Some("transport_closed"));
assert_eq!(closed.reason.as_deref(), Some("error"));
let connections = realtime_server.connections();
assert_eq!(connections.len(), 1);
let connection = &connections[0];
assert_eq!(connection.len(), 3);
assert_eq!(connection.len(), 4);
assert_eq!(
connection[0].body_json()["type"].as_str(),
Some("session.update")
@@ -211,6 +215,10 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
.as_str()
.context("expected websocket request type")?
.to_string(),
connection[3].body_json()["type"]
.as_str()
.context("expected websocket request type")?
.to_string(),
];
request_types.sort();
assert_eq!(
@@ -218,6 +226,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
[
"conversation.item.create".to_string(),
"input_audio_buffer.append".to_string(),
"response.create".to_string(),
]
);

View File

@@ -450,6 +450,7 @@ async fn thread_resume_and_read_interrupt_incomplete_rollout_turn_when_thread_is
"payload": serde_json::to_value(EventMsg::AgentMessage(AgentMessageEvent {
message: "Still running".to_string(),
phase: None,
memory_citation: None,
}))?,
})
.to_string(),

View File

@@ -0,0 +1,439 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_sequence;
use app_test_support::create_shell_command_sse_response;
use app_test_support::format_with_current_shell_display;
use app_test_support::to_response;
use codex_app_server_protocol::CommandExecutionApprovalDecision;
use codex_app_server_protocol::CommandExecutionOutputDeltaNotification;
use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
use codex_app_server_protocol::CommandExecutionSource;
use codex_app_server_protocol::CommandExecutionStatus;
use codex_app_server_protocol::ItemCompletedNotification;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadReadParams;
use codex_app_server_protocol::ThreadReadResponse;
use codex_app_server_protocol::ThreadShellCommandParams;
use codex_app_server_protocol::ThreadShellCommandResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnCompletedNotification;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_core::features::FEATURES;
use codex_core::features::Feature;
use pretty_assertions::assert_eq;
use std::collections::BTreeMap;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
async fn thread_shell_command_runs_as_standalone_turn_and_persists_history() -> Result<()> {
let tmp = TempDir::new()?;
let codex_home = tmp.path().join("codex_home");
std::fs::create_dir(&codex_home)?;
let workspace = tmp.path().join("workspace");
std::fs::create_dir(&workspace)?;
let server = create_mock_responses_server_sequence(vec![]).await;
create_config_toml(
codex_home.as_path(),
&server.uri(),
"never",
&BTreeMap::default(),
)?;
let mut mcp = McpProcess::new(codex_home.as_path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
persist_extended_history: true,
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let shell_id = mcp
.send_thread_shell_command_request(ThreadShellCommandParams {
thread_id: thread.id.clone(),
command: "printf 'hello from bang\\n'".to_string(),
})
.await?;
let shell_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(shell_id)),
)
.await??;
let _: ThreadShellCommandResponse = to_response::<ThreadShellCommandResponse>(shell_resp)?;
let started = wait_for_command_execution_started(&mut mcp, None).await?;
let ThreadItem::CommandExecution {
id, source, status, ..
} = &started.item
else {
unreachable!("helper returns command execution item");
};
let command_id = id.clone();
assert_eq!(source, &CommandExecutionSource::UserShell);
assert_eq!(status, &CommandExecutionStatus::InProgress);
let delta = wait_for_command_execution_output_delta(&mut mcp, &command_id).await?;
assert_eq!(delta.delta, "hello from bang\n");
let completed = wait_for_command_execution_completed(&mut mcp, Some(&command_id)).await?;
let ThreadItem::CommandExecution {
id,
source,
status,
aggregated_output,
exit_code,
..
} = &completed.item
else {
unreachable!("helper returns command execution item");
};
assert_eq!(id, &command_id);
assert_eq!(source, &CommandExecutionSource::UserShell);
assert_eq!(status, &CommandExecutionStatus::Completed);
assert_eq!(aggregated_output.as_deref(), Some("hello from bang\n"));
assert_eq!(*exit_code, Some(0));
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let read_id = mcp
.send_thread_read_request(ThreadReadParams {
thread_id: thread.id,
include_turns: true,
})
.await?;
let read_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
assert_eq!(thread.turns.len(), 1);
let ThreadItem::CommandExecution {
source,
status,
aggregated_output,
..
} = thread.turns[0]
.items
.iter()
.find(|item| matches!(item, ThreadItem::CommandExecution { .. }))
.expect("expected persisted command execution item")
else {
unreachable!("matched command execution item");
};
assert_eq!(source, &CommandExecutionSource::UserShell);
assert_eq!(status, &CommandExecutionStatus::Completed);
assert_eq!(aggregated_output.as_deref(), Some("hello from bang\n"));
Ok(())
}
#[tokio::test]
async fn thread_shell_command_uses_existing_active_turn() -> Result<()> {
let tmp = TempDir::new()?;
let codex_home = tmp.path().join("codex_home");
std::fs::create_dir(&codex_home)?;
let workspace = tmp.path().join("workspace");
std::fs::create_dir(&workspace)?;
let responses = vec![
create_shell_command_sse_response(
vec![
"python3".to_string(),
"-c".to_string(),
"print(42)".to_string(),
],
None,
Some(5000),
"call-approve",
)?,
create_final_assistant_message_sse_response("done")?,
];
let server = create_mock_responses_server_sequence(responses).await;
create_config_toml(
codex_home.as_path(),
&server.uri(),
"untrusted",
&BTreeMap::default(),
)?;
let mut mcp = McpProcess::new(codex_home.as_path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
persist_extended_history: true,
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: "run python".to_string(),
text_elements: Vec::new(),
}],
cwd: Some(workspace.clone()),
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
let agent_started = wait_for_command_execution_started(&mut mcp, Some("call-approve")).await?;
let ThreadItem::CommandExecution {
command, source, ..
} = &agent_started.item
else {
unreachable!("helper returns command execution item");
};
assert_eq!(source, &CommandExecutionSource::Agent);
assert_eq!(
command,
&format_with_current_shell_display("python3 -c 'print(42)'")
);
let server_req = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_request_message(),
)
.await??;
let ServerRequest::CommandExecutionRequestApproval { request_id, .. } = server_req else {
panic!("expected approval request");
};
let shell_id = mcp
.send_thread_shell_command_request(ThreadShellCommandParams {
thread_id: thread.id.clone(),
command: "printf 'active turn bang\\n'".to_string(),
})
.await?;
let shell_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(shell_id)),
)
.await??;
let _: ThreadShellCommandResponse = to_response::<ThreadShellCommandResponse>(shell_resp)?;
let started =
wait_for_command_execution_started_by_source(&mut mcp, CommandExecutionSource::UserShell)
.await?;
assert_eq!(started.turn_id, turn.id);
let command_id = match &started.item {
ThreadItem::CommandExecution { id, .. } => id.clone(),
_ => unreachable!("helper returns command execution item"),
};
let completed = wait_for_command_execution_completed(&mut mcp, Some(&command_id)).await?;
assert_eq!(completed.turn_id, turn.id);
let ThreadItem::CommandExecution {
source,
aggregated_output,
..
} = &completed.item
else {
unreachable!("helper returns command execution item");
};
assert_eq!(source, &CommandExecutionSource::UserShell);
assert_eq!(aggregated_output.as_deref(), Some("active turn bang\n"));
mcp.send_response(
request_id,
serde_json::to_value(CommandExecutionRequestApprovalResponse {
decision: CommandExecutionApprovalDecision::Decline,
})?,
)
.await?;
let _: TurnCompletedNotification = serde_json::from_value(
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??
.params
.expect("turn/completed params"),
)?;
let read_id = mcp
.send_thread_read_request(ThreadReadParams {
thread_id: thread.id,
include_turns: true,
})
.await?;
let read_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
assert_eq!(thread.turns.len(), 1);
assert!(
thread.turns[0].items.iter().any(|item| {
matches!(
item,
ThreadItem::CommandExecution {
source: CommandExecutionSource::UserShell,
aggregated_output,
..
} if aggregated_output.as_deref() == Some("active turn bang\n")
)
}),
"expected active-turn shell command to be persisted on the existing turn"
);
Ok(())
}
async fn wait_for_command_execution_started(
mcp: &mut McpProcess,
expected_id: Option<&str>,
) -> Result<ItemStartedNotification> {
loop {
let notif = mcp
.read_stream_until_notification_message("item/started")
.await?;
let started: ItemStartedNotification = serde_json::from_value(
notif
.params
.ok_or_else(|| anyhow::anyhow!("missing item/started params"))?,
)?;
let ThreadItem::CommandExecution { id, .. } = &started.item else {
continue;
};
if expected_id.is_none() || expected_id == Some(id.as_str()) {
return Ok(started);
}
}
}
async fn wait_for_command_execution_started_by_source(
mcp: &mut McpProcess,
expected_source: CommandExecutionSource,
) -> Result<ItemStartedNotification> {
loop {
let started = wait_for_command_execution_started(mcp, None).await?;
let ThreadItem::CommandExecution { source, .. } = &started.item else {
continue;
};
if source == &expected_source {
return Ok(started);
}
}
}
async fn wait_for_command_execution_completed(
mcp: &mut McpProcess,
expected_id: Option<&str>,
) -> Result<ItemCompletedNotification> {
loop {
let notif = mcp
.read_stream_until_notification_message("item/completed")
.await?;
let completed: ItemCompletedNotification = serde_json::from_value(
notif
.params
.ok_or_else(|| anyhow::anyhow!("missing item/completed params"))?,
)?;
let ThreadItem::CommandExecution { id, .. } = &completed.item else {
continue;
};
if expected_id.is_none() || expected_id == Some(id.as_str()) {
return Ok(completed);
}
}
}
async fn wait_for_command_execution_output_delta(
mcp: &mut McpProcess,
item_id: &str,
) -> Result<CommandExecutionOutputDeltaNotification> {
loop {
let notif = mcp
.read_stream_until_notification_message("item/commandExecution/outputDelta")
.await?;
let delta: CommandExecutionOutputDeltaNotification = serde_json::from_value(
notif
.params
.ok_or_else(|| anyhow::anyhow!("missing output delta params"))?,
)?;
if delta.item_id == item_id {
return Ok(delta);
}
}
}
fn create_config_toml(
codex_home: &Path,
server_uri: &str,
approval_policy: &str,
feature_flags: &BTreeMap<Feature, bool>,
) -> std::io::Result<()> {
let feature_entries = feature_flags
.iter()
.map(|(feature, enabled)| {
let key = FEATURES
.iter()
.find(|spec| spec.id == *feature)
.map(|spec| spec.key)
.unwrap_or_else(|| panic!("missing feature key for {feature:?}"));
format!("{key} = {enabled}")
})
.collect::<Vec<_>>()
.join("\n");
std::fs::write(
codex_home.join("config.toml"),
format!(
r#"
model = "mock-model"
approval_policy = "{approval_policy}"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[features]
{feature_entries}
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
),
)
}

View File

@@ -13,7 +13,6 @@ use codex_app_server::INPUT_TOO_LARGE_ERROR_CODE;
use codex_app_server::INVALID_PARAMS_ERROR_CODE;
use codex_app_server_protocol::ByteRange;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::CollabAgentState;
use codex_app_server_protocol::CollabAgentStatus;
use codex_app_server_protocol::CollabAgentTool;
use codex_app_server_protocol::CollabAgentToolCallStatus;
@@ -1826,16 +1825,18 @@ async fn turn_start_emits_spawn_agent_item_with_model_metadata_v2() -> Result<()
assert_eq!(prompt, Some(CHILD_PROMPT.to_string()));
assert_eq!(model, Some(REQUESTED_MODEL.to_string()));
assert_eq!(reasoning_effort, Some(REQUESTED_REASONING_EFFORT));
assert_eq!(
agents_states,
HashMap::from([(
receiver_thread_id,
CollabAgentState {
status: CollabAgentStatus::PendingInit,
message: None,
},
)])
let agent_state = agents_states
.get(&receiver_thread_id)
.expect("spawn completion should include child agent state");
assert!(
matches!(
agent_state.status,
CollabAgentStatus::PendingInit | CollabAgentStatus::Running
),
"child agent should still be initializing or already running, got {:?}",
agent_state.status
);
assert_eq!(agent_state.message, None);
let turn_completed = timeout(DEFAULT_READ_TIMEOUT, async {
loop {
@@ -1857,6 +1858,189 @@ async fn turn_start_emits_spawn_agent_item_with_model_metadata_v2() -> Result<()
Ok(())
}
#[tokio::test]
async fn turn_start_emits_spawn_agent_item_with_effective_role_model_metadata_v2() -> Result<()> {
skip_if_no_network!(Ok(()));
const CHILD_PROMPT: &str = "child: do work";
const PARENT_PROMPT: &str = "spawn a child and continue";
const SPAWN_CALL_ID: &str = "spawn-call-1";
const REQUESTED_MODEL: &str = "gpt-5.1";
const REQUESTED_REASONING_EFFORT: ReasoningEffort = ReasoningEffort::Low;
const ROLE_MODEL: &str = "gpt-5.1-codex-max";
const ROLE_REASONING_EFFORT: ReasoningEffort = ReasoningEffort::High;
let server = responses::start_mock_server().await;
let spawn_args = serde_json::to_string(&json!({
"message": CHILD_PROMPT,
"agent_type": "custom",
"model": REQUESTED_MODEL,
"reasoning_effort": REQUESTED_REASONING_EFFORT,
}))?;
let _parent_turn = responses::mount_sse_once_match(
&server,
|req: &wiremock::Request| body_contains(req, PARENT_PROMPT),
responses::sse(vec![
responses::ev_response_created("resp-turn1-1"),
responses::ev_function_call(SPAWN_CALL_ID, "spawn_agent", &spawn_args),
responses::ev_completed("resp-turn1-1"),
]),
)
.await;
let _child_turn = responses::mount_sse_once_match(
&server,
|req: &wiremock::Request| {
body_contains(req, CHILD_PROMPT) && !body_contains(req, SPAWN_CALL_ID)
},
responses::sse(vec![
responses::ev_response_created("resp-child-1"),
responses::ev_assistant_message("msg-child-1", "child done"),
responses::ev_completed("resp-child-1"),
]),
)
.await;
let _parent_follow_up = responses::mount_sse_once_match(
&server,
|req: &wiremock::Request| body_contains(req, SPAWN_CALL_ID),
responses::sse(vec![
responses::ev_response_created("resp-turn1-2"),
responses::ev_assistant_message("msg-turn1-2", "parent done"),
responses::ev_completed("resp-turn1-2"),
]),
)
.await;
let codex_home = TempDir::new()?;
create_config_toml(
codex_home.path(),
&server.uri(),
"never",
&BTreeMap::from([(Feature::Collab, true)]),
)?;
std::fs::write(
codex_home.path().join("custom-role.toml"),
format!("model = \"{ROLE_MODEL}\"\nmodel_reasoning_effort = \"{ROLE_REASONING_EFFORT}\"\n",),
)?;
let config_path = codex_home.path().join("config.toml");
let base_config = std::fs::read_to_string(&config_path)?;
std::fs::write(
&config_path,
format!(
r#"{base_config}
[agents.custom]
description = "Custom role"
config_file = "./custom-role.toml"
"#
),
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("gpt-5.2-codex".to_string()),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: PARENT_PROMPT.to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let turn: TurnStartResponse = to_response::<TurnStartResponse>(turn_resp)?;
let spawn_completed = timeout(DEFAULT_READ_TIMEOUT, async {
loop {
let completed_notif = mcp
.read_stream_until_notification_message("item/completed")
.await?;
let completed: ItemCompletedNotification =
serde_json::from_value(completed_notif.params.expect("item/completed params"))?;
if let ThreadItem::CollabAgentToolCall { id, .. } = &completed.item
&& id == SPAWN_CALL_ID
{
return Ok::<ThreadItem, anyhow::Error>(completed.item);
}
}
})
.await??;
let ThreadItem::CollabAgentToolCall {
id,
tool,
status,
sender_thread_id,
receiver_thread_ids,
prompt,
model,
reasoning_effort,
agents_states,
} = spawn_completed
else {
unreachable!("loop ensures we break on collab agent tool call items");
};
let receiver_thread_id = receiver_thread_ids
.first()
.cloned()
.expect("spawn completion should include child thread id");
assert_eq!(id, SPAWN_CALL_ID);
assert_eq!(tool, CollabAgentTool::SpawnAgent);
assert_eq!(status, CollabAgentToolCallStatus::Completed);
assert_eq!(sender_thread_id, thread.id);
assert_eq!(receiver_thread_ids, vec![receiver_thread_id.clone()]);
assert_eq!(prompt, Some(CHILD_PROMPT.to_string()));
assert_eq!(model, Some(ROLE_MODEL.to_string()));
assert_eq!(reasoning_effort, Some(ROLE_REASONING_EFFORT));
let agent_state = agents_states
.get(&receiver_thread_id)
.expect("spawn completion should include child agent state");
assert!(
matches!(
agent_state.status,
CollabAgentStatus::PendingInit | CollabAgentStatus::Running
),
"child agent should still be initializing or already running, got {:?}",
agent_state.status
);
assert_eq!(agent_state.message, None);
let turn_completed = timeout(DEFAULT_READ_TIMEOUT, async {
loop {
let turn_completed_notif = mcp
.read_stream_until_notification_message("turn/completed")
.await?;
let turn_completed: TurnCompletedNotification = serde_json::from_value(
turn_completed_notif.params.expect("turn/completed params"),
)?;
if turn_completed.thread_id == thread.id && turn_completed.turn.id == turn.turn.id {
return Ok::<TurnCompletedNotification, anyhow::Error>(turn_completed);
}
}
})
.await??;
assert_eq!(turn_completed.thread_id, thread.id);
Ok(())
}
#[tokio::test]
async fn turn_start_file_change_approval_accept_for_session_persists_v2() -> Result<()> {
skip_if_no_network!(Ok(()));