Compare commits

...

8 Commits

Author SHA1 Message Date
Eric Traut
1c2f5fea39 Clarify stream lifecycle module docs 2026-04-26 21:46:35 -07:00
Eric Traut
0b60009a6e Merge branch 'main' into etraut/stream-lifecycle-diagnostics 2026-04-26 21:45:56 -07:00
Eric Traut
028d38a738 codex: fix CI failure on PR #19755 2026-04-26 21:28:01 -07:00
Eric Traut
2cf65451e5 codex: fix CI failure on PR #19755 2026-04-26 21:20:14 -07:00
Eric Traut
e12af2274c Document lifecycle log scope 2026-04-26 21:09:02 -07:00
Eric Traut
7771009b47 Remove redundant websocket lifecycle test 2026-04-26 21:08:12 -07:00
Eric Traut
00db3db537 Avoid lifecycle logging on completed streams 2026-04-26 21:06:28 -07:00
Eric Traut
8e815b1fde Add Responses stream lifecycle diagnostics
Refs #19745
2026-04-26 21:00:19 -07:00
12 changed files with 677 additions and 36 deletions

View File

@@ -66,7 +66,9 @@ pub struct MemorySummarizeOutput {
#[derive(Debug)]
pub enum ResponseEvent {
Created,
Created {
response_id: Option<String>,
},
OutputItemDone(ResponseItem),
OutputItemAdded(ResponseItem),
/// Emitted when the server includes `OpenAI-Model` on the stream response.

View File

@@ -10,6 +10,7 @@ use crate::requests::headers::build_conversation_headers;
use crate::requests::headers::insert_header;
use crate::requests::headers::subagent_header;
use crate::sse::spawn_response_stream;
use crate::stream_lifecycle::ResponseStreamLifecycleOptions;
use crate::telemetry::SseTelemetry;
use codex_client::HttpTransport;
use codex_client::RequestCompression;
@@ -70,6 +71,16 @@ impl<T: HttpTransport> ResponsesClient<T> {
&self,
request: ResponsesApiRequest,
options: ResponsesOptions,
) -> Result<ResponseStream, ApiError> {
self.stream_request_with_lifecycle(request, options, /*lifecycle*/ None)
.await
}
pub async fn stream_request_with_lifecycle(
&self,
request: ResponsesApiRequest,
options: ResponsesOptions,
lifecycle: Option<ResponseStreamLifecycleOptions>,
) -> Result<ResponseStream, ApiError> {
let ResponsesOptions {
conversation_id,
@@ -94,7 +105,8 @@ impl<T: HttpTransport> ResponsesClient<T> {
insert_header(&mut headers, "x-openai-subagent", &subagent);
}
self.stream(body, headers, compression, turn_state).await
self.stream_with_lifecycle(body, headers, compression, turn_state, lifecycle)
.await
}
fn path() -> &'static str {
@@ -118,6 +130,24 @@ impl<T: HttpTransport> ResponsesClient<T> {
extra_headers: HeaderMap,
compression: Compression,
turn_state: Option<Arc<OnceLock<String>>>,
) -> Result<ResponseStream, ApiError> {
self.stream_with_lifecycle(
body,
extra_headers,
compression,
turn_state,
/*lifecycle*/ None,
)
.await
}
pub async fn stream_with_lifecycle(
&self,
body: Value,
extra_headers: HeaderMap,
compression: Compression,
turn_state: Option<Arc<OnceLock<String>>>,
lifecycle: Option<ResponseStreamLifecycleOptions>,
) -> Result<ResponseStream, ApiError> {
let request_compression = match compression {
Compression::None => RequestCompression::None,
@@ -146,6 +176,7 @@ impl<T: HttpTransport> ResponsesClient<T> {
self.session.provider().stream_idle_timeout,
self.sse_telemetry.clone(),
turn_state,
lifecycle,
))
}
}

View File

@@ -7,6 +7,10 @@ use crate::provider::Provider;
use crate::rate_limits::parse_rate_limit_event;
use crate::sse::ResponsesStreamEvent;
use crate::sse::process_responses_event;
use crate::stream_lifecycle::ResponseStreamLifecycleOptions;
use crate::stream_lifecycle::ResponseStreamLifecycleRecorder;
use crate::stream_lifecycle::ResponseStreamTerminalState;
use crate::stream_lifecycle::finalize_lifecycle_error;
use crate::telemetry::WebsocketTelemetry;
use codex_client::TransportError;
use codex_client::maybe_build_rustls_client_config_with_custom_ca;
@@ -214,6 +218,16 @@ impl ResponsesWebsocketConnection {
&self,
request: ResponsesWsRequest,
connection_reused: bool,
) -> Result<ResponseStream, ApiError> {
self.stream_request_with_lifecycle(request, connection_reused, /*lifecycle*/ None)
.await
}
pub async fn stream_request_with_lifecycle(
&self,
request: ResponsesWsRequest,
connection_reused: bool,
lifecycle: Option<ResponseStreamLifecycleOptions>,
) -> Result<ResponseStream, ApiError> {
let (tx_event, rx_event) =
mpsc::channel::<std::result::Result<ResponseEvent, ApiError>>(1600);
@@ -263,6 +277,7 @@ impl ResponsesWebsocketConnection {
idle_timeout,
telemetry,
connection_reused,
lifecycle,
)
.await
};
@@ -540,8 +555,10 @@ async fn run_websocket_response_stream(
idle_timeout: Duration,
telemetry: Option<Arc<dyn WebsocketTelemetry>>,
connection_reused: bool,
lifecycle: Option<ResponseStreamLifecycleOptions>,
) -> Result<(), ApiError> {
let mut last_server_model: Option<String> = None;
let mut lifecycle = lifecycle.map(ResponseStreamLifecycleRecorder::new);
let request_text = match serde_json::to_string(&request_body) {
Ok(text) => text,
Err(err) => {
@@ -579,15 +596,27 @@ async fn run_websocket_response_stream(
let message = match response {
Ok(Some(Ok(msg))) => msg,
Ok(Some(Err(err))) => {
return Err(ApiError::Stream(err.to_string()));
let error = ApiError::Stream(err.to_string());
return Err(finalize_lifecycle_error(
&mut lifecycle,
ResponseStreamTerminalState::StreamError,
error,
));
}
Ok(None) => {
return Err(ApiError::Stream(
"stream closed before response.completed".into(),
let error = ApiError::Stream("stream closed before response.completed".into());
return Err(finalize_lifecycle_error(
&mut lifecycle,
ResponseStreamTerminalState::ClosedBeforeCompletion,
error,
));
}
Err(err) => {
return Err(err);
return Err(finalize_lifecycle_error(
&mut lifecycle,
ResponseStreamTerminalState::IdleTimeout,
err,
));
}
};
@@ -609,6 +638,9 @@ async fn run_websocket_response_stream(
}
};
let model_verifications = event.model_verifications();
if let Some(lifecycle) = lifecycle.as_mut() {
lifecycle.observe_event(&event);
}
if event.kind() == "codex.rate_limits" {
if let Some(snapshot) = parse_rate_limit_event(&text) {
let _ = tx_event.send(Ok(ResponseEvent::RateLimits(snapshot))).await;
@@ -638,21 +670,38 @@ async fn run_websocket_response_stream(
let is_completed = matches!(event, ResponseEvent::Completed { .. });
let _ = tx_event.send(Ok(event)).await;
if is_completed {
if let Some(lifecycle) = lifecycle.as_mut() {
lifecycle.finalize_completed();
}
break;
}
}
Ok(None) => {}
Err(error) => {
return Err(error.into_api_error());
let error = error.into_api_error();
return Err(finalize_lifecycle_error(
&mut lifecycle,
ResponseStreamTerminalState::StreamError,
error,
));
}
}
}
Message::Binary(_) => {
return Err(ApiError::Stream("unexpected binary websocket event".into()));
let error = ApiError::Stream("unexpected binary websocket event".into());
return Err(finalize_lifecycle_error(
&mut lifecycle,
ResponseStreamTerminalState::StreamError,
error,
));
}
Message::Close(_) => {
return Err(ApiError::Stream(
"websocket closed by server before response.completed".into(),
let error =
ApiError::Stream("websocket closed by server before response.completed".into());
return Err(finalize_lifecycle_error(
&mut lifecycle,
ResponseStreamTerminalState::ClosedBeforeCompletion,
error,
));
}
Message::Frame(_) => {}

View File

@@ -8,6 +8,7 @@ pub(crate) mod provider;
pub(crate) mod rate_limits;
pub(crate) mod requests;
pub(crate) mod sse;
mod stream_lifecycle;
pub(crate) mod telemetry;
pub use crate::requests::headers::build_conversation_headers;
@@ -63,6 +64,8 @@ pub use crate::provider::RetryConfig;
pub use crate::provider::is_azure_responses_provider;
pub use crate::requests::Compression;
pub use crate::sse::stream_from_fixture;
pub use crate::stream_lifecycle::ResponseStreamLifecycleOptions;
pub use crate::stream_lifecycle::ResponseStreamTransport;
pub use crate::telemetry::SseTelemetry;
pub use crate::telemetry::WebsocketTelemetry;
pub use codex_protocol::protocol::RealtimeAudioFrame;

View File

@@ -2,6 +2,10 @@ use crate::common::ResponseEvent;
use crate::common::ResponseStream;
use crate::error::ApiError;
use crate::rate_limits::parse_all_rate_limits;
use crate::stream_lifecycle::ResponseStreamLifecycleOptions;
use crate::stream_lifecycle::ResponseStreamLifecycleRecorder;
use crate::stream_lifecycle::ResponseStreamTerminalState;
use crate::stream_lifecycle::finalize_lifecycle_error;
use crate::telemetry::SseTelemetry;
use codex_client::ByteStream;
use codex_client::StreamResponse;
@@ -52,6 +56,7 @@ pub fn stream_from_fixture(
tx_event,
idle_timeout,
/*telemetry*/ None,
/*lifecycle*/ None,
));
Ok(ResponseStream { rx_event })
}
@@ -61,6 +66,7 @@ pub fn spawn_response_stream(
idle_timeout: Duration,
telemetry: Option<Arc<dyn SseTelemetry>>,
turn_state: Option<Arc<OnceLock<String>>>,
lifecycle: Option<ResponseStreamLifecycleOptions>,
) -> ResponseStream {
let rate_limit_snapshots = parse_all_rate_limits(&stream_response.headers);
let models_etag = stream_response
@@ -101,7 +107,14 @@ pub fn spawn_response_stream(
.send(Ok(ResponseEvent::ServerReasoningIncluded(true)))
.await;
}
process_sse(stream_response.bytes, tx_event, idle_timeout, telemetry).await;
process_sse(
stream_response.bytes,
tx_event,
idle_timeout,
telemetry,
lifecycle,
)
.await;
});
ResponseStream { rx_event }
@@ -184,6 +197,13 @@ impl ResponsesStreamEvent {
&self.kind
}
pub(crate) fn response_id(&self) -> Option<&str> {
self.response
.as_ref()
.and_then(|response| response.get("id"))
.and_then(Value::as_str)
}
/// Returns the effective model reported by the server, if present.
///
/// Precedence:
@@ -328,7 +348,8 @@ pub fn process_responses_event(
}
"response.created" => {
if event.response.is_some() {
return Ok(Some(ResponseEvent::Created {}));
let response_id = event.response_id().map(str::to_string);
return Ok(Some(ResponseEvent::Created { response_id }));
}
}
"response.failed" => {
@@ -423,10 +444,12 @@ pub async fn process_sse(
tx_event: mpsc::Sender<Result<ResponseEvent, ApiError>>,
idle_timeout: Duration,
telemetry: Option<Arc<dyn SseTelemetry>>,
lifecycle: Option<ResponseStreamLifecycleOptions>,
) {
let mut stream = stream.eventsource();
let mut response_error: Option<ApiError> = None;
let mut last_server_model: Option<String> = None;
let mut lifecycle = lifecycle.map(ResponseStreamLifecycleRecorder::new);
loop {
let start = Instant::now();
@@ -438,20 +461,35 @@ pub async fn process_sse(
Ok(Some(Ok(sse))) => sse,
Ok(Some(Err(e))) => {
debug!("SSE Error: {e:#}");
let _ = tx_event.send(Err(ApiError::Stream(e.to_string()))).await;
let error = ApiError::Stream(e.to_string());
let error = finalize_lifecycle_error(
&mut lifecycle,
ResponseStreamTerminalState::StreamError,
error,
);
let _ = tx_event.send(Err(error)).await;
return;
}
Ok(None) => {
let error = response_error.unwrap_or(ApiError::Stream(
"stream closed before response.completed".into(),
));
let error = response_error.unwrap_or_else(|| {
let error = ApiError::Stream("stream closed before response.completed".into());
finalize_lifecycle_error(
&mut lifecycle,
ResponseStreamTerminalState::ClosedBeforeCompletion,
error,
)
});
let _ = tx_event.send(Err(error)).await;
return;
}
Err(_) => {
let _ = tx_event
.send(Err(ApiError::Stream("idle timeout waiting for SSE".into())))
.await;
let error = ApiError::Stream("idle timeout waiting for SSE".into());
let error = finalize_lifecycle_error(
&mut lifecycle,
ResponseStreamTerminalState::IdleTimeout,
error,
);
let _ = tx_event.send(Err(error)).await;
return;
}
};
@@ -466,6 +504,9 @@ pub async fn process_sse(
}
};
let model_verifications = event.model_verifications();
if let Some(lifecycle) = lifecycle.as_mut() {
lifecycle.observe_event(&event);
}
if let Some(model) = event.response_model()
&& last_server_model.as_deref() != Some(model.as_str())
@@ -495,12 +536,20 @@ pub async fn process_sse(
return;
}
if is_completed {
if let Some(lifecycle) = lifecycle.as_mut() {
lifecycle.finalize_completed();
}
return;
}
}
Ok(None) => {}
Err(error) => {
response_error = Some(error.into_api_error());
let error = error.into_api_error();
response_error = Some(finalize_lifecycle_error(
&mut lifecycle,
ResponseStreamTerminalState::StreamError,
error,
));
}
};
}
@@ -607,6 +656,7 @@ mod tests {
tx,
idle_timeout(),
/*telemetry*/ None,
/*lifecycle*/ None,
));
let mut events = Vec::new();
@@ -638,6 +688,7 @@ mod tests {
tx,
idle_timeout(),
/*telemetry*/ None,
/*lifecycle*/ None,
));
let mut out = Vec::new();
@@ -831,6 +882,7 @@ mod tests {
tx,
idle_timeout(),
/*telemetry*/ None,
/*lifecycle*/ None,
));
let events = tokio::time::timeout(Duration::from_millis(1000), async {
@@ -989,7 +1041,7 @@ mod tests {
}
fn is_created(ev: &ResponseEvent) -> bool {
matches!(ev, ResponseEvent::Created)
matches!(ev, ResponseEvent::Created { .. })
}
fn is_output(ev: &ResponseEvent) -> bool {
matches!(ev, ResponseEvent::OutputItemDone(_))
@@ -1076,6 +1128,7 @@ mod tests {
idle_timeout(),
/*telemetry*/ None,
/*turn_state*/ None,
/*lifecycle*/ None,
);
let event = stream
.rx_event
@@ -1116,6 +1169,7 @@ mod tests {
idle_timeout(),
/*telemetry*/ None,
/*turn_state*/ None,
/*lifecycle*/ None,
);
let mut events = Vec::new();
while let Some(event) = stream.rx_event.recv().await {
@@ -1150,7 +1204,12 @@ mod tests {
.await;
assert_eq!(events.len(), 2);
assert_matches!(&events[0], ResponseEvent::Created);
assert_matches!(
&events[0],
ResponseEvent::Created {
response_id: Some(response_id)
} if response_id == "resp-1"
);
assert_matches!(
&events[1],
ResponseEvent::Completed {
@@ -1187,7 +1246,12 @@ mod tests {
&events[0],
ResponseEvent::ServerModel(model) if model == CYBER_RESTRICTED_MODEL_FOR_TESTS
);
assert_matches!(&events[1], ResponseEvent::Created);
assert_matches!(
&events[1],
ResponseEvent::Created {
response_id: Some(response_id)
} if response_id == "resp-1"
);
assert_matches!(
&events[2],
ResponseEvent::Completed {

View File

@@ -0,0 +1,431 @@
//! Captures lifecycle breadcrumbs for Responses streams so transport failures can
//! include useful diagnostics in logs and error messages.
use crate::error::ApiError;
use crate::sse::ResponsesStreamEvent;
use std::fmt;
use std::time::Instant;
use tracing::warn;
/// Caller-supplied context attached to a stream lifecycle recording.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ResponseStreamLifecycleOptions {
    /// 1-based attempt number for this stream request (retries increment it).
    pub attempt: u64,
    /// Transport carrying the Responses stream (HTTP SSE or websocket).
    pub transport: ResponseStreamTransport,
}
impl ResponseStreamLifecycleOptions {
    /// Builds lifecycle options for the given attempt number and transport.
    pub fn new(attempt: u64, transport: ResponseStreamTransport) -> Self {
        Self { attempt, transport }
    }
}
/// Transport over which a Responses stream is delivered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResponseStreamTransport {
    ResponsesHttp,
    ResponsesWebsocket,
}

impl ResponseStreamTransport {
    /// Stable snake_case label used in logs and error details.
    fn as_str(self) -> &'static str {
        match self {
            Self::ResponsesWebsocket => "responses_websocket",
            Self::ResponsesHttp => "responses_http",
        }
    }
}

impl fmt::Display for ResponseStreamTransport {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.as_str())
    }
}
/// Terminal outcome recorded for a Responses stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ResponseStreamTerminalState {
    Completed,
    ClosedBeforeCompletion,
    IdleTimeout,
    StreamError,
}

impl ResponseStreamTerminalState {
    /// Stable snake_case label used in logs and error details.
    fn as_str(self) -> &'static str {
        match self {
            Self::StreamError => "stream_error",
            Self::IdleTimeout => "idle_timeout",
            Self::ClosedBeforeCompletion => "closed_before_completion",
            Self::Completed => "completed",
        }
    }
}

impl fmt::Display for ResponseStreamTerminalState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.as_str())
    }
}
/// Immutable snapshot of a stream's lifecycle, built once the stream reaches a
/// terminal state; consumed by logging and error decoration.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ResponseStreamLifecycleSummary {
    // Attempt number and transport supplied when the stream was started.
    options: ResponseStreamLifecycleOptions,
    // How the stream ended (completed, closed early, idle timeout, error).
    terminal_state: ResponseStreamTerminalState,
    // `response.id` from the first `response.created` event, if any.
    created_response_id: Option<String>,
    // `response.id` from the first `response.completed` event, if any.
    completed_response_id: Option<String>,
    // Milliseconds from recorder start to the first observed event.
    first_event_elapsed_ms: Option<u64>,
    // Milliseconds from recorder start to the most recent observed event.
    last_event_elapsed_ms: Option<u64>,
    // Kind string of the most recent observed event.
    last_event_kind: Option<String>,
    // Elapsed ms at the first `response.output_item.added`, if seen.
    first_output_item_added_elapsed_ms: Option<u64>,
    // Elapsed ms at the first `response.output_item.done`, if seen.
    first_output_item_done_elapsed_ms: Option<u64>,
    // Elapsed ms at the first `response.output_text.delta`, if seen.
    first_output_text_delta_elapsed_ms: Option<u64>,
    // Distinct event kinds in first-seen order.
    observed_event_kinds: Vec<String>,
    // Total number of events observed (including repeats).
    event_count: u64,
}
impl ResponseStreamLifecycleSummary {
    /// True when both a created and a completed response ID were observed and
    /// they differ — suspicious even for an otherwise successful stream.
    fn ids_mismatch(&self) -> bool {
        match (&self.created_response_id, &self.completed_response_id) {
            (Some(created), Some(completed)) => created != completed,
            _ => false,
        }
    }
    /// One-line human-readable description of how the stream ended.
    ///
    /// The idle-timeout guards are ordered from most to least stream progress
    /// (text delta, then output_item.done, then output_item.added) so the
    /// first matching arm reports the furthest milestone reached.
    fn diagnostic_phrase(&self) -> String {
        match self.terminal_state {
            ResponseStreamTerminalState::Completed if self.ids_mismatch() => {
                "stream completed with mismatched response IDs".to_string()
            }
            ResponseStreamTerminalState::Completed => "stream completed".to_string(),
            ResponseStreamTerminalState::IdleTimeout if self.event_count == 0 => {
                "stream timed out before receiving any events".to_string()
            }
            ResponseStreamTerminalState::IdleTimeout
                if self.first_output_text_delta_elapsed_ms.is_some() =>
            {
                "stream timed out after durable text output started".to_string()
            }
            ResponseStreamTerminalState::IdleTimeout
                if self.first_output_item_done_elapsed_ms.is_some() =>
            {
                "stream timed out after response.output_item.done".to_string()
            }
            ResponseStreamTerminalState::IdleTimeout
                if self.first_output_item_added_elapsed_ms.is_some() =>
            {
                "stream timed out after response.output_item.added, before durable output"
                    .to_string()
            }
            ResponseStreamTerminalState::IdleTimeout => {
                "stream timed out after receiving events".to_string()
            }
            ResponseStreamTerminalState::ClosedBeforeCompletion if self.event_count == 0 => {
                "stream closed before response.completed and before receiving any events"
                    .to_string()
            }
            ResponseStreamTerminalState::ClosedBeforeCompletion => {
                "stream closed before response.completed".to_string()
            }
            ResponseStreamTerminalState::StreamError if self.event_count == 0 => {
                "stream failed before receiving any events".to_string()
            }
            ResponseStreamTerminalState::StreamError => {
                "stream failed after receiving events".to_string()
            }
        }
    }
    /// Emits the lifecycle summary as a structured `warn!` record.
    fn log(&self) {
        // Keep lifecycle logs focused on streams that did not complete normally,
        // plus the suspicious case where created/completed response IDs disagree.
        if self.terminal_state == ResponseStreamTerminalState::Completed && !self.ids_mismatch() {
            return;
        }
        let observed_event_kinds = self.observed_event_kinds.join(",");
        warn!(
            stream_attempt = self.options.attempt,
            stream_transport = %self.options.transport,
            stream_terminal_state = %self.terminal_state,
            stream_created_response_id = self.created_response_id.as_deref().unwrap_or(""),
            stream_completed_response_id = self.completed_response_id.as_deref().unwrap_or(""),
            stream_first_event_elapsed_ms = ?self.first_event_elapsed_ms,
            stream_last_event_elapsed_ms = ?self.last_event_elapsed_ms,
            stream_last_event_kind = self.last_event_kind.as_deref().unwrap_or(""),
            stream_first_output_item_added_elapsed_ms = ?self.first_output_item_added_elapsed_ms,
            stream_first_output_item_done_elapsed_ms = ?self.first_output_item_done_elapsed_ms,
            stream_first_output_text_delta_elapsed_ms = ?self.first_output_text_delta_elapsed_ms,
            stream_observed_event_kinds = %observed_event_kinds,
            stream_event_count = self.event_count,
            stream_diagnostic = %self.diagnostic_phrase(),
            "responses stream lifecycle"
        );
    }
    /// Renders the summary as a `; `-separated detail string suitable for
    /// appending to a stream error message. Optional fields are included only
    /// when present.
    fn error_detail(&self) -> String {
        let mut parts = vec![
            format!("diagnostic: {}", self.diagnostic_phrase()),
            format!("transport={}", self.options.transport),
            format!("attempt={}", self.options.attempt),
            format!("terminal={}", self.terminal_state),
            format!("events={}", self.event_count),
        ];
        if let Some(kind) = &self.last_event_kind {
            parts.push(format!("last_event={kind}"));
        }
        if let Some(id) = &self.created_response_id {
            parts.push(format!("created_response_id={id}"));
        }
        if let Some(id) = &self.completed_response_id {
            parts.push(format!("completed_response_id={id}"));
        }
        if let Some(elapsed) = self.first_event_elapsed_ms {
            parts.push(format!("first_event_ms={elapsed}"));
        }
        if let Some(elapsed) = self.last_event_elapsed_ms {
            parts.push(format!("last_event_ms={elapsed}"));
        }
        if let Some(elapsed) = self.first_output_item_added_elapsed_ms {
            parts.push(format!("first_output_item_added_ms={elapsed}"));
        }
        if let Some(elapsed) = self.first_output_item_done_elapsed_ms {
            parts.push(format!("first_output_item_done_ms={elapsed}"));
        }
        if let Some(elapsed) = self.first_output_text_delta_elapsed_ms {
            parts.push(format!("first_output_text_delta_ms={elapsed}"));
        }
        if !self.observed_event_kinds.is_empty() {
            parts.push(format!(
                "observed_event_kinds={}",
                self.observed_event_kinds.join(",")
            ));
        }
        parts.join("; ")
    }
    /// Appends `error_detail` to `ApiError::Stream` messages; all other error
    /// variants pass through unchanged.
    fn decorate_error(&self, error: ApiError) -> ApiError {
        match error {
            ApiError::Stream(message) => ApiError::Stream(format!(
                "{message}. Stream lifecycle: {}",
                self.error_detail()
            )),
            other => other,
        }
    }
}
/// Mutable recorder that accumulates lifecycle breadcrumbs while a Responses
/// stream runs, then finalizes once into a summary for logging/error detail.
#[derive(Debug)]
pub(crate) struct ResponseStreamLifecycleRecorder {
    // Attempt/transport context supplied when recording began.
    options: ResponseStreamLifecycleOptions,
    // Instant the recorder was created; all elapsed_ms fields are relative to it.
    started_at: Instant,
    // `response.id` captured from the first `response.created` event.
    created_response_id: Option<String>,
    // `response.id` captured from the first `response.completed` event.
    completed_response_id: Option<String>,
    // Elapsed ms when the first event of any kind was observed.
    first_event_elapsed_ms: Option<u64>,
    // Elapsed ms when the most recent event was observed.
    last_event_elapsed_ms: Option<u64>,
    // Kind of the most recent observed event.
    last_event_kind: Option<String>,
    // Elapsed ms at the first `response.output_item.added`.
    first_output_item_added_elapsed_ms: Option<u64>,
    // Elapsed ms at the first `response.output_item.done`.
    first_output_item_done_elapsed_ms: Option<u64>,
    // Elapsed ms at the first `response.output_text.delta`.
    first_output_text_delta_elapsed_ms: Option<u64>,
    // Distinct event kinds in first-seen order.
    observed_event_kinds: Vec<String>,
    // Total events observed, including repeats.
    event_count: u64,
    // Set by `finalize`; guards against emitting a second summary.
    finalized: bool,
}
/// Finalizes an optional lifecycle recorder with `terminal_state` and returns
/// the error, decorated with lifecycle diagnostics when a recorder is present.
/// With no recorder, the error passes through untouched.
pub(crate) fn finalize_lifecycle_error(
    lifecycle: &mut Option<ResponseStreamLifecycleRecorder>,
    terminal_state: ResponseStreamTerminalState,
    error: ApiError,
) -> ApiError {
    match lifecycle.as_mut() {
        Some(recorder) => recorder.finalize_error(terminal_state, error),
        None => error,
    }
}
impl ResponseStreamLifecycleRecorder {
    /// Starts a new recording, stamping the start time now; all elapsed
    /// measurements are taken relative to this instant.
    pub(crate) fn new(options: ResponseStreamLifecycleOptions) -> Self {
        Self {
            options,
            started_at: Instant::now(),
            created_response_id: None,
            completed_response_id: None,
            first_event_elapsed_ms: None,
            last_event_elapsed_ms: None,
            last_event_kind: None,
            first_output_item_added_elapsed_ms: None,
            first_output_item_done_elapsed_ms: None,
            first_output_text_delta_elapsed_ms: None,
            observed_event_kinds: Vec::new(),
            event_count: 0,
            finalized: false,
        }
    }
    /// Records one incoming stream event: updates counters, first/last event
    /// timing, the distinct-kind list, and any kind-specific milestones.
    pub(crate) fn observe_event(&mut self, event: &ResponsesStreamEvent) {
        let now_ms = self.elapsed_ms();
        let kind = event.kind();
        self.event_count += 1;
        if self.first_event_elapsed_ms.is_none() {
            self.first_event_elapsed_ms = Some(now_ms);
        }
        self.last_event_elapsed_ms = Some(now_ms);
        self.last_event_kind = Some(kind.to_owned());
        // Track distinct kinds in first-seen order.
        if self.observed_event_kinds.iter().all(|seen| seen != kind) {
            self.observed_event_kinds.push(kind.to_owned());
        }
        match kind {
            "response.created" => {
                // Only the first created event's ID is kept.
                if self.created_response_id.is_none() {
                    self.created_response_id = event.response_id().map(str::to_string);
                }
            }
            "response.completed" => {
                // Only the first completed event's ID is kept.
                if self.completed_response_id.is_none() {
                    self.completed_response_id = event.response_id().map(str::to_string);
                }
            }
            "response.output_item.added" => {
                self.first_output_item_added_elapsed_ms
                    .get_or_insert(now_ms);
            }
            "response.output_item.done" => {
                self.first_output_item_done_elapsed_ms
                    .get_or_insert(now_ms);
            }
            "response.output_text.delta" => {
                self.first_output_text_delta_elapsed_ms
                    .get_or_insert(now_ms);
            }
            _ => {}
        }
    }
    /// Finalizes with the `Completed` terminal state and logs the summary.
    pub(crate) fn finalize_completed(&mut self) {
        if let Some(summary) = self.finalize(ResponseStreamTerminalState::Completed) {
            summary.log();
        }
    }
    /// Finalizes with an error terminal state, logs the summary, and returns
    /// the error decorated with lifecycle detail. If already finalized, the
    /// error is returned unchanged.
    pub(crate) fn finalize_error(
        &mut self,
        terminal_state: ResponseStreamTerminalState,
        error: ApiError,
    ) -> ApiError {
        match self.finalize(terminal_state) {
            Some(summary) => {
                summary.log();
                summary.decorate_error(error)
            }
            None => error,
        }
    }
    /// Builds the summary exactly once; later calls return `None`.
    fn finalize(
        &mut self,
        terminal_state: ResponseStreamTerminalState,
    ) -> Option<ResponseStreamLifecycleSummary> {
        if std::mem::replace(&mut self.finalized, true) {
            return None;
        }
        Some(ResponseStreamLifecycleSummary {
            options: self.options,
            terminal_state,
            created_response_id: self.created_response_id.clone(),
            completed_response_id: self.completed_response_id.clone(),
            first_event_elapsed_ms: self.first_event_elapsed_ms,
            last_event_elapsed_ms: self.last_event_elapsed_ms,
            last_event_kind: self.last_event_kind.clone(),
            first_output_item_added_elapsed_ms: self.first_output_item_added_elapsed_ms,
            first_output_item_done_elapsed_ms: self.first_output_item_done_elapsed_ms,
            first_output_text_delta_elapsed_ms: self.first_output_text_delta_elapsed_ms,
            observed_event_kinds: self.observed_event_kinds.clone(),
            event_count: self.event_count,
        })
    }
    /// Milliseconds since the recorder started, saturating at `u64::MAX`.
    fn elapsed_ms(&self) -> u64 {
        let millis = self.started_at.elapsed().as_millis();
        u64::try_from(millis).unwrap_or(u64::MAX)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;
    use serde_json::json;
    // Feeds a representative event sequence through the recorder and checks
    // that response IDs, milestone timings, distinct-kind ordering, and the
    // total event count (including the repeated text delta) are all captured.
    // The created/completed IDs intentionally differ to exercise ids_mismatch.
    #[test]
    fn records_response_ids_milestones_and_observed_kinds_in_order() {
        let mut recorder =
            ResponseStreamLifecycleRecorder::new(ResponseStreamLifecycleOptions::new(
                /*attempt*/ 2,
                ResponseStreamTransport::ResponsesHttp,
            ));
        for event in [
            json!({"type": "response.created", "response": {"id": "resp-created"}}),
            json!({"type": "response.output_item.added", "item": {"type": "message"}}),
            json!({"type": "response.output_text.delta", "delta": "hi"}),
            json!({"type": "response.output_item.done", "item": {"type": "message"}}),
            json!({"type": "response.output_text.delta", "delta": " again"}),
            json!({"type": "response.completed", "response": {"id": "resp-completed"}}),
        ] {
            let event: ResponsesStreamEvent = serde_json::from_value(event).unwrap();
            recorder.observe_event(&event);
        }
        let summary = recorder
            .finalize(ResponseStreamTerminalState::Completed)
            .expect("summary should finalize");
        assert_eq!(summary.created_response_id.as_deref(), Some("resp-created"));
        assert_eq!(
            summary.completed_response_id.as_deref(),
            Some("resp-completed")
        );
        // Distinct kinds only, in first-seen order (the second delta is deduped).
        assert_eq!(
            summary.observed_event_kinds,
            vec![
                "response.created",
                "response.output_item.added",
                "response.output_text.delta",
                "response.output_item.done",
                "response.completed"
            ]
        );
        assert_eq!(summary.event_count, 6);
        assert!(summary.first_output_item_added_elapsed_ms.is_some());
        assert!(summary.first_output_item_done_elapsed_ms.is_some());
        assert!(summary.first_output_text_delta_elapsed_ms.is_some());
        assert!(summary.ids_mismatch());
    }
    // Verifies that finalize_error on an idle timeout decorates the stream
    // error message with the diagnostic phrase, transport, and observed kinds.
    #[test]
    fn stream_errors_gain_diagnostic_context() {
        let mut recorder =
            ResponseStreamLifecycleRecorder::new(ResponseStreamLifecycleOptions::new(
                /*attempt*/ 1,
                ResponseStreamTransport::ResponsesWebsocket,
            ));
        let event: ResponsesStreamEvent = serde_json::from_value(json!({
            "type": "response.output_item.added",
            "item": {"type": "message"}
        }))
        .unwrap();
        recorder.observe_event(&event);
        let error = recorder.finalize_error(
            ResponseStreamTerminalState::IdleTimeout,
            ApiError::Stream("idle timeout waiting for websocket".to_string()),
        );
        let ApiError::Stream(message) = error else {
            panic!("expected stream error");
        };
        assert!(message.contains("stream timed out after response.output_item.added"));
        assert!(message.contains("transport=responses_websocket"));
        assert!(message.contains("observed_event_kinds=response.output_item.added"));
    }
}

View File

@@ -47,6 +47,8 @@ use codex_api::Reasoning;
use codex_api::RequestTelemetry;
use codex_api::ReqwestTransport;
use codex_api::ResponseCreateWsRequest;
use codex_api::ResponseStreamLifecycleOptions;
use codex_api::ResponseStreamTransport;
use codex_api::ResponsesApiRequest;
use codex_api::ResponsesClient as ApiResponsesClient;
use codex_api::ResponsesOptions as ApiResponsesOptions;
@@ -1163,6 +1165,7 @@ impl ModelClientSession {
summary: ReasoningSummaryConfig,
service_tier: Option<ServiceTier>,
turn_metadata_header: Option<&str>,
stream_attempt: Option<u64>,
inference_trace: &InferenceTraceContext,
) -> Result<ResponseStream> {
if let Some(path) = &*CODEX_RS_SSE_FIXTURE {
@@ -1218,7 +1221,12 @@ impl ModelClientSession {
client_setup.api_auth,
)
.with_telemetry(Some(request_telemetry), Some(sse_telemetry));
let stream_result = client.stream_request(request, options).await;
let lifecycle = stream_attempt.map(|attempt| {
ResponseStreamLifecycleOptions::new(attempt, ResponseStreamTransport::ResponsesHttp)
});
let stream_result = client
.stream_request_with_lifecycle(request, options, lifecycle)
.await;
match stream_result {
Ok(stream) => {
@@ -1277,6 +1285,7 @@ impl ModelClientSession {
service_tier: Option<ServiceTier>,
turn_metadata_header: Option<&str>,
warmup: bool,
stream_attempt: Option<u64>,
request_trace: Option<W3cTraceContext>,
inference_trace: &InferenceTraceContext,
) -> Result<WebsocketStreamOutcome> {
@@ -1367,8 +1376,18 @@ impl ModelClientSession {
"websocket connection is unavailable".to_string(),
))
})?;
let lifecycle = stream_attempt.map(|attempt| {
ResponseStreamLifecycleOptions::new(
attempt,
ResponseStreamTransport::ResponsesWebsocket,
)
});
let stream_result = websocket_connection
.stream_request(ws_request, self.websocket_session.connection_reused())
.stream_request_with_lifecycle(
ws_request,
self.websocket_session.connection_reused(),
lifecycle,
)
.await
.map_err(|err| {
let err = map_api_error(err);
@@ -1449,6 +1468,7 @@ impl ModelClientSession {
service_tier,
turn_metadata_header,
/*warmup*/ true,
/*stream_attempt*/ None,
current_span_w3c_trace_context(),
&disabled_trace,
)
@@ -1492,6 +1512,33 @@ impl ModelClientSession {
service_tier: Option<ServiceTier>,
turn_metadata_header: Option<&str>,
inference_trace: &InferenceTraceContext,
) -> Result<ResponseStream> {
self.stream_with_attempt(
prompt,
model_info,
session_telemetry,
effort,
summary,
service_tier,
turn_metadata_header,
/*stream_attempt*/ 1,
inference_trace,
)
.await
}
#[allow(clippy::too_many_arguments)]
pub async fn stream_with_attempt(
&mut self,
prompt: &Prompt,
model_info: &ModelInfo,
session_telemetry: &SessionTelemetry,
effort: Option<ReasoningEffortConfig>,
summary: ReasoningSummaryConfig,
service_tier: Option<ServiceTier>,
turn_metadata_header: Option<&str>,
stream_attempt: u64,
inference_trace: &InferenceTraceContext,
) -> Result<ResponseStream> {
let wire_api = self.client.state.provider.info().wire_api;
match wire_api {
@@ -1508,6 +1555,7 @@ impl ModelClientSession {
service_tier,
turn_metadata_header,
/*warmup*/ false,
Some(stream_attempt),
request_trace,
inference_trace,
)
@@ -1528,6 +1576,7 @@ impl ModelClientSession {
summary,
service_tier,
turn_metadata_header,
Some(stream_attempt),
inference_trace,
)
.await

View File

@@ -1074,6 +1074,7 @@ async fn run_sampling_request(
Arc::clone(&turn_context),
client_session,
turn_metadata_header,
retries + 1,
Arc::clone(&turn_diff_tracker),
&prompt,
cancellation_token.child_token(),
@@ -1865,6 +1866,7 @@ async fn try_run_sampling_request(
turn_context: Arc<TurnContext>,
client_session: &mut ModelClientSession,
turn_metadata_header: Option<&str>,
stream_attempt: u64,
turn_diff_tracker: SharedTurnDiffTracker,
prompt: &Prompt,
cancellation_token: CancellationToken,
@@ -1883,7 +1885,7 @@ async fn try_run_sampling_request(
turn_context.provider.info().name.as_str(),
);
let mut stream = client_session
.stream(
.stream_with_attempt(
prompt,
&turn_context.model_info,
&turn_context.session_telemetry,
@@ -1891,6 +1893,7 @@ async fn try_run_sampling_request(
turn_context.reasoning_summary,
turn_context.config.service_tier,
turn_metadata_header,
stream_attempt,
&inference_trace,
)
.instrument(trace_span!("stream_request"))
@@ -1946,7 +1949,7 @@ async fn try_run_sampling_request(
record_turn_ttft_metric(&turn_context, &event).await;
match event {
ResponseEvent::Created => {}
ResponseEvent::Created { .. } => {}
ResponseEvent::OutputItemDone(item) => {
if let Some((_, mut consumer)) = active_tool_argument_diff_consumer.take()
&& let Some(event) = consumer.flush_on_complete()

View File

@@ -141,7 +141,7 @@ fn response_event_records_turn_ttft(event: &ResponseEvent) -> bool {
ResponseEvent::OutputTextDelta(_)
| ResponseEvent::ReasoningSummaryDelta { .. }
| ResponseEvent::ReasoningContentDelta { .. } => true,
ResponseEvent::Created
ResponseEvent::Created { .. }
| ResponseEvent::ServerModel(_)
| ResponseEvent::ModelVerifications(_)
| ResponseEvent::ServerReasoningIncluded(_)

View File

@@ -23,7 +23,7 @@ async fn turn_timing_state_records_ttft_only_once_per_turn() {
state.mark_turn_started(Instant::now()).await;
assert_eq!(
state
.record_ttft_for_response_event(&ResponseEvent::Created)
.record_ttft_for_response_event(&ResponseEvent::Created { response_id: None })
.await,
None
);

View File

@@ -2802,14 +2802,23 @@ async fn incomplete_response_emits_content_filter_error_message() -> anyhow::Res
.await?;
let error_event = wait_for_event(&codex, |ev| matches!(ev, EventMsg::Error(_))).await;
let EventMsg::Error(err) = error_event else {
panic!("expected incomplete content filter error; got {error_event:?}");
};
assert!(
matches!(
error_event,
EventMsg::Error(ref err)
if err.message
== "stream disconnected before completion: Incomplete response returned, reason: content_filter"
err.message.starts_with(
"stream disconnected before completion: Incomplete response returned, reason: content_filter"
),
"expected incomplete content filter error; got {error_event:?}"
"expected incomplete content filter error; got {err:?}"
);
assert!(
err.message
.contains("Stream lifecycle: diagnostic: stream failed after receiving events"),
"expected lifecycle diagnostic; got {err:?}"
);
assert!(
err.message.contains("last_event=response.incomplete"),
"expected incomplete event diagnostic; got {err:?}"
);
assert_eq!(responses_mock.requests().len(), 1);

View File

@@ -1045,7 +1045,7 @@ impl SessionTelemetry {
fn responses_type(event: &ResponseEvent) -> String {
match event {
ResponseEvent::Created => "created".into(),
ResponseEvent::Created { .. } => "created".into(),
ResponseEvent::OutputItemDone(item) | ResponseEvent::OutputItemAdded(item) => {
SessionTelemetry::responses_item_type(item)
}