centralize responses retry policy

This commit is contained in:
Roy Han
2026-05-22 12:08:49 -07:00
parent 75b7e06621
commit eecab425b3
4 changed files with 167 additions and 101 deletions

View File

@@ -16,10 +16,11 @@ use crate::hook_runtime::PostCompactHookOutcome;
use crate::hook_runtime::PreCompactHookOutcome;
use crate::hook_runtime::run_post_compact_hooks;
use crate::hook_runtime::run_pre_compact_hooks;
use crate::responses_retry::ResponsesRetryDecision;
use crate::responses_retry::ResponsesRetryState;
use crate::session::session::Session;
use crate::session::turn::built_tools;
use crate::session::turn_context::TurnContext;
use crate::util::backoff;
use codex_analytics::CompactionImplementation;
use codex_analytics::CompactionPhase;
use codex_analytics::CompactionReason;
@@ -287,7 +288,7 @@ async fn run_remote_compaction_request_v2(
.info()
.stream_max_retries()
.min(MAX_REMOTE_COMPACTION_V2_STREAM_RETRIES);
let mut retries = 0;
let mut retry_state = ResponsesRetryState::new(max_retries);
loop {
let result = match client_session
.stream(
@@ -312,56 +313,44 @@ async fn run_remote_compaction_request_v2(
log_remote_compaction_request_failure(sess, turn_context, prompt, &err).await;
return Err(err);
}
Err(err)
if retries >= max_retries
&& client_session.try_switch_fallback_transport(
&turn_context.session_telemetry,
&turn_context.model_info,
) =>
{
sess.send_event(
turn_context,
EventMsg::Warning(WarningEvent {
message: format!(
"Falling back from WebSockets to HTTPS transport. {err:#}"
),
}),
)
.await;
retries = 0;
}
Err(err) if retries < max_retries => {
retries += 1;
let delay = match &err {
CodexErr::Stream(_, requested_delay) => {
requested_delay.unwrap_or_else(|| backoff(retries))
}
_ => backoff(retries),
};
warn!(
turn_id = %turn_context.sub_id,
retries,
max_retries,
compact_error = %err,
"remote compaction v2 stream failed; retrying request after delay"
);
let report_error = retries > 1
|| cfg!(debug_assertions)
|| !sess.services.model_client.responses_websocket_enabled();
if report_error {
sess.notify_stream_error(
turn_context,
format!("Reconnecting... {retries}/{max_retries}"),
err,
)
.await;
}
tokio::time::sleep(delay).await;
}
Err(err) => {
log_remote_compaction_request_failure(sess, turn_context, prompt, &err).await;
return Err(err);
match retry_state.handle_retryable_error(
&err,
client_session,
turn_context,
sess.services.model_client.responses_websocket_enabled(),
) {
ResponsesRetryDecision::FallbackToHttp { message } => {
sess.send_event(turn_context, EventMsg::Warning(WarningEvent { message }))
.await;
}
ResponsesRetryDecision::Retry {
retries,
max_retries,
delay,
report_error,
reconnecting_message,
} => {
warn!(
turn_id = %turn_context.sub_id,
retries,
max_retries,
compact_error = %err,
"remote compaction v2 stream failed; retrying request after delay"
);
if report_error {
sess.notify_stream_error(turn_context, reconnecting_message, err)
.await;
}
tokio::time::sleep(delay).await;
}
ResponsesRetryDecision::Exhausted => {
log_remote_compaction_request_failure(sess, turn_context, prompt, &err)
.await;
return Err(err);
}
}
}
}
}

View File

@@ -12,6 +12,7 @@ mod client_common;
mod realtime_context;
mod realtime_conversation;
mod realtime_prompt;
mod responses_retry;
pub(crate) mod session;
pub use session::SteerInputError;
mod codex_thread;

View File

@@ -0,0 +1,89 @@
//! Shared retry and transport fallback policy for Responses requests.
use std::time::Duration;
use crate::client::ModelClientSession;
use crate::session::turn_context::TurnContext;
use crate::util::backoff;
use codex_model_provider_info::ModelProviderInfo;
use codex_protocol::error::CodexErr;
/// Mutable retry bookkeeping for a single Responses request loop.
///
/// One instance is created before the request loop starts and is consulted via
/// `handle_retryable_error` every time a retryable error is observed.
#[derive(Debug)]
pub(crate) struct ResponsesRetryState {
/// Retry budget: the number of retries allowed before the state tries a
/// transport fallback or reports exhaustion.
max_retries: u64,
/// Retries consumed so far. Incremented for every `Retry` decision and reset
/// to zero when a transport fallback succeeds.
retries: u64,
}
impl ResponsesRetryState {
    /// Creates a state with an explicit retry budget and no retries consumed.
    pub(crate) fn new(max_retries: u64) -> Self {
        Self {
            retries: 0,
            max_retries,
        }
    }

    /// Creates a state whose budget is the provider's `stream_max_retries()` value.
    pub(crate) fn from_provider(provider_info: &ModelProviderInfo) -> Self {
        let budget = provider_info.stream_max_retries();
        Self::new(budget)
    }

    /// Decides what the caller should do after a retryable request error.
    ///
    /// * While budget remains, consumes one retry and returns
    ///   [`ResponsesRetryDecision::Retry`] with the delay to wait and the
    ///   message to surface.
    /// * Once the budget is spent, asks `client_session` to switch to its
    ///   fallback transport. If the switch happens, the retry counter is reset
    ///   and [`ResponsesRetryDecision::FallbackToHttp`] is returned.
    /// * Otherwise returns [`ResponsesRetryDecision::Exhausted`].
    ///
    /// `responses_websocket_enabled` only influences whether the retry should
    /// be reported to the user (`report_error`).
    pub(crate) fn handle_retryable_error(
        &mut self,
        err: &CodexErr,
        client_session: &mut ModelClientSession,
        turn_context: &TurnContext,
        responses_websocket_enabled: bool,
    ) -> ResponsesRetryDecision {
        // Budget remaining: plain retry. The transport switch is deliberately
        // not attempted on this path.
        if self.retries < self.max_retries {
            self.retries += 1;
            let delay = retry_delay(err, self.retries);
            // In release builds, hide the first websocket retry notification to
            // reduce noisy transient reconnect messages. In debug builds, keep
            // full visibility for diagnosis.
            let suppress_notification =
                self.retries <= 1 && !cfg!(debug_assertions) && responses_websocket_enabled;
            let reconnecting_message =
                format!("Reconnecting... {}/{}", self.retries, self.max_retries);
            return ResponsesRetryDecision::Retry {
                retries: self.retries,
                max_retries: self.max_retries,
                delay,
                report_error: !suppress_notification,
                reconnecting_message,
            };
        }

        // Budget spent: the only remaining option is a transport fallback.
        let switched_transport = client_session.try_switch_fallback_transport(
            &turn_context.session_telemetry,
            &turn_context.model_info,
        );
        if !switched_transport {
            return ResponsesRetryDecision::Exhausted;
        }

        // The new transport starts with a fresh retry budget.
        self.retries = 0;
        ResponsesRetryDecision::FallbackToHttp {
            message: format!("Falling back from WebSockets to HTTPS transport. {err:#}"),
        }
    }
}
/// Outcome of consulting the retry policy after a retryable request error.
///
/// Marked `#[must_use]` because every variant requires the caller to act
/// (sleep and retry, emit the fallback warning, or propagate the error);
/// silently dropping a decision would skip that step.
#[derive(Debug)]
#[must_use = "a retry decision must be acted on by the request loop"]
pub(crate) enum ResponsesRetryDecision {
    /// Retry the request after waiting for `delay`.
    Retry {
        /// Number of retries consumed so far, including this one.
        retries: u64,
        /// Total retry budget.
        max_retries: u64,
        /// How long to wait before issuing the request again.
        delay: Duration,
        /// Whether this retry should be surfaced to the user.
        report_error: bool,
        /// Pre-formatted progress message of the form `Reconnecting... n/m`.
        reconnecting_message: String,
    },
    /// The retry budget was spent and the session switched to its fallback
    /// transport; the retry counter has been reset.
    FallbackToHttp {
        /// Warning text describing the fallback, including the triggering error.
        message: String,
    },
    /// No retries remain and no transport fallback was performed.
    Exhausted,
}
/// Picks the wait time before retry number `retries`.
///
/// A delay carried by a `CodexErr::Stream` error takes precedence; every other
/// case falls back to `backoff(retries)`.
fn retry_delay(err: &CodexErr, retries: u64) -> Duration {
    if let CodexErr::Stream(_, Some(requested_delay)) = err {
        return *requested_delay;
    }
    backoff(retries)
}

View File

@@ -35,6 +35,8 @@ use crate::mentions::collect_explicit_app_ids;
use crate::mentions::collect_explicit_plugin_mentions;
use crate::mentions::collect_tool_mentions_from_messages;
use crate::plugins::build_plugin_injections;
use crate::responses_retry::ResponsesRetryDecision;
use crate::responses_retry::ResponsesRetryState;
use crate::session::PreviousTurnSettings;
use crate::session::TurnInput;
use crate::session::session::Session;
@@ -58,7 +60,6 @@ use crate::tools::spec_plan::search_tool_enabled;
use crate::tools::spec_plan::tool_suggest_enabled;
use crate::turn_diff_tracker::TurnDiffTracker;
use crate::turn_timing::record_turn_ttft_metric;
use crate::util::backoff;
use crate::util::error_or_panic;
use codex_analytics::AppInvocation;
use codex_analytics::CompactionPhase;
@@ -942,7 +943,7 @@ async fn run_sampling_request(
Arc::clone(&turn_diff_tracker),
)
.await;
let mut retries = 0;
let mut retry_state = ResponsesRetryState::from_provider(turn_context.provider.info());
let mut initial_input = Some(input);
loop {
let prompt_input = if let Some(input) = initial_input.take() {
@@ -992,55 +993,41 @@ async fn run_sampling_request(
return Err(err);
}
// Use the configured provider-specific stream retry budget.
let max_retries = turn_context.provider.info().stream_max_retries();
if retries >= max_retries
&& client_session.try_switch_fallback_transport(
&turn_context.session_telemetry,
&turn_context.model_info,
)
{
sess.send_event(
&turn_context,
EventMsg::Warning(WarningEvent {
message: format!("Falling back from WebSockets to HTTPS transport. {err:#}"),
}),
)
.await;
retries = 0;
continue;
}
if retries < max_retries {
retries += 1;
let delay = match &err {
CodexErr::Stream(_, requested_delay) => {
requested_delay.unwrap_or_else(|| backoff(retries))
}
_ => backoff(retries),
};
warn!(
"stream disconnected - retrying sampling request ({retries}/{max_retries} in {delay:?})...",
);
// In release builds, hide the first websocket retry notification to reduce noisy
// transient reconnect messages. In debug builds, keep full visibility for diagnosis.
let report_error = retries > 1
|| cfg!(debug_assertions)
|| !sess.services.model_client.responses_websocket_enabled();
if report_error {
// Surface retry information to any UI/frontend so the
// user understands what is happening instead of staring
// at a seemingly frozen screen.
sess.notify_stream_error(
&turn_context,
format!("Reconnecting... {retries}/{max_retries}"),
err,
)
.await;
match retry_state.handle_retryable_error(
&err,
client_session,
&turn_context,
sess.services.model_client.responses_websocket_enabled(),
) {
ResponsesRetryDecision::FallbackToHttp { message } => {
sess.send_event(&turn_context, EventMsg::Warning(WarningEvent { message }))
.await;
continue;
}
ResponsesRetryDecision::Retry {
retries,
max_retries,
delay,
report_error,
reconnecting_message,
} => {
warn!(
"stream disconnected - retrying sampling request ({}/{} in {:?})...",
retries, max_retries, delay
);
if report_error {
// Surface retry information to any UI/frontend so the
// user understands what is happening instead of staring
// at a seemingly frozen screen.
sess.notify_stream_error(&turn_context, reconnecting_message, err)
.await;
}
tokio::time::sleep(delay).await;
}
ResponsesRetryDecision::Exhausted => {
return Err(err);
}
tokio::time::sleep(delay).await;
} else {
return Err(err);
}
}
}