diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 6047ca4725..cbe8aa6937 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1427,6 +1427,7 @@ dependencies = [ "multimap", "once_cell", "openssl-sys", + "opentelemetry_sdk", "os_info", "predicates", "pretty_assertions", @@ -1451,6 +1452,7 @@ dependencies = [ "thiserror 2.0.17", "time", "tokio", + "tokio-tungstenite", "tokio-util", "toml 0.9.5", "toml_edit 0.24.0+spec-1.1.0", @@ -1778,6 +1780,7 @@ dependencies = [ "strum_macros 0.27.2", "thiserror 2.0.17", "tokio", + "tokio-tungstenite", "tracing", "tracing-opentelemetry", "tracing-subscriber", diff --git a/codex-rs/codex-api/src/endpoint/responses_websocket.rs b/codex-rs/codex-api/src/endpoint/responses_websocket.rs index b088374b89..2a7d8726f1 100644 --- a/codex-rs/codex-api/src/endpoint/responses_websocket.rs +++ b/codex-rs/codex-api/src/endpoint/responses_websocket.rs @@ -6,6 +6,7 @@ use crate::error::ApiError; use crate::provider::Provider; use crate::sse::responses::ResponsesStreamEvent; use crate::sse::responses::process_responses_event; +use crate::telemetry::WebsocketTelemetry; use codex_client::TransportError; use futures::SinkExt; use futures::StreamExt; @@ -18,6 +19,7 @@ use std::time::Duration; use tokio::net::TcpStream; use tokio::sync::Mutex; use tokio::sync::mpsc; +use tokio::time::Instant; use tokio_tungstenite::MaybeTlsStream; use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::tungstenite::Error as WsError; @@ -38,14 +40,21 @@ pub struct ResponsesWebsocketConnection { // TODO (pakrym): is this the right place for timeout? idle_timeout: Duration, server_reasoning_included: bool, + telemetry: Option>, } impl ResponsesWebsocketConnection { - fn new(stream: WsStream, idle_timeout: Duration, server_reasoning_included: bool) -> Self { + fn new( + stream: WsStream, + idle_timeout: Duration, + server_reasoning_included: bool, + telemetry: Option>, + ) -> Self { Self { stream: Arc::new(Mutex::new(Some(stream))), idle_timeout, server_reasoning_included, + telemetry, } } @@ -62,6 +71,7 @@ impl ResponsesWebsocketConnection { let stream = Arc::clone(&self.stream); let idle_timeout = self.idle_timeout; let server_reasoning_included = self.server_reasoning_included; + let telemetry = self.telemetry.clone(); let request_body = serde_json::to_value(&request).map_err(|err| { ApiError::Stream(format!("failed to encode websocket request: {err}")) })?; @@ -87,6 +97,7 @@ impl ResponsesWebsocketConnection { tx_event.clone(), request_body, idle_timeout, + telemetry, ) .await { @@ -114,6 +125,7 @@ impl ResponsesWebsocketClient { &self, extra_headers: HeaderMap, turn_state: Option>>, + telemetry: Option>, ) -> Result { let ws_url = self .provider @@ -130,6 +142,7 @@ impl ResponsesWebsocketClient { stream, self.provider.stream_idle_timeout, server_reasoning_included, + telemetry, )) } } @@ -218,6 +231,7 @@ async fn run_websocket_response_stream( tx_event: mpsc::Sender>, request_body: Value, idle_timeout: Duration, + telemetry: Option>, ) -> Result<(), ApiError> { let request_text = match serde_json::to_string(&request_body) { Ok(text) => text, @@ -228,16 +242,26 @@ async fn run_websocket_response_stream( } }; - if let Err(err) = ws_stream.send(Message::Text(request_text.into())).await { - return Err(ApiError::Stream(format!( - "failed to send websocket request: {err}" - ))); + let request_start = Instant::now(); + let result = ws_stream + .send(Message::Text(request_text.into())) + .await + .map_err(|err| ApiError::Stream(format!("failed to send websocket request: {err}"))); + + if let Some(t) = telemetry.as_ref() { + t.on_ws_request(request_start.elapsed(), result.as_ref().err()); } + result?; + loop { + let poll_start = Instant::now(); let response = tokio::time::timeout(idle_timeout, ws_stream.next()) .await .map_err(|_| ApiError::Stream("idle timeout waiting for websocket".into())); + if let Some(t) = telemetry.as_ref() { + t.on_ws_event(&response, poll_start.elapsed()); + } let message = match response { Ok(Some(Ok(msg))) => msg, Ok(Some(Err(err))) => { diff --git a/codex-rs/codex-api/src/lib.rs b/codex-rs/codex-api/src/lib.rs index 89e5c4dc5f..2e340970ca 100644 --- a/codex-rs/codex-api/src/lib.rs +++ b/codex-rs/codex-api/src/lib.rs @@ -40,3 +40,4 @@ pub use crate::requests::ResponsesRequest; pub use crate::requests::ResponsesRequestBuilder; pub use crate::sse::stream_from_fixture; pub use crate::telemetry::SseTelemetry; +pub use crate::telemetry::WebsocketTelemetry; diff --git a/codex-rs/codex-api/src/sse/responses.rs b/codex-rs/codex-api/src/sse/responses.rs index ad94cf5ea1..b363671a11 100644 --- a/codex-rs/codex-api/src/sse/responses.rs +++ b/codex-rs/codex-api/src/sse/responses.rs @@ -157,7 +157,7 @@ struct ResponseCompletedOutputTokensDetails { #[derive(Deserialize, Debug)] pub struct ResponsesStreamEvent { #[serde(rename = "type")] - kind: String, + pub(crate) kind: String, response: Option, item: Option, delta: Option, diff --git a/codex-rs/codex-api/src/telemetry.rs b/codex-rs/codex-api/src/telemetry.rs index d6a38b2af3..7b04fd2113 100644 --- a/codex-rs/codex-api/src/telemetry.rs +++ b/codex-rs/codex-api/src/telemetry.rs @@ -1,3 +1,4 @@ +use crate::error::ApiError; use codex_client::Request; use codex_client::RequestTelemetry; use codex_client::Response; @@ -10,6 +11,8 @@ use std::future::Future; use std::sync::Arc; use std::time::Duration; use tokio::time::Instant; +use tokio_tungstenite::tungstenite::Error; +use tokio_tungstenite::tungstenite::Message; /// Generic telemetry. pub trait SseTelemetry: Send + Sync { @@ -28,6 +31,17 @@ pub trait SseTelemetry: Send + Sync { ); } +/// Telemetry for Responses WebSocket transport. +pub trait WebsocketTelemetry: Send + Sync { + fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>); + + fn on_ws_event( + &self, + result: &Result>, ApiError>, + duration: Duration, + ); +} + pub(crate) trait WithStatus { fn status(&self) -> StatusCode; } diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml index 1715a04142..66ed20a435 100644 --- a/codex-rs/core/Cargo.toml +++ b/codex-rs/core/Cargo.toml @@ -90,6 +90,7 @@ tokio = { workspace = true, features = [ "signal", ] } tokio-util = { workspace = true, features = ["rt"] } +tokio-tungstenite = { workspace = true } toml = { workspace = true } toml_edit = { workspace = true } tracing = { workspace = true, features = ["log"] } @@ -145,6 +146,10 @@ image = { workspace = true, features = ["jpeg", "png"] } maplit = { workspace = true } predicates = { workspace = true } pretty_assertions = { workspace = true } +opentelemetry_sdk = { workspace = true, features = [ + "experimental_metrics_custom_reader", + "metrics", +] } serial_test = { workspace = true } tempfile = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index f01c145e56..6564e86c49 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -21,6 +21,7 @@ use codex_api::ResponsesWebsocketClient as ApiWebSocketResponsesClient; use codex_api::ResponsesWebsocketConnection as ApiWebSocketConnection; use codex_api::SseTelemetry; use codex_api::TransportError; +use codex_api::WebsocketTelemetry; use codex_api::build_conversation_headers; use codex_api::common::Reasoning; use codex_api::common::ResponsesWsRequest; @@ -46,6 +47,8 @@ use reqwest::StatusCode; use serde_json::Value; use std::time::Duration; use tokio::sync::mpsc; +use tokio_tungstenite::tungstenite::Error; +use tokio_tungstenite::tungstenite::Message; use tracing::warn; use crate::AuthManager; @@ -451,9 +454,14 @@ impl ModelClientSession { if needs_new { let mut headers = options.extra_headers.clone(); headers.extend(build_conversation_headers(options.conversation_id.clone())); + let websocket_telemetry = self.build_websocket_telemetry(); let new_conn: ApiWebSocketConnection = ApiWebSocketResponsesClient::new(api_provider, api_auth) - .connect(headers, options.turn_state.clone()) + .connect( + headers, + options.turn_state.clone(), + Some(websocket_telemetry), + ) .await?; self.connection = Some(new_conn); } @@ -650,6 +658,13 @@ impl ModelClientSession { let sse_telemetry: Arc = telemetry; (request_telemetry, sse_telemetry) } + + /// Builds telemetry for the Responses API WebSocket transport. + fn build_websocket_telemetry(&self) -> Arc { + let telemetry = Arc::new(ApiTelemetry::new(self.state.otel_manager.clone())); + let websocket_telemetry: Arc = telemetry; + websocket_telemetry + } } impl ModelClient { @@ -849,3 +864,19 @@ impl SseTelemetry for ApiTelemetry { self.otel_manager.log_sse_event(result, duration); } } + +impl WebsocketTelemetry for ApiTelemetry { + fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>) { + let error_message = error.map(std::string::ToString::to_string); + self.otel_manager + .record_websocket_request(duration, error_message.as_deref()); + } + + fn on_ws_event( + &self, + result: &std::result::Result>, ApiError>, + duration: Duration, + ) { + self.otel_manager.record_websocket_event(result, duration); + } +} diff --git a/codex-rs/core/tests/suite/client_websockets.rs b/codex-rs/core/tests/suite/client_websockets.rs index 5037479987..fc0ff37cf4 100644 --- a/codex-rs/core/tests/suite/client_websockets.rs +++ b/codex-rs/core/tests/suite/client_websockets.rs @@ -14,6 +14,8 @@ use codex_core::features::Feature; use codex_core::models_manager::manager::ModelsManager; use codex_core::protocol::SessionSource; use codex_otel::OtelManager; +use codex_otel::metrics::MetricsClient; +use codex_otel::metrics::MetricsConfig; use codex_protocol::ThreadId; use codex_protocol::config_types::ReasoningSummary; use core_test_support::load_default_config_for_test; @@ -25,15 +27,19 @@ use core_test_support::responses::start_websocket_server; use core_test_support::responses::start_websocket_server_with_headers; use core_test_support::skip_if_no_network; use futures::StreamExt; +use opentelemetry_sdk::metrics::InMemoryMetricExporter; use pretty_assertions::assert_eq; use std::sync::Arc; +use std::time::Duration; use tempfile::TempDir; +use tracing_test::traced_test; const MODEL: &str = "gpt-5.2-codex"; struct WebsocketTestHarness { _codex_home: TempDir, client: ModelClient, + otel_manager: OtelManager, } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -64,6 +70,38 @@ async fn responses_websocket_streams_request() { server.shutdown().await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[traced_test] +async fn responses_websocket_emits_websocket_telemetry_events() { + skip_if_no_network!(); + + let server = start_websocket_server(vec![vec![vec![ + ev_response_created("resp-1"), + ev_completed("resp-1"), + ]]]) + .await; + + let harness = websocket_harness(&server).await; + harness.otel_manager.reset_runtime_metrics(); + let mut session = harness.client.new_session(); + let prompt = prompt_with_input(vec![message_item("hello")]); + + stream_until_complete(&mut session, &prompt).await; + + tokio::time::sleep(Duration::from_millis(10)).await; + + let summary = harness + .otel_manager + .runtime_metrics_summary() + .expect("runtime metrics summary"); + assert_eq!(summary.api_calls.count, 0); + assert_eq!(summary.streaming_events.count, 0); + assert_eq!(summary.websocket_calls.count, 1); + assert_eq!(summary.websocket_events.count, 2); + + server.shutdown().await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn responses_websocket_emits_reasoning_included_event() { skip_if_no_network!(); @@ -211,6 +249,12 @@ async fn websocket_harness(server: &WebSocketTestServer) -> WebsocketTestHarness let model_info = ModelsManager::construct_model_info_offline(MODEL, &config); let conversation_id = ThreadId::new(); let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key")); + let exporter = InMemoryMetricExporter::default(); + let metrics = MetricsClient::new( + MetricsConfig::in_memory("test", "codex-core", env!("CARGO_PKG_VERSION"), exporter) + .with_runtime_reader(), + ) + .expect("in-memory metrics client"); let otel_manager = OtelManager::new( conversation_id, MODEL, @@ -221,12 +265,13 @@ async fn websocket_harness(server: &WebSocketTestServer) -> WebsocketTestHarness false, "test".to_string(), SessionSource::Exec, - ); + ) + .with_metrics(metrics); let client = ModelClient::new( Arc::clone(&config), None, model_info, - otel_manager, + otel_manager.clone(), provider.clone(), None, ReasoningSummary::Auto, @@ -238,6 +283,7 @@ async fn websocket_harness(server: &WebSocketTestServer) -> WebsocketTestHarness WebsocketTestHarness { _codex_home: codex_home, client, + otel_manager, } } diff --git a/codex-rs/otel/Cargo.toml b/codex-rs/otel/Cargo.toml index f9a54c53d9..0fbd2a8055 100644 --- a/codex-rs/otel/Cargo.toml +++ b/codex-rs/otel/Cargo.toml @@ -56,6 +56,7 @@ serde_json = { workspace = true } strum_macros = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } +tokio-tungstenite = { workspace = true } tracing = { workspace = true } tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/codex-rs/otel/src/metrics/names.rs b/codex-rs/otel/src/metrics/names.rs index 4db5dfb028..b8ff9364ad 100644 --- a/codex-rs/otel/src/metrics/names.rs +++ b/codex-rs/otel/src/metrics/names.rs @@ -4,3 +4,7 @@ pub(crate) const API_CALL_COUNT_METRIC: &str = "codex.api_request"; pub(crate) const API_CALL_DURATION_METRIC: &str = "codex.api_request.duration_ms"; pub(crate) const SSE_EVENT_COUNT_METRIC: &str = "codex.sse_event"; pub(crate) const SSE_EVENT_DURATION_METRIC: &str = "codex.sse_event.duration_ms"; +pub(crate) const WEBSOCKET_REQUEST_COUNT_METRIC: &str = "codex.websocket.request"; +pub(crate) const WEBSOCKET_REQUEST_DURATION_METRIC: &str = "codex.websocket.request.duration_ms"; +pub(crate) const WEBSOCKET_EVENT_COUNT_METRIC: &str = "codex.websocket.event"; +pub(crate) const WEBSOCKET_EVENT_DURATION_METRIC: &str = "codex.websocket.event.duration_ms"; diff --git a/codex-rs/otel/src/metrics/runtime_metrics.rs b/codex-rs/otel/src/metrics/runtime_metrics.rs index 98498a3532..dbd28010f6 100644 --- a/codex-rs/otel/src/metrics/runtime_metrics.rs +++ b/codex-rs/otel/src/metrics/runtime_metrics.rs @@ -4,6 +4,10 @@ use crate::metrics::names::SSE_EVENT_COUNT_METRIC; use crate::metrics::names::SSE_EVENT_DURATION_METRIC; use crate::metrics::names::TOOL_CALL_COUNT_METRIC; use crate::metrics::names::TOOL_CALL_DURATION_METRIC; +use crate::metrics::names::WEBSOCKET_EVENT_COUNT_METRIC; +use crate::metrics::names::WEBSOCKET_EVENT_DURATION_METRIC; +use crate::metrics::names::WEBSOCKET_REQUEST_COUNT_METRIC; +use crate::metrics::names::WEBSOCKET_REQUEST_DURATION_METRIC; use opentelemetry_sdk::metrics::data::AggregatedMetrics; use opentelemetry_sdk::metrics::data::Metric; use opentelemetry_sdk::metrics::data::MetricData; @@ -26,11 +30,17 @@ pub struct RuntimeMetricsSummary { pub tool_calls: RuntimeMetricTotals, pub api_calls: RuntimeMetricTotals, pub streaming_events: RuntimeMetricTotals, + pub websocket_calls: RuntimeMetricTotals, + pub websocket_events: RuntimeMetricTotals, } impl RuntimeMetricsSummary { pub fn is_empty(self) -> bool { - self.tool_calls.is_empty() && self.api_calls.is_empty() && self.streaming_events.is_empty() + self.tool_calls.is_empty() + && self.api_calls.is_empty() + && self.streaming_events.is_empty() + && self.websocket_calls.is_empty() + && self.websocket_events.is_empty() } pub(crate) fn from_snapshot(snapshot: &ResourceMetrics) -> Self { @@ -46,10 +56,20 @@ impl RuntimeMetricsSummary { count: sum_counter(snapshot, SSE_EVENT_COUNT_METRIC), duration_ms: sum_histogram_ms(snapshot, SSE_EVENT_DURATION_METRIC), }; + let websocket_calls = RuntimeMetricTotals { + count: sum_counter(snapshot, WEBSOCKET_REQUEST_COUNT_METRIC), + duration_ms: sum_histogram_ms(snapshot, WEBSOCKET_REQUEST_DURATION_METRIC), + }; + let websocket_events = RuntimeMetricTotals { + count: sum_counter(snapshot, WEBSOCKET_EVENT_COUNT_METRIC), + duration_ms: sum_histogram_ms(snapshot, WEBSOCKET_EVENT_DURATION_METRIC), + }; Self { tool_calls, api_calls, streaming_events, + websocket_calls, + websocket_events, } } } diff --git a/codex-rs/otel/src/traces/otel_manager.rs b/codex-rs/otel/src/traces/otel_manager.rs index 9a7744ee60..c585ec67d5 100644 --- a/codex-rs/otel/src/traces/otel_manager.rs +++ b/codex-rs/otel/src/traces/otel_manager.rs @@ -4,9 +4,14 @@ use crate::metrics::names::SSE_EVENT_COUNT_METRIC; use crate::metrics::names::SSE_EVENT_DURATION_METRIC; use crate::metrics::names::TOOL_CALL_COUNT_METRIC; use crate::metrics::names::TOOL_CALL_DURATION_METRIC; +use crate::metrics::names::WEBSOCKET_EVENT_COUNT_METRIC; +use crate::metrics::names::WEBSOCKET_EVENT_DURATION_METRIC; +use crate::metrics::names::WEBSOCKET_REQUEST_COUNT_METRIC; +use crate::metrics::names::WEBSOCKET_REQUEST_DURATION_METRIC; use crate::otel_provider::traceparent_context_from_env; use chrono::SecondsFormat; use chrono::Utc; +use codex_api::ApiError; use codex_api::ResponseEvent; use codex_app_server_protocol::AuthMode; use codex_protocol::ThreadId; @@ -36,6 +41,7 @@ pub use crate::OtelManager; pub use crate::ToolDecisionSource; const SSE_UNKNOWN_KIND: &str = "unknown"; +const WEBSOCKET_UNKNOWN_KIND: &str = "unknown"; impl OtelManager { #[allow(clippy::too_many_arguments)] @@ -190,6 +196,134 @@ impl OtelManager { ); } + pub fn record_websocket_request(&self, duration: Duration, error: Option<&str>) { + let success_str = if error.is_none() { "true" } else { "false" }; + self.counter( + WEBSOCKET_REQUEST_COUNT_METRIC, + 1, + &[("success", success_str)], + ); + self.record_duration( + WEBSOCKET_REQUEST_DURATION_METRIC, + duration, + &[("success", success_str)], + ); + tracing::event!( + tracing::Level::INFO, + event.name = "codex.websocket_request", + event.timestamp = %timestamp(), + conversation.id = %self.metadata.conversation_id, + app.version = %self.metadata.app_version, + auth_mode = self.metadata.auth_mode, + user.account_id = self.metadata.account_id, + user.email = self.metadata.account_email, + terminal.type = %self.metadata.terminal_type, + model = %self.metadata.model, + slug = %self.metadata.slug, + duration_ms = %duration.as_millis(), + success = success_str, + error.message = error, + ); + } + + pub fn record_websocket_event( + &self, + result: &Result< + Option< + Result< + tokio_tungstenite::tungstenite::Message, + tokio_tungstenite::tungstenite::Error, + >, + >, + ApiError, + >, + duration: Duration, + ) { + let mut kind = None; + let mut error_message = None; + let mut success = true; + + match result { + Ok(Some(Ok(message))) => match message { + tokio_tungstenite::tungstenite::Message::Text(text) => { + match serde_json::from_str::(text) { + Ok(value) => { + kind = value + .get("type") + .and_then(|value| value.as_str()) + .map(std::string::ToString::to_string); + if kind.as_deref() == Some("response.failed") { + success = false; + error_message = value + .get("response") + .and_then(|value| value.get("error")) + .map(serde_json::Value::to_string) + .or_else(|| Some("response.failed event received".to_string())); + } + } + Err(err) => { + kind = Some("parse_error".to_string()); + error_message = Some(err.to_string()); + success = false; + } + } + } + tokio_tungstenite::tungstenite::Message::Binary(_) => { + success = false; + error_message = Some("unexpected binary websocket event".to_string()); + } + tokio_tungstenite::tungstenite::Message::Ping(_) + | tokio_tungstenite::tungstenite::Message::Pong(_) => { + return; + } + tokio_tungstenite::tungstenite::Message::Close(_) => { + success = false; + error_message = + Some("websocket closed by server before response.completed".to_string()); + } + tokio_tungstenite::tungstenite::Message::Frame(_) => { + success = false; + error_message = Some("unexpected websocket frame".to_string()); + } + }, + Ok(Some(Err(err))) => { + success = false; + error_message = Some(err.to_string()); + } + Ok(None) => { + success = false; + error_message = Some("stream closed before response.completed".to_string()); + } + Err(err) => { + success = false; + error_message = Some(err.to_string()); + } + } + + let kind_str = kind.as_deref().unwrap_or(WEBSOCKET_UNKNOWN_KIND); + let success_str = if success { "true" } else { "false" }; + let tags = [("kind", kind_str), ("success", success_str)]; + self.counter(WEBSOCKET_EVENT_COUNT_METRIC, 1, &tags); + self.record_duration(WEBSOCKET_EVENT_DURATION_METRIC, duration, &tags); + tracing::event!( + tracing::Level::INFO, + event.name = "codex.websocket_event", + event.timestamp = %timestamp(), + event.kind = %kind_str, + conversation.id = %self.metadata.conversation_id, + app.version = %self.metadata.app_version, + auth_mode = self.metadata.auth_mode, + user.account_id = self.metadata.account_id, + user.email = self.metadata.account_email, + terminal.type = %self.metadata.terminal_type, + model = %self.metadata.model, + slug = %self.metadata.slug, + duration_ms = %duration.as_millis(), + success = success_str, + error.message = error_message.as_deref(), + ); + } + pub fn log_sse_event( &self, response: &Result>>, Elapsed>, diff --git a/codex-rs/otel/tests/suite/runtime_summary.rs b/codex-rs/otel/tests/suite/runtime_summary.rs index d51c40a9ed..78aed8a0a7 100644 --- a/codex-rs/otel/tests/suite/runtime_summary.rs +++ b/codex-rs/otel/tests/suite/runtime_summary.rs @@ -11,6 +11,7 @@ use eventsource_stream::Event as StreamEvent; use opentelemetry_sdk::metrics::InMemoryMetricExporter; use pretty_assertions::assert_eq; use std::time::Duration; +use tokio_tungstenite::tungstenite::Message; #[test] fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<()> { @@ -43,6 +44,7 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<( "ok", ); manager.record_api_request(1, Some(200), None, Duration::from_millis(300)); + manager.record_websocket_request(Duration::from_millis(400), None); let sse_response: std::result::Result< Option>>, tokio::time::error::Elapsed, @@ -53,6 +55,13 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<( retry: None, }))); manager.log_sse_event(&sse_response, Duration::from_millis(120)); + let ws_response: std::result::Result< + Option>, + codex_api::ApiError, + > = Ok(Some(Ok(Message::Text( + r#"{"type":"response.created"}"#.into(), + )))); + manager.record_websocket_event(&ws_response, Duration::from_millis(80)); let summary = manager .runtime_metrics_summary() @@ -70,6 +79,14 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<( count: 1, duration_ms: 120, }, + websocket_calls: RuntimeMetricTotals { + count: 1, + duration_ms: 400, + }, + websocket_events: RuntimeMetricTotals { + count: 1, + duration_ms: 80, + }, }; assert_eq!(summary, expected); diff --git a/codex-rs/tui/src/history_cell.rs b/codex-rs/tui/src/history_cell.rs index 826624889e..934c9506f5 100644 --- a/codex-rs/tui/src/history_cell.rs +++ b/codex-rs/tui/src/history_cell.rs @@ -2028,6 +2028,13 @@ fn runtime_metrics_label(summary: RuntimeMetricsSummary) -> Option { summary.api_calls.count )); } + if summary.websocket_calls.count > 0 { + let duration = format_duration_ms(summary.websocket_calls.duration_ms); + parts.push(format!( + "WebSocket: {} events send ({duration})", + summary.websocket_calls.count + )); + } if summary.streaming_events.count > 0 { let duration = format_duration_ms(summary.streaming_events.duration_ms); let stream_label = pluralize(summary.streaming_events.count, "Stream", "Streams"); @@ -2037,6 +2044,13 @@ fn runtime_metrics_label(summary: RuntimeMetricsSummary) -> Option { summary.streaming_events.count )); } + if summary.websocket_events.count > 0 { + let duration = format_duration_ms(summary.websocket_events.duration_ms); + parts.push(format!( + "{} events received ({duration})", + summary.websocket_events.count + )); + } if parts.is_empty() { None } else { @@ -2181,15 +2195,25 @@ mod tests { count: 6, duration_ms: 900, }, + websocket_calls: RuntimeMetricTotals { + count: 1, + duration_ms: 700, + }, + websocket_events: RuntimeMetricTotals { + count: 4, + duration_ms: 1_200, + }, }; let cell = FinalMessageSeparator::new(Some(12), Some(summary)); - let rendered = render_lines(&cell.display_lines(120)); + let rendered = render_lines(&cell.display_lines(200)); assert_eq!(rendered.len(), 1); assert!(rendered[0].contains("Worked for 12s")); assert!(rendered[0].contains("Local tools: 3 calls (2.5s)")); assert!(rendered[0].contains("Inference: 2 calls (1.2s)")); + assert!(rendered[0].contains("WebSocket: 1 events send (700ms)")); assert!(rendered[0].contains("Streams: 6 events (900ms)")); + assert!(rendered[0].contains("4 events received (1.2s)")); } #[test]