Compare commits

...

20 Commits

Author SHA1 Message Date
Richard Lee
d0265f43f5 Align exec-server telemetry with OTEL conventions 2026-06-01 15:57:28 -07:00
Richard Lee
677997fad6 Retry resolved MCP test execution 2026-06-01 13:57:23 -07:00
Richard Lee
7eace99187 Merge remote-tracking branch 'origin/main' into starr/cca-exec-logging 2026-06-01 12:13:29 -07:00
Richard Lee
5065825bd2 Flush ad-hoc memory instruction seed 2026-06-01 12:09:44 -07:00
Richard Lee
f06a7b0da8 Merge remote-tracking branch 'origin/main' into starr/cca-exec-logging 2026-06-01 10:17:34 -07:00
starr-openai
5f6fecc3c9 codex: fix CI failure on PR #25019 2026-05-31 18:44:47 -07:00
starr-openai
c3ff443001 codex: add exec-server 80 20 telemetry 2026-05-29 16:55:05 -07:00
starr-openai
fc0f0c9933 codex: fix exec-server OTEL trace lifecycle 2026-05-29 16:14:32 -07:00
starr-openai
802f373110 codex: address PR review feedback (#25019) 2026-05-29 15:54:39 -07:00
starr-openai
4282a6e28b Stabilize exec-server OTLP smoke 2026-05-29 12:07:20 -07:00
starr-openai
d30f38c1d9 Poll exec-server stderr smoke 2026-05-29 12:07:20 -07:00
starr-openai
65968564df Avoid stderr smoke pipe hang 2026-05-29 12:07:20 -07:00
starr-openai
3aa56d259e Stabilize exec-server stderr smoke 2026-05-29 12:07:19 -07:00
starr-openai
0bdec5d1b7 Address exec-server OTEL review findings 2026-05-29 12:07:19 -07:00
starr-openai
b0f8167cc9 Fix exec-server Bazel telemetry test deps 2026-05-29 12:07:19 -07:00
starr-openai
568c79edac Remove exec-server identifiers from telemetry 2026-05-29 12:07:19 -07:00
starr-openai
21dfe383ed Add exec-server OTEL metrics 2026-05-29 12:07:18 -07:00
starr-openai
f23fd2fd5b Deduplicate remote exec-server telemetry events 2026-05-29 12:07:18 -07:00
starr-openai
715b462991 Align exec-server OTEL bootstrap proposal 2026-05-29 12:07:17 -07:00
starr-openai
a5412211d0 Add exec-server OTEL lifecycle logging 2026-05-29 12:07:17 -07:00
24 changed files with 1861 additions and 105 deletions

5
codex-rs/Cargo.lock generated
View File

@@ -2778,6 +2778,7 @@ dependencies = [
"codex-app-server-protocol",
"codex-client",
"codex-file-system",
"codex-otel",
"codex-protocol",
"codex-sandboxing",
"codex-test-binary-support",
@@ -2787,6 +2788,8 @@ dependencies = [
"ctor 0.6.3",
"futures",
"http 1.4.0",
"opentelemetry",
"opentelemetry_sdk",
"pretty_assertions",
"prost 0.14.3",
"reqwest 0.12.28",
@@ -2801,6 +2804,8 @@ dependencies = [
"tokio-util",
"toml 0.9.11+spec-1.1.0",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
"uuid",
"wiremock",
]

View File

@@ -41,6 +41,9 @@ use owo_colors::OwoColorize;
use std::io::IsTerminal;
use std::path::PathBuf;
use supports_color::Stream;
use tracing::Instrument;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::prelude::*;
#[cfg(any(target_os = "macos", target_os = "windows"))]
mod app_cmd;
@@ -544,6 +547,10 @@ struct ExecServerCommand {
use_agent_identity_auth: bool,
}
/// Analytics default handed to `otel_init::build_provider` when the exec-server
/// sets up telemetry.
const EXEC_SERVER_DEFAULT_ANALYTICS_ENABLED: bool = false;
/// Fallback stderr log filter, used when no usable filter can be read from the
/// default environment variable: errors only, with the OpenTelemetry SDK and
/// OTLP crates silenced.
const EXEC_SERVER_DEFAULT_LOG_FILTER: &str = "error,opentelemetry_sdk=off,opentelemetry_otlp=off";
/// Service name passed to the OTEL provider builder and to the process-start record.
const EXEC_SERVER_OTEL_SERVICE_NAME: &str = "codex-exec-server";
#[derive(Debug, clap::Subcommand)]
#[allow(clippy::enum_variant_names)]
enum AppServerSubcommand {
@@ -1596,10 +1603,11 @@ async fn run_exec_server_command(
arg0_paths.codex_linux_sandbox_exe.clone(),
)?;
if let Some(base_url) = cmd.remote {
let config = load_exec_server_config(root_config_overrides, strict_config).await?;
let (_otel, telemetry) = init_exec_server_tracing(Some(&config));
let environment_id = cmd
.environment_id
.ok_or_else(|| anyhow::anyhow!("--environment-id is required when --remote is set"))?;
let config = load_exec_server_config(root_config_overrides, strict_config).await?;
let auth_provider =
load_exec_server_remote_auth_provider(&config, &base_url, cmd.use_agent_identity_auth)
.await?;
@@ -1611,25 +1619,79 @@ async fn run_exec_server_command(
if let Some(name) = cmd.name {
remote_config.name = name;
}
codex_exec_server::run_remote_environment(remote_config, runtime_paths).await?;
let remote_config = remote_config.with_telemetry(telemetry);
codex_exec_server::run_remote_environment(remote_config, runtime_paths)
.instrument(codex_exec_server::runtime_span())
.await?;
Ok(())
} else {
if strict_config {
// Local exec-server startup does not consume Config, but strict
// mode should still reject unknown fields before opening a listener.
let _validated_config =
load_exec_server_config(root_config_overrides, strict_config).await?;
}
let config = if strict_config {
Some(load_exec_server_config(root_config_overrides, strict_config).await?)
} else {
load_exec_server_config(root_config_overrides, /*strict_config*/ false)
.await
.ok()
};
let (_otel, telemetry) = init_exec_server_tracing(config.as_ref());
let listen_url = cmd
.listen
.as_deref()
.unwrap_or(codex_exec_server::DEFAULT_LISTEN_URL);
codex_exec_server::run_main(listen_url, runtime_paths)
codex_exec_server::run_main_with_telemetry(listen_url, runtime_paths, telemetry)
.instrument(codex_exec_server::runtime_span())
.await
.map_err(anyhow::Error::from_boxed)
}
}
/// Installs the exec-server tracing subscriber and builds its telemetry handle.
///
/// A stderr `fmt` layer is always configured. When `config` is `Some`, an OTEL
/// provider is built from it; a build error or a panic during the build is
/// reported on stderr and the function carries on without OTEL.
///
/// Returns the (optional) OTEL provider together with an `ExecServerTelemetry`
/// wrapping the provider's metrics handle, if any.
// NOTE(review): callers bind the first tuple element to `_otel`, presumably to
// keep the provider alive for the lifetime of the server — confirm.
fn init_exec_server_tracing(
config: Option<&codex_core::config::Config>,
) -> (impl Send + Sync, codex_exec_server::ExecServerTelemetry) {
// Human-readable logs go to stderr, filtered by the env filter (or its fallback).
let fmt_layer = tracing_subscriber::fmt::layer()
.with_writer(std::io::stderr)
.with_filter(exec_server_stderr_env_filter());
let otel = config.and_then(|config| {
// `catch_unwind` turns a panic inside provider construction into a
// logged, non-fatal condition instead of aborting startup.
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
codex_core::otel_init::build_provider(
config,
env!("CARGO_PKG_VERSION"),
Some(EXEC_SERVER_OTEL_SERVICE_NAME),
EXEC_SERVER_DEFAULT_ANALYTICS_ENABLED,
)
})) {
// `build_provider` itself yields an `Option`; pass it through unchanged.
Ok(Ok(otel)) => otel,
Ok(Err(err)) => {
eprintln!("Could not create otel exporter: {err}");
None
}
Err(_) => {
eprintln!("Could not create otel exporter: panicked during initialization");
None
}
}
});
codex_core::otel_init::record_process_start(otel.as_ref(), EXEC_SERVER_OTEL_SERVICE_NAME);
// Each OTEL layer is optional; a `None` layer is simply a no-op in the registry.
let otel_logger_layer = otel.as_ref().and_then(|otel| otel.logger_layer());
let otel_tracing_layer = otel.as_ref().and_then(|otel| otel.tracing_layer());
let telemetry = codex_exec_server::ExecServerTelemetry::new(
otel.as_ref().and_then(|otel| otel.metrics()).cloned(),
);
// The result of `try_init` is deliberately ignored, so a failure to install
// the subscriber does not stop the server from starting.
let _ = tracing_subscriber::registry()
.with(fmt_layer)
.with(otel_tracing_layer)
.with(otel_logger_layer)
.try_init();
// Re-evaluate callsite interest now that the subscriber setup has been attempted.
tracing::callsite::rebuild_interest_cache();
(otel, telemetry)
}
fn exec_server_stderr_env_filter() -> EnvFilter {
EnvFilter::try_from_default_env()
.or_else(|_| EnvFilter::try_new(EXEC_SERVER_DEFAULT_LOG_FILTER))
.unwrap_or_else(|_| EnvFilter::new("error"))
}
async fn load_exec_server_remote_auth_provider(
config: &codex_core::config::Config,
base_url: &str,

View File

@@ -1,6 +1,14 @@
use std::io::Read as _;
use std::io::Write as _;
use std::net::TcpListener;
use std::path::Path;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use anyhow::Result;
use predicates::prelude::PredicateBooleanExt;
use predicates::str::contains;
use tempfile::TempDir;
@@ -33,3 +41,249 @@ foo = "bar"
Ok(())
}
/// A local exec-server started without strict config must start and exit
/// successfully even when `config.toml` is unparseable, and must not echo the
/// broken TOML on stderr.
#[test]
fn local_exec_server_ignores_invalid_config_without_strict_config() -> Result<()> {
    let home = TempDir::new()?;
    let config_path = home.path().join("config.toml");
    std::fs::write(config_path, "not valid toml = [")?;

    let no_parse_error = contains("not valid toml").not();
    let mut command = codex_command(home.path())?;
    command.args(["exec-server", "--listen", "stdio"]);
    command.assert().success().stderr(no_parse_error);
    Ok(())
}
/// End-to-end check that a local exec-server pushes OTLP/HTTP JSON metrics to
/// the configured collector: one `initialize` request is sent over stdio and
/// the captured `/v1/metrics` payloads must mention the connection gauge, the
/// request counter, the method name and a request outcome.
#[test]
fn local_exec_server_exports_real_otel_metrics() -> Result<()> {
    let collector = TestCollector::start()?;
    let codex_home = TempDir::new()?;
    let base_url = &collector.base_url;
    let config_toml = format!(
        r#"
[analytics]
enabled = true
[otel]
environment = "test"
metrics_exporter = {{ otlp-http = {{ endpoint = "{base_url}/v1/metrics", protocol = "json" }} }}
"#
    );
    std::fs::write(codex_home.path().join("config.toml"), config_toml)?;

    let initialize_request = r#"{"id":1,"method":"initialize","params":{"clientName":"otel-test","resumeSessionId":null}}"#;
    let mut cmd = codex_command(codex_home.path())?;
    cmd.args(["exec-server", "--listen", "stdio"])
        .write_stdin(initialize_request)
        .assert()
        .success();

    // Only the metrics endpoint is of interest; concatenate every payload it received.
    let requests = collector.finish()?;
    let metric_bodies: Vec<&str> = requests
        .iter()
        .filter(|request| request.path == "/v1/metrics")
        .map(|request| request.body.as_str())
        .collect();
    let metrics = metric_bodies.join("\n");

    for expected in [
        "exec_server_connections_active",
        "exec_server_requests_total",
        "initialize",
    ] {
        assert!(metrics.contains(expected), "{metrics}");
    }
    // The request may be recorded as completed or as cut short by stdin closing.
    let has_outcome = ["success", "disconnected"]
        .into_iter()
        .any(|outcome| metrics.contains(outcome));
    assert!(has_outcome, "{metrics}");
    Ok(())
}
/// A remote exec-server whose registry hands back an unreachable websocket URL
/// must log the connect failure, including the underlying IO error, on stderr.
///
/// The child process retries forever, so the test polls its stderr file for up
/// to five seconds and then kills it. The child is always killed and reaped
/// before any polling error is propagated, so a failed read cannot leave an
/// orphaned server behind.
#[test]
fn remote_exec_server_preserves_websocket_error_in_stderr() -> Result<()> {
    // Bind and immediately release a port so the websocket URL points at an
    // address with no listener.
    let failed_websocket_listener = TcpListener::bind("127.0.0.1:0")?;
    let failed_websocket_url = format!("ws://{}", failed_websocket_listener.local_addr()?);
    drop(failed_websocket_listener);
    let registry = TestRegistry::start(&failed_websocket_url)?;
    let codex_home = TempDir::new()?;
    let stderr_path = codex_home.path().join("remote.stderr");
    let stderr_file = std::fs::File::create(&stderr_path)?;
    let mut cmd = std::process::Command::new(codex_utils_cargo_bin::cargo_bin("codex")?);
    cmd.env("CODEX_HOME", codex_home.path())
        .env("CODEX_API_KEY", "test-key")
        .env("RUST_LOG", "codex_exec_server=warn")
        .args([
            "exec-server",
            "--remote",
            &registry.base_url,
            "--environment-id",
            "env-test",
        ])
        .stdout(std::process::Stdio::null())
        .stderr(stderr_file);
    let mut child = cmd.spawn()?;
    let deadline = Instant::now() + Duration::from_secs(5);
    // Collect the polling outcome as a value instead of using `?` inside the
    // loop, so the child is cleaned up on every path.
    let polled: std::io::Result<String> = loop {
        // Decode lossily: the child may be mid-write, and a torn multi-byte
        // sequence must not abort the poll.
        let stderr = match std::fs::read(&stderr_path) {
            Ok(bytes) => String::from_utf8_lossy(&bytes).into_owned(),
            Err(err) => break Err(err),
        };
        if stderr.contains("failed to connect remote exec-server websocket")
            || Instant::now() >= deadline
        {
            break Ok(stderr);
        }
        thread::sleep(Duration::from_millis(50));
    };
    let _ = child.kill();
    child.wait()?;
    let stderr = polled?;
    // NOTE(review): this join blocks until the registry thread has served one
    // request; if the child never contacted the registry it would wait
    // indefinitely — confirm whether a timeout is wanted here.
    registry.finish()?;
    assert!(
        stderr.contains("failed to connect remote exec-server websocket"),
        "{stderr}"
    );
    assert!(stderr.contains("IO error"), "{stderr}");
    Ok(())
}
/// Minimal view of an HTTP request as recorded by the in-test servers.
struct CapturedRequest {
// Request target: the second whitespace-separated token of the request line
// (empty if the request line has no such token).
path: String,
// Request body, lossily decoded as UTF-8 and capped at the declared Content-Length.
body: String,
}
/// One-shot fake environment registry: answers a single HTTP request with a
/// JSON document that points the caller at a fixed websocket URL.
struct TestRegistry {
    /// `http://host:port` of the listening socket.
    base_url: String,
    /// Background thread serving the single request; yields any I/O failure.
    server: thread::JoinHandle<Result<()>>,
}

impl TestRegistry {
    /// Binds an ephemeral loopback port and spawns the serving thread.
    fn start(websocket_url: &str) -> Result<Self> {
        let listener = TcpListener::bind("127.0.0.1:0")?;
        let addr = listener.local_addr()?;
        let base_url = format!("http://{addr}");
        let websocket_url = websocket_url.to_owned();

        let server = thread::spawn(move || -> Result<()> {
            let (mut stream, _peer) = listener.accept()?;
            // The request content is irrelevant; it only has to be fully read
            // before the canned response is written.
            read_http_request(&mut stream)?;
            let body = format!(r#"{{"environment_id":"env-test","url":"{websocket_url}"}}"#);
            let response = format!(
                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
                body.len()
            );
            stream.write_all(response.as_bytes())?;
            Ok(())
        });

        Ok(Self { base_url, server })
    }

    /// Waits for the serving thread and surfaces its result; a panic in the
    /// thread is reported as an error.
    fn finish(self) -> Result<()> {
        match self.server.join() {
            Ok(outcome) => outcome,
            Err(_) => Err(anyhow::anyhow!("registry thread panicked")),
        }
    }
}
/// In-process HTTP sink standing in for an OTLP collector: it records every
/// request it receives and replies `202 Accepted`.
struct TestCollector {
    /// `http://host:port` of the listening socket.
    base_url: String,
    /// Delivers the captured requests once the server thread stops.
    requests: mpsc::Receiver<Vec<CapturedRequest>>,
    /// Signals the server thread to stop once no connection is pending.
    stop: mpsc::Sender<()>,
    server: thread::JoinHandle<()>,
}

impl TestCollector {
    /// Binds an ephemeral loopback port and starts the accept loop on a
    /// background thread.
    fn start() -> Result<Self> {
        let listener = TcpListener::bind("127.0.0.1:0")?;
        let addr = listener.local_addr()?;
        // Non-blocking accept lets the loop poll the stop channel between connections.
        listener.set_nonblocking(true)?;
        let (tx, requests) = mpsc::channel();
        let (stop, stop_rx) = mpsc::channel();
        let server = thread::spawn(move || {
            let mut captured = Vec::new();
            loop {
                match listener.accept() {
                    Ok((mut stream, _)) => {
                        // On some platforms an accepted socket inherits the
                        // listener's non-blocking mode; reads would then fail
                        // with `WouldBlock` and the request would be silently
                        // dropped. Force blocking mode so the read timeout in
                        // `read_http_request` governs instead.
                        let request = stream
                            .set_nonblocking(false)
                            .and_then(|()| read_http_request(&mut stream));
                        if let Ok(request) = request {
                            captured.push(request);
                        }
                        let _ = stream.write_all(
                            b"HTTP/1.1 202 Accepted\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
                        );
                    }
                    Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                        // Only stop when idle, so already-queued connections
                        // are drained before shutdown.
                        if stop_rx.try_recv().is_ok() {
                            break;
                        }
                        thread::sleep(Duration::from_millis(10));
                    }
                    Err(_) => break,
                }
            }
            let _ = tx.send(captured);
        });
        Ok(Self {
            base_url: format!("http://{addr}"),
            requests,
            stop,
            server,
        })
    }

    /// Stops the server thread and returns everything it captured.
    ///
    /// Fails if the thread panicked or if no result arrives within one second
    /// of the thread finishing.
    fn finish(self) -> Result<Vec<CapturedRequest>> {
        let _ = self.stop.send(());
        self.server
            .join()
            .map_err(|_| anyhow::anyhow!("collector thread panicked"))?;
        Ok(self.requests.recv_timeout(Duration::from_secs(1))?)
    }
}
/// Reads one HTTP/1.x request from `stream` and returns its target path and body.
///
/// Each read is bounded by a two-second timeout. The header section must be
/// terminated by a blank line; the body length is taken from the first
/// `Content-Length` header (missing or unparseable means zero). If the peer
/// closes early, whatever part of the body arrived is returned.
///
/// # Errors
///
/// Returns an I/O error if a read fails or times out, or `UnexpectedEof` if
/// the connection closes before the header section is complete.
fn read_http_request(stream: &mut std::net::TcpStream) -> std::io::Result<CapturedRequest> {
    const HEADER_TERMINATOR: &[u8] = b"\r\n\r\n";

    stream.set_read_timeout(Some(Duration::from_secs(2)))?;
    let mut received = Vec::new();
    let mut chunk = [0_u8; 8192];

    // Accumulate bytes until the blank line that ends the header section shows up.
    let header_len = loop {
        let count = stream.read(&mut chunk)?;
        if count == 0 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "request closed before headers",
            ));
        }
        received.extend_from_slice(&chunk[..count]);
        let terminator = received
            .windows(HEADER_TERMINATOR.len())
            .position(|window| window == HEADER_TERMINATOR);
        if let Some(index) = terminator {
            break index;
        }
    };

    let head = String::from_utf8_lossy(&received[..header_len]);
    let mut head_lines = head.split("\r\n");

    // Request line is `METHOD TARGET VERSION`; the target is the second token.
    let path = match head_lines
        .next()
        .and_then(|request_line| request_line.split_whitespace().nth(1))
    {
        Some(target) => target.to_string(),
        None => String::new(),
    };

    // Only the first Content-Length header is honoured.
    let mut content_length = 0_usize;
    for line in head_lines {
        let Some((name, value)) = line.split_once(':') else {
            continue;
        };
        if name.eq_ignore_ascii_case("content-length") {
            content_length = value.trim().parse::<usize>().unwrap_or(0);
            break;
        }
    }

    // Bytes already received past the terminator are the start of the body.
    let mut body = received.split_off(header_len + HEADER_TERMINATOR.len());
    loop {
        if body.len() >= content_length {
            break;
        }
        let count = stream.read(&mut chunk)?;
        if count == 0 {
            break;
        }
        body.extend_from_slice(&chunk[..count]);
    }
    body.truncate(content_length);

    Ok(CapturedRequest {
        path,
        body: String::from_utf8_lossy(&body).into_owned(),
    })
}

