Add exec-server OTEL metrics

This commit is contained in:
starr-openai
2026-05-28 23:55:12 -07:00
parent f23fd2fd5b
commit 21dfe383ed
18 changed files with 595 additions and 55 deletions

3
codex-rs/Cargo.lock generated
View File

@@ -1878,6 +1878,8 @@ dependencies = [
"eventsource-stream",
"futures",
"http 1.4.0",
"opentelemetry",
"opentelemetry_sdk",
"pretty_assertions",
"regex-lite",
"reqwest 0.12.28",
@@ -2777,6 +2779,7 @@ dependencies = [
"codex-app-server-protocol",
"codex-client",
"codex-file-system",
"codex-otel",
"codex-protocol",
"codex-sandboxing",
"codex-test-binary-support",

View File

@@ -1509,7 +1509,7 @@ async fn run_exec_server_command(
arg0_paths.codex_linux_sandbox_exe.clone(),
)?;
let config = load_exec_server_config(root_config_overrides, strict_config).await?;
let _otel = init_exec_server_tracing(&config);
let (_otel, telemetry) = init_exec_server_tracing(&config);
let exec_server_span = exec_server_root_span();
if let Some(base_url) = cmd.remote {
let environment_id = cmd
@@ -1528,6 +1528,7 @@ async fn run_exec_server_command(
if let Some(name) = cmd.name {
remote_config.name = name;
}
let remote_config = remote_config.with_telemetry(telemetry);
codex_exec_server::run_remote_environment(remote_config, runtime_paths)
.instrument(exec_server_span)
.await?;
@@ -1538,14 +1539,16 @@ async fn run_exec_server_command(
.listen
.as_deref()
.unwrap_or(codex_exec_server::DEFAULT_LISTEN_URL);
codex_exec_server::run_main(listen_url, runtime_paths)
codex_exec_server::run_main_with_telemetry(listen_url, runtime_paths, telemetry)
.instrument(exec_server_span)
.await
.map_err(anyhow::Error::from_boxed)
}
}
fn init_exec_server_tracing(config: &codex_core::config::Config) -> impl Send + Sync {
fn init_exec_server_tracing(
config: &codex_core::config::Config,
) -> (impl Send + Sync, codex_exec_server::ExecServerTelemetry) {
let fmt_layer = tracing_subscriber::fmt::layer()
.with_writer(std::io::stderr)
.with_filter(exec_server_stderr_env_filter());
@@ -1571,13 +1574,16 @@ fn init_exec_server_tracing(config: &codex_core::config::Config) -> impl Send +
let otel_logger_layer = otel.as_ref().and_then(|otel| otel.logger_layer());
let otel_tracing_layer = otel.as_ref().and_then(|otel| otel.tracing_layer());
let telemetry = codex_exec_server::ExecServerTelemetry::new(
otel.as_ref().and_then(|otel| otel.metrics()).cloned(),
);
let _ = tracing_subscriber::registry()
.with(fmt_layer)
.with(otel_tracing_layer)
.with(otel_logger_layer)
.try_init();
tracing::callsite::rebuild_interest_cache();
otel
(otel, telemetry)
}
fn exec_server_root_span() -> tracing::Span {

View File

@@ -20,6 +20,7 @@ codex-app-server-protocol = { workspace = true }
codex-api = { workspace = true }
codex-client = { workspace = true }
codex-file-system = { workspace = true }
codex-otel = { workspace = true }
codex-protocol = { workspace = true }
codex-sandboxing = { workspace = true }
codex-utils-absolute-path = { workspace = true }
@@ -53,6 +54,8 @@ anyhow = { workspace = true }
codex-test-binary-support = { workspace = true }
ctor = { workspace = true }
http = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
pretty_assertions = { workspace = true }
serial_test = { workspace = true }
tempfile = { workspace = true }

View File

@@ -22,6 +22,7 @@ mod rpc;
mod runtime_paths;
mod sandboxed_file_system;
mod server;
mod telemetry;
pub use client::ExecServerClient;
pub use client::ExecServerError;
@@ -97,3 +98,5 @@ pub use runtime_paths::ExecServerRuntimePaths;
pub use server::DEFAULT_LISTEN_URL;
pub use server::ExecServerListenUrlParseError;
pub use server::run_main;
pub use server::run_main_with_telemetry;
pub use telemetry::ExecServerTelemetry;

View File

@@ -3,6 +3,7 @@ use std::collections::VecDeque;
use std::collections::hash_map::Entry;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use async_trait::async_trait;
use codex_app_server_protocol::JSONRPCErrorError;
@@ -45,6 +46,7 @@ use crate::rpc::RpcServerOutboundMessage;
use crate::rpc::internal_error;
use crate::rpc::invalid_params;
use crate::rpc::invalid_request;
use crate::telemetry::ExecServerTelemetry;
const RETAINED_OUTPUT_BYTES_PER_PROCESS: usize = 1024 * 1024;
const NOTIFICATION_CHANNEL_CAPACITY: usize = 256;
@@ -74,6 +76,8 @@ struct RunningProcess {
output_notify: Arc<Notify>,
open_streams: usize,
closed: bool,
started_at: Instant,
metrics_finished: bool,
}
enum ProcessEntry {
@@ -84,6 +88,7 @@ enum ProcessEntry {
struct Inner {
notifications: std::sync::RwLock<Option<RpcNotificationSender>>,
processes: Mutex<HashMap<ProcessId, ProcessEntry>>,
telemetry: ExecServerTelemetry,
}
#[derive(Clone)]
@@ -103,16 +108,23 @@ impl Default for LocalProcess {
let (outgoing_tx, mut outgoing_rx) =
mpsc::channel::<RpcServerOutboundMessage>(NOTIFICATION_CHANNEL_CAPACITY);
tokio::spawn(async move { while outgoing_rx.recv().await.is_some() {} });
Self::new(RpcNotificationSender::new(outgoing_tx))
Self::new(
RpcNotificationSender::new(outgoing_tx),
ExecServerTelemetry::default(),
)
}
}
impl LocalProcess {
pub(crate) fn new(notifications: RpcNotificationSender) -> Self {
pub(crate) fn new(
notifications: RpcNotificationSender,
telemetry: ExecServerTelemetry,
) -> Self {
Self {
inner: Arc::new(Inner {
notifications: std::sync::RwLock::new(Some(notifications)),
processes: Mutex::new(HashMap::new()),
telemetry,
}),
}
}
@@ -129,6 +141,11 @@ impl LocalProcess {
.collect::<Vec<_>>()
};
for process in remaining {
if !process.metrics_finished {
self.inner
.telemetry
.process_finished("terminated", process.started_at.elapsed());
}
process.session.terminate();
}
}
@@ -226,9 +243,12 @@ impl LocalProcess {
output_notify: Arc::clone(&output_notify),
open_streams: 2,
closed: false,
started_at: Instant::now(),
metrics_finished: false,
})),
);
}
self.inner.telemetry.process_started();
tokio::spawn(stream_output(
process_id.clone(),
@@ -601,7 +621,7 @@ async fn watch_exit(
output_notify: Arc<Notify>,
) {
let exit_code = exit_rx.await.unwrap_or(-1);
let notification = {
let (notification, process_duration) = {
let mut processes = inner.processes.lock().await;
if let Some(ProcessEntry::Running(process)) = processes.get_mut(&process_id) {
let seq = process.next_seq;
@@ -611,15 +631,25 @@ async fn watch_exit(
process
.events
.publish(ExecProcessEvent::Exited { seq, exit_code });
Some(ExecExitedNotification {
process_id: process_id.clone(),
seq,
exit_code,
})
process.metrics_finished = true;
(
Some(ExecExitedNotification {
process_id: process_id.clone(),
seq,
exit_code,
}),
Some(process.started_at.elapsed()),
)
} else {
None
(None, None)
}
};
if let Some(process_duration) = process_duration {
inner.telemetry.process_finished(
if exit_code == 0 { "success" } else { "error" },
process_duration,
);
}
output_notify.notify_waiters();
if let Some(notification) = notification
&& let Some(notifications) = notification_sender(&inner)
@@ -890,6 +920,8 @@ mod tests {
output_notify: Arc::clone(&output_notify),
open_streams: 2,
closed: false,
started_at: Instant::now(),
metrics_finished: false,
})),
);
assert!(previous.is_none());

View File

@@ -26,6 +26,7 @@ use crate::relay_proto::RelayMessageFrame;
use crate::relay_proto::RelayResume;
use crate::relay_proto::relay_message_frame;
use crate::server::ConnectionProcessor;
use crate::telemetry::ConnectionTransport;
const RELAY_MESSAGE_FRAME_VERSION: u32 = 1;
@@ -449,7 +450,9 @@ fn spawn_virtual_stream(
transport: JsonRpcTransport::Plain,
};
tokio::spawn(async move {
processor.run_connection(connection).await;
processor
.run_connection(connection, ConnectionTransport::Relay)
.await;
});
VirtualStream {

View File

@@ -12,6 +12,7 @@ use codex_utils_rustls_provider::ensure_rustls_crypto_provider;
use crate::ExecServerError;
use crate::ExecServerRuntimePaths;
use crate::ExecServerTelemetry;
use crate::relay::run_multiplexed_environment;
use crate::server::ConnectionProcessor;
@@ -193,6 +194,7 @@ pub struct RemoteEnvironmentConfig {
pub environment_id: String,
pub name: String,
auth_provider: SharedAuthProvider,
telemetry: ExecServerTelemetry,
}
impl std::fmt::Debug for RemoteEnvironmentConfig {
@@ -218,8 +220,14 @@ impl RemoteEnvironmentConfig {
environment_id,
name: "codex-exec-server".to_string(),
auth_provider,
telemetry: ExecServerTelemetry::default(),
})
}
pub fn with_telemetry(mut self, telemetry: ExecServerTelemetry) -> Self {
self.telemetry = telemetry;
self
}
}
/// Register an exec-server for remote use and serve requests over the returned
@@ -231,7 +239,7 @@ pub async fn run_remote_environment(
ensure_rustls_crypto_provider();
let client =
EnvironmentRegistryClient::new(config.base_url.clone(), config.auth_provider.clone())?;
let processor = ConnectionProcessor::new(runtime_paths);
let processor = ConnectionProcessor::new(runtime_paths, config.telemetry.clone());
let mut backoff = Duration::from_secs(1);
let mut connection_attempt = 0_u32;
@@ -275,6 +283,7 @@ pub async fn run_remote_environment(
);
backoff = Duration::from_secs(1);
run_multiplexed_environment(websocket, processor.clone()).await;
config.telemetry.relay_reconnect("disconnected");
remote_event!(
warn,
WARN,
@@ -295,6 +304,7 @@ pub async fn run_remote_environment(
"codex.exec_server.remote_websocket_connect_failed",
"failed to connect remote exec-server websocket"
);
config.telemetry.relay_reconnect("connect_failed");
}
}

View File

@@ -12,10 +12,19 @@ pub use transport::DEFAULT_LISTEN_URL;
pub use transport::ExecServerListenUrlParseError;
use crate::ExecServerRuntimePaths;
use crate::ExecServerTelemetry;
pub async fn run_main(
listen_url: &str,
runtime_paths: ExecServerRuntimePaths,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
transport::run_transport(listen_url, runtime_paths).await
run_main_with_telemetry(listen_url, runtime_paths, ExecServerTelemetry::default()).await
}
/// Serves the exec-server transport selected by `listen_url`, reporting
/// metrics through `telemetry`.
///
/// This is a thin wrapper: all three arguments are forwarded unchanged to
/// `transport::run_transport`, and its result is returned as-is.
pub async fn run_main_with_telemetry(
listen_url: &str,
runtime_paths: ExecServerRuntimePaths,
telemetry: ExecServerTelemetry,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
transport::run_transport(listen_url, runtime_paths, telemetry).await
}

View File

@@ -77,7 +77,7 @@ fn test_runtime_paths() -> ExecServerRuntimePaths {
async fn initialized_handler() -> Arc<ExecServerHandler> {
let (outgoing_tx, _outgoing_rx) = mpsc::channel(16);
let registry = SessionRegistry::new();
let registry = SessionRegistry::new(crate::ExecServerTelemetry::default());
let handler = Arc::new(ExecServerHandler::new(
registry,
RpcNotificationSender::new(outgoing_tx),
@@ -155,7 +155,7 @@ async fn terminate_reports_false_after_process_exit() {
#[tokio::test]
async fn long_poll_read_fails_after_session_resume() {
let (first_tx, _first_rx) = mpsc::channel(16);
let registry = SessionRegistry::new();
let registry = SessionRegistry::new(crate::ExecServerTelemetry::default());
let first_handler = Arc::new(ExecServerHandler::new(
Arc::clone(&registry),
RpcNotificationSender::new(first_tx),
@@ -228,7 +228,7 @@ async fn long_poll_read_fails_after_session_resume() {
#[tokio::test]
async fn active_session_resume_is_rejected() {
let (first_tx, _first_rx) = mpsc::channel(16);
let registry = SessionRegistry::new();
let registry = SessionRegistry::new(crate::ExecServerTelemetry::default());
let first_handler = Arc::new(ExecServerHandler::new(
Arc::clone(&registry),
RpcNotificationSender::new(first_tx),
@@ -272,7 +272,7 @@ async fn active_session_resume_is_rejected() {
async fn output_and_exit_are_retained_after_notification_receiver_closes() {
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
let handler = Arc::new(ExecServerHandler::new(
SessionRegistry::new(),
SessionRegistry::new(crate::ExecServerTelemetry::default()),
RpcNotificationSender::new(outgoing_tx),
test_runtime_paths(),
));

View File

@@ -10,6 +10,7 @@ use crate::protocol::TerminateResponse;
use crate::protocol::WriteParams;
use crate::protocol::WriteResponse;
use crate::rpc::RpcNotificationSender;
use crate::telemetry::ExecServerTelemetry;
#[derive(Clone)]
pub(crate) struct ProcessHandler {
@@ -17,9 +18,12 @@ pub(crate) struct ProcessHandler {
}
impl ProcessHandler {
pub(crate) fn new(notifications: RpcNotificationSender) -> Self {
pub(crate) fn new(
notifications: RpcNotificationSender,
telemetry: ExecServerTelemetry,
) -> Self {
Self {
process: LocalProcess::new(notifications),
process: LocalProcess::new(notifications, telemetry),
}
}

View File

@@ -1,4 +1,5 @@
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc;
use tracing::debug;
@@ -16,26 +17,39 @@ use crate::rpc::method_not_found;
use crate::server::ExecServerHandler;
use crate::server::registry::build_router;
use crate::server::session_registry::SessionRegistry;
use crate::telemetry::ConnectionTransport;
use crate::telemetry::ExecServerTelemetry;
#[derive(Clone)]
pub(crate) struct ConnectionProcessor {
session_registry: Arc<SessionRegistry>,
runtime_paths: ExecServerRuntimePaths,
telemetry: ExecServerTelemetry,
}
impl ConnectionProcessor {
pub(crate) fn new(runtime_paths: ExecServerRuntimePaths) -> Self {
pub(crate) fn new(
runtime_paths: ExecServerRuntimePaths,
telemetry: ExecServerTelemetry,
) -> Self {
Self {
session_registry: SessionRegistry::new(),
session_registry: SessionRegistry::new(telemetry.clone()),
runtime_paths,
telemetry,
}
}
pub(crate) async fn run_connection(&self, connection: JsonRpcConnection) {
pub(crate) async fn run_connection(
&self,
connection: JsonRpcConnection,
transport: ConnectionTransport,
) {
run_connection(
connection,
Arc::clone(&self.session_registry),
self.runtime_paths.clone(),
self.telemetry.clone(),
transport,
)
.await;
}
@@ -45,7 +59,10 @@ async fn run_connection(
connection: JsonRpcConnection,
session_registry: Arc<SessionRegistry>,
runtime_paths: ExecServerRuntimePaths,
telemetry: ExecServerTelemetry,
transport: ConnectionTransport,
) {
let _connection_metrics = telemetry.connection_started(transport);
let router = Arc::new(build_router());
let JsonRpcConnection {
outgoing_tx: json_outgoing_tx,
@@ -100,31 +117,50 @@ async fn run_connection(
}
JsonRpcConnectionEvent::Message(message) => match message {
codex_app_server_protocol::JSONRPCMessage::Request(request) => {
let operation = request_operation(request.method.as_str());
let request_started_at = Instant::now();
if let Some(route) = router.request_route(request.method.as_str()) {
let message = tokio::select! {
message = route(Arc::clone(&handler), request) => message,
_ = disconnected_rx.changed() => {
telemetry.request_completed(
operation,
"disconnected",
request_started_at.elapsed(),
);
debug!("exec-server transport disconnected while handling request");
break;
}
};
telemetry.request_completed(
operation,
request_result(&message),
request_started_at.elapsed(),
);
if let Some(message) = message
&& outgoing_tx.send(message).await.is_err()
{
break;
}
} else if outgoing_tx
.send(RpcServerOutboundMessage::Error {
request_id: request.id,
error: method_not_found(format!(
"exec-server stub does not implement `{}` yet",
request.method
)),
})
.await
.is_err()
{
break;
} else {
telemetry.request_completed(
operation,
"error",
request_started_at.elapsed(),
);
if outgoing_tx
.send(RpcServerOutboundMessage::Error {
request_id: request.id,
error: method_not_found(format!(
"exec-server stub does not implement `{}` yet",
request.method
)),
})
.await
.is_err()
{
break;
}
}
}
codex_app_server_protocol::JSONRPCMessage::Notification(notification) => {
@@ -184,6 +220,35 @@ async fn run_connection(
let _ = outbound_task.await;
}
/// Maps a JSON-RPC method name onto the coarse `operation` label used for
/// request metrics.
///
/// Methods are grouped into a small fixed set of labels (`initialize`,
/// `process`, `filesystem`, `http`); anything unrecognised is reported as
/// `unknown`.
fn request_operation(method: &str) -> &'static str {
    // Methods that start, read from, write to, or terminate a process.
    const PROCESS_METHODS: [&str; 4] = [
        crate::protocol::EXEC_METHOD,
        crate::protocol::EXEC_READ_METHOD,
        crate::protocol::EXEC_WRITE_METHOD,
        crate::protocol::EXEC_TERMINATE_METHOD,
    ];
    // Methods that operate on the file system.
    const FILESYSTEM_METHODS: [&str; 7] = [
        crate::protocol::FS_READ_FILE_METHOD,
        crate::protocol::FS_WRITE_FILE_METHOD,
        crate::protocol::FS_CREATE_DIRECTORY_METHOD,
        crate::protocol::FS_GET_METADATA_METHOD,
        crate::protocol::FS_READ_DIRECTORY_METHOD,
        crate::protocol::FS_REMOVE_METHOD,
        crate::protocol::FS_COPY_METHOD,
    ];

    if method == crate::protocol::INITIALIZE_METHOD {
        return "initialize";
    }
    if PROCESS_METHODS.contains(&method) {
        return "process";
    }
    if FILESYSTEM_METHODS.contains(&method) {
        return "filesystem";
    }
    if method == crate::protocol::HTTP_REQUEST_METHOD {
        return "http";
    }
    "unknown"
}
fn request_result(message: &Option<RpcServerOutboundMessage>) -> &'static str {
match message {
Some(RpcServerOutboundMessage::Error { .. }) => "error",
Some(
RpcServerOutboundMessage::Response { .. } | RpcServerOutboundMessage::Notification(_),
)
| None => "success",
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
@@ -226,7 +291,7 @@ mod tests {
#[tokio::test]
async fn transport_disconnect_detaches_session_during_in_flight_read() {
let registry = SessionRegistry::new();
let registry = SessionRegistry::new(crate::ExecServerTelemetry::default());
let (mut first_writer, mut first_lines, first_task) =
spawn_test_connection(Arc::clone(&registry), "first");
@@ -322,7 +387,13 @@ mod tests {
let (server_writer, client_reader) = duplex(1 << 20);
let connection =
JsonRpcConnection::from_stdio(server_reader, server_writer, label.to_string());
let task = tokio::spawn(run_connection(connection, registry, test_runtime_paths()));
let task = tokio::spawn(run_connection(
connection,
registry,
test_runtime_paths(),
crate::ExecServerTelemetry::default(),
crate::telemetry::ConnectionTransport::Stdio,
));
(client_writer, BufReader::new(client_reader).lines(), task)
}

View File

@@ -10,6 +10,7 @@ use uuid::Uuid;
use crate::rpc::RpcNotificationSender;
use crate::rpc::invalid_request;
use crate::server::process_handler::ProcessHandler;
use crate::telemetry::ExecServerTelemetry;
#[cfg(test)]
const DETACHED_SESSION_TTL: Duration = Duration::from_millis(200);
@@ -18,6 +19,7 @@ const DETACHED_SESSION_TTL: Duration = Duration::from_secs(10);
pub(crate) struct SessionRegistry {
sessions: Mutex<HashMap<String, Arc<SessionEntry>>>,
telemetry: ExecServerTelemetry,
}
struct SessionEntry {
@@ -49,9 +51,10 @@ pub(crate) struct SessionHandle {
}
impl SessionRegistry {
pub(crate) fn new() -> Arc<Self> {
pub(crate) fn new(telemetry: ExecServerTelemetry) -> Arc<Self> {
Arc::new(Self {
sessions: Mutex::new(HashMap::new()),
telemetry,
})
}
@@ -94,7 +97,7 @@ impl SessionRegistry {
let session_id = Uuid::new_v4().to_string();
let entry = Arc::new(SessionEntry::new(
session_id.clone(),
ProcessHandler::new(notifications),
ProcessHandler::new(notifications, self.telemetry.clone()),
connection_id,
));
sessions.insert(session_id, Arc::clone(&entry));
@@ -140,6 +143,7 @@ impl Default for SessionRegistry {
fn default() -> Self {
Self {
sessions: Mutex::new(HashMap::new()),
telemetry: ExecServerTelemetry::default(),
}
}
}

View File

@@ -22,8 +22,10 @@ use tracing::info;
use tracing::warn;
use crate::ExecServerRuntimePaths;
use crate::ExecServerTelemetry;
use crate::connection::JsonRpcConnection;
use crate::server::processor::ConnectionProcessor;
use crate::telemetry::ConnectionTransport;
pub const DEFAULT_LISTEN_URL: &str = "ws://127.0.0.1:0";
@@ -80,38 +82,40 @@ pub(crate) fn parse_listen_url(
pub(crate) async fn run_transport(
listen_url: &str,
runtime_paths: ExecServerRuntimePaths,
telemetry: ExecServerTelemetry,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
match parse_listen_url(listen_url)? {
ExecServerListenTransport::WebSocket(bind_address) => {
run_websocket_listener(bind_address, runtime_paths).await
run_websocket_listener(bind_address, runtime_paths, telemetry).await
}
ExecServerListenTransport::Stdio => run_stdio_connection(runtime_paths).await,
ExecServerListenTransport::Stdio => run_stdio_connection(runtime_paths, telemetry).await,
}
}
async fn run_stdio_connection(
runtime_paths: ExecServerRuntimePaths,
telemetry: ExecServerTelemetry,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
run_stdio_connection_with_io(io::stdin(), io::stdout(), runtime_paths).await
run_stdio_connection_with_io(io::stdin(), io::stdout(), runtime_paths, telemetry).await
}
async fn run_stdio_connection_with_io<R, W>(
reader: R,
writer: W,
runtime_paths: ExecServerRuntimePaths,
telemetry: ExecServerTelemetry,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
R: AsyncRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static,
{
let processor = ConnectionProcessor::new(runtime_paths);
let processor = ConnectionProcessor::new(runtime_paths, telemetry);
tracing::info!("codex-exec-server listening on stdio");
processor
.run_connection(JsonRpcConnection::from_stdio(
reader,
writer,
"exec-server stdio".to_string(),
))
.run_connection(
JsonRpcConnection::from_stdio(reader, writer, "exec-server stdio".to_string()),
ConnectionTransport::Stdio,
)
.await;
Ok(())
}
@@ -119,10 +123,11 @@ where
async fn run_websocket_listener(
bind_address: SocketAddr,
runtime_paths: ExecServerRuntimePaths,
telemetry: ExecServerTelemetry,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let listener = TcpListener::bind(bind_address).await?;
let local_addr = listener.local_addr()?;
let processor = ConnectionProcessor::new(runtime_paths);
let processor = ConnectionProcessor::new(runtime_paths, telemetry);
info!("codex-exec-server listening on ws://{local_addr}");
println!("ws://{local_addr}");
std::io::stdout().flush()?;
@@ -174,10 +179,13 @@ async fn websocket_upgrade_handler(
websocket.on_upgrade(move |stream| async move {
state
.processor
.run_connection(JsonRpcConnection::from_axum_websocket(
stream,
format!("exec-server websocket {peer_addr}"),
))
.run_connection(
JsonRpcConnection::from_axum_websocket(
stream,
format!("exec-server websocket {peer_addr}"),
),
ConnectionTransport::WebSocket,
)
.await;
})
}

View File

@@ -61,6 +61,7 @@ async fn stdio_listen_transport_serves_initialize() {
server_reader,
server_writer,
test_runtime_paths(),
crate::ExecServerTelemetry::default(),
));
let mut client_lines = BufReader::new(client_reader).lines();

View File

@@ -0,0 +1,320 @@
use std::sync::Arc;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
use std::time::Duration;
use codex_otel::MetricsClient;
use tracing::warn;
// Metric names emitted by the exec-server telemetry in this file.

/// Gauge: number of currently open connections, tagged by `transport`.
const CONNECTIONS_ACTIVE_METRIC: &str = "exec_server.connections.active";
/// Counter: connections started, tagged by `transport` and `result`.
const CONNECTIONS_TOTAL_METRIC: &str = "exec_server.connections.total";
/// Counter: completed requests, tagged by `operation` and `result`.
const REQUESTS_TOTAL_METRIC: &str = "exec_server.requests.total";
/// Duration: time spent handling a request, tagged by `operation` and `result`.
const REQUEST_DURATION_METRIC: &str = "exec_server.request.duration";
/// Gauge: number of currently tracked processes (no tags).
const PROCESSES_ACTIVE_METRIC: &str = "exec_server.processes.active";
/// Duration: lifetime of a finished process, tagged by `result`.
const PROCESS_DURATION_METRIC: &str = "exec_server.process.duration";
/// Counter: relay reconnect events, tagged by `reason`.
const RELAY_RECONNECTS_METRIC: &str = "exec_server.relay.reconnects";
/// Kind of transport a connection is served over; used to tag connection
/// metrics.
#[derive(Clone, Copy)]
pub(crate) enum ConnectionTransport {
    /// Connection multiplexed over the relay.
    Relay,
    /// Connection served over standard input/output.
    Stdio,
    /// Connection accepted by the WebSocket listener.
    WebSocket,
}

impl ConnectionTransport {
    /// Returns the fixed lowercase value reported in the `transport` tag.
    fn metric_tag(self) -> &'static str {
        match self {
            ConnectionTransport::Relay => "relay",
            ConnectionTransport::Stdio => "stdio",
            ConnectionTransport::WebSocket => "websocket",
        }
    }
}
/// Cloneable handle through which the exec-server emits its metrics.
///
/// Clones share the same inner state through an `Arc`. When `inner` is `None`
/// (no metrics client was supplied, or the handle came from `Default`), every
/// emit method is a no-op.
#[derive(Clone, Default)]
pub struct ExecServerTelemetry {
inner: Option<Arc<ExecServerTelemetryInner>>,
}
/// Shared state behind an enabled `ExecServerTelemetry` handle.
struct ExecServerTelemetryInner {
// Client that the counter, gauge, and duration measurements are sent to.
metrics: MetricsClient,
// Running count of open connections, one counter per transport kind.
relay_connections: AtomicI64,
stdio_connections: AtomicI64,
websocket_connections: AtomicI64,
// Running count of processes that have started but not yet finished.
active_processes: AtomicI64,
}
/// Guard representing one open connection in the connection metrics.
///
/// When the guard is dropped, its `Drop` impl reports the connection as
/// finished for `transport`, which lowers the active-connection gauge. Keep
/// the guard bound to a named variable for as long as the connection is open.
#[must_use = "dropping the guard immediately reports the connection as finished"]
pub(crate) struct ConnectionMetricGuard {
    // Handle used to report the end of the connection on drop.
    telemetry: ExecServerTelemetry,
    // Transport whose active-connection count this guard accounts for.
    transport: ConnectionTransport,
}
impl ExecServerTelemetry {
    /// Builds a telemetry handle around `metrics`.
    ///
    /// With `Some(client)` all counts start at zero; with `None` the returned
    /// handle silently ignores every emit call.
    pub fn new(metrics: Option<MetricsClient>) -> Self {
        let inner = metrics.map(|metrics| {
            Arc::new(ExecServerTelemetryInner {
                metrics,
                relay_connections: AtomicI64::default(),
                stdio_connections: AtomicI64::default(),
                websocket_connections: AtomicI64::default(),
                active_processes: AtomicI64::default(),
            })
        });
        Self { inner }
    }

    /// Records the start of a connection on `transport`.
    ///
    /// Bumps the per-transport active count, publishes it as a gauge, and
    /// counts the connection as accepted. The returned guard reports the end
    /// of the connection when it is dropped.
    ///
    /// NOTE(review): the count update and the gauge emission are separate
    /// steps, so concurrent callers could publish gauge values out of order —
    /// confirm this is acceptable.
    pub(crate) fn connection_started(
        &self,
        transport: ConnectionTransport,
    ) -> ConnectionMetricGuard {
        self.with_inner(|inner| {
            let transport_tag = transport.metric_tag();
            let previous = inner
                .connection_counter(transport)
                .fetch_add(1, Ordering::AcqRel);
            inner.gauge(
                CONNECTIONS_ACTIVE_METRIC,
                previous + 1,
                &[("transport", transport_tag)],
            );
            inner.counter(
                CONNECTIONS_TOTAL_METRIC,
                &[("transport", transport_tag), ("result", "accepted")],
            );
        });

        let telemetry = self.clone();
        ConnectionMetricGuard {
            telemetry,
            transport,
        }
    }

    /// Counts one completed request and records how long it took, both tagged
    /// with `operation` and `result`.
    pub(crate) fn request_completed(
        &self,
        operation: &'static str,
        result: &'static str,
        duration: Duration,
    ) {
        self.with_inner(|inner| {
            let request_tags = [("operation", operation), ("result", result)];
            inner.counter(REQUESTS_TOTAL_METRIC, &request_tags);
            inner.duration(REQUEST_DURATION_METRIC, duration, &request_tags);
        });
    }

    /// Increments the active-process count and publishes it as a gauge.
    pub(crate) fn process_started(&self) {
        self.with_inner(|inner| {
            let previous = inner.active_processes.fetch_add(1, Ordering::AcqRel);
            inner.gauge(PROCESSES_ACTIVE_METRIC, previous + 1, &[]);
        });
    }

    /// Decrements the active-process count, publishes it as a gauge, and
    /// records the process lifetime tagged with `result`.
    pub(crate) fn process_finished(&self, result: &'static str, duration: Duration) {
        self.with_inner(|inner| {
            let previous = inner.active_processes.fetch_sub(1, Ordering::AcqRel);
            inner.gauge(PROCESSES_ACTIVE_METRIC, previous - 1, &[]);
            inner.duration(PROCESS_DURATION_METRIC, duration, &[("result", result)]);
        });
    }

    /// Counts one relay reconnect event tagged with `reason`.
    pub(crate) fn relay_reconnect(&self, reason: &'static str) {
        self.with_inner(|inner| inner.counter(RELAY_RECONNECTS_METRIC, &[("reason", reason)]));
    }

    /// Decrements the active count for `transport` and publishes the new
    /// value; invoked when a `ConnectionMetricGuard` is dropped.
    fn connection_finished(&self, transport: ConnectionTransport) {
        self.with_inner(|inner| {
            let previous = inner
                .connection_counter(transport)
                .fetch_sub(1, Ordering::AcqRel);
            inner.gauge(
                CONNECTIONS_ACTIVE_METRIC,
                previous - 1,
                &[("transport", transport.metric_tag())],
            );
        });
    }

    /// Runs `emit` against the shared state, or does nothing when telemetry
    /// is disabled.
    fn with_inner(&self, emit: impl FnOnce(&ExecServerTelemetryInner)) {
        let Some(inner) = self.inner.as_deref() else {
            return;
        };
        emit(inner);
    }
}
impl Drop for ConnectionMetricGuard {
    /// Reports the guarded connection as finished for its transport.
    fn drop(&mut self) {
        let Self {
            telemetry,
            transport,
        } = self;
        telemetry.connection_finished(*transport);
    }
}
impl ExecServerTelemetryInner {
    /// Selects the active-connection counter that belongs to `transport`.
    fn connection_counter(&self, transport: ConnectionTransport) -> &AtomicI64 {
        match transport {
            ConnectionTransport::Relay => &self.relay_connections,
            ConnectionTransport::Stdio => &self.stdio_connections,
            ConnectionTransport::WebSocket => &self.websocket_connections,
        }
    }

    /// Increments counter `name` by one; a failure is logged, not returned.
    fn counter(&self, name: &str, tags: &[(&str, &str)]) {
        let Err(err) = self.metrics.counter(name, /*inc*/ 1, tags) else {
            return;
        };
        warn!(metric = name, error = %err, "failed to emit exec-server counter");
    }

    /// Records `duration` under `name`; a failure is logged, not returned.
    fn duration(&self, name: &str, duration: Duration, tags: &[(&str, &str)]) {
        let Err(err) = self.metrics.record_duration(name, duration, tags) else {
            return;
        };
        warn!(metric = name, error = %err, "failed to emit exec-server duration");
    }

    /// Records `value` for gauge `name`; a failure is logged, not returned.
    fn gauge(&self, name: &str, value: i64, tags: &[(&str, &str)]) {
        let Err(err) = self.metrics.gauge(name, value, tags) else {
            return;
        };
        warn!(metric = name, error = %err, "failed to emit exec-server gauge");
    }
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use std::time::Duration;
use codex_otel::MetricsConfig;
use opentelemetry::KeyValue;
use opentelemetry_sdk::metrics::InMemoryMetricExporter;
use opentelemetry_sdk::metrics::data::AggregatedMetrics;
use opentelemetry_sdk::metrics::data::Metric;
use opentelemetry_sdk::metrics::data::MetricData;
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use pretty_assertions::assert_eq;
use super::ConnectionTransport;
use super::ExecServerTelemetry;
// Drives every emit method once against an in-memory exporter and checks the
// exported metric names, values, and tag sets.
#[test]
fn emits_bounded_exec_server_metrics() {
let exporter = InMemoryMetricExporter::default();
let metrics = codex_otel::MetricsClient::new(MetricsConfig::in_memory(
"test",
"codex-exec-server",
env!("CARGO_PKG_VERSION"),
exporter.clone(),
))
.expect("metrics");
let telemetry = ExecServerTelemetry::new(Some(metrics.clone()));
// One connection, one request, one process lifecycle, one relay reconnect.
let connection = telemetry.connection_started(ConnectionTransport::WebSocket);
telemetry.request_completed("process", "success", Duration::from_millis(12));
telemetry.process_started();
telemetry.process_finished("success", Duration::from_millis(34));
telemetry.relay_reconnect("connect_failed");
// Dropping the guard reports the connection as finished.
drop(connection);
metrics.shutdown().expect("shutdown metrics");
let metrics = latest_metrics(&exporter);
assert_eq!(
metric_points(&metrics, "exec_server.connections.total"),
vec![(
1.0,
BTreeMap::from([
("result".to_string(), "accepted".to_string()),
("transport".to_string(), "websocket".to_string()),
]),
)]
);
// The connection was opened and closed, so the active gauge ends at zero.
assert_eq!(
metric_points(&metrics, "exec_server.connections.active"),
vec![(
0.0,
BTreeMap::from([("transport".to_string(), "websocket".to_string())]),
)]
);
assert_eq!(
metric_points(&metrics, "exec_server.requests.total"),
vec![(
1.0,
BTreeMap::from([
("operation".to_string(), "process".to_string()),
("result".to_string(), "success".to_string()),
]),
)]
);
// The process was started and finished, so the active gauge ends at zero.
assert_eq!(
metric_points(&metrics, "exec_server.processes.active"),
vec![(0.0, BTreeMap::new())]
);
assert_eq!(
metric_points(&metrics, "exec_server.relay.reconnects"),
vec![(
1.0,
BTreeMap::from([("reason".to_string(), "connect_failed".to_string())]),
)]
);
// Exactly one duration sample was recorded for each histogram.
assert_eq!(histogram_count(&metrics, "exec_server.request.duration"), 1);
assert_eq!(histogram_count(&metrics, "exec_server.process.duration"), 1);
}
/// Returns the most recently exported batch of metrics.
///
/// Panics if the exporter cannot be read or has not exported anything yet.
fn latest_metrics(exporter: &InMemoryMetricExporter) -> ResourceMetrics {
    let mut exported = exporter
        .get_finished_metrics()
        .expect("finished metrics");
    exported.pop().expect("metrics export")
}
fn find_metric<'a>(resource_metrics: &'a ResourceMetrics, name: &str) -> &'a Metric {
resource_metrics
.scope_metrics()
.flat_map(opentelemetry_sdk::metrics::data::ScopeMetrics::metrics)
.find(|metric| metric.name() == name)
.unwrap_or_else(|| panic!("metric {name} missing"))
}
/// Collects the data points of metric `name` as `(value, tags)` pairs.
///
/// Accepts an `i64` gauge or a `u64` sum; any other metric shape panics.
fn metric_points(
    resource_metrics: &ResourceMetrics,
    name: &str,
) -> Vec<(f64, BTreeMap<String, String>)> {
    let metric = find_metric(resource_metrics, name);
    match metric.data() {
        AggregatedMetrics::I64(MetricData::Gauge(gauge)) => {
            let mut points = Vec::new();
            for data_point in gauge.data_points() {
                let tags = attributes_to_map(data_point.attributes());
                points.push((data_point.value() as f64, tags));
            }
            points
        }
        AggregatedMetrics::U64(MetricData::Sum(sum)) => {
            let mut points = Vec::new();
            for data_point in sum.data_points() {
                let tags = attributes_to_map(data_point.attributes());
                points.push((data_point.value() as f64, tags));
            }
            points
        }
        _ => panic!("unexpected metric data for {name}"),
    }
}
/// Sums the sample counts over every data point of histogram `name`.
///
/// Panics if the metric is not an `f64` histogram.
fn histogram_count(resource_metrics: &ResourceMetrics, name: &str) -> u64 {
    let AggregatedMetrics::F64(MetricData::Histogram(histogram)) =
        find_metric(resource_metrics, name).data()
    else {
        panic!("unexpected histogram data for {name}")
    };
    histogram
        .data_points()
        .map(|data_point| data_point.count())
        .sum()
}
/// Converts metric attributes into an ordered `key -> value` string map so
/// they can be compared directly in assertions.
fn attributes_to_map<'a>(
    attributes: impl Iterator<Item = &'a KeyValue>,
) -> BTreeMap<String, String> {
    let mut tags = BTreeMap::new();
    for attribute in attributes {
        let key = attribute.key.as_str().to_string();
        let value = attribute.value.as_str().to_string();
        tags.insert(key, value);
    }
    tags
}
}

View File

@@ -12,6 +12,7 @@ use crate::metrics::validation::validate_tags;
use codex_utils_string::sanitize_metric_tag_value;
use opentelemetry::KeyValue;
use opentelemetry::metrics::Counter;
use opentelemetry::metrics::Gauge;
use opentelemetry::metrics::Histogram;
use opentelemetry::metrics::Meter;
use opentelemetry::metrics::MeterProvider as _;
@@ -83,6 +84,7 @@ struct MetricsClientInner {
meter_provider: SdkMeterProvider,
meter: Meter,
counters: Mutex<HashMap<String, Counter<u64>>>,
gauges: Mutex<HashMap<String, Gauge<i64>>>,
histograms: Mutex<HashMap<String, Histogram<f64>>>,
duration_histograms: Mutex<HashMap<String, Histogram<f64>>>,
runtime_reader: Option<Arc<ManualReader>>,
@@ -126,6 +128,21 @@ impl MetricsClientInner {
Ok(())
}
/// Records `value` for the gauge called `name`, creating the instrument on
/// first use and reusing it afterwards.
///
/// Returns an error if the metric name fails validation or the tags cannot
/// be turned into attributes.
fn gauge(&self, name: &str, value: i64, tags: &[(&str, &str)]) -> Result<()> {
    validate_metric_name(name)?;
    let attributes = self.attributes(tags)?;

    // A poisoned lock is recovered so a panic elsewhere does not stop metrics.
    let mut instruments = match self.gauges.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    instruments
        .entry(name.to_string())
        .or_insert_with(|| self.meter.i64_gauge(name.to_string()).build())
        .record(value, &attributes);
    Ok(())
}
fn duration_histogram(&self, name: &str, value: i64, tags: &[(&str, &str)]) -> Result<()> {
validate_metric_name(name)?;
let attributes = self.attributes(tags)?;
@@ -233,6 +250,7 @@ impl MetricsClient {
meter_provider,
meter,
counters: Mutex::new(HashMap::new()),
gauges: Mutex::new(HashMap::new()),
histograms: Mutex::new(HashMap::new()),
duration_histograms: Mutex::new(HashMap::new()),
runtime_reader,
@@ -250,6 +268,11 @@ impl MetricsClient {
self.0.histogram(name, value, tags)
}
/// Record `value` as a single measurement of the gauge `name`, tagged with
/// `tags`.
///
/// Delegates to the inner client and returns its result.
pub fn gauge(&self, name: &str, value: i64, tags: &[(&str, &str)]) -> Result<()> {
self.0.gauge(name, value, tags)
}
/// Record a duration in milliseconds using a histogram.
pub fn record_duration(
&self,

View File

@@ -186,6 +186,11 @@ fn otlp_http_exporter_sends_metrics_to_collector() -> Result<()> {
))?;
metrics.counter("codex.turns", /*inc*/ 1, &[("source", "test")])?;
metrics.gauge(
"exec_server.connections.active",
/*value*/ 1,
&[("transport", "websocket")],
)?;
metrics.shutdown()?;
server.join().expect("server join");
@@ -220,6 +225,16 @@ fn otlp_http_exporter_sends_metrics_to_collector() -> Result<()> {
"expected metric name not found; body prefix: {}",
&body.chars().take(2000).collect::<String>()
);
assert!(
body.contains("exec_server.connections.active"),
"expected exec-server gauge not found; body prefix: {}",
&body.chars().take(2000).collect::<String>()
);
assert!(
body.contains("websocket"),
"expected exec-server metric tag not found; body prefix: {}",
&body.chars().take(2000).collect::<String>()
);
Ok(())
}

View File

@@ -23,6 +23,11 @@ fn send_builds_payload_with_tags_and_histograms() -> Result<()> {
/*value*/ 25,
&[("tool", "shell")],
)?;
metrics.gauge(
"codex.active",
/*value*/ 2,
&[("component", "exec-server")],
)?;
metrics.shutdown()?;
let resource_metrics = latest_metrics(&exporter);
@@ -78,6 +83,26 @@ fn send_builds_payload_with_tags_and_histograms() -> Result<()> {
]);
assert_eq!(histogram_attrs, expected_histogram_attributes);
let gauge = find_metric(&resource_metrics, "codex.active").expect("gauge metric missing");
let gauge_point = match gauge.data() {
opentelemetry_sdk::metrics::data::AggregatedMetrics::I64(data) => match data {
opentelemetry_sdk::metrics::data::MetricData::Gauge(gauge) => {
gauge.data_points().next().expect("gauge point")
}
_ => panic!("unexpected gauge aggregation"),
},
_ => panic!("unexpected gauge metric data type"),
};
assert_eq!(gauge_point.value(), 2);
assert_eq!(
attributes_to_map(gauge_point.attributes()),
BTreeMap::from([
("component".to_string(), "exec-server".to_string()),
("env".to_string(), "prod".to_string()),
("service".to_string(), "codex-cli".to_string()),
])
);
Ok(())
}