mirror of
https://github.com/openai/codex.git
synced 2026-02-01 22:47:52 +00:00
feat: show runtime metrics in console (#10278)
Summary of changes:
- Adds a new feature flag: runtime_metrics
- Declared in core/src/features.rs
- Added to core/config.schema.json
- Wired into OTEL init in core/src/otel_init.rs
- Enables on-demand runtime metric snapshots in OTEL
- Adds runtime_metrics: bool to otel/src/config.rs
- Enables the `experimental_metrics_custom_reader` feature of `opentelemetry_sdk` in otel/Cargo.toml
- Adds snapshot/reset/summary APIs in:
- otel/src/lib.rs
- otel/src/metrics/client.rs
- otel/src/metrics/config.rs
- otel/src/metrics/error.rs
- Defines metric names and a runtime summary builder
- New files:
- otel/src/metrics/names.rs
- otel/src/metrics/runtime_metrics.rs
- Summarizes totals for:
- Tool calls
- API requests
- SSE/streaming events
- Instruments metrics collection in OTEL manager
- otel/src/traces/otel_manager.rs now records:
- API call counts + durations
- SSE event counts + durations (success/failure)
- Tool call metrics now use shared constants
- Surfaces runtime metrics in the TUI
- Resets runtime metrics at turn start in tui/src/chatwidget.rs
- Displays metrics in the final separator line in
tui/src/history_cell.rs
- Adds tests
- New OTEL tests:
- otel/tests/suite/snapshot.rs
- otel/tests/suite/runtime_summary.rs
- New TUI test:
- final_message_separator_includes_runtime_metrics in
tui/src/history_cell.rs
Scope:
- 19 files changed
- ~652 insertions, 38 deletions
<img width="922" height="169" alt="Screenshot 2026-01-30 at 4 11 34 PM"
src="https://github.com/user-attachments/assets/1efd754d-a16d-4564-83a5-f4442fd2f998"
/>
This commit is contained in:
@@ -208,6 +208,9 @@
|
||||
"responses_websockets": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"runtime_metrics": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"shell_snapshot": {
|
||||
"type": "boolean"
|
||||
},
|
||||
@@ -1233,6 +1236,9 @@
|
||||
"responses_websockets": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"runtime_metrics": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"shell_snapshot": {
|
||||
"type": "boolean"
|
||||
},
|
||||
|
||||
@@ -101,6 +101,8 @@ pub enum Feature {
|
||||
RemoteModels,
|
||||
/// Experimental shell snapshotting.
|
||||
ShellSnapshot,
|
||||
/// Enable runtime metrics snapshots via a manual reader.
|
||||
RuntimeMetrics,
|
||||
/// Persist rollout metadata to a local SQLite database.
|
||||
Sqlite,
|
||||
/// Append additional AGENTS.md guidance to user instructions.
|
||||
@@ -437,6 +439,12 @@ pub const FEATURES: &[FeatureSpec] = &[
|
||||
},
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::RuntimeMetrics,
|
||||
key: "runtime_metrics",
|
||||
stage: Stage::UnderDevelopment,
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::Sqlite,
|
||||
key: "sqlite",
|
||||
|
||||
@@ -2,6 +2,7 @@ use crate::config::Config;
|
||||
use crate::config::types::OtelExporterKind as Kind;
|
||||
use crate::config::types::OtelHttpProtocol as Protocol;
|
||||
use crate::default_client::originator;
|
||||
use crate::features::Feature;
|
||||
use codex_otel::config::OtelExporter;
|
||||
use codex_otel::config::OtelHttpProtocol;
|
||||
use codex_otel::config::OtelSettings;
|
||||
@@ -77,6 +78,7 @@ pub fn build_provider(
|
||||
|
||||
let originator = originator();
|
||||
let service_name = service_name_override.unwrap_or(originator.value.as_str());
|
||||
let runtime_metrics = config.features.enabled(Feature::RuntimeMetrics);
|
||||
|
||||
OtelProvider::from(&OtelSettings {
|
||||
service_name: service_name.to_string(),
|
||||
@@ -86,6 +88,7 @@ pub fn build_provider(
|
||||
exporter,
|
||||
trace_exporter,
|
||||
metrics_exporter,
|
||||
runtime_metrics,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -41,7 +41,14 @@ opentelemetry-otlp = { workspace = true, features = [
|
||||
"tls-roots",
|
||||
]}
|
||||
opentelemetry-semantic-conventions = { workspace = true }
|
||||
opentelemetry_sdk = { workspace = true, features = ["logs", "metrics", "rt-tokio", "testing", "trace"] }
|
||||
opentelemetry_sdk = { workspace = true, features = [
|
||||
"experimental_metrics_custom_reader",
|
||||
"logs",
|
||||
"metrics",
|
||||
"rt-tokio",
|
||||
"testing",
|
||||
"trace",
|
||||
] }
|
||||
http = { workspace = true }
|
||||
reqwest = { workspace = true, features = ["blocking", "rustls-tls"] }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
@@ -54,5 +61,8 @@ tracing-opentelemetry = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
opentelemetry_sdk = { workspace = true, features = ["testing"] }
|
||||
opentelemetry_sdk = { workspace = true, features = [
|
||||
"experimental_metrics_custom_reader",
|
||||
"testing",
|
||||
] }
|
||||
pretty_assertions = { workspace = true }
|
||||
|
||||
@@ -37,6 +37,7 @@ pub struct OtelSettings {
|
||||
pub exporter: OtelExporter,
|
||||
pub trace_exporter: OtelExporter,
|
||||
pub metrics_exporter: OtelExporter,
|
||||
pub runtime_metrics: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
|
||||
@@ -14,9 +14,14 @@ use crate::metrics::validation::validate_tag_key;
|
||||
use crate::metrics::validation::validate_tag_value;
|
||||
use crate::otel_provider::OtelProvider;
|
||||
use codex_protocol::ThreadId;
|
||||
use opentelemetry_sdk::metrics::data::ResourceMetrics;
|
||||
use serde::Serialize;
|
||||
use std::time::Duration;
|
||||
use strum_macros::Display;
|
||||
use tracing::debug;
|
||||
|
||||
pub use crate::metrics::runtime_metrics::RuntimeMetricTotals;
|
||||
pub use crate::metrics::runtime_metrics::RuntimeMetricsSummary;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Display)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
@@ -137,6 +142,39 @@ impl OtelManager {
|
||||
metrics.shutdown()
|
||||
}
|
||||
|
||||
pub fn snapshot_metrics(&self) -> MetricsResult<ResourceMetrics> {
|
||||
let Some(metrics) = &self.metrics else {
|
||||
return Err(MetricsError::ExporterDisabled);
|
||||
};
|
||||
metrics.snapshot()
|
||||
}
|
||||
|
||||
/// Collect and discard a runtime metrics snapshot to reset delta accumulators.
|
||||
pub fn reset_runtime_metrics(&self) {
|
||||
if self.metrics.is_none() {
|
||||
return;
|
||||
}
|
||||
if let Err(err) = self.snapshot_metrics() {
|
||||
debug!("runtime metrics reset skipped: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
/// Collect a runtime metrics summary if debug snapshots are available.
|
||||
pub fn runtime_metrics_summary(&self) -> Option<RuntimeMetricsSummary> {
|
||||
let snapshot = match self.snapshot_metrics() {
|
||||
Ok(snapshot) => snapshot,
|
||||
Err(_) => {
|
||||
return None;
|
||||
}
|
||||
};
|
||||
let summary = RuntimeMetricsSummary::from_snapshot(&snapshot);
|
||||
if summary.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(summary)
|
||||
}
|
||||
}
|
||||
|
||||
fn tags_with_metadata<'a>(
|
||||
&'a self,
|
||||
tags: &'a [(&'a str, &'a str)],
|
||||
|
||||
@@ -22,13 +22,20 @@ use opentelemetry_otlp::WithTonicConfig;
|
||||
use opentelemetry_otlp::tonic_types::metadata::MetadataMap;
|
||||
use opentelemetry_otlp::tonic_types::transport::ClientTlsConfig;
|
||||
use opentelemetry_sdk::Resource;
|
||||
use opentelemetry_sdk::metrics::InstrumentKind;
|
||||
use opentelemetry_sdk::metrics::ManualReader;
|
||||
use opentelemetry_sdk::metrics::PeriodicReader;
|
||||
use opentelemetry_sdk::metrics::Pipeline;
|
||||
use opentelemetry_sdk::metrics::SdkMeterProvider;
|
||||
use opentelemetry_sdk::metrics::Temporality;
|
||||
use opentelemetry_sdk::metrics::data::ResourceMetrics;
|
||||
use opentelemetry_sdk::metrics::reader::MetricReader;
|
||||
use opentelemetry_semantic_conventions as semconv;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::Weak;
|
||||
use std::time::Duration;
|
||||
use tracing::debug;
|
||||
|
||||
@@ -37,6 +44,39 @@ const METER_NAME: &str = "codex";
|
||||
const DURATION_UNIT: &str = "ms";
|
||||
const DURATION_DESCRIPTION: &str = "Duration in milliseconds.";
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct SharedManualReader {
|
||||
inner: Arc<ManualReader>,
|
||||
}
|
||||
|
||||
impl SharedManualReader {
|
||||
fn new(inner: Arc<ManualReader>) -> Self {
|
||||
Self { inner }
|
||||
}
|
||||
}
|
||||
|
||||
impl MetricReader for SharedManualReader {
|
||||
fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
|
||||
self.inner.register_pipeline(pipeline);
|
||||
}
|
||||
|
||||
fn collect(&self, rm: &mut ResourceMetrics) -> opentelemetry_sdk::error::OTelSdkResult {
|
||||
self.inner.collect(rm)
|
||||
}
|
||||
|
||||
fn force_flush(&self) -> opentelemetry_sdk::error::OTelSdkResult {
|
||||
self.inner.force_flush()
|
||||
}
|
||||
|
||||
fn shutdown_with_timeout(&self, timeout: Duration) -> opentelemetry_sdk::error::OTelSdkResult {
|
||||
self.inner.shutdown_with_timeout(timeout)
|
||||
}
|
||||
|
||||
fn temporality(&self, kind: InstrumentKind) -> Temporality {
|
||||
self.inner.temporality(kind)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct MetricsClientInner {
|
||||
meter_provider: SdkMeterProvider,
|
||||
@@ -44,6 +84,7 @@ struct MetricsClientInner {
|
||||
counters: Mutex<HashMap<String, Counter<u64>>>,
|
||||
histograms: Mutex<HashMap<String, Histogram<f64>>>,
|
||||
duration_histograms: Mutex<HashMap<String, Histogram<f64>>>,
|
||||
runtime_reader: Option<Arc<ManualReader>>,
|
||||
default_tags: BTreeMap<String, String>,
|
||||
}
|
||||
|
||||
@@ -144,26 +185,41 @@ pub struct MetricsClient(std::sync::Arc<MetricsClientInner>);
|
||||
impl MetricsClient {
|
||||
/// Build a metrics client from configuration and validate defaults.
|
||||
pub fn new(config: MetricsConfig) -> Result<Self> {
|
||||
validate_tags(&config.default_tags)?;
|
||||
let MetricsConfig {
|
||||
environment,
|
||||
service_name,
|
||||
service_version,
|
||||
exporter,
|
||||
export_interval,
|
||||
runtime_reader,
|
||||
default_tags,
|
||||
} = config;
|
||||
|
||||
validate_tags(&default_tags)?;
|
||||
|
||||
let resource = Resource::builder()
|
||||
.with_service_name(config.service_name.clone())
|
||||
.with_service_name(service_name)
|
||||
.with_attributes(vec![
|
||||
KeyValue::new(
|
||||
semconv::attribute::SERVICE_VERSION,
|
||||
config.service_version.clone(),
|
||||
),
|
||||
KeyValue::new(ENV_ATTRIBUTE, config.environment.clone()),
|
||||
KeyValue::new(semconv::attribute::SERVICE_VERSION, service_version),
|
||||
KeyValue::new(ENV_ATTRIBUTE, environment),
|
||||
])
|
||||
.build();
|
||||
|
||||
let (meter_provider, meter) = match config.exporter {
|
||||
let runtime_reader = runtime_reader.then(|| {
|
||||
Arc::new(
|
||||
ManualReader::builder()
|
||||
.with_temporality(Temporality::Delta)
|
||||
.build(),
|
||||
)
|
||||
});
|
||||
|
||||
let (meter_provider, meter) = match exporter {
|
||||
MetricsExporter::InMemory(exporter) => {
|
||||
build_provider(resource, exporter, config.export_interval)
|
||||
build_provider(resource, exporter, export_interval, runtime_reader.clone())
|
||||
}
|
||||
MetricsExporter::Otlp(exporter) => {
|
||||
let exporter = build_otlp_metric_exporter(exporter, Temporality::Delta)?;
|
||||
build_provider(resource, exporter, config.export_interval)
|
||||
build_provider(resource, exporter, export_interval, runtime_reader.clone())
|
||||
}
|
||||
};
|
||||
|
||||
@@ -173,7 +229,8 @@ impl MetricsClient {
|
||||
counters: Mutex::new(HashMap::new()),
|
||||
histograms: Mutex::new(HashMap::new()),
|
||||
duration_histograms: Mutex::new(HashMap::new()),
|
||||
default_tags: config.default_tags,
|
||||
runtime_reader,
|
||||
default_tags,
|
||||
})))
|
||||
}
|
||||
|
||||
@@ -209,6 +266,18 @@ impl MetricsClient {
|
||||
Ok(Timer::new(name, tags, self))
|
||||
}
|
||||
|
||||
/// Collect a runtime metrics snapshot without shutting down the provider.
|
||||
pub fn snapshot(&self) -> Result<ResourceMetrics> {
|
||||
let Some(reader) = &self.0.runtime_reader else {
|
||||
return Err(MetricsError::RuntimeSnapshotUnavailable);
|
||||
};
|
||||
let mut snapshot = ResourceMetrics::default();
|
||||
reader
|
||||
.collect(&mut snapshot)
|
||||
.map_err(|source| MetricsError::RuntimeSnapshotCollect { source })?;
|
||||
Ok(snapshot)
|
||||
}
|
||||
|
||||
/// Flush metrics and stop the underlying OTEL meter provider.
|
||||
pub fn shutdown(&self) -> Result<()> {
|
||||
self.0.shutdown()
|
||||
@@ -219,6 +288,7 @@ fn build_provider<E>(
|
||||
resource: Resource,
|
||||
exporter: E,
|
||||
interval: Option<Duration>,
|
||||
runtime_reader: Option<Arc<ManualReader>>,
|
||||
) -> (SdkMeterProvider, Meter)
|
||||
where
|
||||
E: opentelemetry_sdk::metrics::exporter::PushMetricExporter + 'static,
|
||||
@@ -228,10 +298,11 @@ where
|
||||
reader_builder = reader_builder.with_interval(interval);
|
||||
}
|
||||
let reader = reader_builder.build();
|
||||
let provider = SdkMeterProvider::builder()
|
||||
.with_resource(resource)
|
||||
.with_reader(reader)
|
||||
.build();
|
||||
let mut provider_builder = SdkMeterProvider::builder().with_resource(resource);
|
||||
if let Some(reader) = runtime_reader {
|
||||
provider_builder = provider_builder.with_reader(SharedManualReader::new(reader));
|
||||
}
|
||||
let provider = provider_builder.with_reader(reader).build();
|
||||
let meter = provider.meter(METER_NAME);
|
||||
(provider, meter)
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ pub struct MetricsConfig {
|
||||
pub(crate) service_version: String,
|
||||
pub(crate) exporter: MetricsExporter,
|
||||
pub(crate) export_interval: Option<Duration>,
|
||||
pub(crate) runtime_reader: bool,
|
||||
pub(crate) default_tags: BTreeMap<String, String>,
|
||||
}
|
||||
|
||||
@@ -35,6 +36,7 @@ impl MetricsConfig {
|
||||
service_version: service_version.into(),
|
||||
exporter: MetricsExporter::Otlp(exporter),
|
||||
export_interval: None,
|
||||
runtime_reader: false,
|
||||
default_tags: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
@@ -52,6 +54,7 @@ impl MetricsConfig {
|
||||
service_version: service_version.into(),
|
||||
exporter: MetricsExporter::InMemory(exporter),
|
||||
export_interval: None,
|
||||
runtime_reader: false,
|
||||
default_tags: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
@@ -62,6 +65,12 @@ impl MetricsConfig {
|
||||
self
|
||||
}
|
||||
|
||||
/// Enable a manual reader for on-demand runtime snapshots.
|
||||
pub fn with_runtime_reader(mut self) -> Self {
|
||||
self.runtime_reader = true;
|
||||
self
|
||||
}
|
||||
|
||||
/// Add a default tag that will be sent with every metric.
|
||||
pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Result<Self> {
|
||||
let key = key.into();
|
||||
|
||||
@@ -34,4 +34,13 @@ pub enum MetricsError {
|
||||
#[source]
|
||||
source: opentelemetry_sdk::error::OTelSdkError,
|
||||
},
|
||||
|
||||
#[error("runtime metrics snapshot reader is not enabled")]
|
||||
RuntimeSnapshotUnavailable,
|
||||
|
||||
#[error("failed to collect runtime metrics snapshot from metrics reader")]
|
||||
RuntimeSnapshotCollect {
|
||||
#[source]
|
||||
source: opentelemetry_sdk::error::OTelSdkError,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
mod client;
|
||||
mod config;
|
||||
mod error;
|
||||
pub(crate) mod names;
|
||||
pub(crate) mod runtime_metrics;
|
||||
pub(crate) mod timer;
|
||||
pub(crate) mod validation;
|
||||
|
||||
|
||||
6
codex-rs/otel/src/metrics/names.rs
Normal file
6
codex-rs/otel/src/metrics/names.rs
Normal file
@@ -0,0 +1,6 @@
|
||||
/// Counter name for tool calls.
pub(crate) const TOOL_CALL_COUNT_METRIC: &str = "codex.tool.call";
/// Duration metric name for tool calls.
pub(crate) const TOOL_CALL_DURATION_METRIC: &str = "codex.tool.call.duration_ms";
/// Counter name for API requests.
pub(crate) const API_CALL_COUNT_METRIC: &str = "codex.api_request";
/// Duration metric name for API requests.
pub(crate) const API_CALL_DURATION_METRIC: &str = "codex.api_request.duration_ms";
/// Counter name for SSE events.
pub(crate) const SSE_EVENT_COUNT_METRIC: &str = "codex.sse_event";
/// Duration metric name for SSE events.
pub(crate) const SSE_EVENT_DURATION_METRIC: &str = "codex.sse_event.duration_ms";
|
||||
101
codex-rs/otel/src/metrics/runtime_metrics.rs
Normal file
101
codex-rs/otel/src/metrics/runtime_metrics.rs
Normal file
@@ -0,0 +1,101 @@
|
||||
use crate::metrics::names::API_CALL_COUNT_METRIC;
|
||||
use crate::metrics::names::API_CALL_DURATION_METRIC;
|
||||
use crate::metrics::names::SSE_EVENT_COUNT_METRIC;
|
||||
use crate::metrics::names::SSE_EVENT_DURATION_METRIC;
|
||||
use crate::metrics::names::TOOL_CALL_COUNT_METRIC;
|
||||
use crate::metrics::names::TOOL_CALL_DURATION_METRIC;
|
||||
use opentelemetry_sdk::metrics::data::AggregatedMetrics;
|
||||
use opentelemetry_sdk::metrics::data::Metric;
|
||||
use opentelemetry_sdk::metrics::data::MetricData;
|
||||
use opentelemetry_sdk::metrics::data::ResourceMetrics;
|
||||
|
||||
/// Aggregated totals for one category of runtime activity.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct RuntimeMetricTotals {
    /// Number of recorded occurrences.
    pub count: u64,
    /// Total recorded duration, in milliseconds.
    pub duration_ms: u64,
}

impl RuntimeMetricTotals {
    /// Returns `true` when both the count and the duration are zero.
    pub fn is_empty(self) -> bool {
        self.count == 0 && self.duration_ms == 0
    }
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
|
||||
pub struct RuntimeMetricsSummary {
|
||||
pub tool_calls: RuntimeMetricTotals,
|
||||
pub api_calls: RuntimeMetricTotals,
|
||||
pub streaming_events: RuntimeMetricTotals,
|
||||
}
|
||||
|
||||
impl RuntimeMetricsSummary {
|
||||
pub fn is_empty(self) -> bool {
|
||||
self.tool_calls.is_empty() && self.api_calls.is_empty() && self.streaming_events.is_empty()
|
||||
}
|
||||
|
||||
pub(crate) fn from_snapshot(snapshot: &ResourceMetrics) -> Self {
|
||||
let tool_calls = RuntimeMetricTotals {
|
||||
count: sum_counter(snapshot, TOOL_CALL_COUNT_METRIC),
|
||||
duration_ms: sum_histogram_ms(snapshot, TOOL_CALL_DURATION_METRIC),
|
||||
};
|
||||
let api_calls = RuntimeMetricTotals {
|
||||
count: sum_counter(snapshot, API_CALL_COUNT_METRIC),
|
||||
duration_ms: sum_histogram_ms(snapshot, API_CALL_DURATION_METRIC),
|
||||
};
|
||||
let streaming_events = RuntimeMetricTotals {
|
||||
count: sum_counter(snapshot, SSE_EVENT_COUNT_METRIC),
|
||||
duration_ms: sum_histogram_ms(snapshot, SSE_EVENT_DURATION_METRIC),
|
||||
};
|
||||
Self {
|
||||
tool_calls,
|
||||
api_calls,
|
||||
streaming_events,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn sum_counter(snapshot: &ResourceMetrics, name: &str) -> u64 {
|
||||
snapshot
|
||||
.scope_metrics()
|
||||
.flat_map(opentelemetry_sdk::metrics::data::ScopeMetrics::metrics)
|
||||
.filter(|metric| metric.name() == name)
|
||||
.map(sum_counter_metric)
|
||||
.sum()
|
||||
}
|
||||
|
||||
fn sum_counter_metric(metric: &Metric) -> u64 {
|
||||
match metric.data() {
|
||||
AggregatedMetrics::U64(MetricData::Sum(sum)) => sum
|
||||
.data_points()
|
||||
.map(opentelemetry_sdk::metrics::data::SumDataPoint::value)
|
||||
.sum(),
|
||||
_ => 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn sum_histogram_ms(snapshot: &ResourceMetrics, name: &str) -> u64 {
|
||||
snapshot
|
||||
.scope_metrics()
|
||||
.flat_map(opentelemetry_sdk::metrics::data::ScopeMetrics::metrics)
|
||||
.filter(|metric| metric.name() == name)
|
||||
.map(sum_histogram_metric_ms)
|
||||
.sum()
|
||||
}
|
||||
|
||||
fn sum_histogram_metric_ms(metric: &Metric) -> u64 {
|
||||
match metric.data() {
|
||||
AggregatedMetrics::F64(MetricData::Histogram(histogram)) => histogram
|
||||
.data_points()
|
||||
.map(|point| f64_to_u64(point.sum()))
|
||||
.sum(),
|
||||
_ => 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert a floating-point total to `u64`, rounding to the nearest integer.
///
/// Non-finite values (NaN, ±infinity) and values less than or equal to zero
/// map to 0. Finite values too large for `u64` end up as `u64::MAX`.
fn f64_to_u64(value: f64) -> u64 {
    if !value.is_finite() || value <= 0.0 {
        return 0;
    }
    // Cap at the top of the `u64` range before the cast; the float-to-int
    // `as` cast itself saturates as well.
    let clamped = value.min(u64::MAX as f64);
    clamped.round() as u64
}
|
||||
@@ -75,12 +75,16 @@ impl OtelProvider {
|
||||
let metrics = if matches!(metric_exporter, OtelExporter::None) {
|
||||
None
|
||||
} else {
|
||||
Some(MetricsClient::new(MetricsConfig::otlp(
|
||||
let mut config = MetricsConfig::otlp(
|
||||
settings.environment.clone(),
|
||||
settings.service_name.clone(),
|
||||
settings.service_version.clone(),
|
||||
metric_exporter,
|
||||
))?)
|
||||
);
|
||||
if settings.runtime_metrics {
|
||||
config = config.with_runtime_reader();
|
||||
}
|
||||
Some(MetricsClient::new(config)?)
|
||||
};
|
||||
|
||||
if let Some(metrics) = metrics.as_ref() {
|
||||
|
||||
@@ -1,3 +1,9 @@
|
||||
use crate::metrics::names::API_CALL_COUNT_METRIC;
|
||||
use crate::metrics::names::API_CALL_DURATION_METRIC;
|
||||
use crate::metrics::names::SSE_EVENT_COUNT_METRIC;
|
||||
use crate::metrics::names::SSE_EVENT_DURATION_METRIC;
|
||||
use crate::metrics::names::TOOL_CALL_COUNT_METRIC;
|
||||
use crate::metrics::names::TOOL_CALL_DURATION_METRIC;
|
||||
use crate::otel_provider::traceparent_context_from_env;
|
||||
use chrono::SecondsFormat;
|
||||
use chrono::Utc;
|
||||
@@ -29,6 +35,8 @@ pub use crate::OtelEventMetadata;
|
||||
pub use crate::OtelManager;
|
||||
pub use crate::ToolDecisionSource;
|
||||
|
||||
const SSE_UNKNOWN_KIND: &str = "unknown";
|
||||
|
||||
impl OtelManager {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
@@ -148,6 +156,21 @@ impl OtelManager {
|
||||
error: Option<&str>,
|
||||
duration: Duration,
|
||||
) {
|
||||
let success = status.is_some_and(|code| (200..=299).contains(&code)) && error.is_none();
|
||||
let success_str = if success { "true" } else { "false" };
|
||||
let status_str = status
|
||||
.map(|code| code.to_string())
|
||||
.unwrap_or_else(|| "none".to_string());
|
||||
self.counter(
|
||||
API_CALL_COUNT_METRIC,
|
||||
1,
|
||||
&[("status", status_str.as_str()), ("success", success_str)],
|
||||
);
|
||||
self.record_duration(
|
||||
API_CALL_DURATION_METRIC,
|
||||
duration,
|
||||
&[("status", status_str.as_str()), ("success", success_str)],
|
||||
);
|
||||
tracing::event!(
|
||||
tracing::Level::INFO,
|
||||
event.name = "codex.api_request",
|
||||
@@ -215,6 +238,16 @@ impl OtelManager {
|
||||
}
|
||||
|
||||
fn sse_event(&self, kind: &str, duration: Duration) {
|
||||
self.counter(
|
||||
SSE_EVENT_COUNT_METRIC,
|
||||
1,
|
||||
&[("kind", kind), ("success", "true")],
|
||||
);
|
||||
self.record_duration(
|
||||
SSE_EVENT_DURATION_METRIC,
|
||||
duration,
|
||||
&[("kind", kind), ("success", "true")],
|
||||
);
|
||||
tracing::event!(
|
||||
tracing::Level::INFO,
|
||||
event.name = "codex.sse_event",
|
||||
@@ -236,6 +269,17 @@ impl OtelManager {
|
||||
where
|
||||
T: Display,
|
||||
{
|
||||
let kind_str = kind.map_or(SSE_UNKNOWN_KIND, String::as_str);
|
||||
self.counter(
|
||||
SSE_EVENT_COUNT_METRIC,
|
||||
1,
|
||||
&[("kind", kind_str), ("success", "false")],
|
||||
);
|
||||
self.record_duration(
|
||||
SSE_EVENT_DURATION_METRIC,
|
||||
duration,
|
||||
&[("kind", kind_str), ("success", "false")],
|
||||
);
|
||||
match kind {
|
||||
Some(kind) => tracing::event!(
|
||||
tracing::Level::INFO,
|
||||
@@ -443,12 +487,12 @@ impl OtelManager {
|
||||
) {
|
||||
let success_str = if success { "true" } else { "false" };
|
||||
self.counter(
|
||||
"codex.tool.call",
|
||||
TOOL_CALL_COUNT_METRIC,
|
||||
1,
|
||||
&[("tool", tool_name), ("success", success_str)],
|
||||
);
|
||||
self.record_duration(
|
||||
"codex.tool.call.duration_ms",
|
||||
TOOL_CALL_DURATION_METRIC,
|
||||
duration,
|
||||
&[("tool", tool_name), ("success", success_str)],
|
||||
);
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
mod manager_metrics;
|
||||
mod otlp_http_loopback;
|
||||
mod runtime_summary;
|
||||
mod send;
|
||||
mod snapshot;
|
||||
mod timing;
|
||||
mod validation;
|
||||
|
||||
77
codex-rs/otel/tests/suite/runtime_summary.rs
Normal file
77
codex-rs/otel/tests/suite/runtime_summary.rs
Normal file
@@ -0,0 +1,77 @@
|
||||
use codex_app_server_protocol::AuthMode;
|
||||
use codex_otel::OtelManager;
|
||||
use codex_otel::RuntimeMetricTotals;
|
||||
use codex_otel::RuntimeMetricsSummary;
|
||||
use codex_otel::metrics::MetricsClient;
|
||||
use codex_otel::metrics::MetricsConfig;
|
||||
use codex_otel::metrics::Result;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use eventsource_stream::Event as StreamEvent;
|
||||
use opentelemetry_sdk::metrics::InMemoryMetricExporter;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::time::Duration;
|
||||
|
||||
/// Records one tool call, one API request and one SSE event, then checks that
/// the runtime summary reports exactly those counts and durations.
#[test]
fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<()> {
    let exporter = InMemoryMetricExporter::default();
    let metrics = MetricsClient::new(
        MetricsConfig::in_memory("test", "codex-cli", env!("CARGO_PKG_VERSION"), exporter)
            .with_runtime_reader(),
    )?;
    let manager = OtelManager::new(
        ThreadId::new(),
        "gpt-5.1",
        "gpt-5.1",
        Some("account-id".to_string()),
        None,
        Some(AuthMode::ApiKey),
        true,
        "tty".to_string(),
        SessionSource::Cli,
    )
    .with_metrics(metrics);

    // Start from a clean slate before recording the events under test.
    manager.reset_runtime_metrics();

    manager.tool_result(
        "shell",
        "call-1",
        "{\"cmd\":\"echo\"}",
        Duration::from_millis(250),
        true,
        "ok",
    );
    manager.record_api_request(1, Some(200), None, Duration::from_millis(300));
    let sse_response: std::result::Result<
        Option<std::result::Result<StreamEvent, eventsource_stream::EventStreamError<&str>>>,
        tokio::time::error::Elapsed,
    > = Ok(Some(Ok(StreamEvent {
        event: "response.created".to_string(),
        data: "{}".to_string(),
        id: String::new(),
        retry: None,
    })));
    manager.log_sse_event(&sse_response, Duration::from_millis(120));

    let summary = manager
        .runtime_metrics_summary()
        .expect("runtime metrics summary should be available");
    let expected = RuntimeMetricsSummary {
        tool_calls: RuntimeMetricTotals {
            count: 1,
            duration_ms: 250,
        },
        api_calls: RuntimeMetricTotals {
            count: 1,
            duration_ms: 300,
        },
        streaming_events: RuntimeMetricTotals {
            count: 1,
            duration_ms: 120,
        },
    };
    assert_eq!(summary, expected);

    Ok(())
}
|
||||
120
codex-rs/otel/tests/suite/snapshot.rs
Normal file
120
codex-rs/otel/tests/suite/snapshot.rs
Normal file
@@ -0,0 +1,120 @@
|
||||
use crate::harness::attributes_to_map;
|
||||
use crate::harness::find_metric;
|
||||
use codex_app_server_protocol::AuthMode;
|
||||
use codex_otel::OtelManager;
|
||||
use codex_otel::metrics::MetricsClient;
|
||||
use codex_otel::metrics::MetricsConfig;
|
||||
use codex_otel::metrics::Result;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use opentelemetry_sdk::metrics::InMemoryMetricExporter;
|
||||
use opentelemetry_sdk::metrics::data::AggregatedMetrics;
|
||||
use opentelemetry_sdk::metrics::data::MetricData;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
/// A client built with a runtime reader must expose a recorded counter via
/// `snapshot()` while the in-memory exporter has still received nothing.
#[test]
fn snapshot_collects_metrics_without_shutdown() -> Result<()> {
    let exporter = InMemoryMetricExporter::default();
    let config = MetricsConfig::in_memory(
        "test",
        "codex-cli",
        env!("CARGO_PKG_VERSION"),
        exporter.clone(),
    )
    .with_tag("service", "codex-cli")?
    .with_runtime_reader();
    let metrics = MetricsClient::new(config)?;

    metrics.counter(
        "codex.tool.call",
        1,
        &[("tool", "shell"), ("success", "true")],
    )?;

    let snapshot = metrics.snapshot()?;

    let metric = find_metric(&snapshot, "codex.tool.call").expect("counter metric missing");
    let attrs = match metric.data() {
        AggregatedMetrics::U64(data) => match data {
            MetricData::Sum(sum) => {
                let points: Vec<_> = sum.data_points().collect();
                assert_eq!(points.len(), 1);
                attributes_to_map(points[0].attributes())
            }
            _ => panic!("unexpected counter aggregation"),
        },
        _ => panic!("unexpected counter data type"),
    };

    // The default tag plus the two call-site tags.
    let expected = BTreeMap::from([
        ("service".to_string(), "codex-cli".to_string()),
        ("success".to_string(), "true".to_string()),
        ("tool".to_string(), "shell".to_string()),
    ]);
    assert_eq!(attrs, expected);

    let finished = exporter
        .get_finished_metrics()
        .expect("finished metrics should be readable");
    assert!(finished.is_empty(), "expected no periodic exports yet");

    Ok(())
}
|
||||
|
||||
/// A counter recorded through `OtelManager` must appear in
/// `snapshot_metrics()` carrying the manager's metadata tags in addition to
/// the default and call-site tags.
#[test]
fn manager_snapshot_metrics_collects_without_shutdown() -> Result<()> {
    let exporter = InMemoryMetricExporter::default();
    let config = MetricsConfig::in_memory("test", "codex-cli", env!("CARGO_PKG_VERSION"), exporter)
        .with_tag("service", "codex-cli")?
        .with_runtime_reader();
    let metrics = MetricsClient::new(config)?;
    let manager = OtelManager::new(
        ThreadId::new(),
        "gpt-5.1",
        "gpt-5.1",
        Some("account-id".to_string()),
        None,
        Some(AuthMode::ApiKey),
        true,
        "tty".to_string(),
        SessionSource::Cli,
    )
    .with_metrics(metrics);

    manager.counter(
        "codex.tool.call",
        1,
        &[("tool", "shell"), ("success", "true")],
    );

    let snapshot = manager.snapshot_metrics()?;
    let metric = find_metric(&snapshot, "codex.tool.call").expect("counter metric missing");
    let attrs = match metric.data() {
        AggregatedMetrics::U64(data) => match data {
            MetricData::Sum(sum) => {
                let points: Vec<_> = sum.data_points().collect();
                assert_eq!(points.len(), 1);
                attributes_to_map(points[0].attributes())
            }
            _ => panic!("unexpected counter aggregation"),
        },
        _ => panic!("unexpected counter data type"),
    };

    let expected = BTreeMap::from([
        (
            "app.version".to_string(),
            env!("CARGO_PKG_VERSION").to_string(),
        ),
        ("auth_mode".to_string(), AuthMode::ApiKey.to_string()),
        ("model".to_string(), "gpt-5.1".to_string()),
        ("service".to_string(), "codex-cli".to_string()),
        ("session_source".to_string(), "cli".to_string()),
        ("success".to_string(), "true".to_string()),
        ("tool".to_string(), "shell".to_string()),
    ]);
    assert_eq!(attrs, expected);

    Ok(())
}
|
||||
@@ -1017,6 +1017,7 @@ impl ChatWidget {
|
||||
self.plan_delta_buffer.clear();
|
||||
self.plan_item_active = false;
|
||||
self.plan_stream_controller = None;
|
||||
self.otel_manager.reset_runtime_metrics();
|
||||
self.bottom_pane.clear_quit_shortcut_hint();
|
||||
self.quit_shortcut_expires_at = None;
|
||||
self.quit_shortcut_key = None;
|
||||
@@ -1038,6 +1039,21 @@ impl ChatWidget {
|
||||
self.add_boxed_history(cell);
|
||||
}
|
||||
self.flush_unified_exec_wait_streak();
|
||||
if !from_replay {
|
||||
let runtime_metrics = self.otel_manager.runtime_metrics_summary();
|
||||
if runtime_metrics.is_some() {
|
||||
let elapsed_seconds = self
|
||||
.bottom_pane
|
||||
.status_widget()
|
||||
.map(super::status_indicator_widget::StatusIndicatorWidget::elapsed_seconds);
|
||||
self.add_to_history(history_cell::FinalMessageSeparator::new(
|
||||
elapsed_seconds,
|
||||
runtime_metrics,
|
||||
));
|
||||
}
|
||||
self.needs_final_message_separator = false;
|
||||
self.had_work_activity = false;
|
||||
}
|
||||
// Mark task stopped and request redraw now that all content is in history.
|
||||
self.agent_turn_running = false;
|
||||
self.update_task_running_state();
|
||||
@@ -1887,7 +1903,10 @@ impl ChatWidget {
|
||||
.status_widget()
|
||||
.map(super::status_indicator_widget::StatusIndicatorWidget::elapsed_seconds)
|
||||
.map(|current| self.worked_elapsed_from(current));
|
||||
self.add_to_history(history_cell::FinalMessageSeparator::new(elapsed_seconds));
|
||||
self.add_to_history(history_cell::FinalMessageSeparator::new(
|
||||
elapsed_seconds,
|
||||
None,
|
||||
));
|
||||
self.needs_final_message_separator = false;
|
||||
self.had_work_activity = false;
|
||||
} else if self.needs_final_message_separator {
|
||||
|
||||
@@ -45,6 +45,7 @@ use codex_core::protocol::McpAuthStatus;
|
||||
use codex_core::protocol::McpInvocation;
|
||||
use codex_core::protocol::SessionConfiguredEvent;
|
||||
use codex_core::web_search::web_search_detail;
|
||||
use codex_otel::RuntimeMetricsSummary;
|
||||
use codex_protocol::models::WebSearchAction;
|
||||
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
|
||||
use codex_protocol::plan_tool::PlanItemArg;
|
||||
@@ -1966,34 +1967,96 @@ pub(crate) fn new_reasoning_summary_block(full_reasoning_buffer: String) -> Box<
|
||||
/// divider.
|
||||
pub struct FinalMessageSeparator {
|
||||
elapsed_seconds: Option<u64>,
|
||||
runtime_metrics: Option<RuntimeMetricsSummary>,
|
||||
}
|
||||
impl FinalMessageSeparator {
|
||||
/// Creates a separator; `elapsed_seconds` typically comes from the status indicator timer.
|
||||
pub(crate) fn new(elapsed_seconds: Option<u64>) -> Self {
|
||||
Self { elapsed_seconds }
|
||||
pub(crate) fn new(
|
||||
elapsed_seconds: Option<u64>,
|
||||
runtime_metrics: Option<RuntimeMetricsSummary>,
|
||||
) -> Self {
|
||||
Self {
|
||||
elapsed_seconds,
|
||||
runtime_metrics,
|
||||
}
|
||||
}
|
||||
}
|
||||
impl HistoryCell for FinalMessageSeparator {
|
||||
fn display_lines(&self, width: u16) -> Vec<Line<'static>> {
|
||||
let elapsed_seconds = self
|
||||
let mut label_parts = Vec::new();
|
||||
if let Some(elapsed_seconds) = self
|
||||
.elapsed_seconds
|
||||
.map(super::status_indicator_widget::fmt_elapsed_compact);
|
||||
if let Some(elapsed_seconds) = elapsed_seconds {
|
||||
let worked_for = format!("─ Worked for {elapsed_seconds} ─");
|
||||
let worked_for_width = worked_for.width();
|
||||
vec![
|
||||
Line::from_iter([
|
||||
worked_for,
|
||||
"─".repeat((width as usize).saturating_sub(worked_for_width)),
|
||||
])
|
||||
.dim(),
|
||||
]
|
||||
} else {
|
||||
vec![Line::from_iter(["─".repeat(width as usize).dim()])]
|
||||
.map(super::status_indicator_widget::fmt_elapsed_compact)
|
||||
{
|
||||
label_parts.push(format!("Worked for {elapsed_seconds}"));
|
||||
}
|
||||
if let Some(metrics_label) = self.runtime_metrics.and_then(runtime_metrics_label) {
|
||||
label_parts.push(metrics_label);
|
||||
}
|
||||
|
||||
if label_parts.is_empty() {
|
||||
return vec![Line::from_iter(["─".repeat(width as usize).dim()])];
|
||||
}
|
||||
|
||||
let label = format!("─ {} ─", label_parts.join(" • "));
|
||||
let (label, _suffix, label_width) = take_prefix_by_width(&label, width as usize);
|
||||
vec![
|
||||
Line::from_iter([
|
||||
label,
|
||||
"─".repeat((width as usize).saturating_sub(label_width)),
|
||||
])
|
||||
.dim(),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
fn runtime_metrics_label(summary: RuntimeMetricsSummary) -> Option<String> {
|
||||
let mut parts = Vec::new();
|
||||
if summary.tool_calls.count > 0 {
|
||||
let duration = format_duration_ms(summary.tool_calls.duration_ms);
|
||||
let calls = pluralize(summary.tool_calls.count, "call", "calls");
|
||||
parts.push(format!(
|
||||
"Local tools: {} {calls} ({duration})",
|
||||
summary.tool_calls.count
|
||||
));
|
||||
}
|
||||
if summary.api_calls.count > 0 {
|
||||
let duration = format_duration_ms(summary.api_calls.duration_ms);
|
||||
let calls = pluralize(summary.api_calls.count, "call", "calls");
|
||||
parts.push(format!(
|
||||
"Inference: {} {calls} ({duration})",
|
||||
summary.api_calls.count
|
||||
));
|
||||
}
|
||||
if summary.streaming_events.count > 0 {
|
||||
let duration = format_duration_ms(summary.streaming_events.duration_ms);
|
||||
let stream_label = pluralize(summary.streaming_events.count, "Stream", "Streams");
|
||||
let events = pluralize(summary.streaming_events.count, "event", "events");
|
||||
parts.push(format!(
|
||||
"{stream_label}: {} {events} ({duration})",
|
||||
summary.streaming_events.count
|
||||
));
|
||||
}
|
||||
if parts.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(parts.join(" • "))
|
||||
}
|
||||
}
|
||||
|
||||
/// Formats a millisecond duration for compact display.
///
/// Values below one second are shown as whole milliseconds (`"900ms"`);
/// values of one second or more are shown in seconds with one decimal
/// place (`"1.2s"`).
fn format_duration_ms(duration_ms: u64) -> String {
    if duration_ms >= 1_000 {
        let seconds = duration_ms as f64 / 1_000.0;
        format!("{seconds:.1}s")
    } else {
        format!("{duration_ms}ms")
    }
}
|
||||
|
||||
/// Returns `singular` when `count` is exactly 1 and `plural` otherwise
/// (including for a count of 0).
fn pluralize(count: u64, singular: &'static str, plural: &'static str) -> &'static str {
    if count == 1 { singular } else { plural }
}
|
||||
|
||||
fn format_mcp_invocation<'a>(invocation: McpInvocation) -> Line<'a> {
|
||||
let args_str = invocation
|
||||
.arguments
|
||||
@@ -2026,6 +2089,8 @@ mod tests {
|
||||
use codex_core::config::types::McpServerConfig;
|
||||
use codex_core::config::types::McpServerTransportConfig;
|
||||
use codex_core::protocol::McpAuthStatus;
|
||||
use codex_otel::RuntimeMetricTotals;
|
||||
use codex_otel::RuntimeMetricsSummary;
|
||||
use codex_protocol::models::WebSearchAction;
|
||||
use codex_protocol::parse_command::ParsedCommand;
|
||||
use dirs::home_dir;
|
||||
@@ -2101,6 +2166,32 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
/// The separator rendered with both an elapsed time and a full metrics
/// summary fits on one line and contains every expected segment.
#[test]
fn final_message_separator_includes_runtime_metrics() {
    let summary = RuntimeMetricsSummary {
        tool_calls: RuntimeMetricTotals {
            count: 3,
            duration_ms: 2_450,
        },
        api_calls: RuntimeMetricTotals {
            count: 2,
            duration_ms: 1_200,
        },
        streaming_events: RuntimeMetricTotals {
            count: 6,
            duration_ms: 900,
        },
    };
    let cell = FinalMessageSeparator::new(Some(12), Some(summary));
    let rendered = render_lines(&cell.display_lines(120));

    assert_eq!(rendered.len(), 1);
    assert!(rendered[0].contains("Worked for 12s"));
    assert!(rendered[0].contains("Local tools: 3 calls (2.5s)"));
    assert!(rendered[0].contains("Inference: 2 calls (1.2s)"));
    assert!(rendered[0].contains("Streams: 6 events (900ms)"));
}
|
||||
|
||||
#[test]
|
||||
fn ps_output_empty_snapshot() {
|
||||
let cell = new_unified_exec_processes_output(Vec::new());
|
||||
|
||||
Reference in New Issue
Block a user