View File

@@ -4,6 +4,8 @@ codex_rust_crate(
name = "exec-server",
crate_name = "codex_exec_server",
deps_extra = [
"@crates//:opentelemetry",
"@crates//:opentelemetry_sdk",
"@crates//:toml",
],
# Keep the crate's integration tests single-threaded under Bazel because

View File

@@ -20,6 +20,7 @@ codex-app-server-protocol = { workspace = true }
codex-api = { workspace = true }
codex-client = { workspace = true }
codex-file-system = { workspace = true }
codex-otel = { workspace = true }
codex-protocol = { workspace = true }
codex-sandboxing = { workspace = true }
codex-utils-absolute-path = { workspace = true }
@@ -53,8 +54,12 @@ anyhow = { workspace = true }
codex-test-binary-support = { workspace = true }
ctor = { workspace = true }
http = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
pretty_assertions = { workspace = true }
serial_test = { workspace = true }
tempfile = { workspace = true }
test-case = "3.3.1"
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true }
wiremock = { workspace = true }

View File

@@ -23,6 +23,7 @@ mod rpc;
mod runtime_paths;
mod sandboxed_file_system;
mod server;
mod telemetry;
pub use client::ExecServerClient;
pub use client::ExecServerError;
@@ -105,3 +106,6 @@ pub use runtime_paths::ExecServerRuntimePaths;
pub use server::DEFAULT_LISTEN_URL;
pub use server::ExecServerListenUrlParseError;
pub use server::run_main;
pub use server::run_main_with_telemetry;
pub use telemetry::ExecServerTelemetry;
pub use telemetry::runtime_span;

View File

