mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
Hide the first websocket retry (#11548)
Sometimes connection needs to be quickly reestablished, don't produce an error for that.
This commit is contained in:
@@ -346,7 +346,7 @@ impl ModelClient {
|
||||
///
|
||||
/// This combines provider capability and feature gating; both must be true for websocket paths
|
||||
/// to be eligible.
|
||||
fn responses_websocket_enabled(&self, model_info: &ModelInfo) -> bool {
|
||||
pub fn responses_websocket_enabled(&self, model_info: &ModelInfo) -> bool {
|
||||
self.state.provider.supports_websockets
|
||||
&& (self.state.enable_responses_websockets || model_info.prefer_websockets)
|
||||
}
|
||||
|
||||
@@ -4482,16 +4482,26 @@ async fn run_sampling_request(
|
||||
"stream disconnected - retrying sampling request ({retries}/{max_retries} in {delay:?})...",
|
||||
);
|
||||
|
||||
// Surface retry information to any UI/front‑end so the
|
||||
// user understands what is happening instead of staring
|
||||
// at a seemingly frozen screen.
|
||||
sess.notify_stream_error(
|
||||
&turn_context,
|
||||
format!("Reconnecting... {retries}/{max_retries}"),
|
||||
err,
|
||||
)
|
||||
.await;
|
||||
// In release builds, hide the first websocket retry notification to reduce noisy
|
||||
// transient reconnect messages. In debug builds, keep full visibility for diagnosis.
|
||||
let report_error = retries > 1
|
||||
|| cfg!(debug_assertions)
|
||||
|| !sess
|
||||
.services
|
||||
.model_client
|
||||
.responses_websocket_enabled(&turn_context.model_info);
|
||||
|
||||
if report_error {
|
||||
// Surface retry information to any UI/front‑end so the
|
||||
// user understands what is happening instead of staring
|
||||
// at a seemingly frozen screen.
|
||||
sess.notify_stream_error(
|
||||
&turn_context,
|
||||
format!("Reconnecting... {retries}/{max_retries}"),
|
||||
err,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
tokio::time::sleep(delay).await;
|
||||
} else {
|
||||
return Err(err);
|
||||
|
||||
@@ -1,5 +1,11 @@
|
||||
use anyhow::Result;
|
||||
use codex_core::features::Feature;
|
||||
use codex_core::protocol::AskForApproval;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::protocol::SandboxPolicy;
|
||||
use codex_protocol::config_types::ReasoningSummary;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::responses;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
@@ -7,8 +13,11 @@ use core_test_support::responses::mount_sse_once;
|
||||
use core_test_support::responses::mount_sse_sequence;
|
||||
use core_test_support::responses::sse;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use core_test_support::test_codex::TestCodex;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::timeout;
|
||||
use wiremock::Mock;
|
||||
use wiremock::ResponseTemplate;
|
||||
use wiremock::http::Method;
|
||||
@@ -113,6 +122,77 @@ async fn websocket_fallback_switches_to_http_after_retries_exhausted() -> Result
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn websocket_fallback_hides_first_websocket_retry_stream_error() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let response_mock = mount_sse_once(
|
||||
&server,
|
||||
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_config({
|
||||
let base_url = format!("{}/v1", server.uri());
|
||||
move |config| {
|
||||
config.model_provider.base_url = Some(base_url);
|
||||
config.model_provider.wire_api = codex_core::WireApi::Responses;
|
||||
config.features.enable(Feature::ResponsesWebsockets);
|
||||
config.model_provider.stream_max_retries = Some(2);
|
||||
config.model_provider.request_max_retries = Some(0);
|
||||
}
|
||||
});
|
||||
let TestCodex {
|
||||
codex,
|
||||
session_configured,
|
||||
cwd,
|
||||
..
|
||||
} = builder.build(&server).await?;
|
||||
|
||||
codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: "hello".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: cwd.path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
||||
model: session_configured.model.clone(),
|
||||
effort: None,
|
||||
summary: ReasoningSummary::Auto,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let mut stream_error_messages = Vec::new();
|
||||
loop {
|
||||
let event = timeout(Duration::from_secs(10), codex.next_event())
|
||||
.await
|
||||
.expect("timeout waiting for event")
|
||||
.expect("event stream ended unexpectedly")
|
||||
.msg;
|
||||
match event {
|
||||
EventMsg::StreamError(e) => stream_error_messages.push(e.message),
|
||||
EventMsg::TurnComplete(_) => break,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
let expected_stream_errors = if cfg!(debug_assertions) {
|
||||
vec!["Reconnecting... 1/2", "Reconnecting... 2/2"]
|
||||
} else {
|
||||
vec!["Reconnecting... 2/2"]
|
||||
};
|
||||
assert_eq!(stream_error_messages, expected_stream_errors);
|
||||
assert_eq!(response_mock.requests().len(), 1);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn websocket_fallback_is_sticky_across_turns() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
Reference in New Issue
Block a user