mirror of
https://github.com/openai/codex.git
synced 2026-02-01 22:47:52 +00:00
Add websocket telemetry metrics and labels (#10316)
Summary:
- Expose websocket telemetry hooks through the responses client so request durations and event processing can be reported.
- Record websocket request/event metrics and emit runtime telemetry events that the history UI now surfaces.
- Improve tests to cover websocket telemetry reporting and guard runtime summary updates.

<img width="824" height="79" alt="Screenshot 2026-01-31 at 5 28 12 PM" src="https://github.com/user-attachments/assets/ea9a7965-d8b4-4e3c-a984-ef4fdc44c81d" />
This commit is contained in:
3
codex-rs/Cargo.lock
generated
3
codex-rs/Cargo.lock
generated
@@ -1427,6 +1427,7 @@ dependencies = [
|
||||
"multimap",
|
||||
"once_cell",
|
||||
"openssl-sys",
|
||||
"opentelemetry_sdk",
|
||||
"os_info",
|
||||
"predicates",
|
||||
"pretty_assertions",
|
||||
@@ -1451,6 +1452,7 @@ dependencies = [
|
||||
"thiserror 2.0.17",
|
||||
"time",
|
||||
"tokio",
|
||||
"tokio-tungstenite",
|
||||
"tokio-util",
|
||||
"toml 0.9.5",
|
||||
"toml_edit 0.24.0+spec-1.1.0",
|
||||
@@ -1778,6 +1780,7 @@ dependencies = [
|
||||
"strum_macros 0.27.2",
|
||||
"thiserror 2.0.17",
|
||||
"tokio",
|
||||
"tokio-tungstenite",
|
||||
"tracing",
|
||||
"tracing-opentelemetry",
|
||||
"tracing-subscriber",
|
||||
|
||||
@@ -6,6 +6,7 @@ use crate::error::ApiError;
|
||||
use crate::provider::Provider;
|
||||
use crate::sse::responses::ResponsesStreamEvent;
|
||||
use crate::sse::responses::process_responses_event;
|
||||
use crate::telemetry::WebsocketTelemetry;
|
||||
use codex_client::TransportError;
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
@@ -18,6 +19,7 @@ use std::time::Duration;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::time::Instant;
|
||||
use tokio_tungstenite::MaybeTlsStream;
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
use tokio_tungstenite::tungstenite::Error as WsError;
|
||||
@@ -38,14 +40,21 @@ pub struct ResponsesWebsocketConnection {
|
||||
// TODO (pakrym): is this the right place for timeout?
|
||||
idle_timeout: Duration,
|
||||
server_reasoning_included: bool,
|
||||
telemetry: Option<Arc<dyn WebsocketTelemetry>>,
|
||||
}
|
||||
|
||||
impl ResponsesWebsocketConnection {
|
||||
fn new(stream: WsStream, idle_timeout: Duration, server_reasoning_included: bool) -> Self {
|
||||
fn new(
|
||||
stream: WsStream,
|
||||
idle_timeout: Duration,
|
||||
server_reasoning_included: bool,
|
||||
telemetry: Option<Arc<dyn WebsocketTelemetry>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
stream: Arc::new(Mutex::new(Some(stream))),
|
||||
idle_timeout,
|
||||
server_reasoning_included,
|
||||
telemetry,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -62,6 +71,7 @@ impl ResponsesWebsocketConnection {
|
||||
let stream = Arc::clone(&self.stream);
|
||||
let idle_timeout = self.idle_timeout;
|
||||
let server_reasoning_included = self.server_reasoning_included;
|
||||
let telemetry = self.telemetry.clone();
|
||||
let request_body = serde_json::to_value(&request).map_err(|err| {
|
||||
ApiError::Stream(format!("failed to encode websocket request: {err}"))
|
||||
})?;
|
||||
@@ -87,6 +97,7 @@ impl ResponsesWebsocketConnection {
|
||||
tx_event.clone(),
|
||||
request_body,
|
||||
idle_timeout,
|
||||
telemetry,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -114,6 +125,7 @@ impl<A: AuthProvider> ResponsesWebsocketClient<A> {
|
||||
&self,
|
||||
extra_headers: HeaderMap,
|
||||
turn_state: Option<Arc<OnceLock<String>>>,
|
||||
telemetry: Option<Arc<dyn WebsocketTelemetry>>,
|
||||
) -> Result<ResponsesWebsocketConnection, ApiError> {
|
||||
let ws_url = self
|
||||
.provider
|
||||
@@ -130,6 +142,7 @@ impl<A: AuthProvider> ResponsesWebsocketClient<A> {
|
||||
stream,
|
||||
self.provider.stream_idle_timeout,
|
||||
server_reasoning_included,
|
||||
telemetry,
|
||||
))
|
||||
}
|
||||
}
|
||||
@@ -218,6 +231,7 @@ async fn run_websocket_response_stream(
|
||||
tx_event: mpsc::Sender<std::result::Result<ResponseEvent, ApiError>>,
|
||||
request_body: Value,
|
||||
idle_timeout: Duration,
|
||||
telemetry: Option<Arc<dyn WebsocketTelemetry>>,
|
||||
) -> Result<(), ApiError> {
|
||||
let request_text = match serde_json::to_string(&request_body) {
|
||||
Ok(text) => text,
|
||||
@@ -228,16 +242,26 @@ async fn run_websocket_response_stream(
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(err) = ws_stream.send(Message::Text(request_text.into())).await {
|
||||
return Err(ApiError::Stream(format!(
|
||||
"failed to send websocket request: {err}"
|
||||
)));
|
||||
let request_start = Instant::now();
|
||||
let result = ws_stream
|
||||
.send(Message::Text(request_text.into()))
|
||||
.await
|
||||
.map_err(|err| ApiError::Stream(format!("failed to send websocket request: {err}")));
|
||||
|
||||
if let Some(t) = telemetry.as_ref() {
|
||||
t.on_ws_request(request_start.elapsed(), result.as_ref().err());
|
||||
}
|
||||
|
||||
result?;
|
||||
|
||||
loop {
|
||||
let poll_start = Instant::now();
|
||||
let response = tokio::time::timeout(idle_timeout, ws_stream.next())
|
||||
.await
|
||||
.map_err(|_| ApiError::Stream("idle timeout waiting for websocket".into()));
|
||||
if let Some(t) = telemetry.as_ref() {
|
||||
t.on_ws_event(&response, poll_start.elapsed());
|
||||
}
|
||||
let message = match response {
|
||||
Ok(Some(Ok(msg))) => msg,
|
||||
Ok(Some(Err(err))) => {
|
||||
|
||||
@@ -40,3 +40,4 @@ pub use crate::requests::ResponsesRequest;
|
||||
pub use crate::requests::ResponsesRequestBuilder;
|
||||
pub use crate::sse::stream_from_fixture;
|
||||
pub use crate::telemetry::SseTelemetry;
|
||||
pub use crate::telemetry::WebsocketTelemetry;
|
||||
|
||||
@@ -157,7 +157,7 @@ struct ResponseCompletedOutputTokensDetails {
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct ResponsesStreamEvent {
|
||||
#[serde(rename = "type")]
|
||||
kind: String,
|
||||
pub(crate) kind: String,
|
||||
response: Option<Value>,
|
||||
item: Option<Value>,
|
||||
delta: Option<String>,
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use crate::error::ApiError;
|
||||
use codex_client::Request;
|
||||
use codex_client::RequestTelemetry;
|
||||
use codex_client::Response;
|
||||
@@ -10,6 +11,8 @@ use std::future::Future;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::time::Instant;
|
||||
use tokio_tungstenite::tungstenite::Error;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
|
||||
/// Generic telemetry.
|
||||
pub trait SseTelemetry: Send + Sync {
|
||||
@@ -28,6 +31,17 @@ pub trait SseTelemetry: Send + Sync {
|
||||
);
|
||||
}
|
||||
|
||||
/// Telemetry for Responses WebSocket transport.
|
||||
pub trait WebsocketTelemetry: Send + Sync {
|
||||
fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>);
|
||||
|
||||
fn on_ws_event(
|
||||
&self,
|
||||
result: &Result<Option<Result<Message, Error>>, ApiError>,
|
||||
duration: Duration,
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) trait WithStatus {
|
||||
fn status(&self) -> StatusCode;
|
||||
}
|
||||
|
||||
@@ -90,6 +90,7 @@ tokio = { workspace = true, features = [
|
||||
"signal",
|
||||
] }
|
||||
tokio-util = { workspace = true, features = ["rt"] }
|
||||
tokio-tungstenite = { workspace = true }
|
||||
toml = { workspace = true }
|
||||
toml_edit = { workspace = true }
|
||||
tracing = { workspace = true, features = ["log"] }
|
||||
@@ -145,6 +146,10 @@ image = { workspace = true, features = ["jpeg", "png"] }
|
||||
maplit = { workspace = true }
|
||||
predicates = { workspace = true }
|
||||
pretty_assertions = { workspace = true }
|
||||
opentelemetry_sdk = { workspace = true, features = [
|
||||
"experimental_metrics_custom_reader",
|
||||
"metrics",
|
||||
] }
|
||||
serial_test = { workspace = true }
|
||||
tempfile = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
|
||||
@@ -21,6 +21,7 @@ use codex_api::ResponsesWebsocketClient as ApiWebSocketResponsesClient;
|
||||
use codex_api::ResponsesWebsocketConnection as ApiWebSocketConnection;
|
||||
use codex_api::SseTelemetry;
|
||||
use codex_api::TransportError;
|
||||
use codex_api::WebsocketTelemetry;
|
||||
use codex_api::build_conversation_headers;
|
||||
use codex_api::common::Reasoning;
|
||||
use codex_api::common::ResponsesWsRequest;
|
||||
@@ -46,6 +47,8 @@ use reqwest::StatusCode;
|
||||
use serde_json::Value;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_tungstenite::tungstenite::Error;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::AuthManager;
|
||||
@@ -451,9 +454,14 @@ impl ModelClientSession {
|
||||
if needs_new {
|
||||
let mut headers = options.extra_headers.clone();
|
||||
headers.extend(build_conversation_headers(options.conversation_id.clone()));
|
||||
let websocket_telemetry = self.build_websocket_telemetry();
|
||||
let new_conn: ApiWebSocketConnection =
|
||||
ApiWebSocketResponsesClient::new(api_provider, api_auth)
|
||||
.connect(headers, options.turn_state.clone())
|
||||
.connect(
|
||||
headers,
|
||||
options.turn_state.clone(),
|
||||
Some(websocket_telemetry),
|
||||
)
|
||||
.await?;
|
||||
self.connection = Some(new_conn);
|
||||
}
|
||||
@@ -650,6 +658,13 @@ impl ModelClientSession {
|
||||
let sse_telemetry: Arc<dyn SseTelemetry> = telemetry;
|
||||
(request_telemetry, sse_telemetry)
|
||||
}
|
||||
|
||||
/// Builds telemetry for the Responses API WebSocket transport.
|
||||
fn build_websocket_telemetry(&self) -> Arc<dyn WebsocketTelemetry> {
|
||||
let telemetry = Arc::new(ApiTelemetry::new(self.state.otel_manager.clone()));
|
||||
let websocket_telemetry: Arc<dyn WebsocketTelemetry> = telemetry;
|
||||
websocket_telemetry
|
||||
}
|
||||
}
|
||||
|
||||
impl ModelClient {
|
||||
@@ -849,3 +864,19 @@ impl SseTelemetry for ApiTelemetry {
|
||||
self.otel_manager.log_sse_event(result, duration);
|
||||
}
|
||||
}
|
||||
|
||||
impl WebsocketTelemetry for ApiTelemetry {
|
||||
fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>) {
|
||||
let error_message = error.map(std::string::ToString::to_string);
|
||||
self.otel_manager
|
||||
.record_websocket_request(duration, error_message.as_deref());
|
||||
}
|
||||
|
||||
fn on_ws_event(
|
||||
&self,
|
||||
result: &std::result::Result<Option<std::result::Result<Message, Error>>, ApiError>,
|
||||
duration: Duration,
|
||||
) {
|
||||
self.otel_manager.record_websocket_event(result, duration);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,6 +14,8 @@ use codex_core::features::Feature;
|
||||
use codex_core::models_manager::manager::ModelsManager;
|
||||
use codex_core::protocol::SessionSource;
|
||||
use codex_otel::OtelManager;
|
||||
use codex_otel::metrics::MetricsClient;
|
||||
use codex_otel::metrics::MetricsConfig;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::ReasoningSummary;
|
||||
use core_test_support::load_default_config_for_test;
|
||||
@@ -25,15 +27,19 @@ use core_test_support::responses::start_websocket_server;
|
||||
use core_test_support::responses::start_websocket_server_with_headers;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use futures::StreamExt;
|
||||
use opentelemetry_sdk::metrics::InMemoryMetricExporter;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tempfile::TempDir;
|
||||
use tracing_test::traced_test;
|
||||
|
||||
const MODEL: &str = "gpt-5.2-codex";
|
||||
|
||||
struct WebsocketTestHarness {
|
||||
_codex_home: TempDir,
|
||||
client: ModelClient,
|
||||
otel_manager: OtelManager,
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
@@ -64,6 +70,38 @@ async fn responses_websocket_streams_request() {
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
#[traced_test]
|
||||
async fn responses_websocket_emits_websocket_telemetry_events() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let server = start_websocket_server(vec![vec![vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_completed("resp-1"),
|
||||
]]])
|
||||
.await;
|
||||
|
||||
let harness = websocket_harness(&server).await;
|
||||
harness.otel_manager.reset_runtime_metrics();
|
||||
let mut session = harness.client.new_session();
|
||||
let prompt = prompt_with_input(vec![message_item("hello")]);
|
||||
|
||||
stream_until_complete(&mut session, &prompt).await;
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
|
||||
let summary = harness
|
||||
.otel_manager
|
||||
.runtime_metrics_summary()
|
||||
.expect("runtime metrics summary");
|
||||
assert_eq!(summary.api_calls.count, 0);
|
||||
assert_eq!(summary.streaming_events.count, 0);
|
||||
assert_eq!(summary.websocket_calls.count, 1);
|
||||
assert_eq!(summary.websocket_events.count, 2);
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_emits_reasoning_included_event() {
|
||||
skip_if_no_network!();
|
||||
@@ -211,6 +249,12 @@ async fn websocket_harness(server: &WebSocketTestServer) -> WebsocketTestHarness
|
||||
let model_info = ModelsManager::construct_model_info_offline(MODEL, &config);
|
||||
let conversation_id = ThreadId::new();
|
||||
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key"));
|
||||
let exporter = InMemoryMetricExporter::default();
|
||||
let metrics = MetricsClient::new(
|
||||
MetricsConfig::in_memory("test", "codex-core", env!("CARGO_PKG_VERSION"), exporter)
|
||||
.with_runtime_reader(),
|
||||
)
|
||||
.expect("in-memory metrics client");
|
||||
let otel_manager = OtelManager::new(
|
||||
conversation_id,
|
||||
MODEL,
|
||||
@@ -221,12 +265,13 @@ async fn websocket_harness(server: &WebSocketTestServer) -> WebsocketTestHarness
|
||||
false,
|
||||
"test".to_string(),
|
||||
SessionSource::Exec,
|
||||
);
|
||||
)
|
||||
.with_metrics(metrics);
|
||||
let client = ModelClient::new(
|
||||
Arc::clone(&config),
|
||||
None,
|
||||
model_info,
|
||||
otel_manager,
|
||||
otel_manager.clone(),
|
||||
provider.clone(),
|
||||
None,
|
||||
ReasoningSummary::Auto,
|
||||
@@ -238,6 +283,7 @@ async fn websocket_harness(server: &WebSocketTestServer) -> WebsocketTestHarness
|
||||
WebsocketTestHarness {
|
||||
_codex_home: codex_home,
|
||||
client,
|
||||
otel_manager,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -56,6 +56,7 @@ serde_json = { workspace = true }
|
||||
strum_macros = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tokio-tungstenite = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-opentelemetry = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
|
||||
@@ -4,3 +4,7 @@ pub(crate) const API_CALL_COUNT_METRIC: &str = "codex.api_request";
|
||||
pub(crate) const API_CALL_DURATION_METRIC: &str = "codex.api_request.duration_ms";
|
||||
pub(crate) const SSE_EVENT_COUNT_METRIC: &str = "codex.sse_event";
|
||||
pub(crate) const SSE_EVENT_DURATION_METRIC: &str = "codex.sse_event.duration_ms";
|
||||
pub(crate) const WEBSOCKET_REQUEST_COUNT_METRIC: &str = "codex.websocket.request";
|
||||
pub(crate) const WEBSOCKET_REQUEST_DURATION_METRIC: &str = "codex.websocket.request.duration_ms";
|
||||
pub(crate) const WEBSOCKET_EVENT_COUNT_METRIC: &str = "codex.websocket.event";
|
||||
pub(crate) const WEBSOCKET_EVENT_DURATION_METRIC: &str = "codex.websocket.event.duration_ms";
|
||||
|
||||
@@ -4,6 +4,10 @@ use crate::metrics::names::SSE_EVENT_COUNT_METRIC;
|
||||
use crate::metrics::names::SSE_EVENT_DURATION_METRIC;
|
||||
use crate::metrics::names::TOOL_CALL_COUNT_METRIC;
|
||||
use crate::metrics::names::TOOL_CALL_DURATION_METRIC;
|
||||
use crate::metrics::names::WEBSOCKET_EVENT_COUNT_METRIC;
|
||||
use crate::metrics::names::WEBSOCKET_EVENT_DURATION_METRIC;
|
||||
use crate::metrics::names::WEBSOCKET_REQUEST_COUNT_METRIC;
|
||||
use crate::metrics::names::WEBSOCKET_REQUEST_DURATION_METRIC;
|
||||
use opentelemetry_sdk::metrics::data::AggregatedMetrics;
|
||||
use opentelemetry_sdk::metrics::data::Metric;
|
||||
use opentelemetry_sdk::metrics::data::MetricData;
|
||||
@@ -26,11 +30,17 @@ pub struct RuntimeMetricsSummary {
|
||||
pub tool_calls: RuntimeMetricTotals,
|
||||
pub api_calls: RuntimeMetricTotals,
|
||||
pub streaming_events: RuntimeMetricTotals,
|
||||
pub websocket_calls: RuntimeMetricTotals,
|
||||
pub websocket_events: RuntimeMetricTotals,
|
||||
}
|
||||
|
||||
impl RuntimeMetricsSummary {
|
||||
pub fn is_empty(self) -> bool {
|
||||
self.tool_calls.is_empty() && self.api_calls.is_empty() && self.streaming_events.is_empty()
|
||||
self.tool_calls.is_empty()
|
||||
&& self.api_calls.is_empty()
|
||||
&& self.streaming_events.is_empty()
|
||||
&& self.websocket_calls.is_empty()
|
||||
&& self.websocket_events.is_empty()
|
||||
}
|
||||
|
||||
pub(crate) fn from_snapshot(snapshot: &ResourceMetrics) -> Self {
|
||||
@@ -46,10 +56,20 @@ impl RuntimeMetricsSummary {
|
||||
count: sum_counter(snapshot, SSE_EVENT_COUNT_METRIC),
|
||||
duration_ms: sum_histogram_ms(snapshot, SSE_EVENT_DURATION_METRIC),
|
||||
};
|
||||
let websocket_calls = RuntimeMetricTotals {
|
||||
count: sum_counter(snapshot, WEBSOCKET_REQUEST_COUNT_METRIC),
|
||||
duration_ms: sum_histogram_ms(snapshot, WEBSOCKET_REQUEST_DURATION_METRIC),
|
||||
};
|
||||
let websocket_events = RuntimeMetricTotals {
|
||||
count: sum_counter(snapshot, WEBSOCKET_EVENT_COUNT_METRIC),
|
||||
duration_ms: sum_histogram_ms(snapshot, WEBSOCKET_EVENT_DURATION_METRIC),
|
||||
};
|
||||
Self {
|
||||
tool_calls,
|
||||
api_calls,
|
||||
streaming_events,
|
||||
websocket_calls,
|
||||
websocket_events,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,9 +4,14 @@ use crate::metrics::names::SSE_EVENT_COUNT_METRIC;
|
||||
use crate::metrics::names::SSE_EVENT_DURATION_METRIC;
|
||||
use crate::metrics::names::TOOL_CALL_COUNT_METRIC;
|
||||
use crate::metrics::names::TOOL_CALL_DURATION_METRIC;
|
||||
use crate::metrics::names::WEBSOCKET_EVENT_COUNT_METRIC;
|
||||
use crate::metrics::names::WEBSOCKET_EVENT_DURATION_METRIC;
|
||||
use crate::metrics::names::WEBSOCKET_REQUEST_COUNT_METRIC;
|
||||
use crate::metrics::names::WEBSOCKET_REQUEST_DURATION_METRIC;
|
||||
use crate::otel_provider::traceparent_context_from_env;
|
||||
use chrono::SecondsFormat;
|
||||
use chrono::Utc;
|
||||
use codex_api::ApiError;
|
||||
use codex_api::ResponseEvent;
|
||||
use codex_app_server_protocol::AuthMode;
|
||||
use codex_protocol::ThreadId;
|
||||
@@ -36,6 +41,7 @@ pub use crate::OtelManager;
|
||||
pub use crate::ToolDecisionSource;
|
||||
|
||||
const SSE_UNKNOWN_KIND: &str = "unknown";
|
||||
const WEBSOCKET_UNKNOWN_KIND: &str = "unknown";
|
||||
|
||||
impl OtelManager {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
@@ -190,6 +196,134 @@ impl OtelManager {
|
||||
);
|
||||
}
|
||||
|
||||
pub fn record_websocket_request(&self, duration: Duration, error: Option<&str>) {
|
||||
let success_str = if error.is_none() { "true" } else { "false" };
|
||||
self.counter(
|
||||
WEBSOCKET_REQUEST_COUNT_METRIC,
|
||||
1,
|
||||
&[("success", success_str)],
|
||||
);
|
||||
self.record_duration(
|
||||
WEBSOCKET_REQUEST_DURATION_METRIC,
|
||||
duration,
|
||||
&[("success", success_str)],
|
||||
);
|
||||
tracing::event!(
|
||||
tracing::Level::INFO,
|
||||
event.name = "codex.websocket_request",
|
||||
event.timestamp = %timestamp(),
|
||||
conversation.id = %self.metadata.conversation_id,
|
||||
app.version = %self.metadata.app_version,
|
||||
auth_mode = self.metadata.auth_mode,
|
||||
user.account_id = self.metadata.account_id,
|
||||
user.email = self.metadata.account_email,
|
||||
terminal.type = %self.metadata.terminal_type,
|
||||
model = %self.metadata.model,
|
||||
slug = %self.metadata.slug,
|
||||
duration_ms = %duration.as_millis(),
|
||||
success = success_str,
|
||||
error.message = error,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn record_websocket_event(
|
||||
&self,
|
||||
result: &Result<
|
||||
Option<
|
||||
Result<
|
||||
tokio_tungstenite::tungstenite::Message,
|
||||
tokio_tungstenite::tungstenite::Error,
|
||||
>,
|
||||
>,
|
||||
ApiError,
|
||||
>,
|
||||
duration: Duration,
|
||||
) {
|
||||
let mut kind = None;
|
||||
let mut error_message = None;
|
||||
let mut success = true;
|
||||
|
||||
match result {
|
||||
Ok(Some(Ok(message))) => match message {
|
||||
tokio_tungstenite::tungstenite::Message::Text(text) => {
|
||||
match serde_json::from_str::<serde_json::Value>(text) {
|
||||
Ok(value) => {
|
||||
kind = value
|
||||
.get("type")
|
||||
.and_then(|value| value.as_str())
|
||||
.map(std::string::ToString::to_string);
|
||||
if kind.as_deref() == Some("response.failed") {
|
||||
success = false;
|
||||
error_message = value
|
||||
.get("response")
|
||||
.and_then(|value| value.get("error"))
|
||||
.map(serde_json::Value::to_string)
|
||||
.or_else(|| Some("response.failed event received".to_string()));
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
kind = Some("parse_error".to_string());
|
||||
error_message = Some(err.to_string());
|
||||
success = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
tokio_tungstenite::tungstenite::Message::Binary(_) => {
|
||||
success = false;
|
||||
error_message = Some("unexpected binary websocket event".to_string());
|
||||
}
|
||||
tokio_tungstenite::tungstenite::Message::Ping(_)
|
||||
| tokio_tungstenite::tungstenite::Message::Pong(_) => {
|
||||
return;
|
||||
}
|
||||
tokio_tungstenite::tungstenite::Message::Close(_) => {
|
||||
success = false;
|
||||
error_message =
|
||||
Some("websocket closed by server before response.completed".to_string());
|
||||
}
|
||||
tokio_tungstenite::tungstenite::Message::Frame(_) => {
|
||||
success = false;
|
||||
error_message = Some("unexpected websocket frame".to_string());
|
||||
}
|
||||
},
|
||||
Ok(Some(Err(err))) => {
|
||||
success = false;
|
||||
error_message = Some(err.to_string());
|
||||
}
|
||||
Ok(None) => {
|
||||
success = false;
|
||||
error_message = Some("stream closed before response.completed".to_string());
|
||||
}
|
||||
Err(err) => {
|
||||
success = false;
|
||||
error_message = Some(err.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
let kind_str = kind.as_deref().unwrap_or(WEBSOCKET_UNKNOWN_KIND);
|
||||
let success_str = if success { "true" } else { "false" };
|
||||
let tags = [("kind", kind_str), ("success", success_str)];
|
||||
self.counter(WEBSOCKET_EVENT_COUNT_METRIC, 1, &tags);
|
||||
self.record_duration(WEBSOCKET_EVENT_DURATION_METRIC, duration, &tags);
|
||||
tracing::event!(
|
||||
tracing::Level::INFO,
|
||||
event.name = "codex.websocket_event",
|
||||
event.timestamp = %timestamp(),
|
||||
event.kind = %kind_str,
|
||||
conversation.id = %self.metadata.conversation_id,
|
||||
app.version = %self.metadata.app_version,
|
||||
auth_mode = self.metadata.auth_mode,
|
||||
user.account_id = self.metadata.account_id,
|
||||
user.email = self.metadata.account_email,
|
||||
terminal.type = %self.metadata.terminal_type,
|
||||
model = %self.metadata.model,
|
||||
slug = %self.metadata.slug,
|
||||
duration_ms = %duration.as_millis(),
|
||||
success = success_str,
|
||||
error.message = error_message.as_deref(),
|
||||
);
|
||||
}
|
||||
|
||||
pub fn log_sse_event<E>(
|
||||
&self,
|
||||
response: &Result<Option<Result<StreamEvent, StreamError<E>>>, Elapsed>,
|
||||
|
||||
@@ -11,6 +11,7 @@ use eventsource_stream::Event as StreamEvent;
|
||||
use opentelemetry_sdk::metrics::InMemoryMetricExporter;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::time::Duration;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
|
||||
#[test]
|
||||
fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<()> {
|
||||
@@ -43,6 +44,7 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<(
|
||||
"ok",
|
||||
);
|
||||
manager.record_api_request(1, Some(200), None, Duration::from_millis(300));
|
||||
manager.record_websocket_request(Duration::from_millis(400), None);
|
||||
let sse_response: std::result::Result<
|
||||
Option<std::result::Result<StreamEvent, eventsource_stream::EventStreamError<&str>>>,
|
||||
tokio::time::error::Elapsed,
|
||||
@@ -53,6 +55,13 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<(
|
||||
retry: None,
|
||||
})));
|
||||
manager.log_sse_event(&sse_response, Duration::from_millis(120));
|
||||
let ws_response: std::result::Result<
|
||||
Option<std::result::Result<Message, tokio_tungstenite::tungstenite::Error>>,
|
||||
codex_api::ApiError,
|
||||
> = Ok(Some(Ok(Message::Text(
|
||||
r#"{"type":"response.created"}"#.into(),
|
||||
))));
|
||||
manager.record_websocket_event(&ws_response, Duration::from_millis(80));
|
||||
|
||||
let summary = manager
|
||||
.runtime_metrics_summary()
|
||||
@@ -70,6 +79,14 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<(
|
||||
count: 1,
|
||||
duration_ms: 120,
|
||||
},
|
||||
websocket_calls: RuntimeMetricTotals {
|
||||
count: 1,
|
||||
duration_ms: 400,
|
||||
},
|
||||
websocket_events: RuntimeMetricTotals {
|
||||
count: 1,
|
||||
duration_ms: 80,
|
||||
},
|
||||
};
|
||||
assert_eq!(summary, expected);
|
||||
|
||||
|
||||
@@ -2028,6 +2028,13 @@ fn runtime_metrics_label(summary: RuntimeMetricsSummary) -> Option<String> {
|
||||
summary.api_calls.count
|
||||
));
|
||||
}
|
||||
if summary.websocket_calls.count > 0 {
|
||||
let duration = format_duration_ms(summary.websocket_calls.duration_ms);
|
||||
parts.push(format!(
|
||||
"WebSocket: {} events send ({duration})",
|
||||
summary.websocket_calls.count
|
||||
));
|
||||
}
|
||||
if summary.streaming_events.count > 0 {
|
||||
let duration = format_duration_ms(summary.streaming_events.duration_ms);
|
||||
let stream_label = pluralize(summary.streaming_events.count, "Stream", "Streams");
|
||||
@@ -2037,6 +2044,13 @@ fn runtime_metrics_label(summary: RuntimeMetricsSummary) -> Option<String> {
|
||||
summary.streaming_events.count
|
||||
));
|
||||
}
|
||||
if summary.websocket_events.count > 0 {
|
||||
let duration = format_duration_ms(summary.websocket_events.duration_ms);
|
||||
parts.push(format!(
|
||||
"{} events received ({duration})",
|
||||
summary.websocket_events.count
|
||||
));
|
||||
}
|
||||
if parts.is_empty() {
|
||||
None
|
||||
} else {
|
||||
@@ -2181,15 +2195,25 @@ mod tests {
|
||||
count: 6,
|
||||
duration_ms: 900,
|
||||
},
|
||||
websocket_calls: RuntimeMetricTotals {
|
||||
count: 1,
|
||||
duration_ms: 700,
|
||||
},
|
||||
websocket_events: RuntimeMetricTotals {
|
||||
count: 4,
|
||||
duration_ms: 1_200,
|
||||
},
|
||||
};
|
||||
let cell = FinalMessageSeparator::new(Some(12), Some(summary));
|
||||
let rendered = render_lines(&cell.display_lines(120));
|
||||
let rendered = render_lines(&cell.display_lines(200));
|
||||
|
||||
assert_eq!(rendered.len(), 1);
|
||||
assert!(rendered[0].contains("Worked for 12s"));
|
||||
assert!(rendered[0].contains("Local tools: 3 calls (2.5s)"));
|
||||
assert!(rendered[0].contains("Inference: 2 calls (1.2s)"));
|
||||
assert!(rendered[0].contains("WebSocket: 1 events send (700ms)"));
|
||||
assert!(rendered[0].contains("Streams: 6 events (900ms)"));
|
||||
assert!(rendered[0].contains("4 events received (1.2s)"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
Reference in New Issue
Block a user