diff --git a/codex-rs/core/src/compact_remote_v2.rs b/codex-rs/core/src/compact_remote_v2.rs index 251d37b35c..3368a30201 100644 --- a/codex-rs/core/src/compact_remote_v2.rs +++ b/codex-rs/core/src/compact_remote_v2.rs @@ -16,10 +16,11 @@ use crate::hook_runtime::PostCompactHookOutcome; use crate::hook_runtime::PreCompactHookOutcome; use crate::hook_runtime::run_post_compact_hooks; use crate::hook_runtime::run_pre_compact_hooks; +use crate::responses_retry::ResponsesRetryDecision; +use crate::responses_retry::ResponsesRetryState; use crate::session::session::Session; use crate::session::turn::built_tools; use crate::session::turn_context::TurnContext; -use crate::util::backoff; use codex_analytics::CompactionImplementation; use codex_analytics::CompactionPhase; use codex_analytics::CompactionReason; @@ -287,7 +288,7 @@ async fn run_remote_compaction_request_v2( .info() .stream_max_retries() .min(MAX_REMOTE_COMPACTION_V2_STREAM_RETRIES); - let mut retries = 0; + let mut retry_state = ResponsesRetryState::new(max_retries); loop { let result = match client_session .stream( @@ -312,56 +313,44 @@ async fn run_remote_compaction_request_v2( log_remote_compaction_request_failure(sess, turn_context, prompt, &err).await; return Err(err); } - Err(err) - if retries >= max_retries - && client_session.try_switch_fallback_transport( - &turn_context.session_telemetry, - &turn_context.model_info, - ) => - { - sess.send_event( - turn_context, - EventMsg::Warning(WarningEvent { - message: format!( - "Falling back from WebSockets to HTTPS transport. {err:#}" - ), - }), - ) - .await; - retries = 0; - } - Err(err) if retries < max_retries => { - retries += 1; - let delay = match &err { - CodexErr::Stream(_, requested_delay) => { - requested_delay.unwrap_or_else(|| backoff(retries)) - } - _ => backoff(retries), - }; - warn!( - turn_id = %turn_context.sub_id, - retries, - max_retries, - compact_error = %err, - "remote compaction v2 stream failed; retrying request after delay" - ); - - let report_error = retries > 1 - || cfg!(debug_assertions) - || !sess.services.model_client.responses_websocket_enabled(); - if report_error { - sess.notify_stream_error( - turn_context, - format!("Reconnecting... {retries}/{max_retries}"), - err, - ) - .await; - } - tokio::time::sleep(delay).await; - } Err(err) => { - log_remote_compaction_request_failure(sess, turn_context, prompt, &err).await; - return Err(err); + match retry_state.handle_retryable_error( + &err, + client_session, + turn_context, + sess.services.model_client.responses_websocket_enabled(), + ) { + ResponsesRetryDecision::FallbackToHttp { message } => { + sess.send_event(turn_context, EventMsg::Warning(WarningEvent { message })) + .await; + } + ResponsesRetryDecision::Retry { + retries, + max_retries, + delay, + report_error, + reconnecting_message, + } => { + warn!( + turn_id = %turn_context.sub_id, + retries, + max_retries, + compact_error = %err, + "remote compaction v2 stream failed; retrying request after delay" + ); + + if report_error { + sess.notify_stream_error(turn_context, reconnecting_message, err) + .await; + } + tokio::time::sleep(delay).await; + } + ResponsesRetryDecision::Exhausted => { + log_remote_compaction_request_failure(sess, turn_context, prompt, &err) + .await; + return Err(err); + } + } } } } diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index d5c1237638..aa5784d4fe 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -12,6 +12,7 @@ mod client_common; mod realtime_context; mod realtime_conversation; mod realtime_prompt; +mod responses_retry; pub(crate) mod session; pub use session::SteerInputError; mod codex_thread; diff --git a/codex-rs/core/src/responses_retry.rs b/codex-rs/core/src/responses_retry.rs new file mode 100644 index 0000000000..c7dacc0693 --- /dev/null +++ b/codex-rs/core/src/responses_retry.rs @@ -0,0 +1,89 @@ +//! Shared retry and transport fallback policy for Responses requests. + +use std::time::Duration; + +use crate::client::ModelClientSession; +use crate::session::turn_context::TurnContext; +use crate::util::backoff; +use codex_model_provider_info::ModelProviderInfo; +use codex_protocol::error::CodexErr; + +#[derive(Debug)] +pub(crate) struct ResponsesRetryState { + max_retries: u64, + retries: u64, +} + +impl ResponsesRetryState { + pub(crate) fn new(max_retries: u64) -> Self { + Self { + max_retries, + retries: 0, + } + } + + pub(crate) fn from_provider(provider_info: &ModelProviderInfo) -> Self { + Self::new(provider_info.stream_max_retries()) + } + + pub(crate) fn handle_retryable_error( + &mut self, + err: &CodexErr, + client_session: &mut ModelClientSession, + turn_context: &TurnContext, + responses_websocket_enabled: bool, + ) -> ResponsesRetryDecision { + if self.retries >= self.max_retries + && client_session.try_switch_fallback_transport( + &turn_context.session_telemetry, + &turn_context.model_info, + ) + { + self.retries = 0; + return ResponsesRetryDecision::FallbackToHttp { + message: format!("Falling back from WebSockets to HTTPS transport. {err:#}"), + }; + } + + if self.retries < self.max_retries { + self.retries += 1; + let delay = retry_delay(err, self.retries); + let report_error = + self.retries > 1 || cfg!(debug_assertions) || !responses_websocket_enabled; + return ResponsesRetryDecision::Retry { + retries: self.retries, + max_retries: self.max_retries, + delay, + report_error, + reconnecting_message: format!( + "Reconnecting... {}/{}", + self.retries, self.max_retries + ), + }; + } + + ResponsesRetryDecision::Exhausted + } +} + +#[derive(Debug)] +pub(crate) enum ResponsesRetryDecision { + Retry { + retries: u64, + max_retries: u64, + delay: Duration, + report_error: bool, + reconnecting_message: String, + }, + FallbackToHttp { + message: String, + }, + Exhausted, +} + +fn retry_delay(err: &CodexErr, retries: u64) -> Duration { + match err { + CodexErr::Stream(_, requested_delay) => requested_delay.unwrap_or_else(|| backoff(retries)), + _ => backoff(retries), + } +} diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 26ba845460..1c82d2e84f 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -35,6 +35,8 @@ use crate::mentions::collect_explicit_app_ids; use crate::mentions::collect_explicit_plugin_mentions; use crate::mentions::collect_tool_mentions_from_messages; use crate::plugins::build_plugin_injections; +use crate::responses_retry::ResponsesRetryDecision; +use crate::responses_retry::ResponsesRetryState; use crate::session::PreviousTurnSettings; use crate::session::TurnInput; use crate::session::session::Session; @@ -58,7 +60,6 @@ use crate::tools::spec_plan::search_tool_enabled; use crate::tools::spec_plan::tool_suggest_enabled; use crate::turn_diff_tracker::TurnDiffTracker; use crate::turn_timing::record_turn_ttft_metric; -use crate::util::backoff; use crate::util::error_or_panic; use codex_analytics::AppInvocation; use codex_analytics::CompactionPhase; @@ -942,7 +943,7 @@ async fn run_sampling_request( Arc::clone(&turn_diff_tracker), ) .await; - let mut retries = 0; + let mut retry_state = ResponsesRetryState::from_provider(turn_context.provider.info()); let mut initial_input = Some(input); loop { let prompt_input = if let Some(input) = initial_input.take() { @@ -992,55 +993,41 @@ async fn run_sampling_request( return Err(err); } - // Use the configured provider-specific stream retry budget. - let max_retries = turn_context.provider.info().stream_max_retries(); - if retries >= max_retries - && client_session.try_switch_fallback_transport( - &turn_context.session_telemetry, - &turn_context.model_info, - ) - { - sess.send_event( - &turn_context, - EventMsg::Warning(WarningEvent { - message: format!("Falling back from WebSockets to HTTPS transport. {err:#}"), - }), - ) - .await; - retries = 0; - continue; - } - if retries < max_retries { - retries += 1; - let delay = match &err { - CodexErr::Stream(_, requested_delay) => { - requested_delay.unwrap_or_else(|| backoff(retries)) - } - _ => backoff(retries), - }; - warn!( - "stream disconnected - retrying sampling request ({retries}/{max_retries} in {delay:?})...", - ); - - // In release builds, hide the first websocket retry notification to reduce noisy - // transient reconnect messages. In debug builds, keep full visibility for diagnosis. - let report_error = retries > 1 - || cfg!(debug_assertions) - || !sess.services.model_client.responses_websocket_enabled(); - if report_error { - // Surface retry information to any UI/front‑end so the - // user understands what is happening instead of staring - // at a seemingly frozen screen. - sess.notify_stream_error( - &turn_context, - format!("Reconnecting... {retries}/{max_retries}"), - err, - ) - .await; + match retry_state.handle_retryable_error( + &err, + client_session, + &turn_context, + sess.services.model_client.responses_websocket_enabled(), + ) { + ResponsesRetryDecision::FallbackToHttp { message } => { + sess.send_event(&turn_context, EventMsg::Warning(WarningEvent { message })) + .await; + continue; + } + ResponsesRetryDecision::Retry { + retries, + max_retries, + delay, + report_error, + reconnecting_message, + } => { + warn!( + "stream disconnected - retrying sampling request ({}/{} in {:?})...", + retries, max_retries, delay + ); + + if report_error { + // Surface retry information to any UI/front‑end so the + // user understands what is happening instead of staring + // at a seemingly frozen screen. + sess.notify_stream_error(&turn_context, reconnecting_message, err) + .await; + } + tokio::time::sleep(delay).await; + } + ResponsesRetryDecision::Exhausted => { + return Err(err); } - tokio::time::sleep(delay).await; - } else { - return Err(err); } } }