Add OTEL tracing feature. Track session span for now

This commit is contained in:
Vishnu Chopra
2025-08-09 19:34:26 +01:00
committed by Anton Panasenko
parent 42d335deb8
commit 4c9c63d725
12 changed files with 1078 additions and 15 deletions

331
codex-rs/Cargo.lock generated
View File

@@ -316,6 +316,17 @@ dependencies = [
"syn 2.0.104",
]
[[package]]
name = "async-trait"
version = "0.1.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.104",
]
[[package]]
name = "atomic-waker"
version = "1.1.2"
@@ -328,6 +339,53 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
[[package]]
name = "axum"
version = "0.7.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
dependencies = [
"async-trait",
"axum-core",
"bytes",
"futures-util",
"http",
"http-body",
"http-body-util",
"itoa",
"matchit",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"rustversion",
"serde",
"sync_wrapper",
"tower 0.5.2",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http",
"http-body",
"http-body-util",
"mime",
"pin-project-lite",
"rustversion",
"sync_wrapper",
"tower-layer",
"tower-service",
]
[[package]]
name = "backtrace"
version = "0.3.75"
@@ -637,6 +695,7 @@ dependencies = [
"codex-mcp-server",
"codex-protocol",
"codex-protocol-ts",
"codex-telemetry",
"codex-tui",
"predicates",
"pretty_assertions",
@@ -673,6 +732,7 @@ dependencies = [
"codex-file-search",
"codex-mcp-client",
"codex-protocol",
"codex-telemetry",
"core_test_support",
"dirs",
"env-flags",
@@ -728,6 +788,7 @@ dependencies = [
"codex-core",
"codex-ollama",
"codex-protocol",
"codex-telemetry",
"core_test_support",
"libc",
"owo-colors",
@@ -737,6 +798,7 @@ dependencies = [
"tempfile",
"tokio",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
"uuid",
"walkdir",
@@ -900,6 +962,29 @@ dependencies = [
"ts-rs",
]
[[package]]
name = "codex-telemetry"
version = "0.1.0"
dependencies = [
"chrono",
"futures-util",
"hex",
"opentelemetry",
"opentelemetry-otlp",
"opentelemetry-proto",
"opentelemetry_sdk",
"prost",
"rand 0.8.5",
"serde",
"serde_json",
"thiserror 1.0.69",
"tokio",
"tonic",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
]
[[package]]
name = "codex-tui"
version = "0.0.0"
@@ -918,6 +1003,7 @@ dependencies = [
"codex-login",
"codex-ollama",
"codex-protocol",
"codex-telemetry",
"color-eyre",
"crossterm",
"diffy",
@@ -949,6 +1035,7 @@ dependencies = [
"tokio-stream",
"tracing",
"tracing-appender",
"tracing-opentelemetry",
"tracing-subscriber",
"unicode-segmentation",
"unicode-width 0.1.14",
@@ -1903,6 +1990,12 @@ version = "0.31.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
[[package]]
name = "glob"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280"
[[package]]
name = "globset"
version = "0.4.16"
@@ -2093,6 +2186,19 @@ dependencies = [
"tower-service",
]
[[package]]
name = "hyper-timeout"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0"
dependencies = [
"hyper",
"hyper-util",
"pin-project-lite",
"tokio",
"tower-service",
]
[[package]]
name = "hyper-tls"
version = "0.6.0"
@@ -2127,7 +2233,7 @@ dependencies = [
"libc",
"percent-encoding",
"pin-project-lite",
"socket2",
"socket2 0.6.0",
"system-configuration",
"tokio",
"tower-service",
@@ -2719,6 +2825,12 @@ dependencies = [
"regex-automata",
]
[[package]]
name = "matchit"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]]
name = "mcp-types"
version = "0.0.0"
@@ -3110,6 +3222,86 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "opentelemetry"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "570074cc999d1a58184080966e5bd3bf3a9a4af650c3b05047c2621e7405cd17"
dependencies = [
"futures-core",
"futures-sink",
"js-sys",
"once_cell",
"pin-project-lite",
"thiserror 1.0.69",
]
[[package]]
name = "opentelemetry-http"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6351496aeaa49d7c267fb480678d85d1cd30c5edb20b497c48c56f62a8c14b99"
dependencies = [
"async-trait",
"bytes",
"http",
"opentelemetry",
]
[[package]]
name = "opentelemetry-otlp"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29e1f9c8b032d4f635c730c0efcf731d5e2530ea13fa8bef7939ddc8420696bd"
dependencies = [
"async-trait",
"futures-core",
"http",
"opentelemetry",
"opentelemetry-http",
"opentelemetry-proto",
"opentelemetry_sdk",
"prost",
"thiserror 1.0.69",
"tokio",
"tonic",
]
[[package]]
name = "opentelemetry-proto"
version = "0.26.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9d3968ce3aefdcca5c27e3c4ea4391b37547726a70893aab52d3de95d5f8b34"
dependencies = [
"hex",
"opentelemetry",
"opentelemetry_sdk",
"prost",
"serde",
"tonic",
]
[[package]]
name = "opentelemetry_sdk"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2c627d9f4c9cdc1f21a29ee4bfbd6028fcb8bcf2a857b43f3abdf72c9c862f3"
dependencies = [
"async-trait",
"futures-channel",
"futures-executor",
"futures-util",
"glob",
"once_cell",
"opentelemetry",
"percent-encoding",
"rand 0.8.5",
"serde_json",
"thiserror 1.0.69",
"tokio",
"tokio-stream",
]
[[package]]
name = "option-ext"
version = "0.2.0"
@@ -3224,6 +3416,26 @@ dependencies = [
"siphasher",
]
[[package]]
name = "pin-project"
version = "1.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.104",
]
[[package]]
name = "pin-project-lite"
version = "0.2.16"
@@ -3384,6 +3596,29 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "prost"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
dependencies = [
"bytes",
"prost-derive",
]
[[package]]
name = "prost-derive"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
dependencies = [
"anyhow",
"itertools 0.14.0",
"proc-macro2",
"quote",
"syn 2.0.104",
]
[[package]]
name = "pulldown-cmark"
version = "0.10.3"
@@ -3657,7 +3892,7 @@ dependencies = [
"tokio",
"tokio-native-tls",
"tokio-util",
"tower",
"tower 0.5.2",
"tower-http",
"tower-service",
"url",
@@ -4181,6 +4416,16 @@ version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7c388c1b5e93756d0c740965c41e8822f866621d41acbdf6336a6a168f8840c"
[[package]]
name = "socket2"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "socket2"
version = "0.6.0"
@@ -4661,7 +4906,7 @@ dependencies = [
"pin-project-lite",
"signal-hook-registry",
"slab",
"socket2",
"socket2 0.6.0",
"tokio-macros",
"windows-sys 0.59.0",
]
@@ -4786,6 +5031,56 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcc842091f2def52017664b53082ecbbeb5c7731092bad69d2c63050401dfd64"
[[package]]
name = "tonic"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52"
dependencies = [
"async-stream",
"async-trait",
"axum",
"base64",
"bytes",
"h2",
"http",
"http-body",
"http-body-util",
"hyper",
"hyper-timeout",
"hyper-util",
"percent-encoding",
"pin-project",
"prost",
"socket2 0.5.10",
"tokio",
"tokio-stream",
"tower 0.4.13",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
"indexmap 1.9.3",
"pin-project",
"pin-project-lite",
"rand 0.8.5",
"slab",
"tokio",
"tokio-util",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower"
version = "0.5.2"
@@ -4814,7 +5109,7 @@ dependencies = [
"http-body",
"iri-string",
"pin-project-lite",
"tower",
"tower 0.5.2",
"tower-layer",
"tower-service",
]
@@ -4897,6 +5192,24 @@ dependencies = [
"tracing-core",
]
[[package]]
name = "tracing-opentelemetry"
version = "0.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc58af5d3f6c5811462cabb3289aec0093f7338e367e5a33d28c0433b3c7360b"
dependencies = [
"js-sys",
"once_cell",
"opentelemetry",
"opentelemetry_sdk",
"smallvec",
"tracing",
"tracing-core",
"tracing-log",
"tracing-subscriber",
"web-time",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.20"
@@ -5258,6 +5571,16 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "web-time"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "webbrowser"
version = "1.0.5"

View File

@@ -18,6 +18,7 @@ members = [
"protocol",
"protocol-ts",
"tui",
"codex-telemetry",
]
resolver = "2"

View File

@@ -27,6 +27,7 @@ codex-login = { path = "../login" }
codex-mcp-server = { path = "../mcp-server" }
codex-protocol = { path = "../protocol" }
codex-tui = { path = "../tui" }
codex-telemetry = { path = "../codex-telemetry", features = ["otel"] }
serde_json = "1"
tokio = { version = "1", features = [
"io-std",

View File

@@ -22,6 +22,8 @@ mod mcp_cmd;
use crate::mcp_cmd::McpCli;
use crate::proto::ProtoCli;
// Telemetry is initialized after Config is loaded in subcommands.
/// Codex CLI
///
/// If no subcommand is specified, options will be forwarded to the interactive CLI.

View File

@@ -0,0 +1,48 @@
[package]
name = "codex-telemetry"
version = "0.1.0"
edition = "2021"
[features]
# Compile-time gate for telemetry; disabled by default.
# Downstream crates can enable via `features = ["otel"]`.
default = []
otel = [
"dep:tracing-subscriber",
"dep:tracing-opentelemetry",
"dep:opentelemetry",
"dep:opentelemetry_sdk",
"dep:opentelemetry-otlp",
"dep:opentelemetry-proto",
"dep:prost",
"dep:tokio",
"dep:tonic",
"dep:futures-util",
"dep:serde",
"dep:serde_json",
"dep:hex",
"dep:chrono",
"dep:rand",
]
[dependencies]
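# `tracing` is always required; every dependency below marked `optional = true`
# is only pulled in by the `otel` feature, which keeps the default build lean.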
tracing = { version = "0.1" }
tracing-subscriber = { version = "0.3", features = ["registry", "fmt"], optional = true }
tracing-opentelemetry = { version = "0.27", optional = true }
opentelemetry = { version = "0.26", features = ["trace"], optional = true }
opentelemetry_sdk = { version = "0.26", features = ["trace", "rt-tokio"], optional = true }
opentelemetry-otlp = { version = "0.26", features = ["tonic", "http-proto"], optional = true }
opentelemetry-proto = { version = "0.26", features = ["gen-tonic"], optional = true }
prost = { version = "0.13", optional = true }
tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros"], optional = true }
tonic = { version = "0.12", optional = true }
futures-util = { version = "0.3", optional = true }
serde = { version = "1", features = ["derive"], optional = true }
serde_json = { version = "1", optional = true }
hex = { version = "0.4", optional = true }
chrono = { version = "0.4", default-features = false, features = ["std", "clock"], optional = true }
rand = { version = "0.8", optional = true }
# Always-available deps (small)
thiserror = "1"

View File

@@ -0,0 +1,620 @@
//! Telemetry initialization and OTLP File Exporter for Codex.
#[cfg(feature = "otel")]
mod imp {
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::sync::Mutex;
use std::time::SystemTime;
use opentelemetry::global;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry::{KeyValue, Value};
use opentelemetry_otlp as otlp;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter};
use opentelemetry_sdk::export::ExportError;
use opentelemetry_sdk::resource::Resource;
use opentelemetry_sdk::trace::{self as sdktrace, TracerProvider};
use serde::Serialize;
use serde_json::json;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::{layer::SubscriberExt, Registry};
use std::collections::HashMap;
/// Destination for exported spans, selected via [`TelemetrySettings::exporter`].
#[derive(Clone, Debug)]
pub enum TelemetryExporter {
/// No span processor is attached; the tracer provider is still built.
None,
/// Write OTLP-shaped JSON lines to a local file.
///
/// NOTE(review): `init_from_settings` and `build_layer` bind `path` as
/// `_ignored` and derive the file location from `codex_home` instead.
/// `rotate_mb` is the rotation threshold in mebibytes (`None` disables rotation).
OtlpFile { path: PathBuf, rotate_mb: Option<u64> },
/// Export over OTLP/gRPC to `endpoint`, sending `headers` as request metadata.
OtlpGrpc { endpoint: String, headers: Vec<(String, String)> },
/// Export over OTLP/HTTP to `endpoint`, sending `headers` as HTTP headers.
OtlpHttp { endpoint: String, headers: Vec<(String, String)> },
}
/// Configuration consumed by `init_from_settings` and `build_layer`.
#[derive(Clone, Debug)]
pub struct TelemetrySettings {
/// Master switch; when `false` both initializers return `None`.
pub enabled: bool,
/// Where finished spans are sent.
pub exporter: TelemetryExporter,
/// Reported as the `service.name` resource attribute and used as the tracer name.
pub service_name: String,
/// Reported as the `service.version` resource attribute.
pub service_version: String,
/// Base directory for the file exporter (`<codex_home>/traces`); when `None`
/// the current working directory is used instead.
pub codex_home: Option<PathBuf>,
}
/// Owns the tracer provider created by the initializers.
///
/// Keep the guard alive for as long as telemetry should be active: dropping it
/// shuts the provider down.
pub struct TelemetryGuard {
provider: TracerProvider,
}
impl Drop for TelemetryGuard {
/// Shuts the provider down; the result is deliberately discarded because
/// an error cannot be propagated out of `drop`.
fn drop(&mut self) {
let _ = self.provider.shutdown();
}
}
/// Reports whether verbose telemetry diagnostics were requested.
///
/// Diagnostics are on when the `CODEX_TELEMETRY_DEBUG` environment variable is
/// exactly `1` or equals `true` ignoring ASCII case; an unset or unreadable
/// variable counts as off.
fn debug_enabled() -> bool {
    match std::env::var("CODEX_TELEMETRY_DEBUG") {
        Ok(raw) => raw == "1" || raw.eq_ignore_ascii_case("true"),
        Err(_) => false,
    }
}
pub fn init_from_settings(settings: &TelemetrySettings) -> Option<TelemetryGuard> {
if !settings.enabled {
return None;
}
let resource = Resource::default().merge(&Resource::new(vec![
KeyValue::new("service.name", settings.service_name.clone()),
KeyValue::new("service.version", settings.service_version.clone()),
]));
let trace_config = sdktrace::Config::default().with_resource(resource.clone());
let mut provider_builder = sdktrace::TracerProvider::builder().with_config(trace_config);
// Build base subscriber; add a console fmt layer when CODEX_TELEMETRY_DEBUG is enabled
use tracing_subscriber::prelude::*;
let subscriber = Registry::default().with(if debug_enabled() {
eprintln!("[codex-telemetry] debug enabled");
Some(tracing_subscriber::fmt::layer().with_ansi(true).with_target(true).with_level(true))
} else { None });
match &settings.exporter {
TelemetryExporter::None => {
if debug_enabled() { eprintln!("[codex-telemetry] exporter=None"); }
let provider = provider_builder.build();
let tracer = provider.tracer(settings.service_name.clone());
let otel_layer = OpenTelemetryLayer::new(tracer);
let subscriber = subscriber.with(otel_layer);
if let Err(e) = tracing::subscriber::set_global_default(subscriber) {
eprintln!("[codex-telemetry] ERROR: failed to set global subscriber: {e}");
}
global::set_tracer_provider(provider.clone());
return Some(TelemetryGuard { provider });
}
TelemetryExporter::OtlpFile { path: _ignored, rotate_mb } => {
let final_path = determine_traces_file_path(settings.codex_home.as_ref());
if debug_enabled() { eprintln!("[codex-telemetry] using OTLP JSON file exporter: {} (rotate_mb={:?})", final_path.display(), rotate_mb); }
let mut resource_attributes: Vec<JsonKeyValue> = Vec::new();
for (k, v) in resource.iter() {
resource_attributes.push(JsonKeyValue { key: k.as_str().to_string(), value: json_any_from(v.clone()) });
}
let exporter = OtlpJsonFileExporter::new(final_path, *rotate_mb, resource_attributes)
.expect("create OTLP JSON file exporter");
let batch = sdktrace::BatchSpanProcessor::builder(
exporter,
opentelemetry_sdk::runtime::Tokio,
)
.build();
let trace_config2 = sdktrace::Config::default().with_resource(resource);
provider_builder = sdktrace::TracerProvider::builder()
.with_span_processor(batch)
.with_config(trace_config2);
}
TelemetryExporter::OtlpGrpc { endpoint, headers } => {
if debug_enabled() { eprintln!("[codex-telemetry] using OTLP gRPC exporter: endpoint={} headers={} pairs", endpoint, headers.len()); }
let mut exp = otlp::new_exporter().tonic().with_endpoint(endpoint.clone());
if !headers.is_empty() {
let mut map = tonic::metadata::MetadataMap::new();
for (k, v) in headers {
let key = k.parse::<tonic::metadata::MetadataKey<tonic::metadata::Ascii>>();
let val = v.parse::<tonic::metadata::MetadataValue<tonic::metadata::Ascii>>();
if let (Ok(key), Ok(val)) = (key, val) {
let _ = map.insert(key, val);
}
}
exp = exp.with_metadata(map);
}
let exporter = exp
.build_span_exporter()
.expect("install OTLP gRPC exporter");
let batch = sdktrace::BatchSpanProcessor::builder(
exporter,
opentelemetry_sdk::runtime::Tokio,
)
.build();
provider_builder = sdktrace::TracerProvider::builder()
.with_span_processor(batch)
.with_config(sdktrace::Config::default().with_resource(resource));
}
TelemetryExporter::OtlpHttp { endpoint, headers } => {
if debug_enabled() { eprintln!("[codex-telemetry] using OTLP HTTP exporter: endpoint={} headers={} pairs", endpoint, headers.len()); }
let mut exp = otlp::new_exporter().http().with_endpoint(endpoint.clone());
if !headers.is_empty() {
let mut map: HashMap<String, String> = HashMap::new();
for (k, v) in headers {
map.insert(k.clone(), v.clone());
}
exp = exp.with_headers(map);
}
let exporter = exp
.build_span_exporter()
.expect("install OTLP HTTP exporter");
let batch = sdktrace::BatchSpanProcessor::builder(
exporter,
opentelemetry_sdk::runtime::Tokio,
)
.build();
provider_builder = sdktrace::TracerProvider::builder()
.with_span_processor(batch)
.with_config(sdktrace::Config::default().with_resource(resource));
}
}
let provider = provider_builder.build();
let tracer = provider.tracer(settings.service_name.clone());
let otel_layer = OpenTelemetryLayer::new(tracer);
let subscriber = subscriber.with(otel_layer);
if let Err(e) = tracing::subscriber::set_global_default(subscriber) {
eprintln!("[codex-telemetry] ERROR: failed to set global subscriber: {e}");
}
global::set_tracer_provider(provider.clone());
Some(TelemetryGuard { provider })
}
/// Build the telemetry pipeline without installing a global subscriber.
///
/// Returns the [`TelemetryGuard`] that owns the tracer provider together with a
/// `Tracer` obtained from it. The caller wraps the tracer in an OpenTelemetry
/// layer, attaches that layer to their existing subscriber, and holds onto the
/// guard for the process lifetime to ensure clean shutdown.
///
/// Returns `None` when telemetry is disabled or when the exporter cannot be
/// constructed (the failure is reported on stderr). Telemetry problems never
/// panic the host process.
///
/// NOTE(review): the batch processors are created with the Tokio runtime, so
/// this presumably must be called from within a Tokio runtime — confirm.
pub fn build_layer(
    settings: &TelemetrySettings,
) -> Option<(
    TelemetryGuard,
    opentelemetry_sdk::trace::Tracer,
)> {
    if !settings.enabled {
        return None;
    }
    let resource = Resource::default().merge(&Resource::new(vec![
        KeyValue::new("service.name", settings.service_name.clone()),
        KeyValue::new("service.version", settings.service_version.clone()),
    ]));
    let trace_config = sdktrace::Config::default().with_resource(resource.clone());
    let mut provider_builder = sdktrace::TracerProvider::builder().with_config(trace_config);
    match &settings.exporter {
        TelemetryExporter::None => {
            // No span processor: the provider built below only carries the resource.
            if debug_enabled() {
                eprintln!("[codex-telemetry] build_layer: exporter=None");
            }
        }
        TelemetryExporter::OtlpFile { path: _ignored, rotate_mb } => {
            // The configured path is intentionally unused; the file location is
            // derived from `codex_home`.
            let final_path = determine_traces_file_path(settings.codex_home.as_ref());
            if debug_enabled() {
                eprintln!("[codex-telemetry] build_layer: file exporter at {} (rotate_mb={:?})", final_path.display(), rotate_mb);
            }
            let resource_attributes: Vec<JsonKeyValue> = resource
                .iter()
                .map(|(k, v)| JsonKeyValue { key: k.as_str().to_string(), value: json_any_from(v.clone()) })
                .collect();
            let exporter = match OtlpJsonFileExporter::new(final_path, *rotate_mb, resource_attributes) {
                Ok(exporter) => exporter,
                Err(e) => {
                    eprintln!("[codex-telemetry] ERROR: failed to create OTLP JSON file exporter: {e}");
                    return None;
                }
            };
            let batch = sdktrace::BatchSpanProcessor::builder(
                exporter,
                opentelemetry_sdk::runtime::Tokio,
            )
            .build();
            provider_builder = sdktrace::TracerProvider::builder()
                .with_span_processor(batch)
                .with_config(sdktrace::Config::default().with_resource(resource));
        }
        TelemetryExporter::OtlpGrpc { endpoint, headers } => {
            if debug_enabled() {
                eprintln!("[codex-telemetry] build_layer: grpc exporter endpoint={} headers={} pairs", endpoint, headers.len());
            }
            let mut exp = otlp::new_exporter().tonic().with_endpoint(endpoint.clone());
            if !headers.is_empty() {
                let mut map = tonic::metadata::MetadataMap::new();
                for (k, v) in headers {
                    let key = k.parse::<tonic::metadata::MetadataKey<tonic::metadata::Ascii>>();
                    let val = v.parse::<tonic::metadata::MetadataValue<tonic::metadata::Ascii>>();
                    match (key, val) {
                        (Ok(key), Ok(val)) => {
                            let _ = map.insert(key, val);
                        }
                        // Only the key is printed; values may carry credentials.
                        _ => eprintln!("[codex-telemetry] WARNING: skipping invalid gRPC header: {k}"),
                    }
                }
                exp = exp.with_metadata(map);
            }
            let exporter = match exp.build_span_exporter() {
                Ok(exporter) => exporter,
                Err(e) => {
                    eprintln!("[codex-telemetry] ERROR: failed to create OTLP gRPC exporter: {e}");
                    return None;
                }
            };
            let batch = sdktrace::BatchSpanProcessor::builder(
                exporter,
                opentelemetry_sdk::runtime::Tokio,
            )
            .build();
            provider_builder = sdktrace::TracerProvider::builder()
                .with_span_processor(batch)
                .with_config(sdktrace::Config::default().with_resource(resource));
        }
        TelemetryExporter::OtlpHttp { endpoint, headers } => {
            if debug_enabled() {
                eprintln!("[codex-telemetry] build_layer: http exporter endpoint={} headers={} pairs", endpoint, headers.len());
            }
            let mut exp = otlp::new_exporter().http().with_endpoint(endpoint.clone());
            if !headers.is_empty() {
                let map: HashMap<String, String> = headers.iter().cloned().collect();
                exp = exp.with_headers(map);
            }
            let exporter = match exp.build_span_exporter() {
                Ok(exporter) => exporter,
                Err(e) => {
                    eprintln!("[codex-telemetry] ERROR: failed to create OTLP HTTP exporter: {e}");
                    return None;
                }
            };
            let batch = sdktrace::BatchSpanProcessor::builder(
                exporter,
                opentelemetry_sdk::runtime::Tokio,
            )
            .build();
            provider_builder = sdktrace::TracerProvider::builder()
                .with_span_processor(batch)
                .with_config(sdktrace::Config::default().with_resource(resource));
        }
    }
    let provider = provider_builder.build();
    let tracer = provider.tracer(settings.service_name.clone());
    Some((TelemetryGuard { provider }, tracer))
}
/// Open the INFO-level `codex.session` span that brackets one Codex session.
///
/// The span records `session.id`, `model` and `provider` using their `Display`
/// output. Hold the returned span for the lifetime of the session and drop it
/// to mark the session's end.
pub fn make_session_span(session_id: &str, model: &str, provider: &str) -> tracing::Span {
    tracing::span!(
        tracing::Level::INFO,
        "codex.session",
        session.id = %session_id,
        model = %model,
        provider = %provider
    )
}
/// Span exporter that appends one OTLP-shaped JSON document per exported batch
/// to a local file, one document per line.
#[derive(Debug)]
struct OtlpJsonFileExporter {
// Buffered handle to the file currently being appended to; replaced on rotation.
writer: Mutex<BufWriter<File>>,
// Location of the active trace file.
path: PathBuf,
// Size threshold in bytes at which the file is rotated; `None` disables rotation.
rotate_bytes: Option<u64>,
// Resource attributes repeated in every written document.
resource_attributes: Vec<JsonKeyValue>,
}
/// Error type for failures of the JSON file exporter; wraps a plain message.
#[derive(Debug)]
struct FileExportError(String);

impl std::fmt::Display for FileExportError {
    /// Renders exactly the wrapped message, honouring any formatter flags.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.0, f)
    }
}

impl std::error::Error for FileExportError {}
impl ExportError for FileExportError {
    /// Name under which this exporter identifies itself in export errors.
    fn exporter_name(&self) -> &'static str {
        "otlp-json-file"
    }
}
impl OtlpJsonFileExporter {
fn new(path: PathBuf, rotate_mb: Option<u64>, resource_attributes: Vec<JsonKeyValue>) -> std::io::Result<Self> {
if debug_enabled() { eprintln!("[codex-telemetry] opening trace file: {}", path.display()); }
let file = OpenOptions::new().create(true).append(true).open(&path)?;
Ok(Self {
writer: Mutex::new(BufWriter::new(file)),
path,
rotate_bytes: rotate_mb.map(|mb| mb.saturating_mul(1024 * 1024)),
resource_attributes,
})
}
fn maybe_rotate(&self) -> std::io::Result<()> {
let Some(limit) = self.rotate_bytes else { return Ok(()); };
let meta = std::fs::metadata(&self.path)?;
if meta.len() as u64 >= limit {
let secs = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).map(|d| d.as_secs()).unwrap_or(0);
let rotated = self
.path
.with_extension(format!("json.{}", secs));
{
let mut w = self.writer.lock().unwrap();
w.flush()?;
}
std::fs::rename(&self.path, rotated)?;
let file = OpenOptions::new().create(true).append(true).open(&self.path)?;
let mut guard = self.writer.lock().unwrap();
*guard = BufWriter::new(file);
}
Ok(())
}
fn write_traces_line(&self, batch: Vec<SpanData>) -> std::io::Result<()> {
if debug_enabled() { eprintln!("[codex-telemetry] exporting {} span(s) to {}", batch.len(), self.path.display()); }
let spans_json: Vec<JsonSpan> = batch.into_iter().map(span_to_json).collect();
let traces = TracesData {
resourceSpans: vec![JsonResourceSpans {
resource: Some(JsonResource { attributes: self.resource_attributes.clone() }),
scopeSpans: vec![JsonScopeSpans { scope: JsonScope {}, spans: spans_json, schemaUrl: String::new() }],
schemaUrl: String::new(),
}],
};
let mut w = self.writer.lock().unwrap();
let line = serde_json::to_string(&traces).unwrap_or_else(|_| String::from("{}"));
w.write_all(line.as_bytes())?;
w.write_all(b"\n")?;
w.flush()
}
}
impl SpanExporter for OtlpJsonFileExporter {
    /// Append `batch` to the trace file, rotating first when needed.
    ///
    /// The work happens synchronously before the returned future is created.
    /// Rotation and write failures are logged as warnings and the export is
    /// still reported as successful. An empty batch is a no-op.
    fn export(&mut self, batch: Vec<SpanData>) -> futures_util::future::BoxFuture<'static, ExportResult> {
        if !batch.is_empty() {
            if let Err(e) = self.maybe_rotate() {
                tracing::warn!("otlp-json-file: rotate failed: {}", e);
            }
            if let Err(e) = self.write_traces_line(batch) {
                tracing::warn!("otlp-json-file: write failed: {}", e);
            }
        }
        Box::pin(async { Ok(()) })
    }

    /// Flush buffered output; a poisoned lock or a flush error is ignored.
    fn shutdown(&mut self) {
        let Ok(mut out) = self.writer.lock() else {
            return;
        };
        let _ = out.flush();
    }
}
// ===== OTLP JSON model =====
/// Top-level document written on each line of the trace file.
/// Field names are camelCase because they are serialized verbatim as JSON keys.
#[derive(Serialize, Clone, Debug)]
struct TracesData {
resourceSpans: Vec<JsonResourceSpans>,
}
/// Spans grouped under the resource that produced them.
#[derive(Serialize, Clone, Debug)]
struct JsonResourceSpans {
// Omitted from the JSON output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
resource: Option<JsonResource>,
scopeSpans: Vec<JsonScopeSpans>,
// NOTE(review): only `Serialize` is derived here, so `#[serde(default)]`
// (a deserialization setting) presumably has no effect — confirm.
#[serde(default)]
schemaUrl: String,
}
/// Resource description: the key/value attributes attached to every document.
#[derive(Serialize, Clone, Debug)]
struct JsonResource {
attributes: Vec<JsonKeyValue>,
}
/// Spans grouped under one instrumentation scope.
#[derive(Serialize, Clone, Debug)]
struct JsonScopeSpans {
scope: JsonScope,
spans: Vec<JsonSpan>,
#[serde(default)]
schemaUrl: String,
}
/// Instrumentation scope placeholder; it has no fields and serializes as `{}`.
#[derive(Serialize, Clone, Debug, Default)]
struct JsonScope {}
/// JSON form of a single finished span, as produced by `span_to_json`.
#[derive(Serialize, Clone, Debug)]
struct JsonSpan {
// Hex-encoded trace and span identifiers.
traceId: String,
spanId: String,
#[serde(default)]
parentSpanId: String,
name: String,
// Numeric span kind: 1=INTERNAL, 2=SERVER, 3=CLIENT, 4=PRODUCER, 5=CONSUMER.
kind: i32,
// Nanoseconds since the Unix epoch, rendered as decimal strings.
startTimeUnixNano: String,
endTimeUnixNano: String,
#[serde(default)]
attributes: Vec<JsonKeyValue>,
#[serde(default)]
droppedAttributesCount: i32,
#[serde(default)]
events: Vec<JsonEvent>,
#[serde(default)]
droppedEventsCount: i32,
#[serde(default)]
links: Vec<JsonLink>,
#[serde(default)]
droppedLinksCount: i32,
// Omitted from the JSON output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
status: Option<JsonStatus>,
}
/// One attribute: a string key paired with a typed value.
#[derive(Serialize, Clone, Debug)]
struct JsonKeyValue {
key: String,
value: JsonAnyValue,
}
/// Typed attribute value.
///
/// The enum is untagged, so each variant serializes as an object whose single
/// key names the type, e.g. `{"stringValue": "..."}`.
#[derive(Serialize, Clone, Debug)]
#[serde(untagged)]
enum JsonAnyValue {
StringValue { stringValue: String },
BoolValue { boolValue: bool },
IntValue { intValue: i64 },
DoubleValue { doubleValue: f64 },
}
/// A timestamped event recorded on a span.
#[derive(Serialize, Clone, Debug)]
struct JsonEvent {
// Nanoseconds since the Unix epoch, rendered as a decimal string.
timeUnixNano: String,
name: String,
#[serde(default)]
attributes: Vec<JsonKeyValue>,
#[serde(default)]
droppedAttributesCount: i32,
}
/// A link from a span to another span context.
#[derive(Serialize, Clone, Debug)]
struct JsonLink {
// Hex-encoded identifiers of the linked span.
traceId: String,
spanId: String,
#[serde(default)]
traceState: String,
#[serde(default)]
attributes: Vec<JsonKeyValue>,
#[serde(default)]
droppedAttributesCount: i32,
#[serde(default)]
flags: i32,
}
/// Final status of a span: a numeric code plus an optional description.
#[derive(Serialize, Clone, Debug)]
struct JsonStatus {
// Empty unless the span ended in an error with a description.
#[serde(default)]
message: String,
code: i32, // 0=UNSET,1=OK,2=ERROR
}
// ===== mapping helpers =====
/// Convert an OpenTelemetry attribute into its JSON key/value representation.
fn json_kv_from(kv: &KeyValue) -> JsonKeyValue {
    let key = kv.key.as_str().to_string();
    let value = json_any_from(kv.value.clone());
    JsonKeyValue { key, value }
}
/// Map an OpenTelemetry attribute value onto the JSON value model.
///
/// Booleans, 64-bit integers, doubles and strings keep their type; any other
/// kind of value is stored as a string holding its `Debug` rendering.
fn json_any_from(val: Value) -> JsonAnyValue {
    match val {
        Value::Bool(flag) => JsonAnyValue::BoolValue { boolValue: flag },
        Value::I64(number) => JsonAnyValue::IntValue { intValue: number },
        Value::F64(number) => JsonAnyValue::DoubleValue { doubleValue: number },
        Value::String(text) => JsonAnyValue::StringValue { stringValue: text.to_string() },
        other => JsonAnyValue::StringValue { stringValue: format!("{other:?}") },
    }
}
/// Translate a span status into its numeric code (0=UNSET, 1=OK, 2=ERROR) and message.
///
/// Only the error status carries a message; the others use an empty string.
fn status_to_json(status: &opentelemetry::trace::Status) -> JsonStatus {
    use opentelemetry::trace::Status;

    let (code, message) = match status {
        Status::Unset => (0, String::new()),
        Status::Ok => (1, String::new()),
        Status::Error { description } => (2, description.to_string()),
    };
    JsonStatus { message, code }
}
fn span_to_json(sd: SpanData) -> JsonSpan {
let trace_id = hex::encode(sd.span_context.trace_id().to_bytes());
let span_id = hex::encode(sd.span_context.span_id().to_bytes());
let parent_span_id = hex::encode(sd.parent_span_id.to_bytes());
let start = to_unix_nanos(sd.start_time).to_string();
let end = to_unix_nanos(sd.end_time).to_string();
let attributes = sd.attributes.iter().map(|kv| json_kv_from(kv)).collect::<Vec<_>>();
let events = sd
.events
.into_iter()
.map(|ev| JsonEvent {
timeUnixNano: to_unix_nanos(ev.timestamp).to_string(),
name: ev.name.to_string(),
attributes: ev.attributes.into_iter().map(|kv| json_kv_from(&kv)).collect(),
droppedAttributesCount: 0,
})
.collect();
let links = sd
.links
.into_iter()
.map(|lnk| JsonLink {
traceId: hex::encode(lnk.span_context.trace_id().to_bytes()),
spanId: hex::encode(lnk.span_context.span_id().to_bytes()),
traceState: lnk.span_context.trace_state().header(),
attributes: lnk.attributes.into_iter().map(|kv| json_kv_from(&kv)).collect(),
droppedAttributesCount: 0,
flags: 0,
})
.collect();
let kind = match sd.span_kind {
opentelemetry::trace::SpanKind::Internal => 1,
opentelemetry::trace::SpanKind::Server => 2,
opentelemetry::trace::SpanKind::Client => 3,
opentelemetry::trace::SpanKind::Producer => 4,
opentelemetry::trace::SpanKind::Consumer => 5,
};
JsonSpan {
traceId: trace_id,
spanId: span_id,
parentSpanId: parent_span_id,
name: sd.name.into_owned(),
kind,
startTimeUnixNano: start,
endTimeUnixNano: end,
attributes,
droppedAttributesCount: 0,
events,
droppedEventsCount: 0,
links,
droppedLinksCount: 0,
status: Some(status_to_json(&sd.status)),
}
}
/// Nanoseconds elapsed between the Unix epoch and `t`.
///
/// Times before the epoch cannot be expressed as an elapsed duration and map to 0.
fn to_unix_nanos(t: SystemTime) -> u128 {
    match t.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    }
}
/// Pick a fresh trace file path under `<base>/traces`.
///
/// `base` is `codex_home` when provided, otherwise the current working
/// directory (falling back to `.`). The `traces` directory is created on a
/// best-effort basis: a failure is printed to stderr and the path is returned
/// regardless. The file name combines a UTC timestamp with 8 random bytes in
/// hex, e.g. `codex_traces_20250809_193426_<16 hex chars>.jsonl`.
fn determine_traces_file_path(codex_home: Option<&PathBuf>) -> PathBuf {
    use chrono::Utc;
    use rand::RngCore;

    let base = match codex_home {
        Some(home) => home.clone(),
        None => {
            if debug_enabled() {
                eprintln!("[codex-telemetry] WARNING: codex_home not provided; defaulting to current directory");
            }
            std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
        }
    };
    let traces_dir = base.join("traces");
    if let Err(err) = std::fs::create_dir_all(&traces_dir) {
        eprintln!("[codex-telemetry] ERROR: failed to create traces dir {}: {}", traces_dir.display(), err);
    } else if debug_enabled() {
        eprintln!("[codex-telemetry] ensured traces dir exists: {}", traces_dir.display());
    }

    let timestamp = Utc::now().format("%Y%m%d_%H%M%S");
    let mut suffix = [0u8; 8];
    rand::thread_rng().fill_bytes(&mut suffix);
    let file_name = format!("codex_traces_{}_{}.jsonl", timestamp, hex::encode(suffix));
    traces_dir.join(file_name)
}
// Re-exports for consumers
pub use TelemetryExporter as Exporter;
pub use TelemetryGuard as Guard;
pub use TelemetrySettings as Settings;
}
#[cfg(not(feature = "otel"))]
mod imp {
    //! No-op stand-ins compiled when the `otel` feature is disabled.
    //!
    //! NOTE(review): this surface is narrower than the `otel` build (no
    //! `codex_home` field, only the `None` exporter, no `build_layer`), so code
    //! written against the full API presumably will not compile with the
    //! feature off — confirm whether parity is intended.

    /// Exporter selection; only the disabled variant exists in this build.
    #[derive(Clone, Debug)]
    pub enum TelemetryExporter {
        None,
    }

    /// Telemetry configuration; accepted but never acted upon in this build.
    #[derive(Clone, Debug)]
    pub struct TelemetrySettings {
        pub enabled: bool,
        pub exporter: TelemetryExporter,
        pub service_name: String,
        pub service_version: String,
    }

    /// Placeholder guard type; never handed out by this build.
    pub struct TelemetryGuard;

    /// Always returns `None`: telemetry is compiled out.
    pub fn init_from_settings(_settings: &TelemetrySettings) -> Option<TelemetryGuard> {
        None
    }

    /// Returns a disabled span so callers can hold one unconditionally.
    pub fn make_session_span(_session_id: &str, _model: &str, _provider: &str) -> tracing::Span {
        tracing::Span::none()
    }

    pub use TelemetryExporter as Exporter;
    pub use TelemetryGuard as Guard;
    pub use TelemetrySettings as Settings;
}
pub use imp::*;

View File

@@ -63,6 +63,7 @@ tree-sitter-bash = "0.25.0"
uuid = { version = "1", features = ["serde", "v4"] }
which = "6"
wildmatch = "2.5.0"
codex-telemetry = { path = "../codex-telemetry", features = ["otel"] }
[target.'cfg(target_os = "linux")'.dependencies]

View File

@@ -308,6 +308,12 @@ impl TurnContext {
.map(PathBuf::from)
.map_or_else(|| self.cwd.clone(), |p| self.cwd.join(p))
}
/// Ends the session span, if one is still stored on this context.
///
/// The span is taken out of `session_span` (leaving `None` behind) and
/// dropped while the lock is held, so a second call is a no-op.
///
/// A poisoned lock is recovered instead of unwrapped. Poisoning only means
/// another thread panicked while holding the lock; the stored `Option` can
/// still be taken, and panicking again here would leave the span unreleased
/// during teardown.
pub fn end_session_span(&self) {
    let mut slot = self
        .session_span
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    // Presumably dropping the span is what closes it for the tracing layer.
    drop(slot.take());
}
}
/// Configure the model session.

View File

@@ -27,6 +27,7 @@ codex-common = { path = "../common", features = [
codex-core = { path = "../core" }
codex-ollama = { path = "../ollama" }
codex-protocol = { path = "../protocol" }
codex-telemetry = { path = "../codex-telemetry", features = ["otel"] }
owo-colors = "4.2.0"
serde_json = "1"
shlex = "1.3.0"
@@ -38,6 +39,7 @@ tokio = { version = "1", features = [
"signal",
] }
tracing = { version = "0.1.41", features = ["log"] }
tracing-opentelemetry = "0.27"
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
[dev-dependencies]

View File

@@ -29,6 +29,8 @@ use tracing::debug;
use tracing::error;
use tracing::info;
use tracing_subscriber::EnvFilter;
use codex_telemetry as telemetry;
use tracing_subscriber::prelude::*;
use crate::cli::Command as ExecCommand;
use crate::event_processor::CodexStatus;
@@ -105,19 +107,11 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
),
};
// TODO(mbolin): Take a more thoughtful approach to logging.
// Build fmt layer (existing logging) to compose with OTEL layer.
let default_level = "error";
let _ = tracing_subscriber::fmt()
// Fallback to the `default_level` log filter if the environment
// variable is not set _or_ contains an invalid value
.with_env_filter(
EnvFilter::try_from_default_env()
.or_else(|_| EnvFilter::try_new(default_level))
.unwrap_or_else(|_| EnvFilter::new(default_level)),
)
let fmt_layer = tracing_subscriber::fmt::layer()
.with_ansi(stderr_with_ansi)
.with_writer(std::io::stderr)
.try_init();
.with_writer(std::io::stderr);
let sandbox_mode = if full_auto {
Some(SandboxMode::WorkspaceWrite)
@@ -173,6 +167,36 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
};
let config = Config::load_with_cli_overrides(cli_kv_overrides, overrides)?;
// Build OTEL layer and compose into subscriber.
let telemetry = telemetry::build_layer(&telemetry::Settings {
enabled: true,
exporter: telemetry::Exporter::OtlpFile { path: PathBuf::new(), rotate_mb: Some(100) },
service_name: "codex".to_string(),
service_version: env!("CARGO_PKG_VERSION").to_string(),
codex_home: Some(config.codex_home.clone()),
});
let _telemetry_guard = if let Some((guard, tracer)) = telemetry {
let otel_layer = tracing_opentelemetry::OpenTelemetryLayer::new(tracer);
// Build env_filter separately and attach via with_filter.
let env_filter = EnvFilter::try_from_default_env()
.or_else(|_| EnvFilter::try_new(default_level))
.unwrap_or_else(|_| EnvFilter::new(default_level));
let _ = tracing_subscriber::registry()
.with(fmt_layer.with_filter(env_filter))
.with(otel_layer)
.try_init();
Some(guard)
} else {
let env_filter = EnvFilter::try_from_default_env()
.or_else(|_| EnvFilter::try_new(default_level))
.unwrap_or_else(|_| EnvFilter::new(default_level));
let _ = tracing_subscriber::registry()
.with(fmt_layer.with_filter(env_filter))
.try_init();
None
};
let mut event_processor: Box<dyn EventProcessor> = if json_mode {
Box::new(EventProcessorWithJsonOutput::new(last_message_file.clone()))
} else {
@@ -291,6 +315,29 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
let initial_prompt_task_id = conversation.submit(Op::UserInput { items }).await?;
info!("Sent prompt with event ID: {initial_prompt_task_id}");
// If stdin is an interactive TTY, watch for EOF (Ctrl+D) and request a graceful shutdown.
if std::io::stdin().is_terminal() {
let codex_for_eof = codex.clone();
tokio::spawn(async move {
use tokio::io::{stdin, AsyncReadExt};
let mut stdin = stdin();
let mut buf = [0u8; 1];
loop {
match stdin.read(&mut buf).await {
Ok(0) => {
let _ = codex_for_eof.submit(Op::Shutdown).await;
break;
}
Ok(_) => {
// discard any input; exec does not read interactive input
continue;
}
Err(_) => break,
}
}
});
}
// Run the loop until the task is complete.
while let Some(event) = rx.recv().await {
let shutdown: CodexStatus = event_processor.process_event(event);

View File

@@ -38,6 +38,7 @@ codex-file-search = { path = "../file-search" }
codex-login = { path = "../login" }
codex-ollama = { path = "../ollama" }
codex-protocol = { path = "../protocol" }
codex-telemetry = { path = "../codex-telemetry", features = ["otel"] }
color-eyre = "0.6.3"
crossterm = { version = "0.28.1", features = [
"bracketed-paste",
@@ -81,6 +82,7 @@ tracing = { version = "0.1.41", features = ["log"] }
tracing-appender = "0.2.3"
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
pulldown-cmark = "0.10"
tracing-opentelemetry = "0.27"
unicode-segmentation = "1.12.0"
unicode-width = "0.1"
url = "2"

View File

@@ -28,6 +28,7 @@ use tracing::error;
use tracing_appender::non_blocking;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::prelude::*;
use codex_telemetry as telemetry;
mod app;
mod app_backtrack;
@@ -199,6 +200,15 @@ pub async fn run_main(
let internal_storage = InternalStorage::load(&config.codex_home);
// Build OTEL layer and compose into subscriber.
let telemetry = telemetry::build_layer(&telemetry::Settings {
enabled: true,
exporter: telemetry::Exporter::OtlpFile { path: PathBuf::new(), rotate_mb: Some(100) },
service_name: "codex".to_string(),
service_version: env!("CARGO_PKG_VERSION").to_string(),
codex_home: Some(config.codex_home.clone()),
});
let log_dir = codex_core::config::log_dir(&config)?;
std::fs::create_dir_all(&log_dir)?;
// Open (or create) your log file, appending to it.