Compare commits

...

1 Commits

Author SHA1 Message Date
Felipe Coury
65742c56ee fix(core): retry unsupported reasoning summaries 2026-05-19 13:31:06 -03:00
3 changed files with 359 additions and 8 deletions

View File

@@ -24,6 +24,7 @@
//! fails, normal stream retry/fallback logic handles recovery on the same turn.
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::OnceLock;
@@ -177,6 +178,7 @@ struct ModelClientState {
include_attestation: bool,
attestation_provider: Option<Arc<dyn AttestationProvider>>,
disable_websockets: AtomicBool,
reasoning_summary_unsupported_models: StdMutex<HashSet<String>>,
cached_websocket_session: StdMutex<WebsocketSession>,
}
@@ -281,6 +283,14 @@ enum WebsocketStreamOutcome {
FallbackToHttp,
}
enum PreparedResponseStream {
Ready {
stream: codex_api::ResponseStream,
buffered_events: Vec<std::result::Result<ResponseEvent, ApiError>>,
},
RetryWithoutReasoningSummary,
}
/// Result of opening a WebRTC Realtime call.
///
/// The SDP answer goes back to the client. The call id and auth headers stay on the server so the
@@ -346,6 +356,7 @@ impl ModelClient {
include_attestation,
attestation_provider,
disable_websockets: AtomicBool::new(false),
reasoning_summary_unsupported_models: StdMutex::new(HashSet::new()),
cached_websocket_session: StdMutex::new(WebsocketSession::default()),
}),
}
@@ -688,11 +699,13 @@ impl ModelClient {
}
fn build_reasoning(
&self,
model_info: &ModelInfo,
effort: Option<ReasoningEffortConfig>,
summary: ReasoningSummaryConfig,
) -> Option<Reasoning> {
if model_info.supports_reasoning_summaries {
let summary = self.reasoning_summary_for_model(model_info, summary);
Some(Reasoning {
effort: effort.or(model_info.default_reasoning_level),
summary: if summary == ReasoningSummaryConfig::None {
@@ -706,6 +719,99 @@ impl ModelClient {
}
}
fn reasoning_summary_for_model(
&self,
model_info: &ModelInfo,
summary: ReasoningSummaryConfig,
) -> ReasoningSummaryConfig {
if summary == ReasoningSummaryConfig::None
|| !self
.state
.reasoning_summary_unsupported_models
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.contains(&model_info.slug)
{
summary
} else {
ReasoningSummaryConfig::None
}
}
fn should_retry_without_reasoning_summary(
&self,
model_info: &ModelInfo,
summary: ReasoningSummaryConfig,
err: &ApiError,
) -> bool {
if summary == ReasoningSummaryConfig::None || !reasoning_summary_is_unsupported(err) {
return false;
}
let inserted = self
.state
.reasoning_summary_unsupported_models
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.insert(model_info.slug.clone());
if inserted {
warn!(
model = %model_info.slug,
"retrying request without reasoning summary after model rejected reasoning.summary"
);
}
true
}
async fn prepare_response_stream(
&self,
mut stream: codex_api::ResponseStream,
model_info: &ModelInfo,
summary: ReasoningSummaryConfig,
inference_trace_attempt: &InferenceTraceAttempt,
) -> PreparedResponseStream {
let mut buffered_events = Vec::new();
loop {
let Some(event) = stream.next().await else {
return PreparedResponseStream::Ready {
stream,
buffered_events,
};
};
match event {
Ok(event) => {
let can_retry_after =
response_event_can_precede_reasoning_summary_rejection(&event);
buffered_events.push(Ok(event));
if !can_retry_after {
return PreparedResponseStream::Ready {
stream,
buffered_events,
};
}
}
Err(err) => {
if self.should_retry_without_reasoning_summary(model_info, summary, &err) {
let response_debug_context =
extract_response_debug_context_from_api_error(&err);
let err = map_api_error(err);
inference_trace_attempt.record_failed(
&err,
response_debug_context.request_id.as_deref(),
/*output_items*/ &[],
);
return PreparedResponseStream::RetryWithoutReasoningSummary;
}
buffered_events.push(Err(err));
return PreparedResponseStream::Ready {
stream,
buffered_events,
};
}
}
}
}
fn build_responses_request(
&self,
provider: &codex_api::Provider,
@@ -718,7 +824,7 @@ impl ModelClient {
let instructions = &prompt.base_instructions.text;
let input = prompt.get_formatted_input();
let tools = create_tools_json_for_responses_api(&prompt.tools)?;
let reasoning = Self::build_reasoning(model_info, effort, summary);
let reasoning = self.build_reasoning(model_info, effort, summary);
let include = if reasoning.is_some() {
vec!["reasoning.encrypted_content".to_string()]
} else {
@@ -1263,8 +1369,25 @@ impl ModelClientSession {
match stream_result {
Ok(stream) => {
let (stream, _) = map_response_stream(
let (stream, buffered_events) = match self
.client
.prepare_response_stream(
stream,
model_info,
summary,
&inference_trace_attempt,
)
.await
{
PreparedResponseStream::Ready {
stream,
buffered_events,
} => (stream, buffered_events),
PreparedResponseStream::RetryWithoutReasoningSummary => continue,
};
let (stream, _) = map_response_stream_with_buffered_events(
stream,
buffered_events,
session_telemetry.clone(),
inference_trace_attempt,
);
@@ -1293,12 +1416,18 @@ impl ModelClientSession {
Err(err) => {
let response_debug_context =
extract_response_debug_context_from_api_error(&err);
let should_retry_without_reasoning_summary = self
.client
.should_retry_without_reasoning_summary(model_info, summary, &err);
let err = map_api_error(err);
inference_trace_attempt.record_failed(
&err,
response_debug_context.request_id.as_deref(),
/*output_items*/ &[],
);
if should_retry_without_reasoning_summary {
continue;
}
return Err(err);
}
}
@@ -1423,22 +1552,52 @@ impl ModelClientSession {
"websocket connection is unavailable".to_string(),
))
})?;
let stream_result = websocket_connection
let stream_result = match websocket_connection
.stream_request(ws_request, self.websocket_session.connection_reused())
.await
.map_err(|err| {
{
Ok(stream_result) => stream_result,
Err(err) => {
let response_debug_context =
extract_response_debug_context_from_api_error(&err);
let should_retry_without_reasoning_summary = self
.client
.should_retry_without_reasoning_summary(model_info, summary, &err);
let err = map_api_error(err);
inference_trace_attempt.record_failed(
&err,
response_debug_context.request_id.as_deref(),
/*output_items*/ &[],
);
err
})?;
let (stream, last_request_rx) = map_response_stream(
if should_retry_without_reasoning_summary {
self.reset_websocket_session();
continue;
}
return Err(err);
}
};
let (stream_result, buffered_events) = match self
.client
.prepare_response_stream(
stream_result,
model_info,
summary,
&inference_trace_attempt,
)
.await
{
PreparedResponseStream::Ready {
stream,
buffered_events,
} => (stream, buffered_events),
PreparedResponseStream::RetryWithoutReasoningSummary => {
self.reset_websocket_session();
continue;
}
};
let (stream, last_request_rx) = map_response_stream_with_buffered_events(
stream_result,
buffered_events,
session_telemetry.clone(),
inference_trace_attempt,
);
@@ -1616,6 +1775,39 @@ impl ModelClientSession {
}
}
fn reasoning_summary_is_unsupported(err: &ApiError) -> bool {
let ApiError::Transport(TransportError::Http { status, body, .. }) = err else {
return false;
};
if *status != StatusCode::BAD_REQUEST {
return false;
}
let Some(body) = body.as_deref() else {
return false;
};
let Ok(payload) = serde_json::from_str::<serde_json::Value>(body) else {
return false;
};
let Some(error) = payload.get("error") else {
return false;
};
error.get("code").and_then(serde_json::Value::as_str) == Some("unsupported_parameter")
&& error.get("param").and_then(serde_json::Value::as_str) == Some("reasoning.summary")
}
fn response_event_can_precede_reasoning_summary_rejection(event: &ResponseEvent) -> bool {
matches!(
event,
ResponseEvent::Created
| ResponseEvent::ServerModel(_)
| ResponseEvent::ModelVerifications(_)
| ResponseEvent::ServerReasoningIncluded(_)
| ResponseEvent::RateLimits(_)
| ResponseEvent::ModelsEtag(_)
)
}
/// Parses per-turn metadata into an HTTP header value.
///
/// Invalid values are treated as absent so callers can compare and propagate
@@ -1712,8 +1904,9 @@ fn parent_thread_id_header_value(session_source: &SessionSource) -> Option<Strin
const RESPONSE_STREAM_CHANNEL_CAPACITY: usize = 1600;
const STREAM_DROPPED_REASON: &str = "response stream dropped before provider terminal event";
fn map_response_stream(
fn map_response_stream_with_buffered_events(
api_stream: codex_api::ResponseStream,
buffered_events: Vec<std::result::Result<ResponseEvent, ApiError>>,
session_telemetry: SessionTelemetry,
inference_trace_attempt: InferenceTraceAttempt,
) -> (ResponseStream, oneshot::Receiver<LastResponse>) {
@@ -1725,6 +1918,7 @@ fn map_response_stream(
rx_event,
upstream_request_id: None,
};
let api_stream = futures::stream::iter(buffered_events).chain(api_stream);
map_response_events(
upstream_request_id,
api_stream,

View File

@@ -58,6 +58,7 @@ use core_test_support::responses::ev_completed_with_tokens;
use core_test_support::responses::ev_message_item_added;
use core_test_support::responses::ev_output_text_delta;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_response_sequence;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_sse_once_match;
use core_test_support::responses::mount_sse_sequence;
@@ -1855,6 +1856,85 @@ async fn configured_reasoning_summary_is_sent() -> anyhow::Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unsupported_reasoning_summary_is_retried_without_summary() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
let server = MockServer::start().await;
let responses = mount_response_sequence(
&server,
vec![
ResponseTemplate::new(400).set_body_json(json!({
"error": {
"type": "invalid_request_error",
"code": "unsupported_parameter",
"message": "Unsupported parameter: 'reasoning.summary' is not supported with the 'gpt-5.4' model.",
"param": "reasoning.summary"
}
})),
ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(
sse(vec![ev_response_created("resp1"), ev_completed("resp1")]),
"text/event-stream",
),
ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(
sse(vec![ev_response_created("resp2"), ev_completed("resp2")]),
"text/event-stream",
),
],
)
.await;
let TestCodex { codex, .. } = test_codex()
.with_config(|config| {
config.model_reasoning_summary = Some(ReasoningSummary::Concise);
})
.build(&server)
.await?;
for text in ["hello", "again"] {
codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: text.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
thread_settings: Default::default(),
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
}
let requests = responses.requests();
assert_eq!(requests.len(), 3);
pretty_assertions::assert_eq!(
requests[0]
.body_json()
.get("reasoning")
.and_then(|reasoning| reasoning.get("summary"))
.and_then(|value| value.as_str()),
Some("concise")
);
for request in &requests[1..] {
pretty_assertions::assert_eq!(
request
.body_json()
.get("reasoning")
.and_then(|reasoning| reasoning.get("summary")),
None
);
}
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn user_turn_explicit_reasoning_summary_overrides_model_catalog_default() -> anyhow::Result<()>
{

View File

@@ -1318,6 +1318,83 @@ async fn responses_websocket_invalid_request_error_with_status_is_forwarded() {
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_retries_without_unsupported_reasoning_summary() {
skip_if_no_network!();
let unsupported_reasoning_summary_error = json!({
"type": "error",
"status": 400,
"error": {
"type": "invalid_request_error",
"code": "unsupported_parameter",
"message": "Unsupported parameter: 'reasoning.summary' is not supported with the 'gpt-5.3-codex-spark' model.",
"param": "reasoning.summary"
}
});
let server = start_websocket_server(vec![
vec![
vec![
ev_response_created("resp-prewarm"),
ev_completed("resp-prewarm"),
],
vec![unsupported_reasoning_summary_error],
],
vec![vec![ev_response_created("resp-1"), ev_completed("resp-1")]],
])
.await;
let mut builder = test_codex().with_config(|config| {
config.model_provider.request_max_retries = Some(0);
config.model_provider.stream_max_retries = Some(0);
config.model_reasoning_summary = Some(ReasoningSummary::Concise);
});
let test = builder
.build_with_websocket_server(&server)
.await
.expect("build websocket codex");
test.codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: "hello".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
thread_settings: Default::default(),
})
.await
.expect("submission should succeed");
wait_for_event(&test.codex, |msg| matches!(msg, EventMsg::TurnComplete(_))).await;
let connections = server.connections();
assert_eq!(connections.len(), 2);
let first_connection = connections.first().expect("missing first connection");
let second_connection = connections.get(1).expect("missing second connection");
assert_eq!(first_connection.len(), 2);
assert_eq!(second_connection.len(), 1);
assert_eq!(
first_connection
.get(1)
.expect("missing rejected request")
.body_json()["reasoning"]["summary"]
.as_str(),
Some("concise")
);
assert_eq!(
second_connection
.first()
.expect("missing retried request")
.body_json()["reasoning"]
.get("summary"),
None
);
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_connection_limit_error_reconnects_and_completes() {
skip_if_no_network!();