mirror of
https://github.com/openai/codex.git
synced 2026-05-05 03:47:01 +00:00
Summary is a required parameter on UserTurn. Ideally we'd like the core to decide the appropriate summary level. Make the summary optional and don't send it when not needed.
243 lines
8.6 KiB
Rust
243 lines
8.6 KiB
Rust
use anyhow::Result;
|
|
use codex_core::features::Feature;
|
|
use codex_protocol::protocol::AskForApproval;
|
|
use codex_protocol::protocol::EventMsg;
|
|
use codex_protocol::protocol::Op;
|
|
use codex_protocol::protocol::SandboxPolicy;
|
|
use codex_protocol::user_input::UserInput;
|
|
use core_test_support::responses;
|
|
use core_test_support::responses::ev_completed;
|
|
use core_test_support::responses::ev_response_created;
|
|
use core_test_support::responses::mount_sse_once;
|
|
use core_test_support::responses::mount_sse_sequence;
|
|
use core_test_support::responses::sse;
|
|
use core_test_support::skip_if_no_network;
|
|
use core_test_support::test_codex::TestCodex;
|
|
use core_test_support::test_codex::test_codex;
|
|
use pretty_assertions::assert_eq;
|
|
use tokio::time::Duration;
|
|
use tokio::time::timeout;
|
|
use wiremock::Mock;
|
|
use wiremock::ResponseTemplate;
|
|
use wiremock::http::Method;
|
|
use wiremock::matchers::method;
|
|
use wiremock::matchers::path_regex;
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn websocket_fallback_switches_to_http_on_upgrade_required_connect() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let server = responses::start_mock_server().await;
|
|
Mock::given(method("GET"))
|
|
.and(path_regex(".*/responses$"))
|
|
.respond_with(ResponseTemplate::new(426))
|
|
.mount(&server)
|
|
.await;
|
|
|
|
let response_mock = mount_sse_once(
|
|
&server,
|
|
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
|
|
)
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_config({
|
|
let base_url = format!("{}/v1", server.uri());
|
|
move |config| {
|
|
config.model_provider.base_url = Some(base_url);
|
|
config.model_provider.wire_api = codex_core::WireApi::Responses;
|
|
config.features.enable(Feature::ResponsesWebsockets);
|
|
// If we don't treat 426 specially, the sampling loop would retry the WebSocket
|
|
// handshake before switching to the HTTP transport.
|
|
config.model_provider.stream_max_retries = Some(2);
|
|
config.model_provider.request_max_retries = Some(0);
|
|
}
|
|
});
|
|
let test = builder.build(&server).await?;
|
|
|
|
test.submit_turn("hello").await?;
|
|
|
|
let requests = server.received_requests().await.unwrap_or_default();
|
|
let websocket_attempts = requests
|
|
.iter()
|
|
.filter(|req| req.method == Method::GET && req.url.path().ends_with("/responses"))
|
|
.count();
|
|
let http_attempts = requests
|
|
.iter()
|
|
.filter(|req| req.method == Method::POST && req.url.path().ends_with("/responses"))
|
|
.count();
|
|
|
|
// Startup prewarm now only preconnects for v1 (one websocket GET with no request body).
|
|
// The first turn then attempts websocket once, sees 426, and falls back to HTTP.
|
|
assert_eq!(websocket_attempts, 2);
|
|
assert_eq!(http_attempts, 1);
|
|
assert_eq!(response_mock.requests().len(), 1);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn websocket_fallback_switches_to_http_after_retries_exhausted() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let server = responses::start_mock_server().await;
|
|
let response_mock = mount_sse_once(
|
|
&server,
|
|
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
|
|
)
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_config({
|
|
let base_url = format!("{}/v1", server.uri());
|
|
move |config| {
|
|
config.model_provider.base_url = Some(base_url);
|
|
config.model_provider.wire_api = codex_core::WireApi::Responses;
|
|
config.features.enable(Feature::ResponsesWebsockets);
|
|
config.model_provider.stream_max_retries = Some(2);
|
|
config.model_provider.request_max_retries = Some(0);
|
|
}
|
|
});
|
|
let test = builder.build(&server).await?;
|
|
|
|
test.submit_turn("hello").await?;
|
|
|
|
let requests = server.received_requests().await.unwrap_or_default();
|
|
let websocket_attempts = requests
|
|
.iter()
|
|
.filter(|req| req.method == Method::GET && req.url.path().ends_with("/responses"))
|
|
.count();
|
|
let http_attempts = requests
|
|
.iter()
|
|
.filter(|req| req.method == Method::POST && req.url.path().ends_with("/responses"))
|
|
.count();
|
|
|
|
// Deferred request prewarm is attempted at startup.
|
|
// The first turn then makes 3 websocket stream attempts (initial try + 2 retries),
|
|
// after which fallback activates and the request is replayed over HTTP.
|
|
assert_eq!(websocket_attempts, 4);
|
|
assert_eq!(http_attempts, 1);
|
|
assert_eq!(response_mock.requests().len(), 1);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn websocket_fallback_hides_first_websocket_retry_stream_error() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let server = responses::start_mock_server().await;
|
|
let response_mock = mount_sse_once(
|
|
&server,
|
|
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
|
|
)
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_config({
|
|
let base_url = format!("{}/v1", server.uri());
|
|
move |config| {
|
|
config.model_provider.base_url = Some(base_url);
|
|
config.model_provider.wire_api = codex_core::WireApi::Responses;
|
|
config.features.enable(Feature::ResponsesWebsockets);
|
|
config.model_provider.stream_max_retries = Some(2);
|
|
config.model_provider.request_max_retries = Some(0);
|
|
}
|
|
});
|
|
let TestCodex {
|
|
codex,
|
|
session_configured,
|
|
cwd,
|
|
..
|
|
} = builder.build(&server).await?;
|
|
|
|
codex
|
|
.submit(Op::UserTurn {
|
|
items: vec![UserInput::Text {
|
|
text: "hello".into(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
final_output_json_schema: None,
|
|
cwd: cwd.path().to_path_buf(),
|
|
approval_policy: AskForApproval::Never,
|
|
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
|
model: session_configured.model.clone(),
|
|
effort: None,
|
|
summary: None,
|
|
collaboration_mode: None,
|
|
personality: None,
|
|
})
|
|
.await?;
|
|
|
|
let mut stream_error_messages = Vec::new();
|
|
loop {
|
|
let event = timeout(Duration::from_secs(10), codex.next_event())
|
|
.await
|
|
.expect("timeout waiting for event")
|
|
.expect("event stream ended unexpectedly")
|
|
.msg;
|
|
match event {
|
|
EventMsg::StreamError(e) => stream_error_messages.push(e.message),
|
|
EventMsg::TurnComplete(_) => break,
|
|
_ => {}
|
|
}
|
|
}
|
|
|
|
let expected_stream_errors = if cfg!(debug_assertions) {
|
|
vec!["Reconnecting... 1/2", "Reconnecting... 2/2"]
|
|
} else {
|
|
vec!["Reconnecting... 2/2"]
|
|
};
|
|
assert_eq!(stream_error_messages, expected_stream_errors);
|
|
assert_eq!(response_mock.requests().len(), 1);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn websocket_fallback_is_sticky_across_turns() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let server = responses::start_mock_server().await;
|
|
let response_mock = mount_sse_sequence(
|
|
&server,
|
|
vec![
|
|
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
|
|
sse(vec![ev_response_created("resp-2"), ev_completed("resp-2")]),
|
|
],
|
|
)
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_config({
|
|
let base_url = format!("{}/v1", server.uri());
|
|
move |config| {
|
|
config.model_provider.base_url = Some(base_url);
|
|
config.model_provider.wire_api = codex_core::WireApi::Responses;
|
|
config.features.enable(Feature::ResponsesWebsockets);
|
|
config.model_provider.stream_max_retries = Some(2);
|
|
config.model_provider.request_max_retries = Some(0);
|
|
}
|
|
});
|
|
let test = builder.build(&server).await?;
|
|
|
|
test.submit_turn("first").await?;
|
|
test.submit_turn("second").await?;
|
|
|
|
let requests = server.received_requests().await.unwrap_or_default();
|
|
let websocket_attempts = requests
|
|
.iter()
|
|
.filter(|req| req.method == Method::GET && req.url.path().ends_with("/responses"))
|
|
.count();
|
|
let http_attempts = requests
|
|
.iter()
|
|
.filter(|req| req.method == Method::POST && req.url.path().ends_with("/responses"))
|
|
.count();
|
|
|
|
// WebSocket attempts all happen on the first turn:
|
|
// 1 deferred request prewarm attempt (startup) + 3 stream attempts
|
|
// (initial try + 2 retries) before fallback.
|
|
// Fallback is sticky, so the second turn stays on HTTP and adds no websocket attempts.
|
|
assert_eq!(websocket_attempts, 4);
|
|
assert_eq!(http_attempts, 2);
|
|
assert_eq!(response_mock.requests().len(), 2);
|
|
|
|
Ok(())
|
|
}
|