From 21dfe383ed2c4c7bc8a2f1d63606fa54a2336219 Mon Sep 17 00:00:00 2001 From: starr-openai Date: Thu, 28 May 2026 23:55:12 -0700 Subject: [PATCH] Add exec-server OTEL metrics --- codex-rs/Cargo.lock | 3 + codex-rs/cli/src/main.rs | 14 +- codex-rs/exec-server/Cargo.toml | 3 + codex-rs/exec-server/src/lib.rs | 3 + codex-rs/exec-server/src/local_process.rs | 50 ++- codex-rs/exec-server/src/relay.rs | 5 +- codex-rs/exec-server/src/remote.rs | 12 +- codex-rs/exec-server/src/server.rs | 11 +- .../exec-server/src/server/handler/tests.rs | 8 +- .../exec-server/src/server/process_handler.rs | 8 +- codex-rs/exec-server/src/server/processor.rs | 105 +++++- .../src/server/session_registry.rs | 8 +- codex-rs/exec-server/src/server/transport.rs | 36 +- .../exec-server/src/server/transport_tests.rs | 1 + codex-rs/exec-server/src/telemetry.rs | 320 ++++++++++++++++++ codex-rs/otel/src/metrics/client.rs | 23 ++ .../otel/tests/suite/otlp_http_loopback.rs | 15 + codex-rs/otel/tests/suite/send.rs | 25 ++ 18 files changed, 595 insertions(+), 55 deletions(-) create mode 100644 codex-rs/exec-server/src/telemetry.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 2eeafe30b9..e5e40e9855 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1878,6 +1878,8 @@ dependencies = [ "eventsource-stream", "futures", "http 1.4.0", + "opentelemetry", + "opentelemetry_sdk", "pretty_assertions", "regex-lite", "reqwest 0.12.28", @@ -2777,6 +2779,7 @@ dependencies = [ "codex-app-server-protocol", "codex-client", "codex-file-system", + "codex-otel", "codex-protocol", "codex-sandboxing", "codex-test-binary-support", diff --git a/codex-rs/cli/src/main.rs b/codex-rs/cli/src/main.rs index 6e318847e9..89be149853 100644 --- a/codex-rs/cli/src/main.rs +++ b/codex-rs/cli/src/main.rs @@ -1509,7 +1509,7 @@ async fn run_exec_server_command( arg0_paths.codex_linux_sandbox_exe.clone(), )?; let config = load_exec_server_config(root_config_overrides, strict_config).await?; - let _otel = init_exec_server_tracing(&config); + let (_otel, telemetry) = init_exec_server_tracing(&config); let exec_server_span = exec_server_root_span(); if let Some(base_url) = cmd.remote { let environment_id = cmd @@ -1528,6 +1528,7 @@ async fn run_exec_server_command( if let Some(name) = cmd.name { remote_config.name = name; } + let remote_config = remote_config.with_telemetry(telemetry); codex_exec_server::run_remote_environment(remote_config, runtime_paths) .instrument(exec_server_span) .await?; @@ -1538,14 +1539,16 @@ async fn run_exec_server_command( .listen .as_deref() .unwrap_or(codex_exec_server::DEFAULT_LISTEN_URL); - codex_exec_server::run_main(listen_url, runtime_paths) + codex_exec_server::run_main_with_telemetry(listen_url, runtime_paths, telemetry) .instrument(exec_server_span) .await .map_err(anyhow::Error::from_boxed) } } -fn init_exec_server_tracing(config: &codex_core::config::Config) -> impl Send + Sync { +fn init_exec_server_tracing( + config: &codex_core::config::Config, +) -> (impl Send + Sync, codex_exec_server::ExecServerTelemetry) { let fmt_layer = tracing_subscriber::fmt::layer() .with_writer(std::io::stderr) .with_filter(exec_server_stderr_env_filter()); @@ -1571,13 +1574,16 @@ fn init_exec_server_tracing(config: &codex_core::config::Config) -> impl Send + let otel_logger_layer = otel.as_ref().and_then(|otel| otel.logger_layer()); let otel_tracing_layer = otel.as_ref().and_then(|otel| otel.tracing_layer()); + let telemetry = codex_exec_server::ExecServerTelemetry::new( + otel.as_ref().and_then(|otel| otel.metrics()).cloned(), + ); let _ = tracing_subscriber::registry() .with(fmt_layer) .with(otel_tracing_layer) .with(otel_logger_layer) .try_init(); tracing::callsite::rebuild_interest_cache(); - otel + (otel, telemetry) } fn exec_server_root_span() -> tracing::Span { diff --git a/codex-rs/exec-server/Cargo.toml b/codex-rs/exec-server/Cargo.toml index d842094a16..99f372d8d1 100644 --- a/codex-rs/exec-server/Cargo.toml +++ b/codex-rs/exec-server/Cargo.toml @@ -20,6 +20,7 @@ codex-app-server-protocol = { workspace = true } codex-api = { workspace = true } codex-client = { workspace = true } codex-file-system = { workspace = true } +codex-otel = { workspace = true } codex-protocol = { workspace = true } codex-sandboxing = { workspace = true } codex-utils-absolute-path = { workspace = true } @@ -53,6 +54,8 @@ anyhow = { workspace = true } codex-test-binary-support = { workspace = true } ctor = { workspace = true } http = { workspace = true } +opentelemetry = { workspace = true } +opentelemetry_sdk = { workspace = true } pretty_assertions = { workspace = true } serial_test = { workspace = true } tempfile = { workspace = true } diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index f2d16f8fac..8922fb62f8 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -22,6 +22,7 @@ mod rpc; mod runtime_paths; mod sandboxed_file_system; mod server; +mod telemetry; pub use client::ExecServerClient; pub use client::ExecServerError; @@ -97,3 +98,5 @@ pub use runtime_paths::ExecServerRuntimePaths; pub use server::DEFAULT_LISTEN_URL; pub use server::ExecServerListenUrlParseError; pub use server::run_main; +pub use server::run_main_with_telemetry; +pub use telemetry::ExecServerTelemetry; diff --git a/codex-rs/exec-server/src/local_process.rs b/codex-rs/exec-server/src/local_process.rs index bc69ec6105..fd5f75600b 100644 --- a/codex-rs/exec-server/src/local_process.rs +++ b/codex-rs/exec-server/src/local_process.rs @@ -3,6 +3,7 @@ use std::collections::VecDeque; use std::collections::hash_map::Entry; use std::sync::Arc; use std::time::Duration; +use std::time::Instant; use async_trait::async_trait; use codex_app_server_protocol::JSONRPCErrorError; @@ -45,6 +46,7 @@ use crate::rpc::RpcServerOutboundMessage; use crate::rpc::internal_error; use crate::rpc::invalid_params; use crate::rpc::invalid_request; +use crate::telemetry::ExecServerTelemetry; const RETAINED_OUTPUT_BYTES_PER_PROCESS: usize = 1024 * 1024; const NOTIFICATION_CHANNEL_CAPACITY: usize = 256; @@ -74,6 +76,8 @@ struct RunningProcess { output_notify: Arc, open_streams: usize, closed: bool, + started_at: Instant, + metrics_finished: bool, } enum ProcessEntry { @@ -84,6 +88,7 @@ enum ProcessEntry { struct Inner { notifications: std::sync::RwLock>, processes: Mutex>, + telemetry: ExecServerTelemetry, } #[derive(Clone)] @@ -103,16 +108,23 @@ impl Default for LocalProcess { let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(NOTIFICATION_CHANNEL_CAPACITY); tokio::spawn(async move { while outgoing_rx.recv().await.is_some() {} }); - Self::new(RpcNotificationSender::new(outgoing_tx)) + Self::new( + RpcNotificationSender::new(outgoing_tx), + ExecServerTelemetry::default(), + ) } } impl LocalProcess { - pub(crate) fn new(notifications: RpcNotificationSender) -> Self { + pub(crate) fn new( + notifications: RpcNotificationSender, + telemetry: ExecServerTelemetry, + ) -> Self { Self { inner: Arc::new(Inner { notifications: std::sync::RwLock::new(Some(notifications)), processes: Mutex::new(HashMap::new()), + telemetry, }), } } @@ -129,6 +141,11 @@ impl LocalProcess { .collect::>() }; for process in remaining { + if !process.metrics_finished { + self.inner + .telemetry + .process_finished("terminated", process.started_at.elapsed()); + } process.session.terminate(); } } @@ -226,9 +243,12 @@ impl LocalProcess { output_notify: Arc::clone(&output_notify), open_streams: 2, closed: false, + started_at: Instant::now(), + metrics_finished: false, })), ); } + self.inner.telemetry.process_started(); tokio::spawn(stream_output( process_id.clone(), @@ -601,7 +621,7 @@ async fn watch_exit( output_notify: Arc, ) { let exit_code = exit_rx.await.unwrap_or(-1); - let notification = { + let (notification, process_duration) = { let mut processes = inner.processes.lock().await; if let Some(ProcessEntry::Running(process)) = processes.get_mut(&process_id) { let seq = process.next_seq; @@ -611,15 +631,25 @@ async fn watch_exit( process .events .publish(ExecProcessEvent::Exited { seq, exit_code }); - Some(ExecExitedNotification { - process_id: process_id.clone(), - seq, - exit_code, - }) + process.metrics_finished = true; + ( + Some(ExecExitedNotification { + process_id: process_id.clone(), + seq, + exit_code, + }), + Some(process.started_at.elapsed()), + ) } else { - None + (None, None) } }; + if let Some(process_duration) = process_duration { + inner.telemetry.process_finished( + if exit_code == 0 { "success" } else { "error" }, + process_duration, + ); + } output_notify.notify_waiters(); if let Some(notification) = notification && let Some(notifications) = notification_sender(&inner) @@ -890,6 +920,8 @@ mod tests { output_notify: Arc::clone(&output_notify), open_streams: 2, closed: false, + started_at: Instant::now(), + metrics_finished: false, })), ); assert!(previous.is_none()); diff --git a/codex-rs/exec-server/src/relay.rs b/codex-rs/exec-server/src/relay.rs index ae07d7652c..591098e225 100644 --- a/codex-rs/exec-server/src/relay.rs +++ b/codex-rs/exec-server/src/relay.rs @@ -26,6 +26,7 @@ use crate::relay_proto::RelayMessageFrame; use crate::relay_proto::RelayResume; use crate::relay_proto::relay_message_frame; use crate::server::ConnectionProcessor; +use crate::telemetry::ConnectionTransport; const RELAY_MESSAGE_FRAME_VERSION: u32 = 1; @@ -449,7 +450,9 @@ fn spawn_virtual_stream( transport: JsonRpcTransport::Plain, }; tokio::spawn(async move { - processor.run_connection(connection).await; + processor + .run_connection(connection, ConnectionTransport::Relay) + .await; }); VirtualStream { diff --git a/codex-rs/exec-server/src/remote.rs b/codex-rs/exec-server/src/remote.rs index b9ef642b55..922b2b17cf 100644 --- a/codex-rs/exec-server/src/remote.rs +++ b/codex-rs/exec-server/src/remote.rs @@ -12,6 +12,7 @@ use codex_utils_rustls_provider::ensure_rustls_crypto_provider; use crate::ExecServerError; use crate::ExecServerRuntimePaths; +use crate::ExecServerTelemetry; use crate::relay::run_multiplexed_environment; use crate::server::ConnectionProcessor; @@ -193,6 +194,7 @@ pub struct RemoteEnvironmentConfig { pub environment_id: String, pub name: String, auth_provider: SharedAuthProvider, + telemetry: ExecServerTelemetry, } impl std::fmt::Debug for RemoteEnvironmentConfig { @@ -218,8 +220,14 @@ impl RemoteEnvironmentConfig { environment_id, name: "codex-exec-server".to_string(), auth_provider, + telemetry: ExecServerTelemetry::default(), }) } + + pub fn with_telemetry(mut self, telemetry: ExecServerTelemetry) -> Self { + self.telemetry = telemetry; + self + } } /// Register an exec-server for remote use and serve requests over the returned @@ -231,7 +239,7 @@ pub async fn run_remote_environment( ensure_rustls_crypto_provider(); let client = EnvironmentRegistryClient::new(config.base_url.clone(), config.auth_provider.clone())?; - let processor = ConnectionProcessor::new(runtime_paths); + let processor = ConnectionProcessor::new(runtime_paths, config.telemetry.clone()); let mut backoff = Duration::from_secs(1); let mut connection_attempt = 0_u32; @@ -275,6 +283,7 @@ pub async fn run_remote_environment( ); backoff = Duration::from_secs(1); run_multiplexed_environment(websocket, processor.clone()).await; + config.telemetry.relay_reconnect("disconnected"); remote_event!( warn, WARN, @@ -295,6 +304,7 @@ pub async fn run_remote_environment( "codex.exec_server.remote_websocket_connect_failed", "failed to connect remote exec-server websocket" ); + config.telemetry.relay_reconnect("connect_failed"); } } diff --git a/codex-rs/exec-server/src/server.rs b/codex-rs/exec-server/src/server.rs index bf33eb77ba..d716714bbb 100644 --- a/codex-rs/exec-server/src/server.rs +++ b/codex-rs/exec-server/src/server.rs @@ -12,10 +12,19 @@ pub use transport::DEFAULT_LISTEN_URL; pub use transport::ExecServerListenUrlParseError; use crate::ExecServerRuntimePaths; +use crate::ExecServerTelemetry; pub async fn run_main( listen_url: &str, runtime_paths: ExecServerRuntimePaths, ) -> Result<(), Box> { - transport::run_transport(listen_url, runtime_paths).await + run_main_with_telemetry(listen_url, runtime_paths, ExecServerTelemetry::default()).await +} + +pub async fn run_main_with_telemetry( + listen_url: &str, + runtime_paths: ExecServerRuntimePaths, + telemetry: ExecServerTelemetry, +) -> Result<(), Box> { + transport::run_transport(listen_url, runtime_paths, telemetry).await } diff --git a/codex-rs/exec-server/src/server/handler/tests.rs b/codex-rs/exec-server/src/server/handler/tests.rs index 6b632fe890..8d823276fb 100644 --- a/codex-rs/exec-server/src/server/handler/tests.rs +++ b/codex-rs/exec-server/src/server/handler/tests.rs @@ -77,7 +77,7 @@ fn test_runtime_paths() -> ExecServerRuntimePaths { async fn initialized_handler() -> Arc { let (outgoing_tx, _outgoing_rx) = mpsc::channel(16); - let registry = SessionRegistry::new(); + let registry = SessionRegistry::new(crate::ExecServerTelemetry::default()); let handler = Arc::new(ExecServerHandler::new( registry, RpcNotificationSender::new(outgoing_tx), @@ -155,7 +155,7 @@ async fn terminate_reports_false_after_process_exit() { #[tokio::test] async fn long_poll_read_fails_after_session_resume() { let (first_tx, _first_rx) = mpsc::channel(16); - let registry = SessionRegistry::new(); + let registry = SessionRegistry::new(crate::ExecServerTelemetry::default()); let first_handler = Arc::new(ExecServerHandler::new( Arc::clone(®istry), RpcNotificationSender::new(first_tx), @@ -228,7 +228,7 @@ async fn long_poll_read_fails_after_session_resume() { #[tokio::test] async fn active_session_resume_is_rejected() { let (first_tx, _first_rx) = mpsc::channel(16); - let registry = SessionRegistry::new(); + let registry = SessionRegistry::new(crate::ExecServerTelemetry::default()); let first_handler = Arc::new(ExecServerHandler::new( Arc::clone(®istry), RpcNotificationSender::new(first_tx), @@ -272,7 +272,7 @@ async fn active_session_resume_is_rejected() { async fn output_and_exit_are_retained_after_notification_receiver_closes() { let (outgoing_tx, outgoing_rx) = mpsc::channel(16); let handler = Arc::new(ExecServerHandler::new( - SessionRegistry::new(), + SessionRegistry::new(crate::ExecServerTelemetry::default()), RpcNotificationSender::new(outgoing_tx), test_runtime_paths(), )); diff --git a/codex-rs/exec-server/src/server/process_handler.rs b/codex-rs/exec-server/src/server/process_handler.rs index 38fbace1cd..b7ca273da0 100644 --- a/codex-rs/exec-server/src/server/process_handler.rs +++ b/codex-rs/exec-server/src/server/process_handler.rs @@ -10,6 +10,7 @@ use crate::protocol::TerminateResponse; use crate::protocol::WriteParams; use crate::protocol::WriteResponse; use crate::rpc::RpcNotificationSender; +use crate::telemetry::ExecServerTelemetry; #[derive(Clone)] pub(crate) struct ProcessHandler { @@ -17,9 +18,12 @@ pub(crate) struct ProcessHandler { } impl ProcessHandler { - pub(crate) fn new(notifications: RpcNotificationSender) -> Self { + pub(crate) fn new( + notifications: RpcNotificationSender, + telemetry: ExecServerTelemetry, + ) -> Self { Self { - process: LocalProcess::new(notifications), + process: LocalProcess::new(notifications, telemetry), } } diff --git a/codex-rs/exec-server/src/server/processor.rs b/codex-rs/exec-server/src/server/processor.rs index 6fc0723f0c..6e352a0baa 100644 --- a/codex-rs/exec-server/src/server/processor.rs +++ b/codex-rs/exec-server/src/server/processor.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::time::Instant; use tokio::sync::mpsc; use tracing::debug; @@ -16,26 +17,39 @@ use crate::rpc::method_not_found; use crate::server::ExecServerHandler; use crate::server::registry::build_router; use crate::server::session_registry::SessionRegistry; +use crate::telemetry::ConnectionTransport; +use crate::telemetry::ExecServerTelemetry; #[derive(Clone)] pub(crate) struct ConnectionProcessor { session_registry: Arc, runtime_paths: ExecServerRuntimePaths, + telemetry: ExecServerTelemetry, } impl ConnectionProcessor { - pub(crate) fn new(runtime_paths: ExecServerRuntimePaths) -> Self { + pub(crate) fn new( + runtime_paths: ExecServerRuntimePaths, + telemetry: ExecServerTelemetry, + ) -> Self { Self { - session_registry: SessionRegistry::new(), + session_registry: SessionRegistry::new(telemetry.clone()), runtime_paths, + telemetry, } } - pub(crate) async fn run_connection(&self, connection: JsonRpcConnection) { + pub(crate) async fn run_connection( + &self, + connection: JsonRpcConnection, + transport: ConnectionTransport, + ) { run_connection( connection, Arc::clone(&self.session_registry), self.runtime_paths.clone(), + self.telemetry.clone(), + transport, ) .await; } @@ -45,7 +59,10 @@ async fn run_connection( connection: JsonRpcConnection, session_registry: Arc, runtime_paths: ExecServerRuntimePaths, + telemetry: ExecServerTelemetry, + transport: ConnectionTransport, ) { + let _connection_metrics = telemetry.connection_started(transport); let router = Arc::new(build_router()); let JsonRpcConnection { outgoing_tx: json_outgoing_tx, @@ -100,31 +117,50 @@ async fn run_connection( } JsonRpcConnectionEvent::Message(message) => match message { codex_app_server_protocol::JSONRPCMessage::Request(request) => { + let operation = request_operation(request.method.as_str()); + let request_started_at = Instant::now(); if let Some(route) = router.request_route(request.method.as_str()) { let message = tokio::select! { message = route(Arc::clone(&handler), request) => message, _ = disconnected_rx.changed() => { + telemetry.request_completed( + operation, + "disconnected", + request_started_at.elapsed(), + ); debug!("exec-server transport disconnected while handling request"); break; } }; + telemetry.request_completed( + operation, + request_result(&message), + request_started_at.elapsed(), + ); if let Some(message) = message && outgoing_tx.send(message).await.is_err() { break; } - } else if outgoing_tx - .send(RpcServerOutboundMessage::Error { - request_id: request.id, - error: method_not_found(format!( - "exec-server stub does not implement `{}` yet", - request.method - )), - }) - .await - .is_err() - { - break; + } else { + telemetry.request_completed( + operation, + "error", + request_started_at.elapsed(), + ); + if outgoing_tx + .send(RpcServerOutboundMessage::Error { + request_id: request.id, + error: method_not_found(format!( + "exec-server stub does not implement `{}` yet", + request.method + )), + }) + .await + .is_err() + { + break; + } } } codex_app_server_protocol::JSONRPCMessage::Notification(notification) => { @@ -184,6 +220,35 @@ async fn run_connection( let _ = outbound_task.await; } +fn request_operation(method: &str) -> &'static str { + match method { + crate::protocol::INITIALIZE_METHOD => "initialize", + crate::protocol::EXEC_METHOD + | crate::protocol::EXEC_READ_METHOD + | crate::protocol::EXEC_WRITE_METHOD + | crate::protocol::EXEC_TERMINATE_METHOD => "process", + crate::protocol::FS_READ_FILE_METHOD + | crate::protocol::FS_WRITE_FILE_METHOD + | crate::protocol::FS_CREATE_DIRECTORY_METHOD + | crate::protocol::FS_GET_METADATA_METHOD + | crate::protocol::FS_READ_DIRECTORY_METHOD + | crate::protocol::FS_REMOVE_METHOD + | crate::protocol::FS_COPY_METHOD => "filesystem", + crate::protocol::HTTP_REQUEST_METHOD => "http", + _ => "unknown", + } +} + +fn request_result(message: &Option) -> &'static str { + match message { + Some(RpcServerOutboundMessage::Error { .. }) => "error", + Some( + RpcServerOutboundMessage::Response { .. } | RpcServerOutboundMessage::Notification(_), + ) + | None => "success", + } +} + #[cfg(test)] mod tests { use std::collections::HashMap; @@ -226,7 +291,7 @@ mod tests { #[tokio::test] async fn transport_disconnect_detaches_session_during_in_flight_read() { - let registry = SessionRegistry::new(); + let registry = SessionRegistry::new(crate::ExecServerTelemetry::default()); let (mut first_writer, mut first_lines, first_task) = spawn_test_connection(Arc::clone(®istry), "first"); @@ -322,7 +387,13 @@ mod tests { let (server_writer, client_reader) = duplex(1 << 20); let connection = JsonRpcConnection::from_stdio(server_reader, server_writer, label.to_string()); - let task = tokio::spawn(run_connection(connection, registry, test_runtime_paths())); + let task = tokio::spawn(run_connection( + connection, + registry, + test_runtime_paths(), + crate::ExecServerTelemetry::default(), + crate::telemetry::ConnectionTransport::Stdio, + )); (client_writer, BufReader::new(client_reader).lines(), task) } diff --git a/codex-rs/exec-server/src/server/session_registry.rs b/codex-rs/exec-server/src/server/session_registry.rs index 82c779c6bb..0e10a6a820 100644 --- a/codex-rs/exec-server/src/server/session_registry.rs +++ b/codex-rs/exec-server/src/server/session_registry.rs @@ -10,6 +10,7 @@ use uuid::Uuid; use crate::rpc::RpcNotificationSender; use crate::rpc::invalid_request; use crate::server::process_handler::ProcessHandler; +use crate::telemetry::ExecServerTelemetry; #[cfg(test)] const DETACHED_SESSION_TTL: Duration = Duration::from_millis(200); @@ -18,6 +19,7 @@ const DETACHED_SESSION_TTL: Duration = Duration::from_secs(10); pub(crate) struct SessionRegistry { sessions: Mutex>>, + telemetry: ExecServerTelemetry, } struct SessionEntry { @@ -49,9 +51,10 @@ pub(crate) struct SessionHandle { } impl SessionRegistry { - pub(crate) fn new() -> Arc { + pub(crate) fn new(telemetry: ExecServerTelemetry) -> Arc { Arc::new(Self { sessions: Mutex::new(HashMap::new()), + telemetry, }) } @@ -94,7 +97,7 @@ impl SessionRegistry { let session_id = Uuid::new_v4().to_string(); let entry = Arc::new(SessionEntry::new( session_id.clone(), - ProcessHandler::new(notifications), + ProcessHandler::new(notifications, self.telemetry.clone()), connection_id, )); sessions.insert(session_id, Arc::clone(&entry)); @@ -140,6 +143,7 @@ impl Default for SessionRegistry { fn default() -> Self { Self { sessions: Mutex::new(HashMap::new()), + telemetry: ExecServerTelemetry::default(), } } } diff --git a/codex-rs/exec-server/src/server/transport.rs b/codex-rs/exec-server/src/server/transport.rs index 57d7048e2b..83557c49a1 100644 --- a/codex-rs/exec-server/src/server/transport.rs +++ b/codex-rs/exec-server/src/server/transport.rs @@ -22,8 +22,10 @@ use tracing::info; use tracing::warn; use crate::ExecServerRuntimePaths; +use crate::ExecServerTelemetry; use crate::connection::JsonRpcConnection; use crate::server::processor::ConnectionProcessor; +use crate::telemetry::ConnectionTransport; pub const DEFAULT_LISTEN_URL: &str = "ws://127.0.0.1:0"; @@ -80,38 +82,40 @@ pub(crate) fn parse_listen_url( pub(crate) async fn run_transport( listen_url: &str, runtime_paths: ExecServerRuntimePaths, + telemetry: ExecServerTelemetry, ) -> Result<(), Box> { match parse_listen_url(listen_url)? { ExecServerListenTransport::WebSocket(bind_address) => { - run_websocket_listener(bind_address, runtime_paths).await + run_websocket_listener(bind_address, runtime_paths, telemetry).await } - ExecServerListenTransport::Stdio => run_stdio_connection(runtime_paths).await, + ExecServerListenTransport::Stdio => run_stdio_connection(runtime_paths, telemetry).await, } } async fn run_stdio_connection( runtime_paths: ExecServerRuntimePaths, + telemetry: ExecServerTelemetry, ) -> Result<(), Box> { - run_stdio_connection_with_io(io::stdin(), io::stdout(), runtime_paths).await + run_stdio_connection_with_io(io::stdin(), io::stdout(), runtime_paths, telemetry).await } async fn run_stdio_connection_with_io( reader: R, writer: W, runtime_paths: ExecServerRuntimePaths, + telemetry: ExecServerTelemetry, ) -> Result<(), Box> where R: AsyncRead + Unpin + Send + 'static, W: AsyncWrite + Unpin + Send + 'static, { - let processor = ConnectionProcessor::new(runtime_paths); + let processor = ConnectionProcessor::new(runtime_paths, telemetry); tracing::info!("codex-exec-server listening on stdio"); processor - .run_connection(JsonRpcConnection::from_stdio( - reader, - writer, - "exec-server stdio".to_string(), - )) + .run_connection( + JsonRpcConnection::from_stdio(reader, writer, "exec-server stdio".to_string()), + ConnectionTransport::Stdio, + ) .await; Ok(()) } @@ -119,10 +123,11 @@ where async fn run_websocket_listener( bind_address: SocketAddr, runtime_paths: ExecServerRuntimePaths, + telemetry: ExecServerTelemetry, ) -> Result<(), Box> { let listener = TcpListener::bind(bind_address).await?; let local_addr = listener.local_addr()?; - let processor = ConnectionProcessor::new(runtime_paths); + let processor = ConnectionProcessor::new(runtime_paths, telemetry); info!("codex-exec-server listening on ws://{local_addr}"); println!("ws://{local_addr}"); std::io::stdout().flush()?; @@ -174,10 +179,13 @@ async fn websocket_upgrade_handler( websocket.on_upgrade(move |stream| async move { state .processor - .run_connection(JsonRpcConnection::from_axum_websocket( - stream, - format!("exec-server websocket {peer_addr}"), - )) + .run_connection( + JsonRpcConnection::from_axum_websocket( + stream, + format!("exec-server websocket {peer_addr}"), + ), + ConnectionTransport::WebSocket, + ) .await; }) } diff --git a/codex-rs/exec-server/src/server/transport_tests.rs b/codex-rs/exec-server/src/server/transport_tests.rs index b9787d8a37..98c93c977a 100644 --- a/codex-rs/exec-server/src/server/transport_tests.rs +++ b/codex-rs/exec-server/src/server/transport_tests.rs @@ -61,6 +61,7 @@ async fn stdio_listen_transport_serves_initialize() { server_reader, server_writer, test_runtime_paths(), + crate::ExecServerTelemetry::default(), )); let mut client_lines = BufReader::new(client_reader).lines(); diff --git a/codex-rs/exec-server/src/telemetry.rs b/codex-rs/exec-server/src/telemetry.rs new file mode 100644 index 0000000000..3231213cc9 --- /dev/null +++ b/codex-rs/exec-server/src/telemetry.rs @@ -0,0 +1,320 @@ +use std::sync::Arc; +use std::sync::atomic::AtomicI64; +use std::sync::atomic::Ordering; +use std::time::Duration; + +use codex_otel::MetricsClient; +use tracing::warn; + +const CONNECTIONS_ACTIVE_METRIC: &str = "exec_server.connections.active"; +const CONNECTIONS_TOTAL_METRIC: &str = "exec_server.connections.total"; +const REQUESTS_TOTAL_METRIC: &str = "exec_server.requests.total"; +const REQUEST_DURATION_METRIC: &str = "exec_server.request.duration"; +const PROCESSES_ACTIVE_METRIC: &str = "exec_server.processes.active"; +const PROCESS_DURATION_METRIC: &str = "exec_server.process.duration"; +const RELAY_RECONNECTS_METRIC: &str = "exec_server.relay.reconnects"; + +#[derive(Clone, Copy)] +pub(crate) enum ConnectionTransport { + Relay, + Stdio, + WebSocket, +} + +impl ConnectionTransport { + fn metric_tag(self) -> &'static str { + match self { + Self::Relay => "relay", + Self::Stdio => "stdio", + Self::WebSocket => "websocket", + } + } +} + +#[derive(Clone, Default)] +pub struct ExecServerTelemetry { + inner: Option>, +} + +struct ExecServerTelemetryInner { + metrics: MetricsClient, + relay_connections: AtomicI64, + stdio_connections: AtomicI64, + websocket_connections: AtomicI64, + active_processes: AtomicI64, +} + +pub(crate) struct ConnectionMetricGuard { + telemetry: ExecServerTelemetry, + transport: ConnectionTransport, +} + +impl ExecServerTelemetry { + pub fn new(metrics: Option) -> Self { + Self { + inner: metrics.map(|metrics| { + Arc::new(ExecServerTelemetryInner { + metrics, + relay_connections: AtomicI64::new(0), + stdio_connections: AtomicI64::new(0), + websocket_connections: AtomicI64::new(0), + active_processes: AtomicI64::new(0), + }) + }), + } + } + + pub(crate) fn connection_started( + &self, + transport: ConnectionTransport, + ) -> ConnectionMetricGuard { + self.with_inner(|inner| { + let active = inner + .connection_counter(transport) + .fetch_add(1, Ordering::AcqRel) + + 1; + inner.gauge( + CONNECTIONS_ACTIVE_METRIC, + active, + &[("transport", transport.metric_tag())], + ); + inner.counter( + CONNECTIONS_TOTAL_METRIC, + &[ + ("transport", transport.metric_tag()), + ("result", "accepted"), + ], + ); + }); + ConnectionMetricGuard { + telemetry: self.clone(), + transport, + } + } + + pub(crate) fn request_completed( + &self, + operation: &'static str, + result: &'static str, + duration: Duration, + ) { + self.with_inner(|inner| { + let tags = [("operation", operation), ("result", result)]; + inner.counter(REQUESTS_TOTAL_METRIC, &tags); + inner.duration(REQUEST_DURATION_METRIC, duration, &tags); + }); + } + + pub(crate) fn process_started(&self) { + self.with_inner(|inner| { + let active = inner.active_processes.fetch_add(1, Ordering::AcqRel) + 1; + inner.gauge(PROCESSES_ACTIVE_METRIC, active, &[]); + }); + } + + pub(crate) fn process_finished(&self, result: &'static str, duration: Duration) { + self.with_inner(|inner| { + let active = inner.active_processes.fetch_sub(1, Ordering::AcqRel) - 1; + inner.gauge(PROCESSES_ACTIVE_METRIC, active, &[]); + inner.duration(PROCESS_DURATION_METRIC, duration, &[("result", result)]); + }); + } + + pub(crate) fn relay_reconnect(&self, reason: &'static str) { + self.with_inner(|inner| { + inner.counter(RELAY_RECONNECTS_METRIC, &[("reason", reason)]); + }); + } + + fn connection_finished(&self, transport: ConnectionTransport) { + self.with_inner(|inner| { + let active = inner + .connection_counter(transport) + .fetch_sub(1, Ordering::AcqRel) + - 1; + inner.gauge( + CONNECTIONS_ACTIVE_METRIC, + active, + &[("transport", transport.metric_tag())], + ); + }); + } + + fn with_inner(&self, emit: impl FnOnce(&ExecServerTelemetryInner)) { + if let Some(inner) = &self.inner { + emit(inner); + } + } +} + +impl Drop for ConnectionMetricGuard { + fn drop(&mut self) { + self.telemetry.connection_finished(self.transport); + } +} + +impl ExecServerTelemetryInner { + fn connection_counter(&self, transport: ConnectionTransport) -> &AtomicI64 { + match transport { + ConnectionTransport::Relay => &self.relay_connections, + ConnectionTransport::Stdio => &self.stdio_connections, + ConnectionTransport::WebSocket => &self.websocket_connections, + } + } + + fn counter(&self, name: &str, tags: &[(&str, &str)]) { + if let Err(err) = self.metrics.counter(name, /*inc*/ 1, tags) { + warn!(metric = name, error = %err, "failed to emit exec-server counter"); + } + } + + fn duration(&self, name: &str, duration: Duration, tags: &[(&str, &str)]) { + if let Err(err) = self.metrics.record_duration(name, duration, tags) { + warn!(metric = name, error = %err, "failed to emit exec-server duration"); + } + } + + fn gauge(&self, name: &str, value: i64, tags: &[(&str, &str)]) { + if let Err(err) = self.metrics.gauge(name, value, tags) { + warn!(metric = name, error = %err, "failed to emit exec-server gauge"); + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + use std::time::Duration; + + use codex_otel::MetricsConfig; + use opentelemetry::KeyValue; + use opentelemetry_sdk::metrics::InMemoryMetricExporter; + use opentelemetry_sdk::metrics::data::AggregatedMetrics; + use opentelemetry_sdk::metrics::data::Metric; + use opentelemetry_sdk::metrics::data::MetricData; + use opentelemetry_sdk::metrics::data::ResourceMetrics; + use pretty_assertions::assert_eq; + + use super::ConnectionTransport; + use super::ExecServerTelemetry; + + #[test] + fn emits_bounded_exec_server_metrics() { + let exporter = InMemoryMetricExporter::default(); + let metrics = codex_otel::MetricsClient::new(MetricsConfig::in_memory( + "test", + "codex-exec-server", + env!("CARGO_PKG_VERSION"), + exporter.clone(), + )) + .expect("metrics"); + let telemetry = ExecServerTelemetry::new(Some(metrics.clone())); + + let connection = telemetry.connection_started(ConnectionTransport::WebSocket); + telemetry.request_completed("process", "success", Duration::from_millis(12)); + telemetry.process_started(); + telemetry.process_finished("success", Duration::from_millis(34)); + telemetry.relay_reconnect("connect_failed"); + drop(connection); + metrics.shutdown().expect("shutdown metrics"); + + let metrics = latest_metrics(&exporter); + assert_eq!( + metric_points(&metrics, "exec_server.connections.total"), + vec![( + 1.0, + BTreeMap::from([ + ("result".to_string(), "accepted".to_string()), + ("transport".to_string(), "websocket".to_string()), + ]), + )] + ); + assert_eq!( + metric_points(&metrics, "exec_server.connections.active"), + vec![( + 0.0, + BTreeMap::from([("transport".to_string(), "websocket".to_string())]), + )] + ); + assert_eq!( + metric_points(&metrics, "exec_server.requests.total"), + vec![( + 1.0, + BTreeMap::from([ + ("operation".to_string(), "process".to_string()), + ("result".to_string(), "success".to_string()), + ]), + )] + ); + assert_eq!( + metric_points(&metrics, "exec_server.processes.active"), + vec![(0.0, BTreeMap::new())] + ); + assert_eq!( + metric_points(&metrics, "exec_server.relay.reconnects"), + vec![( + 1.0, + BTreeMap::from([("reason".to_string(), "connect_failed".to_string())]), + )] + ); + assert_eq!(histogram_count(&metrics, "exec_server.request.duration"), 1); + assert_eq!(histogram_count(&metrics, "exec_server.process.duration"), 1); + } + + fn latest_metrics(exporter: &InMemoryMetricExporter) -> ResourceMetrics { + exporter + .get_finished_metrics() + .expect("finished metrics") + .into_iter() + .last() + .expect("metrics export") + } + + fn find_metric<'a>(resource_metrics: &'a ResourceMetrics, name: &str) -> &'a Metric { + resource_metrics + .scope_metrics() + .flat_map(opentelemetry_sdk::metrics::data::ScopeMetrics::metrics) + .find(|metric| metric.name() == name) + .unwrap_or_else(|| panic!("metric {name} missing")) + } + + fn metric_points( + resource_metrics: &ResourceMetrics, + name: &str, + ) -> Vec<(f64, BTreeMap)> { + match find_metric(resource_metrics, name).data() { + AggregatedMetrics::I64(MetricData::Gauge(gauge)) => gauge + .data_points() + .map(|point| (point.value() as f64, attributes_to_map(point.attributes()))) + .collect(), + AggregatedMetrics::U64(MetricData::Sum(sum)) => sum + .data_points() + .map(|point| (point.value() as f64, attributes_to_map(point.attributes()))) + .collect(), + _ => panic!("unexpected metric data for {name}"), + } + } + + fn histogram_count(resource_metrics: &ResourceMetrics, name: &str) -> u64 { + match find_metric(resource_metrics, name).data() { + AggregatedMetrics::F64(MetricData::Histogram(histogram)) => histogram + .data_points() + .map(opentelemetry_sdk::metrics::data::HistogramDataPoint::count) + .sum(), + _ => panic!("unexpected histogram data for {name}"), + } + } + + fn attributes_to_map<'a>( + attributes: impl Iterator, + ) -> BTreeMap { + attributes + .map(|attribute| { + ( + attribute.key.as_str().to_string(), + attribute.value.as_str().to_string(), + ) + }) + .collect() + } +} diff --git a/codex-rs/otel/src/metrics/client.rs b/codex-rs/otel/src/metrics/client.rs index 417c1f4bd9..354e158364 100644 --- a/codex-rs/otel/src/metrics/client.rs +++ b/codex-rs/otel/src/metrics/client.rs @@ -12,6 +12,7 @@ use crate::metrics::validation::validate_tags; use codex_utils_string::sanitize_metric_tag_value; use opentelemetry::KeyValue; use opentelemetry::metrics::Counter; +use opentelemetry::metrics::Gauge; use opentelemetry::metrics::Histogram; use opentelemetry::metrics::Meter; use opentelemetry::metrics::MeterProvider as _; @@ -83,6 +84,7 @@ struct MetricsClientInner { meter_provider: SdkMeterProvider, meter: Meter, counters: Mutex>>, + gauges: Mutex>>, histograms: Mutex>>, duration_histograms: Mutex>>, runtime_reader: Option>, @@ -126,6 +128,21 @@ impl MetricsClientInner { Ok(()) } + fn gauge(&self, name: &str, value: i64, tags: &[(&str, &str)]) -> Result<()> { + validate_metric_name(name)?; + let attributes = self.attributes(tags)?; + + let mut gauges = self + .gauges + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let gauge = gauges + .entry(name.to_string()) + .or_insert_with(|| self.meter.i64_gauge(name.to_string()).build()); + gauge.record(value, &attributes); + Ok(()) + } + fn duration_histogram(&self, name: &str, value: i64, tags: &[(&str, &str)]) -> Result<()> { validate_metric_name(name)?; let attributes = self.attributes(tags)?; @@ -233,6 +250,7 @@ impl MetricsClient { meter_provider, meter, counters: Mutex::new(HashMap::new()), + gauges: Mutex::new(HashMap::new()), histograms: Mutex::new(HashMap::new()), duration_histograms: Mutex::new(HashMap::new()), runtime_reader, @@ -250,6 +268,11 @@ impl MetricsClient { self.0.histogram(name, value, tags) } + /// Send a single gauge measurement. + pub fn gauge(&self, name: &str, value: i64, tags: &[(&str, &str)]) -> Result<()> { + self.0.gauge(name, value, tags) + } + /// Record a duration in milliseconds using a histogram. pub fn record_duration( &self, diff --git a/codex-rs/otel/tests/suite/otlp_http_loopback.rs b/codex-rs/otel/tests/suite/otlp_http_loopback.rs index cafdbeeec2..4f8fbe1c58 100644 --- a/codex-rs/otel/tests/suite/otlp_http_loopback.rs +++ b/codex-rs/otel/tests/suite/otlp_http_loopback.rs @@ -186,6 +186,11 @@ fn otlp_http_exporter_sends_metrics_to_collector() -> Result<()> { ))?; metrics.counter("codex.turns", /*inc*/ 1, &[("source", "test")])?; + metrics.gauge( + "exec_server.connections.active", + /*value*/ 1, + &[("transport", "websocket")], + )?; metrics.shutdown()?; server.join().expect("server join"); @@ -220,6 +225,16 @@ fn otlp_http_exporter_sends_metrics_to_collector() -> Result<()> { "expected metric name not found; body prefix: {}", &body.chars().take(2000).collect::() ); + assert!( + body.contains("exec_server.connections.active"), + "expected exec-server gauge not found; body prefix: {}", + &body.chars().take(2000).collect::() + ); + assert!( + body.contains("websocket"), + "expected exec-server metric tag not found; body prefix: {}", + &body.chars().take(2000).collect::() + ); Ok(()) } diff --git a/codex-rs/otel/tests/suite/send.rs b/codex-rs/otel/tests/suite/send.rs index fc382bf88a..879a2e8726 100644 --- a/codex-rs/otel/tests/suite/send.rs +++ b/codex-rs/otel/tests/suite/send.rs @@ -23,6 +23,11 @@ fn send_builds_payload_with_tags_and_histograms() -> Result<()> { /*value*/ 25, &[("tool", "shell")], )?; + metrics.gauge( + "codex.active", + /*value*/ 2, + &[("component", "exec-server")], + )?; metrics.shutdown()?; let resource_metrics = latest_metrics(&exporter); @@ -78,6 +83,26 @@ fn send_builds_payload_with_tags_and_histograms() -> Result<()> { ]); assert_eq!(histogram_attrs, expected_histogram_attributes); + let gauge = find_metric(&resource_metrics, "codex.active").expect("gauge metric missing"); + let gauge_point = match gauge.data() { + opentelemetry_sdk::metrics::data::AggregatedMetrics::I64(data) => match data { + opentelemetry_sdk::metrics::data::MetricData::Gauge(gauge) => { + gauge.data_points().next().expect("gauge point") + } + _ => panic!("unexpected gauge aggregation"), + }, + _ => panic!("unexpected gauge metric data type"), + }; + assert_eq!(gauge_point.value(), 2); + assert_eq!( + attributes_to_map(gauge_point.attributes()), + BTreeMap::from([ + ("component".to_string(), "exec-server".to_string()), + ("env".to_string(), "prod".to_string()), + ("service".to_string(), "codex-cli".to_string()), + ]) + ); + Ok(()) }