From 04a8580f33d5673f1dad887948fe873b06780647 Mon Sep 17 00:00:00 2001 From: rhan-oai Date: Tue, 26 May 2026 02:01:18 -0700 Subject: [PATCH] centralize Responses retry policy (#24131) ## Why #23951 added remote compaction v2 retries, but it left the retry and WS -> HTTPS fallback behavior duplicated between normal Responses turns and compaction. This follow-up centralizes the common retry handling so future changes to fallback, retry delay, retry notifications, and retry sleep do not have to be kept in sync across both callsites. ## What changed - Added `core/src/responses_retry.rs` with a shared handler for retryable Responses stream errors. - Reused that handler from normal turn sampling and remote compaction v2. - Kept each callsite responsible for its retry budget: normal turns still use `stream_max_retries`, while compaction v2 still uses `min(stream_max_retries, 2)`. - Preserved caller-specific behavior around non-retryable errors, context-window errors, usage-limit errors, and compact-specific final failure logging. The shared handler now owns: - WS -> HTTPS fallback warning emission - retry delay selection, including server-requested stream retry delay - retry logging - first-WebSocket-retry notification suppression - `Reconnecting... n/max` stream-error notification - sleeping before the next retry attempt ## Verification - `cargo test -p codex-core remote_compact_v2` - `cargo test -p codex-core websocket_fallback` - `just fix -p codex-core` Did not run the full workspace test suite. --------- Co-authored-by: jif-oai --- codex-rs/core/src/compact_remote_v2.rs | 68 ++++------------ codex-rs/core/src/lib.rs | 1 + codex-rs/core/src/responses_retry.rs | 105 +++++++++++++++++++++++++ codex-rs/core/src/session/turn.rs | 64 +++------------ 4 files changed, 135 insertions(+), 103 deletions(-) create mode 100644 codex-rs/core/src/responses_retry.rs diff --git a/codex-rs/core/src/compact_remote_v2.rs b/codex-rs/core/src/compact_remote_v2.rs index 0e235a941f..dcf241f990 100644 --- a/codex-rs/core/src/compact_remote_v2.rs +++ b/codex-rs/core/src/compact_remote_v2.rs @@ -16,10 +16,11 @@ use crate::hook_runtime::PostCompactHookOutcome; use crate::hook_runtime::PreCompactHookOutcome; use crate::hook_runtime::run_post_compact_hooks; use crate::hook_runtime::run_pre_compact_hooks; +use crate::responses_retry::ResponsesStreamRequest; +use crate::responses_retry::handle_retryable_response_stream_error; use crate::session::session::Session; use crate::session::turn::built_tools; use crate::session::turn_context::TurnContext; -use crate::util::backoff; use codex_analytics::CompactionImplementation; use codex_analytics::CompactionPhase; use codex_analytics::CompactionReason; @@ -35,7 +36,6 @@ use codex_protocol::protocol::CompactedItem; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::TruncationPolicy; use codex_protocol::protocol::TurnStartedEvent; -use codex_protocol::protocol::WarningEvent; use codex_rollout_trace::CompactionCheckpointTracePayload; use codex_rollout_trace::InferenceTraceContext; use codex_utils_output_truncation::approx_token_count; @@ -43,7 +43,6 @@ use codex_utils_output_truncation::truncate_text; use futures::StreamExt; use tokio_util::sync::CancellationToken; use tracing::info; -use tracing::warn; // Mirror the current /responses/compact retained-message default while the // server-side path remains the reference implementation. @@ -313,56 +312,21 @@ async fn run_remote_compaction_request_v2( log_remote_compaction_request_failure(sess, turn_context, prompt, &err).await; return Err(err); } - Err(err) - if retries >= max_retries - && client_session.try_switch_fallback_transport( - &turn_context.session_telemetry, - &turn_context.model_info, - ) => - { - sess.send_event( - turn_context, - EventMsg::Warning(WarningEvent { - message: format!( - "Falling back from WebSockets to HTTPS transport. {err:#}" - ), - }), - ) - .await; - retries = 0; - } - Err(err) if retries < max_retries => { - retries += 1; - let delay = match &err { - CodexErr::Stream(_, requested_delay) => { - requested_delay.unwrap_or_else(|| backoff(retries)) - } - _ => backoff(retries), - }; - warn!( - turn_id = %turn_context.sub_id, - retries, - max_retries, - compact_error = %err, - "remote compaction v2 stream failed; retrying request after delay" - ); - - let report_error = retries > 1 - || cfg!(debug_assertions) - || !sess.services.model_client.responses_websocket_enabled(); - if report_error { - sess.notify_stream_error( - turn_context, - format!("Reconnecting... {retries}/{max_retries}"), - err, - ) - .await; - } - tokio::time::sleep(delay).await; - } Err(err) => { - log_remote_compaction_request_failure(sess, turn_context, prompt, &err).await; - return Err(err); + if let Err(err) = handle_retryable_response_stream_error( + &mut retries, + max_retries, + err, + client_session, + sess, + turn_context, + ResponsesStreamRequest::RemoteCompactionV2, + ) + .await + { + log_remote_compaction_request_failure(sess, turn_context, prompt, &err).await; + return Err(err); + } } } } diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index d5c1237638..aa5784d4fe 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -12,6 +12,7 @@ mod client_common; mod realtime_context; mod realtime_conversation; mod realtime_prompt; +mod responses_retry; pub(crate) mod session; pub use session::SteerInputError; mod codex_thread; diff --git a/codex-rs/core/src/responses_retry.rs b/codex-rs/core/src/responses_retry.rs new file mode 100644 index 0000000000..9a55fc8114 --- /dev/null +++ b/codex-rs/core/src/responses_retry.rs @@ -0,0 +1,105 @@ +//! Shared retry and transport fallback decisions for Responses requests. + +use std::time::Duration; + +use crate::client::ModelClientSession; +use crate::session::session::Session; +use crate::session::turn_context::TurnContext; +use crate::util::backoff; +use codex_protocol::error::CodexErr; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::WarningEvent; +use tracing::warn; + +#[derive(Debug, Clone, Copy)] +pub(crate) enum ResponsesStreamRequest { + Sampling, + RemoteCompactionV2, +} + +/// Handles a retryable stream error and returns `Ok(())` when the caller should +/// retry the request loop. +pub(crate) async fn handle_retryable_response_stream_error( + retries: &mut u64, + max_retries: u64, + err: CodexErr, + client_session: &mut ModelClientSession, + sess: &Session, + turn_context: &TurnContext, + request: ResponsesStreamRequest, +) -> Result<(), CodexErr> { + if *retries >= max_retries + && client_session.try_switch_fallback_transport( + &turn_context.session_telemetry, + &turn_context.model_info, + ) + { + sess.send_event( + turn_context, + EventMsg::Warning(WarningEvent { + message: format!("Falling back from WebSockets to HTTPS transport. {err:#}"), + }), + ) + .await; + *retries = 0; + return Ok(()); + } + + if *retries < max_retries { + *retries += 1; + let retry_count = *retries; + let delay = match &err { + CodexErr::Stream(_, requested_delay) => { + requested_delay.unwrap_or_else(|| backoff(retry_count)) + } + _ => backoff(retry_count), + }; + log_retry(request, turn_context, &err, retry_count, max_retries, delay); + + // In release builds, hide the first websocket retry notification to reduce noisy + // transient reconnect messages. In debug builds, keep full visibility for diagnosis. + let report_error = retry_count > 1 + || cfg!(debug_assertions) + || !sess.services.model_client.responses_websocket_enabled(); + if report_error { + // Surface retry information to any UI/front-end so the user understands what is + // happening instead of staring at a seemingly frozen screen. + sess.notify_stream_error( + turn_context, + format!("Reconnecting... {retry_count}/{max_retries}"), + err, + ) + .await; + } + tokio::time::sleep(delay).await; + return Ok(()); + } + + Err(err) +} + +fn log_retry( + request: ResponsesStreamRequest, + turn_context: &TurnContext, + err: &CodexErr, + retries: u64, + max_retries: u64, + delay: Duration, +) { + match request { + ResponsesStreamRequest::Sampling => { + warn!( + "stream disconnected - retrying sampling request ({retries}/{max_retries} in {delay:?})...", + ); + } + ResponsesStreamRequest::RemoteCompactionV2 => { + warn!( + turn_id = %turn_context.sub_id, + retries, + max_retries, + compact_error = %err, + "remote compaction v2 stream failed; retrying request after delay" + ); + } + } +} diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 5a2ed71dda..7d5433ff3e 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -35,6 +35,8 @@ use crate::mentions::collect_explicit_app_ids; use crate::mentions::collect_explicit_plugin_mentions; use crate::mentions::collect_tool_mentions_from_messages; use crate::plugins::build_plugin_injections; +use crate::responses_retry::ResponsesStreamRequest; +use crate::responses_retry::handle_retryable_response_stream_error; use crate::session::PreviousTurnSettings; use crate::session::TurnInput; use crate::session::session::Session; @@ -58,7 +60,6 @@ use crate::tools::spec_plan::search_tool_enabled; use crate::tools::spec_plan::tool_suggest_enabled; use crate::turn_diff_tracker::TurnDiffTracker; use crate::turn_timing::record_turn_ttft_metric; -use crate::util::backoff; use crate::util::error_or_panic; use codex_analytics::AppInvocation; use codex_analytics::CompactionPhase; @@ -919,6 +920,7 @@ async fn run_sampling_request( Arc::clone(&turn_diff_tracker), ) .await; + let max_retries = turn_context.provider.info().stream_max_retries(); let mut retries = 0; let mut initial_input = Some(input); loop { @@ -969,56 +971,16 @@ async fn run_sampling_request( return Err(err); } - // Use the configured provider-specific stream retry budget. - let max_retries = turn_context.provider.info().stream_max_retries(); - if retries >= max_retries - && client_session.try_switch_fallback_transport( - &turn_context.session_telemetry, - &turn_context.model_info, - ) - { - sess.send_event( - &turn_context, - EventMsg::Warning(WarningEvent { - message: format!("Falling back from WebSockets to HTTPS transport. {err:#}"), - }), - ) - .await; - retries = 0; - continue; - } - if retries < max_retries { - retries += 1; - let delay = match &err { - CodexErr::Stream(_, requested_delay) => { - requested_delay.unwrap_or_else(|| backoff(retries)) - } - _ => backoff(retries), - }; - warn!( - "stream disconnected - retrying sampling request ({retries}/{max_retries} in {delay:?})...", - ); - - // In release builds, hide the first websocket retry notification to reduce noisy - // transient reconnect messages. In debug builds, keep full visibility for diagnosis. - let report_error = retries > 1 - || cfg!(debug_assertions) - || !sess.services.model_client.responses_websocket_enabled(); - if report_error { - // Surface retry information to any UI/front‑end so the - // user understands what is happening instead of staring - // at a seemingly frozen screen. - sess.notify_stream_error( - &turn_context, - format!("Reconnecting... {retries}/{max_retries}"), - err, - ) - .await; - } - tokio::time::sleep(delay).await; - } else { - return Err(err); - } + handle_retryable_response_stream_error( + &mut retries, + max_retries, + err, + client_session, + &sess, + &turn_context, + ResponsesStreamRequest::Sampling, + ) + .await?; } }