mirror of
https://github.com/openai/codex.git
synced 2026-05-28 06:55:01 +00:00
centralize Responses retry policy (#24131)
## Why

#23951 added remote compaction v2 retries, but it left the retry and WS -> HTTPS fallback behavior duplicated between normal Responses turns and compaction. This follow-up centralizes the common retry handling so future changes to fallback, retry delay, retry notifications, and retry sleep do not have to be kept in sync across both callsites.

## What changed

- Added `core/src/responses_retry.rs` with a shared handler for retryable Responses stream errors.
- Reused that handler from normal turn sampling and remote compaction v2.
- Kept each callsite responsible for its retry budget: normal turns still use `stream_max_retries`, while compaction v2 still uses `min(stream_max_retries, 2)`.
- Preserved caller-specific behavior around non-retryable errors, context-window errors, usage-limit errors, and compact-specific final failure logging.

The shared handler now owns:

- WS -> HTTPS fallback warning emission
- retry delay selection, including server-requested stream retry delay
- retry logging
- first-WebSocket-retry notification suppression
- `Reconnecting... n/max` stream-error notification
- sleeping before the next retry attempt

## Verification

- `cargo test -p codex-core remote_compact_v2`
- `cargo test -p codex-core websocket_fallback`
- `just fix -p codex-core`

Did not run the full workspace test suite.

---------

Co-authored-by: jif-oai <jif@openai.com>
This commit is contained in:
@@ -16,10 +16,11 @@ use crate::hook_runtime::PostCompactHookOutcome;
|
||||
use crate::hook_runtime::PreCompactHookOutcome;
|
||||
use crate::hook_runtime::run_post_compact_hooks;
|
||||
use crate::hook_runtime::run_pre_compact_hooks;
|
||||
use crate::responses_retry::ResponsesStreamRequest;
|
||||
use crate::responses_retry::handle_retryable_response_stream_error;
|
||||
use crate::session::session::Session;
|
||||
use crate::session::turn::built_tools;
|
||||
use crate::session::turn_context::TurnContext;
|
||||
use crate::util::backoff;
|
||||
use codex_analytics::CompactionImplementation;
|
||||
use codex_analytics::CompactionPhase;
|
||||
use codex_analytics::CompactionReason;
|
||||
@@ -35,7 +36,6 @@ use codex_protocol::protocol::CompactedItem;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::TruncationPolicy;
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
use codex_protocol::protocol::WarningEvent;
|
||||
use codex_rollout_trace::CompactionCheckpointTracePayload;
|
||||
use codex_rollout_trace::InferenceTraceContext;
|
||||
use codex_utils_output_truncation::approx_token_count;
|
||||
@@ -43,7 +43,6 @@ use codex_utils_output_truncation::truncate_text;
|
||||
use futures::StreamExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
// Mirror the current /responses/compact retained-message default while the
|
||||
// server-side path remains the reference implementation.
|
||||
@@ -313,56 +312,21 @@ async fn run_remote_compaction_request_v2(
|
||||
log_remote_compaction_request_failure(sess, turn_context, prompt, &err).await;
|
||||
return Err(err);
|
||||
}
|
||||
Err(err)
|
||||
if retries >= max_retries
|
||||
&& client_session.try_switch_fallback_transport(
|
||||
&turn_context.session_telemetry,
|
||||
&turn_context.model_info,
|
||||
) =>
|
||||
{
|
||||
sess.send_event(
|
||||
turn_context,
|
||||
EventMsg::Warning(WarningEvent {
|
||||
message: format!(
|
||||
"Falling back from WebSockets to HTTPS transport. {err:#}"
|
||||
),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
retries = 0;
|
||||
}
|
||||
Err(err) if retries < max_retries => {
|
||||
retries += 1;
|
||||
let delay = match &err {
|
||||
CodexErr::Stream(_, requested_delay) => {
|
||||
requested_delay.unwrap_or_else(|| backoff(retries))
|
||||
}
|
||||
_ => backoff(retries),
|
||||
};
|
||||
warn!(
|
||||
turn_id = %turn_context.sub_id,
|
||||
retries,
|
||||
max_retries,
|
||||
compact_error = %err,
|
||||
"remote compaction v2 stream failed; retrying request after delay"
|
||||
);
|
||||
|
||||
let report_error = retries > 1
|
||||
|| cfg!(debug_assertions)
|
||||
|| !sess.services.model_client.responses_websocket_enabled();
|
||||
if report_error {
|
||||
sess.notify_stream_error(
|
||||
turn_context,
|
||||
format!("Reconnecting... {retries}/{max_retries}"),
|
||||
err,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
tokio::time::sleep(delay).await;
|
||||
}
|
||||
Err(err) => {
|
||||
log_remote_compaction_request_failure(sess, turn_context, prompt, &err).await;
|
||||
return Err(err);
|
||||
if let Err(err) = handle_retryable_response_stream_error(
|
||||
&mut retries,
|
||||
max_retries,
|
||||
err,
|
||||
client_session,
|
||||
sess,
|
||||
turn_context,
|
||||
ResponsesStreamRequest::RemoteCompactionV2,
|
||||
)
|
||||
.await
|
||||
{
|
||||
log_remote_compaction_request_failure(sess, turn_context, prompt, &err).await;
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ mod client_common;
|
||||
mod realtime_context;
|
||||
mod realtime_conversation;
|
||||
mod realtime_prompt;
|
||||
mod responses_retry;
|
||||
pub(crate) mod session;
|
||||
pub use session::SteerInputError;
|
||||
mod codex_thread;
|
||||
|
||||
105
codex-rs/core/src/responses_retry.rs
Normal file
105
codex-rs/core/src/responses_retry.rs
Normal file
@@ -0,0 +1,105 @@
|
||||
//! Shared retry and transport fallback decisions for Responses requests.
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::client::ModelClientSession;
|
||||
use crate::session::session::Session;
|
||||
use crate::session::turn_context::TurnContext;
|
||||
use crate::util::backoff;
|
||||
use codex_protocol::error::CodexErr;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::WarningEvent;
|
||||
use tracing::warn;
|
||||
|
||||
/// Identifies which kind of Responses stream request is being retried.
///
/// The retry handler passes this through to `log_retry`, which selects the
/// log record to emit based on the variant.
#[derive(Clone, Copy, Debug)]
pub(crate) enum ResponsesStreamRequest {
    /// A sampling request issued for a normal turn.
    Sampling,
    /// A remote compaction v2 request.
    RemoteCompactionV2,
}
|
||||
|
||||
/// Handles a retryable stream error and returns `Ok(())` when the caller should
|
||||
/// retry the request loop.
|
||||
pub(crate) async fn handle_retryable_response_stream_error(
|
||||
retries: &mut u64,
|
||||
max_retries: u64,
|
||||
err: CodexErr,
|
||||
client_session: &mut ModelClientSession,
|
||||
sess: &Session,
|
||||
turn_context: &TurnContext,
|
||||
request: ResponsesStreamRequest,
|
||||
) -> Result<(), CodexErr> {
|
||||
if *retries >= max_retries
|
||||
&& client_session.try_switch_fallback_transport(
|
||||
&turn_context.session_telemetry,
|
||||
&turn_context.model_info,
|
||||
)
|
||||
{
|
||||
sess.send_event(
|
||||
turn_context,
|
||||
EventMsg::Warning(WarningEvent {
|
||||
message: format!("Falling back from WebSockets to HTTPS transport. {err:#}"),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
*retries = 0;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if *retries < max_retries {
|
||||
*retries += 1;
|
||||
let retry_count = *retries;
|
||||
let delay = match &err {
|
||||
CodexErr::Stream(_, requested_delay) => {
|
||||
requested_delay.unwrap_or_else(|| backoff(retry_count))
|
||||
}
|
||||
_ => backoff(retry_count),
|
||||
};
|
||||
log_retry(request, turn_context, &err, retry_count, max_retries, delay);
|
||||
|
||||
// In release builds, hide the first websocket retry notification to reduce noisy
|
||||
// transient reconnect messages. In debug builds, keep full visibility for diagnosis.
|
||||
let report_error = retry_count > 1
|
||||
|| cfg!(debug_assertions)
|
||||
|| !sess.services.model_client.responses_websocket_enabled();
|
||||
if report_error {
|
||||
// Surface retry information to any UI/front-end so the user understands what is
|
||||
// happening instead of staring at a seemingly frozen screen.
|
||||
sess.notify_stream_error(
|
||||
turn_context,
|
||||
format!("Reconnecting... {retry_count}/{max_retries}"),
|
||||
err,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
tokio::time::sleep(delay).await;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
Err(err)
|
||||
}
|
||||
|
||||
fn log_retry(
|
||||
request: ResponsesStreamRequest,
|
||||
turn_context: &TurnContext,
|
||||
err: &CodexErr,
|
||||
retries: u64,
|
||||
max_retries: u64,
|
||||
delay: Duration,
|
||||
) {
|
||||
match request {
|
||||
ResponsesStreamRequest::Sampling => {
|
||||
warn!(
|
||||
"stream disconnected - retrying sampling request ({retries}/{max_retries} in {delay:?})...",
|
||||
);
|
||||
}
|
||||
ResponsesStreamRequest::RemoteCompactionV2 => {
|
||||
warn!(
|
||||
turn_id = %turn_context.sub_id,
|
||||
retries,
|
||||
max_retries,
|
||||
compact_error = %err,
|
||||
"remote compaction v2 stream failed; retrying request after delay"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -35,6 +35,8 @@ use crate::mentions::collect_explicit_app_ids;
|
||||
use crate::mentions::collect_explicit_plugin_mentions;
|
||||
use crate::mentions::collect_tool_mentions_from_messages;
|
||||
use crate::plugins::build_plugin_injections;
|
||||
use crate::responses_retry::ResponsesStreamRequest;
|
||||
use crate::responses_retry::handle_retryable_response_stream_error;
|
||||
use crate::session::PreviousTurnSettings;
|
||||
use crate::session::TurnInput;
|
||||
use crate::session::session::Session;
|
||||
@@ -58,7 +60,6 @@ use crate::tools::spec_plan::search_tool_enabled;
|
||||
use crate::tools::spec_plan::tool_suggest_enabled;
|
||||
use crate::turn_diff_tracker::TurnDiffTracker;
|
||||
use crate::turn_timing::record_turn_ttft_metric;
|
||||
use crate::util::backoff;
|
||||
use crate::util::error_or_panic;
|
||||
use codex_analytics::AppInvocation;
|
||||
use codex_analytics::CompactionPhase;
|
||||
@@ -919,6 +920,7 @@ async fn run_sampling_request(
|
||||
Arc::clone(&turn_diff_tracker),
|
||||
)
|
||||
.await;
|
||||
let max_retries = turn_context.provider.info().stream_max_retries();
|
||||
let mut retries = 0;
|
||||
let mut initial_input = Some(input);
|
||||
loop {
|
||||
@@ -969,56 +971,16 @@ async fn run_sampling_request(
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
// Use the configured provider-specific stream retry budget.
|
||||
let max_retries = turn_context.provider.info().stream_max_retries();
|
||||
if retries >= max_retries
|
||||
&& client_session.try_switch_fallback_transport(
|
||||
&turn_context.session_telemetry,
|
||||
&turn_context.model_info,
|
||||
)
|
||||
{
|
||||
sess.send_event(
|
||||
&turn_context,
|
||||
EventMsg::Warning(WarningEvent {
|
||||
message: format!("Falling back from WebSockets to HTTPS transport. {err:#}"),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
retries = 0;
|
||||
continue;
|
||||
}
|
||||
if retries < max_retries {
|
||||
retries += 1;
|
||||
let delay = match &err {
|
||||
CodexErr::Stream(_, requested_delay) => {
|
||||
requested_delay.unwrap_or_else(|| backoff(retries))
|
||||
}
|
||||
_ => backoff(retries),
|
||||
};
|
||||
warn!(
|
||||
"stream disconnected - retrying sampling request ({retries}/{max_retries} in {delay:?})...",
|
||||
);
|
||||
|
||||
// In release builds, hide the first websocket retry notification to reduce noisy
|
||||
// transient reconnect messages. In debug builds, keep full visibility for diagnosis.
|
||||
let report_error = retries > 1
|
||||
|| cfg!(debug_assertions)
|
||||
|| !sess.services.model_client.responses_websocket_enabled();
|
||||
if report_error {
|
||||
// Surface retry information to any UI/front‑end so the
|
||||
// user understands what is happening instead of staring
|
||||
// at a seemingly frozen screen.
|
||||
sess.notify_stream_error(
|
||||
&turn_context,
|
||||
format!("Reconnecting... {retries}/{max_retries}"),
|
||||
err,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
tokio::time::sleep(delay).await;
|
||||
} else {
|
||||
return Err(err);
|
||||
}
|
||||
handle_retryable_response_stream_error(
|
||||
&mut retries,
|
||||
max_retries,
|
||||
err,
|
||||
client_session,
|
||||
&sess,
|
||||
&turn_context,
|
||||
ResponsesStreamRequest::Sampling,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user