mirror of
https://github.com/openai/codex.git
synced 2026-06-02 11:22:01 +00:00
Compare commits
20 Commits
fcoury/tel
...
starr/cca-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d0265f43f5 | ||
|
|
677997fad6 | ||
|
|
7eace99187 | ||
|
|
5065825bd2 | ||
|
|
f06a7b0da8 | ||
|
|
5f6fecc3c9 | ||
|
|
c3ff443001 | ||
|
|
fc0f0c9933 | ||
|
|
802f373110 | ||
|
|
4282a6e28b | ||
|
|
d30f38c1d9 | ||
|
|
65968564df | ||
|
|
3aa56d259e | ||
|
|
0bdec5d1b7 | ||
|
|
b0f8167cc9 | ||
|
|
568c79edac | ||
|
|
21dfe383ed | ||
|
|
f23fd2fd5b | ||
|
|
715b462991 | ||
|
|
a5412211d0 |
5
codex-rs/Cargo.lock
generated
5
codex-rs/Cargo.lock
generated
@@ -2778,6 +2778,7 @@ dependencies = [
|
||||
"codex-app-server-protocol",
|
||||
"codex-client",
|
||||
"codex-file-system",
|
||||
"codex-otel",
|
||||
"codex-protocol",
|
||||
"codex-sandboxing",
|
||||
"codex-test-binary-support",
|
||||
@@ -2787,6 +2788,8 @@ dependencies = [
|
||||
"ctor 0.6.3",
|
||||
"futures",
|
||||
"http 1.4.0",
|
||||
"opentelemetry",
|
||||
"opentelemetry_sdk",
|
||||
"pretty_assertions",
|
||||
"prost 0.14.3",
|
||||
"reqwest 0.12.28",
|
||||
@@ -2801,6 +2804,8 @@ dependencies = [
|
||||
"tokio-util",
|
||||
"toml 0.9.11+spec-1.1.0",
|
||||
"tracing",
|
||||
"tracing-opentelemetry",
|
||||
"tracing-subscriber",
|
||||
"uuid",
|
||||
"wiremock",
|
||||
]
|
||||
|
||||
@@ -41,6 +41,9 @@ use owo_colors::OwoColorize;
|
||||
use std::io::IsTerminal;
|
||||
use std::path::PathBuf;
|
||||
use supports_color::Stream;
|
||||
use tracing::Instrument;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
#[cfg(any(target_os = "macos", target_os = "windows"))]
|
||||
mod app_cmd;
|
||||
@@ -544,6 +547,10 @@ struct ExecServerCommand {
|
||||
use_agent_identity_auth: bool,
|
||||
}
|
||||
|
||||
const EXEC_SERVER_DEFAULT_ANALYTICS_ENABLED: bool = false;
|
||||
const EXEC_SERVER_DEFAULT_LOG_FILTER: &str = "error,opentelemetry_sdk=off,opentelemetry_otlp=off";
|
||||
const EXEC_SERVER_OTEL_SERVICE_NAME: &str = "codex-exec-server";
|
||||
|
||||
#[derive(Debug, clap::Subcommand)]
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
enum AppServerSubcommand {
|
||||
@@ -1596,10 +1603,11 @@ async fn run_exec_server_command(
|
||||
arg0_paths.codex_linux_sandbox_exe.clone(),
|
||||
)?;
|
||||
if let Some(base_url) = cmd.remote {
|
||||
let config = load_exec_server_config(root_config_overrides, strict_config).await?;
|
||||
let (_otel, telemetry) = init_exec_server_tracing(Some(&config));
|
||||
let environment_id = cmd
|
||||
.environment_id
|
||||
.ok_or_else(|| anyhow::anyhow!("--environment-id is required when --remote is set"))?;
|
||||
let config = load_exec_server_config(root_config_overrides, strict_config).await?;
|
||||
let auth_provider =
|
||||
load_exec_server_remote_auth_provider(&config, &base_url, cmd.use_agent_identity_auth)
|
||||
.await?;
|
||||
@@ -1611,25 +1619,79 @@ async fn run_exec_server_command(
|
||||
if let Some(name) = cmd.name {
|
||||
remote_config.name = name;
|
||||
}
|
||||
codex_exec_server::run_remote_environment(remote_config, runtime_paths).await?;
|
||||
let remote_config = remote_config.with_telemetry(telemetry);
|
||||
codex_exec_server::run_remote_environment(remote_config, runtime_paths)
|
||||
.instrument(codex_exec_server::runtime_span())
|
||||
.await?;
|
||||
Ok(())
|
||||
} else {
|
||||
if strict_config {
|
||||
// Local exec-server startup does not consume Config, but strict
|
||||
// mode should still reject unknown fields before opening a listener.
|
||||
let _validated_config =
|
||||
load_exec_server_config(root_config_overrides, strict_config).await?;
|
||||
}
|
||||
let config = if strict_config {
|
||||
Some(load_exec_server_config(root_config_overrides, strict_config).await?)
|
||||
} else {
|
||||
load_exec_server_config(root_config_overrides, /*strict_config*/ false)
|
||||
.await
|
||||
.ok()
|
||||
};
|
||||
let (_otel, telemetry) = init_exec_server_tracing(config.as_ref());
|
||||
let listen_url = cmd
|
||||
.listen
|
||||
.as_deref()
|
||||
.unwrap_or(codex_exec_server::DEFAULT_LISTEN_URL);
|
||||
codex_exec_server::run_main(listen_url, runtime_paths)
|
||||
codex_exec_server::run_main_with_telemetry(listen_url, runtime_paths, telemetry)
|
||||
.instrument(codex_exec_server::runtime_span())
|
||||
.await
|
||||
.map_err(anyhow::Error::from_boxed)
|
||||
}
|
||||
}
|
||||
|
||||
fn init_exec_server_tracing(
|
||||
config: Option<&codex_core::config::Config>,
|
||||
) -> (impl Send + Sync, codex_exec_server::ExecServerTelemetry) {
|
||||
let fmt_layer = tracing_subscriber::fmt::layer()
|
||||
.with_writer(std::io::stderr)
|
||||
.with_filter(exec_server_stderr_env_filter());
|
||||
let otel = config.and_then(|config| {
|
||||
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
|
||||
codex_core::otel_init::build_provider(
|
||||
config,
|
||||
env!("CARGO_PKG_VERSION"),
|
||||
Some(EXEC_SERVER_OTEL_SERVICE_NAME),
|
||||
EXEC_SERVER_DEFAULT_ANALYTICS_ENABLED,
|
||||
)
|
||||
})) {
|
||||
Ok(Ok(otel)) => otel,
|
||||
Ok(Err(err)) => {
|
||||
eprintln!("Could not create otel exporter: {err}");
|
||||
None
|
||||
}
|
||||
Err(_) => {
|
||||
eprintln!("Could not create otel exporter: panicked during initialization");
|
||||
None
|
||||
}
|
||||
}
|
||||
});
|
||||
codex_core::otel_init::record_process_start(otel.as_ref(), EXEC_SERVER_OTEL_SERVICE_NAME);
|
||||
|
||||
let otel_logger_layer = otel.as_ref().and_then(|otel| otel.logger_layer());
|
||||
let otel_tracing_layer = otel.as_ref().and_then(|otel| otel.tracing_layer());
|
||||
let telemetry = codex_exec_server::ExecServerTelemetry::new(
|
||||
otel.as_ref().and_then(|otel| otel.metrics()).cloned(),
|
||||
);
|
||||
let _ = tracing_subscriber::registry()
|
||||
.with(fmt_layer)
|
||||
.with(otel_tracing_layer)
|
||||
.with(otel_logger_layer)
|
||||
.try_init();
|
||||
tracing::callsite::rebuild_interest_cache();
|
||||
(otel, telemetry)
|
||||
}
|
||||
|
||||
fn exec_server_stderr_env_filter() -> EnvFilter {
|
||||
EnvFilter::try_from_default_env()
|
||||
.or_else(|_| EnvFilter::try_new(EXEC_SERVER_DEFAULT_LOG_FILTER))
|
||||
.unwrap_or_else(|_| EnvFilter::new("error"))
|
||||
}
|
||||
|
||||
async fn load_exec_server_remote_auth_provider(
|
||||
config: &codex_core::config::Config,
|
||||
base_url: &str,
|
||||
|
||||
@@ -1,6 +1,14 @@
|
||||
use std::io::Read as _;
|
||||
use std::io::Write as _;
|
||||
use std::net::TcpListener;
|
||||
use std::path::Path;
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use anyhow::Result;
|
||||
use predicates::prelude::PredicateBooleanExt;
|
||||
use predicates::str::contains;
|
||||
use tempfile::TempDir;
|
||||
|
||||
@@ -33,3 +41,249 @@ foo = "bar"
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Without strict config, a malformed `config.toml` must not fail local exec-server startup
/// and its contents must not show up on stderr.
#[test]
fn local_exec_server_ignores_invalid_config_without_strict_config() -> Result<()> {
    let codex_home = TempDir::new()?;
    let config_path = codex_home.path().join("config.toml");
    std::fs::write(config_path, "not valid toml = [")?;

    let stderr_is_clean = contains("not valid toml").not();
    let mut cmd = codex_command(codex_home.path())?;
    cmd.args(["exec-server", "--listen", "stdio"]);
    cmd.assert().success().stderr(stderr_is_clean);

    Ok(())
}
|
||||
|
||||
/// Runs a local exec-server against an in-process collector and checks that the exported
/// metric payloads mention the connection gauge, the request counter, the `initialize`
/// method and a request outcome.
#[test]
fn local_exec_server_exports_real_otel_metrics() -> Result<()> {
    let collector = TestCollector::start()?;
    let codex_home = TempDir::new()?;
    let base_url = &collector.base_url;
    let config_toml = format!(
        r#"
[analytics]
enabled = true

[otel]
environment = "test"
metrics_exporter = {{ otlp-http = {{ endpoint = "{base_url}/v1/metrics", protocol = "json" }} }}
"#
    );
    std::fs::write(codex_home.path().join("config.toml"), config_toml)?;

    let mut cmd = codex_command(codex_home.path())?;
    cmd.args(["exec-server", "--listen", "stdio"]);
    cmd.write_stdin(
        r#"{"id":1,"method":"initialize","params":{"clientName":"otel-test","resumeSessionId":null}}"#,
    );
    cmd.assert().success();

    // Only requests sent to the metrics endpoint are relevant; join their bodies for matching.
    let requests = collector.finish()?;
    let mut metric_bodies = Vec::new();
    for request in &requests {
        if request.path == "/v1/metrics" {
            metric_bodies.push(request.body.as_str());
        }
    }
    let metrics = metric_bodies.join("\n");

    assert!(
        metrics.contains("exec_server_connections_active"),
        "{metrics}"
    );
    assert!(metrics.contains("exec_server_requests_total"), "{metrics}");
    assert!(metrics.contains("initialize"), "{metrics}");
    let has_outcome = metrics.contains("success") || metrics.contains("disconnected");
    assert!(has_outcome, "{metrics}");
    Ok(())
}
|
||||
|
||||
/// Checks that a failed websocket connect in `--remote` mode is logged on stderr together
/// with the underlying IO error.
///
/// The registry stub advertises a websocket URL whose listener has already been dropped.
/// The spawned `codex exec-server` writes stderr to a file that is polled for up to five
/// seconds.
#[test]
fn remote_exec_server_preserves_websocket_error_in_stderr() -> Result<()> {
    const EXPECTED_LOG: &str = "failed to connect remote exec-server websocket";

    // Bind to learn a free port, then drop the listener so nothing is listening at the
    // advertised websocket URL.
    let failed_websocket_listener = TcpListener::bind("127.0.0.1:0")?;
    let failed_websocket_url = format!("ws://{}", failed_websocket_listener.local_addr()?);
    drop(failed_websocket_listener);

    let registry = TestRegistry::start(&failed_websocket_url)?;
    let codex_home = TempDir::new()?;
    let stderr_path = codex_home.path().join("remote.stderr");
    let stderr_file = std::fs::File::create(&stderr_path)?;
    let mut cmd = std::process::Command::new(codex_utils_cargo_bin::cargo_bin("codex")?);
    cmd.env("CODEX_HOME", codex_home.path())
        .env("CODEX_API_KEY", "test-key")
        .env("RUST_LOG", "codex_exec_server=warn")
        .args([
            "exec-server",
            "--remote",
            &registry.base_url,
            "--environment-id",
            "env-test",
        ])
        .stdout(std::process::Stdio::null())
        .stderr(stderr_file);

    let mut child = cmd.spawn()?;
    let deadline = Instant::now() + Duration::from_secs(5);
    // Poll inside a closure so that a read failure cannot skip the kill/wait below and leave
    // the child process running.
    let poll_stderr = || -> Result<String> {
        loop {
            let stderr = std::fs::read_to_string(&stderr_path)?;
            if stderr.contains(EXPECTED_LOG) || Instant::now() >= deadline {
                return Ok(stderr);
            }
            thread::sleep(Duration::from_millis(50));
        }
    };
    let polled = poll_stderr();
    let _ = child.kill();
    child.wait()?;
    let stderr = polled?;
    registry.finish()?;

    assert!(stderr.contains(EXPECTED_LOG), "{stderr}");
    assert!(stderr.contains("IO error"), "{stderr}");

    Ok(())
}
|
||||
|
||||
struct CapturedRequest {
|
||||
path: String,
|
||||
body: String,
|
||||
}
|
||||
|
||||
struct TestRegistry {
|
||||
base_url: String,
|
||||
server: thread::JoinHandle<Result<()>>,
|
||||
}
|
||||
|
||||
impl TestRegistry {
|
||||
fn start(websocket_url: &str) -> Result<Self> {
|
||||
let listener = TcpListener::bind("127.0.0.1:0")?;
|
||||
let addr = listener.local_addr()?;
|
||||
let websocket_url = websocket_url.to_string();
|
||||
let server = thread::spawn(move || {
|
||||
let (mut stream, _) = listener.accept()?;
|
||||
let _request = read_http_request(&mut stream)?;
|
||||
let body = format!(r#"{{"environment_id":"env-test","url":"{websocket_url}"}}"#);
|
||||
let response = format!(
|
||||
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
|
||||
body.len()
|
||||
);
|
||||
stream.write_all(response.as_bytes())?;
|
||||
Ok(())
|
||||
});
|
||||
Ok(Self {
|
||||
base_url: format!("http://{addr}"),
|
||||
server,
|
||||
})
|
||||
}
|
||||
|
||||
fn finish(self) -> Result<()> {
|
||||
self.server
|
||||
.join()
|
||||
.map_err(|_| anyhow::anyhow!("registry thread panicked"))?
|
||||
}
|
||||
}
|
||||
|
||||
struct TestCollector {
|
||||
base_url: String,
|
||||
requests: mpsc::Receiver<Vec<CapturedRequest>>,
|
||||
stop: mpsc::Sender<()>,
|
||||
server: thread::JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl TestCollector {
|
||||
fn start() -> Result<Self> {
|
||||
let listener = TcpListener::bind("127.0.0.1:0")?;
|
||||
let addr = listener.local_addr()?;
|
||||
listener.set_nonblocking(true)?;
|
||||
let (tx, requests) = mpsc::channel();
|
||||
let (stop, stop_rx) = mpsc::channel();
|
||||
let server = thread::spawn(move || {
|
||||
let mut captured = Vec::new();
|
||||
loop {
|
||||
match listener.accept() {
|
||||
Ok((mut stream, _)) => {
|
||||
if let Ok(request) = read_http_request(&mut stream) {
|
||||
captured.push(request);
|
||||
}
|
||||
let _ = stream.write_all(
|
||||
b"HTTP/1.1 202 Accepted\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
|
||||
);
|
||||
}
|
||||
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
|
||||
if stop_rx.try_recv().is_ok() {
|
||||
break;
|
||||
}
|
||||
thread::sleep(Duration::from_millis(10));
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
let _ = tx.send(captured);
|
||||
});
|
||||
Ok(Self {
|
||||
base_url: format!("http://{addr}"),
|
||||
requests,
|
||||
stop,
|
||||
server,
|
||||
})
|
||||
}
|
||||
|
||||
fn finish(self) -> Result<Vec<CapturedRequest>> {
|
||||
let _ = self.stop.send(());
|
||||
self.server
|
||||
.join()
|
||||
.map_err(|_| anyhow::anyhow!("collector thread panicked"))?;
|
||||
Ok(self.requests.recv_timeout(Duration::from_secs(1))?)
|
||||
}
|
||||
}
|
||||
|
||||
fn read_http_request(stream: &mut std::net::TcpStream) -> std::io::Result<CapturedRequest> {
|
||||
stream.set_read_timeout(Some(Duration::from_secs(2)))?;
|
||||
let mut bytes = Vec::new();
|
||||
let mut scratch = [0_u8; 8192];
|
||||
let header_end = loop {
|
||||
let read = stream.read(&mut scratch)?;
|
||||
if read == 0 {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::UnexpectedEof,
|
||||
"request closed before headers",
|
||||
));
|
||||
}
|
||||
bytes.extend_from_slice(&scratch[..read]);
|
||||
if let Some(header_end) = bytes.windows(4).position(|window| window == b"\r\n\r\n") {
|
||||
break header_end;
|
||||
}
|
||||
};
|
||||
let headers = String::from_utf8_lossy(&bytes[..header_end]);
|
||||
let mut lines = headers.split("\r\n");
|
||||
let path = lines
|
||||
.next()
|
||||
.and_then(|line| line.split_whitespace().nth(1))
|
||||
.unwrap_or_default()
|
||||
.to_string();
|
||||
let content_length = lines
|
||||
.filter_map(|line| line.split_once(':'))
|
||||
.find(|(key, _)| key.eq_ignore_ascii_case("content-length"))
|
||||
.and_then(|(_, value)| value.trim().parse::<usize>().ok())
|
||||
.unwrap_or_default();
|
||||
let mut body = bytes[header_end + 4..].to_vec();
|
||||
while body.len() < content_length {
|
||||
let read = stream.read(&mut scratch)?;
|
||||
if read == 0 {
|
||||
break;
|
||||
}
|
||||
body.extend_from_slice(&scratch[..read]);
|
||||
}
|
||||
body.truncate(content_length);
|
||||
Ok(CapturedRequest {
|
||||
path,
|
||||
body: String::from_utf8_lossy(&body).into_owned(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -4,6 +4,8 @@ codex_rust_crate(
|
||||
name = "exec-server",
|
||||
crate_name = "codex_exec_server",
|
||||
deps_extra = [
|
||||
"@crates//:opentelemetry",
|
||||
"@crates//:opentelemetry_sdk",
|
||||
"@crates//:toml",
|
||||
],
|
||||
# Keep the crate's integration tests single-threaded under Bazel because
|
||||
|
||||
@@ -20,6 +20,7 @@ codex-app-server-protocol = { workspace = true }
|
||||
codex-api = { workspace = true }
|
||||
codex-client = { workspace = true }
|
||||
codex-file-system = { workspace = true }
|
||||
codex-otel = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
codex-sandboxing = { workspace = true }
|
||||
codex-utils-absolute-path = { workspace = true }
|
||||
@@ -53,8 +54,12 @@ anyhow = { workspace = true }
|
||||
codex-test-binary-support = { workspace = true }
|
||||
ctor = { workspace = true }
|
||||
http = { workspace = true }
|
||||
opentelemetry = { workspace = true }
|
||||
opentelemetry_sdk = { workspace = true }
|
||||
pretty_assertions = { workspace = true }
|
||||
serial_test = { workspace = true }
|
||||
tempfile = { workspace = true }
|
||||
test-case = "3.3.1"
|
||||
tracing-opentelemetry = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
wiremock = { workspace = true }
|
||||
|
||||
@@ -23,6 +23,7 @@ mod rpc;
|
||||
mod runtime_paths;
|
||||
mod sandboxed_file_system;
|
||||
mod server;
|
||||
mod telemetry;
|
||||
|
||||
pub use client::ExecServerClient;
|
||||
pub use client::ExecServerError;
|
||||
@@ -105,3 +106,6 @@ pub use runtime_paths::ExecServerRuntimePaths;
|
||||
pub use server::DEFAULT_LISTEN_URL;
|
||||
pub use server::ExecServerListenUrlParseError;
|
||||
pub use server::run_main;
|
||||
pub use server::run_main_with_telemetry;
|
||||
pub use telemetry::ExecServerTelemetry;
|
||||
pub use telemetry::runtime_span;
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::collections::VecDeque;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
@@ -45,6 +46,7 @@ use crate::rpc::RpcServerOutboundMessage;
|
||||
use crate::rpc::internal_error;
|
||||
use crate::rpc::invalid_params;
|
||||
use crate::rpc::invalid_request;
|
||||
use crate::telemetry::ExecServerTelemetry;
|
||||
|
||||
const RETAINED_OUTPUT_BYTES_PER_PROCESS: usize = 1024 * 1024;
|
||||
const NOTIFICATION_CHANNEL_CAPACITY: usize = 256;
|
||||
@@ -74,6 +76,9 @@ struct RunningProcess {
|
||||
output_notify: Arc<Notify>,
|
||||
open_streams: usize,
|
||||
closed: bool,
|
||||
started_at: Instant,
|
||||
metrics_finished: bool,
|
||||
termination_requested: bool,
|
||||
}
|
||||
|
||||
enum ProcessEntry {
|
||||
@@ -84,6 +89,7 @@ enum ProcessEntry {
|
||||
struct Inner {
|
||||
notifications: std::sync::RwLock<Option<RpcNotificationSender>>,
|
||||
processes: Mutex<HashMap<ProcessId, ProcessEntry>>,
|
||||
telemetry: ExecServerTelemetry,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -103,16 +109,23 @@ impl Default for LocalProcess {
|
||||
let (outgoing_tx, mut outgoing_rx) =
|
||||
mpsc::channel::<RpcServerOutboundMessage>(NOTIFICATION_CHANNEL_CAPACITY);
|
||||
tokio::spawn(async move { while outgoing_rx.recv().await.is_some() {} });
|
||||
Self::new(RpcNotificationSender::new(outgoing_tx))
|
||||
Self::new(
|
||||
RpcNotificationSender::new(outgoing_tx),
|
||||
ExecServerTelemetry::default(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl LocalProcess {
|
||||
pub(crate) fn new(notifications: RpcNotificationSender) -> Self {
|
||||
pub(crate) fn new(
|
||||
notifications: RpcNotificationSender,
|
||||
telemetry: ExecServerTelemetry,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(Inner {
|
||||
notifications: std::sync::RwLock::new(Some(notifications)),
|
||||
processes: Mutex::new(HashMap::new()),
|
||||
telemetry,
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -129,6 +142,11 @@ impl LocalProcess {
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
for process in remaining {
|
||||
if !process.metrics_finished {
|
||||
self.inner
|
||||
.telemetry
|
||||
.process_finished("terminated", process.started_at.elapsed());
|
||||
}
|
||||
process.session.terminate();
|
||||
}
|
||||
}
|
||||
@@ -226,9 +244,13 @@ impl LocalProcess {
|
||||
output_notify: Arc::clone(&output_notify),
|
||||
open_streams: 2,
|
||||
closed: false,
|
||||
started_at: Instant::now(),
|
||||
metrics_finished: false,
|
||||
termination_requested: false,
|
||||
})),
|
||||
);
|
||||
}
|
||||
self.inner.telemetry.process_started();
|
||||
|
||||
tokio::spawn(stream_output(
|
||||
process_id.clone(),
|
||||
@@ -389,12 +411,13 @@ impl LocalProcess {
|
||||
) -> Result<TerminateResponse, JSONRPCErrorError> {
|
||||
let _process_id = params.process_id.clone();
|
||||
let running = {
|
||||
let process_map = self.inner.processes.lock().await;
|
||||
match process_map.get(&params.process_id) {
|
||||
let mut process_map = self.inner.processes.lock().await;
|
||||
match process_map.get_mut(&params.process_id) {
|
||||
Some(ProcessEntry::Running(process)) => {
|
||||
if process.exit_code.is_some() {
|
||||
return Ok(TerminateResponse { running: false });
|
||||
}
|
||||
process.termination_requested = true;
|
||||
process.session.terminate();
|
||||
true
|
||||
}
|
||||
@@ -601,7 +624,7 @@ async fn watch_exit(
|
||||
output_notify: Arc<Notify>,
|
||||
) {
|
||||
let exit_code = exit_rx.await.unwrap_or(-1);
|
||||
let notification = {
|
||||
let (notification, process_duration, termination_requested) = {
|
||||
let mut processes = inner.processes.lock().await;
|
||||
if let Some(ProcessEntry::Running(process)) = processes.get_mut(&process_id) {
|
||||
let seq = process.next_seq;
|
||||
@@ -611,15 +634,32 @@ async fn watch_exit(
|
||||
process
|
||||
.events
|
||||
.publish(ExecProcessEvent::Exited { seq, exit_code });
|
||||
Some(ExecExitedNotification {
|
||||
process_id: process_id.clone(),
|
||||
seq,
|
||||
exit_code,
|
||||
})
|
||||
process.metrics_finished = true;
|
||||
(
|
||||
Some(ExecExitedNotification {
|
||||
process_id: process_id.clone(),
|
||||
seq,
|
||||
exit_code,
|
||||
}),
|
||||
Some(process.started_at.elapsed()),
|
||||
process.termination_requested,
|
||||
)
|
||||
} else {
|
||||
None
|
||||
(None, None, false)
|
||||
}
|
||||
};
|
||||
if let Some(process_duration) = process_duration {
|
||||
inner.telemetry.process_finished(
|
||||
if termination_requested {
|
||||
"terminated"
|
||||
} else if exit_code == 0 {
|
||||
"success"
|
||||
} else {
|
||||
"error"
|
||||
},
|
||||
process_duration,
|
||||
);
|
||||
}
|
||||
output_notify.notify_waiters();
|
||||
if let Some(notification) = notification
|
||||
&& let Some(notifications) = notification_sender(&inner)
|
||||
@@ -890,6 +930,9 @@ mod tests {
|
||||
output_notify: Arc::clone(&output_notify),
|
||||
open_streams: 2,
|
||||
closed: false,
|
||||
started_at: Instant::now(),
|
||||
metrics_finished: false,
|
||||
termination_requested: false,
|
||||
})),
|
||||
);
|
||||
assert!(previous.is_none());
|
||||
|
||||
@@ -26,6 +26,7 @@ use crate::relay_proto::RelayMessageFrame;
|
||||
use crate::relay_proto::RelayResume;
|
||||
use crate::relay_proto::relay_message_frame;
|
||||
use crate::server::ConnectionProcessor;
|
||||
use crate::telemetry::ConnectionTransport;
|
||||
|
||||
const RELAY_MESSAGE_FRAME_VERSION: u32 = 1;
|
||||
|
||||
@@ -449,7 +450,9 @@ fn spawn_virtual_stream(
|
||||
transport: JsonRpcTransport::Plain,
|
||||
};
|
||||
tokio::spawn(async move {
|
||||
processor.run_connection(connection).await;
|
||||
processor
|
||||
.run_connection(connection, ConnectionTransport::Relay)
|
||||
.await;
|
||||
});
|
||||
|
||||
VirtualStream {
|
||||
|
||||
@@ -1,21 +1,49 @@
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use codex_api::SharedAuthProvider;
|
||||
use reqwest::StatusCode;
|
||||
use serde::Deserialize;
|
||||
use tokio::time::sleep;
|
||||
use tokio_tungstenite::connect_async;
|
||||
use tracing::Instrument;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
use codex_utils_rustls_provider::ensure_rustls_crypto_provider;
|
||||
|
||||
use crate::ExecServerError;
|
||||
use crate::ExecServerRuntimePaths;
|
||||
use crate::ExecServerTelemetry;
|
||||
use crate::relay::run_multiplexed_environment;
|
||||
use crate::server::ConnectionProcessor;
|
||||
|
||||
const ERROR_BODY_PREVIEW_BYTES: usize = 4096;
|
||||
|
||||
macro_rules! emit_remote_otel_event {
|
||||
($level:ident, $event_name:literal, $($fields:tt)*) => {{
|
||||
let span = tracing::info_span!(
|
||||
"codex.exec_server.remote_event",
|
||||
otel.kind = "internal",
|
||||
otel.name = $event_name,
|
||||
);
|
||||
span.in_scope(|| {
|
||||
tracing::event!(
|
||||
target: "codex_otel.log_only",
|
||||
tracing::Level::$level,
|
||||
event.name = $event_name,
|
||||
$($fields)*
|
||||
);
|
||||
tracing::event!(
|
||||
target: "codex_otel.trace_safe",
|
||||
tracing::Level::$level,
|
||||
event.name = $event_name,
|
||||
$($fields)*
|
||||
);
|
||||
});
|
||||
}};
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct EnvironmentRegistryClient {
|
||||
base_url: String,
|
||||
@@ -94,6 +122,7 @@ pub struct RemoteEnvironmentConfig {
|
||||
pub environment_id: String,
|
||||
pub name: String,
|
||||
auth_provider: SharedAuthProvider,
|
||||
telemetry: ExecServerTelemetry,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for RemoteEnvironmentConfig {
|
||||
@@ -119,8 +148,14 @@ impl RemoteEnvironmentConfig {
|
||||
environment_id,
|
||||
name: "codex-exec-server".to_string(),
|
||||
auth_provider,
|
||||
telemetry: ExecServerTelemetry::default(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn with_telemetry(mut self, telemetry: ExecServerTelemetry) -> Self {
|
||||
self.telemetry = telemetry;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// Register an exec-server for remote use and serve requests over the returned
|
||||
@@ -132,31 +167,148 @@ pub async fn run_remote_environment(
|
||||
ensure_rustls_crypto_provider();
|
||||
let client =
|
||||
EnvironmentRegistryClient::new(config.base_url.clone(), config.auth_provider.clone())?;
|
||||
let processor = ConnectionProcessor::new(runtime_paths);
|
||||
let processor = ConnectionProcessor::new(runtime_paths, config.telemetry.clone());
|
||||
let mut backoff = Duration::from_secs(1);
|
||||
let mut connection_attempt = 0_u32;
|
||||
|
||||
loop {
|
||||
let response = client.register_environment(&config.environment_id).await?;
|
||||
eprintln!(
|
||||
"codex exec-server remote environment registered with environment_id {}",
|
||||
response.environment_id
|
||||
let registration_started_at = Instant::now();
|
||||
let registration_span = remote_registration_span();
|
||||
let response = match client
|
||||
.register_environment(&config.environment_id)
|
||||
.instrument(registration_span.clone())
|
||||
.await
|
||||
{
|
||||
Ok(response) => response,
|
||||
Err(err) => {
|
||||
registration_span.record("result", "error");
|
||||
config
|
||||
.telemetry
|
||||
.remote_registration_completed("error", registration_started_at.elapsed());
|
||||
drop(registration_span);
|
||||
warn!(error = %err, "failed to register remote exec-server environment");
|
||||
emit_remote_otel_event!(
|
||||
WARN,
|
||||
"codex.exec_server.remote_environment_registration_failed",
|
||||
"failed to register remote exec-server environment"
|
||||
);
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
registration_span.record("result", "success");
|
||||
config
|
||||
.telemetry
|
||||
.remote_registration_completed("success", registration_started_at.elapsed());
|
||||
drop(registration_span);
|
||||
eprintln!("codex exec-server remote environment registered");
|
||||
info!("codex exec-server remote environment registered");
|
||||
emit_remote_otel_event!(
|
||||
INFO,
|
||||
"codex.exec_server.remote_environment_registered",
|
||||
"codex exec-server remote environment registered"
|
||||
);
|
||||
|
||||
match connect_async(response.url.as_str()).await {
|
||||
let websocket_connect_started_at = Instant::now();
|
||||
let websocket_connect_span = remote_websocket_connect_span(connection_attempt + 1);
|
||||
match connect_async(response.url.as_str())
|
||||
.instrument(websocket_connect_span.clone())
|
||||
.await
|
||||
{
|
||||
Ok((websocket, _)) => {
|
||||
connection_attempt += 1;
|
||||
websocket_connect_span.record("result", "success");
|
||||
config.telemetry.remote_websocket_connect_completed(
|
||||
"success",
|
||||
websocket_connect_started_at.elapsed(),
|
||||
);
|
||||
drop(websocket_connect_span);
|
||||
let remote_websocket = config.telemetry.remote_websocket_connected();
|
||||
info!(
|
||||
attempt = connection_attempt,
|
||||
"connected remote exec-server websocket"
|
||||
);
|
||||
emit_remote_otel_event!(
|
||||
INFO,
|
||||
"codex.exec_server.remote_websocket_connected",
|
||||
attempt = connection_attempt,
|
||||
"connected remote exec-server websocket"
|
||||
);
|
||||
backoff = Duration::from_secs(1);
|
||||
run_multiplexed_environment(websocket, processor.clone()).await;
|
||||
drop(remote_websocket);
|
||||
config.telemetry.remote_websocket_reconnect("disconnected");
|
||||
warn!(
|
||||
attempt = connection_attempt,
|
||||
"remote exec-server websocket disconnected; retrying"
|
||||
);
|
||||
emit_remote_otel_event!(
|
||||
WARN,
|
||||
"codex.exec_server.remote_websocket_disconnected",
|
||||
attempt = connection_attempt,
|
||||
"remote exec-server websocket disconnected; retrying"
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("failed to connect remote exec-server websocket: {err}");
|
||||
connection_attempt += 1;
|
||||
websocket_connect_span.record("result", "error");
|
||||
config.telemetry.remote_websocket_connect_completed(
|
||||
"error",
|
||||
websocket_connect_started_at.elapsed(),
|
||||
);
|
||||
drop(websocket_connect_span);
|
||||
warn!(
|
||||
attempt = connection_attempt,
|
||||
error = %err,
|
||||
"failed to connect remote exec-server websocket"
|
||||
);
|
||||
emit_remote_otel_event!(
|
||||
WARN,
|
||||
"codex.exec_server.remote_websocket_connect_failed",
|
||||
attempt = connection_attempt,
|
||||
"failed to connect remote exec-server websocket"
|
||||
);
|
||||
config
|
||||
.telemetry
|
||||
.remote_websocket_reconnect("connect_failed");
|
||||
}
|
||||
}
|
||||
|
||||
let backoff_ms = backoff.as_millis();
|
||||
info!(
|
||||
attempt = connection_attempt,
|
||||
backoff_ms, "retrying remote exec-server websocket"
|
||||
);
|
||||
emit_remote_otel_event!(
|
||||
INFO,
|
||||
"codex.exec_server.remote_websocket_retrying",
|
||||
attempt = connection_attempt,
|
||||
backoff_ms,
|
||||
"retrying remote exec-server websocket"
|
||||
);
|
||||
sleep(backoff).await;
|
||||
backoff = (backoff * 2).min(Duration::from_secs(30));
|
||||
}
|
||||
}
|
||||
|
||||
fn remote_registration_span() -> tracing::Span {
|
||||
tracing::info_span!(
|
||||
"codex.exec_server.remote.register",
|
||||
otel.kind = "client",
|
||||
otel.name = "codex.exec_server.remote.register",
|
||||
result = tracing::field::Empty,
|
||||
)
|
||||
}
|
||||
|
||||
fn remote_websocket_connect_span(attempt: u32) -> tracing::Span {
|
||||
tracing::info_span!(
|
||||
"codex.exec_server.remote.websocket.connect",
|
||||
otel.kind = "client",
|
||||
otel.name = "codex.exec_server.remote.websocket.connect",
|
||||
attempt,
|
||||
result = tracing::field::Empty,
|
||||
)
|
||||
}
|
||||
|
||||
fn normalize_environment_id(environment_id: String) -> Result<String, ExecServerError> {
|
||||
let environment_id = environment_id.trim().to_string();
|
||||
if environment_id.is_empty() {
|
||||
@@ -248,7 +400,12 @@ mod tests {
|
||||
use codex_api::AuthProvider;
|
||||
use http::HeaderMap;
|
||||
use http::HeaderValue;
|
||||
use opentelemetry::trace::TracerProvider as _;
|
||||
use opentelemetry_sdk::trace::InMemorySpanExporter;
|
||||
use opentelemetry_sdk::trace::SdkTracerProvider;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tracing_subscriber::filter::filter_fn;
|
||||
use tracing_subscriber::prelude::*;
|
||||
use wiremock::Mock;
|
||||
use wiremock::MockServer;
|
||||
use wiremock::ResponseTemplate;
|
||||
@@ -363,4 +520,77 @@ mod tests {
|
||||
assert!(debug.contains("<redacted>"));
|
||||
assert!(!debug.contains("workspace-123"));
|
||||
}
|
||||
|
||||
/// Emits one remote lifecycle event through `emit_remote_otel_event!` under an
/// in-memory OpenTelemetry pipeline and checks that a span with the event's
/// name is among the exporter's finished spans.
#[test]
fn remote_otel_events_finish_trace_spans_immediately() {
    const EXPECTED_SPAN: &str = "codex.exec_server.remote_environment_registered";

    let exporter = InMemorySpanExporter::default();
    let provider = SdkTracerProvider::builder()
        .with_simple_exporter(exporter.clone())
        .build();
    let otel_layer = tracing_opentelemetry::layer()
        .with_tracer(provider.tracer("exec-server-test"))
        .with_filter(filter_fn(codex_otel::OtelProvider::trace_export_filter));
    let subscriber = tracing_subscriber::registry().with(otel_layer);

    tracing::subscriber::with_default(subscriber, || {
        tracing::callsite::rebuild_interest_cache();
        emit_remote_otel_event!(
            INFO,
            "codex.exec_server.remote_environment_registered",
            "codex exec-server remote environment registered"
        );
    });

    provider.force_flush().expect("flush traces");
    let spans = exporter.get_finished_spans().expect("span export");
    let exported = spans
        .iter()
        .any(|span| span.name.as_ref() == EXPECTED_SPAN);
    assert!(
        exported,
        "expected finished remote OTEL lifecycle span, got {spans:?}"
    );
}
|
||||
|
||||
/// Creates, enters, records a result on, and drops the remote registration and
/// websocket-connect spans, then checks both names appear among the finished
/// spans captured by the in-memory exporter.
#[test]
fn remote_operation_spans_finish_immediately() {
    let exporter = InMemorySpanExporter::default();
    let provider = SdkTracerProvider::builder()
        .with_simple_exporter(exporter.clone())
        .build();
    let otel_layer = tracing_opentelemetry::layer()
        .with_tracer(provider.tracer("exec-server-test"))
        .with_filter(filter_fn(codex_otel::OtelProvider::trace_export_filter));
    let subscriber = tracing_subscriber::registry().with(otel_layer);

    tracing::subscriber::with_default(subscriber, || {
        tracing::callsite::rebuild_interest_cache();

        let registration = remote_registration_span();
        registration.in_scope(|| {});
        registration.record("result", "success");
        drop(registration);

        let websocket = remote_websocket_connect_span(/*attempt*/ 2);
        websocket.in_scope(|| {});
        websocket.record("result", "success");
        drop(websocket);
    });

    provider.force_flush().expect("flush traces");
    let spans = exporter.get_finished_spans().expect("span export");
    let has_span = |name: &str| spans.iter().any(|span| span.name.as_ref() == name);

    assert!(
        has_span("codex.exec_server.remote.register"),
        "expected finished remote registration span, got {spans:?}"
    );
    assert!(
        has_span("codex.exec_server.remote.websocket.connect"),
        "expected finished remote websocket connect span, got {spans:?}"
    );
}
|
||||
}
|
||||
|
||||
@@ -12,10 +12,19 @@ pub use transport::DEFAULT_LISTEN_URL;
|
||||
pub use transport::ExecServerListenUrlParseError;
|
||||
|
||||
use crate::ExecServerRuntimePaths;
|
||||
use crate::ExecServerTelemetry;
|
||||
|
||||
pub async fn run_main(
|
||||
listen_url: &str,
|
||||
runtime_paths: ExecServerRuntimePaths,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
transport::run_transport(listen_url, runtime_paths).await
|
||||
run_main_with_telemetry(listen_url, runtime_paths, ExecServerTelemetry::default()).await
|
||||
}
|
||||
|
||||
/// Runs the exec server on `listen_url` with an explicit telemetry handle.
///
/// All three arguments are forwarded unchanged to `transport::run_transport`,
/// and that call's result is returned as-is.
pub async fn run_main_with_telemetry(
|
||||
listen_url: &str,
|
||||
runtime_paths: ExecServerRuntimePaths,
|
||||
telemetry: ExecServerTelemetry,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
transport::run_transport(listen_url, runtime_paths, telemetry).await
|
||||
}
|
||||
|
||||
@@ -77,7 +77,7 @@ fn test_runtime_paths() -> ExecServerRuntimePaths {
|
||||
|
||||
async fn initialized_handler() -> Arc<ExecServerHandler> {
|
||||
let (outgoing_tx, _outgoing_rx) = mpsc::channel(16);
|
||||
let registry = SessionRegistry::new();
|
||||
let registry = SessionRegistry::new(crate::ExecServerTelemetry::default());
|
||||
let handler = Arc::new(ExecServerHandler::new(
|
||||
registry,
|
||||
RpcNotificationSender::new(outgoing_tx),
|
||||
@@ -155,7 +155,7 @@ async fn terminate_reports_false_after_process_exit() {
|
||||
#[tokio::test]
|
||||
async fn long_poll_read_fails_after_session_resume() {
|
||||
let (first_tx, _first_rx) = mpsc::channel(16);
|
||||
let registry = SessionRegistry::new();
|
||||
let registry = SessionRegistry::new(crate::ExecServerTelemetry::default());
|
||||
let first_handler = Arc::new(ExecServerHandler::new(
|
||||
Arc::clone(®istry),
|
||||
RpcNotificationSender::new(first_tx),
|
||||
@@ -228,7 +228,7 @@ async fn long_poll_read_fails_after_session_resume() {
|
||||
#[tokio::test]
|
||||
async fn active_session_resume_is_rejected() {
|
||||
let (first_tx, _first_rx) = mpsc::channel(16);
|
||||
let registry = SessionRegistry::new();
|
||||
let registry = SessionRegistry::new(crate::ExecServerTelemetry::default());
|
||||
let first_handler = Arc::new(ExecServerHandler::new(
|
||||
Arc::clone(®istry),
|
||||
RpcNotificationSender::new(first_tx),
|
||||
@@ -272,7 +272,7 @@ async fn active_session_resume_is_rejected() {
|
||||
async fn output_and_exit_are_retained_after_notification_receiver_closes() {
|
||||
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
|
||||
let handler = Arc::new(ExecServerHandler::new(
|
||||
SessionRegistry::new(),
|
||||
SessionRegistry::new(crate::ExecServerTelemetry::default()),
|
||||
RpcNotificationSender::new(outgoing_tx),
|
||||
test_runtime_paths(),
|
||||
));
|
||||
|
||||
@@ -10,6 +10,7 @@ use crate::protocol::TerminateResponse;
|
||||
use crate::protocol::WriteParams;
|
||||
use crate::protocol::WriteResponse;
|
||||
use crate::rpc::RpcNotificationSender;
|
||||
use crate::telemetry::ExecServerTelemetry;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct ProcessHandler {
|
||||
@@ -17,9 +18,12 @@ pub(crate) struct ProcessHandler {
|
||||
}
|
||||
|
||||
impl ProcessHandler {
|
||||
pub(crate) fn new(notifications: RpcNotificationSender) -> Self {
|
||||
pub(crate) fn new(
|
||||
notifications: RpcNotificationSender,
|
||||
telemetry: ExecServerTelemetry,
|
||||
) -> Self {
|
||||
Self {
|
||||
process: LocalProcess::new(notifications),
|
||||
process: LocalProcess::new(notifications, telemetry),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::Instrument;
|
||||
use tracing::debug;
|
||||
use tracing::warn;
|
||||
|
||||
@@ -16,26 +18,39 @@ use crate::rpc::method_not_found;
|
||||
use crate::server::ExecServerHandler;
|
||||
use crate::server::registry::build_router;
|
||||
use crate::server::session_registry::SessionRegistry;
|
||||
use crate::telemetry::ConnectionTransport;
|
||||
use crate::telemetry::ExecServerTelemetry;
|
||||
|
||||
/// Per-server state handed to every connection that is run.
///
/// The registry sits behind an `Arc`, and `run_connection` passes a clone of
/// that `Arc` to each connection rather than a separate registry.
#[derive(Clone)]
|
||||
pub(crate) struct ConnectionProcessor {
|
||||
// Registry shared by all connections run through this processor.
session_registry: Arc<SessionRegistry>,
|
||||
// Cloned into each connection run.
runtime_paths: ExecServerRuntimePaths,
|
||||
// Telemetry handle cloned into each connection run.
telemetry: ExecServerTelemetry,
|
||||
}
|
||||
|
||||
impl ConnectionProcessor {
|
||||
pub(crate) fn new(runtime_paths: ExecServerRuntimePaths) -> Self {
|
||||
pub(crate) fn new(
|
||||
runtime_paths: ExecServerRuntimePaths,
|
||||
telemetry: ExecServerTelemetry,
|
||||
) -> Self {
|
||||
Self {
|
||||
session_registry: SessionRegistry::new(),
|
||||
session_registry: SessionRegistry::new(telemetry.clone()),
|
||||
runtime_paths,
|
||||
telemetry,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn run_connection(&self, connection: JsonRpcConnection) {
|
||||
pub(crate) async fn run_connection(
|
||||
&self,
|
||||
connection: JsonRpcConnection,
|
||||
transport: ConnectionTransport,
|
||||
) {
|
||||
run_connection(
|
||||
connection,
|
||||
Arc::clone(&self.session_registry),
|
||||
self.runtime_paths.clone(),
|
||||
self.telemetry.clone(),
|
||||
transport,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -45,7 +60,10 @@ async fn run_connection(
|
||||
connection: JsonRpcConnection,
|
||||
session_registry: Arc<SessionRegistry>,
|
||||
runtime_paths: ExecServerRuntimePaths,
|
||||
telemetry: ExecServerTelemetry,
|
||||
transport: ConnectionTransport,
|
||||
) {
|
||||
let _connection_metrics = telemetry.connection_started(transport);
|
||||
let router = Arc::new(build_router());
|
||||
let JsonRpcConnection {
|
||||
outgoing_tx: json_outgoing_tx,
|
||||
@@ -100,31 +118,50 @@ async fn run_connection(
|
||||
}
|
||||
JsonRpcConnectionEvent::Message(message) => match message {
|
||||
codex_app_server_protocol::JSONRPCMessage::Request(request) => {
|
||||
let method = request_method(request.method.as_str());
|
||||
let request_span = request_span(method);
|
||||
let request_started_at = Instant::now();
|
||||
if let Some(route) = router.request_route(request.method.as_str()) {
|
||||
let message = tokio::select! {
|
||||
message = route(Arc::clone(&handler), request) => message,
|
||||
message = route(Arc::clone(&handler), request).instrument(request_span.clone()) => message,
|
||||
_ = disconnected_rx.changed() => {
|
||||
request_span.record("result", "disconnected");
|
||||
telemetry.request_completed(
|
||||
method,
|
||||
"disconnected",
|
||||
request_started_at.elapsed(),
|
||||
);
|
||||
drop(request_span);
|
||||
debug!("exec-server transport disconnected while handling request");
|
||||
break;
|
||||
}
|
||||
};
|
||||
let result = request_result(&message);
|
||||
request_span.record("result", result);
|
||||
telemetry.request_completed(method, result, request_started_at.elapsed());
|
||||
drop(request_span);
|
||||
if let Some(message) = message
|
||||
&& outgoing_tx.send(message).await.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
} else if outgoing_tx
|
||||
.send(RpcServerOutboundMessage::Error {
|
||||
request_id: request.id,
|
||||
error: method_not_found(format!(
|
||||
"exec-server stub does not implement `{}` yet",
|
||||
request.method
|
||||
)),
|
||||
})
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
} else {
|
||||
request_span.record("result", "error");
|
||||
telemetry.request_completed(method, "error", request_started_at.elapsed());
|
||||
drop(request_span);
|
||||
if outgoing_tx
|
||||
.send(RpcServerOutboundMessage::Error {
|
||||
request_id: request.id,
|
||||
error: method_not_found(format!(
|
||||
"exec-server stub does not implement `{}` yet",
|
||||
request.method
|
||||
)),
|
||||
})
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
codex_app_server_protocol::JSONRPCMessage::Notification(notification) => {
|
||||
@@ -184,6 +221,45 @@ async fn run_connection(
|
||||
let _ = outbound_task.await;
|
||||
}
|
||||
|
||||
fn request_method(method: &str) -> &'static str {
|
||||
match method {
|
||||
crate::protocol::INITIALIZE_METHOD => crate::protocol::INITIALIZE_METHOD,
|
||||
crate::protocol::EXEC_METHOD => crate::protocol::EXEC_METHOD,
|
||||
crate::protocol::EXEC_READ_METHOD => crate::protocol::EXEC_READ_METHOD,
|
||||
crate::protocol::EXEC_WRITE_METHOD => crate::protocol::EXEC_WRITE_METHOD,
|
||||
crate::protocol::EXEC_TERMINATE_METHOD => crate::protocol::EXEC_TERMINATE_METHOD,
|
||||
crate::protocol::FS_READ_FILE_METHOD => crate::protocol::FS_READ_FILE_METHOD,
|
||||
crate::protocol::FS_WRITE_FILE_METHOD => crate::protocol::FS_WRITE_FILE_METHOD,
|
||||
crate::protocol::FS_CREATE_DIRECTORY_METHOD => crate::protocol::FS_CREATE_DIRECTORY_METHOD,
|
||||
crate::protocol::FS_GET_METADATA_METHOD => crate::protocol::FS_GET_METADATA_METHOD,
|
||||
crate::protocol::FS_READ_DIRECTORY_METHOD => crate::protocol::FS_READ_DIRECTORY_METHOD,
|
||||
crate::protocol::FS_REMOVE_METHOD => crate::protocol::FS_REMOVE_METHOD,
|
||||
crate::protocol::FS_COPY_METHOD => crate::protocol::FS_COPY_METHOD,
|
||||
crate::protocol::HTTP_REQUEST_METHOD => crate::protocol::HTTP_REQUEST_METHOD,
|
||||
_ => "unknown",
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates the INFO-level span that wraps a single exec-server request.
///
/// The span is named `codex.exec_server.request`, carries `method` both as
/// `otel.name` and as a `method` field, and declares an empty `result` field
/// that can be recorded later.
fn request_span(method: &'static str) -> tracing::Span {
    tracing::span!(
        tracing::Level::INFO,
        "codex.exec_server.request",
        otel.kind = "server",
        otel.name = method,
        method,
        result = tracing::field::Empty,
    )
}
|
||||
|
||||
fn request_result(message: &Option<RpcServerOutboundMessage>) -> &'static str {
|
||||
match message {
|
||||
Some(RpcServerOutboundMessage::Error { .. }) => "error",
|
||||
Some(
|
||||
RpcServerOutboundMessage::Response { .. } | RpcServerOutboundMessage::Notification(_),
|
||||
)
|
||||
| None => "success",
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
@@ -195,6 +271,10 @@ mod tests {
|
||||
use codex_app_server_protocol::JSONRPCRequest;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use opentelemetry::trace::TracerProvider as _;
|
||||
use opentelemetry_sdk::trace::InMemorySpanExporter;
|
||||
use opentelemetry_sdk::trace::SdkTracerProvider;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde::Serialize;
|
||||
use serde::de::DeserializeOwned;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
@@ -205,7 +285,11 @@ mod tests {
|
||||
use tokio::io::duplex;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::timeout;
|
||||
use tracing_subscriber::filter::filter_fn;
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
use super::request_method;
|
||||
use super::request_span;
|
||||
use super::run_connection;
|
||||
use crate::ExecServerRuntimePaths;
|
||||
use crate::ProcessId;
|
||||
@@ -223,10 +307,58 @@ mod tests {
|
||||
use crate::protocol::TerminateParams;
|
||||
use crate::protocol::TerminateResponse;
|
||||
use crate::server::session_registry::SessionRegistry;
|
||||
use crate::telemetry::runtime_span;
|
||||
|
||||
/// Checks that `request_method` returns every known protocol method unchanged
/// and collapses anything else to `"unknown"`.
#[test]
fn request_method_uses_bounded_protocol_method_names() {
    // Every constant in the allow-list must round-trip to itself.
    let known_methods = [
        crate::protocol::INITIALIZE_METHOD,
        crate::protocol::EXEC_METHOD,
        crate::protocol::EXEC_READ_METHOD,
        crate::protocol::EXEC_WRITE_METHOD,
        crate::protocol::EXEC_TERMINATE_METHOD,
        crate::protocol::FS_READ_FILE_METHOD,
        crate::protocol::FS_WRITE_FILE_METHOD,
        crate::protocol::FS_CREATE_DIRECTORY_METHOD,
        crate::protocol::FS_GET_METADATA_METHOD,
        crate::protocol::FS_READ_DIRECTORY_METHOD,
        crate::protocol::FS_REMOVE_METHOD,
        crate::protocol::FS_COPY_METHOD,
        crate::protocol::HTTP_REQUEST_METHOD,
    ];
    for method in known_methods {
        assert_eq!(request_method(method), method);
    }

    // Arbitrary, empty, and near-miss names must all land in the single
    // "unknown" bucket.
    let near_miss = format!("{EXEC_TERMINATE_METHOD}/extra");
    for method in ["custom/method", "", near_miss.as_str()] {
        assert_eq!(request_method(method), "unknown");
    }
}
|
||||
|
||||
/// Opens a request span inside the runtime span under an in-memory
/// OpenTelemetry pipeline and checks that both are exported: the runtime span
/// under its own name and the request span under the method name.
#[test]
fn runtime_and_request_spans_are_exported() {
    let exporter = InMemorySpanExporter::default();
    let provider = SdkTracerProvider::builder()
        .with_simple_exporter(exporter.clone())
        .build();
    let otel_layer = tracing_opentelemetry::layer()
        .with_tracer(provider.tracer("exec-server-test"))
        .with_filter(filter_fn(codex_otel::OtelProvider::trace_export_filter));
    let subscriber = tracing_subscriber::registry().with(otel_layer);

    tracing::subscriber::with_default(subscriber, || {
        tracing::callsite::rebuild_interest_cache();
        let outer = runtime_span();
        outer.in_scope(|| {
            let inner = request_span(INITIALIZE_METHOD);
            inner.in_scope(|| {});
            inner.record("result", "success");
            drop(inner);
        });
        drop(outer);
    });

    provider.force_flush().expect("flush traces");
    let spans = exporter.get_finished_spans().expect("span export");
    let has_span = |name: &str| spans.iter().any(|span| span.name.as_ref() == name);

    assert!(
        has_span("codex.exec_server"),
        "expected runtime span, got {spans:?}"
    );
    assert!(
        has_span(INITIALIZE_METHOD),
        "expected initialize request span, got {spans:?}"
    );
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn transport_disconnect_detaches_session_during_in_flight_read() {
|
||||
let registry = SessionRegistry::new();
|
||||
let registry = SessionRegistry::new(crate::ExecServerTelemetry::default());
|
||||
let (mut first_writer, mut first_lines, first_task) =
|
||||
spawn_test_connection(Arc::clone(®istry), "first");
|
||||
|
||||
@@ -322,7 +454,13 @@ mod tests {
|
||||
let (server_writer, client_reader) = duplex(1 << 20);
|
||||
let connection =
|
||||
JsonRpcConnection::from_stdio(server_reader, server_writer, label.to_string());
|
||||
let task = tokio::spawn(run_connection(connection, registry, test_runtime_paths()));
|
||||
let task = tokio::spawn(run_connection(
|
||||
connection,
|
||||
registry,
|
||||
test_runtime_paths(),
|
||||
crate::ExecServerTelemetry::default(),
|
||||
crate::telemetry::ConnectionTransport::Stdio,
|
||||
));
|
||||
(client_writer, BufReader::new(client_reader).lines(), task)
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ use uuid::Uuid;
|
||||
use crate::rpc::RpcNotificationSender;
|
||||
use crate::rpc::invalid_request;
|
||||
use crate::server::process_handler::ProcessHandler;
|
||||
use crate::telemetry::ExecServerTelemetry;
|
||||
|
||||
#[cfg(test)]
|
||||
const DETACHED_SESSION_TTL: Duration = Duration::from_millis(200);
|
||||
@@ -18,6 +19,7 @@ const DETACHED_SESSION_TTL: Duration = Duration::from_secs(10);
|
||||
|
||||
/// Registry of exec-server sessions plus the telemetry handle given to the
/// process handlers it creates.
pub(crate) struct SessionRegistry {
|
||||
// Session entries keyed by session id string.
sessions: Mutex<HashMap<String, Arc<SessionEntry>>>,
|
||||
// Cloned into each `ProcessHandler` built for a new session.
telemetry: ExecServerTelemetry,
|
||||
}
|
||||
|
||||
struct SessionEntry {
|
||||
@@ -49,9 +51,10 @@ pub(crate) struct SessionHandle {
|
||||
}
|
||||
|
||||
impl SessionRegistry {
|
||||
pub(crate) fn new() -> Arc<Self> {
|
||||
pub(crate) fn new(telemetry: ExecServerTelemetry) -> Arc<Self> {
|
||||
Arc::new(Self {
|
||||
sessions: Mutex::new(HashMap::new()),
|
||||
telemetry,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -94,7 +97,7 @@ impl SessionRegistry {
|
||||
let session_id = Uuid::new_v4().to_string();
|
||||
let entry = Arc::new(SessionEntry::new(
|
||||
session_id.clone(),
|
||||
ProcessHandler::new(notifications),
|
||||
ProcessHandler::new(notifications, self.telemetry.clone()),
|
||||
connection_id,
|
||||
));
|
||||
sessions.insert(session_id, Arc::clone(&entry));
|
||||
@@ -140,6 +143,7 @@ impl Default for SessionRegistry {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
sessions: Mutex::new(HashMap::new()),
|
||||
telemetry: ExecServerTelemetry::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,8 +22,10 @@ use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::ExecServerRuntimePaths;
|
||||
use crate::ExecServerTelemetry;
|
||||
use crate::connection::JsonRpcConnection;
|
||||
use crate::server::processor::ConnectionProcessor;
|
||||
use crate::telemetry::ConnectionTransport;
|
||||
|
||||
pub const DEFAULT_LISTEN_URL: &str = "ws://127.0.0.1:0";
|
||||
|
||||
@@ -80,38 +82,40 @@ pub(crate) fn parse_listen_url(
|
||||
pub(crate) async fn run_transport(
|
||||
listen_url: &str,
|
||||
runtime_paths: ExecServerRuntimePaths,
|
||||
telemetry: ExecServerTelemetry,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
match parse_listen_url(listen_url)? {
|
||||
ExecServerListenTransport::WebSocket(bind_address) => {
|
||||
run_websocket_listener(bind_address, runtime_paths).await
|
||||
run_websocket_listener(bind_address, runtime_paths, telemetry).await
|
||||
}
|
||||
ExecServerListenTransport::Stdio => run_stdio_connection(runtime_paths).await,
|
||||
ExecServerListenTransport::Stdio => run_stdio_connection(runtime_paths, telemetry).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_stdio_connection(
|
||||
runtime_paths: ExecServerRuntimePaths,
|
||||
telemetry: ExecServerTelemetry,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
run_stdio_connection_with_io(io::stdin(), io::stdout(), runtime_paths).await
|
||||
run_stdio_connection_with_io(io::stdin(), io::stdout(), runtime_paths, telemetry).await
|
||||
}
|
||||
|
||||
async fn run_stdio_connection_with_io<R, W>(
|
||||
reader: R,
|
||||
writer: W,
|
||||
runtime_paths: ExecServerRuntimePaths,
|
||||
telemetry: ExecServerTelemetry,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
|
||||
where
|
||||
R: AsyncRead + Unpin + Send + 'static,
|
||||
W: AsyncWrite + Unpin + Send + 'static,
|
||||
{
|
||||
let processor = ConnectionProcessor::new(runtime_paths);
|
||||
let processor = ConnectionProcessor::new(runtime_paths, telemetry);
|
||||
tracing::info!("codex-exec-server listening on stdio");
|
||||
processor
|
||||
.run_connection(JsonRpcConnection::from_stdio(
|
||||
reader,
|
||||
writer,
|
||||
"exec-server stdio".to_string(),
|
||||
))
|
||||
.run_connection(
|
||||
JsonRpcConnection::from_stdio(reader, writer, "exec-server stdio".to_string()),
|
||||
ConnectionTransport::Stdio,
|
||||
)
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
@@ -119,10 +123,11 @@ where
|
||||
async fn run_websocket_listener(
|
||||
bind_address: SocketAddr,
|
||||
runtime_paths: ExecServerRuntimePaths,
|
||||
telemetry: ExecServerTelemetry,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let listener = TcpListener::bind(bind_address).await?;
|
||||
let local_addr = listener.local_addr()?;
|
||||
let processor = ConnectionProcessor::new(runtime_paths);
|
||||
let processor = ConnectionProcessor::new(runtime_paths, telemetry);
|
||||
info!("codex-exec-server listening on ws://{local_addr}");
|
||||
println!("ws://{local_addr}");
|
||||
std::io::stdout().flush()?;
|
||||
@@ -174,10 +179,13 @@ async fn websocket_upgrade_handler(
|
||||
websocket.on_upgrade(move |stream| async move {
|
||||
state
|
||||
.processor
|
||||
.run_connection(JsonRpcConnection::from_axum_websocket(
|
||||
stream,
|
||||
format!("exec-server websocket {peer_addr}"),
|
||||
))
|
||||
.run_connection(
|
||||
JsonRpcConnection::from_axum_websocket(
|
||||
stream,
|
||||
format!("exec-server websocket {peer_addr}"),
|
||||
),
|
||||
ConnectionTransport::WebSocket,
|
||||
)
|
||||
.await;
|
||||
})
|
||||
}
|
||||
|
||||
@@ -6,6 +6,9 @@ use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCRequest;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_otel::MetricsConfig;
|
||||
use opentelemetry_sdk::metrics::InMemoryMetricExporter;
|
||||
use opentelemetry_sdk::metrics::data::ScopeMetrics;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
@@ -61,6 +64,7 @@ async fn stdio_listen_transport_serves_initialize() {
|
||||
server_reader,
|
||||
server_writer,
|
||||
test_runtime_paths(),
|
||||
crate::ExecServerTelemetry::default(),
|
||||
));
|
||||
let mut client_lines = BufReader::new(client_reader).lines();
|
||||
|
||||
@@ -111,6 +115,76 @@ async fn stdio_listen_transport_serves_initialize() {
|
||||
.expect("stdio transport should not fail");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn stdio_listen_transport_emits_connection_and_request_metrics() {
|
||||
let exporter = InMemoryMetricExporter::default();
|
||||
let metrics = codex_otel::MetricsClient::new(MetricsConfig::in_memory(
|
||||
"test",
|
||||
"codex-exec-server",
|
||||
env!("CARGO_PKG_VERSION"),
|
||||
exporter.clone(),
|
||||
))
|
||||
.expect("metrics");
|
||||
let telemetry = crate::ExecServerTelemetry::new(Some(metrics.clone()));
|
||||
let (mut client_writer, server_reader) = duplex(1 << 20);
|
||||
let (server_writer, client_reader) = duplex(1 << 20);
|
||||
let server_task = tokio::spawn(run_stdio_connection_with_io(
|
||||
server_reader,
|
||||
server_writer,
|
||||
test_runtime_paths(),
|
||||
telemetry,
|
||||
));
|
||||
let mut client_lines = BufReader::new(client_reader).lines();
|
||||
|
||||
write_jsonrpc_line(
|
||||
&mut client_writer,
|
||||
&JSONRPCMessage::Request(JSONRPCRequest {
|
||||
id: RequestId::Integer(1),
|
||||
method: INITIALIZE_METHOD.to_string(),
|
||||
params: Some(
|
||||
serde_json::to_value(InitializeParams {
|
||||
client_name: "exec-server-metrics-test".to_string(),
|
||||
resume_session_id: None,
|
||||
})
|
||||
.expect("initialize params should serialize"),
|
||||
),
|
||||
trace: None,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
let _response = timeout(Duration::from_secs(1), client_lines.next_line())
|
||||
.await
|
||||
.expect("initialize response should arrive")
|
||||
.expect("initialize response read should succeed")
|
||||
.expect("initialize response should be present");
|
||||
|
||||
drop(client_writer);
|
||||
drop(client_lines);
|
||||
timeout(Duration::from_secs(1), server_task)
|
||||
.await
|
||||
.expect("stdio transport should finish after client disconnect")
|
||||
.expect("stdio transport task should join")
|
||||
.expect("stdio transport should not fail");
|
||||
metrics.shutdown().expect("shutdown metrics");
|
||||
|
||||
let metric_names = exporter
|
||||
.get_finished_metrics()
|
||||
.expect("finished metrics")
|
||||
.into_iter()
|
||||
.flat_map(|metrics| {
|
||||
metrics
|
||||
.scope_metrics()
|
||||
.flat_map(ScopeMetrics::metrics)
|
||||
.map(|metric| metric.name().to_string())
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
assert!(metric_names.contains(&"exec_server_connections_active".to_string()));
|
||||
assert!(metric_names.contains(&"exec_server_connections_total".to_string()));
|
||||
assert!(metric_names.contains(&"exec_server_requests_total".to_string()));
|
||||
assert!(metric_names.contains(&"exec_server_request_duration_seconds".to_string()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_listen_url_accepts_websocket_url() {
|
||||
let transport =
|
||||
|
||||
608
codex-rs/exec-server/src/telemetry.rs
Normal file
608
codex-rs/exec-server/src/telemetry.rs
Normal file
@@ -0,0 +1,608 @@
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicI64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_otel::MetricsClient;
|
||||
use tracing::warn;
|
||||
|
||||
const CONNECTIONS_ACTIVE_METRIC: &str = "exec_server_connections_active";
|
||||
const CONNECTIONS_ACTIVE_DESCRIPTION: &str = "Number of active exec-server connections.";
|
||||
const CONNECTIONS_TOTAL_METRIC: &str = "exec_server_connections_total";
|
||||
const CONNECTIONS_TOTAL_DESCRIPTION: &str = "Total number of accepted exec-server connections.";
|
||||
const REMOTE_REGISTRATION_TOTAL_METRIC: &str = "exec_server_remote_registration_total";
|
||||
const REMOTE_REGISTRATION_TOTAL_DESCRIPTION: &str =
|
||||
"Total number of remote exec-server registration attempts.";
|
||||
const REMOTE_REGISTRATION_DURATION_METRIC: &str =
|
||||
"exec_server_remote_registration_duration_seconds";
|
||||
const REMOTE_REGISTRATION_DURATION_DESCRIPTION: &str =
|
||||
"Duration of remote exec-server registration attempts in seconds.";
|
||||
const REMOTE_WEBSOCKET_ACTIVE_METRIC: &str = "exec_server_remote_websocket_active";
|
||||
const REMOTE_WEBSOCKET_ACTIVE_DESCRIPTION: &str =
|
||||
"Number of active remote exec-server WebSocket connections.";
|
||||
const REMOTE_WEBSOCKET_CONNECT_TOTAL_METRIC: &str = "exec_server_remote_websocket_connect_total";
|
||||
const REMOTE_WEBSOCKET_CONNECT_TOTAL_DESCRIPTION: &str =
|
||||
"Total number of remote exec-server WebSocket connection attempts.";
|
||||
const REMOTE_WEBSOCKET_CONNECT_DURATION_METRIC: &str =
|
||||
"exec_server_remote_websocket_connect_duration_seconds";
|
||||
const REMOTE_WEBSOCKET_CONNECT_DURATION_DESCRIPTION: &str =
|
||||
"Duration of remote exec-server WebSocket connection attempts in seconds.";
|
||||
const REMOTE_WEBSOCKET_RECONNECTS_METRIC: &str = "exec_server_remote_websocket_reconnects_total";
|
||||
const REMOTE_WEBSOCKET_RECONNECTS_DESCRIPTION: &str =
|
||||
"Total number of remote exec-server WebSocket reconnects.";
|
||||
const REQUESTS_TOTAL_METRIC: &str = "exec_server_requests_total";
|
||||
const REQUESTS_TOTAL_DESCRIPTION: &str = "Total number of exec-server requests.";
|
||||
const REQUEST_DURATION_METRIC: &str = "exec_server_request_duration_seconds";
|
||||
const REQUEST_DURATION_DESCRIPTION: &str = "Duration of exec-server requests in seconds.";
|
||||
const PROCESSES_ACTIVE_METRIC: &str = "exec_server_processes_active";
|
||||
const PROCESSES_ACTIVE_DESCRIPTION: &str = "Number of active exec-server processes.";
|
||||
const PROCESSES_FINISHED_TOTAL_METRIC: &str = "exec_server_processes_finished_total";
|
||||
const PROCESSES_FINISHED_TOTAL_DESCRIPTION: &str =
|
||||
"Total number of finished exec-server processes.";
|
||||
const PROCESS_DURATION_METRIC: &str = "exec_server_process_duration_seconds";
|
||||
const PROCESS_DURATION_DESCRIPTION: &str = "Duration of exec-server processes in seconds.";
|
||||
|
||||
/// Creates the INFO-level `codex.exec_server` span with `otel.kind` set to
/// `"internal"`.
pub fn runtime_span() -> tracing::Span {
    tracing::span!(
        tracing::Level::INFO,
        "codex.exec_server",
        otel.kind = "internal"
    )
}
|
||||
|
||||
/// Transport over which an exec-server connection is served.
///
/// Used to pick the per-transport connection counter and to tag connection
/// metrics. `Debug`, `PartialEq` and `Eq` are derived so the value can be
/// logged and compared in assertions.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum ConnectionTransport {
    Relay,
    Stdio,
    WebSocket,
}

impl ConnectionTransport {
    /// Returns the lowercase value used for the `transport` metric tag.
    fn metric_tag(self) -> &'static str {
        match self {
            Self::Relay => "relay",
            Self::Stdio => "stdio",
            Self::WebSocket => "websocket",
        }
    }
}
|
||||
|
||||
/// Cloneable handle for exec-server metrics.
///
/// The shared state is optional: `new` fills it only when given a metrics
/// client, and the derived `Default` leaves it empty.
#[derive(Clone, Default)]
|
||||
pub struct ExecServerTelemetry {
|
||||
// `None` when no metrics client was supplied; otherwise state shared by all clones.
inner: Option<Arc<ExecServerTelemetryInner>>,
|
||||
}
|
||||
|
||||
/// State behind an `ExecServerTelemetry` handle: the metrics client plus
/// in-process counts that are published as gauge values.
struct ExecServerTelemetryInner {
|
||||
// Metrics client supplied to `ExecServerTelemetry::new`.
metrics: MetricsClient,
|
||||
// Connection counts, one field per `ConnectionTransport` variant
// (presumably selected by `connection_counter` — defined outside this view).
relay_connections: AtomicI64,
|
||||
stdio_connections: AtomicI64,
|
||||
websocket_connections: AtomicI64,
|
||||
// Count of remote WebSocket connections currently open.
remote_websockets: AtomicI64,
|
||||
// Count of processes started but not yet reported finished.
active_processes: AtomicI64,
|
||||
}
|
||||
|
||||
pub(crate) struct ConnectionMetricGuard {
|
||||
telemetry: ExecServerTelemetry,
|
||||
transport: ConnectionTransport,
|
||||
}
|
||||
|
||||
pub(crate) struct RemoteWebSocketMetricGuard {
|
||||
telemetry: ExecServerTelemetry,
|
||||
}
|
||||
|
||||
impl ExecServerTelemetry {
|
||||
pub fn new(metrics: Option<MetricsClient>) -> Self {
|
||||
Self {
|
||||
inner: metrics.map(|metrics| {
|
||||
Arc::new(ExecServerTelemetryInner {
|
||||
metrics,
|
||||
relay_connections: AtomicI64::new(0),
|
||||
stdio_connections: AtomicI64::new(0),
|
||||
websocket_connections: AtomicI64::new(0),
|
||||
remote_websockets: AtomicI64::new(0),
|
||||
active_processes: AtomicI64::new(0),
|
||||
})
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn connection_started(
|
||||
&self,
|
||||
transport: ConnectionTransport,
|
||||
) -> ConnectionMetricGuard {
|
||||
self.with_inner(|inner| {
|
||||
let active = inner
|
||||
.connection_counter(transport)
|
||||
.fetch_add(1, Ordering::AcqRel)
|
||||
+ 1;
|
||||
inner.gauge(
|
||||
CONNECTIONS_ACTIVE_METRIC,
|
||||
CONNECTIONS_ACTIVE_DESCRIPTION,
|
||||
active,
|
||||
&[("transport", transport.metric_tag())],
|
||||
);
|
||||
inner.counter(
|
||||
CONNECTIONS_TOTAL_METRIC,
|
||||
CONNECTIONS_TOTAL_DESCRIPTION,
|
||||
&[
|
||||
("transport", transport.metric_tag()),
|
||||
("result", "accepted"),
|
||||
],
|
||||
);
|
||||
});
|
||||
ConnectionMetricGuard {
|
||||
telemetry: self.clone(),
|
||||
transport,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn remote_registration_completed(&self, result: &'static str, duration: Duration) {
|
||||
self.with_inner(|inner| {
|
||||
let tags = [("result", result)];
|
||||
inner.counter(
|
||||
REMOTE_REGISTRATION_TOTAL_METRIC,
|
||||
REMOTE_REGISTRATION_TOTAL_DESCRIPTION,
|
||||
&tags,
|
||||
);
|
||||
inner.duration(
|
||||
REMOTE_REGISTRATION_DURATION_METRIC,
|
||||
REMOTE_REGISTRATION_DURATION_DESCRIPTION,
|
||||
duration,
|
||||
&tags,
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
pub(crate) fn remote_websocket_connected(&self) -> RemoteWebSocketMetricGuard {
|
||||
self.with_inner(|inner| {
|
||||
let active = inner.remote_websockets.fetch_add(1, Ordering::AcqRel) + 1;
|
||||
inner.gauge(
|
||||
REMOTE_WEBSOCKET_ACTIVE_METRIC,
|
||||
REMOTE_WEBSOCKET_ACTIVE_DESCRIPTION,
|
||||
active,
|
||||
&[],
|
||||
);
|
||||
});
|
||||
RemoteWebSocketMetricGuard {
|
||||
telemetry: self.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn remote_websocket_connect_completed(
|
||||
&self,
|
||||
result: &'static str,
|
||||
duration: Duration,
|
||||
) {
|
||||
self.with_inner(|inner| {
|
||||
let tags = [("result", result)];
|
||||
inner.counter(
|
||||
REMOTE_WEBSOCKET_CONNECT_TOTAL_METRIC,
|
||||
REMOTE_WEBSOCKET_CONNECT_TOTAL_DESCRIPTION,
|
||||
&tags,
|
||||
);
|
||||
inner.duration(
|
||||
REMOTE_WEBSOCKET_CONNECT_DURATION_METRIC,
|
||||
REMOTE_WEBSOCKET_CONNECT_DURATION_DESCRIPTION,
|
||||
duration,
|
||||
&tags,
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
pub(crate) fn request_completed(
|
||||
&self,
|
||||
method: &'static str,
|
||||
result: &'static str,
|
||||
duration: Duration,
|
||||
) {
|
||||
self.with_inner(|inner| {
|
||||
let tags = [("method", method), ("result", result)];
|
||||
inner.counter(REQUESTS_TOTAL_METRIC, REQUESTS_TOTAL_DESCRIPTION, &tags);
|
||||
inner.duration(
|
||||
REQUEST_DURATION_METRIC,
|
||||
REQUEST_DURATION_DESCRIPTION,
|
||||
duration,
|
||||
&tags,
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
pub(crate) fn process_started(&self) {
|
||||
self.with_inner(|inner| {
|
||||
let active = inner.active_processes.fetch_add(1, Ordering::AcqRel) + 1;
|
||||
inner.gauge(
|
||||
PROCESSES_ACTIVE_METRIC,
|
||||
PROCESSES_ACTIVE_DESCRIPTION,
|
||||
active,
|
||||
&[],
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
pub(crate) fn process_finished(&self, result: &'static str, duration: Duration) {
|
||||
self.with_inner(|inner| {
|
||||
let active = inner.active_processes.fetch_sub(1, Ordering::AcqRel) - 1;
|
||||
inner.gauge(
|
||||
PROCESSES_ACTIVE_METRIC,
|
||||
PROCESSES_ACTIVE_DESCRIPTION,
|
||||
active,
|
||||
&[],
|
||||
);
|
||||
inner.counter(
|
||||
PROCESSES_FINISHED_TOTAL_METRIC,
|
||||
PROCESSES_FINISHED_TOTAL_DESCRIPTION,
|
||||
&[("result", result)],
|
||||
);
|
||||
inner.duration(
|
||||
PROCESS_DURATION_METRIC,
|
||||
PROCESS_DURATION_DESCRIPTION,
|
||||
duration,
|
||||
&[("result", result)],
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
pub(crate) fn remote_websocket_reconnect(&self, reason: &'static str) {
|
||||
self.with_inner(|inner| {
|
||||
inner.counter(
|
||||
REMOTE_WEBSOCKET_RECONNECTS_METRIC,
|
||||
REMOTE_WEBSOCKET_RECONNECTS_DESCRIPTION,
|
||||
&[("reason", reason)],
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
fn connection_finished(&self, transport: ConnectionTransport) {
|
||||
self.with_inner(|inner| {
|
||||
let active = inner
|
||||
.connection_counter(transport)
|
||||
.fetch_sub(1, Ordering::AcqRel)
|
||||
- 1;
|
||||
inner.gauge(
|
||||
CONNECTIONS_ACTIVE_METRIC,
|
||||
CONNECTIONS_ACTIVE_DESCRIPTION,
|
||||
active,
|
||||
&[("transport", transport.metric_tag())],
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
fn remote_websocket_disconnected(&self) {
|
||||
self.with_inner(|inner| {
|
||||
let active = inner.remote_websockets.fetch_sub(1, Ordering::AcqRel) - 1;
|
||||
inner.gauge(
|
||||
REMOTE_WEBSOCKET_ACTIVE_METRIC,
|
||||
REMOTE_WEBSOCKET_ACTIVE_DESCRIPTION,
|
||||
active,
|
||||
&[],
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
/// Runs `emit` against the inner telemetry state, if there is one.
///
/// When `self.inner` is `None` the callback is never invoked, which makes
/// every emitter built on top of this helper a no-op.
fn with_inner(&self, emit: impl FnOnce(&ExecServerTelemetryInner)) {
    let Some(inner) = &self.inner else {
        return;
    };
    emit(inner);
}
|
||||
}
|
||||
|
||||
impl Drop for ConnectionMetricGuard {
|
||||
fn drop(&mut self) {
|
||||
self.telemetry.connection_finished(self.transport);
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for RemoteWebSocketMetricGuard {
|
||||
fn drop(&mut self) {
|
||||
self.telemetry.remote_websocket_disconnected();
|
||||
}
|
||||
}
|
||||
|
||||
impl ExecServerTelemetryInner {
|
||||
fn connection_counter(&self, transport: ConnectionTransport) -> &AtomicI64 {
|
||||
match transport {
|
||||
ConnectionTransport::Relay => &self.relay_connections,
|
||||
ConnectionTransport::Stdio => &self.stdio_connections,
|
||||
ConnectionTransport::WebSocket => &self.websocket_connections,
|
||||
}
|
||||
}
|
||||
|
||||
fn counter(&self, name: &str, description: &str, tags: &[(&str, &str)]) {
|
||||
if self
|
||||
.metrics
|
||||
.counter_with_description(name, description, /*inc*/ 1, tags)
|
||||
.is_err()
|
||||
{
|
||||
warn!(metric = name, "failed to emit exec-server counter");
|
||||
}
|
||||
}
|
||||
|
||||
fn duration(&self, name: &str, description: &str, duration: Duration, tags: &[(&str, &str)]) {
|
||||
if self
|
||||
.metrics
|
||||
.record_duration_seconds_with_description(name, description, duration, tags)
|
||||
.is_err()
|
||||
{
|
||||
warn!(metric = name, "failed to emit exec-server duration");
|
||||
}
|
||||
}
|
||||
|
||||
fn gauge(&self, name: &str, description: &str, value: i64, tags: &[(&str, &str)]) {
|
||||
if self
|
||||
.metrics
|
||||
.gauge_with_description(name, description, value, tags)
|
||||
.is_err()
|
||||
{
|
||||
warn!(metric = name, "failed to emit exec-server gauge");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::BTreeMap;
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_otel::MetricsConfig;
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry_sdk::metrics::InMemoryMetricExporter;
|
||||
use opentelemetry_sdk::metrics::data::AggregatedMetrics;
|
||||
use opentelemetry_sdk::metrics::data::Metric;
|
||||
use opentelemetry_sdk::metrics::data::MetricData;
|
||||
use opentelemetry_sdk::metrics::data::ResourceMetrics;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
fn emits_bounded_exec_server_metrics() {
    /// Builds the owned tag map that `metric_points` reports for a data point.
    fn tag_map(pairs: &[(&str, &str)]) -> BTreeMap<String, String> {
        pairs
            .iter()
            .map(|&(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    let exporter = InMemoryMetricExporter::default();
    let config = MetricsConfig::in_memory(
        "test",
        "codex-exec-server",
        env!("CARGO_PKG_VERSION"),
        exporter.clone(),
    );
    let client = codex_otel::MetricsClient::new(config).expect("metrics");
    let telemetry = ExecServerTelemetry::new(Some(client.clone()));

    // Exercise every emitter once. Both guards are dropped before shutdown so
    // their drop handlers run while the metrics client is still live.
    let connection = telemetry.connection_started(ConnectionTransport::WebSocket);
    telemetry.remote_registration_completed("success", Duration::from_millis(5));
    let remote_websocket = telemetry.remote_websocket_connected();
    telemetry.remote_websocket_connect_completed("success", Duration::from_millis(7));
    telemetry.request_completed("process/start", "success", Duration::from_millis(12));
    telemetry.process_started();
    telemetry.process_finished("success", Duration::from_millis(34));
    telemetry.remote_websocket_reconnect("connect_failed");
    drop(remote_websocket);
    drop(connection);
    client.shutdown().expect("shutdown metrics");

    let exported = latest_metrics(&exporter);

    // Counters and gauges: one data point each with the expected value and tags.
    assert_eq!(
        metric_points(&exported, "exec_server_connections_total"),
        vec![(
            1.0,
            tag_map(&[("result", "accepted"), ("transport", "websocket")]),
        )]
    );
    assert_eq!(
        metric_points(&exported, "exec_server_connections_active"),
        vec![(0.0, tag_map(&[("transport", "websocket")]))]
    );
    assert_eq!(
        metric_points(&exported, "exec_server_remote_registration_total"),
        vec![(1.0, tag_map(&[("result", "success")]))]
    );
    assert_eq!(
        metric_points(&exported, "exec_server_remote_websocket_connect_total"),
        vec![(1.0, tag_map(&[("result", "success")]))]
    );
    assert_eq!(
        metric_points(&exported, "exec_server_remote_websocket_active"),
        vec![(0.0, BTreeMap::new())]
    );
    assert_eq!(
        metric_points(&exported, "exec_server_requests_total"),
        vec![(
            1.0,
            tag_map(&[("method", "process/start"), ("result", "success")]),
        )]
    );
    assert_eq!(
        metric_points(&exported, "exec_server_processes_active"),
        vec![(0.0, BTreeMap::new())]
    );
    assert_eq!(
        metric_points(&exported, "exec_server_processes_finished_total"),
        vec![(1.0, tag_map(&[("result", "success")]))]
    );
    assert_eq!(
        metric_points(&exported, "exec_server_remote_websocket_reconnects_total"),
        vec![(1.0, tag_map(&[("reason", "connect_failed")]))]
    );

    // Every duration histogram received exactly one sample.
    for histogram_name in [
        "exec_server_remote_registration_duration_seconds",
        "exec_server_remote_websocket_connect_duration_seconds",
        "exec_server_request_duration_seconds",
        "exec_server_process_duration_seconds",
    ] {
        assert_eq!(histogram_count(&exported, histogram_name), 1);
    }
    assert_eq!(
        histogram_sum(&exported, "exec_server_request_duration_seconds"),
        0.012
    );

    // Instrument metadata: description and unit for every metric emitted above.
    let expected_metadata = [
        (
            "exec_server_connections_active",
            CONNECTIONS_ACTIVE_DESCRIPTION,
            "",
        ),
        (
            "exec_server_connections_total",
            CONNECTIONS_TOTAL_DESCRIPTION,
            "",
        ),
        (
            "exec_server_remote_registration_total",
            REMOTE_REGISTRATION_TOTAL_DESCRIPTION,
            "",
        ),
        (
            "exec_server_remote_registration_duration_seconds",
            REMOTE_REGISTRATION_DURATION_DESCRIPTION,
            "s",
        ),
        (
            "exec_server_remote_websocket_active",
            REMOTE_WEBSOCKET_ACTIVE_DESCRIPTION,
            "",
        ),
        (
            "exec_server_remote_websocket_connect_total",
            REMOTE_WEBSOCKET_CONNECT_TOTAL_DESCRIPTION,
            "",
        ),
        (
            "exec_server_remote_websocket_connect_duration_seconds",
            REMOTE_WEBSOCKET_CONNECT_DURATION_DESCRIPTION,
            "s",
        ),
        (
            "exec_server_remote_websocket_reconnects_total",
            REMOTE_WEBSOCKET_RECONNECTS_DESCRIPTION,
            "",
        ),
        ("exec_server_requests_total", REQUESTS_TOTAL_DESCRIPTION, ""),
        (
            "exec_server_request_duration_seconds",
            REQUEST_DURATION_DESCRIPTION,
            "s",
        ),
        (
            "exec_server_processes_active",
            PROCESSES_ACTIVE_DESCRIPTION,
            "",
        ),
        (
            "exec_server_processes_finished_total",
            PROCESSES_FINISHED_TOTAL_DESCRIPTION,
            "",
        ),
        (
            "exec_server_process_duration_seconds",
            PROCESS_DURATION_DESCRIPTION,
            "s",
        ),
    ];
    for (name, description, unit) in expected_metadata {
        assert_metric_metadata(&exported, name, description, unit);
    }
}
|
||||
|
||||
/// Returns the most recent batch exported to `exporter`.
///
/// # Panics
///
/// Panics if the exporter cannot return its finished metrics, or if nothing
/// has been exported yet.
fn latest_metrics(exporter: &InMemoryMetricExporter) -> ResourceMetrics {
    let mut exports = exporter
        .get_finished_metrics()
        .expect("finished metrics");
    // Take the final batch directly instead of walking every element with
    // `into_iter().last()`.
    exports.pop().expect("metrics export")
}
|
||||
|
||||
/// Finds the metric called `name` across all scopes of `resource_metrics`.
///
/// Panics with `metric {name} missing` when no scope contains it.
fn find_metric<'a>(resource_metrics: &'a ResourceMetrics, name: &str) -> &'a Metric {
    for scope in resource_metrics.scope_metrics() {
        for metric in scope.metrics() {
            if metric.name() == name {
                return metric;
            }
        }
    }
    panic!("metric {name} missing")
}
|
||||
|
||||
fn metric_points(
|
||||
resource_metrics: &ResourceMetrics,
|
||||
name: &str,
|
||||
) -> Vec<(f64, BTreeMap<String, String>)> {
|
||||
match find_metric(resource_metrics, name).data() {
|
||||
AggregatedMetrics::I64(MetricData::Gauge(gauge)) => gauge
|
||||
.data_points()
|
||||
.map(|point| (point.value() as f64, attributes_to_map(point.attributes())))
|
||||
.collect(),
|
||||
AggregatedMetrics::U64(MetricData::Sum(sum)) => sum
|
||||
.data_points()
|
||||
.map(|point| (point.value() as f64, attributes_to_map(point.attributes())))
|
||||
.collect(),
|
||||
_ => panic!("unexpected metric data for {name}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn histogram_count(resource_metrics: &ResourceMetrics, name: &str) -> u64 {
|
||||
match find_metric(resource_metrics, name).data() {
|
||||
AggregatedMetrics::F64(MetricData::Histogram(histogram)) => histogram
|
||||
.data_points()
|
||||
.map(opentelemetry_sdk::metrics::data::HistogramDataPoint::count)
|
||||
.sum(),
|
||||
_ => panic!("unexpected histogram data for {name}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn histogram_sum(resource_metrics: &ResourceMetrics, name: &str) -> f64 {
|
||||
match find_metric(resource_metrics, name).data() {
|
||||
AggregatedMetrics::F64(MetricData::Histogram(histogram)) => histogram
|
||||
.data_points()
|
||||
.map(opentelemetry_sdk::metrics::data::HistogramDataPoint::sum)
|
||||
.sum(),
|
||||
_ => panic!("unexpected histogram data for {name}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn assert_metric_metadata(
|
||||
resource_metrics: &ResourceMetrics,
|
||||
name: &str,
|
||||
description: &str,
|
||||
unit: &str,
|
||||
) {
|
||||
let metric = find_metric(resource_metrics, name);
|
||||
assert_eq!(metric.description(), description);
|
||||
assert_eq!(metric.unit(), unit);
|
||||
}
|
||||
|
||||
fn attributes_to_map<'a>(
|
||||
attributes: impl Iterator<Item = &'a KeyValue>,
|
||||
) -> BTreeMap<String, String> {
|
||||
attributes
|
||||
.map(|attribute| {
|
||||
(
|
||||
attribute.key.as_str().to_string(),
|
||||
attribute.value.as_str().to_string(),
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
@@ -16,7 +16,8 @@ pub(super) async fn seed_instructions(memory_root: &Path) -> std::io::Result<()>
|
||||
.await
|
||||
{
|
||||
Ok(mut file) => {
|
||||
tokio::io::AsyncWriteExt::write_all(&mut file, INSTRUCTIONS.as_bytes()).await
|
||||
tokio::io::AsyncWriteExt::write_all(&mut file, INSTRUCTIONS.as_bytes()).await?;
|
||||
tokio::io::AsyncWriteExt::flush(&mut file).await
|
||||
}
|
||||
Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => Ok(()),
|
||||
Err(err) => Err(err),
|
||||
|
||||
@@ -12,6 +12,7 @@ use crate::metrics::validation::validate_tags;
|
||||
use codex_utils_string::sanitize_metric_tag_value;
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry::metrics::Counter;
|
||||
use opentelemetry::metrics::Gauge;
|
||||
use opentelemetry::metrics::Histogram;
|
||||
use opentelemetry::metrics::Meter;
|
||||
use opentelemetry::metrics::MeterProvider as _;
|
||||
@@ -42,8 +43,9 @@ use tracing::debug;
|
||||
|
||||
const ENV_ATTRIBUTE: &str = "env";
|
||||
const METER_NAME: &str = "codex";
|
||||
const DURATION_UNIT: &str = "ms";
|
||||
const DURATION_DESCRIPTION: &str = "Duration in milliseconds.";
|
||||
const MILLISECOND_DURATION_UNIT: &str = "ms";
|
||||
const MILLISECOND_DURATION_DESCRIPTION: &str = "Duration in milliseconds.";
|
||||
const SECOND_DURATION_UNIT: &str = "s";
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct SharedManualReader {
|
||||
@@ -83,6 +85,7 @@ struct MetricsClientInner {
|
||||
meter_provider: SdkMeterProvider,
|
||||
meter: Meter,
|
||||
counters: Mutex<HashMap<String, Counter<u64>>>,
|
||||
gauges: Mutex<HashMap<String, Gauge<i64>>>,
|
||||
histograms: Mutex<HashMap<String, Histogram<f64>>>,
|
||||
duration_histograms: Mutex<HashMap<String, Histogram<f64>>>,
|
||||
runtime_reader: Option<Arc<ManualReader>>,
|
||||
@@ -90,7 +93,13 @@ struct MetricsClientInner {
|
||||
}
|
||||
|
||||
impl MetricsClientInner {
|
||||
fn counter(&self, name: &str, inc: i64, tags: &[(&str, &str)]) -> Result<()> {
|
||||
fn counter(
|
||||
&self,
|
||||
name: &str,
|
||||
description: Option<&str>,
|
||||
inc: i64,
|
||||
tags: &[(&str, &str)],
|
||||
) -> Result<()> {
|
||||
validate_metric_name(name)?;
|
||||
if inc < 0 {
|
||||
return Err(MetricsError::NegativeCounterIncrement {
|
||||
@@ -104,9 +113,13 @@ impl MetricsClientInner {
|
||||
.counters
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
let counter = counters
|
||||
.entry(name.to_string())
|
||||
.or_insert_with(|| self.meter.u64_counter(name.to_string()).build());
|
||||
let counter = counters.entry(name.to_string()).or_insert_with(|| {
|
||||
let builder = self.meter.u64_counter(name.to_string());
|
||||
match description {
|
||||
Some(description) => builder.with_description(description.to_string()).build(),
|
||||
None => builder.build(),
|
||||
}
|
||||
});
|
||||
counter.add(inc as u64, &attributes);
|
||||
Ok(())
|
||||
}
|
||||
@@ -126,7 +139,39 @@ impl MetricsClientInner {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn duration_histogram(&self, name: &str, value: i64, tags: &[(&str, &str)]) -> Result<()> {
|
||||
fn gauge(
|
||||
&self,
|
||||
name: &str,
|
||||
description: Option<&str>,
|
||||
value: i64,
|
||||
tags: &[(&str, &str)],
|
||||
) -> Result<()> {
|
||||
validate_metric_name(name)?;
|
||||
let attributes = self.attributes(tags)?;
|
||||
|
||||
let mut gauges = self
|
||||
.gauges
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
let gauge = gauges.entry(name.to_string()).or_insert_with(|| {
|
||||
let builder = self.meter.i64_gauge(name.to_string());
|
||||
match description {
|
||||
Some(description) => builder.with_description(description.to_string()).build(),
|
||||
None => builder.build(),
|
||||
}
|
||||
});
|
||||
gauge.record(value, &attributes);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn duration_histogram(
|
||||
&self,
|
||||
name: &str,
|
||||
value: f64,
|
||||
unit: &'static str,
|
||||
description: &str,
|
||||
tags: &[(&str, &str)],
|
||||
) -> Result<()> {
|
||||
validate_metric_name(name)?;
|
||||
let attributes = self.attributes(tags)?;
|
||||
|
||||
@@ -137,11 +182,11 @@ impl MetricsClientInner {
|
||||
let histogram = histograms.entry(name.to_string()).or_insert_with(|| {
|
||||
self.meter
|
||||
.f64_histogram(name.to_string())
|
||||
.with_unit(DURATION_UNIT)
|
||||
.with_description(DURATION_DESCRIPTION)
|
||||
.with_unit(unit)
|
||||
.with_description(description.to_string())
|
||||
.build()
|
||||
});
|
||||
histogram.record(value as f64, &attributes);
|
||||
histogram.record(value, &attributes);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -233,6 +278,7 @@ impl MetricsClient {
|
||||
meter_provider,
|
||||
meter,
|
||||
counters: Mutex::new(HashMap::new()),
|
||||
gauges: Mutex::new(HashMap::new()),
|
||||
histograms: Mutex::new(HashMap::new()),
|
||||
duration_histograms: Mutex::new(HashMap::new()),
|
||||
runtime_reader,
|
||||
@@ -242,7 +288,18 @@ impl MetricsClient {
|
||||
|
||||
/// Send a single counter increment.
|
||||
pub fn counter(&self, name: &str, inc: i64, tags: &[(&str, &str)]) -> Result<()> {
|
||||
self.0.counter(name, inc, tags)
|
||||
self.0.counter(name, /*description*/ None, inc, tags)
|
||||
}
|
||||
|
||||
/// Send a single counter increment with an instrument description.
|
||||
pub fn counter_with_description(
|
||||
&self,
|
||||
name: &str,
|
||||
description: &str,
|
||||
inc: i64,
|
||||
tags: &[(&str, &str)],
|
||||
) -> Result<()> {
|
||||
self.0.counter(name, Some(description), inc, tags)
|
||||
}
|
||||
|
||||
/// Send a single histogram sample.
|
||||
@@ -250,6 +307,22 @@ impl MetricsClient {
|
||||
self.0.histogram(name, value, tags)
|
||||
}
|
||||
|
||||
/// Send a single gauge measurement.
|
||||
pub fn gauge(&self, name: &str, value: i64, tags: &[(&str, &str)]) -> Result<()> {
|
||||
self.0.gauge(name, /*description*/ None, value, tags)
|
||||
}
|
||||
|
||||
/// Send a single gauge measurement with an instrument description.
|
||||
pub fn gauge_with_description(
|
||||
&self,
|
||||
name: &str,
|
||||
description: &str,
|
||||
value: i64,
|
||||
tags: &[(&str, &str)],
|
||||
) -> Result<()> {
|
||||
self.0.gauge(name, Some(description), value, tags)
|
||||
}
|
||||
|
||||
/// Record a duration in milliseconds using a histogram.
|
||||
pub fn record_duration(
|
||||
&self,
|
||||
@@ -259,7 +332,26 @@ impl MetricsClient {
|
||||
) -> Result<()> {
|
||||
self.0.duration_histogram(
|
||||
name,
|
||||
duration.as_millis().min(i64::MAX as u128) as i64,
|
||||
duration.as_millis().min(i64::MAX as u128) as f64,
|
||||
MILLISECOND_DURATION_UNIT,
|
||||
MILLISECOND_DURATION_DESCRIPTION,
|
||||
tags,
|
||||
)
|
||||
}
|
||||
|
||||
/// Record a duration in seconds using a histogram with an instrument description.
|
||||
pub fn record_duration_seconds_with_description(
|
||||
&self,
|
||||
name: &str,
|
||||
description: &str,
|
||||
duration: Duration,
|
||||
tags: &[(&str, &str)],
|
||||
) -> Result<()> {
|
||||
self.0.duration_histogram(
|
||||
name,
|
||||
duration.as_secs_f64(),
|
||||
SECOND_DURATION_UNIT,
|
||||
description,
|
||||
tags,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ const KNOWN_ORIGINATOR_TAG_VALUES: &[&str] = &[
|
||||
"none",
|
||||
"codex_exec",
|
||||
"codex-cli",
|
||||
"codex-exec-server",
|
||||
"codex_sdk_ts",
|
||||
"codex-app-server-sdk",
|
||||
];
|
||||
@@ -80,6 +81,7 @@ mod tests {
|
||||
use super::SERVICE_NAME_TAG;
|
||||
use super::SESSION_SOURCE_TAG;
|
||||
use super::SessionMetricTagValues;
|
||||
use super::bounded_originator_tag_value;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
@@ -131,4 +133,12 @@ mod tests {
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
// The exec-server originator must pass through unchanged.
#[test]
fn bounded_originator_tag_value_accepts_exec_server() {
    let tag_value = bounded_originator_tag_value("codex-exec-server");
    assert_eq!(tag_value, "codex-exec-server");
}
|
||||
}
|
||||
|
||||
@@ -186,6 +186,12 @@ fn otlp_http_exporter_sends_metrics_to_collector() -> Result<()> {
|
||||
))?;
|
||||
|
||||
metrics.counter("codex.turns", /*inc*/ 1, &[("source", "test")])?;
|
||||
metrics.gauge_with_description(
|
||||
"exec_server_connections_active",
|
||||
"Number of active exec-server connections.",
|
||||
/*value*/ 1,
|
||||
&[("transport", "websocket")],
|
||||
)?;
|
||||
metrics.shutdown()?;
|
||||
|
||||
server.join().expect("server join");
|
||||
@@ -220,10 +226,119 @@ fn otlp_http_exporter_sends_metrics_to_collector() -> Result<()> {
|
||||
"expected metric name not found; body prefix: {}",
|
||||
&body.chars().take(2000).collect::<String>()
|
||||
);
|
||||
assert!(
|
||||
body.contains("exec_server_connections_active"),
|
||||
"expected exec-server gauge not found; body prefix: {}",
|
||||
&body.chars().take(2000).collect::<String>()
|
||||
);
|
||||
assert!(
|
||||
body.contains("websocket"),
|
||||
"expected exec-server metric tag not found; body prefix: {}",
|
||||
&body.chars().take(2000).collect::<String>()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Sends one log-only event through the OTLP/HTTP JSON log exporter and checks
// that a loopback collector receives it on `/v1/logs`.
#[test]
fn otlp_http_exporter_sends_logs_to_collector()
-> std::result::Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
    let addr = listener.local_addr().expect("local_addr");
    listener.set_nonblocking(true).expect("set_nonblocking");

    let (tx, rx) = mpsc::channel::<Vec<CapturedRequest>>();
    let server = thread::spawn(move || {
        let mut captured = Vec::new();
        let deadline = Instant::now() + Duration::from_secs(3);

        // Poll the non-blocking listener until the deadline, recording every
        // request that parses.
        while Instant::now() < deadline {
            let mut stream = match listener.accept() {
                Ok((stream, _)) => stream,
                Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                    thread::sleep(Duration::from_millis(10));
                    continue;
                }
                Err(_) => break,
            };

            let result = read_http_request(&mut stream);
            let _ = write_http_response(&mut stream, "202 Accepted");
            if let Ok((path, headers, body)) = result {
                let content_type = headers.get("content-type").cloned();
                captured.push(CapturedRequest {
                    path,
                    content_type,
                    body,
                });
            }
        }

        let _ = tx.send(captured);
    });

    let settings = OtelSettings {
        environment: "test".to_string(),
        service_name: "codex-exec-server".to_string(),
        service_version: env!("CARGO_PKG_VERSION").to_string(),
        codex_home: PathBuf::from("."),
        exporter: OtelExporter::OtlpHttp {
            endpoint: format!("http://{addr}/v1/logs"),
            headers: HashMap::new(),
            protocol: OtelHttpProtocol::Json,
            tls: None,
        },
        trace_exporter: OtelExporter::None,
        metrics_exporter: OtelExporter::None,
        runtime_metrics: false,
        span_attributes: BTreeMap::new(),
        tracestate: BTreeMap::new(),
    };
    let otel = OtelProvider::from(&settings)?.expect("otel provider");
    let logger_layer = otel.logger_layer().expect("logger layer");
    let subscriber = tracing_subscriber::registry().with(logger_layer);

    tracing::subscriber::with_default(subscriber, || {
        tracing::callsite::rebuild_interest_cache();
        tracing::event!(
            target: "codex_otel.log_only",
            tracing::Level::INFO,
            event.name = "codex.exec_server.remote_environment_registered",
            "codex exec-server remote environment registered"
        );
    });
    otel.shutdown();

    server.join().expect("server join");
    let captured = rx.recv_timeout(Duration::from_secs(1)).expect("captured");

    let Some(request) = captured.iter().find(|req| req.path == "/v1/logs") else {
        let paths = captured
            .iter()
            .map(|req| req.path.as_str())
            .collect::<Vec<_>>()
            .join(", ");
        panic!("missing /v1/logs request; got {}: {paths}", captured.len());
    };

    let content_type = request
        .content_type
        .as_deref()
        .unwrap_or("<missing content-type>");
    assert!(
        content_type.starts_with("application/json"),
        "unexpected content-type: {content_type}"
    );

    let body = String::from_utf8_lossy(&request.body);
    assert!(
        body.contains("codex.exec_server.remote_environment_registered"),
        "expected exec-server event not found; body prefix: {}",
        &body.chars().take(2000).collect::<String>()
    );
    Ok(())
}
|
||||
|
||||
#[test]
|
||||
fn otel_provider_rejects_header_unsafe_configured_tracestate() {
|
||||
let result = OtelProvider::from(&OtelSettings {
|
||||
@@ -341,6 +456,12 @@ fn otlp_http_exporter_sends_traces_to_collector()
|
||||
let _guard = span.enter();
|
||||
let propagated_trace =
|
||||
current_span_w3c_trace_context().expect("current span should have trace context");
|
||||
tracing::event!(
|
||||
target: "codex_otel.trace_safe",
|
||||
tracing::Level::INFO,
|
||||
event.name = "codex.exec_server.remote_environment_registered",
|
||||
"codex exec-server remote environment registered"
|
||||
);
|
||||
tracing::info!("trace loopback event");
|
||||
propagated_trace
|
||||
});
|
||||
@@ -393,7 +514,11 @@ fn otlp_http_exporter_sends_traces_to_collector()
|
||||
"expected configured span attribute not found; body prefix: {}",
|
||||
&body.chars().take(2000).collect::<String>()
|
||||
);
|
||||
|
||||
assert!(
|
||||
body.contains("codex.exec_server.remote_environment_registered"),
|
||||
"expected exec-server trace event not found; body prefix: {}",
|
||||
&body.chars().take(2000).collect::<String>()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -13,8 +13,9 @@ fn send_builds_payload_with_tags_and_histograms() -> Result<()> {
|
||||
let (metrics, exporter) =
|
||||
build_metrics_with_defaults(&[("service", "codex-cli"), ("env", "prod")])?;
|
||||
|
||||
metrics.counter(
|
||||
metrics.counter_with_description(
|
||||
"codex.turns",
|
||||
"Total number of Codex turns.",
|
||||
/*inc*/ 1,
|
||||
&[("model", "gpt-5.1"), ("env", "dev")],
|
||||
)?;
|
||||
@@ -23,11 +24,18 @@ fn send_builds_payload_with_tags_and_histograms() -> Result<()> {
|
||||
/*value*/ 25,
|
||||
&[("tool", "shell")],
|
||||
)?;
|
||||
metrics.gauge_with_description(
|
||||
"codex.active",
|
||||
"Number of active Codex operations.",
|
||||
/*value*/ 2,
|
||||
&[("component", "exec-server")],
|
||||
)?;
|
||||
metrics.shutdown()?;
|
||||
|
||||
let resource_metrics = latest_metrics(&exporter);
|
||||
|
||||
let counter = find_metric(&resource_metrics, "codex.turns").expect("counter metric missing");
|
||||
assert_eq!(counter.description(), "Total number of Codex turns.");
|
||||
let counter_attributes = match counter.data() {
|
||||
opentelemetry_sdk::metrics::data::AggregatedMetrics::U64(data) => match data {
|
||||
opentelemetry_sdk::metrics::data::MetricData::Sum(sum) => {
|
||||
@@ -78,6 +86,27 @@ fn send_builds_payload_with_tags_and_histograms() -> Result<()> {
|
||||
]);
|
||||
assert_eq!(histogram_attrs, expected_histogram_attributes);
|
||||
|
||||
let gauge = find_metric(&resource_metrics, "codex.active").expect("gauge metric missing");
|
||||
assert_eq!(gauge.description(), "Number of active Codex operations.");
|
||||
let gauge_point = match gauge.data() {
|
||||
opentelemetry_sdk::metrics::data::AggregatedMetrics::I64(data) => match data {
|
||||
opentelemetry_sdk::metrics::data::MetricData::Gauge(gauge) => {
|
||||
gauge.data_points().next().expect("gauge point")
|
||||
}
|
||||
_ => panic!("unexpected gauge aggregation"),
|
||||
},
|
||||
_ => panic!("unexpected gauge metric data type"),
|
||||
};
|
||||
assert_eq!(gauge_point.value(), 2);
|
||||
assert_eq!(
|
||||
attributes_to_map(gauge_point.attributes()),
|
||||
BTreeMap::from([
|
||||
("component".to_string(), "exec-server".to_string()),
|
||||
("env".to_string(), "prod".to_string()),
|
||||
("service".to_string(), "codex-cli".to_string()),
|
||||
])
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -33,6 +33,37 @@ fn record_duration_records_histogram() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Ensures a seconds-based duration is recorded as a fractional value with the
// requested unit and description.
#[test]
fn record_duration_seconds_with_description_records_fractional_seconds() -> Result<()> {
    const METRIC_NAME: &str = "exec_server_request_duration_seconds";
    const METRIC_DESCRIPTION: &str = "Duration of exec-server requests in seconds.";

    let (metrics, exporter) = build_metrics_with_defaults(&[])?;

    metrics.record_duration_seconds_with_description(
        METRIC_NAME,
        METRIC_DESCRIPTION,
        Duration::from_millis(15),
        &[("method", "initialize")],
    )?;
    metrics.shutdown()?;

    let resource_metrics = latest_metrics(&exporter);
    let (bounds, bucket_counts, sum, count) = histogram_data(&resource_metrics, METRIC_NAME);
    assert!(!bounds.is_empty());
    assert_eq!(bucket_counts.iter().sum::<u64>(), 1);
    assert_eq!(sum, 0.015);
    assert_eq!(count, 1);

    let metric = crate::harness::find_metric(&resource_metrics, METRIC_NAME)
        .unwrap_or_else(|| panic!("metric exec_server_request_duration_seconds missing"));
    assert_eq!(metric.unit(), "s");
    assert_eq!(metric.description(), METRIC_DESCRIPTION);

    Ok(())
}
|
||||
|
||||
// Ensures time_result returns the closure output and records timing.
|
||||
#[test]
|
||||
fn timer_result_records_success() -> Result<()> {
|
||||
|
||||
@@ -65,6 +65,8 @@ mod tests {
|
||||
use super::*;
|
||||
use crate::utils::create_env_for_mcp_server;
|
||||
use anyhow::Result;
|
||||
#[cfg(unix)]
|
||||
use std::ffi::OsStr;
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
use tempfile::TempDir;
|
||||
@@ -75,24 +77,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_unix_executes_script_without_extension() -> Result<()> {
|
||||
let env = TestExecutableEnv::new()?;
|
||||
// Linux can transiently report ETXTBSY while the freshly written test
|
||||
// script is becoming executable on the backing filesystem.
|
||||
let mut retries = 0;
|
||||
let output = loop {
|
||||
let mut cmd = Command::new(&env.program_name);
|
||||
cmd.envs(&env.mcp_env);
|
||||
|
||||
let output = cmd.output().await;
|
||||
if !output
|
||||
.as_ref()
|
||||
.is_err_and(|err| err.kind() == std::io::ErrorKind::ExecutableFileBusy)
|
||||
|| retries == 2
|
||||
{
|
||||
break output;
|
||||
}
|
||||
retries += 1;
|
||||
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
||||
};
|
||||
let output = output_with_etxtbsy_retry(OsStr::new(&env.program_name), &env.mcp_env).await;
|
||||
|
||||
assert!(
|
||||
output.is_ok(),
|
||||
@@ -145,9 +130,14 @@ mod tests {
|
||||
let resolved = resolve(program, &env.mcp_env, std::env::current_dir()?.as_path())?;
|
||||
|
||||
// Verify resolved path executes successfully
|
||||
let mut cmd = Command::new(resolved);
|
||||
cmd.envs(&env.mcp_env);
|
||||
let output = cmd.output().await;
|
||||
#[cfg(unix)]
|
||||
let output = output_with_etxtbsy_retry(resolved.as_os_str(), &env.mcp_env).await;
|
||||
#[cfg(windows)]
|
||||
let output = {
|
||||
let mut cmd = Command::new(resolved);
|
||||
cmd.envs(&env.mcp_env);
|
||||
cmd.output().await
|
||||
};
|
||||
|
||||
assert!(
|
||||
output.is_ok(),
|
||||
@@ -156,6 +146,31 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
async fn output_with_etxtbsy_retry(
|
||||
program: &OsStr,
|
||||
mcp_env: &HashMap<OsString, OsString>,
|
||||
) -> std::io::Result<std::process::Output> {
|
||||
// Linux can transiently report ETXTBSY while the freshly written test
|
||||
// script is becoming executable on the backing filesystem.
|
||||
let mut retries = 0;
|
||||
loop {
|
||||
let mut cmd = Command::new(program);
|
||||
cmd.envs(mcp_env);
|
||||
|
||||
let output = cmd.output().await;
|
||||
if !output
|
||||
.as_ref()
|
||||
.is_err_and(|err| err.kind() == std::io::ErrorKind::ExecutableFileBusy)
|
||||
|| retries == 2
|
||||
{
|
||||
return output;
|
||||
}
|
||||
retries += 1;
|
||||
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
||||
}
|
||||
}
|
||||
|
||||
// Test fixture for creating temporary executables in a controlled environment.
|
||||
struct TestExecutableEnv {
|
||||
// Held to prevent the temporary directory from being deleted.
|
||||
|
||||
Reference in New Issue
Block a user