Files
codex/codex-rs/core/tests/suite/websocket_fallback.rs
Eric Traut 1a25d8b6e5 [3 of 7] Remove UserTurn (#23075)
**Stack position:** [3 of 7]

## Summary

This PR finishes the input-op consolidation by moving the remaining
`Op::UserTurn` callers onto `Op::UserInput` and deleting `Op::UserTurn`.
This touches a lot of files, but it is a low-risk mechanical migration.

## Stack

1. [1 of 7] [Add thread settings to
UserInput](https://github.com/openai/codex/pull/23080)
2. [2 of 7] [Remove
UserInputWithTurnContext](https://github.com/openai/codex/pull/23081)
3. [3 of 7] [Remove
UserTurn](https://github.com/openai/codex/pull/23075) (this PR)
4. [4 of 7] [Placeholder for OverrideTurnContext
cleanup](https://github.com/openai/codex/pull/23087)
5. [5 of 7] [Replace OverrideTurnContext with
ThreadSettings](https://github.com/openai/codex/pull/22508)
6. [6 of 7] [Add app-server thread settings
API](https://github.com/openai/codex/pull/22509)
7. [7 of 7] [Sync TUI thread
settings](https://github.com/openai/codex/pull/22510)
2026-05-18 19:56:00 -07:00

255 lines
9.3 KiB
Rust

use anyhow::Result;
use codex_model_provider_info::WireApi;
use codex_protocol::models::PermissionProfile;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use codex_protocol::user_input::UserInput;
use core_test_support::responses;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use core_test_support::test_codex::turn_permission_fields;
use pretty_assertions::assert_eq;
use tokio::time::Duration;
use tokio::time::timeout;
use wiremock::Mock;
use wiremock::ResponseTemplate;
use wiremock::http::Method;
use wiremock::matchers::method;
use wiremock::matchers::path_regex;
/// Checks that a 426 reply to the WebSocket connect request switches the session to HTTP
/// without any further WebSocket handshake attempts.
///
/// Setup: every `GET` whose path ends in `/responses` is answered with status 426, and one
/// SSE response is mounted through `mount_sse_once`. The provider is configured with
/// `supports_websockets = true`, two stream retries, and zero request retries.
///
/// Passes when exactly one `GET` and one `POST` reached `/responses` and the SSE mock
/// recorded a single request.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn websocket_fallback_switches_to_http_on_upgrade_required_connect() -> Result<()> {
    skip_if_no_network!(Ok(()));

    let server = responses::start_mock_server().await;

    // Reject every WebSocket connect attempt (a GET on `.../responses`) with 426.
    let upgrade_required = ResponseTemplate::new(426);
    Mock::given(method("GET"))
        .and(path_regex(".*/responses$"))
        .respond_with(upgrade_required)
        .mount(&server)
        .await;

    let sse_body = sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]);
    let response_mock = mount_sse_once(&server, sse_body).await;

    let base_url = format!("{}/v1", server.uri());
    let mut builder = test_codex().with_config(move |config| {
        let provider = &mut config.model_provider;
        provider.base_url = Some(base_url);
        provider.wire_api = WireApi::Responses;
        provider.supports_websockets = true;
        // If we don't treat 426 specially, the sampling loop would retry the WebSocket
        // handshake before switching to the HTTP transport.
        provider.stream_max_retries = Some(2);
        provider.request_max_retries = Some(0);
    });
    let test = builder.build(&server).await?;

    test.submit_turn("hello").await?;

    // Narrow the recorded traffic to the `/responses` endpoint once, then split by verb.
    let recorded = server.received_requests().await.unwrap_or_default();
    let responses_hits: Vec<_> = recorded
        .iter()
        .filter(|req| req.url.path().ends_with("/responses"))
        .collect();
    let websocket_attempts = responses_hits
        .iter()
        .filter(|req| req.method == Method::GET)
        .count();
    let http_attempts = responses_hits
        .iter()
        .filter(|req| req.method == Method::POST)
        .count();

    // The startup prewarm request sees 426 and immediately switches the session to HTTP
    // fallback, so the first turn goes straight to HTTP with no additional websocket
    // connect attempt.
    assert_eq!(websocket_attempts, 1);
    assert_eq!(http_attempts, 1);
    assert_eq!(response_mock.requests().len(), 1);

    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn websocket_fallback_switches_to_http_after_retries_exhausted() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
let response_mock = mount_sse_once(
&server,
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
)
.await;
let mut builder = test_codex().with_config({
let base_url = format!("{}/v1", server.uri());
move |config| {
config.model_provider.base_url = Some(base_url);
config.model_provider.wire_api = WireApi::Responses;
config.model_provider.supports_websockets = true;
config.model_provider.stream_max_retries = Some(2);
config.model_provider.request_max_retries = Some(0);
}
});
let test = builder.build(&server).await?;
test.submit_turn("hello").await?;
let requests = server.received_requests().await.unwrap_or_default();
let websocket_attempts = requests
.iter()
.filter(|req| req.method == Method::GET && req.url.path().ends_with("/responses"))
.count();
let http_attempts = requests
.iter()
.filter(|req| req.method == Method::POST && req.url.path().ends_with("/responses"))
.count();
// Deferred request prewarm is attempted at startup.
// The first turn then makes 3 websocket stream attempts (initial try + 2 retries),
// after which fallback activates and the request is replayed over HTTP.
assert_eq!(websocket_attempts, 4);
assert_eq!(http_attempts, 1);
assert_eq!(response_mock.requests().len(), 1);
Ok(())
}
/// Checks which `StreamError` messages are surfaced while WebSocket retries run before
/// the HTTP fallback.
///
/// The turn is submitted directly as `Op::UserInput` (rather than through
/// `submit_turn`) so the test can read the raw event stream. Every
/// `EventMsg::StreamError` message seen before `EventMsg::TurnComplete` is collected.
///
/// Expected messages: `"Reconnecting... 1/2"` followed by `"Reconnecting... 2/2"` when
/// `debug_assertions` is enabled, and only `"Reconnecting... 2/2"` otherwise. The SSE
/// mock must also have recorded exactly one request.
///
/// Panics if no event arrives within 10 seconds or if the event stream ends early.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn websocket_fallback_hides_first_websocket_retry_stream_error() -> Result<()> {
    skip_if_no_network!(Ok(()));

    let server = responses::start_mock_server().await;

    let sse_body = sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]);
    let response_mock = mount_sse_once(&server, sse_body).await;

    let base_url = format!("{}/v1", server.uri());
    let mut builder = test_codex().with_config(move |config| {
        let provider = &mut config.model_provider;
        provider.base_url = Some(base_url);
        provider.wire_api = WireApi::Responses;
        provider.supports_websockets = true;
        provider.stream_max_retries = Some(2);
        provider.request_max_retries = Some(0);
    });

    let TestCodex {
        codex,
        session_configured,
        cwd: workspace,
        ..
    } = builder.build(&server).await?;

    let (sandbox_policy, permission_profile) =
        turn_permission_fields(PermissionProfile::Disabled, workspace.path());

    // Assemble the per-turn settings piece by piece before submitting the op.
    let collaboration_mode = codex_protocol::config_types::CollaborationMode {
        mode: codex_protocol::config_types::ModeKind::Default,
        settings: codex_protocol::config_types::Settings {
            model: session_configured.model.clone(),
            reasoning_effort: None,
            developer_instructions: None,
        },
    };
    let thread_settings = codex_protocol::protocol::ThreadSettingsOverrides {
        cwd: Some(workspace.path().to_path_buf()),
        approval_policy: Some(AskForApproval::Never),
        sandbox_policy: Some(sandbox_policy),
        permission_profile,
        collaboration_mode: Some(collaboration_mode),
        ..Default::default()
    };
    let items = vec![UserInput::Text {
        text: "hello".into(),
        text_elements: Vec::new(),
    }];

    codex
        .submit(Op::UserInput {
            items,
            environments: None,
            final_output_json_schema: None,
            responsesapi_client_metadata: None,
            thread_settings,
        })
        .await?;

    // Drain events until the turn completes, keeping only stream-error messages.
    let mut stream_error_messages = Vec::new();
    loop {
        let next = timeout(Duration::from_secs(10), codex.next_event()).await;
        let event = next
            .expect("timeout waiting for event")
            .expect("event stream ended unexpectedly");
        match event.msg {
            EventMsg::TurnComplete(_) => break,
            EventMsg::StreamError(err) => stream_error_messages.push(err.message),
            _ => {}
        }
    }

    // The final retry notice is always expected; the first one is only expected when
    // `debug_assertions` is on.
    let mut expected_stream_errors = vec!["Reconnecting... 2/2"];
    if cfg!(debug_assertions) {
        expected_stream_errors.insert(0, "Reconnecting... 1/2");
    }

    assert_eq!(stream_error_messages, expected_stream_errors);
    assert_eq!(response_mock.requests().len(), 1);

    Ok(())
}
/// Checks that once the session has fallen back to HTTP, later turns stay on HTTP.
///
/// Setup: two SSE responses are mounted in sequence and two turns (`"first"`, then
/// `"second"`) are submitted. The provider is configured with
/// `supports_websockets = true`, `stream_max_retries = 2`, and `request_max_retries = 0`.
///
/// Passes when four `GET`s and two `POST`s reached `/responses` and the SSE mock recorded
/// two requests.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn websocket_fallback_is_sticky_across_turns() -> Result<()> {
    skip_if_no_network!(Ok(()));

    let server = responses::start_mock_server().await;

    let first_reply = sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]);
    let second_reply = sse(vec![ev_response_created("resp-2"), ev_completed("resp-2")]);
    let response_mock = mount_sse_sequence(&server, vec![first_reply, second_reply]).await;

    let base_url = format!("{}/v1", server.uri());
    let mut builder = test_codex().with_config(move |config| {
        let provider = &mut config.model_provider;
        provider.base_url = Some(base_url);
        provider.wire_api = WireApi::Responses;
        provider.supports_websockets = true;
        provider.stream_max_retries = Some(2);
        provider.request_max_retries = Some(0);
    });
    let test = builder.build(&server).await?;

    test.submit_turn("first").await?;
    test.submit_turn("second").await?;

    // Tally GET and POST hits on `/responses` in a single pass over the recorded requests.
    let recorded = server.received_requests().await.unwrap_or_default();
    let (websocket_attempts, http_attempts) = recorded
        .iter()
        .filter(|req| req.url.path().ends_with("/responses"))
        .fold((0usize, 0usize), |(gets, posts), req| {
            if req.method == Method::GET {
                (gets + 1, posts)
            } else if req.method == Method::POST {
                (gets, posts + 1)
            } else {
                (gets, posts)
            }
        });

    // WebSocket attempts all happen on the first turn:
    // 1 deferred request prewarm attempt (startup) + 3 stream attempts
    // (initial try + 2 retries) before fallback.
    // Fallback is sticky, so the second turn stays on HTTP and adds no websocket attempts.
    assert_eq!(websocket_attempts, 4);
    assert_eq!(http_attempts, 2);
    assert_eq!(response_mock.requests().len(), 2);

    Ok(())
}