codex: add exec-server 80 20 telemetry

This commit is contained in:
starr-openai
2026-05-29 16:55:05 -07:00
parent fc0f0c9933
commit c3ff443001
5 changed files with 257 additions and 41 deletions

2
codex-rs/Cargo.lock generated
View File

@@ -2803,6 +2803,8 @@ dependencies = [
"tokio-util",
"toml 0.9.11+spec-1.1.0",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
"uuid",
"wiremock",
]

View File

@@ -60,4 +60,6 @@ pretty_assertions = { workspace = true }
serial_test = { workspace = true }
tempfile = { workspace = true }
test-case = "3.3.1"
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true }
wiremock = { workspace = true }

View File

@@ -1,10 +1,12 @@
use std::time::Duration;
use std::time::Instant;
use codex_api::SharedAuthProvider;
use reqwest::StatusCode;
use serde::Deserialize;
use tokio::time::sleep;
use tokio_tungstenite::connect_async;
use tracing::Instrument;
use tracing::info;
use tracing::warn;
@@ -170,9 +172,20 @@ pub async fn run_remote_environment(
let mut connection_attempt = 0_u32;
loop {
let response = match client.register_environment(&config.environment_id).await {
let registration_started_at = Instant::now();
let registration_span = remote_registration_span();
let response = match client
.register_environment(&config.environment_id)
.instrument(registration_span.clone())
.await
{
Ok(response) => response,
Err(err) => {
registration_span.record("result", "error");
config
.telemetry
.remote_registration_completed("error", registration_started_at.elapsed());
drop(registration_span);
warn!(error = %err, "failed to register remote exec-server environment");
emit_remote_otel_event!(
WARN,
@@ -182,6 +195,11 @@ pub async fn run_remote_environment(
return Err(err);
}
};
registration_span.record("result", "success");
config
.telemetry
.remote_registration_completed("success", registration_started_at.elapsed());
drop(registration_span);
eprintln!("codex exec-server remote environment registered");
info!("codex exec-server remote environment registered");
emit_remote_otel_event!(
@@ -190,9 +208,21 @@ pub async fn run_remote_environment(
"codex exec-server remote environment registered"
);
match connect_async(response.url.as_str()).await {
let websocket_connect_started_at = Instant::now();
let websocket_connect_span = remote_websocket_connect_span(connection_attempt + 1);
match connect_async(response.url.as_str())
.instrument(websocket_connect_span.clone())
.await
{
Ok((websocket, _)) => {
connection_attempt += 1;
websocket_connect_span.record("result", "success");
config.telemetry.remote_websocket_connect_completed(
"success",
websocket_connect_started_at.elapsed(),
);
drop(websocket_connect_span);
let remote_websocket = config.telemetry.remote_websocket_connected();
info!(
attempt = connection_attempt,
"connected remote exec-server websocket"
@@ -205,7 +235,8 @@ pub async fn run_remote_environment(
);
backoff = Duration::from_secs(1);
run_multiplexed_environment(websocket, processor.clone()).await;
config.telemetry.relay_reconnect("disconnected");
drop(remote_websocket);
config.telemetry.remote_websocket_reconnect("disconnected");
warn!(
attempt = connection_attempt,
"remote exec-server websocket disconnected; retrying"
@@ -219,6 +250,12 @@ pub async fn run_remote_environment(
}
Err(err) => {
connection_attempt += 1;
websocket_connect_span.record("result", "error");
config.telemetry.remote_websocket_connect_completed(
"error",
websocket_connect_started_at.elapsed(),
);
drop(websocket_connect_span);
warn!(
attempt = connection_attempt,
error = %err,
@@ -230,7 +267,9 @@ pub async fn run_remote_environment(
attempt = connection_attempt,
"failed to connect remote exec-server websocket"
);
config.telemetry.relay_reconnect("connect_failed");
config
.telemetry
.remote_websocket_reconnect("connect_failed");
}
}
@@ -251,6 +290,25 @@ pub async fn run_remote_environment(
}
}
/// Builds the INFO-level client span that wraps one remote environment
/// registration call.
///
/// The `result` field is declared empty here so the caller can record the
/// outcome (`"success"` / `"error"`) once the registration future resolves.
fn remote_registration_span() -> tracing::Span {
    use tracing::field::Empty;

    tracing::span!(
        tracing::Level::INFO,
        "codex.exec_server.remote.register",
        otel.kind = "client",
        otel.name = "codex.exec_server.remote.register",
        result = Empty,
    )
}
/// Builds the INFO-level client span that wraps one remote websocket
/// connection attempt.
///
/// `attempt` is stored on the span as the `attempt` field. The `result` field
/// is declared empty so the caller can record the outcome after the connect
/// future resolves.
fn remote_websocket_connect_span(attempt: u32) -> tracing::Span {
    use tracing::field::Empty;

    tracing::span!(
        tracing::Level::INFO,
        "codex.exec_server.remote.websocket.connect",
        otel.kind = "client",
        otel.name = "codex.exec_server.remote.websocket.connect",
        attempt,
        result = Empty,
    )
}
fn normalize_environment_id(environment_id: String) -> Result<String, ExecServerError> {
let environment_id = environment_id.trim().to_string();
if environment_id.is_empty() {
@@ -494,4 +552,45 @@ mod tests {
"expected finished remote OTEL lifecycle span, got {spans:?}"
);
}
/// Checks that the registration and websocket-connect spans reach the span
/// exporter once they have been entered, had `result` recorded, and dropped.
#[test]
fn remote_operation_spans_finish_immediately() {
    // In-memory exporter so the finished spans can be inspected below.
    let span_exporter = InMemorySpanExporter::default();
    let tracer_provider = SdkTracerProvider::builder()
        .with_simple_exporter(span_exporter.clone())
        .build();
    let otel_layer = tracing_opentelemetry::layer()
        .with_tracer(tracer_provider.tracer("exec-server-test"))
        .with_filter(filter_fn(codex_otel::OtelProvider::trace_export_filter));
    let subscriber = tracing_subscriber::registry().with(otel_layer);

    tracing::subscriber::with_default(subscriber, || {
        tracing::callsite::rebuild_interest_cache();

        // Enter the span once, record its outcome, then close it by dropping.
        let finish = |span: tracing::Span| {
            span.in_scope(|| {});
            span.record("result", "success");
            drop(span);
        };
        finish(remote_registration_span());
        finish(remote_websocket_connect_span(2));
    });

    tracer_provider.force_flush().expect("flush traces");
    let spans = span_exporter.get_finished_spans().expect("span export");
    let exported = |name: &str| spans.iter().any(|span| span.name.as_ref() == name);

    assert!(
        exported("codex.exec_server.remote.register"),
        "expected finished remote registration span, got {spans:?}"
    );
    assert!(
        exported("codex.exec_server.remote.websocket.connect"),
        "expected finished remote websocket connect span, got {spans:?}"
    );
}
}

View File

@@ -2,6 +2,7 @@ use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc;
use tracing::Instrument;
use tracing::debug;
use tracing::warn;
@@ -117,37 +118,37 @@ async fn run_connection(
}
JsonRpcConnectionEvent::Message(message) => match message {
codex_app_server_protocol::JSONRPCMessage::Request(request) => {
let operation = request_operation(request.method.as_str());
let method = request_method(request.method.as_str());
let request_span = request_span(method);
let request_started_at = Instant::now();
if let Some(route) = router.request_route(request.method.as_str()) {
let message = tokio::select! {
message = route(Arc::clone(&handler), request) => message,
message = route(Arc::clone(&handler), request).instrument(request_span.clone()) => message,
_ = disconnected_rx.changed() => {
request_span.record("result", "disconnected");
telemetry.request_completed(
operation,
method,
"disconnected",
request_started_at.elapsed(),
);
drop(request_span);
debug!("exec-server transport disconnected while handling request");
break;
}
};
telemetry.request_completed(
operation,
request_result(&message),
request_started_at.elapsed(),
);
let result = request_result(&message);
request_span.record("result", result);
telemetry.request_completed(method, result, request_started_at.elapsed());
drop(request_span);
if let Some(message) = message
&& outgoing_tx.send(message).await.is_err()
{
break;
}
} else {
telemetry.request_completed(
operation,
"error",
request_started_at.elapsed(),
);
request_span.record("result", "error");
telemetry.request_completed(method, "error", request_started_at.elapsed());
drop(request_span);
if outgoing_tx
.send(RpcServerOutboundMessage::Error {
request_id: request.id,
@@ -220,25 +221,35 @@ async fn run_connection(
let _ = outbound_task.await;
}
fn request_operation(method: &str) -> &'static str {
fn request_method(method: &str) -> &'static str {
match method {
crate::protocol::INITIALIZE_METHOD => "initialize",
crate::protocol::EXEC_METHOD
| crate::protocol::EXEC_READ_METHOD
| crate::protocol::EXEC_WRITE_METHOD
| crate::protocol::EXEC_TERMINATE_METHOD => "process",
crate::protocol::FS_READ_FILE_METHOD
| crate::protocol::FS_WRITE_FILE_METHOD
| crate::protocol::FS_CREATE_DIRECTORY_METHOD
| crate::protocol::FS_GET_METADATA_METHOD
| crate::protocol::FS_READ_DIRECTORY_METHOD
| crate::protocol::FS_REMOVE_METHOD
| crate::protocol::FS_COPY_METHOD => "filesystem",
crate::protocol::HTTP_REQUEST_METHOD => "http",
crate::protocol::INITIALIZE_METHOD => crate::protocol::INITIALIZE_METHOD,
crate::protocol::EXEC_METHOD => crate::protocol::EXEC_METHOD,
crate::protocol::EXEC_READ_METHOD => crate::protocol::EXEC_READ_METHOD,
crate::protocol::EXEC_WRITE_METHOD => crate::protocol::EXEC_WRITE_METHOD,
crate::protocol::EXEC_TERMINATE_METHOD => crate::protocol::EXEC_TERMINATE_METHOD,
crate::protocol::FS_READ_FILE_METHOD => crate::protocol::FS_READ_FILE_METHOD,
crate::protocol::FS_WRITE_FILE_METHOD => crate::protocol::FS_WRITE_FILE_METHOD,
crate::protocol::FS_CREATE_DIRECTORY_METHOD => crate::protocol::FS_CREATE_DIRECTORY_METHOD,
crate::protocol::FS_GET_METADATA_METHOD => crate::protocol::FS_GET_METADATA_METHOD,
crate::protocol::FS_READ_DIRECTORY_METHOD => crate::protocol::FS_READ_DIRECTORY_METHOD,
crate::protocol::FS_REMOVE_METHOD => crate::protocol::FS_REMOVE_METHOD,
crate::protocol::FS_COPY_METHOD => crate::protocol::FS_COPY_METHOD,
crate::protocol::HTTP_REQUEST_METHOD => crate::protocol::HTTP_REQUEST_METHOD,
_ => "unknown",
}
}
/// Builds the INFO-level server span for a single JSON-RPC request.
///
/// `method` is used both as the exported span name (`otel.name`) and as the
/// `method` field. The `result` field is declared empty so the caller can
/// record the outcome when the request finishes.
fn request_span(method: &'static str) -> tracing::Span {
    use tracing::field::Empty;

    tracing::span!(
        tracing::Level::INFO,
        "codex.exec_server.request",
        otel.kind = "server",
        otel.name = method,
        method,
        result = Empty,
    )
}
fn request_result(message: &Option<RpcServerOutboundMessage>) -> &'static str {
match message {
Some(RpcServerOutboundMessage::Error { .. }) => "error",
@@ -260,6 +271,7 @@ mod tests {
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use pretty_assertions::assert_eq;
use serde::Serialize;
use serde::de::DeserializeOwned;
use tokio::io::AsyncBufReadExt;
@@ -271,6 +283,7 @@ mod tests {
use tokio::task::JoinHandle;
use tokio::time::timeout;
use super::request_method;
use super::run_connection;
use crate::ExecServerRuntimePaths;
use crate::ProcessId;
@@ -289,6 +302,12 @@ mod tests {
use crate::protocol::TerminateResponse;
use crate::server::session_registry::SessionRegistry;
/// A known protocol method maps to itself; anything else collapses to
/// `"unknown"`.
#[test]
fn request_method_uses_bounded_protocol_method_names() {
    let cases = [
        (EXEC_TERMINATE_METHOD, EXEC_TERMINATE_METHOD),
        ("custom/method", "unknown"),
    ];
    for (raw_method, expected) in cases {
        assert_eq!(request_method(raw_method), expected);
    }
}
#[tokio::test]
async fn transport_disconnect_detaches_session_during_in_flight_read() {
let registry = SessionRegistry::new(crate::ExecServerTelemetry::default());

View File

@@ -8,11 +8,18 @@ use tracing::warn;
const CONNECTIONS_ACTIVE_METRIC: &str = "exec_server.connections.active";
const CONNECTIONS_TOTAL_METRIC: &str = "exec_server.connections.total";
const REMOTE_REGISTRATION_TOTAL_METRIC: &str = "exec_server.remote.registration.total";
const REMOTE_REGISTRATION_DURATION_METRIC: &str = "exec_server.remote.registration.duration";
const REMOTE_WEBSOCKET_ACTIVE_METRIC: &str = "exec_server.remote.websocket.active";
const REMOTE_WEBSOCKET_CONNECT_TOTAL_METRIC: &str = "exec_server.remote.websocket.connect.total";
const REMOTE_WEBSOCKET_CONNECT_DURATION_METRIC: &str =
"exec_server.remote.websocket.connect.duration";
const REMOTE_WEBSOCKET_RECONNECTS_METRIC: &str = "exec_server.remote.websocket.reconnects";
const REQUESTS_TOTAL_METRIC: &str = "exec_server.requests.total";
const REQUEST_DURATION_METRIC: &str = "exec_server.request.duration";
const PROCESSES_ACTIVE_METRIC: &str = "exec_server.processes.active";
const PROCESSES_FINISHED_TOTAL_METRIC: &str = "exec_server.processes.finished_total";
const PROCESS_DURATION_METRIC: &str = "exec_server.process.duration";
const RELAY_RECONNECTS_METRIC: &str = "exec_server.relay.reconnects";
#[derive(Clone, Copy)]
pub(crate) enum ConnectionTransport {
@@ -41,6 +48,7 @@ struct ExecServerTelemetryInner {
relay_connections: AtomicI64,
stdio_connections: AtomicI64,
websocket_connections: AtomicI64,
remote_websockets: AtomicI64,
active_processes: AtomicI64,
}
@@ -49,6 +57,10 @@ pub(crate) struct ConnectionMetricGuard {
transport: ConnectionTransport,
}
/// Guard returned by `ExecServerTelemetry::remote_websocket_connected`.
///
/// Dropping the guard calls `remote_websocket_disconnected` on the stored
/// telemetry handle, so the guard must be kept alive for as long as the
/// remote websocket is in use.
#[must_use = "dropping the guard immediately reports the remote websocket as disconnected"]
pub(crate) struct RemoteWebSocketMetricGuard {
    /// Telemetry handle notified when the guard is dropped.
    telemetry: ExecServerTelemetry,
}
impl ExecServerTelemetry {
pub fn new(metrics: Option<MetricsClient>) -> Self {
Self {
@@ -58,6 +70,7 @@ impl ExecServerTelemetry {
relay_connections: AtomicI64::new(0),
stdio_connections: AtomicI64::new(0),
websocket_connections: AtomicI64::new(0),
remote_websockets: AtomicI64::new(0),
active_processes: AtomicI64::new(0),
})
}),
@@ -92,14 +105,44 @@ impl ExecServerTelemetry {
}
}
pub(crate) fn request_completed(
/// Reports one finished remote registration attempt.
///
/// Bumps the registration counter and records `duration` in the registration
/// duration metric, both tagged with `result`. Nothing is emitted when the
/// telemetry handle has no inner state (`with_inner` skips the closure).
pub(crate) fn remote_registration_completed(&self, result: &'static str, duration: Duration) {
    self.with_inner(|inner| {
        let result_tag = [("result", result)];
        inner.counter(REMOTE_REGISTRATION_TOTAL_METRIC, &result_tag);
        inner.duration(REMOTE_REGISTRATION_DURATION_METRIC, duration, &result_tag);
    });
}
/// Marks a remote websocket as connected and returns a guard for it.
///
/// Increments the remote websocket count and publishes the new value to the
/// active gauge. The returned guard holds a clone of this telemetry handle.
pub(crate) fn remote_websocket_connected(&self) -> RemoteWebSocketMetricGuard {
    self.with_inner(|inner| {
        // `fetch_add` yields the value before the increment.
        let previous = inner.remote_websockets.fetch_add(1, Ordering::AcqRel);
        inner.gauge(REMOTE_WEBSOCKET_ACTIVE_METRIC, previous + 1, &[]);
    });
    let telemetry = self.clone();
    RemoteWebSocketMetricGuard { telemetry }
}
pub(crate) fn remote_websocket_connect_completed(
&self,
operation: &'static str,
result: &'static str,
duration: Duration,
) {
self.with_inner(|inner| {
let tags = [("operation", operation), ("result", result)];
let tags = [("result", result)];
inner.counter(REMOTE_WEBSOCKET_CONNECT_TOTAL_METRIC, &tags);
inner.duration(REMOTE_WEBSOCKET_CONNECT_DURATION_METRIC, duration, &tags);
});
}
pub(crate) fn request_completed(
&self,
method: &'static str,
result: &'static str,
duration: Duration,
) {
self.with_inner(|inner| {
let tags = [("method", method), ("result", result)];
inner.counter(REQUESTS_TOTAL_METRIC, &tags);
inner.duration(REQUEST_DURATION_METRIC, duration, &tags);
});
@@ -116,13 +159,14 @@ impl ExecServerTelemetry {
self.with_inner(|inner| {
let active = inner.active_processes.fetch_sub(1, Ordering::AcqRel) - 1;
inner.gauge(PROCESSES_ACTIVE_METRIC, active, &[]);
inner.counter(PROCESSES_FINISHED_TOTAL_METRIC, &[("result", result)]);
inner.duration(PROCESS_DURATION_METRIC, duration, &[("result", result)]);
});
}
pub(crate) fn relay_reconnect(&self, reason: &'static str) {
pub(crate) fn remote_websocket_reconnect(&self, reason: &'static str) {
self.with_inner(|inner| {
inner.counter(RELAY_RECONNECTS_METRIC, &[("reason", reason)]);
inner.counter(REMOTE_WEBSOCKET_RECONNECTS_METRIC, &[("reason", reason)]);
});
}
@@ -140,6 +184,13 @@ impl ExecServerTelemetry {
});
}
/// Marks a remote websocket as disconnected.
///
/// Decrements the remote websocket count and publishes the new value to the
/// active gauge.
fn remote_websocket_disconnected(&self) {
    self.with_inner(|inner| {
        // `fetch_sub` yields the value before the decrement.
        let previous = inner.remote_websockets.fetch_sub(1, Ordering::AcqRel);
        inner.gauge(REMOTE_WEBSOCKET_ACTIVE_METRIC, previous - 1, &[]);
    });
}
fn with_inner(&self, emit: impl FnOnce(&ExecServerTelemetryInner)) {
if let Some(inner) = &self.inner {
emit(inner);
@@ -153,6 +204,12 @@ impl Drop for ConnectionMetricGuard {
}
}
impl Drop for RemoteWebSocketMetricGuard {
    /// Reports the disconnect to the stored telemetry handle when the guard
    /// goes out of scope.
    fn drop(&mut self) {
        let Self { telemetry } = self;
        telemetry.remote_websocket_disconnected();
    }
}
impl ExecServerTelemetryInner {
fn connection_counter(&self, transport: ConnectionTransport) -> &AtomicI64 {
match transport {
@@ -211,10 +268,14 @@ mod tests {
let telemetry = ExecServerTelemetry::new(Some(metrics.clone()));
let connection = telemetry.connection_started(ConnectionTransport::WebSocket);
telemetry.request_completed("process", "success", Duration::from_millis(12));
telemetry.remote_registration_completed("success", Duration::from_millis(5));
let remote_websocket = telemetry.remote_websocket_connected();
telemetry.remote_websocket_connect_completed("success", Duration::from_millis(7));
telemetry.request_completed("process/start", "success", Duration::from_millis(12));
telemetry.process_started();
telemetry.process_finished("success", Duration::from_millis(34));
telemetry.relay_reconnect("connect_failed");
telemetry.remote_websocket_reconnect("connect_failed");
drop(remote_websocket);
drop(connection);
metrics.shutdown().expect("shutdown metrics");
@@ -236,12 +297,30 @@ mod tests {
BTreeMap::from([("transport".to_string(), "websocket".to_string())]),
)]
);
assert_eq!(
metric_points(&metrics, "exec_server.remote.registration.total"),
vec![(
1.0,
BTreeMap::from([("result".to_string(), "success".to_string())]),
)]
);
assert_eq!(
metric_points(&metrics, "exec_server.remote.websocket.connect.total"),
vec![(
1.0,
BTreeMap::from([("result".to_string(), "success".to_string())]),
)]
);
assert_eq!(
metric_points(&metrics, "exec_server.remote.websocket.active"),
vec![(0.0, BTreeMap::new())]
);
assert_eq!(
metric_points(&metrics, "exec_server.requests.total"),
vec![(
1.0,
BTreeMap::from([
("operation".to_string(), "process".to_string()),
("method".to_string(), "process/start".to_string()),
("result".to_string(), "success".to_string()),
]),
)]
@@ -251,12 +330,27 @@ mod tests {
vec![(0.0, BTreeMap::new())]
);
assert_eq!(
metric_points(&metrics, "exec_server.relay.reconnects"),
metric_points(&metrics, "exec_server.processes.finished_total"),
vec![(
1.0,
BTreeMap::from([("result".to_string(), "success".to_string())]),
)]
);
assert_eq!(
metric_points(&metrics, "exec_server.remote.websocket.reconnects"),
vec![(
1.0,
BTreeMap::from([("reason".to_string(), "connect_failed".to_string())]),
)]
);
assert_eq!(
histogram_count(&metrics, "exec_server.remote.registration.duration"),
1
);
assert_eq!(
histogram_count(&metrics, "exec_server.remote.websocket.connect.duration"),
1
);
assert_eq!(histogram_count(&metrics, "exec_server.request.duration"), 1);
assert_eq!(histogram_count(&metrics, "exec_server.process.duration"), 1);
}