merge conflicts

This commit is contained in:
Roy Han
2026-03-19 12:11:05 -07:00
491 changed files with 37393 additions and 8085 deletions

View File

@@ -35,7 +35,7 @@ async fn websocket_test_codex_shell_chain() -> Result<()> {
]])
.await;
let mut builder = test_codex();
let mut builder = test_codex().with_windows_cmd_shell();
let test = builder.build_with_websocket_server(&server).await?;
test.submit_turn_with_policy(
@@ -183,7 +183,7 @@ async fn websocket_v2_test_codex_shell_chain() -> Result<()> {
]])
.await;
let mut builder = test_codex().with_config(|config| {
let mut builder = test_codex().with_windows_cmd_shell().with_config(|config| {
config
.features
.enable(Feature::ResponsesWebsocketsV2)

View File

@@ -1,6 +1,8 @@
#![allow(clippy::expect_used)]
use anyhow::Result;
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use codex_test_macros::large_stack_test;
use core_test_support::responses::ev_apply_patch_call;
use core_test_support::responses::ev_apply_patch_custom_tool_call;
@@ -740,7 +742,9 @@ async fn apply_patch_shell_command_heredoc_with_cd_updates_relative_workdir() ->
async fn apply_patch_cli_can_use_shell_command_output_as_patch_input() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = apply_patch_harness_with(|builder| builder.with_model("gpt-5.1")).await?;
let harness =
apply_patch_harness_with(|builder| builder.with_model("gpt-5.1").with_windows_cmd_shell())
.await?;
let source_contents = "line1\nnaïve café\nline3\n";
let source_path = harness.path("source.txt");
@@ -786,9 +790,21 @@ async fn apply_patch_cli_can_use_shell_command_output_as_patch_input() -> Result
match call_num {
0 => {
let command = if cfg!(windows) {
"Get-Content -Encoding utf8 source.txt"
// Encode the nested PowerShell script so `cmd.exe /c` does not leave the
// read command wrapped in quotes, and suppress progress records so the
// shell tool only returns the file contents back to apply_patch.
let script = "$ProgressPreference = 'SilentlyContinue'; [Console]::OutputEncoding = [System.Text.UTF8Encoding]::new($false); [System.IO.File]::ReadAllText('source.txt', [System.Text.UTF8Encoding]::new($false))";
let encoded = BASE64_STANDARD.encode(
script
.encode_utf16()
.flat_map(u16::to_le_bytes)
.collect::<Vec<u8>>(),
);
format!(
"powershell.exe -NoLogo -NoProfile -NonInteractive -EncodedCommand {encoded}"
)
} else {
"cat source.txt"
"cat source.txt".to_string()
};
let args = json!({
"command": command,
@@ -807,9 +823,7 @@ async fn apply_patch_cli_can_use_shell_command_output_as_patch_input() -> Result
let body_json: serde_json::Value =
request.body_json().expect("request body should be json");
let read_output = function_call_output_text(&body_json, &self.read_call_id);
eprintln!("read_output: \n{read_output}");
let stdout = stdout_from_shell_output(&read_output);
eprintln!("stdout: \n{stdout}");
let patch_lines = stdout
.lines()
.map(|line| format!("+{line}"))
@@ -819,8 +833,6 @@ async fn apply_patch_cli_can_use_shell_command_output_as_patch_input() -> Result
"*** Begin Patch\n*** Add File: target.txt\n{patch_lines}\n*** End Patch"
);
eprintln!("patch: \n{patch}");
let body = sse(vec![
ev_response_created("resp-2"),
ev_apply_patch_custom_tool_call(&self.apply_call_id, &patch),

View File

@@ -1,6 +1,7 @@
#![allow(clippy::unwrap_used, clippy::expect_used)]
use anyhow::Result;
use codex_core::CodexThread;
use codex_core::config::Constrained;
use codex_core::config_loader::ConfigLayerStack;
use codex_core::config_loader::ConfigLayerStackOrdering;
@@ -28,6 +29,7 @@ use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_sse_once_match;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
@@ -46,9 +48,11 @@ use std::env;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::Request;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
@@ -122,7 +126,11 @@ impl ActionKind {
ActionKind::WriteFile { target, content } => {
let (path, _) = target.resolve_for_patch(test);
let _ = fs::remove_file(&path);
let command = format!("printf {content:?} > {path:?} && cat {path:?}");
let path_str = path.display().to_string();
let script = format!(
"from pathlib import Path; path = Path({path_str:?}); content = {content:?}; path.write_text(content, encoding='utf-8'); print(path.read_text(encoding='utf-8'), end='')",
);
let command = format!("python3 -c {script:?}");
let event = shell_event(call_id, &command, 5_000, sandbox_permissions)?;
Ok((event, Some(command)))
}
@@ -677,6 +685,47 @@ async fn wait_for_completion(test: &TestCodex) {
.await;
}
/// Returns `true` when the request body, read as UTF-8, contains `text`.
///
/// If the `content-encoding` header lists `zstd` (case-insensitive, comma-separated values are
/// accepted), the body is decompressed before searching. A body that fails to decompress or is
/// not valid UTF-8 never matches.
fn body_contains(req: &Request, text: &str) -> bool {
    let is_zstd = req
        .headers
        .get("content-encoding")
        .and_then(|value| value.to_str().ok())
        .is_some_and(|value| {
            value
                .split(',')
                .any(|entry| entry.trim().eq_ignore_ascii_case("zstd"))
        });
    if is_zstd {
        // Decompression has to allocate; the decoded buffer is moved into the `String`.
        zstd::stream::decode_all(req.body.as_slice())
            .ok()
            .and_then(|body| String::from_utf8(body).ok())
            .is_some_and(|body| body.contains(text))
    } else {
        // Search the borrowed bytes directly instead of cloning the whole body.
        std::str::from_utf8(&req.body).is_ok_and(|body| body.contains(text))
    }
}
async fn wait_for_spawned_thread(test: &TestCodex) -> Result<Arc<CodexThread>> {
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
loop {
let ids = test.thread_manager.list_thread_ids().await;
if let Some(thread_id) = ids
.iter()
.find(|id| **id != test.session_configured.session_id)
{
return test
.thread_manager
.get_thread(*thread_id)
.await
.map_err(anyhow::Error::from);
}
if tokio::time::Instant::now() >= deadline {
anyhow::bail!("timed out waiting for spawned thread");
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
fn scenarios() -> Vec<ScenarioSpec> {
use AskForApproval::*;
@@ -1611,6 +1660,9 @@ async fn run_scenario(scenario: &ScenarioSpec) -> Result<()> {
.action
.prepare(&test, &server, call_id, scenario.sandbox_permissions)
.await?;
if let Some(command) = expected_command.as_deref() {
eprintln!("approval scenario {} command: {command}", scenario.name);
}
let _ = mount_sse_once(
&server,
@@ -1692,6 +1744,10 @@ async fn run_scenario(scenario: &ScenarioSpec) -> Result<()> {
let output_item = results_mock.single_request().function_call_output(call_id);
let result = parse_result(&output_item);
eprintln!(
"approval scenario {} result: exit_code={:?} stdout={:?}",
scenario.name, result.exit_code, result.stdout
);
scenario.expectation.verify(&test, &result)?;
Ok(())
@@ -1985,6 +2041,188 @@ async fn approving_execpolicy_amendment_persists_policy_and_skips_future_prompts
Ok(())
}
/// Approves an exec-policy amendment inside a spawned child thread, then reruns the same command
/// from the parent session and waits for that turn via `wait_for_completion_without_approval`.
///
/// Flow: the parent turn spawns a child, the child requests approval for a `touch` command, the
/// test approves it with `ApprovedExecpolicyAmendment`, the child finishes without a second
/// prompt, and a second parent turn issues the identical command.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn spawned_subagent_execpolicy_amendment_propagates_to_parent_session() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let approval_policy = AskForApproval::UnlessTrusted;
let sandbox_policy = SandboxPolicy::new_read_only_policy();
// The config closure is `move`, so it gets its own copy of the sandbox policy; the original is
// still needed for the `submit_turn` calls below.
let sandbox_policy_for_config = sandbox_policy.clone();
let mut builder = test_codex().with_config(move |config| {
config.permissions.approval_policy = Constrained::allow_any(approval_policy);
config.permissions.sandbox_policy = Constrained::allow_any(sandbox_policy_for_config);
config
.features
.enable(Feature::Collab)
.expect("test config should allow feature update");
});
let test = builder.build(&server).await?;
const PARENT_PROMPT: &str = "spawn a child that repeats a command";
const CHILD_PROMPT: &str = "run the same command twice";
const SPAWN_CALL_ID: &str = "spawn-child-1";
const CHILD_CALL_ID_1: &str = "child-touch-1";
const PARENT_CALL_ID_2: &str = "parent-touch-2";
let child_file = test.cwd.path().join("subagent-allow-prefix.txt");
// Best-effort cleanup: the file may not exist yet, so the result is ignored.
let _ = fs::remove_file(&child_file);
let spawn_args = serde_json::to_string(&json!({
"message": CHILD_PROMPT,
}))?;
// Parent, first request: a body containing the parent prompt is answered with a `spawn_agent`
// call carrying the child prompt.
mount_sse_once_match(
&server,
|req: &Request| body_contains(req, PARENT_PROMPT),
sse(vec![
ev_response_created("resp-parent-1"),
ev_function_call(SPAWN_CALL_ID, "spawn_agent", &spawn_args),
ev_completed("resp-parent-1"),
]),
)
.await;
let child_cmd_args = serde_json::to_string(&json!({
"command": "touch subagent-allow-prefix.txt",
"timeout_ms": 1_000,
"prefix_rule": ["touch", "subagent-allow-prefix.txt"],
}))?;
// Child, first request: matches the child prompt but excludes bodies that mention the spawn
// call id (presumably so the parent's follow-up, which may echo the spawn arguments, is not
// matched here). Answered with the `touch` shell command and its proposed prefix rule.
mount_sse_once_match(
&server,
|req: &Request| body_contains(req, CHILD_PROMPT) && !body_contains(req, SPAWN_CALL_ID),
sse(vec![
ev_response_created("resp-child-1"),
ev_function_call(CHILD_CALL_ID_1, "shell_command", &child_cmd_args),
ev_completed("resp-child-1"),
]),
)
.await;
// Child, follow-up request: a body containing the child's call id ends the child turn.
mount_sse_once_match(
&server,
|req: &Request| body_contains(req, CHILD_CALL_ID_1),
sse(vec![
ev_response_created("resp-child-2"),
ev_assistant_message("msg-child-2", "child done"),
ev_completed("resp-child-2"),
]),
)
.await;
// Parent, follow-up request: a body containing the spawn call id ends the first parent turn.
mount_sse_once_match(
&server,
|req: &Request| body_contains(req, SPAWN_CALL_ID),
sse(vec![
ev_response_created("resp-parent-2"),
ev_assistant_message("msg-parent-2", "parent done"),
ev_completed("resp-parent-2"),
]),
)
.await;
// Responses for the second parent turn, mounted without a body matcher: first the same shell
// command the child ran, then a closing assistant message.
let _ = mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-parent-3"),
ev_function_call(PARENT_CALL_ID_2, "shell_command", &child_cmd_args),
ev_completed("resp-parent-3"),
]),
)
.await;
let _ = mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-parent-4"),
ev_assistant_message("msg-parent-4", "parent rerun done"),
ev_completed("resp-parent-4"),
]),
)
.await;
submit_turn(
&test,
PARENT_PROMPT,
approval_policy,
sandbox_policy.clone(),
)
.await?;
let child = wait_for_spawned_thread(&test).await?;
// Wait for whichever comes first on the child thread: an approval request or turn completion.
// Completion first means the child never asked for approval, which fails the test below.
let approval_event = wait_for_event_with_timeout(
&child,
|event| {
matches!(
event,
EventMsg::ExecApprovalRequest(_) | EventMsg::TurnComplete(_)
)
},
Duration::from_secs(2),
)
.await;
let EventMsg::ExecApprovalRequest(approval) = approval_event else {
panic!("expected child approval before completion");
};
let expected_execpolicy_amendment = ExecPolicyAmendment::new(vec![
"touch".to_string(),
"subagent-allow-prefix.txt".to_string(),
]);
assert_eq!(
approval.proposed_execpolicy_amendment,
Some(expected_execpolicy_amendment.clone())
);
// Approve the command and accept the proposed amendment in the same decision.
child
.submit(Op::ExecApproval {
id: approval.effective_approval_id(),
turn_id: None,
decision: ReviewDecision::ApprovedExecpolicyAmendment {
proposed_execpolicy_amendment: expected_execpolicy_amendment,
},
})
.await?;
// After approval the child must reach turn completion without asking again.
let child_event = wait_for_event_with_timeout(
&child,
|event| {
matches!(
event,
EventMsg::ExecApprovalRequest(_) | EventMsg::TurnComplete(_)
)
},
Duration::from_secs(2),
)
.await;
match child_event {
EventMsg::TurnComplete(_) => {}
EventMsg::ExecApprovalRequest(ev) => {
panic!("unexpected second child approval request: {:?}", ev.command)
}
other => panic!("unexpected event: {other:?}"),
}
assert!(
child_file.exists(),
"expected subagent command to create file"
);
// Remove the file the child created before the parent reruns the same command.
fs::remove_file(&child_file)?;
assert!(
!child_file.exists(),
"expected child file to be removed before parent rerun"
);
submit_turn(
&test,
"parent reruns child command",
approval_policy,
sandbox_policy,
)
.await?;
// NOTE(review): `wait_for_completion_without_approval` is defined elsewhere; presumably it
// fails if the parent turn raises an approval request — confirm.
wait_for_completion_without_approval(&test).await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[cfg(unix)]
async fn matched_prefix_rule_runs_unsandboxed_under_zsh_fork() -> Result<()> {

View File

@@ -392,6 +392,7 @@ async fn resume_replays_legacy_js_repl_image_rollout_shapes() {
timestamp: "2024-01-01T00:00:02.000Z".to_string(),
item: RolloutItem::ResponseItem(ResponseItem::CustomToolCallOutput {
call_id: "legacy-js-call".to_string(),
name: None,
output: FunctionCallOutputPayload::from_text("legacy js_repl stdout".to_string()),
}),
},
@@ -550,6 +551,7 @@ async fn resume_replays_image_tool_outputs_with_detail() {
timestamp: "2024-01-01T00:00:02.500Z".to_string(),
item: RolloutItem::ResponseItem(ResponseItem::CustomToolCallOutput {
call_id: custom_call_id.to_string(),
name: None,
output: FunctionCallOutputPayload::from_content_items(vec![
FunctionCallOutputContentItem::InputImage {
image_url: image_url.to_string(),
@@ -721,6 +723,7 @@ async fn chatgpt_auth_sends_correct_request() {
let mut model_provider = built_in_model_providers(/* openai_base_url */ None)["openai"].clone();
model_provider.base_url = Some(format!("{}/api/codex", server.uri()));
model_provider.supports_websockets = false;
let mut builder = test_codex()
.with_auth(create_dummy_codex_auth())
.with_config(move |config| {
@@ -795,6 +798,7 @@ async fn prefers_apikey_when_config_prefers_apikey_even_with_chatgpt_tokens() {
let model_provider = ModelProviderInfo {
base_url: Some(format!("{}/v1", server.uri())),
supports_websockets: false,
..built_in_model_providers(/* openai_base_url */ None)["openai"].clone()
};
@@ -1796,6 +1800,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
request_max_retries: Some(0),
stream_max_retries: Some(0),
stream_idle_timeout_ms: Some(5_000),
websocket_connect_timeout_ms: None,
requires_openai_auth: false,
supports_websockets: false,
};
@@ -1835,7 +1840,6 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
config.model_verbosity,
false,
false,
false,
None,
);
let mut client_session = client.new_session();
@@ -1901,6 +1905,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
});
prompt.input.push(ResponseItem::CustomToolCallOutput {
call_id: "custom-tool-call-id".into(),
name: None,
output: FunctionCallOutputPayload::from_text("ok".into()),
});
@@ -1972,6 +1977,7 @@ async fn token_count_includes_rate_limits_snapshot() {
let mut provider = built_in_model_providers(/* openai_base_url */ None)["openai"].clone();
provider.base_url = Some(format!("{}/v1", server.uri()));
provider.supports_websockets = false;
let mut builder = test_codex()
.with_auth(CodexAuth::from_api_key("test"))
@@ -2398,6 +2404,7 @@ async fn azure_overrides_assign_properties_used_for_responses_url() {
request_max_retries: None,
stream_max_retries: None,
stream_idle_timeout_ms: None,
websocket_connect_timeout_ms: None,
requires_openai_auth: false,
supports_websockets: false,
};
@@ -2482,6 +2489,7 @@ async fn env_var_overrides_loaded_auth() {
request_max_retries: None,
stream_max_retries: None,
stream_idle_timeout_ms: None,
websocket_connect_timeout_ms: None,
requires_openai_auth: false,
supports_websockets: false,
};

View File

@@ -1,4 +1,6 @@
#![allow(clippy::expect_used, clippy::unwrap_used)]
use codex_api::WS_REQUEST_HEADER_TRACEPARENT_CLIENT_METADATA_KEY;
use codex_api::WS_REQUEST_HEADER_TRACESTATE_CLIENT_METADATA_KEY;
use codex_core::CodexAuth;
use codex_core::ModelClient;
use codex_core::ModelClientSession;
@@ -8,9 +10,9 @@ use codex_core::ResponseEvent;
use codex_core::WireApi;
use codex_core::X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER;
use codex_core::features::Feature;
use codex_core::ws_version_from_features;
use codex_otel::SessionTelemetry;
use codex_otel::TelemetryAuthMode;
use codex_otel::current_span_w3c_trace_context;
use codex_otel::metrics::MetricsClient;
use codex_otel::metrics::MetricsConfig;
use codex_protocol::ThreadId;
@@ -25,6 +27,7 @@ use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::W3cTraceContext;
use codex_protocol::user_input::UserInput;
use core_test_support::load_default_config_for_test;
use core_test_support::responses::WebSocketConnectionConfig;
@@ -36,6 +39,7 @@ use core_test_support::responses::start_websocket_server;
use core_test_support::responses::start_websocket_server_with_headers;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::test_codex;
use core_test_support::tracing::install_test_tracing;
use core_test_support::wait_for_event;
use futures::StreamExt;
use opentelemetry_sdk::metrics::InMemoryMetricExporter;
@@ -44,6 +48,7 @@ use serde_json::json;
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir;
use tracing::Instrument;
use tracing_test::traced_test;
const MODEL: &str = "gpt-5.2-codex";
@@ -51,6 +56,32 @@ const OPENAI_BETA_HEADER: &str = "OpenAI-Beta";
const WS_V2_BETA_HEADER_VALUE: &str = "responses_websockets=2026-02-06";
const X_CLIENT_REQUEST_ID_HEADER: &str = "x-client-request-id";
/// Asserts that a request body carries `expected_trace` inside its `client_metadata` object.
///
/// The traceparent must be present on both sides and equal. The tracestate must match as an
/// optional string, so both being absent also passes. The body must not have a top-level
/// `trace` field.
///
/// # Panics
///
/// Panics when `client_metadata` is missing or not an object, when either traceparent is
/// missing, or when any of the comparisons fail.
fn assert_request_trace_matches(body: &serde_json::Value, expected_trace: &W3cTraceContext) {
    let metadata = body["client_metadata"]
        .as_object()
        .expect("missing client_metadata payload");

    let sent_traceparent = metadata
        .get(WS_REQUEST_HEADER_TRACEPARENT_CLIENT_METADATA_KEY)
        .and_then(serde_json::Value::as_str)
        .expect("missing traceparent");
    let wanted_traceparent = expected_trace
        .traceparent
        .as_deref()
        .expect("missing expected traceparent");
    assert_eq!(sent_traceparent, wanted_traceparent);

    let sent_tracestate = metadata
        .get(WS_REQUEST_HEADER_TRACESTATE_CLIENT_METADATA_KEY)
        .and_then(serde_json::Value::as_str);
    let wanted_tracestate = expected_trace.tracestate.as_deref();
    assert_eq!(sent_tracestate, wanted_tracestate);

    let has_top_level_trace = body.get("trace").is_some();
    assert!(!has_top_level_trace, "top-level trace should not be sent");
}
struct WebsocketTestHarness {
_codex_home: TempDir,
client: ModelClient,
@@ -98,6 +129,134 @@ async fn responses_websocket_streams_request() {
server.shutdown().await;
}
/// Streams one prompt through a harness built by `websocket_harness_with_options(&server, false)`
/// and checks that the server saw exactly one handshake and one request on a single connection.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_streams_without_feature_flag_when_provider_supports_websockets() {
    skip_if_no_network!();

    // One connection, one request, answered with a created/completed pair.
    let scripted_events = vec![ev_response_created("resp-1"), ev_completed("resp-1")];
    let server = start_websocket_server(vec![vec![scripted_events]]).await;

    let harness = websocket_harness_with_options(&server, false).await;
    let mut session = harness.client.new_session();
    let hello_prompt = prompt_with_input(vec![message_item("hello")]);
    stream_until_complete(&mut session, &harness, &hello_prompt).await;

    assert_eq!(server.handshakes().len(), 1);
    assert_eq!(server.single_connection().len(), 1);

    server.shutdown().await;
}
/// Streams two turns, each inside its own tracing span, and checks that both requests travel
/// over a single websocket handshake while each carries the trace context of its own span.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_reuses_connection_with_per_turn_trace_payloads() {
skip_if_no_network!();
// Keep the returned guard alive for the whole test.
let _trace_test_context = install_test_tracing("client-websocket-test");
// One scripted connection that answers two consecutive requests.
let server = start_websocket_server(vec![vec![
vec![ev_response_created("resp-1"), ev_completed("resp-1")],
vec![ev_response_created("resp-2"), ev_completed("resp-2")],
]])
.await;
let harness = websocket_harness(&server).await;
let prompt_one = prompt_with_input(vec![message_item("hello")]);
let prompt_two = prompt_with_input(vec![message_item("again")]);
// Turn one: capture the span's W3C trace context from inside the instrumented future, before
// streaming, so it can be compared with what the request actually carried.
let first_trace = {
let mut client_session = harness.client.new_session();
async {
let expected_trace =
current_span_w3c_trace_context().expect("current span should have trace context");
stream_until_complete(&mut client_session, &harness, &prompt_one).await;
expected_trace
}
.instrument(tracing::info_span!("client.websocket.turn_one"))
.await
};
// Turn two: a fresh session from the same client, under a different span.
let second_trace = {
let mut client_session = harness.client.new_session();
async {
let expected_trace =
current_span_w3c_trace_context().expect("current span should have trace context");
stream_until_complete(&mut client_session, &harness, &prompt_two).await;
expected_trace
}
.instrument(tracing::info_span!("client.websocket.turn_two"))
.await
};
// A single handshake with two requests on it means the second turn reused the connection.
assert_eq!(server.handshakes().len(), 1);
let connection = server.single_connection();
assert_eq!(connection.len(), 2);
let first_request = connection
.first()
.expect("missing first request")
.body_json();
let second_request = connection
.get(1)
.expect("missing second request")
.body_json();
assert_request_trace_matches(&first_request, &first_trace);
assert_request_trace_matches(&second_request, &second_trace);
let first_traceparent = first_request["client_metadata"]
[WS_REQUEST_HEADER_TRACEPARENT_CLIENT_METADATA_KEY]
.as_str()
.expect("missing first traceparent");
let second_traceparent = second_request["client_metadata"]
[WS_REQUEST_HEADER_TRACEPARENT_CLIENT_METADATA_KEY]
.as_str()
.expect("missing second traceparent");
// The two turns ran under different spans, so their traceparents must differ.
assert_ne!(first_traceparent, second_traceparent);
server.shutdown().await;
}
/// Preconnects the websocket outside any request span, then streams a request inside a span and
/// checks that the request carries that span's trace context over the single connection.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_preconnect_does_not_replace_turn_trace_payload() {
skip_if_no_network!();
// Keep the returned guard alive for the whole test.
let _trace_test_context = install_test_tracing("client-websocket-test");
let server = start_websocket_server(vec![vec![vec![
ev_response_created("resp-1"),
ev_completed("resp-1"),
]]])
.await;
let harness = websocket_harness(&server).await;
let mut client_session = harness.client.new_session();
// Preconnect happens before the `client.websocket.request` span below is entered.
client_session
.preconnect_websocket(&harness.session_telemetry, &harness.model_info)
.await
.expect("websocket preconnect failed");
let prompt = prompt_with_input(vec![message_item("hello")]);
// Capture the span's trace context from inside the instrumented future, before streaming.
let expected_trace = async {
let expected_trace =
current_span_w3c_trace_context().expect("current span should have trace context");
stream_until_complete(&mut client_session, &harness, &prompt).await;
expected_trace
}
.instrument(tracing::info_span!("client.websocket.request"))
.await;
// One handshake and one request: the stream used the connection opened by the preconnect.
assert_eq!(server.handshakes().len(), 1);
let connection = server.single_connection();
assert_eq!(connection.len(), 1);
let request = connection.first().expect("missing request").body_json();
assert_request_trace_matches(&request, &expected_trace);
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_preconnect_reuses_connection() {
skip_if_no_network!();
@@ -133,7 +292,7 @@ async fn responses_websocket_request_prewarm_reuses_connection() {
]])
.await;
let harness = websocket_harness_with_options(&server, false, false, true, true).await;
let harness = websocket_harness_with_options(&server, true).await;
let mut client_session = harness.client.new_session();
let prompt = prompt_with_input(vec![message_item("hello")]);
client_session
@@ -252,7 +411,7 @@ async fn responses_websocket_request_prewarm_is_reused_even_with_header_changes(
]])
.await;
let harness = websocket_harness_with_options(&server, false, false, true, true).await;
let harness = websocket_harness_with_options(&server, true).await;
let mut client_session = harness.client.new_session();
let prompt = prompt_with_input(vec![message_item("hello")]);
client_session
@@ -308,7 +467,7 @@ async fn responses_websocket_request_prewarm_is_reused_even_with_header_changes(
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_prewarm_uses_v2_when_model_prefers_websockets_and_feature_disabled() {
async fn responses_websocket_prewarm_uses_v2_when_provider_supports_websockets() {
skip_if_no_network!();
let server = start_websocket_server(vec![vec![vec![
@@ -317,7 +476,7 @@ async fn responses_websocket_prewarm_uses_v2_when_model_prefers_websockets_and_f
]]])
.await;
let harness = websocket_harness_with_options(&server, false, false, false, true).await;
let harness = websocket_harness_with_options(&server, false).await;
let mut client_session = harness.client.new_session();
let prompt = prompt_with_input(vec![message_item("hello")]);
client_session
@@ -374,7 +533,7 @@ async fn responses_websocket_preconnect_runs_when_only_v2_feature_enabled() {
]]])
.await;
let harness = websocket_harness_with_options(&server, false, false, true, false).await;
let harness = websocket_harness_with_options(&server, true).await;
let mut client_session = harness.client.new_session();
client_session
.preconnect_websocket(&harness.session_telemetry, &harness.model_info)
@@ -404,7 +563,7 @@ async fn responses_websocket_preconnect_runs_when_only_v2_feature_enabled() {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_v2_requests_use_v2_when_model_prefers_websockets() {
async fn responses_websocket_v2_requests_use_v2_when_provider_supports_websockets() {
skip_if_no_network!();
let server = start_websocket_server(vec![vec![
@@ -417,7 +576,7 @@ async fn responses_websocket_v2_requests_use_v2_when_model_prefers_websockets()
]])
.await;
let harness = websocket_harness_with_options(&server, false, false, true, true).await;
let harness = websocket_harness_with_options(&server, true).await;
let mut client_session = harness.client.new_session();
let prompt_one = prompt_with_input(vec![message_item("hello")]);
let prompt_two = prompt_with_input(vec![
@@ -466,7 +625,7 @@ async fn responses_websocket_v2_incremental_requests_are_reused_across_turns() {
]])
.await;
let harness = websocket_harness_with_options(&server, false, false, true, true).await;
let harness = websocket_harness_with_options(&server, false).await;
let prompt_one = prompt_with_input(vec![message_item("hello")]);
let prompt_two = prompt_with_input(vec![
message_item("hello"),
@@ -510,7 +669,7 @@ async fn responses_websocket_v2_wins_when_both_features_enabled() {
]])
.await;
let harness = websocket_harness_with_options(&server, false, true, true, false).await;
let harness = websocket_harness_with_options(&server, false).await;
let mut client_session = harness.client.new_session();
let prompt_one = prompt_with_input(vec![message_item("hello")]);
let prompt_two = prompt_with_input(vec![
@@ -1502,6 +1661,13 @@ fn prompt_with_input_and_instructions(input: Vec<ResponseItem>, instructions: &s
}
/// Builds the mock websocket provider for `server` without an explicit websocket connect
/// timeout; delegates to `websocket_provider_with_connect_timeout` with `None`.
fn websocket_provider(server: &WebSocketTestServer) -> ModelProviderInfo {
websocket_provider_with_connect_timeout(server, None)
}
fn websocket_provider_with_connect_timeout(
server: &WebSocketTestServer,
websocket_connect_timeout_ms: Option<u64>,
) -> ModelProviderInfo {
ModelProviderInfo {
name: "mock-ws".into(),
base_url: Some(format!("{}/v1", server.uri())),
@@ -1515,6 +1681,7 @@ fn websocket_provider(server: &WebSocketTestServer) -> ModelProviderInfo {
request_max_retries: Some(0),
stream_max_retries: Some(0),
stream_idle_timeout_ms: Some(5_000),
websocket_connect_timeout_ms,
requires_openai_auth: false,
supports_websockets: true,
}
@@ -1528,53 +1695,39 @@ async fn websocket_harness_with_runtime_metrics(
server: &WebSocketTestServer,
runtime_metrics_enabled: bool,
) -> WebsocketTestHarness {
websocket_harness_with_options(server, runtime_metrics_enabled, true, false, false).await
websocket_harness_with_options(server, runtime_metrics_enabled).await
}
async fn websocket_harness_with_v2(
server: &WebSocketTestServer,
websocket_v2_enabled: bool,
runtime_metrics_enabled: bool,
) -> WebsocketTestHarness {
websocket_harness_with_options(server, false, true, websocket_v2_enabled, false).await
websocket_harness_with_options(server, runtime_metrics_enabled).await
}
async fn websocket_harness_with_options(
server: &WebSocketTestServer,
runtime_metrics_enabled: bool,
websocket_enabled: bool,
websocket_v2_enabled: bool,
prefer_websockets: bool,
) -> WebsocketTestHarness {
let provider = websocket_provider(server);
websocket_harness_with_provider_options(websocket_provider(server), runtime_metrics_enabled)
.await
}
async fn websocket_harness_with_provider_options(
provider: ModelProviderInfo,
runtime_metrics_enabled: bool,
) -> WebsocketTestHarness {
let codex_home = TempDir::new().unwrap();
let mut config = load_default_config_for_test(&codex_home).await;
config.model = Some(MODEL.to_string());
if websocket_enabled {
config
.features
.enable(Feature::ResponsesWebsockets)
.expect("test config should allow feature update");
} else {
config
.features
.disable(Feature::ResponsesWebsockets)
.expect("test config should allow feature update");
}
if runtime_metrics_enabled {
config
.features
.enable(Feature::RuntimeMetrics)
.expect("test config should allow feature update");
}
if websocket_v2_enabled {
config
.features
.enable(Feature::ResponsesWebsocketsV2)
.expect("test config should allow feature update");
}
let config = Arc::new(config);
let mut model_info = codex_core::test_support::construct_model_info_offline(MODEL, &config);
model_info.prefer_websockets = prefer_websockets;
let model_info = codex_core::test_support::construct_model_info_offline(MODEL, &config);
let conversation_id = ThreadId::new();
let auth_manager =
codex_core::test_support::auth_manager_from_auth(CodexAuth::from_api_key("Test API Key"));
@@ -1605,7 +1758,6 @@ async fn websocket_harness_with_options(
provider.clone(),
SessionSource::Exec,
config.model_verbosity,
ws_version_from_features(&config),
false,
runtime_metrics_enabled,
None,

View File

@@ -1,6 +1,8 @@
#![allow(clippy::expect_used, clippy::unwrap_used)]
use anyhow::Result;
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use codex_core::config::types::McpServerConfig;
use codex_core::config::types::McpServerTransportConfig;
use codex_core::features::Feature;
@@ -305,7 +307,7 @@ async fn code_mode_only_restricts_prompt_tools() -> Result<()> {
let first_body = resp_mock.single_request().body_json();
assert_eq!(
tool_names(&first_body),
vec!["exec".to_string(), "exec_wait".to_string()]
vec!["exec".to_string(), "wait".to_string()]
);
Ok(())
@@ -361,6 +363,38 @@ text(output.output);
Ok(())
}
/// Calls `tools.update_plan` from a code-mode script that emits the JSON-stringified result, and
/// checks that the tool output did not report failure and parses to an empty JSON object.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_update_plan_nested_tool_result_is_empty_object() -> Result<()> {
    skip_if_no_network!(Ok(()));

    let server = responses::start_mock_server().await;
    let script = r#"
const result = await tools.update_plan({
plan: [{ step: "Run update_plan from code mode", status: "in_progress" }],
});
text(JSON.stringify(result));
"#;
    let (_test, second_mock) =
        run_code_mode_turn(&server, "use exec to run update_plan", script, false).await?;

    let request = second_mock.single_request();
    let (output, success) = custom_tool_output_body_and_success(&request, "call-1");
    // `success` is optional; only an explicit `Some(false)` counts as failure.
    assert_ne!(
        success,
        Some(false),
        "exec update_plan call failed unexpectedly: {output}"
    );

    let actual: Value = serde_json::from_str(&output)?;
    let expected = serde_json::json!({});
    assert_eq!(actual, expected);

    Ok(())
}
#[cfg_attr(windows, ignore = "flaky on windows")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_nested_tool_calls_can_run_in_parallel() -> Result<()> {
@@ -539,7 +573,47 @@ Error:\ boom\n
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_can_yield_and_resume_with_exec_wait() -> Result<()> {
/// Runs a code-mode script that calls `tools.exec_command({})` inside `try`/`catch` and checks
/// that the script took the `catch` path: the output contains `caught:` and not `no-exception`.
async fn code_mode_exec_surfaces_handler_errors_as_exceptions() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
let (_test, second_mock) = run_code_mode_turn(
&server,
"surface nested tool handler failures as script exceptions",
r#"
try {
await tools.exec_command({});
text("no-exception");
} catch (error) {
text(`caught:${error?.message ?? String(error)}`);
}
"#,
false,
)
.await?;
let request = second_mock.single_request();
let (output, success) = custom_tool_output_body_and_success(&request, "call-1");
// The script handles the nested error itself, so the outer call must not report failure.
assert_ne!(
success,
Some(false),
"script should catch the nested tool error: {output}"
);
assert!(
output.contains("caught:"),
"expected caught exception text in output: {output}"
);
// `no-exception` is only emitted if the nested call returned normally.
assert!(
!output.contains("no-exception"),
"nested tool error should not allow success path: {output}"
);
Ok(())
}
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_can_yield_and_resume_with_wait() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
@@ -602,7 +676,7 @@ text("phase 3");
ev_response_created("resp-3"),
responses::ev_function_call(
"call-2",
"exec_wait",
"wait",
&serde_json::to_string(&serde_json::json!({
"cell_id": cell_id.clone(),
"yield_time_ms": 1_000,
@@ -646,7 +720,7 @@ text("phase 3");
ev_response_created("resp-5"),
responses::ev_function_call(
"call-3",
"exec_wait",
"wait",
&serde_json::to_string(&serde_json::json!({
"cell_id": cell_id.clone(),
"yield_time_ms": 1_000,
@@ -742,7 +816,7 @@ while (true) {}
ev_response_created("resp-3"),
responses::ev_function_call(
"call-2",
"exec_wait",
"wait",
&serde_json::to_string(&serde_json::json!({
"cell_id": cell_id.clone(),
"terminate": true,
@@ -869,7 +943,7 @@ text("session b done");
ev_response_created("resp-5"),
responses::ev_function_call(
"call-3",
"exec_wait",
"wait",
&serde_json::to_string(&serde_json::json!({
"cell_id": session_a_id.clone(),
"yield_time_ms": 1_000,
@@ -909,7 +983,7 @@ text("session b done");
ev_response_created("resp-7"),
responses::ev_function_call(
"call-4",
"exec_wait",
"wait",
&serde_json::to_string(&serde_json::json!({
"cell_id": session_b_id.clone(),
"yield_time_ms": 1_000,
@@ -947,7 +1021,7 @@ text("session b done");
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_exec_wait_can_terminate_and_continue() -> Result<()> {
async fn code_mode_wait_can_terminate_and_continue() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
@@ -999,7 +1073,7 @@ text("phase 2");
ev_response_created("resp-3"),
responses::ev_function_call(
"call-2",
"exec_wait",
"wait",
&serde_json::to_string(&serde_json::json!({
"cell_id": cell_id.clone(),
"terminate": true,
@@ -1073,7 +1147,7 @@ text("after terminate");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_exec_wait_returns_error_for_unknown_session() -> Result<()> {
async fn code_mode_wait_returns_error_for_unknown_session() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
@@ -1088,7 +1162,7 @@ async fn code_mode_exec_wait_returns_error_for_unknown_session() -> Result<()> {
ev_response_created("resp-1"),
responses::ev_function_call(
"call-1",
"exec_wait",
"wait",
&serde_json::to_string(&serde_json::json!({
"cell_id": "999999",
"yield_time_ms": 1_000,
@@ -1134,7 +1208,7 @@ async fn code_mode_exec_wait_returns_error_for_unknown_session() -> Result<()> {
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_exec_wait_terminate_returns_completed_session_if_it_finished_after_yield_control()
async fn code_mode_wait_terminate_returns_completed_session_if_it_finished_after_yield_control()
-> Result<()> {
skip_if_no_network!(Ok(()));
@@ -1229,7 +1303,7 @@ text("session b done");
ev_response_created("resp-5"),
responses::ev_function_call(
"call-3",
"exec_wait",
"wait",
&serde_json::to_string(&serde_json::json!({
"cell_id": session_b_id.clone(),
"yield_time_ms": 1_000,
@@ -1279,7 +1353,7 @@ text("session b done");
ev_response_created("resp-7"),
responses::ev_function_call(
"call-4",
"exec_wait",
"wait",
&serde_json::to_string(&serde_json::json!({
"cell_id": session_a_id.clone(),
"terminate": true,
@@ -1330,7 +1404,7 @@ text("session b done");
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_background_keeps_running_on_later_turn_without_exec_wait() -> Result<()> {
async fn code_mode_background_keeps_running_on_later_turn_without_wait() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
@@ -1423,7 +1497,7 @@ text("after yield");
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_exec_wait_uses_its_own_max_tokens_budget() -> Result<()> {
async fn code_mode_wait_uses_its_own_max_tokens_budget() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
@@ -1476,7 +1550,7 @@ text("token one token two token three token four token five token six token seve
ev_response_created("resp-3"),
responses::ev_function_call(
"call-2",
"exec_wait",
"wait",
&serde_json::to_string(&serde_json::json!({
"cell_id": cell_id.clone(),
"yield_time_ms": 1_000,
@@ -1551,6 +1625,44 @@ text({ json: true });
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Runs a code-mode script that calls `notify("code_mode_notify_marker")` before
// awaiting a nested tool call, then checks the follow-up request.
//
// The test passes when at least one `custom_tool_call_output` input item has
// call id `call-1`, name `exec`, and a string output containing the marker.
async fn code_mode_notify_injects_additional_exec_tool_output_into_active_context() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
let (_test, second_mock) = run_code_mode_turn(
&server,
"use exec notify helper",
r#"
notify("code_mode_notify_marker");
await tools.test_sync_tool({});
text("done");
"#,
false,
)
.await?;
let req = second_mock.single_request();
// All three conditions must hold on the same item: call id, output text, tool name.
let has_notify_output = req
.inputs_of_type("custom_tool_call_output")
.iter()
.any(|item| {
item.get("call_id").and_then(serde_json::Value::as_str) == Some("call-1")
&& item
.get("output")
.and_then(serde_json::Value::as_str)
.is_some_and(|text| text.contains("code_mode_notify_marker"))
&& item.get("name").and_then(serde_json::Value::as_str) == Some("exec")
});
assert!(
has_notify_output,
"expected notify marker in custom_tool_call_output item: {:?}",
req.input()
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_exit_stops_script_immediately() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -1683,6 +1795,90 @@ image("data:image/png;base64,AAA");
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Verifies that the value returned by `tools.view_image(...)` inside a code-mode
// script can be passed straight to the `image(...)` helper.
//
// Setup: enables the `CodeMode` and `ImageDetailOriginal` features, writes a
// small PNG fixture into the test cwd, and mocks two model responses: the first
// issues the `exec` custom tool call carrying the script, the second ends the turn.
//
// Expectations on the `call-1` tool output in the second request:
// - it is not flagged as failed,
// - it has exactly two items: the text header and an `input_image`,
// - the image item carries a PNG data URL and `detail == "original"`.
async fn code_mode_can_use_view_image_result_with_image_helper() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
let mut builder = test_codex()
.with_model("gpt-5.3-codex")
.with_config(move |config| {
let _ = config.features.enable(Feature::CodeMode);
let _ = config.features.enable(Feature::ImageDetailOriginal);
});
let test = builder.build(&server).await?;
// Base64-encoded PNG fixture written to disk so `view_image` has a real file to read.
let image_bytes = BASE64_STANDARD.decode(
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR4nGP4z8DwHwAFAAH/iZk9HQAAAABJRU5ErkJggg==",
)?;
let image_path = test.cwd_path().join("code_mode_view_image.png");
fs::write(&image_path, image_bytes)?;
// JSON-encode the path so it is embedded in the script as a quoted, escaped string literal.
let image_path_json = serde_json::to_string(&image_path.to_string_lossy().to_string())?;
let code = format!(
r#"
const out = await tools.view_image({{ path: {image_path_json}, detail: "original" }});
image(out);
"#
);
responses::mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-1"),
ev_custom_tool_call("call-1", "exec", &code),
ev_completed("resp-1"),
]),
)
.await;
let second_mock = responses::mount_sse_once(
&server,
sse(vec![
ev_assistant_message("msg-1", "done"),
ev_completed("resp-2"),
]),
)
.await;
test.submit_turn("use exec to call view_image and emit its image output")
.await?;
let req = second_mock.single_request();
let items = custom_tool_output_items(&req, "call-1");
let (_, success) = custom_tool_output_body_and_success(&req, "call-1");
assert_ne!(
success,
Some(false),
"code_mode view_image call failed unexpectedly"
);
assert_eq!(items.len(), 2);
// First item is the text header; the script emits no text, so `Output:` is followed by nothing.
assert_regex_match(
concat!(
r"(?s)\A",
r"Script completed\nWall time \d+\.\d seconds\nOutput:\n\z"
),
text_item(&items, 0),
);
assert_eq!(
items[1].get("type").and_then(Value::as_str),
Some("input_image")
);
let emitted_image_url = items[1]
.get("image_url")
.and_then(Value::as_str)
.expect("image helper should emit an input_image item with image_url");
assert!(emitted_image_url.starts_with("data:image/png;base64,"));
assert_eq!(
items[1].get("detail").and_then(Value::as_str),
Some("original")
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_can_apply_patch_via_nested_tool() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -1715,6 +1911,7 @@ async fn code_mode_can_apply_patch_via_nested_tool() -> Result<()> {
),
text_item(&items, 0),
);
assert_eq!(text_item(&items, 1), "{}");
let file_path = test.cwd_path().join(file_name);
assert_eq!(fs::read_to_string(&file_path)?, "hello from code_mode\n");
@@ -1957,6 +2154,7 @@ text(JSON.stringify(Object.getOwnPropertyNames(globalThis).sort()));
"isFinite",
"isNaN",
"load",
"notify",
"parseFloat",
"parseInt",
"store",
@@ -2005,7 +2203,7 @@ text(JSON.stringify(tool));
parsed,
serde_json::json!({
"name": "view_image",
"description": "View a local image from the filesystem (only use if given a full filepath by the user, and the image isn't already attached to the thread context within <image ...> tags).\n\nexec tool declaration:\n```ts\ndeclare const tools: { view_image(args: { path: string; }): Promise<unknown>; };\n```",
"description": "View a local image from the filesystem (only use if given a full filepath by the user, and the image isn't already attached to the thread context within <image ...> tags).\n\nexec tool declaration:\n```ts\ndeclare const tools: { view_image(args: { path: string; }): Promise<{ detail: string | null; image_url: string; }>; };\n```",
})
);

View File

@@ -96,6 +96,7 @@ fn non_openai_model_provider(server: &MockServer) -> ModelProviderInfo {
let mut provider = built_in_model_providers(/* openai_base_url */ None)["openai"].clone();
provider.name = "OpenAI (test)".into();
provider.base_url = Some(format!("{}/v1", server.uri()));
provider.supports_websockets = false;
provider
}

View File

@@ -4,23 +4,37 @@ use std::path::Path;
use anyhow::Context;
use anyhow::Result;
use codex_core::features::Feature;
use codex_protocol::items::parse_hook_prompt_fragment;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::user_input::UserInput;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_message_item_added;
use core_test_support::responses::ev_output_text_delta;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::streaming_sse::StreamingSseChunk;
use core_test_support::streaming_sse::start_streaming_sse_server;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use pretty_assertions::assert_eq;
use serde_json::Value;
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::time::sleep;
// Reason text returned by the first blocking stop hook in these tests.
const FIRST_CONTINUATION_PROMPT: &str = "Retry with exactly the phrase meow meow meow.";
// Reason text returned by the second blocking stop hook in these tests.
const SECOND_CONTINUATION_PROMPT: &str = "Now tighten it to just: meow.";
// `additionalContext` value emitted by the user-prompt-submit hook when it blocks a prompt.
const BLOCKED_PROMPT_CONTEXT: &str = "Remember the blocked lighthouse note.";
fn write_stop_hook(home: &Path, block_prompts: &[&str]) -> Result<()> {
let script_path = home.join("stop_hook.py");
@@ -69,7 +83,135 @@ else:
Ok(())
}
fn rollout_developer_texts(text: &str) -> Result<Vec<String>> {
/// Writes one Stop-hook script per prompt, plus a `hooks.json` registering all of them.
///
/// Each generated `stop_hook_{index}.py` reads the hook payload from stdin. When
/// `stop_hook_active` is truthy it prints `{"systemMessage": "done"}`; otherwise it
/// prints a `{"decision": "block", "reason": <prompt>}` response.
/// Every script is registered as a `command` hook inside a single `Stop` entry.
///
/// # Errors
/// Returns an error if a prompt cannot be serialized or a fixture file cannot be written.
fn write_parallel_stop_hooks(home: &Path, prompts: &[&str]) -> Result<()> {
    let hook_entries = prompts
        .iter()
        .enumerate()
        .map(|(index, prompt)| {
            let script_path = home.join(format!("stop_hook_{index}.py"));
            // Embed the prompt as a JSON string literal. Rust's `{:?}` formatting can emit
            // `\u{..}` escapes that Python rejects; JSON string escapes are valid Python.
            let prompt_json =
                serde_json::to_string(prompt).context("serialize stop hook prompt for test")?;
            // The bodies of `if`/`else` must be indented or Python refuses to run the script.
            let script = format!(
                r#"import json
import sys

payload = json.load(sys.stdin)
if payload["stop_hook_active"]:
    print(json.dumps({{"systemMessage": "done"}}))
else:
    print(json.dumps({{"decision": "block", "reason": {prompt_json}}}))
"#
            );
            fs::write(&script_path, script).with_context(|| {
                format!(
                    "write stop hook script fixture at {}",
                    script_path.display()
                )
            })?;
            Ok(serde_json::json!({
                "type": "command",
                "command": format!("python3 {}", script_path.display()),
            }))
        })
        .collect::<Result<Vec<_>>>()?;

    let hooks = serde_json::json!({
        "hooks": {
            "Stop": [{
                "hooks": hook_entries,
            }]
        }
    });
    fs::write(home.join("hooks.json"), hooks.to_string()).context("write hooks.json")?;
    Ok(())
}
/// Writes a `UserPromptSubmit` hook script and the `hooks.json` that registers it.
///
/// The generated `user_prompt_submit_hook.py` appends every payload it receives to
/// `user_prompt_submit_hook_log.jsonl` under `home`. When the payload's `prompt`
/// equals `blocked_prompt`, it prints a `block` decision whose `hookSpecificOutput`
/// carries `additional_context`; for any other prompt it prints nothing.
///
/// # Errors
/// Returns an error if either string cannot be serialized or a fixture file cannot
/// be written.
fn write_user_prompt_submit_hook(
    home: &Path,
    blocked_prompt: &str,
    additional_context: &str,
) -> Result<()> {
    let script_path = home.join("user_prompt_submit_hook.py");
    let log_path = home.join("user_prompt_submit_hook_log.jsonl");
    let log_path = log_path.display();
    // JSON string literals are spliced into the script as Python string literals.
    let blocked_prompt_json =
        serde_json::to_string(blocked_prompt).context("serialize blocked prompt for test")?;
    let additional_context_json = serde_json::to_string(additional_context)
        .context("serialize user prompt submit additional context for test")?;
    // The bodies of `with` and `if` must be indented or Python refuses to run the script.
    let script = format!(
        r#"import json
from pathlib import Path
import sys

payload = json.load(sys.stdin)
with Path(r"{log_path}").open("a", encoding="utf-8") as handle:
    handle.write(json.dumps(payload) + "\n")

if payload.get("prompt") == {blocked_prompt_json}:
    print(json.dumps({{
        "decision": "block",
        "reason": "blocked by hook",
        "hookSpecificOutput": {{
            "hookEventName": "UserPromptSubmit",
            "additionalContext": {additional_context_json}
        }}
    }}))
"#,
    );
    let hooks = serde_json::json!({
        "hooks": {
            "UserPromptSubmit": [{
                "hooks": [{
                    "type": "command",
                    "command": format!("python3 {}", script_path.display()),
                    "statusMessage": "running user prompt submit hook",
                }]
            }]
        }
    });
    fs::write(&script_path, script).context("write user prompt submit hook script")?;
    fs::write(home.join("hooks.json"), hooks.to_string()).context("write hooks.json")?;
    Ok(())
}
/// Writes a `SessionStart` hook script and the `hooks.json` that registers it.
///
/// The generated `session_start_hook.py` reads the hook payload from stdin and
/// appends one JSON record to `session_start_hook_log.jsonl` under `home`. The
/// record holds the payload's `transcript_path` and whether that path existed on
/// disk when the hook ran (`exists` is `False` if no path was supplied).
///
/// # Errors
/// Returns an error if a fixture file cannot be written.
fn write_session_start_hook_recording_transcript(home: &Path) -> Result<()> {
    let script_path = home.join("session_start_hook.py");
    let log_path = home.join("session_start_hook_log.jsonl");
    // The body of `with` must be indented or Python refuses to run the script.
    let script = format!(
        r#"import json
from pathlib import Path
import sys

payload = json.load(sys.stdin)
transcript_path = payload.get("transcript_path")
record = {{
    "transcript_path": transcript_path,
    "exists": Path(transcript_path).exists() if transcript_path else False,
}}

with Path(r"{log_path}").open("a", encoding="utf-8") as handle:
    handle.write(json.dumps(record) + "\n")
"#,
        log_path = log_path.display(),
    );
    let hooks = serde_json::json!({
        "hooks": {
            "SessionStart": [{
                "hooks": [{
                    "type": "command",
                    "command": format!("python3 {}", script_path.display()),
                    "statusMessage": "running session start hook",
                }]
            }]
        }
    });
    fs::write(&script_path, script).context("write session start hook script")?;
    fs::write(home.join("hooks.json"), hooks.to_string()).context("write hooks.json")?;
    Ok(())
}
fn rollout_hook_prompt_texts(text: &str) -> Result<Vec<String>> {
let mut texts = Vec::new();
for line in text.lines() {
let trimmed = line.trim();
@@ -78,11 +220,13 @@ fn rollout_developer_texts(text: &str) -> Result<Vec<String>> {
}
let rollout: RolloutLine = serde_json::from_str(trimmed).context("parse rollout line")?;
if let RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) = rollout.item
&& role == "developer"
&& role == "user"
{
for item in content {
if let ContentItem::InputText { text } = item {
texts.push(text);
if let ContentItem::InputText { text } = item
&& let Some(fragment) = parse_hook_prompt_fragment(&text)
{
texts.push(fragment.text);
}
}
}
@@ -90,6 +234,16 @@ fn rollout_developer_texts(text: &str) -> Result<Vec<String>> {
Ok(texts)
}
/// Returns the hook prompt fragment texts found in a request's `user` messages.
///
/// Each `user` input text is passed to `parse_hook_prompt_fragment`; texts that do
/// not parse as a fragment are dropped, and request order is preserved.
fn request_hook_prompt_texts(
    request: &core_test_support::responses::ResponsesRequest,
) -> Vec<String> {
    let mut fragment_texts = Vec::new();
    for message_text in request.message_input_texts("user") {
        if let Some(fragment) = parse_hook_prompt_fragment(&message_text) {
            fragment_texts.push(fragment.text);
        }
    }
    fragment_texts
}
fn read_stop_hook_inputs(home: &Path) -> Result<Vec<serde_json::Value>> {
fs::read_to_string(home.join("stop_hook_log.jsonl"))
.context("read stop hook log")?
@@ -99,6 +253,58 @@ fn read_stop_hook_inputs(home: &Path) -> Result<Vec<serde_json::Value>> {
.collect()
}
/// Reads `session_start_hook_log.jsonl` under `home` and parses each non-blank
/// line as a JSON value.
///
/// # Errors
/// Returns an error if the log cannot be read, or on the first line that is not
/// valid JSON.
fn read_session_start_hook_inputs(home: &Path) -> Result<Vec<serde_json::Value>> {
    let log_text = fs::read_to_string(home.join("session_start_hook_log.jsonl"))
        .context("read session start hook log")?;

    let mut records: Vec<serde_json::Value> = Vec::new();
    for entry in log_text.lines() {
        if entry.trim().is_empty() {
            continue;
        }
        let record =
            serde_json::from_str(entry).context("parse session start hook log line")?;
        records.push(record);
    }
    Ok(records)
}
/// Reads `user_prompt_submit_hook_log.jsonl` under `home` and parses each
/// non-blank line as a JSON value.
///
/// # Errors
/// Returns an error if the log cannot be read, or on the first line that is not
/// valid JSON.
fn read_user_prompt_submit_hook_inputs(home: &Path) -> Result<Vec<serde_json::Value>> {
    let log_text = fs::read_to_string(home.join("user_prompt_submit_hook_log.jsonl"))
        .context("read user prompt submit hook log")?;

    let mut records: Vec<serde_json::Value> = Vec::new();
    for entry in log_text.lines() {
        if entry.trim().is_empty() {
            continue;
        }
        let record =
            serde_json::from_str(entry).context("parse user prompt submit hook log line")?;
        records.push(record);
    }
    Ok(records)
}
/// Builds a `response.output_item.done` event for an assistant message with the
/// given item `id` and a single `output_text` content span holding `text`.
fn ev_message_item_done(id: &str, text: &str) -> Value {
    let content = serde_json::json!([{"type": "output_text", "text": text}]);
    let item = serde_json::json!({
        "type": "message",
        "role": "assistant",
        "id": id,
        "content": content,
    });
    serde_json::json!({
        "type": "response.output_item.done",
        "item": item,
    })
}
/// Renders a single event through `sse` as a one-element event list.
fn sse_event(event: Value) -> String {
    let events = vec![event];
    sse(events)
}
/// Collects the `input_text` span texts of every `message` input item with the
/// given `role` from a raw JSON request body.
///
/// A missing or non-array `input` yields an empty list; items without an array
/// `content` and spans without a string `text` are skipped.
///
/// # Panics
/// Panics if `body` is not valid JSON.
fn request_message_input_texts(body: &[u8], role: &str) -> Vec<String> {
    let parsed: Value = serde_json::from_slice(body)
        .unwrap_or_else(|error| panic!("parse request body: {error}"));

    let mut texts = Vec::new();
    let Some(items) = parsed.get("input").and_then(Value::as_array) else {
        return texts;
    };
    for item in items {
        let is_message = item.get("type").and_then(Value::as_str) == Some("message");
        let has_role = item.get("role").and_then(Value::as_str) == Some(role);
        if !(is_message && has_role) {
            continue;
        }
        let Some(spans) = item.get("content").and_then(Value::as_array) else {
            continue;
        };
        for span in spans {
            if span.get("type").and_then(Value::as_str) != Some("input_text") {
                continue;
            }
            if let Some(text) = span.get("text").and_then(Value::as_str) {
                texts.push(text.to_owned());
            }
        }
    }
    texts
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn stop_hook_can_block_multiple_times_in_same_turn() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -147,27 +353,47 @@ async fn stop_hook_can_block_multiple_times_in_same_turn() -> Result<()> {
let requests = responses.requests();
assert_eq!(requests.len(), 3);
assert!(
requests[1]
.message_input_texts("developer")
.contains(&FIRST_CONTINUATION_PROMPT.to_string()),
"second request should include the first continuation prompt",
assert_eq!(
request_hook_prompt_texts(&requests[1]),
vec![FIRST_CONTINUATION_PROMPT.to_string()],
"second request should include the first continuation prompt as user hook context",
);
assert!(
requests[2]
.message_input_texts("developer")
.contains(&FIRST_CONTINUATION_PROMPT.to_string()),
"third request should retain the first continuation prompt from history",
);
assert!(
requests[2]
.message_input_texts("developer")
.contains(&SECOND_CONTINUATION_PROMPT.to_string()),
"third request should include the second continuation prompt",
assert_eq!(
request_hook_prompt_texts(&requests[2]),
vec![
FIRST_CONTINUATION_PROMPT.to_string(),
SECOND_CONTINUATION_PROMPT.to_string(),
],
"third request should retain hook prompts in user history",
);
let hook_inputs = read_stop_hook_inputs(test.codex_home_path())?;
assert_eq!(hook_inputs.len(), 3);
let stop_turn_ids = hook_inputs
.iter()
.map(|input| {
input["turn_id"]
.as_str()
.expect("stop hook input turn_id")
.to_string()
})
.collect::<Vec<_>>();
assert!(
stop_turn_ids.iter().all(|turn_id| !turn_id.is_empty()),
"stop hook turn ids should be non-empty",
);
let first_stop_turn_id = stop_turn_ids
.first()
.expect("stop hook inputs should include a first turn id")
.clone();
assert_eq!(
stop_turn_ids,
vec![
first_stop_turn_id.clone(),
first_stop_turn_id.clone(),
first_stop_turn_id,
],
);
assert_eq!(
hook_inputs
.iter()
@@ -180,19 +406,64 @@ async fn stop_hook_can_block_multiple_times_in_same_turn() -> Result<()> {
let rollout_path = test.codex.rollout_path().expect("rollout path");
let rollout_text = fs::read_to_string(&rollout_path)?;
let developer_texts = rollout_developer_texts(&rollout_text)?;
let hook_prompt_texts = rollout_hook_prompt_texts(&rollout_text)?;
assert!(
developer_texts.contains(&FIRST_CONTINUATION_PROMPT.to_string()),
hook_prompt_texts.contains(&FIRST_CONTINUATION_PROMPT.to_string()),
"rollout should persist the first continuation prompt",
);
assert!(
developer_texts.contains(&SECOND_CONTINUATION_PROMPT.to_string()),
hook_prompt_texts.contains(&SECOND_CONTINUATION_PROMPT.to_string()),
"rollout should persist the second continuation prompt",
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Installs a SessionStart hook that records the `transcript_path` it receives and
// whether that path existed on disk when the hook ran, then submits one turn.
//
// The test passes when the hook ran exactly once, received a non-empty
// `transcript_path`, and observed the file as already existing.
async fn session_start_hook_sees_materialized_transcript_path() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let _response = mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", "hello from the reef"),
ev_completed("resp-1"),
]),
)
.await;
let mut builder = test_codex()
.with_pre_build_hook(|home| {
// The pre-build hook returns nothing, so a fixture failure is surfaced by panicking.
if let Err(error) = write_session_start_hook_recording_transcript(home) {
panic!("failed to write session start hook test fixture: {error}");
}
})
.with_config(|config| {
config
.features
.enable(Feature::CodexHooks)
.expect("test config should allow feature update");
});
let test = builder.build(&server).await?;
test.submit_turn("hello").await?;
let hook_inputs = read_session_start_hook_inputs(test.codex_home_path())?;
assert_eq!(hook_inputs.len(), 1);
// `Some(false)` means: the field is present, is a string, and is not empty.
assert_eq!(
hook_inputs[0]
.get("transcript_path")
.and_then(Value::as_str)
.map(str::is_empty),
Some(false)
);
assert_eq!(hook_inputs[0].get("exists"), Some(&Value::Bool(true)));
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn resumed_thread_keeps_stop_continuation_prompt_in_history() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -260,12 +531,321 @@ async fn resumed_thread_keeps_stop_continuation_prompt_in_history() -> Result<()
resumed.submit_turn("and now continue").await?;
let resumed_request = resumed_response.single_request();
assert!(
resumed_request
.message_input_texts("developer")
.contains(&FIRST_CONTINUATION_PROMPT.to_string()),
"resumed request should keep the persisted continuation prompt in history",
assert_eq!(
request_hook_prompt_texts(&resumed_request),
vec![FIRST_CONTINUATION_PROMPT.to_string()],
"resumed request should keep the persisted continuation prompt in user history",
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Registers two Stop hooks that each block with a different reason, runs one turn,
// and checks that both reasons reach the model and the rollout as hook prompt
// fragments.
//
// Expectations:
// - exactly two requests are made (the initial one and one continuation),
// - the second request's user messages yield both fragments, in hook order,
// - the rollout file yields the same two fragments in the same order.
async fn multiple_blocking_stop_hooks_persist_multiple_hook_prompt_fragments() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let responses = mount_sse_sequence(
&server,
vec![
sse(vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", "draft one"),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_assistant_message("msg-2", "final draft"),
ev_completed("resp-2"),
]),
],
)
.await;
let mut builder = test_codex()
.with_pre_build_hook(|home| {
// The pre-build hook returns nothing, so a fixture failure is surfaced by panicking.
if let Err(error) = write_parallel_stop_hooks(
home,
&[FIRST_CONTINUATION_PROMPT, SECOND_CONTINUATION_PROMPT],
) {
panic!("failed to write parallel stop hook fixtures: {error}");
}
})
.with_config(|config| {
config
.features
.enable(Feature::CodexHooks)
.expect("test config should allow feature update");
});
let test = builder.build(&server).await?;
test.submit_turn("hello again").await?;
let requests = responses.requests();
assert_eq!(requests.len(), 2);
assert_eq!(
request_hook_prompt_texts(&requests[1]),
vec![
FIRST_CONTINUATION_PROMPT.to_string(),
SECOND_CONTINUATION_PROMPT.to_string(),
],
"second request should receive one user hook prompt message with both fragments",
);
// The persisted rollout must carry the same fragments as the live request.
let rollout_path = test.codex.rollout_path().expect("rollout path");
let rollout_text = fs::read_to_string(&rollout_path)?;
assert_eq!(
rollout_hook_prompt_texts(&rollout_text)?,
vec![
FIRST_CONTINUATION_PROMPT.to_string(),
SECOND_CONTINUATION_PROMPT.to_string(),
],
"rollout should preserve both hook prompt fragments in order",
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Installs a UserPromptSubmit hook that blocks the prompt "blocked first prompt"
// and attaches `BLOCKED_PROMPT_CONTEXT` as additional context, then submits the
// blocked prompt followed by an accepted one.
//
// Only one model response is mounted and `single_request()` is used, so the
// blocked prompt is expected to produce no model request at all.
//
// Expectations on that single request:
// - a `developer` message carries the additional context from the blocked prompt,
// - no `user` message contains the blocked prompt text,
// - some `user` message contains the accepted prompt text.
// The hook log must show both prompts, in order, each with a non-empty `turn_id`.
async fn blocked_user_prompt_submit_persists_additional_context_for_next_turn() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let response = mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", "second prompt handled"),
ev_completed("resp-1"),
]),
)
.await;
let mut builder = test_codex()
.with_pre_build_hook(|home| {
// The pre-build hook returns nothing, so a fixture failure is surfaced by panicking.
if let Err(error) =
write_user_prompt_submit_hook(home, "blocked first prompt", BLOCKED_PROMPT_CONTEXT)
{
panic!("failed to write user prompt submit hook test fixture: {error}");
}
})
.with_config(|config| {
config
.features
.enable(Feature::CodexHooks)
.expect("test config should allow feature update");
});
let test = builder.build(&server).await?;
test.submit_turn("blocked first prompt").await?;
test.submit_turn("second prompt").await?;
let request = response.single_request();
assert!(
request
.message_input_texts("developer")
.contains(&BLOCKED_PROMPT_CONTEXT.to_string()),
"second request should include developer context persisted from the blocked prompt",
);
assert!(
request
.message_input_texts("user")
.iter()
.all(|text| !text.contains("blocked first prompt")),
"blocked prompt should not be sent to the model",
);
assert!(
request
.message_input_texts("user")
.iter()
.any(|text| text.contains("second prompt")),
"second request should include the accepted prompt",
);
// The hook itself must have been invoked for both prompts, blocked or not.
let hook_inputs = read_user_prompt_submit_hook_inputs(test.codex_home_path())?;
assert_eq!(hook_inputs.len(), 2);
assert_eq!(
hook_inputs
.iter()
.map(|input| {
input["prompt"]
.as_str()
.expect("user prompt submit hook prompt")
.to_string()
})
.collect::<Vec<_>>(),
vec![
"blocked first prompt".to_string(),
"second prompt".to_string()
],
);
assert!(
hook_inputs.iter().all(|input| input["turn_id"]
.as_str()
.is_some_and(|turn_id| !turn_id.is_empty())),
"blocked and accepted prompt hooks should both receive a non-empty turn_id",
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Queues two prompts while a first response is still streaming: one the
// UserPromptSubmit hook accepts and one it blocks. Checks that the accepted
// prompt still reaches the model even though a later queued prompt is blocked.
//
// Flow:
// 1. The first response streams up to the message item, but its `completed`
//    event is held behind a oneshot gate.
// 2. After the first text delta arrives, both queued prompts are submitted.
// 3. The gate is released and the test polls the server for a second request.
//
// Expectations:
// - exactly two requests are made,
// - the second request's user texts include the accepted prompt and not the blocked one,
// - the hook saw all three prompts, in order, all with the same non-empty `turn_id`.
async fn blocked_queued_prompt_does_not_strand_earlier_accepted_prompt() -> Result<()> {
skip_if_no_network!(Ok(()));
let (gate_completed_tx, gate_completed_rx) = oneshot::channel();
// Only the final chunk is gated, so the turn stays open until the gate is released.
let first_chunks = vec![
StreamingSseChunk {
gate: None,
body: sse_event(ev_response_created("resp-1")),
},
StreamingSseChunk {
gate: None,
body: sse_event(ev_message_item_added("msg-1", "")),
},
StreamingSseChunk {
gate: None,
body: sse_event(ev_output_text_delta("first ")),
},
StreamingSseChunk {
gate: None,
body: sse_event(ev_message_item_done("msg-1", "first response")),
},
StreamingSseChunk {
gate: Some(gate_completed_rx),
body: sse_event(ev_completed("resp-1")),
},
];
let second_chunks = vec![StreamingSseChunk {
gate: None,
body: sse(vec![
ev_response_created("resp-2"),
ev_assistant_message("msg-2", "accepted queued prompt handled"),
ev_completed("resp-2"),
]),
}];
let (server, _completions) =
start_streaming_sse_server(vec![first_chunks, second_chunks]).await;
let mut builder = test_codex()
.with_model("gpt-5.1")
.with_pre_build_hook(|home| {
// The pre-build hook returns nothing, so a fixture failure is surfaced by panicking.
if let Err(error) =
write_user_prompt_submit_hook(home, "blocked queued prompt", BLOCKED_PROMPT_CONTEXT)
{
panic!("failed to write user prompt submit hook test fixture: {error}");
}
})
.with_config(|config| {
config
.features
.enable(Feature::CodexHooks)
.expect("test config should allow feature update");
});
let test = builder.build_with_streaming_server(&server).await?;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "initial prompt".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
// Wait for the first streamed delta so the prompts below are submitted mid-turn.
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::AgentMessageContentDelta(_))
})
.await;
for text in ["accepted queued prompt", "blocked queued prompt"] {
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: text.to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
}
// NOTE(review): presumably gives the session time to queue both prompts before
// the first response is allowed to complete — confirm this delay is sufficient.
sleep(Duration::from_millis(100)).await;
// A send error only means the receiver is gone; it is deliberately ignored.
let _ = gate_completed_tx.send(());
// Poll until the second request shows up, giving up after 30 seconds.
let requests = tokio::time::timeout(Duration::from_secs(30), async {
loop {
let requests = server.requests().await;
if requests.len() >= 2 {
break requests;
}
sleep(Duration::from_millis(50)).await;
}
})
.await
.expect("second request should arrive")
.into_iter()
.collect::<Vec<_>>();
// NOTE(review): presumably lets the remaining hook and turn work settle before the
// hook log is read below — confirm.
sleep(Duration::from_millis(100)).await;
assert_eq!(requests.len(), 2);
let second_user_texts = request_message_input_texts(&requests[1], "user");
assert!(
second_user_texts.contains(&"accepted queued prompt".to_string()),
"second request should include the accepted queued prompt",
);
assert!(
!second_user_texts.contains(&"blocked queued prompt".to_string()),
"second request should not include the blocked queued prompt",
);
let hook_inputs = read_user_prompt_submit_hook_inputs(test.codex_home_path())?;
assert_eq!(hook_inputs.len(), 3);
assert_eq!(
hook_inputs
.iter()
.map(|input| {
input["prompt"]
.as_str()
.expect("queued prompt hook prompt")
.to_string()
})
.collect::<Vec<_>>(),
vec![
"initial prompt".to_string(),
"accepted queued prompt".to_string(),
"blocked queued prompt".to_string(),
],
);
let queued_turn_ids = hook_inputs
.iter()
.map(|input| {
input["turn_id"]
.as_str()
.expect("queued prompt hook turn_id")
.to_string()
})
.collect::<Vec<_>>();
assert!(
queued_turn_ids.iter().all(|turn_id| !turn_id.is_empty()),
"queued prompt hook turn ids should be non-empty",
);
// All three hook invocations must report the same turn id as the first one.
let first_queued_turn_id = queued_turn_ids
.first()
.expect("queued prompt hook inputs should include a first turn id")
.clone();
assert_eq!(
queued_turn_ids,
vec![
first_queued_turn_id.clone(),
first_queued_turn_id.clone(),
first_queued_turn_id,
],
);
server.shutdown().await;
Ok(())
}

View File

@@ -53,7 +53,6 @@ fn test_model_info(
visibility: ModelVisibility::List,
supported_in_api: true,
input_modalities,
prefer_websockets: false,
used_fallback_model_metadata: false,
supports_search_tool: false,
priority: 1,
@@ -849,7 +848,6 @@ async fn model_switch_to_smaller_model_updates_token_context_window() -> Result<
visibility: ModelVisibility::List,
supported_in_api: true,
input_modalities: default_input_modalities(),
prefer_websockets: false,
used_fallback_model_metadata: false,
supports_search_tool: false,
priority: 1,

View File

@@ -351,7 +351,6 @@ fn test_remote_model(slug: &str, priority: i32) -> ModelInfo {
effective_context_window_percent: 95,
experimental_supported_tools: Vec::new(),
input_modalities: default_input_modalities(),
prefer_websockets: false,
used_fallback_model_metadata: false,
supports_search_tool: false,
}

View File

@@ -659,7 +659,6 @@ async fn remote_model_friendly_personality_instructions_with_feature() -> anyhow
effective_context_window_percent: 95,
experimental_supported_tools: Vec::new(),
input_modalities: default_input_modalities(),
prefer_websockets: false,
used_fallback_model_metadata: false,
supports_search_tool: false,
};
@@ -775,7 +774,6 @@ async fn user_turn_personality_remote_model_template_includes_update_message() -
effective_context_window_percent: 95,
experimental_supported_tools: Vec::new(),
input_modalities: default_input_modalities(),
prefer_websockets: false,
used_fallback_model_metadata: false,
supports_search_tool: false,
};

View File

@@ -13,6 +13,7 @@ use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RealtimeAudioFrame;
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
use codex_protocol::protocol::RealtimeConversationVersion;
use codex_protocol::protocol::RealtimeEvent;
use codex_protocol::protocol::SessionSource;
use codex_protocol::user_input::UserInput;
@@ -29,15 +30,17 @@ use core_test_support::wait_for_event_match;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use serial_test::serial;
use std::ffi::OsString;
use std::fs;
use std::process::Command;
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::time::timeout;
const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.";
const MEMORY_PROMPT_PHRASE: &str =
"You have access to a memory folder with guidance from prior runs.";
// Marker environment variable set to "1" in re-executed test subprocesses. A test
// that finds it unset re-runs itself in a subprocess instead of running its body.
const REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR: &str =
"CODEX_REALTIME_CONVERSATION_TEST_SUBPROCESS";
fn websocket_request_text(
request: &core_test_support::responses::WebSocketRequest,
) -> Option<String> {
@@ -81,6 +84,33 @@ where
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
/// Re-runs one test of the current test binary in a child process with a
/// controlled `OPENAI_API_KEY` environment.
///
/// The child is invoked as `<current exe> --exact <test_name>` with the subprocess
/// marker variable set to `"1"`. `openai_api_key` is exported to the child when
/// `Some`, and the variable is removed from the child's environment when `None`.
///
/// # Errors
/// Returns an error if the current executable cannot be resolved or the child
/// cannot be spawned.
///
/// # Panics
/// Panics with the child's captured stdout and stderr if it exits unsuccessfully.
fn run_realtime_conversation_test_in_subprocess(
    test_name: &str,
    openai_api_key: Option<&str>,
) -> Result<()> {
    let test_binary = std::env::current_exe()?;
    let mut child = Command::new(test_binary);
    child.args(["--exact", test_name]);
    child.env(REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR, "1");
    if let Some(api_key) = openai_api_key {
        child.env(OPENAI_API_KEY_ENV_VAR, api_key);
    } else {
        child.env_remove(OPENAI_API_KEY_ENV_VAR);
    }

    let finished = child.output()?;
    assert!(
        finished.status.success(),
        "subprocess test `{test_name}` failed\nstdout:\n{stdout}\nstderr:\n{stderr}",
        stdout = String::from_utf8_lossy(&finished.stdout),
        stderr = String::from_utf8_lossy(&finished.stderr),
    );
    Ok(())
}
async fn seed_recent_thread(
test: &TestCodex,
title: &str,
@@ -159,6 +189,7 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
.await
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
assert!(started.session_id.is_some());
assert_eq!(started.version, RealtimeConversationVersion::V1);
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
@@ -176,6 +207,7 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
sample_rate: 24000,
num_channels: 1,
samples_per_channel: Some(480),
item_id: None,
},
}))
.await?;
@@ -257,11 +289,16 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial(openai_api_key_env)]
async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() -> Result<()> {
if std::env::var_os(REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR).is_none() {
return run_realtime_conversation_test_in_subprocess(
"suite::realtime_conversation::conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth",
Some("env-realtime-key"),
);
}
skip_if_no_network!(Ok(()));
let _env_guard = EnvGuard::set(OPENAI_API_KEY_ENV_VAR, "env-realtime-key");
let server = start_websocket_server(vec![
vec![],
vec![vec![json!({
@@ -366,34 +403,6 @@ async fn conversation_transport_close_emits_closed_event() -> Result<()> {
Ok(())
}
/// Test helper that overrides one environment variable and puts the previous
/// state back when dropped.
struct EnvGuard {
    /// Name of the overridden variable.
    key: &'static str,
    /// Value observed before the override; `None` if the variable was unset.
    original: Option<OsString>,
}

impl EnvGuard {
    /// Sets `key` to `value`, remembering whatever was there before.
    fn set(key: &'static str, value: &str) -> Self {
        // Capture the prior state first so it can be restored on drop.
        let guard = Self {
            key,
            original: std::env::var_os(key),
        };
        // SAFETY: NOTE(review): mutating the process environment is only sound
        // while no other thread reads or writes it; confirm callers serialize
        // access. The guard restores the original value when dropped.
        unsafe {
            std::env::set_var(key, value);
        }
        guard
    }
}

impl Drop for EnvGuard {
    /// Restores the saved value, or removes the variable if it was unset.
    fn drop(&mut self) {
        // SAFETY: NOTE(review): same precondition as `set`; this only undoes
        // the modification made to the guarded variable.
        unsafe {
            if let Some(previous) = self.original.as_ref() {
                std::env::set_var(self.key, previous);
            } else {
                std::env::remove_var(self.key);
            }
        }
    }
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_audio_before_start_emits_error() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -409,6 +418,7 @@ async fn conversation_audio_before_start_emits_error() -> Result<()> {
sample_rate: 24000,
num_channels: 1,
samples_per_channel: Some(480),
item_id: None,
},
}))
.await?;
@@ -425,6 +435,91 @@ async fn conversation_audio_before_start_emits_error() -> Result<()> {
Ok(())
}
/// Starting a realtime conversation with dummy ChatGPT auth and no API key in
/// the environment must emit a realtime error event and must not be followed
/// by a `RealtimeConversationClosed` event.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_start_preflight_failure_emits_realtime_error_only() -> Result<()> {
// When not already inside the re-executed child, run this test in a subprocess
// whose environment has the OpenAI API key variable removed.
if std::env::var_os(REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR).is_none() {
return run_realtime_conversation_test_in_subprocess(
"suite::realtime_conversation::conversation_start_preflight_failure_emits_realtime_error_only",
/*openai_api_key*/ None,
);
}
skip_if_no_network!(Ok(()));
// The websocket server is started with no scripted connections.
let server = start_websocket_server(vec![]).await;
let mut builder = test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let test = builder.build_with_websocket_server(&server).await?;
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
// The failure is expected to surface as a realtime error payload.
let err = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::Error(message),
}) => Some(message.clone()),
_ => None,
})
.await;
assert_eq!(err, "realtime conversation requires API key auth");
// Wait up to 200 ms for a closed event; the timeout elapsing is the passing outcome.
let closed = timeout(Duration::from_millis(200), async {
wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()),
_ => None,
})
.await
})
.await;
assert!(closed.is_err(), "preflight failure should not emit closed");
server.shutdown().await;
Ok(())
}
/// Starting a realtime conversation against an overridden realtime websocket
/// base URL must emit a non-empty realtime error event and must not be
/// followed by a `RealtimeConversationClosed` event.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_start_connect_failure_emits_realtime_error_only() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_websocket_server(vec![]).await;
let mut builder = test_codex().with_config(|config| {
// NOTE(review): presumably nothing listens on 127.0.0.1:1, so the realtime
// connection attempt fails; confirm.
config.experimental_realtime_ws_base_url = Some("http://127.0.0.1:1".to_string());
});
let test = builder.build_with_websocket_server(&server).await?;
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
// Only the presence of an error message is checked, not its exact text.
let err = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::Error(message),
}) => Some(message.clone()),
_ => None,
})
.await;
assert!(!err.is_empty());
// Wait up to 200 ms for a closed event; the timeout elapsing is the passing outcome.
let closed = timeout(Duration::from_millis(200), async {
wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()),
_ => None,
})
.await
})
.await;
assert!(closed.is_err(), "connect failure should not emit closed");
server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_text_before_start_emits_error() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -518,6 +613,7 @@ async fn conversation_second_start_replaces_runtime() -> Result<()> {
sample_rate: 24000,
num_channels: 1,
samples_per_channel: Some(480),
item_id: None,
},
}))
.await?;
@@ -1048,7 +1144,7 @@ async fn conversation_mirrors_assistant_message_text_to_realtime_handoff() -> Re
);
assert_eq!(
realtime_connections[0][1].body_json()["output_text"].as_str(),
Some("assistant says hi")
Some("\"Agent Final Message\":\n\nassistant says hi")
);
realtime_server.shutdown().await;
@@ -1153,7 +1249,7 @@ async fn conversation_handoff_persists_across_item_done_until_turn_complete() ->
);
assert_eq!(
first_append.body_json()["output_text"].as_str(),
Some("assistant message 1")
Some("\"Agent Final Message\":\n\nassistant message 1")
);
let _ = wait_for_event_match(&test.codex, |msg| match msg {
@@ -1177,7 +1273,7 @@ async fn conversation_handoff_persists_across_item_done_until_turn_complete() ->
);
assert_eq!(
second_append.body_json()["output_text"].as_str(),
Some("assistant message 2")
Some("\"Agent Final Message\":\n\nassistant message 2")
);
let completion = completions
@@ -1469,6 +1565,7 @@ async fn inbound_handoff_request_clears_active_transcript_after_each_handoff() -
sample_rate: 24000,
num_channels: 1,
samples_per_channel: Some(480),
item_id: None,
},
}))
.await?;
@@ -1699,7 +1796,7 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au
);
assert_eq!(
mirrored_request_body["output_text"].as_str(),
Some("assistant says hi")
Some("\"Agent Final Message\":\n\nassistant says hi")
);
let audio_out = wait_for_event_match(&test.codex, |msg| match msg {
@@ -1954,6 +2051,7 @@ async fn inbound_handoff_request_steers_active_turn() -> Result<()> {
sample_rate: 24000,
num_channels: 1,
samples_per_channel: Some(480),
item_id: None,
},
}))
.await?;

View File

@@ -289,7 +289,6 @@ async fn remote_models_remote_model_uses_unified_exec() -> Result<()> {
visibility: ModelVisibility::List,
supported_in_api: true,
input_modalities: default_input_modalities(),
prefer_websockets: false,
used_fallback_model_metadata: false,
supports_search_tool: false,
priority: 1,
@@ -533,7 +532,6 @@ async fn remote_models_apply_remote_base_instructions() -> Result<()> {
visibility: ModelVisibility::List,
supported_in_api: true,
input_modalities: default_input_modalities(),
prefer_websockets: false,
used_fallback_model_metadata: false,
supports_search_tool: false,
priority: 1,
@@ -685,6 +683,8 @@ async fn remote_models_do_not_append_removed_builtin_presets() -> Result<()> {
1,
"expected a single /models request"
);
// Keep the mock server alive until after async assertions complete.
drop(server);
Ok(())
}
@@ -1001,7 +1001,6 @@ fn test_remote_model_with_policy(
visibility,
supported_in_api: true,
input_modalities: default_input_modalities(),
prefer_websockets: false,
used_fallback_model_metadata: false,
supports_search_tool: false,
priority,

View File

@@ -419,7 +419,6 @@ async fn stdio_image_responses_are_sanitized_for_text_only_model() -> anyhow::Re
effective_context_window_percent: 95,
experimental_supported_tools: Vec::new(),
input_modalities: vec![InputModality::Text],
prefer_websockets: false,
used_fallback_model_metadata: false,
supports_search_tool: false,
}],

View File

@@ -64,7 +64,6 @@ fn test_model_info(
visibility,
supported_in_api: true,
input_modalities: default_input_modalities(),
prefer_websockets: false,
used_fallback_model_metadata: false,
supports_search_tool: false,
priority: 1,

View File

@@ -482,7 +482,7 @@ async fn tool_call_logs_include_thread_id() -> Result<()> {
if let Some(row) = rows.into_iter().find(|row| {
row.message
.as_deref()
.is_some_and(|m| m.starts_with("ToolCall:"))
.is_some_and(|m| m.contains("ToolCall:"))
}) {
let thread_id = row.thread_id;
let message = row.message;
@@ -497,7 +497,7 @@ async fn tool_call_logs_include_thread_id() -> Result<()> {
assert!(
message
.as_deref()
.is_some_and(|text| text.starts_with("ToolCall:")),
.is_some_and(|text| text.contains("ToolCall:")),
"expected ToolCall message, got {message:?}"
);

View File

@@ -76,6 +76,7 @@ async fn continue_after_stream_error() {
request_max_retries: Some(1),
stream_max_retries: Some(1),
stream_idle_timeout_ms: Some(2_000),
websocket_connect_timeout_ms: None,
requires_openai_auth: false,
supports_websockets: false,
};

View File

@@ -61,6 +61,7 @@ async fn retries_on_early_close() {
request_max_retries: Some(0),
stream_max_retries: Some(1),
stream_idle_timeout_ms: Some(2000),
websocket_connect_timeout_ms: None,
requires_openai_auth: false,
supports_websockets: false,
};

View File

@@ -1087,7 +1087,7 @@ async fn view_image_tool_errors_when_path_is_directory() -> anyhow::Result<()> {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn view_image_tool_placeholder_for_non_image_files() -> anyhow::Result<()> {
async fn view_image_tool_errors_for_non_image_files() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
@@ -1150,20 +1150,19 @@ async fn view_image_tool_placeholder_for_non_image_files() -> anyhow::Result<()>
request.inputs_of_type("input_image").is_empty(),
"non-image file should not produce an input_image message"
);
let (placeholder, success) = request
let (error_text, success) = request
.function_call_output_content_and_success(call_id)
.expect("function_call_output should be present");
assert_eq!(success, None);
let placeholder = placeholder.expect("placeholder text present");
let error_text = error_text.expect("error text present");
assert!(
placeholder.contains("Codex could not read the local image at")
&& placeholder.contains("unsupported MIME type `application/json`"),
"placeholder should describe the unsupported file type: {placeholder}"
let expected_error = format!(
"unable to process image at `{}`: unsupported image `application/json`",
abs_path.display()
);
assert!(
placeholder.contains(&abs_path.display().to_string()),
"placeholder should mention path: {placeholder}"
error_text.contains(&expected_error),
"error should describe unsupported file type: {error_text}"
);
Ok(())
@@ -1270,7 +1269,6 @@ async fn view_image_tool_returns_unsupported_message_for_text_only_model() -> an
visibility: ModelVisibility::List,
supported_in_api: true,
input_modalities: vec![InputModality::Text],
prefer_websockets: false,
used_fallback_model_metadata: false,
supports_search_tool: false,
priority: 1,

View File

@@ -1,5 +1,4 @@
use anyhow::Result;
use codex_core::features::Feature;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
@@ -45,10 +44,7 @@ async fn websocket_fallback_switches_to_http_on_upgrade_required_connect() -> Re
move |config| {
config.model_provider.base_url = Some(base_url);
config.model_provider.wire_api = codex_core::WireApi::Responses;
config
.features
.enable(Feature::ResponsesWebsockets)
.expect("test config should allow feature update");
config.model_provider.supports_websockets = true;
// If we don't treat 426 specially, the sampling loop would retry the WebSocket
// handshake before switching to the HTTP transport.
config.model_provider.stream_max_retries = Some(2);
@@ -94,10 +90,7 @@ async fn websocket_fallback_switches_to_http_after_retries_exhausted() -> Result
move |config| {
config.model_provider.base_url = Some(base_url);
config.model_provider.wire_api = codex_core::WireApi::Responses;
config
.features
.enable(Feature::ResponsesWebsockets)
.expect("test config should allow feature update");
config.model_provider.supports_websockets = true;
config.model_provider.stream_max_retries = Some(2);
config.model_provider.request_max_retries = Some(0);
}
@@ -142,10 +135,7 @@ async fn websocket_fallback_hides_first_websocket_retry_stream_error() -> Result
move |config| {
config.model_provider.base_url = Some(base_url);
config.model_provider.wire_api = codex_core::WireApi::Responses;
config
.features
.enable(Feature::ResponsesWebsockets)
.expect("test config should allow feature update");
config.model_provider.supports_websockets = true;
config.model_provider.stream_max_retries = Some(2);
config.model_provider.request_max_retries = Some(0);
}
@@ -220,10 +210,7 @@ async fn websocket_fallback_is_sticky_across_turns() -> Result<()> {
move |config| {
config.model_provider.base_url = Some(base_url);
config.model_provider.wire_api = codex_core::WireApi::Responses;
config
.features
.enable(Feature::ResponsesWebsockets)
.expect("test config should allow feature update");
config.model_provider.supports_websockets = true;
config.model_provider.stream_max_retries = Some(2);
config.model_provider.request_max_retries = Some(0);
}