mirror of
https://github.com/openai/codex.git
synced 2026-05-05 03:47:01 +00:00
Compare commits
8 Commits
automation
...
etraut/str
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1c2f5fea39 | ||
|
|
0b60009a6e | ||
|
|
028d38a738 | ||
|
|
2cf65451e5 | ||
|
|
e12af2274c | ||
|
|
7771009b47 | ||
|
|
00db3db537 | ||
|
|
8e815b1fde |
@@ -66,7 +66,9 @@ pub struct MemorySummarizeOutput {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ResponseEvent {
|
||||
Created,
|
||||
Created {
|
||||
response_id: Option<String>,
|
||||
},
|
||||
OutputItemDone(ResponseItem),
|
||||
OutputItemAdded(ResponseItem),
|
||||
/// Emitted when the server includes `OpenAI-Model` on the stream response.
|
||||
|
||||
@@ -10,6 +10,7 @@ use crate::requests::headers::build_conversation_headers;
|
||||
use crate::requests::headers::insert_header;
|
||||
use crate::requests::headers::subagent_header;
|
||||
use crate::sse::spawn_response_stream;
|
||||
use crate::stream_lifecycle::ResponseStreamLifecycleOptions;
|
||||
use crate::telemetry::SseTelemetry;
|
||||
use codex_client::HttpTransport;
|
||||
use codex_client::RequestCompression;
|
||||
@@ -70,6 +71,16 @@ impl<T: HttpTransport> ResponsesClient<T> {
|
||||
&self,
|
||||
request: ResponsesApiRequest,
|
||||
options: ResponsesOptions,
|
||||
) -> Result<ResponseStream, ApiError> {
|
||||
self.stream_request_with_lifecycle(request, options, /*lifecycle*/ None)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn stream_request_with_lifecycle(
|
||||
&self,
|
||||
request: ResponsesApiRequest,
|
||||
options: ResponsesOptions,
|
||||
lifecycle: Option<ResponseStreamLifecycleOptions>,
|
||||
) -> Result<ResponseStream, ApiError> {
|
||||
let ResponsesOptions {
|
||||
conversation_id,
|
||||
@@ -94,7 +105,8 @@ impl<T: HttpTransport> ResponsesClient<T> {
|
||||
insert_header(&mut headers, "x-openai-subagent", &subagent);
|
||||
}
|
||||
|
||||
self.stream(body, headers, compression, turn_state).await
|
||||
self.stream_with_lifecycle(body, headers, compression, turn_state, lifecycle)
|
||||
.await
|
||||
}
|
||||
|
||||
fn path() -> &'static str {
|
||||
@@ -118,6 +130,24 @@ impl<T: HttpTransport> ResponsesClient<T> {
|
||||
extra_headers: HeaderMap,
|
||||
compression: Compression,
|
||||
turn_state: Option<Arc<OnceLock<String>>>,
|
||||
) -> Result<ResponseStream, ApiError> {
|
||||
self.stream_with_lifecycle(
|
||||
body,
|
||||
extra_headers,
|
||||
compression,
|
||||
turn_state,
|
||||
/*lifecycle*/ None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn stream_with_lifecycle(
|
||||
&self,
|
||||
body: Value,
|
||||
extra_headers: HeaderMap,
|
||||
compression: Compression,
|
||||
turn_state: Option<Arc<OnceLock<String>>>,
|
||||
lifecycle: Option<ResponseStreamLifecycleOptions>,
|
||||
) -> Result<ResponseStream, ApiError> {
|
||||
let request_compression = match compression {
|
||||
Compression::None => RequestCompression::None,
|
||||
@@ -146,6 +176,7 @@ impl<T: HttpTransport> ResponsesClient<T> {
|
||||
self.session.provider().stream_idle_timeout,
|
||||
self.sse_telemetry.clone(),
|
||||
turn_state,
|
||||
lifecycle,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,10 @@ use crate::provider::Provider;
|
||||
use crate::rate_limits::parse_rate_limit_event;
|
||||
use crate::sse::ResponsesStreamEvent;
|
||||
use crate::sse::process_responses_event;
|
||||
use crate::stream_lifecycle::ResponseStreamLifecycleOptions;
|
||||
use crate::stream_lifecycle::ResponseStreamLifecycleRecorder;
|
||||
use crate::stream_lifecycle::ResponseStreamTerminalState;
|
||||
use crate::stream_lifecycle::finalize_lifecycle_error;
|
||||
use crate::telemetry::WebsocketTelemetry;
|
||||
use codex_client::TransportError;
|
||||
use codex_client::maybe_build_rustls_client_config_with_custom_ca;
|
||||
@@ -214,6 +218,16 @@ impl ResponsesWebsocketConnection {
|
||||
&self,
|
||||
request: ResponsesWsRequest,
|
||||
connection_reused: bool,
|
||||
) -> Result<ResponseStream, ApiError> {
|
||||
self.stream_request_with_lifecycle(request, connection_reused, /*lifecycle*/ None)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn stream_request_with_lifecycle(
|
||||
&self,
|
||||
request: ResponsesWsRequest,
|
||||
connection_reused: bool,
|
||||
lifecycle: Option<ResponseStreamLifecycleOptions>,
|
||||
) -> Result<ResponseStream, ApiError> {
|
||||
let (tx_event, rx_event) =
|
||||
mpsc::channel::<std::result::Result<ResponseEvent, ApiError>>(1600);
|
||||
@@ -263,6 +277,7 @@ impl ResponsesWebsocketConnection {
|
||||
idle_timeout,
|
||||
telemetry,
|
||||
connection_reused,
|
||||
lifecycle,
|
||||
)
|
||||
.await
|
||||
};
|
||||
@@ -540,8 +555,10 @@ async fn run_websocket_response_stream(
|
||||
idle_timeout: Duration,
|
||||
telemetry: Option<Arc<dyn WebsocketTelemetry>>,
|
||||
connection_reused: bool,
|
||||
lifecycle: Option<ResponseStreamLifecycleOptions>,
|
||||
) -> Result<(), ApiError> {
|
||||
let mut last_server_model: Option<String> = None;
|
||||
let mut lifecycle = lifecycle.map(ResponseStreamLifecycleRecorder::new);
|
||||
let request_text = match serde_json::to_string(&request_body) {
|
||||
Ok(text) => text,
|
||||
Err(err) => {
|
||||
@@ -579,15 +596,27 @@ async fn run_websocket_response_stream(
|
||||
let message = match response {
|
||||
Ok(Some(Ok(msg))) => msg,
|
||||
Ok(Some(Err(err))) => {
|
||||
return Err(ApiError::Stream(err.to_string()));
|
||||
let error = ApiError::Stream(err.to_string());
|
||||
return Err(finalize_lifecycle_error(
|
||||
&mut lifecycle,
|
||||
ResponseStreamTerminalState::StreamError,
|
||||
error,
|
||||
));
|
||||
}
|
||||
Ok(None) => {
|
||||
return Err(ApiError::Stream(
|
||||
"stream closed before response.completed".into(),
|
||||
let error = ApiError::Stream("stream closed before response.completed".into());
|
||||
return Err(finalize_lifecycle_error(
|
||||
&mut lifecycle,
|
||||
ResponseStreamTerminalState::ClosedBeforeCompletion,
|
||||
error,
|
||||
));
|
||||
}
|
||||
Err(err) => {
|
||||
return Err(err);
|
||||
return Err(finalize_lifecycle_error(
|
||||
&mut lifecycle,
|
||||
ResponseStreamTerminalState::IdleTimeout,
|
||||
err,
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -609,6 +638,9 @@ async fn run_websocket_response_stream(
|
||||
}
|
||||
};
|
||||
let model_verifications = event.model_verifications();
|
||||
if let Some(lifecycle) = lifecycle.as_mut() {
|
||||
lifecycle.observe_event(&event);
|
||||
}
|
||||
if event.kind() == "codex.rate_limits" {
|
||||
if let Some(snapshot) = parse_rate_limit_event(&text) {
|
||||
let _ = tx_event.send(Ok(ResponseEvent::RateLimits(snapshot))).await;
|
||||
@@ -638,21 +670,38 @@ async fn run_websocket_response_stream(
|
||||
let is_completed = matches!(event, ResponseEvent::Completed { .. });
|
||||
let _ = tx_event.send(Ok(event)).await;
|
||||
if is_completed {
|
||||
if let Some(lifecycle) = lifecycle.as_mut() {
|
||||
lifecycle.finalize_completed();
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(None) => {}
|
||||
Err(error) => {
|
||||
return Err(error.into_api_error());
|
||||
let error = error.into_api_error();
|
||||
return Err(finalize_lifecycle_error(
|
||||
&mut lifecycle,
|
||||
ResponseStreamTerminalState::StreamError,
|
||||
error,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
Message::Binary(_) => {
|
||||
return Err(ApiError::Stream("unexpected binary websocket event".into()));
|
||||
let error = ApiError::Stream("unexpected binary websocket event".into());
|
||||
return Err(finalize_lifecycle_error(
|
||||
&mut lifecycle,
|
||||
ResponseStreamTerminalState::StreamError,
|
||||
error,
|
||||
));
|
||||
}
|
||||
Message::Close(_) => {
|
||||
return Err(ApiError::Stream(
|
||||
"websocket closed by server before response.completed".into(),
|
||||
let error =
|
||||
ApiError::Stream("websocket closed by server before response.completed".into());
|
||||
return Err(finalize_lifecycle_error(
|
||||
&mut lifecycle,
|
||||
ResponseStreamTerminalState::ClosedBeforeCompletion,
|
||||
error,
|
||||
));
|
||||
}
|
||||
Message::Frame(_) => {}
|
||||
|
||||
@@ -8,6 +8,7 @@ pub(crate) mod provider;
|
||||
pub(crate) mod rate_limits;
|
||||
pub(crate) mod requests;
|
||||
pub(crate) mod sse;
|
||||
mod stream_lifecycle;
|
||||
pub(crate) mod telemetry;
|
||||
|
||||
pub use crate::requests::headers::build_conversation_headers;
|
||||
@@ -63,6 +64,8 @@ pub use crate::provider::RetryConfig;
|
||||
pub use crate::provider::is_azure_responses_provider;
|
||||
pub use crate::requests::Compression;
|
||||
pub use crate::sse::stream_from_fixture;
|
||||
pub use crate::stream_lifecycle::ResponseStreamLifecycleOptions;
|
||||
pub use crate::stream_lifecycle::ResponseStreamTransport;
|
||||
pub use crate::telemetry::SseTelemetry;
|
||||
pub use crate::telemetry::WebsocketTelemetry;
|
||||
pub use codex_protocol::protocol::RealtimeAudioFrame;
|
||||
|
||||
@@ -2,6 +2,10 @@ use crate::common::ResponseEvent;
|
||||
use crate::common::ResponseStream;
|
||||
use crate::error::ApiError;
|
||||
use crate::rate_limits::parse_all_rate_limits;
|
||||
use crate::stream_lifecycle::ResponseStreamLifecycleOptions;
|
||||
use crate::stream_lifecycle::ResponseStreamLifecycleRecorder;
|
||||
use crate::stream_lifecycle::ResponseStreamTerminalState;
|
||||
use crate::stream_lifecycle::finalize_lifecycle_error;
|
||||
use crate::telemetry::SseTelemetry;
|
||||
use codex_client::ByteStream;
|
||||
use codex_client::StreamResponse;
|
||||
@@ -52,6 +56,7 @@ pub fn stream_from_fixture(
|
||||
tx_event,
|
||||
idle_timeout,
|
||||
/*telemetry*/ None,
|
||||
/*lifecycle*/ None,
|
||||
));
|
||||
Ok(ResponseStream { rx_event })
|
||||
}
|
||||
@@ -61,6 +66,7 @@ pub fn spawn_response_stream(
|
||||
idle_timeout: Duration,
|
||||
telemetry: Option<Arc<dyn SseTelemetry>>,
|
||||
turn_state: Option<Arc<OnceLock<String>>>,
|
||||
lifecycle: Option<ResponseStreamLifecycleOptions>,
|
||||
) -> ResponseStream {
|
||||
let rate_limit_snapshots = parse_all_rate_limits(&stream_response.headers);
|
||||
let models_etag = stream_response
|
||||
@@ -101,7 +107,14 @@ pub fn spawn_response_stream(
|
||||
.send(Ok(ResponseEvent::ServerReasoningIncluded(true)))
|
||||
.await;
|
||||
}
|
||||
process_sse(stream_response.bytes, tx_event, idle_timeout, telemetry).await;
|
||||
process_sse(
|
||||
stream_response.bytes,
|
||||
tx_event,
|
||||
idle_timeout,
|
||||
telemetry,
|
||||
lifecycle,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
|
||||
ResponseStream { rx_event }
|
||||
@@ -184,6 +197,13 @@ impl ResponsesStreamEvent {
|
||||
&self.kind
|
||||
}
|
||||
|
||||
pub(crate) fn response_id(&self) -> Option<&str> {
|
||||
self.response
|
||||
.as_ref()
|
||||
.and_then(|response| response.get("id"))
|
||||
.and_then(Value::as_str)
|
||||
}
|
||||
|
||||
/// Returns the effective model reported by the server, if present.
|
||||
///
|
||||
/// Precedence:
|
||||
@@ -328,7 +348,8 @@ pub fn process_responses_event(
|
||||
}
|
||||
"response.created" => {
|
||||
if event.response.is_some() {
|
||||
return Ok(Some(ResponseEvent::Created {}));
|
||||
let response_id = event.response_id().map(str::to_string);
|
||||
return Ok(Some(ResponseEvent::Created { response_id }));
|
||||
}
|
||||
}
|
||||
"response.failed" => {
|
||||
@@ -423,10 +444,12 @@ pub async fn process_sse(
|
||||
tx_event: mpsc::Sender<Result<ResponseEvent, ApiError>>,
|
||||
idle_timeout: Duration,
|
||||
telemetry: Option<Arc<dyn SseTelemetry>>,
|
||||
lifecycle: Option<ResponseStreamLifecycleOptions>,
|
||||
) {
|
||||
let mut stream = stream.eventsource();
|
||||
let mut response_error: Option<ApiError> = None;
|
||||
let mut last_server_model: Option<String> = None;
|
||||
let mut lifecycle = lifecycle.map(ResponseStreamLifecycleRecorder::new);
|
||||
|
||||
loop {
|
||||
let start = Instant::now();
|
||||
@@ -438,20 +461,35 @@ pub async fn process_sse(
|
||||
Ok(Some(Ok(sse))) => sse,
|
||||
Ok(Some(Err(e))) => {
|
||||
debug!("SSE Error: {e:#}");
|
||||
let _ = tx_event.send(Err(ApiError::Stream(e.to_string()))).await;
|
||||
let error = ApiError::Stream(e.to_string());
|
||||
let error = finalize_lifecycle_error(
|
||||
&mut lifecycle,
|
||||
ResponseStreamTerminalState::StreamError,
|
||||
error,
|
||||
);
|
||||
let _ = tx_event.send(Err(error)).await;
|
||||
return;
|
||||
}
|
||||
Ok(None) => {
|
||||
let error = response_error.unwrap_or(ApiError::Stream(
|
||||
"stream closed before response.completed".into(),
|
||||
));
|
||||
let error = response_error.unwrap_or_else(|| {
|
||||
let error = ApiError::Stream("stream closed before response.completed".into());
|
||||
finalize_lifecycle_error(
|
||||
&mut lifecycle,
|
||||
ResponseStreamTerminalState::ClosedBeforeCompletion,
|
||||
error,
|
||||
)
|
||||
});
|
||||
let _ = tx_event.send(Err(error)).await;
|
||||
return;
|
||||
}
|
||||
Err(_) => {
|
||||
let _ = tx_event
|
||||
.send(Err(ApiError::Stream("idle timeout waiting for SSE".into())))
|
||||
.await;
|
||||
let error = ApiError::Stream("idle timeout waiting for SSE".into());
|
||||
let error = finalize_lifecycle_error(
|
||||
&mut lifecycle,
|
||||
ResponseStreamTerminalState::IdleTimeout,
|
||||
error,
|
||||
);
|
||||
let _ = tx_event.send(Err(error)).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
@@ -466,6 +504,9 @@ pub async fn process_sse(
|
||||
}
|
||||
};
|
||||
let model_verifications = event.model_verifications();
|
||||
if let Some(lifecycle) = lifecycle.as_mut() {
|
||||
lifecycle.observe_event(&event);
|
||||
}
|
||||
|
||||
if let Some(model) = event.response_model()
|
||||
&& last_server_model.as_deref() != Some(model.as_str())
|
||||
@@ -495,12 +536,20 @@ pub async fn process_sse(
|
||||
return;
|
||||
}
|
||||
if is_completed {
|
||||
if let Some(lifecycle) = lifecycle.as_mut() {
|
||||
lifecycle.finalize_completed();
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
Ok(None) => {}
|
||||
Err(error) => {
|
||||
response_error = Some(error.into_api_error());
|
||||
let error = error.into_api_error();
|
||||
response_error = Some(finalize_lifecycle_error(
|
||||
&mut lifecycle,
|
||||
ResponseStreamTerminalState::StreamError,
|
||||
error,
|
||||
));
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -607,6 +656,7 @@ mod tests {
|
||||
tx,
|
||||
idle_timeout(),
|
||||
/*telemetry*/ None,
|
||||
/*lifecycle*/ None,
|
||||
));
|
||||
|
||||
let mut events = Vec::new();
|
||||
@@ -638,6 +688,7 @@ mod tests {
|
||||
tx,
|
||||
idle_timeout(),
|
||||
/*telemetry*/ None,
|
||||
/*lifecycle*/ None,
|
||||
));
|
||||
|
||||
let mut out = Vec::new();
|
||||
@@ -831,6 +882,7 @@ mod tests {
|
||||
tx,
|
||||
idle_timeout(),
|
||||
/*telemetry*/ None,
|
||||
/*lifecycle*/ None,
|
||||
));
|
||||
|
||||
let events = tokio::time::timeout(Duration::from_millis(1000), async {
|
||||
@@ -989,7 +1041,7 @@ mod tests {
|
||||
}
|
||||
|
||||
fn is_created(ev: &ResponseEvent) -> bool {
|
||||
matches!(ev, ResponseEvent::Created)
|
||||
matches!(ev, ResponseEvent::Created { .. })
|
||||
}
|
||||
fn is_output(ev: &ResponseEvent) -> bool {
|
||||
matches!(ev, ResponseEvent::OutputItemDone(_))
|
||||
@@ -1076,6 +1128,7 @@ mod tests {
|
||||
idle_timeout(),
|
||||
/*telemetry*/ None,
|
||||
/*turn_state*/ None,
|
||||
/*lifecycle*/ None,
|
||||
);
|
||||
let event = stream
|
||||
.rx_event
|
||||
@@ -1116,6 +1169,7 @@ mod tests {
|
||||
idle_timeout(),
|
||||
/*telemetry*/ None,
|
||||
/*turn_state*/ None,
|
||||
/*lifecycle*/ None,
|
||||
);
|
||||
let mut events = Vec::new();
|
||||
while let Some(event) = stream.rx_event.recv().await {
|
||||
@@ -1150,7 +1204,12 @@ mod tests {
|
||||
.await;
|
||||
|
||||
assert_eq!(events.len(), 2);
|
||||
assert_matches!(&events[0], ResponseEvent::Created);
|
||||
assert_matches!(
|
||||
&events[0],
|
||||
ResponseEvent::Created {
|
||||
response_id: Some(response_id)
|
||||
} if response_id == "resp-1"
|
||||
);
|
||||
assert_matches!(
|
||||
&events[1],
|
||||
ResponseEvent::Completed {
|
||||
@@ -1187,7 +1246,12 @@ mod tests {
|
||||
&events[0],
|
||||
ResponseEvent::ServerModel(model) if model == CYBER_RESTRICTED_MODEL_FOR_TESTS
|
||||
);
|
||||
assert_matches!(&events[1], ResponseEvent::Created);
|
||||
assert_matches!(
|
||||
&events[1],
|
||||
ResponseEvent::Created {
|
||||
response_id: Some(response_id)
|
||||
} if response_id == "resp-1"
|
||||
);
|
||||
assert_matches!(
|
||||
&events[2],
|
||||
ResponseEvent::Completed {
|
||||
|
||||
431
codex-rs/codex-api/src/stream_lifecycle.rs
Normal file
431
codex-rs/codex-api/src/stream_lifecycle.rs
Normal file
@@ -0,0 +1,431 @@
|
||||
//! Captures lifecycle breadcrumbs for Responses streams so transport failures can
|
||||
//! include useful diagnostics in logs and error messages.
|
||||
|
||||
use crate::error::ApiError;
|
||||
use crate::sse::ResponsesStreamEvent;
|
||||
use std::fmt;
|
||||
use std::time::Instant;
|
||||
use tracing::warn;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct ResponseStreamLifecycleOptions {
|
||||
pub attempt: u64,
|
||||
pub transport: ResponseStreamTransport,
|
||||
}
|
||||
|
||||
impl ResponseStreamLifecycleOptions {
|
||||
pub fn new(attempt: u64, transport: ResponseStreamTransport) -> Self {
|
||||
Self { attempt, transport }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum ResponseStreamTransport {
|
||||
ResponsesHttp,
|
||||
ResponsesWebsocket,
|
||||
}
|
||||
|
||||
impl ResponseStreamTransport {
|
||||
fn as_str(self) -> &'static str {
|
||||
match self {
|
||||
Self::ResponsesHttp => "responses_http",
|
||||
Self::ResponsesWebsocket => "responses_websocket",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for ResponseStreamTransport {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.write_str(self.as_str())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub(crate) enum ResponseStreamTerminalState {
|
||||
Completed,
|
||||
ClosedBeforeCompletion,
|
||||
IdleTimeout,
|
||||
StreamError,
|
||||
}
|
||||
|
||||
impl ResponseStreamTerminalState {
|
||||
fn as_str(self) -> &'static str {
|
||||
match self {
|
||||
Self::Completed => "completed",
|
||||
Self::ClosedBeforeCompletion => "closed_before_completion",
|
||||
Self::IdleTimeout => "idle_timeout",
|
||||
Self::StreamError => "stream_error",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for ResponseStreamTerminalState {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.write_str(self.as_str())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
struct ResponseStreamLifecycleSummary {
|
||||
options: ResponseStreamLifecycleOptions,
|
||||
terminal_state: ResponseStreamTerminalState,
|
||||
created_response_id: Option<String>,
|
||||
completed_response_id: Option<String>,
|
||||
first_event_elapsed_ms: Option<u64>,
|
||||
last_event_elapsed_ms: Option<u64>,
|
||||
last_event_kind: Option<String>,
|
||||
first_output_item_added_elapsed_ms: Option<u64>,
|
||||
first_output_item_done_elapsed_ms: Option<u64>,
|
||||
first_output_text_delta_elapsed_ms: Option<u64>,
|
||||
observed_event_kinds: Vec<String>,
|
||||
event_count: u64,
|
||||
}
|
||||
|
||||
impl ResponseStreamLifecycleSummary {
|
||||
fn ids_mismatch(&self) -> bool {
|
||||
match (&self.created_response_id, &self.completed_response_id) {
|
||||
(Some(created), Some(completed)) => created != completed,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn diagnostic_phrase(&self) -> String {
|
||||
match self.terminal_state {
|
||||
ResponseStreamTerminalState::Completed if self.ids_mismatch() => {
|
||||
"stream completed with mismatched response IDs".to_string()
|
||||
}
|
||||
ResponseStreamTerminalState::Completed => "stream completed".to_string(),
|
||||
ResponseStreamTerminalState::IdleTimeout if self.event_count == 0 => {
|
||||
"stream timed out before receiving any events".to_string()
|
||||
}
|
||||
ResponseStreamTerminalState::IdleTimeout
|
||||
if self.first_output_text_delta_elapsed_ms.is_some() =>
|
||||
{
|
||||
"stream timed out after durable text output started".to_string()
|
||||
}
|
||||
ResponseStreamTerminalState::IdleTimeout
|
||||
if self.first_output_item_done_elapsed_ms.is_some() =>
|
||||
{
|
||||
"stream timed out after response.output_item.done".to_string()
|
||||
}
|
||||
ResponseStreamTerminalState::IdleTimeout
|
||||
if self.first_output_item_added_elapsed_ms.is_some() =>
|
||||
{
|
||||
"stream timed out after response.output_item.added, before durable output"
|
||||
.to_string()
|
||||
}
|
||||
ResponseStreamTerminalState::IdleTimeout => {
|
||||
"stream timed out after receiving events".to_string()
|
||||
}
|
||||
ResponseStreamTerminalState::ClosedBeforeCompletion if self.event_count == 0 => {
|
||||
"stream closed before response.completed and before receiving any events"
|
||||
.to_string()
|
||||
}
|
||||
ResponseStreamTerminalState::ClosedBeforeCompletion => {
|
||||
"stream closed before response.completed".to_string()
|
||||
}
|
||||
ResponseStreamTerminalState::StreamError if self.event_count == 0 => {
|
||||
"stream failed before receiving any events".to_string()
|
||||
}
|
||||
ResponseStreamTerminalState::StreamError => {
|
||||
"stream failed after receiving events".to_string()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn log(&self) {
|
||||
// Keep lifecycle logs focused on streams that did not complete normally,
|
||||
// plus the suspicious case where created/completed response IDs disagree.
|
||||
if self.terminal_state == ResponseStreamTerminalState::Completed && !self.ids_mismatch() {
|
||||
return;
|
||||
}
|
||||
|
||||
let observed_event_kinds = self.observed_event_kinds.join(",");
|
||||
warn!(
|
||||
stream_attempt = self.options.attempt,
|
||||
stream_transport = %self.options.transport,
|
||||
stream_terminal_state = %self.terminal_state,
|
||||
stream_created_response_id = self.created_response_id.as_deref().unwrap_or(""),
|
||||
stream_completed_response_id = self.completed_response_id.as_deref().unwrap_or(""),
|
||||
stream_first_event_elapsed_ms = ?self.first_event_elapsed_ms,
|
||||
stream_last_event_elapsed_ms = ?self.last_event_elapsed_ms,
|
||||
stream_last_event_kind = self.last_event_kind.as_deref().unwrap_or(""),
|
||||
stream_first_output_item_added_elapsed_ms = ?self.first_output_item_added_elapsed_ms,
|
||||
stream_first_output_item_done_elapsed_ms = ?self.first_output_item_done_elapsed_ms,
|
||||
stream_first_output_text_delta_elapsed_ms = ?self.first_output_text_delta_elapsed_ms,
|
||||
stream_observed_event_kinds = %observed_event_kinds,
|
||||
stream_event_count = self.event_count,
|
||||
stream_diagnostic = %self.diagnostic_phrase(),
|
||||
"responses stream lifecycle"
|
||||
);
|
||||
}
|
||||
|
||||
fn error_detail(&self) -> String {
|
||||
let mut parts = vec![
|
||||
format!("diagnostic: {}", self.diagnostic_phrase()),
|
||||
format!("transport={}", self.options.transport),
|
||||
format!("attempt={}", self.options.attempt),
|
||||
format!("terminal={}", self.terminal_state),
|
||||
format!("events={}", self.event_count),
|
||||
];
|
||||
if let Some(kind) = &self.last_event_kind {
|
||||
parts.push(format!("last_event={kind}"));
|
||||
}
|
||||
if let Some(id) = &self.created_response_id {
|
||||
parts.push(format!("created_response_id={id}"));
|
||||
}
|
||||
if let Some(id) = &self.completed_response_id {
|
||||
parts.push(format!("completed_response_id={id}"));
|
||||
}
|
||||
if let Some(elapsed) = self.first_event_elapsed_ms {
|
||||
parts.push(format!("first_event_ms={elapsed}"));
|
||||
}
|
||||
if let Some(elapsed) = self.last_event_elapsed_ms {
|
||||
parts.push(format!("last_event_ms={elapsed}"));
|
||||
}
|
||||
if let Some(elapsed) = self.first_output_item_added_elapsed_ms {
|
||||
parts.push(format!("first_output_item_added_ms={elapsed}"));
|
||||
}
|
||||
if let Some(elapsed) = self.first_output_item_done_elapsed_ms {
|
||||
parts.push(format!("first_output_item_done_ms={elapsed}"));
|
||||
}
|
||||
if let Some(elapsed) = self.first_output_text_delta_elapsed_ms {
|
||||
parts.push(format!("first_output_text_delta_ms={elapsed}"));
|
||||
}
|
||||
if !self.observed_event_kinds.is_empty() {
|
||||
parts.push(format!(
|
||||
"observed_event_kinds={}",
|
||||
self.observed_event_kinds.join(",")
|
||||
));
|
||||
}
|
||||
parts.join("; ")
|
||||
}
|
||||
|
||||
fn decorate_error(&self, error: ApiError) -> ApiError {
|
||||
match error {
|
||||
ApiError::Stream(message) => ApiError::Stream(format!(
|
||||
"{message}. Stream lifecycle: {}",
|
||||
self.error_detail()
|
||||
)),
|
||||
other => other,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ResponseStreamLifecycleRecorder {
|
||||
options: ResponseStreamLifecycleOptions,
|
||||
started_at: Instant,
|
||||
created_response_id: Option<String>,
|
||||
completed_response_id: Option<String>,
|
||||
first_event_elapsed_ms: Option<u64>,
|
||||
last_event_elapsed_ms: Option<u64>,
|
||||
last_event_kind: Option<String>,
|
||||
first_output_item_added_elapsed_ms: Option<u64>,
|
||||
first_output_item_done_elapsed_ms: Option<u64>,
|
||||
first_output_text_delta_elapsed_ms: Option<u64>,
|
||||
observed_event_kinds: Vec<String>,
|
||||
event_count: u64,
|
||||
finalized: bool,
|
||||
}
|
||||
|
||||
pub(crate) fn finalize_lifecycle_error(
|
||||
lifecycle: &mut Option<ResponseStreamLifecycleRecorder>,
|
||||
terminal_state: ResponseStreamTerminalState,
|
||||
error: ApiError,
|
||||
) -> ApiError {
|
||||
if let Some(lifecycle) = lifecycle.as_mut() {
|
||||
lifecycle.finalize_error(terminal_state, error)
|
||||
} else {
|
||||
error
|
||||
}
|
||||
}
|
||||
|
||||
impl ResponseStreamLifecycleRecorder {
|
||||
pub(crate) fn new(options: ResponseStreamLifecycleOptions) -> Self {
|
||||
Self {
|
||||
options,
|
||||
started_at: Instant::now(),
|
||||
created_response_id: None,
|
||||
completed_response_id: None,
|
||||
first_event_elapsed_ms: None,
|
||||
last_event_elapsed_ms: None,
|
||||
last_event_kind: None,
|
||||
first_output_item_added_elapsed_ms: None,
|
||||
first_output_item_done_elapsed_ms: None,
|
||||
first_output_text_delta_elapsed_ms: None,
|
||||
observed_event_kinds: Vec::new(),
|
||||
event_count: 0,
|
||||
finalized: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn observe_event(&mut self, event: &ResponsesStreamEvent) {
|
||||
let elapsed_ms = self.elapsed_ms();
|
||||
let kind = event.kind();
|
||||
self.event_count += 1;
|
||||
self.first_event_elapsed_ms.get_or_insert(elapsed_ms);
|
||||
self.last_event_elapsed_ms = Some(elapsed_ms);
|
||||
self.last_event_kind = Some(kind.to_string());
|
||||
if !self
|
||||
.observed_event_kinds
|
||||
.iter()
|
||||
.any(|observed| observed == kind)
|
||||
{
|
||||
self.observed_event_kinds.push(kind.to_string());
|
||||
}
|
||||
|
||||
match kind {
|
||||
"response.created" => {
|
||||
if self.created_response_id.is_none() {
|
||||
self.created_response_id = event.response_id().map(str::to_string);
|
||||
}
|
||||
}
|
||||
"response.completed" => {
|
||||
if self.completed_response_id.is_none() {
|
||||
self.completed_response_id = event.response_id().map(str::to_string);
|
||||
}
|
||||
}
|
||||
"response.output_item.added" => {
|
||||
self.first_output_item_added_elapsed_ms
|
||||
.get_or_insert(elapsed_ms);
|
||||
}
|
||||
"response.output_item.done" => {
|
||||
self.first_output_item_done_elapsed_ms
|
||||
.get_or_insert(elapsed_ms);
|
||||
}
|
||||
"response.output_text.delta" => {
|
||||
self.first_output_text_delta_elapsed_ms
|
||||
.get_or_insert(elapsed_ms);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn finalize_completed(&mut self) {
|
||||
let Some(summary) = self.finalize(ResponseStreamTerminalState::Completed) else {
|
||||
return;
|
||||
};
|
||||
summary.log();
|
||||
}
|
||||
|
||||
pub(crate) fn finalize_error(
|
||||
&mut self,
|
||||
terminal_state: ResponseStreamTerminalState,
|
||||
error: ApiError,
|
||||
) -> ApiError {
|
||||
let Some(summary) = self.finalize(terminal_state) else {
|
||||
return error;
|
||||
};
|
||||
summary.log();
|
||||
summary.decorate_error(error)
|
||||
}
|
||||
|
||||
fn finalize(
|
||||
&mut self,
|
||||
terminal_state: ResponseStreamTerminalState,
|
||||
) -> Option<ResponseStreamLifecycleSummary> {
|
||||
if self.finalized {
|
||||
return None;
|
||||
}
|
||||
self.finalized = true;
|
||||
Some(ResponseStreamLifecycleSummary {
|
||||
options: self.options,
|
||||
terminal_state,
|
||||
created_response_id: self.created_response_id.clone(),
|
||||
completed_response_id: self.completed_response_id.clone(),
|
||||
first_event_elapsed_ms: self.first_event_elapsed_ms,
|
||||
last_event_elapsed_ms: self.last_event_elapsed_ms,
|
||||
last_event_kind: self.last_event_kind.clone(),
|
||||
first_output_item_added_elapsed_ms: self.first_output_item_added_elapsed_ms,
|
||||
first_output_item_done_elapsed_ms: self.first_output_item_done_elapsed_ms,
|
||||
first_output_text_delta_elapsed_ms: self.first_output_text_delta_elapsed_ms,
|
||||
observed_event_kinds: self.observed_event_kinds.clone(),
|
||||
event_count: self.event_count,
|
||||
})
|
||||
}
|
||||
|
||||
fn elapsed_ms(&self) -> u64 {
|
||||
u64::try_from(self.started_at.elapsed().as_millis()).unwrap_or(u64::MAX)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
|
||||
#[test]
|
||||
fn records_response_ids_milestones_and_observed_kinds_in_order() {
|
||||
let mut recorder =
|
||||
ResponseStreamLifecycleRecorder::new(ResponseStreamLifecycleOptions::new(
|
||||
/*attempt*/ 2,
|
||||
ResponseStreamTransport::ResponsesHttp,
|
||||
));
|
||||
|
||||
for event in [
|
||||
json!({"type": "response.created", "response": {"id": "resp-created"}}),
|
||||
json!({"type": "response.output_item.added", "item": {"type": "message"}}),
|
||||
json!({"type": "response.output_text.delta", "delta": "hi"}),
|
||||
json!({"type": "response.output_item.done", "item": {"type": "message"}}),
|
||||
json!({"type": "response.output_text.delta", "delta": " again"}),
|
||||
json!({"type": "response.completed", "response": {"id": "resp-completed"}}),
|
||||
] {
|
||||
let event: ResponsesStreamEvent = serde_json::from_value(event).unwrap();
|
||||
recorder.observe_event(&event);
|
||||
}
|
||||
|
||||
let summary = recorder
|
||||
.finalize(ResponseStreamTerminalState::Completed)
|
||||
.expect("summary should finalize");
|
||||
|
||||
assert_eq!(summary.created_response_id.as_deref(), Some("resp-created"));
|
||||
assert_eq!(
|
||||
summary.completed_response_id.as_deref(),
|
||||
Some("resp-completed")
|
||||
);
|
||||
assert_eq!(
|
||||
summary.observed_event_kinds,
|
||||
vec![
|
||||
"response.created",
|
||||
"response.output_item.added",
|
||||
"response.output_text.delta",
|
||||
"response.output_item.done",
|
||||
"response.completed"
|
||||
]
|
||||
);
|
||||
assert_eq!(summary.event_count, 6);
|
||||
assert!(summary.first_output_item_added_elapsed_ms.is_some());
|
||||
assert!(summary.first_output_item_done_elapsed_ms.is_some());
|
||||
assert!(summary.first_output_text_delta_elapsed_ms.is_some());
|
||||
assert!(summary.ids_mismatch());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stream_errors_gain_diagnostic_context() {
|
||||
let mut recorder =
|
||||
ResponseStreamLifecycleRecorder::new(ResponseStreamLifecycleOptions::new(
|
||||
/*attempt*/ 1,
|
||||
ResponseStreamTransport::ResponsesWebsocket,
|
||||
));
|
||||
let event: ResponsesStreamEvent = serde_json::from_value(json!({
|
||||
"type": "response.output_item.added",
|
||||
"item": {"type": "message"}
|
||||
}))
|
||||
.unwrap();
|
||||
recorder.observe_event(&event);
|
||||
|
||||
let error = recorder.finalize_error(
|
||||
ResponseStreamTerminalState::IdleTimeout,
|
||||
ApiError::Stream("idle timeout waiting for websocket".to_string()),
|
||||
);
|
||||
|
||||
let ApiError::Stream(message) = error else {
|
||||
panic!("expected stream error");
|
||||
};
|
||||
assert!(message.contains("stream timed out after response.output_item.added"));
|
||||
assert!(message.contains("transport=responses_websocket"));
|
||||
assert!(message.contains("observed_event_kinds=response.output_item.added"));
|
||||
}
|
||||
}
|
||||
@@ -47,6 +47,8 @@ use codex_api::Reasoning;
|
||||
use codex_api::RequestTelemetry;
|
||||
use codex_api::ReqwestTransport;
|
||||
use codex_api::ResponseCreateWsRequest;
|
||||
use codex_api::ResponseStreamLifecycleOptions;
|
||||
use codex_api::ResponseStreamTransport;
|
||||
use codex_api::ResponsesApiRequest;
|
||||
use codex_api::ResponsesClient as ApiResponsesClient;
|
||||
use codex_api::ResponsesOptions as ApiResponsesOptions;
|
||||
@@ -1163,6 +1165,7 @@ impl ModelClientSession {
|
||||
summary: ReasoningSummaryConfig,
|
||||
service_tier: Option<ServiceTier>,
|
||||
turn_metadata_header: Option<&str>,
|
||||
stream_attempt: Option<u64>,
|
||||
inference_trace: &InferenceTraceContext,
|
||||
) -> Result<ResponseStream> {
|
||||
if let Some(path) = &*CODEX_RS_SSE_FIXTURE {
|
||||
@@ -1218,7 +1221,12 @@ impl ModelClientSession {
|
||||
client_setup.api_auth,
|
||||
)
|
||||
.with_telemetry(Some(request_telemetry), Some(sse_telemetry));
|
||||
let stream_result = client.stream_request(request, options).await;
|
||||
let lifecycle = stream_attempt.map(|attempt| {
|
||||
ResponseStreamLifecycleOptions::new(attempt, ResponseStreamTransport::ResponsesHttp)
|
||||
});
|
||||
let stream_result = client
|
||||
.stream_request_with_lifecycle(request, options, lifecycle)
|
||||
.await;
|
||||
|
||||
match stream_result {
|
||||
Ok(stream) => {
|
||||
@@ -1277,6 +1285,7 @@ impl ModelClientSession {
|
||||
service_tier: Option<ServiceTier>,
|
||||
turn_metadata_header: Option<&str>,
|
||||
warmup: bool,
|
||||
stream_attempt: Option<u64>,
|
||||
request_trace: Option<W3cTraceContext>,
|
||||
inference_trace: &InferenceTraceContext,
|
||||
) -> Result<WebsocketStreamOutcome> {
|
||||
@@ -1367,8 +1376,18 @@ impl ModelClientSession {
|
||||
"websocket connection is unavailable".to_string(),
|
||||
))
|
||||
})?;
|
||||
let lifecycle = stream_attempt.map(|attempt| {
|
||||
ResponseStreamLifecycleOptions::new(
|
||||
attempt,
|
||||
ResponseStreamTransport::ResponsesWebsocket,
|
||||
)
|
||||
});
|
||||
let stream_result = websocket_connection
|
||||
.stream_request(ws_request, self.websocket_session.connection_reused())
|
||||
.stream_request_with_lifecycle(
|
||||
ws_request,
|
||||
self.websocket_session.connection_reused(),
|
||||
lifecycle,
|
||||
)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
let err = map_api_error(err);
|
||||
@@ -1449,6 +1468,7 @@ impl ModelClientSession {
|
||||
service_tier,
|
||||
turn_metadata_header,
|
||||
/*warmup*/ true,
|
||||
/*stream_attempt*/ None,
|
||||
current_span_w3c_trace_context(),
|
||||
&disabled_trace,
|
||||
)
|
||||
@@ -1492,6 +1512,33 @@ impl ModelClientSession {
|
||||
service_tier: Option<ServiceTier>,
|
||||
turn_metadata_header: Option<&str>,
|
||||
inference_trace: &InferenceTraceContext,
|
||||
) -> Result<ResponseStream> {
|
||||
self.stream_with_attempt(
|
||||
prompt,
|
||||
model_info,
|
||||
session_telemetry,
|
||||
effort,
|
||||
summary,
|
||||
service_tier,
|
||||
turn_metadata_header,
|
||||
/*stream_attempt*/ 1,
|
||||
inference_trace,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn stream_with_attempt(
|
||||
&mut self,
|
||||
prompt: &Prompt,
|
||||
model_info: &ModelInfo,
|
||||
session_telemetry: &SessionTelemetry,
|
||||
effort: Option<ReasoningEffortConfig>,
|
||||
summary: ReasoningSummaryConfig,
|
||||
service_tier: Option<ServiceTier>,
|
||||
turn_metadata_header: Option<&str>,
|
||||
stream_attempt: u64,
|
||||
inference_trace: &InferenceTraceContext,
|
||||
) -> Result<ResponseStream> {
|
||||
let wire_api = self.client.state.provider.info().wire_api;
|
||||
match wire_api {
|
||||
@@ -1508,6 +1555,7 @@ impl ModelClientSession {
|
||||
service_tier,
|
||||
turn_metadata_header,
|
||||
/*warmup*/ false,
|
||||
Some(stream_attempt),
|
||||
request_trace,
|
||||
inference_trace,
|
||||
)
|
||||
@@ -1528,6 +1576,7 @@ impl ModelClientSession {
|
||||
summary,
|
||||
service_tier,
|
||||
turn_metadata_header,
|
||||
Some(stream_attempt),
|
||||
inference_trace,
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -1074,6 +1074,7 @@ async fn run_sampling_request(
|
||||
Arc::clone(&turn_context),
|
||||
client_session,
|
||||
turn_metadata_header,
|
||||
retries + 1,
|
||||
Arc::clone(&turn_diff_tracker),
|
||||
&prompt,
|
||||
cancellation_token.child_token(),
|
||||
@@ -1865,6 +1866,7 @@ async fn try_run_sampling_request(
|
||||
turn_context: Arc<TurnContext>,
|
||||
client_session: &mut ModelClientSession,
|
||||
turn_metadata_header: Option<&str>,
|
||||
stream_attempt: u64,
|
||||
turn_diff_tracker: SharedTurnDiffTracker,
|
||||
prompt: &Prompt,
|
||||
cancellation_token: CancellationToken,
|
||||
@@ -1883,7 +1885,7 @@ async fn try_run_sampling_request(
|
||||
turn_context.provider.info().name.as_str(),
|
||||
);
|
||||
let mut stream = client_session
|
||||
.stream(
|
||||
.stream_with_attempt(
|
||||
prompt,
|
||||
&turn_context.model_info,
|
||||
&turn_context.session_telemetry,
|
||||
@@ -1891,6 +1893,7 @@ async fn try_run_sampling_request(
|
||||
turn_context.reasoning_summary,
|
||||
turn_context.config.service_tier,
|
||||
turn_metadata_header,
|
||||
stream_attempt,
|
||||
&inference_trace,
|
||||
)
|
||||
.instrument(trace_span!("stream_request"))
|
||||
@@ -1946,7 +1949,7 @@ async fn try_run_sampling_request(
|
||||
record_turn_ttft_metric(&turn_context, &event).await;
|
||||
|
||||
match event {
|
||||
ResponseEvent::Created => {}
|
||||
ResponseEvent::Created { .. } => {}
|
||||
ResponseEvent::OutputItemDone(item) => {
|
||||
if let Some((_, mut consumer)) = active_tool_argument_diff_consumer.take()
|
||||
&& let Some(event) = consumer.flush_on_complete()
|
||||
|
||||
@@ -141,7 +141,7 @@ fn response_event_records_turn_ttft(event: &ResponseEvent) -> bool {
|
||||
ResponseEvent::OutputTextDelta(_)
|
||||
| ResponseEvent::ReasoningSummaryDelta { .. }
|
||||
| ResponseEvent::ReasoningContentDelta { .. } => true,
|
||||
ResponseEvent::Created
|
||||
ResponseEvent::Created { .. }
|
||||
| ResponseEvent::ServerModel(_)
|
||||
| ResponseEvent::ModelVerifications(_)
|
||||
| ResponseEvent::ServerReasoningIncluded(_)
|
||||
|
||||
@@ -23,7 +23,7 @@ async fn turn_timing_state_records_ttft_only_once_per_turn() {
|
||||
state.mark_turn_started(Instant::now()).await;
|
||||
assert_eq!(
|
||||
state
|
||||
.record_ttft_for_response_event(&ResponseEvent::Created)
|
||||
.record_ttft_for_response_event(&ResponseEvent::Created { response_id: None })
|
||||
.await,
|
||||
None
|
||||
);
|
||||
|
||||
@@ -2802,14 +2802,23 @@ async fn incomplete_response_emits_content_filter_error_message() -> anyhow::Res
|
||||
.await?;
|
||||
|
||||
let error_event = wait_for_event(&codex, |ev| matches!(ev, EventMsg::Error(_))).await;
|
||||
let EventMsg::Error(err) = error_event else {
|
||||
panic!("expected incomplete content filter error; got {error_event:?}");
|
||||
};
|
||||
assert!(
|
||||
matches!(
|
||||
error_event,
|
||||
EventMsg::Error(ref err)
|
||||
if err.message
|
||||
== "stream disconnected before completion: Incomplete response returned, reason: content_filter"
|
||||
err.message.starts_with(
|
||||
"stream disconnected before completion: Incomplete response returned, reason: content_filter"
|
||||
),
|
||||
"expected incomplete content filter error; got {error_event:?}"
|
||||
"expected incomplete content filter error; got {err:?}"
|
||||
);
|
||||
assert!(
|
||||
err.message
|
||||
.contains("Stream lifecycle: diagnostic: stream failed after receiving events"),
|
||||
"expected lifecycle diagnostic; got {err:?}"
|
||||
);
|
||||
assert!(
|
||||
err.message.contains("last_event=response.incomplete"),
|
||||
"expected incomplete event diagnostic; got {err:?}"
|
||||
);
|
||||
|
||||
assert_eq!(responses_mock.requests().len(), 1);
|
||||
|
||||
@@ -1045,7 +1045,7 @@ impl SessionTelemetry {
|
||||
|
||||
fn responses_type(event: &ResponseEvent) -> String {
|
||||
match event {
|
||||
ResponseEvent::Created => "created".into(),
|
||||
ResponseEvent::Created { .. } => "created".into(),
|
||||
ResponseEvent::OutputItemDone(item) | ResponseEvent::OutputItemAdded(item) => {
|
||||
SessionTelemetry::responses_item_type(item)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user