mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
feat: metrics capabilities (#8318)
Add metrics capabilities to Codex. The `README.md` is up to date. This will not be merged with the metrics before this PR of course: https://github.com/openai/codex/pull/8350
This commit is contained in:
2
codex-rs/Cargo.lock
generated
2
codex-rs/Cargo.lock
generated
@@ -1610,10 +1610,12 @@ dependencies = [
|
||||
"opentelemetry-otlp",
|
||||
"opentelemetry-semantic-conventions",
|
||||
"opentelemetry_sdk",
|
||||
"pretty_assertions",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"strum_macros 0.27.2",
|
||||
"thiserror 2.0.17",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-opentelemetry",
|
||||
|
||||
@@ -122,7 +122,8 @@ keyring = { workspace = true, features = ["sync-secret-service"] }
|
||||
assert_cmd = { workspace = true }
|
||||
assert_matches = { workspace = true }
|
||||
codex-arg0 = { workspace = true }
|
||||
codex-core = { path = ".", features = ["deterministic_process_ids"] }
|
||||
codex-core = { path = ".", default-features = false, features = ["deterministic_process_ids"] }
|
||||
codex-otel = { workspace = true, features = ["disable-default-metrics-exporter"] }
|
||||
codex-utils-cargo-bin = { workspace = true }
|
||||
core_test_support = { workspace = true }
|
||||
ctor = { workspace = true }
|
||||
|
||||
@@ -19,7 +19,7 @@ use codex_api::create_text_param_for_request;
|
||||
use codex_api::error::ApiError;
|
||||
use codex_api::requests::responses::Compression;
|
||||
use codex_app_server_protocol::AuthMode;
|
||||
use codex_otel::otel_manager::OtelManager;
|
||||
use codex_otel::OtelManager;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
|
||||
@@ -152,7 +152,7 @@ use crate::user_instructions::UserInstructions;
|
||||
use crate::user_notification::UserNotification;
|
||||
use crate::util::backoff;
|
||||
use codex_async_utils::OrCancelExt;
|
||||
use codex_otel::otel_manager::OtelManager;
|
||||
use codex_otel::OtelManager;
|
||||
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
@@ -644,6 +644,7 @@ impl Session {
|
||||
terminal::user_agent(),
|
||||
session_configuration.session_source.clone(),
|
||||
);
|
||||
config.features.emit_metrics(&otel_manager);
|
||||
|
||||
otel_manager.conversation_starts(
|
||||
config.model_provider.name.as_str(),
|
||||
|
||||
@@ -1439,11 +1439,13 @@ impl Config {
|
||||
.unwrap_or(DEFAULT_OTEL_ENVIRONMENT.to_string());
|
||||
let exporter = t.exporter.unwrap_or(OtelExporterKind::None);
|
||||
let trace_exporter = t.trace_exporter.unwrap_or_else(|| exporter.clone());
|
||||
let metrics_exporter = t.metrics_exporter.unwrap_or(OtelExporterKind::Statsig);
|
||||
OtelConfig {
|
||||
log_user_prompt,
|
||||
environment,
|
||||
exporter,
|
||||
trace_exporter,
|
||||
metrics_exporter,
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
@@ -306,6 +306,7 @@ pub struct OtelTlsConfig {
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub enum OtelExporterKind {
|
||||
None,
|
||||
Statsig,
|
||||
OtlpHttp {
|
||||
endpoint: String,
|
||||
#[serde(default)]
|
||||
@@ -337,6 +338,11 @@ pub struct OtelConfigToml {
|
||||
|
||||
/// Optional trace exporter
|
||||
pub trace_exporter: Option<OtelExporterKind>,
|
||||
|
||||
/// Optional metrics exporter
|
||||
///
|
||||
/// Defaults to `statsig` outside of tests.
|
||||
pub metrics_exporter: Option<OtelExporterKind>,
|
||||
}
|
||||
|
||||
/// Effective OTEL settings after defaults are applied.
|
||||
@@ -346,6 +352,7 @@ pub struct OtelConfig {
|
||||
pub environment: String,
|
||||
pub exporter: OtelExporterKind,
|
||||
pub trace_exporter: OtelExporterKind,
|
||||
pub metrics_exporter: OtelExporterKind,
|
||||
}
|
||||
|
||||
impl Default for OtelConfig {
|
||||
@@ -355,6 +362,7 @@ impl Default for OtelConfig {
|
||||
environment: DEFAULT_OTEL_ENVIRONMENT.to_owned(),
|
||||
exporter: OtelExporterKind::None,
|
||||
trace_exporter: OtelExporterKind::None,
|
||||
metrics_exporter: OtelExporterKind::Statsig,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
|
||||
use crate::config::ConfigToml;
|
||||
use crate::config::profile::ConfigProfile;
|
||||
use codex_otel::OtelManager;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use std::collections::BTreeMap;
|
||||
@@ -193,6 +194,21 @@ impl Features {
|
||||
.map(|usage| (usage.alias.as_str(), usage.feature))
|
||||
}
|
||||
|
||||
pub fn emit_metrics(&self, otel: &OtelManager) {
|
||||
for feature in FEATURES {
|
||||
if self.enabled(feature.id) != feature.default_enabled {
|
||||
otel.counter(
|
||||
"codex.feature.state",
|
||||
1,
|
||||
&[
|
||||
("feature", feature.key),
|
||||
("value", &self.enabled(feature.id).to_string()),
|
||||
],
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Apply a table of key -> bool toggles (e.g. from TOML).
|
||||
pub fn apply_map(&mut self, m: &BTreeMap<String, bool>) {
|
||||
for (k, v) in m {
|
||||
|
||||
@@ -6,7 +6,7 @@ use codex_otel::config::OtelExporter;
|
||||
use codex_otel::config::OtelHttpProtocol;
|
||||
use codex_otel::config::OtelSettings;
|
||||
use codex_otel::config::OtelTlsConfig as OtelTlsSettings;
|
||||
use codex_otel::otel_provider::OtelProvider;
|
||||
use codex_otel::traces::otel_provider::OtelProvider;
|
||||
use std::error::Error;
|
||||
|
||||
/// Build an OpenTelemetry provider from the app Config.
|
||||
@@ -18,6 +18,7 @@ pub fn build_provider(
|
||||
) -> Result<Option<OtelProvider>, Box<dyn Error>> {
|
||||
let to_otel_exporter = |kind: &Kind| match kind {
|
||||
Kind::None => OtelExporter::None,
|
||||
Kind::Statsig => OtelExporter::Statsig,
|
||||
Kind::OtlpHttp {
|
||||
endpoint,
|
||||
headers,
|
||||
@@ -63,6 +64,11 @@ pub fn build_provider(
|
||||
|
||||
let exporter = to_otel_exporter(&config.otel.exporter);
|
||||
let trace_exporter = to_otel_exporter(&config.otel.trace_exporter);
|
||||
let metrics_exporter = if config.analytics {
|
||||
to_otel_exporter(&config.otel.metrics_exporter)
|
||||
} else {
|
||||
OtelExporter::None
|
||||
};
|
||||
|
||||
OtelProvider::from(&OtelSettings {
|
||||
service_name: originator().value.to_owned(),
|
||||
@@ -71,6 +77,7 @@ pub fn build_provider(
|
||||
environment: config.otel.environment.to_string(),
|
||||
exporter,
|
||||
trace_exporter,
|
||||
metrics_exporter,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ use crate::skills::SkillsManager;
|
||||
use crate::tools::sandboxing::ApprovalStore;
|
||||
use crate::unified_exec::UnifiedExecProcessManager;
|
||||
use crate::user_notification::UserNotifier;
|
||||
use codex_otel::otel_manager::OtelManager;
|
||||
use codex_otel::OtelManager;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
@@ -29,8 +29,16 @@ impl SessionTask for CompactTask {
|
||||
session.as_ref(),
|
||||
&ctx.client.get_provider(),
|
||||
) {
|
||||
let _ = session
|
||||
.services
|
||||
.otel_manager
|
||||
.counter("codex.task.compact.remote", 1, &[]);
|
||||
crate::compact_remote::run_remote_compact_task(session, ctx).await
|
||||
} else {
|
||||
let _ = session
|
||||
.services
|
||||
.otel_manager
|
||||
.counter("codex.task.compact.local", 1, &[]);
|
||||
crate::compact::run_compact_task(session, ctx, input).await
|
||||
}
|
||||
|
||||
|
||||
@@ -46,6 +46,12 @@ impl SessionTask for ReviewTask {
|
||||
input: Vec<UserInput>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
let _ = session
|
||||
.session
|
||||
.services
|
||||
.otel_manager
|
||||
.counter("codex.task.review", 1, &[]);
|
||||
|
||||
// Start sub-codex conversation and get the receiver for events.
|
||||
let output = match start_review_conversation(
|
||||
session.clone(),
|
||||
|
||||
@@ -38,6 +38,11 @@ impl SessionTask for UndoTask {
|
||||
_input: Vec<UserInput>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
let _ = session
|
||||
.session
|
||||
.services
|
||||
.otel_manager
|
||||
.counter("codex.task.undo", 1, &[]);
|
||||
let sess = session.clone_session();
|
||||
sess.send_event(
|
||||
ctx.as_ref(),
|
||||
|
||||
@@ -58,6 +58,12 @@ impl SessionTask for UserShellCommandTask {
|
||||
_input: Vec<UserInput>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
let _ = session
|
||||
.session
|
||||
.services
|
||||
.otel_manager
|
||||
.counter("codex.task.user_shell", 1, &[]);
|
||||
|
||||
let event = EventMsg::TaskStarted(TaskStartedEvent {
|
||||
model_context_window: turn_context.client.get_model_context_window(),
|
||||
});
|
||||
|
||||
@@ -17,6 +17,7 @@ use crate::tools::sandboxing::ToolCtx;
|
||||
use crate::tools::sandboxing::ToolError;
|
||||
use crate::tools::sandboxing::ToolRuntime;
|
||||
use crate::tools::sandboxing::default_exec_approval_requirement;
|
||||
use codex_otel::ToolDecisionSource;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::ReviewDecision;
|
||||
|
||||
@@ -45,8 +46,8 @@ impl ToolOrchestrator {
|
||||
let otel = turn_ctx.client.get_otel_manager();
|
||||
let otel_tn = &tool_ctx.tool_name;
|
||||
let otel_ci = &tool_ctx.call_id;
|
||||
let otel_user = codex_otel::otel_manager::ToolDecisionSource::User;
|
||||
let otel_cfg = codex_otel::otel_manager::ToolDecisionSource::Config;
|
||||
let otel_user = ToolDecisionSource::User;
|
||||
let otel_cfg = ToolDecisionSource::Config;
|
||||
|
||||
// 1) Approval
|
||||
let mut already_approved = false;
|
||||
|
||||
@@ -13,7 +13,7 @@ use codex_core::Prompt;
|
||||
use codex_core::ResponseItem;
|
||||
use codex_core::WireApi;
|
||||
use codex_core::models_manager::manager::ModelsManager;
|
||||
use codex_otel::otel_manager::OtelManager;
|
||||
use codex_otel::OtelManager;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::models::ReasoningItemContent;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
|
||||
@@ -12,7 +12,7 @@ use codex_core::ResponseEvent;
|
||||
use codex_core::ResponseItem;
|
||||
use codex_core::WireApi;
|
||||
use codex_core::models_manager::manager::ModelsManager;
|
||||
use codex_otel::otel_manager::OtelManager;
|
||||
use codex_otel::OtelManager;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::models::ReasoningItemContent;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
|
||||
@@ -11,7 +11,7 @@ use codex_core::ResponseEvent;
|
||||
use codex_core::ResponseItem;
|
||||
use codex_core::WireApi;
|
||||
use codex_core::models_manager::manager::ModelsManager;
|
||||
use codex_otel::otel_manager::OtelManager;
|
||||
use codex_otel::OtelManager;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::ReasoningSummary;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
|
||||
@@ -19,7 +19,7 @@ use codex_core::models_manager::manager::ModelsManager;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::protocol::SessionSource;
|
||||
use codex_otel::otel_manager::OtelManager;
|
||||
use codex_otel::OtelManager;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::ReasoningSummary;
|
||||
use codex_protocol::config_types::Verbosity;
|
||||
|
||||
@@ -12,6 +12,13 @@ path = "src/lib.rs"
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[features]
|
||||
## Disables the built-in default metrics exporter.
|
||||
##
|
||||
## Intended for use from `dev-dependencies` so unit/integration tests never
|
||||
## attempt to export metrics over the network.
|
||||
disable-default-metrics-exporter = []
|
||||
|
||||
[dependencies]
|
||||
chrono = { workspace = true }
|
||||
codex-app-server-protocol = { workspace = true }
|
||||
@@ -19,13 +26,14 @@ codex-utils-absolute-path = { workspace = true }
|
||||
codex-api = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
eventsource-stream = { workspace = true }
|
||||
opentelemetry = { workspace = true, features = ["logs", "trace"] }
|
||||
opentelemetry = { workspace = true, features = ["logs", "metrics", "trace"] }
|
||||
opentelemetry-appender-tracing = { workspace = true }
|
||||
opentelemetry-otlp = { workspace = true, features = [
|
||||
"grpc-tonic",
|
||||
"http-proto",
|
||||
"http-json",
|
||||
"logs",
|
||||
"metrics",
|
||||
"trace",
|
||||
"reqwest-blocking-client",
|
||||
"reqwest-rustls",
|
||||
@@ -33,16 +41,13 @@ opentelemetry-otlp = { workspace = true, features = [
|
||||
"tls-roots",
|
||||
]}
|
||||
opentelemetry-semantic-conventions = { workspace = true }
|
||||
opentelemetry_sdk = { workspace = true, features = [
|
||||
"logs",
|
||||
"rt-tokio",
|
||||
"trace",
|
||||
]}
|
||||
opentelemetry_sdk = { workspace = true, features = ["logs", "metrics", "rt-tokio", "testing", "trace"] }
|
||||
http = { workspace = true }
|
||||
reqwest = { workspace = true, features = ["blocking", "rustls-tls"] }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
strum_macros = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-opentelemetry = { workspace = true }
|
||||
@@ -50,3 +55,4 @@ tracing-subscriber = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
opentelemetry_sdk = { workspace = true, features = ["testing"] }
|
||||
pretty_assertions = { workspace = true }
|
||||
|
||||
128
codex-rs/otel/README.md
Normal file
128
codex-rs/otel/README.md
Normal file
@@ -0,0 +1,128 @@
|
||||
# codex-otel
|
||||
|
||||
`codex-otel` is the OpenTelemetry integration crate for Codex. It provides:
|
||||
|
||||
- Trace/log exporters and tracing subscriber layers (`codex_otel::traces::otel_provider`).
|
||||
- A structured event helper (`codex_otel::OtelManager`).
|
||||
- OpenTelemetry metrics support via OTLP exporters (`codex_otel::metrics`).
|
||||
- A metrics facade on `OtelManager` so tracing + metrics share metadata.
|
||||
|
||||
## Tracing and logs
|
||||
|
||||
Create an OTEL provider from `OtelSettings`, then attach its layers to your
|
||||
`tracing_subscriber` registry:
|
||||
|
||||
```rust
|
||||
use codex_otel::config::OtelExporter;
|
||||
use codex_otel::config::OtelHttpProtocol;
|
||||
use codex_otel::config::OtelSettings;
|
||||
use codex_otel::traces::otel_provider::OtelProvider;
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
let settings = OtelSettings {
|
||||
environment: "dev".to_string(),
|
||||
service_name: "codex-cli".to_string(),
|
||||
service_version: env!("CARGO_PKG_VERSION").to_string(),
|
||||
codex_home: std::path::PathBuf::from("/tmp"),
|
||||
exporter: OtelExporter::OtlpHttp {
|
||||
endpoint: "https://otlp.example.com".to_string(),
|
||||
headers: std::collections::HashMap::new(),
|
||||
protocol: OtelHttpProtocol::Binary,
|
||||
tls: None,
|
||||
},
|
||||
trace_exporter: OtelExporter::OtlpHttp {
|
||||
endpoint: "https://otlp.example.com".to_string(),
|
||||
headers: std::collections::HashMap::new(),
|
||||
protocol: OtelHttpProtocol::Binary,
|
||||
tls: None,
|
||||
},
|
||||
metrics_exporter: OtelExporter::None,
|
||||
};
|
||||
|
||||
if let Some(provider) = OtelProvider::from(&settings)? {
|
||||
let registry = tracing_subscriber::registry()
|
||||
.with(provider.logger_layer())
|
||||
.with(provider.tracing_layer());
|
||||
registry.init();
|
||||
}
|
||||
```
|
||||
|
||||
## OtelManager (events)
|
||||
|
||||
`OtelManager` adds consistent metadata to tracing events and helps record
|
||||
Codex-specific events.
|
||||
|
||||
```rust
|
||||
use codex_otel::OtelManager;
|
||||
|
||||
let manager = OtelManager::new(
|
||||
conversation_id,
|
||||
model,
|
||||
slug,
|
||||
account_id,
|
||||
account_email,
|
||||
auth_mode,
|
||||
log_user_prompts,
|
||||
terminal_type,
|
||||
session_source,
|
||||
);
|
||||
|
||||
manager.user_prompt(&prompt_items);
|
||||
```
|
||||
|
||||
## Metrics (OTLP or in-memory)
|
||||
|
||||
Modes:
|
||||
|
||||
- OTLP: exports metrics via the OpenTelemetry OTLP exporter (HTTP or gRPC).
|
||||
- In-memory: records via `opentelemetry_sdk::metrics::InMemoryMetricExporter` for tests/assertions; call `shutdown()` to flush.
|
||||
|
||||
`codex-otel` also provides `OtelExporter::Statsig`, a shorthand for exporting OTLP/HTTP JSON metrics
|
||||
to Statsig using Codex-internal defaults.
|
||||
|
||||
Statsig ingestion (OTLP/HTTP JSON) example:
|
||||
|
||||
```rust
|
||||
use codex_otel::config::{OtelExporter, OtelHttpProtocol};
use codex_otel::metrics::{MetricsClient, MetricsConfig};
|
||||
|
||||
let metrics = MetricsClient::new(MetricsConfig::otlp(
|
||||
"dev",
|
||||
"codex-cli",
|
||||
env!("CARGO_PKG_VERSION"),
|
||||
OtelExporter::OtlpHttp {
|
||||
endpoint: "https://api.statsig.com/otlp".to_string(),
|
||||
headers: std::collections::HashMap::from([(
|
||||
"statsig-api-key".to_string(),
|
||||
std::env::var("STATSIG_SERVER_SDK_SECRET")?,
|
||||
)]),
|
||||
protocol: OtelHttpProtocol::Json,
|
||||
tls: None,
|
||||
},
|
||||
))?;
|
||||
|
||||
metrics.counter("codex.session_started", 1, &[("source", "tui")])?;
|
||||
metrics.histogram("codex.request_latency", 83, &[("route", "chat")])?;
|
||||
```
|
||||
|
||||
In-memory (tests):
|
||||
|
||||
```rust
|
||||
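use codex_otel::metrics::{MetricsClient, MetricsConfig};
use opentelemetry_sdk::metrics::InMemoryMetricExporter;

let exporter = InMemoryMetricExporter::default();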
|
||||
let metrics = MetricsClient::new(MetricsConfig::in_memory(
|
||||
"test",
|
||||
"codex-cli",
|
||||
env!("CARGO_PKG_VERSION"),
|
||||
exporter.clone(),
|
||||
))?;
|
||||
metrics.counter("codex.turns", 1, &[("model", "gpt-5.1")])?;
|
||||
metrics.shutdown()?; // flushes in-memory exporter
|
||||
```
|
||||
|
||||
## Shutdown
|
||||
|
||||
- `OtelProvider::shutdown()` stops the OTEL exporter.
|
||||
- `OtelManager::shutdown_metrics()` flushes and shuts down the metrics provider.
|
||||
|
||||
Both are optional because drop performs best-effort shutdown, but calling them
|
||||
explicitly gives deterministic flushing (or a shutdown error if flushing does
|
||||
not complete in time).
|
||||
@@ -3,6 +3,31 @@ use std::path::PathBuf;
|
||||
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
|
||||
pub(crate) const STATSIG_OTLP_HTTP_ENDPOINT: &str = "https://ab.chatgpt.com/otlp/v1/metrics";
|
||||
pub(crate) const STATSIG_API_KEY_HEADER: &str = "statsig-api-key";
|
||||
pub(crate) const STATSIG_API_KEY: &str = "client-MkRuleRQBd6qakfnDYqJVR9JuXcY57Ljly3vi5JVUIO";
|
||||
|
||||
pub(crate) fn resolve_exporter(exporter: &OtelExporter) -> OtelExporter {
|
||||
match exporter {
|
||||
OtelExporter::Statsig => {
|
||||
if cfg!(test) || cfg!(feature = "disable-default-metrics-exporter") {
|
||||
return OtelExporter::None;
|
||||
}
|
||||
|
||||
OtelExporter::OtlpHttp {
|
||||
endpoint: STATSIG_OTLP_HTTP_ENDPOINT.to_string(),
|
||||
headers: HashMap::from([(
|
||||
STATSIG_API_KEY_HEADER.to_string(),
|
||||
STATSIG_API_KEY.to_string(),
|
||||
)]),
|
||||
protocol: OtelHttpProtocol::Json,
|
||||
tls: None,
|
||||
}
|
||||
}
|
||||
_ => exporter.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct OtelSettings {
|
||||
pub environment: String,
|
||||
@@ -11,6 +36,7 @@ pub struct OtelSettings {
|
||||
pub codex_home: PathBuf,
|
||||
pub exporter: OtelExporter,
|
||||
pub trace_exporter: OtelExporter,
|
||||
pub metrics_exporter: OtelExporter,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -31,6 +57,10 @@ pub struct OtelTlsConfig {
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum OtelExporter {
|
||||
None,
|
||||
/// Statsig metrics ingestion exporter using Codex-internal defaults.
|
||||
///
|
||||
/// This is intended for metrics only.
|
||||
Statsig,
|
||||
OtlpGrpc {
|
||||
endpoint: String,
|
||||
headers: HashMap<String, String>,
|
||||
|
||||
@@ -1,4 +1,173 @@
|
||||
pub mod config;
|
||||
pub mod metrics;
|
||||
pub mod traces;
|
||||
|
||||
pub mod otel_manager;
|
||||
pub mod otel_provider;
|
||||
mod otlp;
|
||||
|
||||
use crate::metrics::MetricsClient;
|
||||
use crate::metrics::MetricsConfig;
|
||||
use crate::metrics::MetricsError;
|
||||
use crate::metrics::Result as MetricsResult;
|
||||
use crate::metrics::timer::Timer;
|
||||
use crate::metrics::validation::validate_tag_key;
|
||||
use crate::metrics::validation::validate_tag_value;
|
||||
use crate::traces::otel_provider::OtelProvider;
|
||||
use codex_protocol::ThreadId;
|
||||
use serde::Serialize;
|
||||
use std::time::Duration;
|
||||
use strum_macros::Display;
|
||||
use tracing::Span;
|
||||
|
||||
/// Identifies the origin of a tool decision recorded by telemetry.
///
/// Serialized by serde in `snake_case`; `Display` is derived through
/// `strum_macros`.
#[derive(Debug, Clone, Serialize, Display)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ToolDecisionSource {
|
||||
/// NOTE(review): presumably the decision came from configuration/policy
/// rather than an interactive prompt — confirm against callers.
Config,
|
||||
/// NOTE(review): presumably the decision was made by the user — confirm
/// against callers.
User,
|
||||
}
|
||||
|
||||
/// Metadata carried by an `OtelManager`.
///
/// When metadata tagging is enabled, `auth_mode`, `model` and `app_version`
/// are also attached to every metric as the `auth_mode`, `model` and
/// `app.version` tags.
#[derive(Debug, Clone)]
|
||||
pub struct OtelEventMetadata {
|
||||
/// Identifier of the conversation (thread) this metadata belongs to.
pub(crate) conversation_id: ThreadId,
|
||||
/// Authentication mode, if known; emitted as the `auth_mode` metric tag.
pub(crate) auth_mode: Option<String>,
|
||||
/// Account identifier, if known.
pub(crate) account_id: Option<String>,
|
||||
/// Account e-mail address, if known.
pub(crate) account_email: Option<String>,
|
||||
/// Model name; emitted as the `model` metric tag.
pub(crate) model: String,
|
||||
/// Model slug; updated together with `model` by `OtelManager::with_model`.
pub(crate) slug: String,
|
||||
/// NOTE(review): presumably controls whether user prompt text may be
/// logged — confirm against the event-recording code.
pub(crate) log_user_prompts: bool,
|
||||
/// Application version; emitted as the `app.version` metric tag.
pub(crate) app_version: &'static str,
|
||||
/// Terminal type reported for the session.
pub(crate) terminal_type: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct OtelManager {
|
||||
pub(crate) metadata: OtelEventMetadata,
|
||||
pub(crate) session_span: Span,
|
||||
pub(crate) metrics: Option<MetricsClient>,
|
||||
pub(crate) metrics_use_metadata_tags: bool,
|
||||
}
|
||||
|
||||
impl OtelManager {
|
||||
pub fn with_model(mut self, model: &str, slug: &str) -> Self {
|
||||
self.metadata.model = model.to_owned();
|
||||
self.metadata.slug = slug.to_owned();
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_metrics(mut self, metrics: MetricsClient) -> Self {
|
||||
self.metrics = Some(metrics);
|
||||
self.metrics_use_metadata_tags = true;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_metrics_without_metadata_tags(mut self, metrics: MetricsClient) -> Self {
|
||||
self.metrics = Some(metrics);
|
||||
self.metrics_use_metadata_tags = false;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_metrics_config(self, config: MetricsConfig) -> MetricsResult<Self> {
|
||||
let metrics = MetricsClient::new(config)?;
|
||||
Ok(self.with_metrics(metrics))
|
||||
}
|
||||
|
||||
pub fn with_provider_metrics(self, provider: &OtelProvider) -> Self {
|
||||
match provider.metrics() {
|
||||
Some(metrics) => self.with_metrics(metrics.clone()),
|
||||
None => self,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn counter(&self, name: &str, inc: i64, tags: &[(&str, &str)]) {
|
||||
let res: MetricsResult<()> = (|| {
|
||||
let Some(metrics) = &self.metrics else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let tags = self.tags_with_metadata(tags)?;
|
||||
metrics.counter(name, inc, &tags)
|
||||
})();
|
||||
|
||||
if let Err(e) = res {
|
||||
tracing::warn!("metrics counter [{name}] failed: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
pub fn histogram(&self, name: &str, value: i64, tags: &[(&str, &str)]) {
|
||||
let res: MetricsResult<()> = (|| {
|
||||
let Some(metrics) = &self.metrics else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let tags = self.tags_with_metadata(tags)?;
|
||||
metrics.histogram(name, value, &tags)
|
||||
})();
|
||||
|
||||
if let Err(e) = res {
|
||||
tracing::warn!("metrics histogram [{name}] failed: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
pub fn record_duration(&self, name: &str, duration: Duration, tags: &[(&str, &str)]) {
|
||||
let res: MetricsResult<()> = (|| {
|
||||
let Some(metrics) = &self.metrics else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let tags = self.tags_with_metadata(tags)?;
|
||||
metrics.record_duration(name, duration, &tags)
|
||||
})();
|
||||
|
||||
if let Err(e) = res {
|
||||
tracing::warn!("metrics duration [{name}] failed: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start_timer(&self, name: &str, tags: &[(&str, &str)]) -> Result<Timer, MetricsError> {
|
||||
let Some(metrics) = &self.metrics else {
|
||||
return Err(MetricsError::ExporterDisabled);
|
||||
};
|
||||
let tags = self.tags_with_metadata(tags)?;
|
||||
metrics.start_timer(name, &tags)
|
||||
}
|
||||
|
||||
pub fn shutdown_metrics(&self) -> MetricsResult<()> {
|
||||
let Some(metrics) = &self.metrics else {
|
||||
return Ok(());
|
||||
};
|
||||
metrics.shutdown()
|
||||
}
|
||||
|
||||
fn tags_with_metadata<'a>(
|
||||
&'a self,
|
||||
tags: &'a [(&'a str, &'a str)],
|
||||
) -> MetricsResult<Vec<(&'a str, &'a str)>> {
|
||||
let mut merged = self.metadata_tag_refs()?;
|
||||
merged.extend(tags.iter().copied());
|
||||
Ok(merged)
|
||||
}
|
||||
|
||||
fn metadata_tag_refs(&self) -> MetricsResult<Vec<(&str, &str)>> {
|
||||
if !self.metrics_use_metadata_tags {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
let mut tags = Vec::with_capacity(5);
|
||||
Self::push_metadata_tag(&mut tags, "auth_mode", self.metadata.auth_mode.as_deref())?;
|
||||
Self::push_metadata_tag(&mut tags, "model", Some(self.metadata.model.as_str()))?;
|
||||
Self::push_metadata_tag(&mut tags, "app.version", Some(self.metadata.app_version))?;
|
||||
Ok(tags)
|
||||
}
|
||||
|
||||
fn push_metadata_tag<'a>(
|
||||
tags: &mut Vec<(&'a str, &'a str)>,
|
||||
key: &'static str,
|
||||
value: Option<&'a str>,
|
||||
) -> MetricsResult<()> {
|
||||
let Some(value) = value else {
|
||||
return Ok(());
|
||||
};
|
||||
validate_tag_key(key)?;
|
||||
validate_tag_value(value)?;
|
||||
tags.push((key, value));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
291
codex-rs/otel/src/metrics/client.rs
Normal file
291
codex-rs/otel/src/metrics/client.rs
Normal file
@@ -0,0 +1,291 @@
|
||||
use crate::config::OtelExporter;
|
||||
use crate::config::OtelHttpProtocol;
|
||||
use crate::metrics::MetricsError;
|
||||
use crate::metrics::Result;
|
||||
use crate::metrics::config::MetricsConfig;
|
||||
use crate::metrics::config::MetricsExporter;
|
||||
use crate::metrics::timer::Timer;
|
||||
use crate::metrics::validation::validate_metric_name;
|
||||
use crate::metrics::validation::validate_tag_key;
|
||||
use crate::metrics::validation::validate_tag_value;
|
||||
use crate::metrics::validation::validate_tags;
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry::metrics::Counter;
|
||||
use opentelemetry::metrics::Histogram;
|
||||
use opentelemetry::metrics::Meter;
|
||||
use opentelemetry::metrics::MeterProvider as _;
|
||||
use opentelemetry_otlp::OTEL_EXPORTER_OTLP_METRICS_TIMEOUT;
|
||||
use opentelemetry_otlp::Protocol;
|
||||
use opentelemetry_otlp::WithExportConfig;
|
||||
use opentelemetry_otlp::WithHttpConfig;
|
||||
use opentelemetry_otlp::WithTonicConfig;
|
||||
use opentelemetry_otlp::tonic_types::metadata::MetadataMap;
|
||||
use opentelemetry_otlp::tonic_types::transport::ClientTlsConfig;
|
||||
use opentelemetry_sdk::Resource;
|
||||
use opentelemetry_sdk::metrics::PeriodicReader;
|
||||
use opentelemetry_sdk::metrics::SdkMeterProvider;
|
||||
use opentelemetry_sdk::metrics::Temporality;
|
||||
use opentelemetry_semantic_conventions as semconv;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use tracing::debug;
|
||||
|
||||
const ENV_ATTRIBUTE: &str = "env";
|
||||
const METER_NAME: &str = "codex";
|
||||
|
||||
#[derive(Debug)]
|
||||
struct MetricsClientInner {
|
||||
meter_provider: SdkMeterProvider,
|
||||
meter: Meter,
|
||||
counters: Mutex<HashMap<String, Counter<u64>>>,
|
||||
histograms: Mutex<HashMap<String, Histogram<f64>>>,
|
||||
default_tags: BTreeMap<String, String>,
|
||||
}
|
||||
|
||||
impl MetricsClientInner {
|
||||
fn counter(&self, name: &str, inc: i64, tags: &[(&str, &str)]) -> Result<()> {
|
||||
validate_metric_name(name)?;
|
||||
if inc < 0 {
|
||||
return Err(MetricsError::NegativeCounterIncrement {
|
||||
name: name.to_string(),
|
||||
inc,
|
||||
});
|
||||
}
|
||||
let attributes = self.attributes(tags)?;
|
||||
|
||||
let mut counters = self
|
||||
.counters
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
let counter = counters
|
||||
.entry(name.to_string())
|
||||
.or_insert_with(|| self.meter.u64_counter(name.to_string()).build());
|
||||
counter.add(inc as u64, &attributes);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn histogram(&self, name: &str, value: i64, tags: &[(&str, &str)]) -> Result<()> {
|
||||
validate_metric_name(name)?;
|
||||
let attributes = self.attributes(tags)?;
|
||||
|
||||
let mut histograms = self
|
||||
.histograms
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
let histogram = histograms
|
||||
.entry(name.to_string())
|
||||
.or_insert_with(|| self.meter.f64_histogram(name.to_string()).build());
|
||||
histogram.record(value as f64, &attributes);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn attributes(&self, tags: &[(&str, &str)]) -> Result<Vec<KeyValue>> {
|
||||
if tags.is_empty() {
|
||||
return Ok(self
|
||||
.default_tags
|
||||
.iter()
|
||||
.map(|(key, value)| KeyValue::new(key.clone(), value.clone()))
|
||||
.collect());
|
||||
}
|
||||
|
||||
let mut merged = self.default_tags.clone();
|
||||
for (key, value) in tags {
|
||||
validate_tag_key(key)?;
|
||||
validate_tag_value(value)?;
|
||||
merged.insert((*key).to_string(), (*value).to_string());
|
||||
}
|
||||
|
||||
Ok(merged
|
||||
.into_iter()
|
||||
.map(|(key, value)| KeyValue::new(key, value))
|
||||
.collect())
|
||||
}
|
||||
|
||||
fn shutdown(&self) -> Result<()> {
|
||||
debug!("flushing OTEL metrics");
|
||||
self.meter_provider
|
||||
.force_flush()
|
||||
.map_err(|source| MetricsError::ProviderShutdown { source })?;
|
||||
self.meter_provider
|
||||
.shutdown()
|
||||
.map_err(|source| MetricsError::ProviderShutdown { source })?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// OpenTelemetry metrics client used by Codex.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct MetricsClient(std::sync::Arc<MetricsClientInner>);
|
||||
|
||||
impl MetricsClient {
|
||||
/// Build a metrics client from configuration and validate defaults.
|
||||
pub fn new(config: MetricsConfig) -> Result<Self> {
|
||||
validate_tags(&config.default_tags)?;
|
||||
|
||||
let resource = Resource::builder()
|
||||
.with_service_name(config.service_name.clone())
|
||||
.with_attributes(vec![
|
||||
KeyValue::new(
|
||||
semconv::attribute::SERVICE_VERSION,
|
||||
config.service_version.clone(),
|
||||
),
|
||||
KeyValue::new(ENV_ATTRIBUTE, config.environment.clone()),
|
||||
])
|
||||
.build();
|
||||
|
||||
let temporality = Temporality::default();
|
||||
let (meter_provider, meter) = match config.exporter {
|
||||
MetricsExporter::InMemory(exporter) => {
|
||||
build_provider(resource, exporter, config.export_interval)
|
||||
}
|
||||
MetricsExporter::Otlp(exporter) => {
|
||||
let exporter = build_otlp_metric_exporter(exporter, temporality)?;
|
||||
build_provider(resource, exporter, config.export_interval)
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Self(std::sync::Arc::new(MetricsClientInner {
|
||||
meter_provider,
|
||||
meter,
|
||||
counters: Mutex::new(HashMap::new()),
|
||||
histograms: Mutex::new(HashMap::new()),
|
||||
default_tags: config.default_tags,
|
||||
})))
|
||||
}
|
||||
|
||||
/// Send a single counter increment.
|
||||
pub fn counter(&self, name: &str, inc: i64, tags: &[(&str, &str)]) -> Result<()> {
|
||||
self.0.counter(name, inc, tags)
|
||||
}
|
||||
|
||||
/// Send a single histogram sample.
|
||||
pub fn histogram(&self, name: &str, value: i64, tags: &[(&str, &str)]) -> Result<()> {
|
||||
self.0.histogram(name, value, tags)
|
||||
}
|
||||
|
||||
/// Record a duration in milliseconds using a histogram.
|
||||
pub fn record_duration(
|
||||
&self,
|
||||
name: &str,
|
||||
duration: Duration,
|
||||
tags: &[(&str, &str)],
|
||||
) -> Result<()> {
|
||||
self.histogram(
|
||||
name,
|
||||
duration.as_millis().min(i64::MAX as u128) as i64,
|
||||
tags,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn start_timer(
|
||||
&self,
|
||||
name: &str,
|
||||
tags: &[(&str, &str)],
|
||||
) -> std::result::Result<Timer, MetricsError> {
|
||||
Ok(Timer::new(name, tags, self))
|
||||
}
|
||||
|
||||
/// Flush metrics and stop the underlying OTEL meter provider.
|
||||
pub fn shutdown(&self) -> Result<()> {
|
||||
self.0.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
fn build_provider<E>(
|
||||
resource: Resource,
|
||||
exporter: E,
|
||||
interval: Option<Duration>,
|
||||
) -> (SdkMeterProvider, Meter)
|
||||
where
|
||||
E: opentelemetry_sdk::metrics::exporter::PushMetricExporter + 'static,
|
||||
{
|
||||
let mut reader_builder = PeriodicReader::builder(exporter);
|
||||
if let Some(interval) = interval {
|
||||
reader_builder = reader_builder.with_interval(interval);
|
||||
}
|
||||
let reader = reader_builder.build();
|
||||
let provider = SdkMeterProvider::builder()
|
||||
.with_resource(resource)
|
||||
.with_reader(reader)
|
||||
.build();
|
||||
let meter = provider.meter(METER_NAME);
|
||||
(provider, meter)
|
||||
}
|
||||
|
||||
fn build_otlp_metric_exporter(
|
||||
exporter: OtelExporter,
|
||||
temporality: Temporality,
|
||||
) -> Result<opentelemetry_otlp::MetricExporter> {
|
||||
match exporter {
|
||||
OtelExporter::None => Err(MetricsError::ExporterDisabled),
|
||||
OtelExporter::Statsig => build_otlp_metric_exporter(
|
||||
crate::config::resolve_exporter(&OtelExporter::Statsig),
|
||||
temporality,
|
||||
),
|
||||
OtelExporter::OtlpGrpc {
|
||||
endpoint,
|
||||
headers,
|
||||
tls,
|
||||
} => {
|
||||
debug!("Using OTLP Grpc exporter for metrics: {endpoint}");
|
||||
|
||||
let header_map = crate::otlp::build_header_map(&headers);
|
||||
|
||||
let base_tls_config = ClientTlsConfig::new()
|
||||
.with_enabled_roots()
|
||||
.assume_http2(true);
|
||||
|
||||
let tls_config = match tls.as_ref() {
|
||||
Some(tls) => crate::otlp::build_grpc_tls_config(&endpoint, base_tls_config, tls)
|
||||
.map_err(|err| MetricsError::InvalidConfig {
|
||||
message: err.to_string(),
|
||||
})?,
|
||||
None => base_tls_config,
|
||||
};
|
||||
|
||||
opentelemetry_otlp::MetricExporter::builder()
|
||||
.with_tonic()
|
||||
.with_endpoint(endpoint)
|
||||
.with_temporality(temporality)
|
||||
.with_metadata(MetadataMap::from_headers(header_map))
|
||||
.with_tls_config(tls_config)
|
||||
.build()
|
||||
.map_err(|source| MetricsError::ExporterBuild { source })
|
||||
}
|
||||
OtelExporter::OtlpHttp {
|
||||
endpoint,
|
||||
headers,
|
||||
protocol,
|
||||
tls,
|
||||
} => {
|
||||
debug!("Using OTLP Http exporter for metrics: {endpoint}");
|
||||
|
||||
let protocol = match protocol {
|
||||
OtelHttpProtocol::Binary => Protocol::HttpBinary,
|
||||
OtelHttpProtocol::Json => Protocol::HttpJson,
|
||||
};
|
||||
|
||||
let mut exporter_builder = opentelemetry_otlp::MetricExporter::builder()
|
||||
.with_http()
|
||||
.with_endpoint(endpoint)
|
||||
.with_temporality(temporality)
|
||||
.with_protocol(protocol)
|
||||
.with_headers(headers);
|
||||
|
||||
if let Some(tls) = tls.as_ref() {
|
||||
let client =
|
||||
crate::otlp::build_http_client(tls, OTEL_EXPORTER_OTLP_METRICS_TIMEOUT)
|
||||
.map_err(|err| MetricsError::InvalidConfig {
|
||||
message: err.to_string(),
|
||||
})?;
|
||||
exporter_builder = exporter_builder.with_http_client(client);
|
||||
}
|
||||
|
||||
exporter_builder
|
||||
.build()
|
||||
.map_err(|source| MetricsError::ExporterBuild { source })
|
||||
}
|
||||
}
|
||||
}
|
||||
74
codex-rs/otel/src/metrics/config.rs
Normal file
74
codex-rs/otel/src/metrics/config.rs
Normal file
@@ -0,0 +1,74 @@
|
||||
use crate::config::OtelExporter;
|
||||
use crate::metrics::Result;
|
||||
use crate::metrics::validation::validate_tag_key;
|
||||
use crate::metrics::validation::validate_tag_value;
|
||||
use opentelemetry_sdk::metrics::InMemoryMetricExporter;
|
||||
use std::collections::BTreeMap;
|
||||
use std::time::Duration;
|
||||
|
||||
/// Destination for exported metrics, as selected by a `MetricsConfig`.
#[derive(Clone, Debug)]
|
||||
pub enum MetricsExporter {
|
||||
/// Export over OTLP using the given exporter settings.
Otlp(OtelExporter),
|
||||
/// Export into the supplied in-memory exporter (the `in_memory` constructor
/// is documented as being used in tests).
InMemory(InMemoryMetricExporter),
|
||||
}
|
||||
|
||||
/// Settings consumed by `MetricsClient::new`.
///
/// Built with `MetricsConfig::otlp` or `MetricsConfig::in_memory` and refined
/// with the `with_*` builder methods.
#[derive(Clone, Debug)]
|
||||
pub struct MetricsConfig {
|
||||
// Environment name supplied by the caller of the constructor.
pub(crate) environment: String,
|
||||
// Service name supplied by the caller of the constructor.
pub(crate) service_name: String,
|
||||
// Service version supplied by the caller of the constructor.
pub(crate) service_version: String,
|
||||
// Where metrics are exported to.
pub(crate) exporter: MetricsExporter,
|
||||
// Optional export interval; `None` until `with_export_interval` is called.
pub(crate) export_interval: Option<Duration>,
|
||||
// Tags sent with every metric; entries added via `with_tag` are validated.
pub(crate) default_tags: BTreeMap<String, String>,
|
||||
}
|
||||
|
||||
impl MetricsConfig {
|
||||
pub fn otlp(
|
||||
environment: impl Into<String>,
|
||||
service_name: impl Into<String>,
|
||||
service_version: impl Into<String>,
|
||||
exporter: OtelExporter,
|
||||
) -> Self {
|
||||
Self {
|
||||
environment: environment.into(),
|
||||
service_name: service_name.into(),
|
||||
service_version: service_version.into(),
|
||||
exporter: MetricsExporter::Otlp(exporter),
|
||||
export_interval: None,
|
||||
default_tags: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create an in-memory config (used in tests).
|
||||
pub fn in_memory(
|
||||
environment: impl Into<String>,
|
||||
service_name: impl Into<String>,
|
||||
service_version: impl Into<String>,
|
||||
exporter: InMemoryMetricExporter,
|
||||
) -> Self {
|
||||
Self {
|
||||
environment: environment.into(),
|
||||
service_name: service_name.into(),
|
||||
service_version: service_version.into(),
|
||||
exporter: MetricsExporter::InMemory(exporter),
|
||||
export_interval: None,
|
||||
default_tags: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Override the interval between periodic metric exports.
|
||||
pub fn with_export_interval(mut self, interval: Duration) -> Self {
|
||||
self.export_interval = Some(interval);
|
||||
self
|
||||
}
|
||||
|
||||
/// Add a default tag that will be sent with every metric.
|
||||
pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Result<Self> {
|
||||
let key = key.into();
|
||||
let value = value.into();
|
||||
validate_tag_key(&key)?;
|
||||
validate_tag_value(&value)?;
|
||||
self.default_tags.insert(key, value);
|
||||
Ok(self)
|
||||
}
|
||||
}
|
||||
37
codex-rs/otel/src/metrics/error.rs
Normal file
37
codex-rs/otel/src/metrics/error.rs
Normal file
@@ -0,0 +1,37 @@
|
||||
use thiserror::Error;
|
||||
|
||||
/// Result alias whose error type is fixed to [`MetricsError`].
pub type Result<T> = std::result::Result<T, MetricsError>;
|
||||
|
||||
/// Errors reported by the metrics module.
#[derive(Debug, Error)]
|
||||
pub enum MetricsError {
|
||||
// Metrics.
|
||||
/// A metric name was the empty string.
#[error("metric name cannot be empty")]
|
||||
EmptyMetricName,
|
||||
/// A metric name contained a character outside the allowed set.
#[error("metric name contains invalid characters: {name}")]
|
||||
InvalidMetricName { name: String },
|
||||
/// A tag key or tag value was empty; `label` says which one.
#[error("{label} cannot be empty")]
|
||||
EmptyTagComponent { label: String },
|
||||
/// A tag key or tag value contained a disallowed character; `label` says
/// which one and `value` is the offending text.
#[error("{label} contains invalid characters: {value}")]
|
||||
InvalidTagComponent { label: String, value: String },
|
||||
|
||||
/// An exporter was requested but the exporter setting is `None`.
#[error("metrics exporter is disabled")]
|
||||
ExporterDisabled,
|
||||
|
||||
/// A counter was asked to move by the negative amount `inc`.
#[error("counter increment must be non-negative for {name}: {inc}")]
|
||||
NegativeCounterIncrement { name: String, inc: i64 },
|
||||
|
||||
/// The OTLP exporter builder failed; the builder's error is the source.
#[error("failed to build OTLP metrics exporter")]
|
||||
ExporterBuild {
|
||||
#[source]
|
||||
source: opentelemetry_otlp::ExporterBuildError,
|
||||
},
|
||||
|
||||
/// TLS or HTTP-client setup for the exporter failed; only the message of
/// the underlying error is kept.
#[error("invalid OTLP metrics configuration: {message}")]
|
||||
InvalidConfig { message: String },
|
||||
|
||||
/// Flushing or shutting down the meter provider failed.
#[error("failed to flush or shutdown metrics provider")]
|
||||
ProviderShutdown {
|
||||
#[source]
|
||||
source: opentelemetry_sdk::error::OTelSdkError,
|
||||
},
|
||||
}
|
||||
22
codex-rs/otel/src/metrics/mod.rs
Normal file
22
codex-rs/otel/src/metrics/mod.rs
Normal file
@@ -0,0 +1,22 @@
|
||||
mod client;
|
||||
mod config;
|
||||
mod error;
|
||||
pub(crate) mod timer;
|
||||
pub(crate) mod validation;
|
||||
|
||||
pub use crate::metrics::client::MetricsClient;
|
||||
pub use crate::metrics::config::MetricsConfig;
|
||||
pub use crate::metrics::config::MetricsExporter;
|
||||
pub use crate::metrics::error::MetricsError;
|
||||
pub use crate::metrics::error::Result;
|
||||
use std::sync::OnceLock;
|
||||
|
||||
// Process-wide metrics client; written by `install_global`, read by `global`.
static GLOBAL_METRICS: OnceLock<MetricsClient> = OnceLock::new();
|
||||
|
||||
pub(crate) fn install_global(metrics: MetricsClient) {
|
||||
let _ = GLOBAL_METRICS.set(metrics);
|
||||
}
|
||||
|
||||
pub(crate) fn global() -> Option<MetricsClient> {
|
||||
GLOBAL_METRICS.get().cloned()
|
||||
}
|
||||
42
codex-rs/otel/src/metrics/timer.rs
Normal file
42
codex-rs/otel/src/metrics/timer.rs
Normal file
@@ -0,0 +1,42 @@
|
||||
use crate::metrics::MetricsClient;
|
||||
use crate::metrics::error::Result;
|
||||
use std::time::Instant;
|
||||
|
||||
/// Measures the time elapsed since its creation and reports it through a
/// `MetricsClient`; the `Drop` impl reports automatically.
pub struct Timer {
|
||||
// Metric name the duration is recorded under.
name: String,
|
||||
// Owned copies of the tags supplied at creation.
tags: Vec<(String, String)>,
|
||||
// Client used to record the duration.
client: MetricsClient,
|
||||
// Instant captured when the timer was created.
start_time: Instant,
|
||||
}
|
||||
|
||||
impl Drop for Timer {
|
||||
fn drop(&mut self) {
|
||||
if let Err(e) = self.record() {
|
||||
tracing::error!("metrics client error: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Timer {
|
||||
pub(crate) fn new(name: &str, tags: &[(&str, &str)], client: &MetricsClient) -> Self {
|
||||
Self {
|
||||
name: name.to_string(),
|
||||
tags: tags
|
||||
.iter()
|
||||
.map(|(k, v)| (k.to_string(), v.to_string()))
|
||||
.collect(),
|
||||
client: client.clone(),
|
||||
start_time: Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn record(&self) -> Result<()> {
|
||||
let tags = self
|
||||
.tags
|
||||
.iter()
|
||||
.map(|(k, v)| (k.as_str(), v.as_str()))
|
||||
.collect::<Vec<_>>();
|
||||
self.client
|
||||
.record_duration(&self.name, self.start_time.elapsed(), &tags)
|
||||
}
|
||||
}
|
||||
55
codex-rs/otel/src/metrics/validation.rs
Normal file
55
codex-rs/otel/src/metrics/validation.rs
Normal file
@@ -0,0 +1,55 @@
|
||||
use crate::metrics::error::MetricsError;
|
||||
use crate::metrics::error::Result;
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
pub(crate) fn validate_tags(tags: &BTreeMap<String, String>) -> Result<()> {
|
||||
for (key, value) in tags {
|
||||
validate_tag_key(key)?;
|
||||
validate_tag_value(value)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn validate_metric_name(name: &str) -> Result<()> {
|
||||
if name.is_empty() {
|
||||
return Err(MetricsError::EmptyMetricName);
|
||||
}
|
||||
if !name.chars().all(is_metric_char) {
|
||||
return Err(MetricsError::InvalidMetricName {
|
||||
name: name.to_string(),
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn validate_tag_key(key: &str) -> Result<()> {
|
||||
validate_tag_component(key, "tag key")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn validate_tag_value(value: &str) -> Result<()> {
|
||||
validate_tag_component(value, "tag value")
|
||||
}
|
||||
|
||||
fn validate_tag_component(value: &str, label: &str) -> Result<()> {
|
||||
if value.is_empty() {
|
||||
return Err(MetricsError::EmptyTagComponent {
|
||||
label: label.to_string(),
|
||||
});
|
||||
}
|
||||
if !value.chars().all(is_tag_char) {
|
||||
return Err(MetricsError::InvalidTagComponent {
|
||||
label: label.to_string(),
|
||||
value: value.to_string(),
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Whether `c` may appear in a metric name: ASCII letters, ASCII digits,
/// `.`, `_` or `-`.
fn is_metric_char(c: char) -> bool {
    matches!(c, 'a'..='z' | 'A'..='Z' | '0'..='9' | '.' | '_' | '-')
}
|
||||
|
||||
/// Whether `c` may appear in a tag key or value: ASCII letters, ASCII digits,
/// `.`, `_`, `-` or `/`.
fn is_tag_char(c: char) -> bool {
    matches!(c, 'a'..='z' | 'A'..='Z' | '0'..='9' | '.' | '_' | '-' | '/')
}
|
||||
163
codex-rs/otel/src/otlp.rs
Normal file
163
codex-rs/otel/src/otlp.rs
Normal file
@@ -0,0 +1,163 @@
|
||||
use crate::config::OtelTlsConfig;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use http::Uri;
|
||||
use opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT;
|
||||
use opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT;
|
||||
use opentelemetry_otlp::tonic_types::transport::Certificate as TonicCertificate;
|
||||
use opentelemetry_otlp::tonic_types::transport::ClientTlsConfig;
|
||||
use opentelemetry_otlp::tonic_types::transport::Identity as TonicIdentity;
|
||||
use reqwest::Certificate as ReqwestCertificate;
|
||||
use reqwest::Identity as ReqwestIdentity;
|
||||
use reqwest::header::HeaderMap;
|
||||
use reqwest::header::HeaderName;
|
||||
use reqwest::header::HeaderValue;
|
||||
use std::env;
|
||||
use std::error::Error;
|
||||
use std::fs;
|
||||
use std::io;
|
||||
use std::io::ErrorKind;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
pub(crate) fn build_header_map(headers: &std::collections::HashMap<String, String>) -> HeaderMap {
|
||||
let mut header_map = HeaderMap::new();
|
||||
for (key, value) in headers {
|
||||
if let Ok(name) = HeaderName::from_bytes(key.as_bytes())
|
||||
&& let Ok(val) = HeaderValue::from_str(value)
|
||||
{
|
||||
header_map.insert(name, val);
|
||||
}
|
||||
}
|
||||
header_map
|
||||
}
|
||||
|
||||
pub(crate) fn build_grpc_tls_config(
|
||||
endpoint: &str,
|
||||
tls_config: ClientTlsConfig,
|
||||
tls: &OtelTlsConfig,
|
||||
) -> Result<ClientTlsConfig, Box<dyn Error>> {
|
||||
let uri: Uri = endpoint.parse()?;
|
||||
let host = uri.host().ok_or_else(|| {
|
||||
config_error(format!(
|
||||
"OTLP gRPC endpoint {endpoint} does not include a host"
|
||||
))
|
||||
})?;
|
||||
|
||||
let mut config = tls_config.domain_name(host.to_owned());
|
||||
|
||||
if let Some(path) = tls.ca_certificate.as_ref() {
|
||||
let (pem, _) = read_bytes(path)?;
|
||||
config = config.ca_certificate(TonicCertificate::from_pem(pem));
|
||||
}
|
||||
|
||||
match (&tls.client_certificate, &tls.client_private_key) {
|
||||
(Some(cert_path), Some(key_path)) => {
|
||||
let (cert_pem, _) = read_bytes(cert_path)?;
|
||||
let (key_pem, _) = read_bytes(key_path)?;
|
||||
config = config.identity(TonicIdentity::from_pem(cert_pem, key_pem));
|
||||
}
|
||||
(Some(_), None) | (None, Some(_)) => {
|
||||
return Err(config_error(
|
||||
"client_certificate and client_private_key must both be provided for mTLS",
|
||||
));
|
||||
}
|
||||
(None, None) => {}
|
||||
}
|
||||
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
/// Build a blocking HTTP client with TLS configuration for OTLP HTTP exporters.
|
||||
///
|
||||
/// We use `reqwest::blocking::Client` because OTEL exporters run on dedicated
|
||||
/// OS threads that are not necessarily backed by tokio.
|
||||
pub(crate) fn build_http_client(
|
||||
tls: &OtelTlsConfig,
|
||||
timeout_var: &str,
|
||||
) -> Result<reqwest::blocking::Client, Box<dyn Error>> {
|
||||
if tokio::runtime::Handle::try_current().is_ok() {
|
||||
tokio::task::block_in_place(|| build_http_client_inner(tls, timeout_var))
|
||||
} else {
|
||||
build_http_client_inner(tls, timeout_var)
|
||||
}
|
||||
}
|
||||
|
||||
fn build_http_client_inner(
|
||||
tls: &OtelTlsConfig,
|
||||
timeout_var: &str,
|
||||
) -> Result<reqwest::blocking::Client, Box<dyn Error>> {
|
||||
let mut builder =
|
||||
reqwest::blocking::Client::builder().timeout(resolve_otlp_timeout(timeout_var));
|
||||
|
||||
if let Some(path) = tls.ca_certificate.as_ref() {
|
||||
let (pem, location) = read_bytes(path)?;
|
||||
let certificate = ReqwestCertificate::from_pem(pem.as_slice()).map_err(|error| {
|
||||
config_error(format!(
|
||||
"failed to parse certificate {}: {error}",
|
||||
location.display()
|
||||
))
|
||||
})?;
|
||||
builder = builder
|
||||
.tls_built_in_root_certs(false)
|
||||
.add_root_certificate(certificate);
|
||||
}
|
||||
|
||||
match (&tls.client_certificate, &tls.client_private_key) {
|
||||
(Some(cert_path), Some(key_path)) => {
|
||||
let (mut cert_pem, cert_location) = read_bytes(cert_path)?;
|
||||
let (key_pem, key_location) = read_bytes(key_path)?;
|
||||
cert_pem.extend_from_slice(key_pem.as_slice());
|
||||
let identity = ReqwestIdentity::from_pem(cert_pem.as_slice()).map_err(|error| {
|
||||
config_error(format!(
|
||||
"failed to parse client identity using {} and {}: {error}",
|
||||
cert_location.display(),
|
||||
key_location.display()
|
||||
))
|
||||
})?;
|
||||
builder = builder.identity(identity).https_only(true);
|
||||
}
|
||||
(Some(_), None) | (None, Some(_)) => {
|
||||
return Err(config_error(
|
||||
"client_certificate and client_private_key must both be provided for mTLS",
|
||||
));
|
||||
}
|
||||
(None, None) => {}
|
||||
}
|
||||
|
||||
builder
|
||||
.build()
|
||||
.map_err(|error| Box::new(error) as Box<dyn Error>)
|
||||
}
|
||||
|
||||
pub(crate) fn resolve_otlp_timeout(signal_var: &str) -> Duration {
|
||||
if let Some(timeout) = read_timeout_env(signal_var) {
|
||||
return timeout;
|
||||
}
|
||||
if let Some(timeout) = read_timeout_env(OTEL_EXPORTER_OTLP_TIMEOUT) {
|
||||
return timeout;
|
||||
}
|
||||
OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT
|
||||
}
|
||||
|
||||
/// Read environment variable `var` as a non-negative number of milliseconds.
///
/// Returns `None` when the variable is unset or not valid Unicode, does not
/// parse as an `i64`, or is negative.
fn read_timeout_env(var: &str) -> Option<Duration> {
    let millis = env::var(var).ok()?.parse::<i64>().ok()?;
    // The conversion fails exactly for negative values.
    u64::try_from(millis).ok().map(Duration::from_millis)
}
|
||||
|
||||
fn read_bytes(path: &AbsolutePathBuf) -> Result<(Vec<u8>, PathBuf), Box<dyn Error>> {
|
||||
match fs::read(path) {
|
||||
Ok(bytes) => Ok((bytes, path.to_path_buf())),
|
||||
Err(error) => Err(Box::new(io::Error::new(
|
||||
error.kind(),
|
||||
format!("failed to read {}: {error}", path.display()),
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrap `message` in a boxed `io::Error` of kind `InvalidData`.
fn config_error(message: impl Into<String>) -> Box<dyn Error> {
    let text: String = message.into();
    io::Error::new(ErrorKind::InvalidData, text).into()
}
|
||||
2
codex-rs/otel/src/traces/mod.rs
Normal file
2
codex-rs/otel/src/traces/mod.rs
Normal file
@@ -0,0 +1,2 @@
|
||||
pub mod otel_manager;
|
||||
pub mod otel_provider;
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::otel_provider::traceparent_context_from_env;
|
||||
use crate::traces::otel_provider::traceparent_context_from_env;
|
||||
use chrono::SecondsFormat;
|
||||
use chrono::Utc;
|
||||
use codex_api::ResponseEvent;
|
||||
@@ -16,43 +16,19 @@ use eventsource_stream::Event as StreamEvent;
|
||||
use eventsource_stream::EventStreamError as StreamError;
|
||||
use reqwest::Error;
|
||||
use reqwest::Response;
|
||||
use serde::Serialize;
|
||||
use std::borrow::Cow;
|
||||
use std::fmt::Display;
|
||||
use std::future::Future;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use strum_macros::Display;
|
||||
use tokio::time::error::Elapsed;
|
||||
use tracing::Span;
|
||||
use tracing::trace_span;
|
||||
use tracing_opentelemetry::OpenTelemetrySpanExt;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Display)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ToolDecisionSource {
|
||||
Config,
|
||||
User,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct OtelEventMetadata {
|
||||
conversation_id: ThreadId,
|
||||
auth_mode: Option<String>,
|
||||
account_id: Option<String>,
|
||||
account_email: Option<String>,
|
||||
model: String,
|
||||
slug: String,
|
||||
log_user_prompts: bool,
|
||||
app_version: &'static str,
|
||||
terminal_type: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct OtelManager {
|
||||
metadata: OtelEventMetadata,
|
||||
session_span: Span,
|
||||
}
|
||||
pub use crate::OtelEventMetadata;
|
||||
pub use crate::OtelManager;
|
||||
pub use crate::ToolDecisionSource;
|
||||
|
||||
impl OtelManager {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
@@ -86,16 +62,11 @@ impl OtelManager {
|
||||
terminal_type,
|
||||
},
|
||||
session_span,
|
||||
metrics: crate::metrics::global(),
|
||||
metrics_use_metadata_tags: true,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_model(&self, model: &str, slug: &str) -> Self {
|
||||
let mut manager = self.clone();
|
||||
manager.metadata.model = model.to_owned();
|
||||
manager.metadata.slug = slug.to_owned();
|
||||
manager
|
||||
}
|
||||
|
||||
pub fn current_span(&self) -> &Span {
|
||||
&self.session_span
|
||||
}
|
||||
@@ -162,7 +133,7 @@ impl OtelManager {
|
||||
F: FnOnce() -> Fut,
|
||||
Fut: Future<Output = Result<Response, Error>>,
|
||||
{
|
||||
let start = std::time::Instant::now();
|
||||
let start = Instant::now();
|
||||
let response = f().await;
|
||||
let duration = start.elapsed();
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
use crate::config::OtelExporter;
|
||||
use crate::config::OtelHttpProtocol;
|
||||
use crate::config::OtelSettings;
|
||||
use crate::config::OtelTlsConfig;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use http::Uri;
|
||||
use crate::metrics::MetricsClient;
|
||||
use crate::metrics::MetricsConfig;
|
||||
use opentelemetry::Context;
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry::context::ContextGuard;
|
||||
@@ -14,8 +13,6 @@ use opentelemetry::trace::TracerProvider as _;
|
||||
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
|
||||
use opentelemetry_otlp::LogExporter;
|
||||
use opentelemetry_otlp::OTEL_EXPORTER_OTLP_LOGS_TIMEOUT;
|
||||
use opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT;
|
||||
use opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT;
|
||||
use opentelemetry_otlp::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT;
|
||||
use opentelemetry_otlp::Protocol;
|
||||
use opentelemetry_otlp::SpanExporter;
|
||||
@@ -23,9 +20,7 @@ use opentelemetry_otlp::WithExportConfig;
|
||||
use opentelemetry_otlp::WithHttpConfig;
|
||||
use opentelemetry_otlp::WithTonicConfig;
|
||||
use opentelemetry_otlp::tonic_types::metadata::MetadataMap;
|
||||
use opentelemetry_otlp::tonic_types::transport::Certificate as TonicCertificate;
|
||||
use opentelemetry_otlp::tonic_types::transport::ClientTlsConfig;
|
||||
use opentelemetry_otlp::tonic_types::transport::Identity as TonicIdentity;
|
||||
use opentelemetry_sdk::Resource;
|
||||
use opentelemetry_sdk::logs::SdkLoggerProvider;
|
||||
use opentelemetry_sdk::propagation::TraceContextPropagator;
|
||||
@@ -33,21 +28,11 @@ use opentelemetry_sdk::trace::BatchSpanProcessor;
|
||||
use opentelemetry_sdk::trace::SdkTracerProvider;
|
||||
use opentelemetry_sdk::trace::Tracer;
|
||||
use opentelemetry_semantic_conventions as semconv;
|
||||
use reqwest::Certificate as ReqwestCertificate;
|
||||
use reqwest::Identity as ReqwestIdentity;
|
||||
use reqwest::header::HeaderMap;
|
||||
use reqwest::header::HeaderName;
|
||||
use reqwest::header::HeaderValue;
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
use std::env;
|
||||
use std::error::Error;
|
||||
use std::fs;
|
||||
use std::io::ErrorKind;
|
||||
use std::io::{self};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::OnceLock;
|
||||
use std::time::Duration;
|
||||
use tracing::debug;
|
||||
use tracing::level_filters::LevelFilter;
|
||||
use tracing::warn;
|
||||
@@ -63,10 +48,12 @@ thread_local! {
|
||||
static TRACEPARENT_GUARD: RefCell<Option<ContextGuard>> = const { RefCell::new(None) };
|
||||
}
|
||||
|
||||
// TODO(jif) move OtelProvider out of `traces/`
|
||||
pub struct OtelProvider {
|
||||
pub logger: Option<SdkLoggerProvider>,
|
||||
pub tracer_provider: Option<SdkTracerProvider>,
|
||||
pub tracer: Option<Tracer>,
|
||||
pub metrics: Option<MetricsClient>,
|
||||
}
|
||||
|
||||
impl OtelProvider {
|
||||
@@ -77,14 +64,33 @@ impl OtelProvider {
|
||||
if let Some(tracer_provider) = &self.tracer_provider {
|
||||
let _ = tracer_provider.shutdown();
|
||||
}
|
||||
if let Some(metrics) = &self.metrics {
|
||||
let _ = metrics.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from(settings: &OtelSettings) -> Result<Option<Self>, Box<dyn Error>> {
|
||||
let log_enabled = !matches!(settings.exporter, OtelExporter::None);
|
||||
let trace_enabled = !matches!(settings.trace_exporter, OtelExporter::None);
|
||||
|
||||
if !log_enabled && !trace_enabled {
|
||||
debug!("No exporter enabled in OTLP settings.");
|
||||
let metric_exporter = crate::config::resolve_exporter(&settings.metrics_exporter);
|
||||
let metrics = if matches!(metric_exporter, OtelExporter::None) {
|
||||
None
|
||||
} else {
|
||||
Some(MetricsClient::new(MetricsConfig::otlp(
|
||||
settings.environment.clone(),
|
||||
settings.service_name.clone(),
|
||||
settings.service_version.clone(),
|
||||
metric_exporter,
|
||||
))?)
|
||||
};
|
||||
|
||||
if let Some(metrics) = metrics.as_ref() {
|
||||
crate::metrics::install_global(metrics.clone());
|
||||
}
|
||||
|
||||
if !log_enabled && !trace_enabled && metrics.is_none() {
|
||||
debug!("No OTEL exporter enabled in settings.");
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
@@ -113,6 +119,7 @@ impl OtelProvider {
|
||||
logger,
|
||||
tracer_provider,
|
||||
tracer,
|
||||
metrics,
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -141,6 +148,10 @@ impl OtelProvider {
|
||||
pub fn codex_export_filter(meta: &tracing::Metadata<'_>) -> bool {
|
||||
meta.target().starts_with("codex_otel")
|
||||
}
|
||||
|
||||
pub fn metrics(&self) -> Option<&MetricsClient> {
|
||||
self.metrics.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for OtelProvider {
|
||||
@@ -151,6 +162,9 @@ impl Drop for OtelProvider {
|
||||
if let Some(tracer_provider) = &self.tracer_provider {
|
||||
let _ = tracer_provider.shutdown();
|
||||
}
|
||||
if let Some(metrics) = &self.metrics {
|
||||
let _ = metrics.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -223,8 +237,9 @@ fn build_logger(
|
||||
) -> Result<SdkLoggerProvider, Box<dyn Error>> {
|
||||
let mut builder = SdkLoggerProvider::builder().with_resource(resource.clone());
|
||||
|
||||
match exporter {
|
||||
match crate::config::resolve_exporter(exporter) {
|
||||
OtelExporter::None => return Ok(builder.build()),
|
||||
OtelExporter::Statsig => unreachable!("statsig exporter should be resolved"),
|
||||
OtelExporter::OtlpGrpc {
|
||||
endpoint,
|
||||
headers,
|
||||
@@ -232,14 +247,14 @@ fn build_logger(
|
||||
} => {
|
||||
debug!("Using OTLP Grpc exporter: {endpoint}");
|
||||
|
||||
let header_map = build_header_map(headers);
|
||||
let header_map = crate::otlp::build_header_map(&headers);
|
||||
|
||||
let base_tls_config = ClientTlsConfig::new()
|
||||
.with_enabled_roots()
|
||||
.assume_http2(true);
|
||||
|
||||
let tls_config = match tls.as_ref() {
|
||||
Some(tls) => build_grpc_tls_config(endpoint, base_tls_config, tls)?,
|
||||
Some(tls) => crate::otlp::build_grpc_tls_config(&endpoint, base_tls_config, tls)?,
|
||||
None => base_tls_config,
|
||||
};
|
||||
|
||||
@@ -269,10 +284,10 @@ fn build_logger(
|
||||
.with_http()
|
||||
.with_endpoint(endpoint)
|
||||
.with_protocol(protocol)
|
||||
.with_headers(headers.clone());
|
||||
.with_headers(headers);
|
||||
|
||||
if let Some(tls) = tls.as_ref() {
|
||||
let client = build_http_client(tls, OTEL_EXPORTER_OTLP_LOGS_TIMEOUT)?;
|
||||
let client = crate::otlp::build_http_client(tls, OTEL_EXPORTER_OTLP_LOGS_TIMEOUT)?;
|
||||
exporter_builder = exporter_builder.with_http_client(client);
|
||||
}
|
||||
|
||||
@@ -289,8 +304,9 @@ fn build_tracer_provider(
|
||||
resource: &Resource,
|
||||
exporter: &OtelExporter,
|
||||
) -> Result<SdkTracerProvider, Box<dyn Error>> {
|
||||
let span_exporter = match exporter {
|
||||
let span_exporter = match crate::config::resolve_exporter(exporter) {
|
||||
OtelExporter::None => return Ok(SdkTracerProvider::builder().build()),
|
||||
OtelExporter::Statsig => unreachable!("statsig exporter should be resolved"),
|
||||
OtelExporter::OtlpGrpc {
|
||||
endpoint,
|
||||
headers,
|
||||
@@ -298,14 +314,14 @@ fn build_tracer_provider(
|
||||
} => {
|
||||
debug!("Using OTLP Grpc exporter for traces: {endpoint}");
|
||||
|
||||
let header_map = build_header_map(headers);
|
||||
let header_map = crate::otlp::build_header_map(&headers);
|
||||
|
||||
let base_tls_config = ClientTlsConfig::new()
|
||||
.with_enabled_roots()
|
||||
.assume_http2(true);
|
||||
|
||||
let tls_config = match tls.as_ref() {
|
||||
Some(tls) => build_grpc_tls_config(endpoint, base_tls_config, tls)?,
|
||||
Some(tls) => crate::otlp::build_grpc_tls_config(&endpoint, base_tls_config, tls)?,
|
||||
None => base_tls_config,
|
||||
};
|
||||
|
||||
@@ -333,10 +349,11 @@ fn build_tracer_provider(
|
||||
.with_http()
|
||||
.with_endpoint(endpoint)
|
||||
.with_protocol(protocol)
|
||||
.with_headers(headers.clone());
|
||||
.with_headers(headers);
|
||||
|
||||
if let Some(tls) = tls.as_ref() {
|
||||
let client = build_http_client(tls, OTEL_EXPORTER_OTLP_TRACES_TIMEOUT)?;
|
||||
let client =
|
||||
crate::otlp::build_http_client(tls, OTEL_EXPORTER_OTLP_TRACES_TIMEOUT)?;
|
||||
exporter_builder = exporter_builder.with_http_client(client);
|
||||
}
|
||||
|
||||
@@ -352,150 +369,6 @@ fn build_tracer_provider(
|
||||
.build())
|
||||
}
|
||||
|
||||
fn build_header_map(headers: &HashMap<String, String>) -> HeaderMap {
|
||||
let mut header_map = HeaderMap::new();
|
||||
for (key, value) in headers {
|
||||
if let Ok(name) = HeaderName::from_bytes(key.as_bytes())
|
||||
&& let Ok(val) = HeaderValue::from_str(value)
|
||||
{
|
||||
header_map.insert(name, val);
|
||||
}
|
||||
}
|
||||
header_map
|
||||
}
|
||||
|
||||
fn build_grpc_tls_config(
|
||||
endpoint: &str,
|
||||
tls_config: ClientTlsConfig,
|
||||
tls: &OtelTlsConfig,
|
||||
) -> Result<ClientTlsConfig, Box<dyn Error>> {
|
||||
let uri: Uri = endpoint.parse()?;
|
||||
let host = uri.host().ok_or_else(|| {
|
||||
config_error(format!(
|
||||
"OTLP gRPC endpoint {endpoint} does not include a host"
|
||||
))
|
||||
})?;
|
||||
|
||||
let mut config = tls_config.domain_name(host.to_owned());
|
||||
|
||||
if let Some(path) = tls.ca_certificate.as_ref() {
|
||||
let (pem, _) = read_bytes(path)?;
|
||||
config = config.ca_certificate(TonicCertificate::from_pem(pem));
|
||||
}
|
||||
|
||||
match (&tls.client_certificate, &tls.client_private_key) {
|
||||
(Some(cert_path), Some(key_path)) => {
|
||||
let (cert_pem, _) = read_bytes(cert_path)?;
|
||||
let (key_pem, _) = read_bytes(key_path)?;
|
||||
config = config.identity(TonicIdentity::from_pem(cert_pem, key_pem));
|
||||
}
|
||||
(Some(_), None) | (None, Some(_)) => {
|
||||
return Err(config_error(
|
||||
"client_certificate and client_private_key must both be provided for mTLS",
|
||||
));
|
||||
}
|
||||
(None, None) => {}
|
||||
}
|
||||
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
/// Build a blocking HTTP client with TLS configuration for the OTLP HTTP exporter.
|
||||
///
|
||||
/// We use `reqwest::blocking::Client` instead of the async client because the
|
||||
/// `opentelemetry_sdk` `BatchLogProcessor` spawns a dedicated OS thread that uses
|
||||
/// `futures_executor::block_on()` rather than tokio. When the async reqwest client's
|
||||
/// timeout calls `tokio::time::sleep()`, it panics with "no reactor running".
|
||||
fn build_http_client(
|
||||
tls: &OtelTlsConfig,
|
||||
timeout_var: &str,
|
||||
) -> Result<reqwest::blocking::Client, Box<dyn Error>> {
|
||||
// Wrap in block_in_place because reqwest::blocking::Client creates its own
|
||||
// internal tokio runtime, which would panic if built directly from an async context.
|
||||
tokio::task::block_in_place(|| build_http_client_inner(tls, timeout_var))
|
||||
}
|
||||
|
||||
fn build_http_client_inner(
|
||||
tls: &OtelTlsConfig,
|
||||
timeout_var: &str,
|
||||
) -> Result<reqwest::blocking::Client, Box<dyn Error>> {
|
||||
let mut builder =
|
||||
reqwest::blocking::Client::builder().timeout(resolve_otlp_timeout(timeout_var));
|
||||
|
||||
if let Some(path) = tls.ca_certificate.as_ref() {
|
||||
let (pem, location) = read_bytes(path)?;
|
||||
let certificate = ReqwestCertificate::from_pem(pem.as_slice()).map_err(|error| {
|
||||
config_error(format!(
|
||||
"failed to parse certificate {}: {error}",
|
||||
location.display()
|
||||
))
|
||||
})?;
|
||||
// Disable built-in root certificates and use only our custom CA
|
||||
builder = builder
|
||||
.tls_built_in_root_certs(false)
|
||||
.add_root_certificate(certificate);
|
||||
}
|
||||
|
||||
match (&tls.client_certificate, &tls.client_private_key) {
|
||||
(Some(cert_path), Some(key_path)) => {
|
||||
let (mut cert_pem, cert_location) = read_bytes(cert_path)?;
|
||||
let (key_pem, key_location) = read_bytes(key_path)?;
|
||||
cert_pem.extend_from_slice(key_pem.as_slice());
|
||||
let identity = ReqwestIdentity::from_pem(cert_pem.as_slice()).map_err(|error| {
|
||||
config_error(format!(
|
||||
"failed to parse client identity using {} and {}: {error}",
|
||||
cert_location.display(),
|
||||
key_location.display()
|
||||
))
|
||||
})?;
|
||||
builder = builder.identity(identity).https_only(true);
|
||||
}
|
||||
(Some(_), None) | (None, Some(_)) => {
|
||||
return Err(config_error(
|
||||
"client_certificate and client_private_key must both be provided for mTLS",
|
||||
));
|
||||
}
|
||||
(None, None) => {}
|
||||
}
|
||||
|
||||
builder
|
||||
.build()
|
||||
.map_err(|error| Box::new(error) as Box<dyn Error>)
|
||||
}
|
||||
|
||||
fn resolve_otlp_timeout(signal_var: &str) -> Duration {
|
||||
if let Some(timeout) = read_timeout_env(signal_var) {
|
||||
return timeout;
|
||||
}
|
||||
if let Some(timeout) = read_timeout_env(OTEL_EXPORTER_OTLP_TIMEOUT) {
|
||||
return timeout;
|
||||
}
|
||||
OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT
|
||||
}
|
||||
|
||||
/// Reads a timeout, expressed as a whole number of milliseconds, from the
/// environment variable `var`.
///
/// Returns `None` when the variable is unset, is not valid Unicode, does not
/// parse as an integer, or is negative. Surrounding whitespace is ignored, so a
/// value such as `"5000 "` is still honoured instead of silently falling back.
fn read_timeout_env(var: &str) -> Option<Duration> {
    let raw = env::var(var).ok()?;
    let millis = raw.trim().parse::<i64>().ok()?;
    // `try_from` rejects negative values without resorting to a lossy `as` cast.
    u64::try_from(millis).ok().map(Duration::from_millis)
}
|
||||
|
||||
fn read_bytes(path: &AbsolutePathBuf) -> Result<(Vec<u8>, PathBuf), Box<dyn Error>> {
|
||||
match fs::read(path) {
|
||||
Ok(bytes) => Ok((bytes, path.to_path_buf())),
|
||||
Err(error) => Err(Box::new(io::Error::new(
|
||||
error.kind(),
|
||||
format!("failed to read {}: {error}", path.display()),
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
/// Wraps `message` in an `io::Error` of kind `InvalidData`, boxed as a generic error.
fn config_error(message: impl Into<String>) -> Box<dyn Error> {
    let text: String = message.into();
    io::Error::new(ErrorKind::InvalidData, text).into()
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
81
codex-rs/otel/tests/harness/mod.rs
Normal file
81
codex-rs/otel/tests/harness/mod.rs
Normal file
@@ -0,0 +1,81 @@
|
||||
use codex_otel::metrics::MetricsClient;
|
||||
use codex_otel::metrics::MetricsConfig;
|
||||
use codex_otel::metrics::Result;
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry_sdk::metrics::InMemoryMetricExporter;
|
||||
use opentelemetry_sdk::metrics::data::AggregatedMetrics;
|
||||
use opentelemetry_sdk::metrics::data::Metric;
|
||||
use opentelemetry_sdk::metrics::data::MetricData;
|
||||
use opentelemetry_sdk::metrics::data::ResourceMetrics;
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
pub(crate) fn build_metrics_with_defaults(
|
||||
default_tags: &[(&str, &str)],
|
||||
) -> Result<(MetricsClient, InMemoryMetricExporter)> {
|
||||
let exporter = InMemoryMetricExporter::default();
|
||||
let mut config = MetricsConfig::in_memory(
|
||||
"test",
|
||||
"codex-cli",
|
||||
env!("CARGO_PKG_VERSION"),
|
||||
exporter.clone(),
|
||||
);
|
||||
for (key, value) in default_tags {
|
||||
config = config.with_tag(*key, *value)?;
|
||||
}
|
||||
let metrics = MetricsClient::new(config)?;
|
||||
Ok((metrics, exporter))
|
||||
}
|
||||
|
||||
/// Returns the last `ResourceMetrics` batch recorded by `exporter`.
///
/// Panics if the exporter cannot report its finished metrics or has recorded none.
pub(crate) fn latest_metrics(exporter: &InMemoryMetricExporter) -> ResourceMetrics {
    let batches = match exporter.get_finished_metrics() {
        Ok(batches) => batches,
        Err(_) => panic!("finished metrics error"),
    };
    match batches.into_iter().last() {
        Some(latest) => latest,
        None => panic!("metrics export missing"),
    }
}
|
||||
|
||||
pub(crate) fn find_metric<'a>(
|
||||
resource_metrics: &'a ResourceMetrics,
|
||||
name: &str,
|
||||
) -> Option<&'a Metric> {
|
||||
for scope_metrics in resource_metrics.scope_metrics() {
|
||||
for metric in scope_metrics.metrics() {
|
||||
if metric.name() == name {
|
||||
return Some(metric);
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub(crate) fn attributes_to_map<'a>(
|
||||
attributes: impl Iterator<Item = &'a KeyValue>,
|
||||
) -> BTreeMap<String, String> {
|
||||
attributes
|
||||
.map(|kv| (kv.key.as_str().to_string(), kv.value.as_str().to_string()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub(crate) fn histogram_data(
|
||||
resource_metrics: &ResourceMetrics,
|
||||
name: &str,
|
||||
) -> (Vec<f64>, Vec<u64>, f64, u64) {
|
||||
let metric =
|
||||
find_metric(resource_metrics, name).unwrap_or_else(|| panic!("metric {name} missing"));
|
||||
match metric.data() {
|
||||
AggregatedMetrics::F64(data) => match data {
|
||||
MetricData::Histogram(histogram) => {
|
||||
let points: Vec<_> = histogram.data_points().collect();
|
||||
assert_eq!(points.len(), 1);
|
||||
let point = points[0];
|
||||
let bounds = point.bounds().collect();
|
||||
let bucket_counts = point.bucket_counts().collect();
|
||||
(bounds, bucket_counts, point.sum(), point.count())
|
||||
}
|
||||
_ => panic!("unexpected histogram aggregation"),
|
||||
},
|
||||
_ => panic!("unexpected metric data type"),
|
||||
}
|
||||
}
|
||||
104
codex-rs/otel/tests/suite/manager_metrics.rs
Normal file
104
codex-rs/otel/tests/suite/manager_metrics.rs
Normal file
@@ -0,0 +1,104 @@
|
||||
use crate::harness::attributes_to_map;
|
||||
use crate::harness::build_metrics_with_defaults;
|
||||
use crate::harness::find_metric;
|
||||
use crate::harness::latest_metrics;
|
||||
use codex_app_server_protocol::AuthMode;
|
||||
use codex_otel::OtelManager;
|
||||
use codex_otel::metrics::Result;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use opentelemetry_sdk::metrics::data::AggregatedMetrics;
|
||||
use opentelemetry_sdk::metrics::data::MetricData;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
// Verifies that a counter recorded through OtelManager carries the manager's
// metadata tags in addition to the default and per-call tags.
#[test]
fn manager_attaches_metadata_tags_to_metrics() -> Result<()> {
    let (metrics, exporter) = build_metrics_with_defaults(&[("service", "codex-cli")])?;
    let manager = OtelManager::new(
        ThreadId::new(),
        "gpt-5.1",
        "gpt-5.1",
        Some("account-id".to_string()),
        None,
        Some(AuthMode::ApiKey),
        true,
        "tty".to_string(),
        SessionSource::Cli,
    )
    .with_metrics(metrics);

    manager.counter("codex.session_started", 1, &[("source", "tui")]);
    manager.shutdown_metrics()?;

    let resource_metrics = latest_metrics(&exporter);
    let metric =
        find_metric(&resource_metrics, "codex.session_started").expect("counter metric missing");
    let attrs = match metric.data() {
        AggregatedMetrics::U64(MetricData::Sum(sum)) => {
            let points: Vec<_> = sum.data_points().collect();
            assert_eq!(points.len(), 1);
            attributes_to_map(points[0].attributes())
        }
        AggregatedMetrics::U64(_) => panic!("unexpected counter aggregation"),
        _ => panic!("unexpected counter data type"),
    };

    let auth_mode = AuthMode::ApiKey.to_string();
    let expected: BTreeMap<String, String> = [
        ("app.version", env!("CARGO_PKG_VERSION")),
        ("auth_mode", auth_mode.as_str()),
        ("model", "gpt-5.1"),
        ("service", "codex-cli"),
        ("source", "tui"),
    ]
    .into_iter()
    .map(|(key, value)| (key.to_string(), value.to_string()))
    .collect();
    assert_eq!(attrs, expected);

    Ok(())
}
|
||||
|
||||
// Verifies that `with_metrics_without_metadata_tags` leaves only the per-call
// tags on a counter recorded through OtelManager.
#[test]
fn manager_allows_disabling_metadata_tags() -> Result<()> {
    let (metrics, exporter) = build_metrics_with_defaults(&[])?;
    let manager = OtelManager::new(
        ThreadId::new(),
        "gpt-4o",
        "gpt-4o",
        Some("account-id".to_string()),
        None,
        Some(AuthMode::ApiKey),
        true,
        "tty".to_string(),
        SessionSource::Cli,
    )
    .with_metrics_without_metadata_tags(metrics);

    manager.counter("codex.session_started", 1, &[("source", "tui")]);
    manager.shutdown_metrics()?;

    let resource_metrics = latest_metrics(&exporter);
    let metric =
        find_metric(&resource_metrics, "codex.session_started").expect("counter metric missing");
    let attrs = match metric.data() {
        AggregatedMetrics::U64(MetricData::Sum(sum)) => {
            let points: Vec<_> = sum.data_points().collect();
            assert_eq!(points.len(), 1);
            attributes_to_map(points[0].attributes())
        }
        AggregatedMetrics::U64(_) => panic!("unexpected counter aggregation"),
        _ => panic!("unexpected counter data type"),
    };

    let mut expected = BTreeMap::new();
    expected.insert("source".to_string(), "tui".to_string());
    assert_eq!(attrs, expected);

    Ok(())
}
|
||||
5
codex-rs/otel/tests/suite/mod.rs
Normal file
5
codex-rs/otel/tests/suite/mod.rs
Normal file
@@ -0,0 +1,5 @@
|
||||
mod manager_metrics;
|
||||
mod otlp_http_loopback;
|
||||
mod send;
|
||||
mod timing;
|
||||
mod validation;
|
||||
192
codex-rs/otel/tests/suite/otlp_http_loopback.rs
Normal file
192
codex-rs/otel/tests/suite/otlp_http_loopback.rs
Normal file
@@ -0,0 +1,192 @@
|
||||
use codex_otel::config::OtelExporter;
|
||||
use codex_otel::config::OtelHttpProtocol;
|
||||
use codex_otel::metrics::MetricsClient;
|
||||
use codex_otel::metrics::MetricsConfig;
|
||||
use codex_otel::metrics::Result;
|
||||
use std::collections::HashMap;
|
||||
use std::io::Read as _;
|
||||
use std::io::Write as _;
|
||||
use std::net::TcpListener;
|
||||
use std::net::TcpStream;
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
/// One HTTP request recorded by the loopback collector used in this test module.
struct CapturedRequest {
    /// Request target taken from the request line (e.g. `/v1/metrics`).
    path: String,
    /// Value of the `content-type` header, when the request carried one.
    content_type: Option<String>,
    /// Raw request body bytes.
    body: Vec<u8>,
}
|
||||
|
||||
/// Reads a single HTTP/1.x request from `stream` and returns
/// `(path, headers, body)`.
///
/// Header names are lower-cased and both names and values are trimmed; header
/// lines without a `:` are skipped. The body is read according to the
/// `Content-Length` header; when that header is absent or unparsable, the body
/// is whatever bytes arrived together with the headers.
///
/// # Errors
///
/// Returns an error if the stream cannot be configured, a read fails or times
/// out (2 seconds per read), the peer closes the connection early, the headers
/// exceed 1 MiB or are not UTF-8, or the request line has no path.
fn read_http_request(
    stream: &mut TcpStream,
) -> std::io::Result<(String, HashMap<String, String>, Vec<u8>)> {
    // A socket accepted from a non-blocking listener inherits non-blocking mode
    // on some platforms (e.g. macOS/BSD). In that mode reads fail immediately
    // with `WouldBlock` and the read timeout below never applies, so switch the
    // stream to blocking mode explicitly.
    stream.set_nonblocking(false)?;
    stream.set_read_timeout(Some(Duration::from_secs(2)))?;

    let mut buf = Vec::new();
    let mut scratch = [0u8; 8192];
    // Accumulate bytes until the blank line that terminates the header section.
    let header_end = loop {
        let n = stream.read(&mut scratch)?;
        if n == 0 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "EOF before headers",
            ));
        }
        buf.extend_from_slice(&scratch[..n]);
        if let Some(end) = buf.windows(4).position(|w| w == b"\r\n\r\n") {
            break end;
        }
        if buf.len() > 1024 * 1024 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "headers too large",
            ));
        }
    };

    let headers_bytes = &buf[..header_end];
    // Anything after the `\r\n\r\n` separator already belongs to the body.
    let mut body_bytes = buf[header_end + 4..].to_vec();

    let headers_str = std::str::from_utf8(headers_bytes).map_err(|err| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("headers not utf-8: {err}"),
        )
    })?;
    let mut lines = headers_str.split("\r\n");
    let start = lines.next().ok_or_else(|| {
        std::io::Error::new(std::io::ErrorKind::InvalidData, "missing request line")
    })?;
    // Request line: `<method> <path> <version>`; only the path is returned.
    let mut parts = start.split_whitespace();
    let _method = parts.next().unwrap_or_default();
    let path = parts
        .next()
        .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "missing path"))?
        .to_string();

    let mut headers = HashMap::new();
    for line in lines {
        let Some((k, v)) = line.split_once(':') else {
            continue;
        };
        headers.insert(k.trim().to_ascii_lowercase(), v.trim().to_string());
    }

    if let Some(len) = headers
        .get("content-length")
        .and_then(|v| v.parse::<usize>().ok())
    {
        while body_bytes.len() < len {
            let n = stream.read(&mut scratch)?;
            if n == 0 {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "EOF before body complete",
                ));
            }
            body_bytes.extend_from_slice(&scratch[..n]);
            // NOTE(review): each read adds at most `scratch.len()` bytes, so this
            // guard is not expected to trigger for ordinary `Content-Length` values.
            if body_bytes.len() > len + 1024 * 1024 {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    "body too large",
                ));
            }
        }
        // Drop any bytes received beyond the declared body length.
        body_bytes.truncate(len);
    }

    Ok((path, headers, body_bytes))
}
|
||||
|
||||
/// Sends a body-less HTTP/1.1 response with the given `status` line (for
/// example `"202 Accepted"`), marked `Connection: close`, then flushes the stream.
fn write_http_response(stream: &mut TcpStream, status: &str) -> std::io::Result<()> {
    let head = format!("HTTP/1.1 {status}\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
    stream
        .write_all(head.as_bytes())
        .and_then(|()| stream.flush())
}
|
||||
|
||||
// Verifies that the OTLP/HTTP exporter posts a JSON payload containing the
// recorded metric to a collector listening on the loopback interface.
#[test]
fn otlp_http_exporter_sends_metrics_to_collector() -> Result<()> {
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
    let addr = listener.local_addr().expect("local_addr");
    // Non-blocking accept lets the collector thread poll until its deadline
    // rather than blocking indefinitely when no request arrives.
    listener.set_nonblocking(true).expect("set_nonblocking");

    let (tx, rx) = mpsc::channel::<Vec<CapturedRequest>>();
    let server = thread::spawn(move || {
        let mut captured = Vec::new();
        let deadline = Instant::now() + Duration::from_secs(3);

        while Instant::now() < deadline {
            let mut stream = match listener.accept() {
                Ok((stream, _)) => stream,
                Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                    thread::sleep(Duration::from_millis(10));
                    continue;
                }
                Err(_) => break,
            };

            let request = read_http_request(&mut stream);
            // Always answer, even when the request could not be parsed.
            let _ = write_http_response(&mut stream, "202 Accepted");
            if let Ok((path, headers, body)) = request {
                let content_type = headers.get("content-type").cloned();
                captured.push(CapturedRequest {
                    path,
                    content_type,
                    body,
                });
            }
        }

        let _ = tx.send(captured);
    });

    let exporter = OtelExporter::OtlpHttp {
        endpoint: format!("http://{addr}/v1/metrics"),
        headers: HashMap::new(),
        protocol: OtelHttpProtocol::Json,
        tls: None,
    };
    let config = MetricsConfig::otlp("test", "codex-cli", env!("CARGO_PKG_VERSION"), exporter);
    let metrics = MetricsClient::new(config)?;

    metrics.counter("codex.turns", 1, &[("source", "test")])?;
    metrics.shutdown()?;

    server.join().expect("server join");
    let captured = rx.recv_timeout(Duration::from_secs(1)).expect("captured");

    let Some(request) = captured.iter().find(|req| req.path == "/v1/metrics") else {
        let paths = captured
            .iter()
            .map(|req| req.path.as_str())
            .collect::<Vec<_>>()
            .join(", ");
        panic!(
            "missing /v1/metrics request; got {}: {paths}",
            captured.len()
        );
    };

    let content_type = match request.content_type.as_deref() {
        Some(value) => value,
        None => "<missing content-type>",
    };
    assert!(
        content_type.starts_with("application/json"),
        "unexpected content-type: {content_type}"
    );

    let body = String::from_utf8_lossy(&request.body);
    assert!(
        body.contains("codex.turns"),
        "expected metric name not found; body prefix: {}",
        &body.chars().take(2000).collect::<String>()
    );

    Ok(())
}
|
||||
205
codex-rs/otel/tests/suite/send.rs
Normal file
205
codex-rs/otel/tests/suite/send.rs
Normal file
@@ -0,0 +1,205 @@
|
||||
use crate::harness::attributes_to_map;
|
||||
use crate::harness::build_metrics_with_defaults;
|
||||
use crate::harness::find_metric;
|
||||
use crate::harness::histogram_data;
|
||||
use crate::harness::latest_metrics;
|
||||
use codex_otel::metrics::Result;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
// Verifies that a counter and a histogram are exported with the default tags
// merged with their per-call tags (a per-call tag overriding a default one).
#[test]
fn send_builds_payload_with_tags_and_histograms() -> Result<()> {
    use opentelemetry_sdk::metrics::data::AggregatedMetrics;
    use opentelemetry_sdk::metrics::data::MetricData;

    let defaults = [("service", "codex-cli"), ("env", "prod")];
    let (metrics, exporter) = build_metrics_with_defaults(&defaults)?;

    metrics.counter("codex.turns", 1, &[("model", "gpt-5.1"), ("env", "dev")])?;
    metrics.histogram("codex.tool_latency", 25, &[("tool", "shell")])?;
    metrics.shutdown()?;

    let resource_metrics = latest_metrics(&exporter);

    let counter = find_metric(&resource_metrics, "codex.turns").expect("counter metric missing");
    let counter_attributes = match counter.data() {
        AggregatedMetrics::U64(MetricData::Sum(sum)) => {
            let points: Vec<_> = sum.data_points().collect();
            assert_eq!(points.len(), 1);
            assert_eq!(points[0].value(), 1);
            attributes_to_map(points[0].attributes())
        }
        AggregatedMetrics::U64(_) => panic!("unexpected counter aggregation"),
        _ => panic!("unexpected counter data type"),
    };

    let expected_counter_attributes: BTreeMap<String, String> =
        [("service", "codex-cli"), ("env", "dev"), ("model", "gpt-5.1")]
            .into_iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect();
    assert_eq!(counter_attributes, expected_counter_attributes);

    let (bounds, bucket_counts, sum, count) =
        histogram_data(&resource_metrics, "codex.tool_latency");
    assert!(!bounds.is_empty());
    assert_eq!(bucket_counts.iter().sum::<u64>(), 1);
    assert_eq!(sum, 25.0);
    assert_eq!(count, 1);

    // Take the attributes from the histogram's first data point.
    let histogram_metric = find_metric(&resource_metrics, "codex.tool_latency");
    let first_point = match histogram_metric.map(|metric| metric.data()) {
        Some(AggregatedMetrics::F64(MetricData::Histogram(histogram))) => {
            histogram.data_points().next()
        }
        _ => None,
    };
    let Some(first_point) = first_point else {
        panic!("histogram attributes missing");
    };
    let histogram_attrs = attributes_to_map(first_point.attributes());

    let expected_histogram_attributes: BTreeMap<String, String> =
        [("service", "codex-cli"), ("env", "prod"), ("tool", "shell")]
            .into_iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect();
    assert_eq!(histogram_attrs, expected_histogram_attributes);

    Ok(())
}
|
||||
|
||||
// Verifies that default tags are merged into each metric independently and
// that a per-call tag takes precedence over a default with the same key.
#[test]
fn send_merges_default_tags_per_line() -> Result<()> {
    use opentelemetry_sdk::metrics::data::AggregatedMetrics;
    use opentelemetry_sdk::metrics::data::MetricData;

    let defaults = [("service", "codex-cli"), ("env", "prod"), ("region", "us")];
    let (metrics, exporter) = build_metrics_with_defaults(&defaults)?;

    metrics.counter("codex.alpha", 1, &[("env", "dev"), ("component", "alpha")])?;
    metrics.counter(
        "codex.beta",
        2,
        &[("service", "worker"), ("component", "beta")],
    )?;
    metrics.shutdown()?;

    let resource_metrics = latest_metrics(&exporter);

    // codex.alpha overrides `env` and adds `component`.
    let alpha_metric =
        find_metric(&resource_metrics, "codex.alpha").expect("codex.alpha metric missing");
    let alpha_point = match alpha_metric.data() {
        AggregatedMetrics::U64(MetricData::Sum(sum)) => {
            let points: Vec<_> = sum.data_points().collect();
            assert_eq!(points.len(), 1);
            points[0]
        }
        AggregatedMetrics::U64(_) => panic!("unexpected counter aggregation"),
        _ => panic!("unexpected counter data type"),
    };
    assert_eq!(alpha_point.value(), 1);
    let alpha_attrs = attributes_to_map(alpha_point.attributes());
    let expected_alpha_attrs: BTreeMap<String, String> = [
        ("component", "alpha"),
        ("env", "dev"),
        ("region", "us"),
        ("service", "codex-cli"),
    ]
    .into_iter()
    .map(|(key, value)| (key.to_string(), value.to_string()))
    .collect();
    assert_eq!(alpha_attrs, expected_alpha_attrs);

    // codex.beta overrides `service` and adds `component`.
    let beta_metric =
        find_metric(&resource_metrics, "codex.beta").expect("codex.beta metric missing");
    let beta_point = match beta_metric.data() {
        AggregatedMetrics::U64(MetricData::Sum(sum)) => {
            let points: Vec<_> = sum.data_points().collect();
            assert_eq!(points.len(), 1);
            points[0]
        }
        AggregatedMetrics::U64(_) => panic!("unexpected counter aggregation"),
        _ => panic!("unexpected counter data type"),
    };
    assert_eq!(beta_point.value(), 2);
    let beta_attrs = attributes_to_map(beta_point.attributes());
    let expected_beta_attrs: BTreeMap<String, String> = [
        ("component", "beta"),
        ("env", "prod"),
        ("region", "us"),
        ("service", "worker"),
    ]
    .into_iter()
    .map(|(key, value)| (key.to_string(), value.to_string()))
    .collect();
    assert_eq!(beta_attrs, expected_beta_attrs);

    Ok(())
}
|
||||
|
||||
// Verifies that a counter recorded before shutdown reaches the exporter with
// its value and per-call tag intact.
#[test]
fn client_sends_enqueued_metric() -> Result<()> {
    use opentelemetry_sdk::metrics::data::AggregatedMetrics;
    use opentelemetry_sdk::metrics::data::MetricData;

    let (metrics, exporter) = build_metrics_with_defaults(&[])?;

    metrics.counter("codex.turns", 1, &[("model", "gpt-5.1")])?;
    metrics.shutdown()?;

    let resource_metrics = latest_metrics(&exporter);
    let counter = find_metric(&resource_metrics, "codex.turns").expect("counter metric missing");
    let points: Vec<_> = match counter.data() {
        AggregatedMetrics::U64(MetricData::Sum(sum)) => sum.data_points().collect(),
        AggregatedMetrics::U64(_) => panic!("unexpected counter aggregation"),
        _ => panic!("unexpected counter data type"),
    };
    assert_eq!(points.len(), 1);

    let point = points[0];
    assert_eq!(point.value(), 1);
    let attrs = attributes_to_map(point.attributes());
    assert_eq!(attrs.get("model").map(String::as_str), Some("gpt-5.1"));

    Ok(())
}
|
||||
|
||||
// Verifies that shutting down flushes a recorded counter to the in-memory exporter.
#[test]
fn shutdown_flushes_in_memory_exporter() -> Result<()> {
    use opentelemetry_sdk::metrics::data::AggregatedMetrics;
    use opentelemetry_sdk::metrics::data::MetricData;

    let (metrics, exporter) = build_metrics_with_defaults(&[])?;

    metrics.counter("codex.turns", 1, &[])?;
    metrics.shutdown()?;

    let resource_metrics = latest_metrics(&exporter);
    let counter = find_metric(&resource_metrics, "codex.turns").expect("counter metric missing");
    let points: Vec<_> = match counter.data() {
        AggregatedMetrics::U64(MetricData::Sum(sum)) => sum.data_points().collect(),
        AggregatedMetrics::U64(_) => panic!("unexpected counter aggregation"),
        _ => panic!("unexpected counter data type"),
    };
    assert_eq!(points.len(), 1);

    Ok(())
}
|
||||
|
||||
// Verifies that a client shut down without recording anything exports no metrics.
#[test]
fn shutdown_without_metrics_exports_nothing() -> Result<()> {
    let (metrics, exporter) = build_metrics_with_defaults(&[])?;

    metrics.shutdown()?;

    let exported = exporter.get_finished_metrics().unwrap();
    assert!(exported.is_empty(), "expected no metrics exported");
    Ok(())
}
|
||||
68
codex-rs/otel/tests/suite/timing.rs
Normal file
68
codex-rs/otel/tests/suite/timing.rs
Normal file
@@ -0,0 +1,68 @@
|
||||
use crate::harness::attributes_to_map;
|
||||
use crate::harness::build_metrics_with_defaults;
|
||||
use crate::harness::histogram_data;
|
||||
use crate::harness::latest_metrics;
|
||||
use codex_otel::metrics::Result;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::time::Duration;
|
||||
|
||||
// Verifies that `record_duration` produces a single histogram sample whose sum
// equals the recorded duration (15 ms is exported as 15.0).
#[test]
fn record_duration_records_histogram() -> Result<()> {
    let (metrics, exporter) = build_metrics_with_defaults(&[])?;

    let elapsed = Duration::from_millis(15);
    metrics.record_duration("codex.request_latency", elapsed, &[("route", "chat")])?;
    metrics.shutdown()?;

    let resource_metrics = latest_metrics(&exporter);
    let (bounds, bucket_counts, sum, count) =
        histogram_data(&resource_metrics, "codex.request_latency");
    assert!(!bounds.is_empty());
    assert_eq!(bucket_counts.iter().sum::<u64>(), 1);
    assert_eq!(sum, 15.0);
    assert_eq!(count, 1);

    Ok(())
}
|
||||
|
||||
// Verifies that a timer obtained from `start_timer` yields exactly one
// histogram sample carrying the supplied tags once the timer has been dropped.
#[test]
fn timer_result_records_success() -> Result<()> {
    use crate::harness::find_metric;
    use opentelemetry_sdk::metrics::data::AggregatedMetrics;
    use opentelemetry_sdk::metrics::data::MetricData;

    let (metrics, exporter) = build_metrics_with_defaults(&[])?;

    let timer = metrics.start_timer("codex.request_latency", &[("route", "chat")]);
    assert!(timer.is_ok());
    // Release the timer before shutdown; the sample asserted below is expected
    // to have been recorded by this point.
    drop(timer);

    metrics.shutdown()?;

    let resource_metrics = latest_metrics(&exporter);
    let (bounds, bucket_counts, _sum, count) =
        histogram_data(&resource_metrics, "codex.request_latency");
    assert!(!bounds.is_empty());
    assert_eq!(count, 1);
    assert_eq!(bucket_counts.iter().sum::<u64>(), 1);

    // Take the attributes from the histogram's first data point.
    let latency_metric = find_metric(&resource_metrics, "codex.request_latency");
    let first_point = match latency_metric.map(|metric| metric.data()) {
        Some(AggregatedMetrics::F64(MetricData::Histogram(histogram))) => {
            histogram.data_points().next()
        }
        _ => None,
    };
    let Some(first_point) = first_point else {
        panic!("attributes missing");
    };
    let attrs = attributes_to_map(first_point.attributes());
    assert_eq!(attrs.get("route").map(String::as_str), Some("chat"));

    Ok(())
}
|
||||
87
codex-rs/otel/tests/suite/validation.rs
Normal file
87
codex-rs/otel/tests/suite/validation.rs
Normal file
@@ -0,0 +1,87 @@
|
||||
use codex_otel::metrics::MetricsClient;
|
||||
use codex_otel::metrics::MetricsConfig;
|
||||
use codex_otel::metrics::MetricsError;
|
||||
use codex_otel::metrics::Result;
|
||||
use opentelemetry_sdk::metrics::InMemoryMetricExporter;
|
||||
|
||||
fn build_in_memory_client() -> Result<MetricsClient> {
|
||||
let exporter = InMemoryMetricExporter::default();
|
||||
let config = MetricsConfig::in_memory("test", "codex-cli", env!("CARGO_PKG_VERSION"), exporter);
|
||||
MetricsClient::new(config)
|
||||
}
|
||||
|
||||
// Verifies that adding a default tag with an invalid key fails while the
// config is being built.
#[test]
fn invalid_tag_component_is_rejected() -> Result<()> {
    let config = MetricsConfig::in_memory(
        "test",
        "codex-cli",
        env!("CARGO_PKG_VERSION"),
        InMemoryMetricExporter::default(),
    );
    let err = config.with_tag("bad key", "value").unwrap_err();

    assert!(matches!(
        err,
        MetricsError::InvalidTagComponent { label, value }
            if label == "tag key" && value == "bad key"
    ));
    Ok(())
}
|
||||
|
||||
// Verifies that a counter call with an invalid per-call tag key is rejected.
#[test]
fn counter_rejects_invalid_tag_key() -> Result<()> {
    let metrics = build_in_memory_client()?;

    let outcome = metrics.counter("codex.turns", 1, &[("bad key", "value")]);
    let err = outcome.unwrap_err();
    assert!(matches!(
        err,
        MetricsError::InvalidTagComponent { label, value }
            if label == "tag key" && value == "bad key"
    ));

    metrics.shutdown()?;
    Ok(())
}
|
||||
|
||||
// Verifies that a histogram call with an invalid per-call tag value is rejected.
#[test]
fn histogram_rejects_invalid_tag_value() -> Result<()> {
    let metrics = build_in_memory_client()?;

    let outcome = metrics.histogram("codex.request_latency", 3, &[("route", "bad value")]);
    let err = outcome.unwrap_err();
    assert!(matches!(
        err,
        MetricsError::InvalidTagComponent { label, value }
            if label == "tag value" && value == "bad value"
    ));

    metrics.shutdown()?;
    Ok(())
}
|
||||
|
||||
// Verifies that a counter call with an invalid metric name is rejected.
#[test]
fn counter_rejects_invalid_metric_name() -> Result<()> {
    let metrics = build_in_memory_client()?;

    let outcome = metrics.counter("bad name", 1, &[]);
    let err = outcome.unwrap_err();
    assert!(matches!(
        err,
        MetricsError::InvalidMetricName { name } if name == "bad name"
    ));

    metrics.shutdown()?;
    Ok(())
}
|
||||
|
||||
// Verifies that a counter call with a negative increment is rejected.
#[test]
fn counter_rejects_negative_increment() -> Result<()> {
    let metrics = build_in_memory_client()?;

    let outcome = metrics.counter("codex.turns", -1, &[]);
    let err = outcome.unwrap_err();
    assert!(matches!(
        err,
        MetricsError::NegativeCounterIncrement { name, inc } if name == "codex.turns" && inc == -1
    ));

    metrics.shutdown()?;
    Ok(())
}
|
||||
2
codex-rs/otel/tests/tests.rs
Normal file
2
codex-rs/otel/tests/tests.rs
Normal file
@@ -0,0 +1,2 @@
|
||||
mod harness;
|
||||
mod suite;
|
||||
48
docs/telemetry.md
Normal file
48
docs/telemetry.md
Normal file
@@ -0,0 +1,48 @@
|
||||
# Codex Telemetry
|
||||
|
||||
## Config
|
||||
|
||||
**TODO(jif)**: add the config and document it
|
||||
|
||||
## Tracing
|
||||
|
||||
Codex can export OpenTelemetry **log events**, **trace spans**, and **metrics**
|
||||
when OTEL exporters are configured in `config.toml` (`[otel]`).
|
||||
By default, exporters are disabled and nothing is sent.
|
||||
|
||||
## Feedback
|
||||
|
||||
Feedback is sent only when you run `/feedback` and confirm. The report includes
|
||||
the selected category and optional note; if you opt in to include logs, Codex
|
||||
attaches the most recent in-memory logs for the session (up to ~4 MiB).
|
||||
|
||||
## Metrics
|
||||
|
||||
This section lists all the metrics exported by Codex when installed locally.
|
||||
|
||||
### Global context (applies to every event/metric)
|
||||
|
||||
- `surface`: `cli` | `vscode` | `exec` | `mcp` | `subagent_*` (from `SessionSource`).
|
||||
- `version`: binary version.
|
||||
- `auth_mode`: `swic` (AuthMode::ChatGPT) | `api` (AuthMode::ApiKey) | `unknown`.
|
||||
- `model`: name of the model used.
|
||||
|
||||
### Metrics catalog
|
||||
|
||||
Each metric includes the required fields plus the global context above.
|
||||
|
||||
| Metric | Type | Fields | Description |
|
||||
| ------------------------- | --------- | ------------------------------------- | ------------------------------------------------------------------------------- |
|
||||
| `approval.requested` | counter | `tool`, `approved` | Tool approval request result (`approved`: `yes` or `no`). |
|
||||
| `auth.completed` | counter | `status` | Authentication completed (only for ChatGPT authentication). |
|
||||
| `conversation.compact` | counter | `status`, `number` | Compaction event including the status and the compaction number in the session. |
|
||||
| `conversation.turn.count` | counter | `role` | User/assistant turns per session. |
|
||||
| `feature.duration_ms` | histogram | `feature`, `status` | End-to-end feature latency. |
|
||||
| `feature.used` | counter | `feature` | Feature usage through `/` (e.g., `/undo`, `/review`, ...). |
|
||||
| `features.state` | counter | `key`, `value` | Feature values that differ from defaults (emit one row per non-default). |
|
||||
| `mcp.call` | counter | `status` | MCP tool invocation result (`ok` or error string). |
|
||||
| `model.call.duration_ms` | histogram | `provider`, `status`, `attempt` | Model API request duration. |
|
||||
| `session.started` | counter | `is_git` | New session created. |
|
||||
| `tool.call` | counter | `tool`, `status` | Tool invocation result (`ok` or error string). |
|
||||
| `tool.call.duration_ms` | histogram | `tool`, `status` | Tool execution time. |
|
||||
| `user.feedback.submitted` | counter | `category`, `include_logs`, `success` | Feedback submission via `/feedback`. |
|
||||
Reference in New Issue
Block a user