@@ -3,6 +3,7 @@ use std::collections::VecDeque;
use std::collections::hash_map::Entry;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use async_trait::async_trait;
use codex_app_server_protocol::JSONRPCErrorError;
@@ -45,6 +46,7 @@ use crate::rpc::RpcServerOutboundMessage;
use crate::rpc::internal_error;
use crate::rpc::invalid_params;
use crate::rpc::invalid_request;
use crate::telemetry::ExecServerTelemetry;
const RETAINED_OUTPUT_BYTES_PER_PROCESS: usize = 1024 * 1024;
const NOTIFICATION_CHANNEL_CAPACITY: usize = 256;
@@ -74,6 +76,9 @@ struct RunningProcess {
output_notify: Arc<Notify>,
open_streams: usize,
closed: bool,
started_at: Instant,
metrics_finished: bool,
termination_requested: bool,
}
enum ProcessEntry {
@@ -84,6 +89,7 @@ enum ProcessEntry {
struct Inner {
notifications: std::sync::RwLock<Option<RpcNotificationSender>>,
processes: Mutex<HashMap<ProcessId, ProcessEntry>>,
telemetry: ExecServerTelemetry,
}
#[derive(Clone)]
@@ -103,16 +109,23 @@ impl Default for LocalProcess {
let (outgoing_tx, mut outgoing_rx) =
mpsc::channel::<RpcServerOutboundMessage>(NOTIFICATION_CHANNEL_CAPACITY);
tokio::spawn(async move { while outgoing_rx.recv().await.is_some() {} });
Self::new(RpcNotificationSender::new(outgoing_tx))
Self::new(
RpcNotificationSender::new(outgoing_tx),
ExecServerTelemetry::default(),
)
}
}
impl LocalProcess {
pub(crate) fn new(notifications: RpcNotificationSender) -> Self {
pub(crate) fn new(
notifications: RpcNotificationSender,
telemetry: ExecServerTelemetry,
) -> Self {
Self {
inner: Arc::new(Inner {
notifications: std::sync::RwLock::new(Some(notifications)),
processes: Mutex::new(HashMap::new()),
telemetry,
}),
}
}
@@ -129,6 +142,11 @@ impl LocalProcess {
.collect::<Vec<_>>()
};
for process in remaining {
if !process.metrics_finished {
self.inner
.telemetry
.process_finished("terminated", process.started_at.elapsed());
}
process.session.terminate();
}
}
@@ -226,9 +244,13 @@ impl LocalProcess {
output_notify: Arc::clone(&output_notify),
open_streams: 2,
closed: false,
started_at: Instant::now(),
metrics_finished: false,
termination_requested: false,
})),
);
}
self.inner.telemetry.process_started();
tokio::spawn(stream_output(
process_id.clone(),
@@ -389,12 +411,13 @@ impl LocalProcess {
) -> Result<TerminateResponse, JSONRPCErrorError> {
let _process_id = params.process_id.clone();
let running = {
let process_map = self.inner.processes.lock().await;
match process_map.get(&params.process_id) {
let mut process_map = self.inner.processes.lock().await;
match process_map.get_mut(&params.process_id) {
Some(ProcessEntry::Running(process)) => {
if process.exit_code.is_some() {
return Ok(TerminateResponse { running: false });
}
process.termination_requested = true;
process.session.terminate();
true
}
@@ -601,7 +624,7 @@ async fn watch_exit(
output_notify: Arc<Notify>,
) {
let exit_code = exit_rx.await.unwrap_or(-1);
let notification = {
let (notification, process_duration, termination_requested) = {
let mut processes = inner.processes.lock().await;
if let Some(ProcessEntry::Running(process)) = processes.get_mut(&process_id) {
let seq = process.next_seq;
@@ -611,15 +634,32 @@ async fn watch_exit(
process
.events
.publish(ExecProcessEvent::Exited { seq, exit_code });
Some(ExecExitedNotification {
process_id: process_id.clone(),
seq,
exit_code,
})
process.metrics_finished = true;
(
Some(ExecExitedNotification {
process_id: process_id.clone(),
seq,
exit_code,
}),
Some(process.started_at.elapsed()),
process.termination_requested,
)
} else {
None
(None, None, false)
}
};
if let Some(process_duration) = process_duration {
inner.telemetry.process_finished(
if termination_requested {
"terminated"
} else if exit_code == 0 {
"success"
} else {
"error"
},
process_duration,
);
}
output_notify.notify_waiters();
if let Some(notification) = notification
&& let Some(notifications) = notification_sender(&inner)
@@ -890,6 +930,9 @@ mod tests {
output_notify: Arc::clone(&output_notify),
open_streams: 2,
closed: false,
started_at: Instant::now(),
metrics_finished: false,
termination_requested: false,
})),
);
assert!(previous.is_none());

View File

@@ -26,6 +26,7 @@ use crate::relay_proto::RelayMessageFrame;
use crate::relay_proto::RelayResume;
use crate::relay_proto::relay_message_frame;
use crate::server::ConnectionProcessor;
use crate::telemetry::ConnectionTransport;
const RELAY_MESSAGE_FRAME_VERSION: u32 = 1;
@@ -449,7 +450,9 @@ fn spawn_virtual_stream(
transport: JsonRpcTransport::Plain,
};
tokio::spawn(async move {
processor.run_connection(connection).await;
processor
.run_connection(connection, ConnectionTransport::Relay)
.await;
});
VirtualStream {

View File

@@ -1,21 +1,49 @@
use std::time::Duration;
use std::time::Instant;
use codex_api::SharedAuthProvider;
use reqwest::StatusCode;
use serde::Deserialize;
use tokio::time::sleep;
use tokio_tungstenite::connect_async;
use tracing::Instrument;
use tracing::info;
use tracing::warn;
use codex_utils_rustls_provider::ensure_rustls_crypto_provider;
use crate::ExecServerError;
use crate::ExecServerRuntimePaths;
use crate::ExecServerTelemetry;
use crate::relay::run_multiplexed_environment;
use crate::server::ConnectionProcessor;
const ERROR_BODY_PREVIEW_BYTES: usize = 4096;
// Emits one remote-lifecycle event inside its own short-lived span.
//
// Arguments: a `tracing::Level` variant name (e.g. `INFO`), the event name as a
// string literal, then the usual `tracing::event!` fields and message.
//
// The span is created with `otel.name` set to the event name and is entered only
// for the duration of the two `event!` calls. The same fields are emitted twice,
// once under the `codex_otel.log_only` target and once under
// `codex_otel.trace_safe`.
// NOTE(review): the two targets are presumably routed to the OTEL log and trace
// pipelines respectively by filters defined elsewhere — confirm.
macro_rules! emit_remote_otel_event {
($level:ident, $event_name:literal, $($fields:tt)*) => {{
let span = tracing::info_span!(
"codex.exec_server.remote_event",
otel.kind = "internal",
otel.name = $event_name,
);
span.in_scope(|| {
tracing::event!(
target: "codex_otel.log_only",
tracing::Level::$level,
event.name = $event_name,
$($fields)*
);
tracing::event!(
target: "codex_otel.trace_safe",
tracing::Level::$level,
event.name = $event_name,
$($fields)*
);
});
}};
}
#[derive(Clone)]
struct EnvironmentRegistryClient {
base_url: String,
@@ -94,6 +122,7 @@ pub struct RemoteEnvironmentConfig {
pub environment_id: String,
pub name: String,
auth_provider: SharedAuthProvider,
telemetry: ExecServerTelemetry,
}
impl std::fmt::Debug for RemoteEnvironmentConfig {
@@ -119,8 +148,14 @@ impl RemoteEnvironmentConfig {
environment_id,
name: "codex-exec-server".to_string(),
auth_provider,
telemetry: ExecServerTelemetry::default(),
})
}
pub fn with_telemetry(mut self, telemetry: ExecServerTelemetry) -> Self {
self.telemetry = telemetry;
self
}
}
/// Register an exec-server for remote use and serve requests over the returned
@@ -132,31 +167,148 @@ pub async fn run_remote_environment(
ensure_rustls_crypto_provider();
let client =
EnvironmentRegistryClient::new(config.base_url.clone(), config.auth_provider.clone())?;
let processor = ConnectionProcessor::new(runtime_paths);
let processor = ConnectionProcessor::new(runtime_paths, config.telemetry.clone());
let mut backoff = Duration::from_secs(1);
let mut connection_attempt = 0_u32;
loop {
let response = client.register_environment(&config.environment_id).await?;
eprintln!(
"codex exec-server remote environment registered with environment_id {}",
response.environment_id
let registration_started_at = Instant::now();
let registration_span = remote_registration_span();
let response = match client
.register_environment(&config.environment_id)
.instrument(registration_span.clone())
.await
{
Ok(response) => response,
Err(err) => {
registration_span.record("result", "error");
config
.telemetry
.remote_registration_completed("error", registration_started_at.elapsed());
drop(registration_span);
warn!(error = %err, "failed to register remote exec-server environment");
emit_remote_otel_event!(
WARN,
"codex.exec_server.remote_environment_registration_failed",
"failed to register remote exec-server environment"
);
return Err(err);
}
};
registration_span.record("result", "success");
config
.telemetry
.remote_registration_completed("success", registration_started_at.elapsed());
drop(registration_span);
eprintln!("codex exec-server remote environment registered");
info!("codex exec-server remote environment registered");
emit_remote_otel_event!(
INFO,
"codex.exec_server.remote_environment_registered",
"codex exec-server remote environment registered"
);
match connect_async(response.url.as_str()).await {
let websocket_connect_started_at = Instant::now();
let websocket_connect_span = remote_websocket_connect_span(connection_attempt + 1);
match connect_async(response.url.as_str())
.instrument(websocket_connect_span.clone())
.await
{
Ok((websocket, _)) => {
connection_attempt += 1;
websocket_connect_span.record("result", "success");
config.telemetry.remote_websocket_connect_completed(
"success",
websocket_connect_started_at.elapsed(),
);
drop(websocket_connect_span);
let remote_websocket = config.telemetry.remote_websocket_connected();
info!(
attempt = connection_attempt,
"connected remote exec-server websocket"
);
emit_remote_otel_event!(
INFO,
"codex.exec_server.remote_websocket_connected",
attempt = connection_attempt,
"connected remote exec-server websocket"
);
backoff = Duration::from_secs(1);
run_multiplexed_environment(websocket, processor.clone()).await;
drop(remote_websocket);
config.telemetry.remote_websocket_reconnect("disconnected");
warn!(
attempt = connection_attempt,
"remote exec-server websocket disconnected; retrying"
);
emit_remote_otel_event!(
WARN,
"codex.exec_server.remote_websocket_disconnected",
attempt = connection_attempt,
"remote exec-server websocket disconnected; retrying"
);
}
Err(err) => {
warn!("failed to connect remote exec-server websocket: {err}");
connection_attempt += 1;
websocket_connect_span.record("result", "error");
config.telemetry.remote_websocket_connect_completed(
"error",
websocket_connect_started_at.elapsed(),
);
drop(websocket_connect_span);
warn!(
attempt = connection_attempt,
error = %err,
"failed to connect remote exec-server websocket"
);
emit_remote_otel_event!(
WARN,
"codex.exec_server.remote_websocket_connect_failed",
attempt = connection_attempt,
"failed to connect remote exec-server websocket"
);
config
.telemetry
.remote_websocket_reconnect("connect_failed");
}
}
let backoff_ms = backoff.as_millis();
info!(
attempt = connection_attempt,
backoff_ms, "retrying remote exec-server websocket"
);
emit_remote_otel_event!(
INFO,
"codex.exec_server.remote_websocket_retrying",
attempt = connection_attempt,
backoff_ms,
"retrying remote exec-server websocket"
);
sleep(backoff).await;
backoff = (backoff * 2).min(Duration::from_secs(30));
}
}
/// Creates the INFO-level client span that wraps one environment-registration
/// call. The `result` field starts empty so the caller can record the outcome
/// once the call finishes.
fn remote_registration_span() -> tracing::Span {
    tracing::span!(
        tracing::Level::INFO,
        "codex.exec_server.remote.register",
        otel.kind = "client",
        otel.name = "codex.exec_server.remote.register",
        result = tracing::field::Empty,
    )
}
/// Creates the INFO-level client span that wraps one websocket connect attempt.
///
/// `attempt` is recorded as a span field; `result` starts empty so the caller
/// can record the outcome once the attempt finishes.
fn remote_websocket_connect_span(attempt: u32) -> tracing::Span {
    tracing::span!(
        tracing::Level::INFO,
        "codex.exec_server.remote.websocket.connect",
        otel.kind = "client",
        otel.name = "codex.exec_server.remote.websocket.connect",
        attempt,
        result = tracing::field::Empty,
    )
}
fn normalize_environment_id(environment_id: String) -> Result<String, ExecServerError> {
let environment_id = environment_id.trim().to_string();
if environment_id.is_empty() {
@@ -248,7 +400,12 @@ mod tests {
use codex_api::AuthProvider;
use http::HeaderMap;
use http::HeaderValue;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_sdk::trace::InMemorySpanExporter;
use opentelemetry_sdk::trace::SdkTracerProvider;
use pretty_assertions::assert_eq;
use tracing_subscriber::filter::filter_fn;
use tracing_subscriber::prelude::*;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
@@ -363,4 +520,77 @@ mod tests {
assert!(debug.contains("<redacted>"));
assert!(!debug.contains("workspace-123"));
}
// Verifies that `emit_remote_otel_event!` produces a span that is already
// finished (and therefore exported) once the macro returns, and that the
// exported span carries the event name.
#[test]
fn remote_otel_events_finish_trace_spans_immediately() {
// In-memory exporter with a simple (non-batching) processor, so finished spans
// can be read back directly after a flush.
let span_exporter = InMemorySpanExporter::default();
let tracer_provider = SdkTracerProvider::builder()
.with_simple_exporter(span_exporter.clone())
.build();
let tracer = tracer_provider.tracer("exec-server-test");
// Apply the trace-export filter from `codex_otel` to the OpenTelemetry layer.
let subscriber = tracing_subscriber::registry().with(
tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(filter_fn(codex_otel::OtelProvider::trace_export_filter)),
);
tracing::subscriber::with_default(subscriber, || {
// Callsite interest may have been cached under a different subscriber;
// rebuild it so this scoped subscriber is consulted.
tracing::callsite::rebuild_interest_cache();
emit_remote_otel_event!(
INFO,
"codex.exec_server.remote_environment_registered",
"codex exec-server remote environment registered"
);
});
tracer_provider.force_flush().expect("flush traces");
let spans = span_exporter.get_finished_spans().expect("span export");
// The exported span is expected under the event name (the macro's `otel.name`),
// not under the macro's static span name.
assert!(
spans.iter().any(|span| {
span.name.as_ref() == "codex.exec_server.remote_environment_registered"
}),
"expected finished remote OTEL lifecycle span, got {spans:?}"
);
}
// Verifies that the registration and websocket-connect spans are exported as
// finished spans as soon as they are dropped.
#[test]
fn remote_operation_spans_finish_immediately() {
// In-memory exporter with a simple (non-batching) processor, so finished spans
// can be read back directly after a flush.
let span_exporter = InMemorySpanExporter::default();
let tracer_provider = SdkTracerProvider::builder()
.with_simple_exporter(span_exporter.clone())
.build();
let tracer = tracer_provider.tracer("exec-server-test");
let subscriber = tracing_subscriber::registry().with(
tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(filter_fn(codex_otel::OtelProvider::trace_export_filter)),
);
tracing::subscriber::with_default(subscriber, || {
// Callsite interest may have been cached under a different subscriber;
// rebuild it so this scoped subscriber is consulted.
tracing::callsite::rebuild_interest_cache();
// Mirror the production sequence: enter the span, record the outcome, drop it.
let registration_span = remote_registration_span();
registration_span.in_scope(|| {});
registration_span.record("result", "success");
drop(registration_span);
let websocket_span = remote_websocket_connect_span(/*attempt*/ 2);
websocket_span.in_scope(|| {});
websocket_span.record("result", "success");
drop(websocket_span);
});
tracer_provider.force_flush().expect("flush traces");
let spans = span_exporter.get_finished_spans().expect("span export");
assert!(
spans
.iter()
.any(|span| span.name.as_ref() == "codex.exec_server.remote.register"),
"expected finished remote registration span, got {spans:?}"
);
assert!(
spans
.iter()
.any(|span| { span.name.as_ref() == "codex.exec_server.remote.websocket.connect" }),
"expected finished remote websocket connect span, got {spans:?}"
);
}
}

View File

@@ -12,10 +12,19 @@ pub use transport::DEFAULT_LISTEN_URL;
pub use transport::ExecServerListenUrlParseError;
use crate::ExecServerRuntimePaths;
use crate::ExecServerTelemetry;
pub async fn run_main(
listen_url: &str,
runtime_paths: ExecServerRuntimePaths,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
transport::run_transport(listen_url, runtime_paths).await
run_main_with_telemetry(listen_url, runtime_paths, ExecServerTelemetry::default()).await
}
/// Runs the exec-server on `listen_url`, reporting through the supplied
/// `telemetry` handle.
///
/// This forwards directly to the transport layer and returns whatever error
/// the transport reports.
pub async fn run_main_with_telemetry(
listen_url: &str,
runtime_paths: ExecServerRuntimePaths,
telemetry: ExecServerTelemetry,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
transport::run_transport(listen_url, runtime_paths, telemetry).await
}

View File

@@ -77,7 +77,7 @@ fn test_runtime_paths() -> ExecServerRuntimePaths {
async fn initialized_handler() -> Arc<ExecServerHandler> {
let (outgoing_tx, _outgoing_rx) = mpsc::channel(16);
let registry = SessionRegistry::new();
let registry = SessionRegistry::new(crate::ExecServerTelemetry::default());
let handler = Arc::new(ExecServerHandler::new(
registry,
RpcNotificationSender::new(outgoing_tx),
@@ -155,7 +155,7 @@ async fn terminate_reports_false_after_process_exit() {
#[tokio::test]
async fn long_poll_read_fails_after_session_resume() {
let (first_tx, _first_rx) = mpsc::channel(16);
let registry = SessionRegistry::new();
let registry = SessionRegistry::new(crate::ExecServerTelemetry::default());
let first_handler = Arc::new(ExecServerHandler::new(
Arc::clone(&registry),
RpcNotificationSender::new(first_tx),
@@ -228,7 +228,7 @@ async fn long_poll_read_fails_after_session_resume() {
#[tokio::test]
async fn active_session_resume_is_rejected() {
let (first_tx, _first_rx) = mpsc::channel(16);
let registry = SessionRegistry::new();
let registry = SessionRegistry::new(crate::ExecServerTelemetry::default());
let first_handler = Arc::new(ExecServerHandler::new(
Arc::clone(&registry),
RpcNotificationSender::new(first_tx),
@@ -272,7 +272,7 @@ async fn active_session_resume_is_rejected() {
async fn output_and_exit_are_retained_after_notification_receiver_closes() {
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
let handler = Arc::new(ExecServerHandler::new(
SessionRegistry::new(),
SessionRegistry::new(crate::ExecServerTelemetry::default()),
RpcNotificationSender::new(outgoing_tx),
test_runtime_paths(),
));

View File

@@ -10,6 +10,7 @@ use crate::protocol::TerminateResponse;
use crate::protocol::WriteParams;
use crate::protocol::WriteResponse;
use crate::rpc::RpcNotificationSender;
use crate::telemetry::ExecServerTelemetry;
#[derive(Clone)]
pub(crate) struct ProcessHandler {
@@ -17,9 +18,12 @@ pub(crate) struct ProcessHandler {
}
impl ProcessHandler {
pub(crate) fn new(notifications: RpcNotificationSender) -> Self {
pub(crate) fn new(
notifications: RpcNotificationSender,
telemetry: ExecServerTelemetry,
) -> Self {
Self {
process: LocalProcess::new(notifications),
process: LocalProcess::new(notifications, telemetry),
}
}

View File

@@ -1,6 +1,8 @@
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc;
use tracing::Instrument;
use tracing::debug;
use tracing::warn;
@@ -16,26 +18,39 @@ use crate::rpc::method_not_found;
use crate::server::ExecServerHandler;
use crate::server::registry::build_router;
use crate::server::session_registry::SessionRegistry;
use crate::telemetry::ConnectionTransport;
use crate::telemetry::ExecServerTelemetry;
#[derive(Clone)]
pub(crate) struct ConnectionProcessor {
session_registry: Arc<SessionRegistry>,
runtime_paths: ExecServerRuntimePaths,
telemetry: ExecServerTelemetry,
}
impl ConnectionProcessor {
pub(crate) fn new(runtime_paths: ExecServerRuntimePaths) -> Self {
pub(crate) fn new(
runtime_paths: ExecServerRuntimePaths,
telemetry: ExecServerTelemetry,
) -> Self {
Self {
session_registry: SessionRegistry::new(),
session_registry: SessionRegistry::new(telemetry.clone()),
runtime_paths,
telemetry,
}
}
pub(crate) async fn run_connection(&self, connection: JsonRpcConnection) {
pub(crate) async fn run_connection(
&self,
connection: JsonRpcConnection,
transport: ConnectionTransport,
) {
run_connection(
connection,
Arc::clone(&self.session_registry),
self.runtime_paths.clone(),
self.telemetry.clone(),
transport,
)
.await;
}
@@ -45,7 +60,10 @@ async fn run_connection(
connection: JsonRpcConnection,
session_registry: Arc<SessionRegistry>,
runtime_paths: ExecServerRuntimePaths,
telemetry: ExecServerTelemetry,
transport: ConnectionTransport,
) {
let _connection_metrics = telemetry.connection_started(transport);
let router = Arc::new(build_router());
let JsonRpcConnection {
outgoing_tx: json_outgoing_tx,
@@ -100,31 +118,50 @@ async fn run_connection(
}
JsonRpcConnectionEvent::Message(message) => match message {
codex_app_server_protocol::JSONRPCMessage::Request(request) => {
let method = request_method(request.method.as_str());
let request_span = request_span(method);
let request_started_at = Instant::now();
if let Some(route) = router.request_route(request.method.as_str()) {
let message = tokio::select! {
message = route(Arc::clone(&handler), request) => message,
message = route(Arc::clone(&handler), request).instrument(request_span.clone()) => message,
_ = disconnected_rx.changed() => {
request_span.record("result", "disconnected");
telemetry.request_completed(
method,
"disconnected",
request_started_at.elapsed(),
);
drop(request_span);
debug!("exec-server transport disconnected while handling request");
break;
}
};
let result = request_result(&message);
request_span.record("result", result);
telemetry.request_completed(method, result, request_started_at.elapsed());
drop(request_span);
if let Some(message) = message
&& outgoing_tx.send(message).await.is_err()
{
break;
}
} else if outgoing_tx
.send(RpcServerOutboundMessage::Error {
request_id: request.id,
error: method_not_found(format!(
"exec-server stub does not implement `{}` yet",
request.method
)),
})
.await
.is_err()
{
break;
} else {
request_span.record("result", "error");
telemetry.request_completed(method, "error", request_started_at.elapsed());
drop(request_span);
if outgoing_tx
.send(RpcServerOutboundMessage::Error {
request_id: request.id,
error: method_not_found(format!(
"exec-server stub does not implement `{}` yet",
request.method
)),
})
.await
.is_err()
{
break;
}
}
}
codex_app_server_protocol::JSONRPCMessage::Notification(notification) => {
@@ -184,6 +221,45 @@ async fn run_connection(
let _ = outbound_task.await;
}
/// Maps a wire-level JSON-RPC method name onto a bounded set of telemetry labels.
///
/// A method that exactly equals one of the protocol constants listed below is
/// returned as that constant; anything else collapses to `"unknown"`, so
/// caller-supplied method strings never become metric or span label values.
fn request_method(method: &str) -> &'static str {
    const KNOWN_METHODS: &[&str] = &[
        crate::protocol::INITIALIZE_METHOD,
        crate::protocol::EXEC_METHOD,
        crate::protocol::EXEC_READ_METHOD,
        crate::protocol::EXEC_WRITE_METHOD,
        crate::protocol::EXEC_TERMINATE_METHOD,
        crate::protocol::FS_READ_FILE_METHOD,
        crate::protocol::FS_WRITE_FILE_METHOD,
        crate::protocol::FS_CREATE_DIRECTORY_METHOD,
        crate::protocol::FS_GET_METADATA_METHOD,
        crate::protocol::FS_READ_DIRECTORY_METHOD,
        crate::protocol::FS_REMOVE_METHOD,
        crate::protocol::FS_COPY_METHOD,
        crate::protocol::HTTP_REQUEST_METHOD,
    ];

    KNOWN_METHODS
        .iter()
        .copied()
        .find(|known| *known == method)
        .unwrap_or("unknown")
}
/// Creates the tracing span that wraps the handling of one request.
///
/// `method` is expected to be a bounded label (see `request_method`). It is
/// recorded both as the `method` field and as `otel.name`; the span test in
/// this module observes the exported span under that name.
/// `result` is declared empty here so the caller can `record` it once the
/// outcome of the request is known.
fn request_span(method: &'static str) -> tracing::Span {
tracing::info_span!(
"codex.exec_server.request",
otel.kind = "server",
otel.name = method,
method,
// Filled in later via `Span::record("result", ...)`.
result = tracing::field::Empty,
)
}
fn request_result(message: &Option<RpcServerOutboundMessage>) -> &'static str {
match message {
Some(RpcServerOutboundMessage::Error { .. }) => "error",
Some(
RpcServerOutboundMessage::Response { .. } | RpcServerOutboundMessage::Notification(_),
)
| None => "success",
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
@@ -195,6 +271,10 @@ mod tests {
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_sdk::trace::InMemorySpanExporter;
use opentelemetry_sdk::trace::SdkTracerProvider;
use pretty_assertions::assert_eq;
use serde::Serialize;
use serde::de::DeserializeOwned;
use tokio::io::AsyncBufReadExt;
@@ -205,7 +285,11 @@ mod tests {
use tokio::io::duplex;
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tracing_subscriber::filter::filter_fn;
use tracing_subscriber::prelude::*;
use super::request_method;
use super::request_span;
use super::run_connection;
use crate::ExecServerRuntimePaths;
use crate::ProcessId;
@@ -223,10 +307,58 @@ mod tests {
use crate::protocol::TerminateParams;
use crate::protocol::TerminateResponse;
use crate::server::session_registry::SessionRegistry;
use crate::telemetry::runtime_span;
// Checks that `request_method` echoes a known protocol method unchanged and
// collapses a method outside the protocol set to the "unknown" label.
#[test]
fn request_method_uses_bounded_protocol_method_names() {
assert_eq!(request_method(EXEC_TERMINATE_METHOD), EXEC_TERMINATE_METHOD);
assert_eq!(request_method("custom/method"), "unknown");
}
// Checks that both the runtime span and a request span pass the shared
// trace-export filter and are exported under their expected names.
#[test]
fn runtime_and_request_spans_are_exported() {
// Collect exported spans in memory so they can be inspected below.
let span_exporter = InMemorySpanExporter::default();
let tracer_provider = SdkTracerProvider::builder()
.with_simple_exporter(span_exporter.clone())
.build();
let tracer = tracer_provider.tracer("exec-server-test");
// Apply `codex_otel::OtelProvider::trace_export_filter` to the OpenTelemetry
// layer so the test exercises that filter rather than exporting everything.
let subscriber = tracing_subscriber::registry().with(
tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(filter_fn(codex_otel::OtelProvider::trace_export_filter)),
);
tracing::subscriber::with_default(subscriber, || {
// NOTE(review): presumably needed because callsite interest may have been
// cached under a different subscriber earlier in the test process — confirm.
tracing::callsite::rebuild_interest_cache();
let runtime_span = runtime_span();
runtime_span.in_scope(|| {
// Created while the runtime span is entered.
let request_span = request_span(INITIALIZE_METHOD);
request_span.in_scope(|| {});
// Record the result before the span is dropped, as the connection loop does.
request_span.record("result", "success");
drop(request_span);
});
drop(runtime_span);
});
tracer_provider.force_flush().expect("flush traces");
let spans = span_exporter.get_finished_spans().expect("span export");
assert!(
spans
.iter()
.any(|span| span.name.as_ref() == "codex.exec_server"),
"expected runtime span, got {spans:?}"
);
// The request span is looked up by the method name, not by
// "codex.exec_server.request".
assert!(
spans
.iter()
.any(|span| span.name.as_ref() == INITIALIZE_METHOD),
"expected initialize request span, got {spans:?}"
);
}
#[tokio::test]
async fn transport_disconnect_detaches_session_during_in_flight_read() {
let registry = SessionRegistry::new();
let registry = SessionRegistry::new(crate::ExecServerTelemetry::default());
let (mut first_writer, mut first_lines, first_task) =
spawn_test_connection(Arc::clone(&registry), "first");
@@ -322,7 +454,13 @@ mod tests {
let (server_writer, client_reader) = duplex(1 << 20);
let connection =
JsonRpcConnection::from_stdio(server_reader, server_writer, label.to_string());
let task = tokio::spawn(run_connection(connection, registry, test_runtime_paths()));
let task = tokio::spawn(run_connection(
connection,
registry,
test_runtime_paths(),
crate::ExecServerTelemetry::default(),
crate::telemetry::ConnectionTransport::Stdio,
));
(client_writer, BufReader::new(client_reader).lines(), task)
}

View File

@@ -10,6 +10,7 @@ use uuid::Uuid;
use crate::rpc::RpcNotificationSender;
use crate::rpc::invalid_request;
use crate::server::process_handler::ProcessHandler;
use crate::telemetry::ExecServerTelemetry;
#[cfg(test)]
const DETACHED_SESSION_TTL: Duration = Duration::from_millis(200);
@@ -18,6 +19,7 @@ const DETACHED_SESSION_TTL: Duration = Duration::from_secs(10);
pub(crate) struct SessionRegistry {
sessions: Mutex<HashMap<String, Arc<SessionEntry>>>,
telemetry: ExecServerTelemetry,
}
struct SessionEntry {
@@ -49,9 +51,10 @@ pub(crate) struct SessionHandle {
}
impl SessionRegistry {
pub(crate) fn new() -> Arc<Self> {
pub(crate) fn new(telemetry: ExecServerTelemetry) -> Arc<Self> {
Arc::new(Self {
sessions: Mutex::new(HashMap::new()),
telemetry,
})
}
@@ -94,7 +97,7 @@ impl SessionRegistry {
let session_id = Uuid::new_v4().to_string();
let entry = Arc::new(SessionEntry::new(
session_id.clone(),
ProcessHandler::new(notifications),
ProcessHandler::new(notifications, self.telemetry.clone()),
connection_id,
));
sessions.insert(session_id, Arc::clone(&entry));
@@ -140,6 +143,7 @@ impl Default for SessionRegistry {
fn default() -> Self {
Self {
sessions: Mutex::new(HashMap::new()),
telemetry: ExecServerTelemetry::default(),
}
}
}

View File

@@ -22,8 +22,10 @@ use tracing::info;
use tracing::warn;
use crate::ExecServerRuntimePaths;
use crate::ExecServerTelemetry;
use crate::connection::JsonRpcConnection;
use crate::server::processor::ConnectionProcessor;
use crate::telemetry::ConnectionTransport;
pub const DEFAULT_LISTEN_URL: &str = "ws://127.0.0.1:0";
@@ -80,38 +82,40 @@ pub(crate) fn parse_listen_url(
pub(crate) async fn run_transport(
listen_url: &str,
runtime_paths: ExecServerRuntimePaths,
telemetry: ExecServerTelemetry,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
match parse_listen_url(listen_url)? {
ExecServerListenTransport::WebSocket(bind_address) => {
run_websocket_listener(bind_address, runtime_paths).await
run_websocket_listener(bind_address, runtime_paths, telemetry).await
}
ExecServerListenTransport::Stdio => run_stdio_connection(runtime_paths).await,
ExecServerListenTransport::Stdio => run_stdio_connection(runtime_paths, telemetry).await,
}
}
async fn run_stdio_connection(
runtime_paths: ExecServerRuntimePaths,
telemetry: ExecServerTelemetry,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
run_stdio_connection_with_io(io::stdin(), io::stdout(), runtime_paths).await
run_stdio_connection_with_io(io::stdin(), io::stdout(), runtime_paths, telemetry).await
}
async fn run_stdio_connection_with_io<R, W>(
reader: R,
writer: W,
runtime_paths: ExecServerRuntimePaths,
telemetry: ExecServerTelemetry,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
R: AsyncRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static,
{
let processor = ConnectionProcessor::new(runtime_paths);
let processor = ConnectionProcessor::new(runtime_paths, telemetry);
tracing::info!("codex-exec-server listening on stdio");
processor
.run_connection(JsonRpcConnection::from_stdio(
reader,
writer,
"exec-server stdio".to_string(),
))
.run_connection(
JsonRpcConnection::from_stdio(reader, writer, "exec-server stdio".to_string()),
ConnectionTransport::Stdio,
)
.await;
Ok(())
}
@@ -119,10 +123,11 @@ where
async fn run_websocket_listener(
bind_address: SocketAddr,
runtime_paths: ExecServerRuntimePaths,
telemetry: ExecServerTelemetry,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let listener = TcpListener::bind(bind_address).await?;
let local_addr = listener.local_addr()?;
let processor = ConnectionProcessor::new(runtime_paths);
let processor = ConnectionProcessor::new(runtime_paths, telemetry);
info!("codex-exec-server listening on ws://{local_addr}");
println!("ws://{local_addr}");
std::io::stdout().flush()?;
@@ -174,10 +179,13 @@ async fn websocket_upgrade_handler(
websocket.on_upgrade(move |stream| async move {
state
.processor
.run_connection(JsonRpcConnection::from_axum_websocket(
stream,
format!("exec-server websocket {peer_addr}"),
))
.run_connection(
JsonRpcConnection::from_axum_websocket(
stream,
format!("exec-server websocket {peer_addr}"),
),
ConnectionTransport::WebSocket,
)
.await;
})
}

View File

@@ -6,6 +6,9 @@ use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_otel::MetricsConfig;
use opentelemetry_sdk::metrics::InMemoryMetricExporter;
use opentelemetry_sdk::metrics::data::ScopeMetrics;
use pretty_assertions::assert_eq;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
@@ -61,6 +64,7 @@ async fn stdio_listen_transport_serves_initialize() {
server_reader,
server_writer,
test_runtime_paths(),
crate::ExecServerTelemetry::default(),
));
let mut client_lines = BufReader::new(client_reader).lines();
@@ -111,6 +115,76 @@ async fn stdio_listen_transport_serves_initialize() {
.expect("stdio transport should not fail");
}
// Drives one `initialize` request over an in-memory stdio transport and checks
// that the connection and request metrics show up in the exported data.
#[tokio::test]
async fn stdio_listen_transport_emits_connection_and_request_metrics() {
    let exporter = InMemoryMetricExporter::default();
    let metrics_config = MetricsConfig::in_memory(
        "test",
        "codex-exec-server",
        env!("CARGO_PKG_VERSION"),
        exporter.clone(),
    );
    let metrics = codex_otel::MetricsClient::new(metrics_config).expect("metrics");
    let telemetry = crate::ExecServerTelemetry::new(Some(metrics.clone()));

    // Two in-memory pipes stand in for the server's stdin and stdout.
    let (mut client_writer, server_reader) = duplex(1 << 20);
    let (server_writer, client_reader) = duplex(1 << 20);
    let server_task = tokio::spawn(run_stdio_connection_with_io(
        server_reader,
        server_writer,
        test_runtime_paths(),
        telemetry,
    ));
    let mut client_lines = BufReader::new(client_reader).lines();

    let initialize_params = serde_json::to_value(InitializeParams {
        client_name: "exec-server-metrics-test".to_string(),
        resume_session_id: None,
    })
    .expect("initialize params should serialize");
    let initialize_request = JSONRPCMessage::Request(JSONRPCRequest {
        id: RequestId::Integer(1),
        method: INITIALIZE_METHOD.to_string(),
        params: Some(initialize_params),
        trace: None,
    });
    write_jsonrpc_line(&mut client_writer, &initialize_request).await;

    let _response = timeout(Duration::from_secs(1), client_lines.next_line())
        .await
        .expect("initialize response should arrive")
        .expect("initialize response read should succeed")
        .expect("initialize response should be present");

    // Close both client ends so the server side sees a disconnect and returns.
    drop(client_writer);
    drop(client_lines);
    timeout(Duration::from_secs(1), server_task)
        .await
        .expect("stdio transport should finish after client disconnect")
        .expect("stdio transport task should join")
        .expect("stdio transport should not fail");
    metrics.shutdown().expect("shutdown metrics");

    // Gather the names of every metric across all exported batches.
    let mut metric_names: Vec<String> = Vec::new();
    for resource_metrics in exporter.get_finished_metrics().expect("finished metrics") {
        metric_names.extend(
            resource_metrics
                .scope_metrics()
                .flat_map(ScopeMetrics::metrics)
                .map(|metric| metric.name().to_string()),
        );
    }

    for expected in [
        "exec_server_connections_active",
        "exec_server_connections_total",
        "exec_server_requests_total",
        "exec_server_request_duration_seconds",
    ] {
        assert!(
            metric_names.iter().any(|name| name.as_str() == expected),
            "expected metric {expected}, got {metric_names:?}"
        );
    }
}
#[test]
fn parse_listen_url_accepts_websocket_url() {
let transport =

View File

@@ -0,0 +1,608 @@
use std::sync::Arc;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
use std::time::Duration;
use codex_otel::MetricsClient;
use tracing::warn;
// Metric names and human-readable descriptions emitted by `ExecServerTelemetry`.
// Each `*_METRIC` constant is paired with the `*_DESCRIPTION` passed alongside it
// when the instrument is recorded.

// Inbound connections served by this exec-server.
const CONNECTIONS_ACTIVE_METRIC: &str = "exec_server_connections_active";
const CONNECTIONS_ACTIVE_DESCRIPTION: &str = "Number of active exec-server connections.";
const CONNECTIONS_TOTAL_METRIC: &str = "exec_server_connections_total";
const CONNECTIONS_TOTAL_DESCRIPTION: &str = "Total number of accepted exec-server connections.";
// Remote registration attempts.
const REMOTE_REGISTRATION_TOTAL_METRIC: &str = "exec_server_remote_registration_total";
const REMOTE_REGISTRATION_TOTAL_DESCRIPTION: &str =
"Total number of remote exec-server registration attempts.";
const REMOTE_REGISTRATION_DURATION_METRIC: &str =
"exec_server_remote_registration_duration_seconds";
const REMOTE_REGISTRATION_DURATION_DESCRIPTION: &str =
"Duration of remote exec-server registration attempts in seconds.";
// Remote WebSocket connections, connection attempts, and reconnects.
const REMOTE_WEBSOCKET_ACTIVE_METRIC: &str = "exec_server_remote_websocket_active";
const REMOTE_WEBSOCKET_ACTIVE_DESCRIPTION: &str =
"Number of active remote exec-server WebSocket connections.";
const REMOTE_WEBSOCKET_CONNECT_TOTAL_METRIC: &str = "exec_server_remote_websocket_connect_total";
const REMOTE_WEBSOCKET_CONNECT_TOTAL_DESCRIPTION: &str =
"Total number of remote exec-server WebSocket connection attempts.";
const REMOTE_WEBSOCKET_CONNECT_DURATION_METRIC: &str =
"exec_server_remote_websocket_connect_duration_seconds";
const REMOTE_WEBSOCKET_CONNECT_DURATION_DESCRIPTION: &str =
"Duration of remote exec-server WebSocket connection attempts in seconds.";
const REMOTE_WEBSOCKET_RECONNECTS_METRIC: &str = "exec_server_remote_websocket_reconnects_total";
const REMOTE_WEBSOCKET_RECONNECTS_DESCRIPTION: &str =
"Total number of remote exec-server WebSocket reconnects.";
// JSON-RPC requests.
const REQUESTS_TOTAL_METRIC: &str = "exec_server_requests_total";
const REQUESTS_TOTAL_DESCRIPTION: &str = "Total number of exec-server requests.";
const REQUEST_DURATION_METRIC: &str = "exec_server_request_duration_seconds";
const REQUEST_DURATION_DESCRIPTION: &str = "Duration of exec-server requests in seconds.";
// Processes.
const PROCESSES_ACTIVE_METRIC: &str = "exec_server_processes_active";
const PROCESSES_ACTIVE_DESCRIPTION: &str = "Number of active exec-server processes.";
const PROCESSES_FINISHED_TOTAL_METRIC: &str = "exec_server_processes_finished_total";
const PROCESSES_FINISHED_TOTAL_DESCRIPTION: &str =
"Total number of finished exec-server processes.";
const PROCESS_DURATION_METRIC: &str = "exec_server_process_duration_seconds";
const PROCESS_DURATION_DESCRIPTION: &str = "Duration of exec-server processes in seconds.";
/// Creates the `info`-level span named `codex.exec_server`, tagged with
/// `otel.kind = "internal"`.
///
/// The span is returned un-entered; the caller decides when to enter it and
/// how long to keep it alive.
pub fn runtime_span() -> tracing::Span {
tracing::info_span!("codex.exec_server", otel.kind = "internal")
}
/// Transport a connection is served over.
///
/// The variant selects which active-connection counter is updated and which
/// `transport` tag value is attached to connection metrics.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum ConnectionTransport {
    /// Reported with the tag value `relay`.
    Relay,
    /// Reported with the tag value `stdio`.
    Stdio,
    /// Reported with the tag value `websocket`.
    WebSocket,
}

impl ConnectionTransport {
    /// Returns the fixed, lowercase value used for the `transport` metric tag.
    fn metric_tag(self) -> &'static str {
        match self {
            Self::Relay => "relay",
            Self::Stdio => "stdio",
            Self::WebSocket => "websocket",
        }
    }
}
/// Cloneable handle used to emit exec-server metrics.
///
/// Clones share the same inner state through the `Arc`. When `inner` is `None`
/// (the `Default` value, or `new(None)`), every emit method does nothing.
#[derive(Clone, Default)]
pub struct ExecServerTelemetry {
inner: Option<Arc<ExecServerTelemetryInner>>,
}
/// Shared state behind an enabled `ExecServerTelemetry` handle.
struct ExecServerTelemetryInner {
// Client every counter, gauge, and duration is emitted through.
metrics: MetricsClient,
// Current number of active connections, tracked separately per transport so
// each `transport` tag value gets its own gauge reading.
relay_connections: AtomicI64,
stdio_connections: AtomicI64,
websocket_connections: AtomicI64,
// Current number of active remote WebSocket connections.
remote_websockets: AtomicI64,
// Current number of processes that have started but not yet finished.
active_processes: AtomicI64,
}
/// Guard returned by `ExecServerTelemetry::connection_started`.
///
/// Dropping the guard reports the connection as finished for `transport`,
/// which lowers the active-connection gauge. It must therefore be bound to a
/// variable that lives as long as the connection; discarding it immediately
/// would make the gauge drop back right after it was raised.
#[must_use = "dropping the guard immediately reports the connection as finished"]
pub(crate) struct ConnectionMetricGuard {
    /// Handle used to emit the "finished" update on drop.
    telemetry: ExecServerTelemetry,
    /// Transport whose active-connection count this guard holds open.
    transport: ConnectionTransport,
}
/// Guard returned by `ExecServerTelemetry::remote_websocket_connected`.
///
/// Dropping the guard reports the remote WebSocket as disconnected, which
/// lowers the active remote-WebSocket gauge. It must therefore be bound to a
/// variable that lives as long as the connection.
#[must_use = "dropping the guard immediately reports the remote WebSocket as disconnected"]
pub(crate) struct RemoteWebSocketMetricGuard {
    /// Handle used to emit the "disconnected" update on drop.
    telemetry: ExecServerTelemetry,
}
impl ExecServerTelemetry {
    /// Creates a telemetry handle.
    ///
    /// With `Some(metrics)` every method emits through that client and all
    /// active-count trackers start at zero. With `None` the handle holds no
    /// state and every method is a no-op.
    pub fn new(metrics: Option<MetricsClient>) -> Self {
        let inner = metrics.map(|metrics| {
            Arc::new(ExecServerTelemetryInner {
                metrics,
                relay_connections: AtomicI64::default(),
                stdio_connections: AtomicI64::default(),
                websocket_connections: AtomicI64::default(),
                remote_websockets: AtomicI64::default(),
                active_processes: AtomicI64::default(),
            })
        });
        Self { inner }
    }

    /// Records an accepted connection on `transport`.
    ///
    /// Raises the per-transport active gauge and bumps the accepted-connection
    /// counter. The returned guard reports the connection as finished when it
    /// is dropped.
    pub(crate) fn connection_started(
        &self,
        transport: ConnectionTransport,
    ) -> ConnectionMetricGuard {
        self.with_inner(|inner| {
            let transport_tag = transport.metric_tag();
            let previously_active = inner
                .connection_counter(transport)
                .fetch_add(1, Ordering::AcqRel);
            inner.gauge(
                CONNECTIONS_ACTIVE_METRIC,
                CONNECTIONS_ACTIVE_DESCRIPTION,
                previously_active + 1,
                &[("transport", transport_tag)],
            );
            inner.counter(
                CONNECTIONS_TOTAL_METRIC,
                CONNECTIONS_TOTAL_DESCRIPTION,
                &[("transport", transport_tag), ("result", "accepted")],
            );
        });

        let telemetry = self.clone();
        ConnectionMetricGuard {
            telemetry,
            transport,
        }
    }

    /// Lowers the active gauge for `transport`; called when a
    /// `ConnectionMetricGuard` is dropped.
    fn connection_finished(&self, transport: ConnectionTransport) {
        self.with_inner(|inner| {
            let previously_active = inner
                .connection_counter(transport)
                .fetch_sub(1, Ordering::AcqRel);
            inner.gauge(
                CONNECTIONS_ACTIVE_METRIC,
                CONNECTIONS_ACTIVE_DESCRIPTION,
                previously_active - 1,
                &[("transport", transport.metric_tag())],
            );
        });
    }

    /// Records one remote registration attempt with its `result` and how long
    /// it took.
    pub(crate) fn remote_registration_completed(&self, result: &'static str, duration: Duration) {
        self.with_inner(|inner| {
            let result_tags = [("result", result)];
            inner.counter(
                REMOTE_REGISTRATION_TOTAL_METRIC,
                REMOTE_REGISTRATION_TOTAL_DESCRIPTION,
                &result_tags,
            );
            inner.duration(
                REMOTE_REGISTRATION_DURATION_METRIC,
                REMOTE_REGISTRATION_DURATION_DESCRIPTION,
                duration,
                &result_tags,
            );
        });
    }

    /// Records that a remote WebSocket is now connected and raises the active
    /// gauge. The returned guard lowers the gauge again when it is dropped.
    pub(crate) fn remote_websocket_connected(&self) -> RemoteWebSocketMetricGuard {
        self.with_inner(|inner| {
            let previously_active = inner.remote_websockets.fetch_add(1, Ordering::AcqRel);
            inner.gauge(
                REMOTE_WEBSOCKET_ACTIVE_METRIC,
                REMOTE_WEBSOCKET_ACTIVE_DESCRIPTION,
                previously_active + 1,
                &[],
            );
        });

        let telemetry = self.clone();
        RemoteWebSocketMetricGuard { telemetry }
    }

    /// Lowers the active remote-WebSocket gauge; called when a
    /// `RemoteWebSocketMetricGuard` is dropped.
    fn remote_websocket_disconnected(&self) {
        self.with_inner(|inner| {
            let previously_active = inner.remote_websockets.fetch_sub(1, Ordering::AcqRel);
            inner.gauge(
                REMOTE_WEBSOCKET_ACTIVE_METRIC,
                REMOTE_WEBSOCKET_ACTIVE_DESCRIPTION,
                previously_active - 1,
                &[],
            );
        });
    }

    /// Records one remote WebSocket connection attempt with its `result` and
    /// how long it took.
    pub(crate) fn remote_websocket_connect_completed(
        &self,
        result: &'static str,
        duration: Duration,
    ) {
        self.with_inner(|inner| {
            let result_tags = [("result", result)];
            inner.counter(
                REMOTE_WEBSOCKET_CONNECT_TOTAL_METRIC,
                REMOTE_WEBSOCKET_CONNECT_TOTAL_DESCRIPTION,
                &result_tags,
            );
            inner.duration(
                REMOTE_WEBSOCKET_CONNECT_DURATION_METRIC,
                REMOTE_WEBSOCKET_CONNECT_DURATION_DESCRIPTION,
                duration,
                &result_tags,
            );
        });
    }

    /// Counts one remote WebSocket reconnect, tagged with `reason`.
    pub(crate) fn remote_websocket_reconnect(&self, reason: &'static str) {
        self.with_inner(|inner| {
            let reason_tags = [("reason", reason)];
            inner.counter(
                REMOTE_WEBSOCKET_RECONNECTS_METRIC,
                REMOTE_WEBSOCKET_RECONNECTS_DESCRIPTION,
                &reason_tags,
            );
        });
    }

    /// Records one completed request: a count and a duration sample, both
    /// tagged with `method` and `result`.
    pub(crate) fn request_completed(
        &self,
        method: &'static str,
        result: &'static str,
        duration: Duration,
    ) {
        self.with_inner(|inner| {
            let request_tags = [("method", method), ("result", result)];
            inner.counter(
                REQUESTS_TOTAL_METRIC,
                REQUESTS_TOTAL_DESCRIPTION,
                &request_tags,
            );
            inner.duration(
                REQUEST_DURATION_METRIC,
                REQUEST_DURATION_DESCRIPTION,
                duration,
                &request_tags,
            );
        });
    }

    /// Raises the active-process gauge by one.
    pub(crate) fn process_started(&self) {
        self.with_inner(|inner| {
            let previously_active = inner.active_processes.fetch_add(1, Ordering::AcqRel);
            inner.gauge(
                PROCESSES_ACTIVE_METRIC,
                PROCESSES_ACTIVE_DESCRIPTION,
                previously_active + 1,
                &[],
            );
        });
    }

    /// Lowers the active-process gauge by one and records the finished
    /// process: a count and a duration sample, both tagged with `result`.
    pub(crate) fn process_finished(&self, result: &'static str, duration: Duration) {
        self.with_inner(|inner| {
            let previously_active = inner.active_processes.fetch_sub(1, Ordering::AcqRel);
            inner.gauge(
                PROCESSES_ACTIVE_METRIC,
                PROCESSES_ACTIVE_DESCRIPTION,
                previously_active - 1,
                &[],
            );

            let result_tags = [("result", result)];
            inner.counter(
                PROCESSES_FINISHED_TOTAL_METRIC,
                PROCESSES_FINISHED_TOTAL_DESCRIPTION,
                &result_tags,
            );
            inner.duration(
                PROCESS_DURATION_METRIC,
                PROCESS_DURATION_DESCRIPTION,
                duration,
                &result_tags,
            );
        });
    }

    /// Runs `emit` against the shared state when metrics are enabled and does
    /// nothing otherwise.
    fn with_inner(&self, emit: impl FnOnce(&ExecServerTelemetryInner)) {
        if let Some(inner) = self.inner.as_deref() {
            emit(inner);
        }
    }
}
/// Reports the guarded connection as finished when the guard goes out of scope.
impl Drop for ConnectionMetricGuard {
fn drop(&mut self) {
// Lowers the active-connection gauge for this guard's transport.
self.telemetry.connection_finished(self.transport);
}
}
/// Reports the guarded remote WebSocket as disconnected when the guard goes out
/// of scope.
impl Drop for RemoteWebSocketMetricGuard {
fn drop(&mut self) {
// Lowers the active remote-WebSocket gauge.
self.telemetry.remote_websocket_disconnected();
}
}
impl ExecServerTelemetryInner {
fn connection_counter(&self, transport: ConnectionTransport) -> &AtomicI64 {
match transport {
ConnectionTransport::Relay => &self.relay_connections,
ConnectionTransport::Stdio => &self.stdio_connections,
ConnectionTransport::WebSocket => &self.websocket_connections,
}
}
fn counter(&self, name: &str, description: &str, tags: &[(&str, &str)]) {
if self
.metrics
.counter_with_description(name, description, /*inc*/ 1, tags)
.is_err()
{
warn!(metric = name, "failed to emit exec-server counter");
}
}
fn duration(&self, name: &str, description: &str, duration: Duration, tags: &[(&str, &str)]) {
if self
.metrics
.record_duration_seconds_with_description(name, description, duration, tags)
.is_err()
{
warn!(metric = name, "failed to emit exec-server duration");
}
}
fn gauge(&self, name: &str, description: &str, value: i64, tags: &[(&str, &str)]) {
if self
.metrics
.gauge_with_description(name, description, value, tags)
.is_err()
{
warn!(metric = name, "failed to emit exec-server gauge");
}
}
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use std::time::Duration;
use codex_otel::MetricsConfig;
use opentelemetry::KeyValue;
use opentelemetry_sdk::metrics::InMemoryMetricExporter;
use opentelemetry_sdk::metrics::data::AggregatedMetrics;
use opentelemetry_sdk::metrics::data::Metric;
use opentelemetry_sdk::metrics::data::MetricData;
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use pretty_assertions::assert_eq;
use super::*;
// Exercises every `ExecServerTelemetry` emit method once against an in-memory
// exporter, then checks the exported values, tags, descriptions, and units.
#[test]
fn emits_bounded_exec_server_metrics() {
    /// Builds the expected attribute map from `(key, value)` pairs.
    fn tag_map(pairs: &[(&str, &str)]) -> BTreeMap<String, String> {
        pairs
            .iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    let exporter = InMemoryMetricExporter::default();
    let metrics = codex_otel::MetricsClient::new(MetricsConfig::in_memory(
        "test",
        "codex-exec-server",
        env!("CARGO_PKG_VERSION"),
        exporter.clone(),
    ))
    .expect("metrics");
    let telemetry = ExecServerTelemetry::new(Some(metrics.clone()));

    // Emit each kind of event exactly once. The two guards are dropped before
    // shutdown so both "active" gauges end at zero.
    let connection = telemetry.connection_started(ConnectionTransport::WebSocket);
    telemetry.remote_registration_completed("success", Duration::from_millis(5));
    let remote_websocket = telemetry.remote_websocket_connected();
    telemetry.remote_websocket_connect_completed("success", Duration::from_millis(7));
    telemetry.request_completed("process/start", "success", Duration::from_millis(12));
    telemetry.process_started();
    telemetry.process_finished("success", Duration::from_millis(34));
    telemetry.remote_websocket_reconnect("connect_failed");
    drop(remote_websocket);
    drop(connection);
    metrics.shutdown().expect("shutdown metrics");
    let metrics = latest_metrics(&exporter);

    // Counters and gauges: each metric has exactly one data point.
    for (name, expected_value, expected_tags) in [
        (
            "exec_server_connections_total",
            1.0,
            tag_map(&[("result", "accepted"), ("transport", "websocket")]),
        ),
        (
            "exec_server_connections_active",
            0.0,
            tag_map(&[("transport", "websocket")]),
        ),
        (
            "exec_server_remote_registration_total",
            1.0,
            tag_map(&[("result", "success")]),
        ),
        (
            "exec_server_remote_websocket_connect_total",
            1.0,
            tag_map(&[("result", "success")]),
        ),
        ("exec_server_remote_websocket_active", 0.0, tag_map(&[])),
        (
            "exec_server_requests_total",
            1.0,
            tag_map(&[("method", "process/start"), ("result", "success")]),
        ),
        ("exec_server_processes_active", 0.0, tag_map(&[])),
        (
            "exec_server_processes_finished_total",
            1.0,
            tag_map(&[("result", "success")]),
        ),
        (
            "exec_server_remote_websocket_reconnects_total",
            1.0,
            tag_map(&[("reason", "connect_failed")]),
        ),
    ] {
        assert_eq!(
            metric_points(&metrics, name),
            vec![(expected_value, expected_tags)],
            "data points of {name}"
        );
    }

    // Duration histograms: one sample each.
    for name in [
        "exec_server_remote_registration_duration_seconds",
        "exec_server_remote_websocket_connect_duration_seconds",
        "exec_server_request_duration_seconds",
        "exec_server_process_duration_seconds",
    ] {
        assert_eq!(histogram_count(&metrics, name), 1, "sample count of {name}");
    }

    // 12 ms must be reported in seconds. Compare with a tolerance rather than
    // exact equality, because the sum is a derived floating-point value.
    let request_seconds = histogram_sum(&metrics, "exec_server_request_duration_seconds");
    assert!(
        (request_seconds - 0.012).abs() < 1e-9,
        "expected a request duration sum of 0.012s, got {request_seconds}"
    );

    // Descriptions and units: durations use "s", everything else has no unit.
    for (name, description, unit) in [
        (
            "exec_server_connections_active",
            CONNECTIONS_ACTIVE_DESCRIPTION,
            "",
        ),
        (
            "exec_server_connections_total",
            CONNECTIONS_TOTAL_DESCRIPTION,
            "",
        ),
        (
            "exec_server_remote_registration_total",
            REMOTE_REGISTRATION_TOTAL_DESCRIPTION,
            "",
        ),
        (
            "exec_server_remote_registration_duration_seconds",
            REMOTE_REGISTRATION_DURATION_DESCRIPTION,
            "s",
        ),
        (
            "exec_server_remote_websocket_active",
            REMOTE_WEBSOCKET_ACTIVE_DESCRIPTION,
            "",
        ),
        (
            "exec_server_remote_websocket_connect_total",
            REMOTE_WEBSOCKET_CONNECT_TOTAL_DESCRIPTION,
            "",
        ),
        (
            "exec_server_remote_websocket_connect_duration_seconds",
            REMOTE_WEBSOCKET_CONNECT_DURATION_DESCRIPTION,
            "s",
        ),
        (
            "exec_server_remote_websocket_reconnects_total",
            REMOTE_WEBSOCKET_RECONNECTS_DESCRIPTION,
            "",
        ),
        ("exec_server_requests_total", REQUESTS_TOTAL_DESCRIPTION, ""),
        (
            "exec_server_request_duration_seconds",
            REQUEST_DURATION_DESCRIPTION,
            "s",
        ),
        (
            "exec_server_processes_active",
            PROCESSES_ACTIVE_DESCRIPTION,
            "",
        ),
        (
            "exec_server_processes_finished_total",
            PROCESSES_FINISHED_TOTAL_DESCRIPTION,
            "",
        ),
        (
            "exec_server_process_duration_seconds",
            PROCESS_DURATION_DESCRIPTION,
            "s",
        ),
    ] {
        assert_metric_metadata(&metrics, name, description, unit);
    }
}
/// Returns the most recently exported metrics batch.
///
/// Panics if reading the exporter fails or nothing has been exported yet.
fn latest_metrics(exporter: &InMemoryMetricExporter) -> ResourceMetrics {
    let mut batches = exporter.get_finished_metrics().expect("finished metrics");
    batches.pop().expect("metrics export")
}
/// Looks up the first metric called `name` across all scopes of the batch.
///
/// Panics with `metric {name} missing` when no such metric was exported.
fn find_metric<'a>(resource_metrics: &'a ResourceMetrics, name: &str) -> &'a Metric {
    let mut candidates = resource_metrics
        .scope_metrics()
        .flat_map(opentelemetry_sdk::metrics::data::ScopeMetrics::metrics);
    match candidates.find(|candidate| candidate.name() == name) {
        Some(found) => found,
        None => panic!("metric {name} missing"),
    }
}
/// Flattens the data points of an `i64` gauge or a `u64` sum into
/// `(value, attributes)` pairs.
///
/// Panics when the metric is missing or has any other data shape.
fn metric_points(
    resource_metrics: &ResourceMetrics,
    name: &str,
) -> Vec<(f64, BTreeMap<String, String>)> {
    let mut points = Vec::new();
    match find_metric(resource_metrics, name).data() {
        AggregatedMetrics::I64(MetricData::Gauge(gauge)) => {
            for point in gauge.data_points() {
                points.push((point.value() as f64, attributes_to_map(point.attributes())));
            }
        }
        AggregatedMetrics::U64(MetricData::Sum(sum)) => {
            for point in sum.data_points() {
                points.push((point.value() as f64, attributes_to_map(point.attributes())));
            }
        }
        _ => panic!("unexpected metric data for {name}"),
    }
    points
}
/// Totals the sample counts of every data point of the `f64` histogram `name`.
///
/// Panics when the metric is missing or is not an `f64` histogram.
fn histogram_count(resource_metrics: &ResourceMetrics, name: &str) -> u64 {
    let AggregatedMetrics::F64(MetricData::Histogram(histogram)) =
        find_metric(resource_metrics, name).data()
    else {
        panic!("unexpected histogram data for {name}");
    };
    histogram.data_points().map(|point| point.count()).sum()
}
/// Totals the recorded sums of every data point of the `f64` histogram `name`.
///
/// Panics when the metric is missing or is not an `f64` histogram.
fn histogram_sum(resource_metrics: &ResourceMetrics, name: &str) -> f64 {
    let AggregatedMetrics::F64(MetricData::Histogram(histogram)) =
        find_metric(resource_metrics, name).data()
    else {
        panic!("unexpected histogram data for {name}");
    };
    histogram.data_points().map(|point| point.sum()).sum()
}
/// Asserts that the metric `name` was exported with the given `description`
/// and `unit`. Panics when the metric is missing.
fn assert_metric_metadata(
    resource_metrics: &ResourceMetrics,
    name: &str,
    description: &str,
    unit: &str,
) {
    let exported = find_metric(resource_metrics, name);
    assert_eq!(exported.description(), description, "description of {name}");
    assert_eq!(exported.unit(), unit, "unit of {name}");
}
/// Collects data-point attributes into an ordered `key -> value` map of
/// strings, so tests can compare tag sets regardless of attribute order.
fn attributes_to_map<'a>(
    attributes: impl Iterator<Item = &'a KeyValue>,
) -> BTreeMap<String, String> {
    let mut tags = BTreeMap::new();
    for attribute in attributes {
        let key = attribute.key.as_str().to_string();
        let value = attribute.value.as_str().to_string();
        tags.insert(key, value);
    }
    tags
}
}

View File

@@ -16,7 +16,8 @@ pub(super) async fn seed_instructions(memory_root: &Path) -> std::io::Result<()>
.await
{
Ok(mut file) => {
tokio::io::AsyncWriteExt::write_all(&mut file, INSTRUCTIONS.as_bytes()).await
tokio::io::AsyncWriteExt::write_all(&mut file, INSTRUCTIONS.as_bytes()).await?;
tokio::io::AsyncWriteExt::flush(&mut file).await
}
Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => Ok(()),
Err(err) => Err(err),

View File

@@ -12,6 +12,7 @@ use crate::metrics::validation::validate_tags;
use codex_utils_string::sanitize_metric_tag_value;
use opentelemetry::KeyValue;
use opentelemetry::metrics::Counter;
use opentelemetry::metrics::Gauge;
use opentelemetry::metrics::Histogram;
use opentelemetry::metrics::Meter;
use opentelemetry::metrics::MeterProvider as _;
@@ -42,8 +43,9 @@ use tracing::debug;
const ENV_ATTRIBUTE: &str = "env";
const METER_NAME: &str = "codex";
const DURATION_UNIT: &str = "ms";
const DURATION_DESCRIPTION: &str = "Duration in milliseconds.";
const MILLISECOND_DURATION_UNIT: &str = "ms";
const MILLISECOND_DURATION_DESCRIPTION: &str = "Duration in milliseconds.";
const SECOND_DURATION_UNIT: &str = "s";
#[derive(Clone, Debug)]
struct SharedManualReader {
@@ -83,6 +85,7 @@ struct MetricsClientInner {
meter_provider: SdkMeterProvider,
meter: Meter,
counters: Mutex<HashMap<String, Counter<u64>>>,
gauges: Mutex<HashMap<String, Gauge<i64>>>,
histograms: Mutex<HashMap<String, Histogram<f64>>>,
duration_histograms: Mutex<HashMap<String, Histogram<f64>>>,
runtime_reader: Option<Arc<ManualReader>>,
@@ -90,7 +93,13 @@ struct MetricsClientInner {
}
impl MetricsClientInner {
fn counter(&self, name: &str, inc: i64, tags: &[(&str, &str)]) -> Result<()> {
fn counter(
&self,
name: &str,
description: Option<&str>,
inc: i64,
tags: &[(&str, &str)],
) -> Result<()> {
validate_metric_name(name)?;
if inc < 0 {
return Err(MetricsError::NegativeCounterIncrement {
@@ -104,9 +113,13 @@ impl MetricsClientInner {
.counters
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let counter = counters
.entry(name.to_string())
.or_insert_with(|| self.meter.u64_counter(name.to_string()).build());
let counter = counters.entry(name.to_string()).or_insert_with(|| {
let builder = self.meter.u64_counter(name.to_string());
match description {
Some(description) => builder.with_description(description.to_string()).build(),
None => builder.build(),
}
});
counter.add(inc as u64, &attributes);
Ok(())
}
@@ -126,7 +139,39 @@ impl MetricsClientInner {
Ok(())
}
fn duration_histogram(&self, name: &str, value: i64, tags: &[(&str, &str)]) -> Result<()> {
/// Records `value` on the `i64` gauge registered under `name`.
///
/// The metric name is validated and the attribute set is built from `tags`
/// before the gauge registry is touched, so an invalid name or tag returns an
/// error without creating an instrument. The gauge is created on first use and
/// reused afterwards; `description` is applied only at creation time, so a
/// description passed on a later call for the same `name` has no effect.
fn gauge(
    &self,
    name: &str,
    description: Option<&str>,
    value: i64,
    tags: &[(&str, &str)],
) -> Result<()> {
    validate_metric_name(name)?;
    let attributes = self.attributes(tags)?;

    // A poisoned lock still guards a usable map, so recover it rather than panic.
    let mut registry = self
        .gauges
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());

    let instrument = registry.entry(name.to_string()).or_insert_with(|| {
        let mut builder = self.meter.i64_gauge(name.to_string());
        if let Some(text) = description {
            builder = builder.with_description(text.to_string());
        }
        builder.build()
    });
    instrument.record(value, &attributes);
    Ok(())
}
fn duration_histogram(
&self,
name: &str,
value: f64,
unit: &'static str,
description: &str,
tags: &[(&str, &str)],
) -> Result<()> {
validate_metric_name(name)?;
let attributes = self.attributes(tags)?;
@@ -137,11 +182,11 @@ impl MetricsClientInner {
let histogram = histograms.entry(name.to_string()).or_insert_with(|| {
self.meter
.f64_histogram(name.to_string())
.with_unit(DURATION_UNIT)
.with_description(DURATION_DESCRIPTION)
.with_unit(unit)
.with_description(description.to_string())
.build()
});
histogram.record(value as f64, &attributes);
histogram.record(value, &attributes);
Ok(())
}
@@ -233,6 +278,7 @@ impl MetricsClient {
meter_provider,
meter,
counters: Mutex::new(HashMap::new()),
gauges: Mutex::new(HashMap::new()),
histograms: Mutex::new(HashMap::new()),
duration_histograms: Mutex::new(HashMap::new()),
runtime_reader,
@@ -242,7 +288,18 @@ impl MetricsClient {
/// Send a single counter increment.
pub fn counter(&self, name: &str, inc: i64, tags: &[(&str, &str)]) -> Result<()> {
self.0.counter(name, inc, tags)
self.0.counter(name, /*description*/ None, inc, tags)
}
/// Send a single counter increment with an instrument description.
///
/// Delegates to the inner client's `counter` with `Some(description)`. The
/// inner implementation applies the description only when the counter for
/// `name` is first created; later calls reuse the stored instrument, so a
/// different description passed afterwards is ignored.
///
/// # Errors
///
/// Returns an error if `name` fails metric-name validation or if `inc` is
/// negative.
pub fn counter_with_description(
&self,
name: &str,
description: &str,
inc: i64,
tags: &[(&str, &str)],
) -> Result<()> {
self.0.counter(name, Some(description), inc, tags)
}
/// Send a single histogram sample.
@@ -250,6 +307,22 @@ impl MetricsClient {
self.0.histogram(name, value, tags)
}
/// Send a single gauge measurement.
///
/// Records `value` on the `i64` gauge named `name` without supplying an
/// instrument description. Use `gauge_with_description` to attach one.
///
/// # Errors
///
/// Returns an error if `name` fails metric-name validation or the tags are
/// rejected while building the attribute set.
pub fn gauge(&self, name: &str, value: i64, tags: &[(&str, &str)]) -> Result<()> {
self.0.gauge(name, /*description*/ None, value, tags)
}
/// Send a single gauge measurement with an instrument description.
///
/// Delegates to the inner client's `gauge` with `Some(description)`. The
/// description is applied only when the gauge for `name` is first created;
/// later calls reuse the stored instrument, so a different description passed
/// afterwards is ignored.
///
/// # Errors
///
/// Returns an error if `name` fails metric-name validation or the tags are
/// rejected while building the attribute set.
pub fn gauge_with_description(
&self,
name: &str,
description: &str,
value: i64,
tags: &[(&str, &str)],
) -> Result<()> {
self.0.gauge(name, Some(description), value, tags)
}
/// Record a duration in milliseconds using a histogram.
pub fn record_duration(
&self,
@@ -259,7 +332,26 @@ impl MetricsClient {
) -> Result<()> {
self.0.duration_histogram(
name,
duration.as_millis().min(i64::MAX as u128) as i64,
duration.as_millis().min(i64::MAX as u128) as f64,
MILLISECOND_DURATION_UNIT,
MILLISECOND_DURATION_DESCRIPTION,
tags,
)
}
/// Record a duration in seconds using a histogram with an instrument description.
///
/// The duration is converted with `Duration::as_secs_f64`, so sub-second
/// precision is kept as a fractional value. The histogram is created with the
/// `"s"` unit and the given `description`; both are set when the histogram for
/// `name` is first created.
///
/// # Errors
///
/// Returns an error if `name` fails metric-name validation or the tags are
/// rejected while building the attribute set.
pub fn record_duration_seconds_with_description(
&self,
name: &str,
description: &str,
duration: Duration,
tags: &[(&str, &str)],
) -> Result<()> {
self.0.duration_histogram(
name,
// Fractional seconds, not truncated whole seconds.
duration.as_secs_f64(),
SECOND_DURATION_UNIT,
description,
tags,
)
}

View File

@@ -21,6 +21,7 @@ const KNOWN_ORIGINATOR_TAG_VALUES: &[&str] = &[
"none",
"codex_exec",
"codex-cli",
"codex-exec-server",
"codex_sdk_ts",
"codex-app-server-sdk",
];
@@ -80,6 +81,7 @@ mod tests {
use super::SERVICE_NAME_TAG;
use super::SESSION_SOURCE_TAG;
use super::SessionMetricTagValues;
use super::bounded_originator_tag_value;
use pretty_assertions::assert_eq;
#[test]
@@ -131,4 +133,12 @@ mod tests {
]
);
}
// Ensures the exec-server originator is returned unchanged by the bounded-tag helper.
#[test]
fn bounded_originator_tag_value_accepts_exec_server() {
    let originator = "codex-exec-server";
    assert_eq!(bounded_originator_tag_value(originator), originator);
}
}

View File

@@ -186,6 +186,12 @@ fn otlp_http_exporter_sends_metrics_to_collector() -> Result<()> {
))?;
metrics.counter("codex.turns", /*inc*/ 1, &[("source", "test")])?;
metrics.gauge_with_description(
"exec_server_connections_active",
"Number of active exec-server connections.",
/*value*/ 1,
&[("transport", "websocket")],
)?;
metrics.shutdown()?;
server.join().expect("server join");
@@ -220,10 +226,119 @@ fn otlp_http_exporter_sends_metrics_to_collector() -> Result<()> {
"expected metric name not found; body prefix: {}",
&body.chars().take(2000).collect::<String>()
);
assert!(
body.contains("exec_server_connections_active"),
"expected exec-server gauge not found; body prefix: {}",
&body.chars().take(2000).collect::<String>()
);
assert!(
body.contains("websocket"),
"expected exec-server metric tag not found; body prefix: {}",
&body.chars().take(2000).collect::<String>()
);
Ok(())
}
// Ensures a log event emitted through the OTLP/HTTP JSON log exporter reaches a
// local stub collector on `/v1/logs` with a JSON content type and the expected
// event name in the request body.
#[test]
fn otlp_http_exporter_sends_logs_to_collector()
-> std::result::Result<(), Box<dyn std::error::Error>> {
// Bind an ephemeral loopback port. The listener is non-blocking so the accept
// loop below can poll against a deadline instead of blocking forever.
let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
let addr = listener.local_addr().expect("local_addr");
listener.set_nonblocking(true).expect("set_nonblocking");
let (tx, rx) = mpsc::channel::<Vec<CapturedRequest>>();
// Stub collector: accepts connections for up to 3 seconds, answers each one
// with "202 Accepted", and keeps every request that parsed successfully.
let server = thread::spawn(move || {
let mut captured = Vec::new();
let deadline = Instant::now() + Duration::from_secs(3);
while Instant::now() < deadline {
match listener.accept() {
Ok((mut stream, _)) => {
let result = read_http_request(&mut stream);
// Always reply, even if the request failed to parse; a failed write is ignored.
let _ = write_http_response(&mut stream, "202 Accepted");
if let Ok((path, headers, body)) = result {
captured.push(CapturedRequest {
path,
content_type: headers.get("content-type").cloned(),
body,
});
}
}
// No pending connection yet: back off briefly and poll again.
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
thread::sleep(Duration::from_millis(10));
}
Err(_) => break,
}
}
// Hand the captured requests back; the receiver may already be gone.
let _ = tx.send(captured);
});
// Only the log exporter is configured; traces and metrics are disabled so the
// collector should see log traffic only.
let otel = OtelProvider::from(&OtelSettings {
environment: "test".to_string(),
service_name: "codex-exec-server".to_string(),
service_version: env!("CARGO_PKG_VERSION").to_string(),
codex_home: PathBuf::from("."),
exporter: OtelExporter::OtlpHttp {
endpoint: format!("http://{addr}/v1/logs"),
headers: HashMap::new(),
protocol: OtelHttpProtocol::Json,
tls: None,
},
trace_exporter: OtelExporter::None,
metrics_exporter: OtelExporter::None,
runtime_metrics: false,
span_attributes: BTreeMap::new(),
tracestate: BTreeMap::new(),
})?
.expect("otel provider");
let logger_layer = otel.logger_layer().expect("logger layer");
let subscriber = tracing_subscriber::registry().with(logger_layer);
// Install the subscriber only for the duration of the closure so the event
// below is routed through the OTEL logger layer.
tracing::subscriber::with_default(subscriber, || {
tracing::callsite::rebuild_interest_cache();
tracing::event!(
target: "codex_otel.log_only",
tracing::Level::INFO,
event.name = "codex.exec_server.remote_environment_registered",
"codex exec-server remote environment registered"
);
});
// NOTE(review): shutdown presumably flushes pending log exports before the
// collector is inspected; confirm against OtelProvider::shutdown.
otel.shutdown();
// The accept loop exits only at its deadline (or on a non-WouldBlock accept
// error), so this join waits for the rest of the 3 second window.
server.join().expect("server join");
let captured = rx.recv_timeout(Duration::from_secs(1)).expect("captured");
let request = captured
.iter()
.find(|req| req.path == "/v1/logs")
.unwrap_or_else(|| {
// List the paths that were seen to make a routing mismatch easy to diagnose.
let paths = captured
.iter()
.map(|req| req.path.as_str())
.collect::<Vec<_>>()
.join(", ");
panic!("missing /v1/logs request; got {}: {paths}", captured.len());
});
let content_type = request
.content_type
.as_deref()
.unwrap_or("<missing content-type>");
assert!(
content_type.starts_with("application/json"),
"unexpected content-type: {content_type}"
);
// The body is JSON, so a plain substring search is enough to find the event name.
let body = String::from_utf8_lossy(&request.body);
assert!(
body.contains("codex.exec_server.remote_environment_registered"),
"expected exec-server event not found; body prefix: {}",
&body.chars().take(2000).collect::<String>()
);
Ok(())
}
#[test]
fn otel_provider_rejects_header_unsafe_configured_tracestate() {
let result = OtelProvider::from(&OtelSettings {
@@ -341,6 +456,12 @@ fn otlp_http_exporter_sends_traces_to_collector()
let _guard = span.enter();
let propagated_trace =
current_span_w3c_trace_context().expect("current span should have trace context");
tracing::event!(
target: "codex_otel.trace_safe",
tracing::Level::INFO,
event.name = "codex.exec_server.remote_environment_registered",
"codex exec-server remote environment registered"
);
tracing::info!("trace loopback event");
propagated_trace
});
@@ -393,7 +514,11 @@ fn otlp_http_exporter_sends_traces_to_collector()
"expected configured span attribute not found; body prefix: {}",
&body.chars().take(2000).collect::<String>()
);
assert!(
body.contains("codex.exec_server.remote_environment_registered"),
"expected exec-server trace event not found; body prefix: {}",
&body.chars().take(2000).collect::<String>()
);
Ok(())
}

View File

@@ -13,8 +13,9 @@ fn send_builds_payload_with_tags_and_histograms() -> Result<()> {
let (metrics, exporter) =
build_metrics_with_defaults(&[("service", "codex-cli"), ("env", "prod")])?;
metrics.counter(
metrics.counter_with_description(
"codex.turns",
"Total number of Codex turns.",
/*inc*/ 1,
&[("model", "gpt-5.1"), ("env", "dev")],
)?;
@@ -23,11 +24,18 @@ fn send_builds_payload_with_tags_and_histograms() -> Result<()> {
/*value*/ 25,
&[("tool", "shell")],
)?;
metrics.gauge_with_description(
"codex.active",
"Number of active Codex operations.",
/*value*/ 2,
&[("component", "exec-server")],
)?;
metrics.shutdown()?;
let resource_metrics = latest_metrics(&exporter);
let counter = find_metric(&resource_metrics, "codex.turns").expect("counter metric missing");
assert_eq!(counter.description(), "Total number of Codex turns.");
let counter_attributes = match counter.data() {
opentelemetry_sdk::metrics::data::AggregatedMetrics::U64(data) => match data {
opentelemetry_sdk::metrics::data::MetricData::Sum(sum) => {
@@ -78,6 +86,27 @@ fn send_builds_payload_with_tags_and_histograms() -> Result<()> {
]);
assert_eq!(histogram_attrs, expected_histogram_attributes);
let gauge = find_metric(&resource_metrics, "codex.active").expect("gauge metric missing");
assert_eq!(gauge.description(), "Number of active Codex operations.");
let gauge_point = match gauge.data() {
opentelemetry_sdk::metrics::data::AggregatedMetrics::I64(data) => match data {
opentelemetry_sdk::metrics::data::MetricData::Gauge(gauge) => {
gauge.data_points().next().expect("gauge point")
}
_ => panic!("unexpected gauge aggregation"),
},
_ => panic!("unexpected gauge metric data type"),
};
assert_eq!(gauge_point.value(), 2);
assert_eq!(
attributes_to_map(gauge_point.attributes()),
BTreeMap::from([
("component".to_string(), "exec-server".to_string()),
("env".to_string(), "prod".to_string()),
("service".to_string(), "codex-cli".to_string()),
])
);
Ok(())
}

View File

@@ -33,6 +33,37 @@ fn record_duration_records_histogram() -> Result<()> {
Ok(())
}
// Ensures second-based duration histograms keep sub-second precision (15 ms is
// exported as 0.015 s) and carry the caller-supplied unit and description.
#[test]
fn record_duration_seconds_with_description_records_fractional_seconds() -> Result<()> {
    const METRIC_NAME: &str = "exec_server_request_duration_seconds";
    const METRIC_DESCRIPTION: &str = "Duration of exec-server requests in seconds.";

    let (metrics, exporter) = build_metrics_with_defaults(&[])?;
    metrics.record_duration_seconds_with_description(
        METRIC_NAME,
        METRIC_DESCRIPTION,
        Duration::from_millis(15),
        &[("method", "initialize")],
    )?;
    metrics.shutdown()?;

    let exported = latest_metrics(&exporter);

    // Exactly one sample was recorded, so it must land in exactly one bucket.
    let (bounds, bucket_counts, sum, count) = histogram_data(&exported, METRIC_NAME);
    assert!(!bounds.is_empty());
    assert_eq!(bucket_counts.iter().sum::<u64>(), 1);
    assert_eq!(sum, 0.015);
    assert_eq!(count, 1);

    let metric = crate::harness::find_metric(&exported, METRIC_NAME)
        .expect("metric exec_server_request_duration_seconds missing");
    assert_eq!(metric.unit(), "s");
    assert_eq!(metric.description(), METRIC_DESCRIPTION);
    Ok(())
}
// Ensures time_result returns the closure output and records timing.
#[test]
fn timer_result_records_success() -> Result<()> {

View File

@@ -65,6 +65,8 @@ mod tests {
use super::*;
use crate::utils::create_env_for_mcp_server;
use anyhow::Result;
#[cfg(unix)]
use std::ffi::OsStr;
use std::fs;
use std::path::Path;
use tempfile::TempDir;
@@ -75,24 +77,7 @@ mod tests {
#[tokio::test]
async fn test_unix_executes_script_without_extension() -> Result<()> {
let env = TestExecutableEnv::new()?;
// Linux can transiently report ETXTBSY while the freshly written test
// script is becoming executable on the backing filesystem.
let mut retries = 0;
let output = loop {
let mut cmd = Command::new(&env.program_name);
cmd.envs(&env.mcp_env);
let output = cmd.output().await;
if !output
.as_ref()
.is_err_and(|err| err.kind() == std::io::ErrorKind::ExecutableFileBusy)
|| retries == 2
{
break output;
}
retries += 1;
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
};
let output = output_with_etxtbsy_retry(OsStr::new(&env.program_name), &env.mcp_env).await;
assert!(
output.is_ok(),
@@ -145,9 +130,14 @@ mod tests {
let resolved = resolve(program, &env.mcp_env, std::env::current_dir()?.as_path())?;
// Verify resolved path executes successfully
let mut cmd = Command::new(resolved);
cmd.envs(&env.mcp_env);
let output = cmd.output().await;
#[cfg(unix)]
let output = output_with_etxtbsy_retry(resolved.as_os_str(), &env.mcp_env).await;
#[cfg(windows)]
let output = {
let mut cmd = Command::new(resolved);
cmd.envs(&env.mcp_env);
cmd.output().await
};
assert!(
output.is_ok(),
@@ -156,6 +146,31 @@ mod tests {
Ok(())
}
/// Runs `program` with `mcp_env` added to its environment and returns the
/// collected output, retrying when the spawn fails with `ExecutableFileBusy`.
///
/// Linux can transiently report ETXTBSY while the freshly written test script
/// is becoming executable on the backing filesystem. At most two retries are
/// made, 10 ms apart; the third attempt's result is returned as-is, as is any
/// result that is not an `ExecutableFileBusy` error.
#[cfg(unix)]
async fn output_with_etxtbsy_retry(
    program: &OsStr,
    mcp_env: &HashMap<OsString, OsString>,
) -> std::io::Result<std::process::Output> {
    let mut busy_retries_left = 2;
    loop {
        let mut command = Command::new(program);
        command.envs(mcp_env);
        match command.output().await {
            Err(err)
                if busy_retries_left > 0
                    && err.kind() == std::io::ErrorKind::ExecutableFileBusy =>
            {
                busy_retries_left -= 1;
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            }
            outcome => return outcome,
        }
    }
}
// Test fixture for creating temporary executables in a controlled environment.
struct TestExecutableEnv {
// Held to prevent the temporary directory from being deleted.