fix(otel): make HTTP trace export survive app-server runtimes (#14300)

## Summary

This PR fixes OTLP HTTP trace export in runtimes where the previous
exporter setup was unreliable, especially around app-server usage. It
also removes the old `codex_otel::otel_provider` compatibility shim and
switches remaining call sites over to the crate-root
`codex_otel::OtelProvider` export.

## What changed

- Use a runtime-safe OTLP HTTP trace exporter path for Tokio runtimes.
- Add an async HTTP client path for trace export when we are already
inside a multi-thread Tokio runtime.
- Make provider shutdown flush traces before tearing down the tracer
provider.
- Add loopback coverage that verifies traces are actually sent to
`/v1/traces`:
  - outside Tokio
  - inside a multi-thread Tokio runtime
  - inside a current-thread Tokio runtime
- Remove the `codex_otel::otel_provider` shim and update remaining
imports.

## Why

I hit cases where spans were being created correctly but never made it
to the collector. The issue turned out to be in exporter/runtime
behavior rather than the span plumbing itself. This PR narrows that gap
and gives us regression coverage for the actual export path.
This commit is contained in:
Owen Lin
2026-03-11 09:59:49 -07:00
committed by Michael Bolin
parent 548583198a
commit fa1242c83b
12 changed files with 511 additions and 19 deletions

View File

@@ -1,6 +1,6 @@
use codex_otel::OtelProvider;
use codex_otel::SessionTelemetry;
use codex_otel::TelemetryAuthMode;
use codex_otel::otel_provider::OtelProvider;
use opentelemetry::KeyValue;
use opentelemetry::logs::AnyValue;
use opentelemetry::trace::TracerProvider as _;

View File

@@ -1,5 +1,7 @@
use codex_otel::OtelProvider;
use codex_otel::config::OtelExporter;
use codex_otel::config::OtelHttpProtocol;
use codex_otel::config::OtelSettings;
use codex_otel::metrics::MetricsClient;
use codex_otel::metrics::MetricsConfig;
use codex_otel::metrics::Result;
@@ -8,10 +10,12 @@ use std::io::Read as _;
use std::io::Write as _;
use std::net::TcpListener;
use std::net::TcpStream;
use std::path::PathBuf;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use tracing_subscriber::layer::SubscriberExt;
struct CapturedRequest {
path: String,
@@ -212,3 +216,346 @@ fn otlp_http_exporter_sends_metrics_to_collector() -> Result<()> {
Ok(())
}
#[test]
fn otlp_http_exporter_sends_traces_to_collector()
-> std::result::Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
let addr = listener.local_addr().expect("local_addr");
listener.set_nonblocking(true).expect("set_nonblocking");
let (tx, rx) = mpsc::channel::<Vec<CapturedRequest>>();
let server = thread::spawn(move || {
let mut captured = Vec::new();
let deadline = Instant::now() + Duration::from_secs(3);
while Instant::now() < deadline {
match listener.accept() {
Ok((mut stream, _)) => {
let result = read_http_request(&mut stream);
let _ = write_http_response(&mut stream, "202 Accepted");
if let Ok((path, headers, body)) = result {
captured.push(CapturedRequest {
path,
content_type: headers.get("content-type").cloned(),
body,
});
}
}
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
thread::sleep(Duration::from_millis(10));
}
Err(_) => break,
}
}
let _ = tx.send(captured);
});
let otel = OtelProvider::from(&OtelSettings {
environment: "test".to_string(),
service_name: "codex-cli".to_string(),
service_version: env!("CARGO_PKG_VERSION").to_string(),
codex_home: PathBuf::from("."),
exporter: OtelExporter::None,
trace_exporter: OtelExporter::OtlpHttp {
endpoint: format!("http://{addr}/v1/traces"),
headers: HashMap::new(),
protocol: OtelHttpProtocol::Json,
tls: None,
},
metrics_exporter: OtelExporter::None,
runtime_metrics: false,
})?
.expect("otel provider");
let tracing_layer = otel.tracing_layer().expect("tracing layer");
let subscriber = tracing_subscriber::registry().with(tracing_layer);
tracing::subscriber::with_default(subscriber, || {
let span = tracing::info_span!(
"trace-loopback",
otel.name = "trace-loopback",
otel.kind = "server",
rpc.system = "jsonrpc",
rpc.method = "trace-loopback",
);
let _guard = span.enter();
tracing::info!("trace loopback event");
});
otel.shutdown();
server.join().expect("server join");
let captured = rx.recv_timeout(Duration::from_secs(1)).expect("captured");
let request = captured
.iter()
.find(|req| req.path == "/v1/traces")
.unwrap_or_else(|| {
let paths = captured
.iter()
.map(|req| req.path.as_str())
.collect::<Vec<_>>()
.join(", ");
panic!(
"missing /v1/traces request; got {}: {paths}",
captured.len()
);
});
let content_type = request
.content_type
.as_deref()
.unwrap_or("<missing content-type>");
assert!(
content_type.starts_with("application/json"),
"unexpected content-type: {content_type}"
);
let body = String::from_utf8_lossy(&request.body);
assert!(
body.contains("trace-loopback"),
"expected span name not found; body prefix: {}",
&body.chars().take(2000).collect::<String>()
);
assert!(
body.contains("codex-cli"),
"expected service name not found; body prefix: {}",
&body.chars().take(2000).collect::<String>()
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn otlp_http_exporter_sends_traces_to_collector_in_tokio_runtime()
-> std::result::Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
let addr = listener.local_addr().expect("local_addr");
listener.set_nonblocking(true).expect("set_nonblocking");
let (tx, rx) = mpsc::channel::<Vec<CapturedRequest>>();
let server = thread::spawn(move || {
let mut captured = Vec::new();
let deadline = Instant::now() + Duration::from_secs(3);
while Instant::now() < deadline {
match listener.accept() {
Ok((mut stream, _)) => {
let result = read_http_request(&mut stream);
let _ = write_http_response(&mut stream, "202 Accepted");
if let Ok((path, headers, body)) = result {
captured.push(CapturedRequest {
path,
content_type: headers.get("content-type").cloned(),
body,
});
}
}
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
thread::sleep(Duration::from_millis(10));
}
Err(_) => break,
}
}
let _ = tx.send(captured);
});
let otel = OtelProvider::from(&OtelSettings {
environment: "test".to_string(),
service_name: "codex-cli".to_string(),
service_version: env!("CARGO_PKG_VERSION").to_string(),
codex_home: PathBuf::from("."),
exporter: OtelExporter::None,
trace_exporter: OtelExporter::OtlpHttp {
endpoint: format!("http://{addr}/v1/traces"),
headers: HashMap::new(),
protocol: OtelHttpProtocol::Json,
tls: None,
},
metrics_exporter: OtelExporter::None,
runtime_metrics: false,
})?
.expect("otel provider");
let tracing_layer = otel.tracing_layer().expect("tracing layer");
let subscriber = tracing_subscriber::registry().with(tracing_layer);
tracing::subscriber::with_default(subscriber, || {
let span = tracing::info_span!(
"trace-loopback-tokio",
otel.name = "trace-loopback-tokio",
otel.kind = "server",
rpc.system = "jsonrpc",
rpc.method = "trace-loopback-tokio",
);
let _guard = span.enter();
tracing::info!("trace loopback event from tokio runtime");
});
otel.shutdown();
server.join().expect("server join");
let captured = rx.recv_timeout(Duration::from_secs(1)).expect("captured");
let request = captured
.iter()
.find(|req| req.path == "/v1/traces")
.unwrap_or_else(|| {
let paths = captured
.iter()
.map(|req| req.path.as_str())
.collect::<Vec<_>>()
.join(", ");
panic!(
"missing /v1/traces request; got {}: {paths}",
captured.len()
);
});
let content_type = request
.content_type
.as_deref()
.unwrap_or("<missing content-type>");
assert!(
content_type.starts_with("application/json"),
"unexpected content-type: {content_type}"
);
let body = String::from_utf8_lossy(&request.body);
assert!(
body.contains("trace-loopback-tokio"),
"expected span name not found; body prefix: {}",
&body.chars().take(2000).collect::<String>()
);
assert!(
body.contains("codex-cli"),
"expected service name not found; body prefix: {}",
&body.chars().take(2000).collect::<String>()
);
Ok(())
}
#[test]
fn otlp_http_exporter_sends_traces_to_collector_in_current_thread_tokio_runtime()
-> std::result::Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
let addr = listener.local_addr().expect("local_addr");
listener.set_nonblocking(true).expect("set_nonblocking");
let (tx, rx) = mpsc::channel::<Vec<CapturedRequest>>();
let server = thread::spawn(move || {
let mut captured = Vec::new();
let deadline = Instant::now() + Duration::from_secs(3);
while Instant::now() < deadline {
match listener.accept() {
Ok((mut stream, _)) => {
let result = read_http_request(&mut stream);
let _ = write_http_response(&mut stream, "202 Accepted");
if let Ok((path, headers, body)) = result {
captured.push(CapturedRequest {
path,
content_type: headers.get("content-type").cloned(),
body,
});
}
}
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
thread::sleep(Duration::from_millis(10));
}
Err(_) => break,
}
}
let _ = tx.send(captured);
});
let (runtime_result_tx, runtime_result_rx) = mpsc::channel::<std::result::Result<(), String>>();
let runtime_thread = thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("current-thread runtime");
let result = runtime.block_on(async move {
let otel = OtelProvider::from(&OtelSettings {
environment: "test".to_string(),
service_name: "codex-cli".to_string(),
service_version: env!("CARGO_PKG_VERSION").to_string(),
codex_home: PathBuf::from("."),
exporter: OtelExporter::None,
trace_exporter: OtelExporter::OtlpHttp {
endpoint: format!("http://{addr}/v1/traces"),
headers: HashMap::new(),
protocol: OtelHttpProtocol::Json,
tls: None,
},
metrics_exporter: OtelExporter::None,
runtime_metrics: false,
})
.map_err(|err| err.to_string())?
.expect("otel provider");
let tracing_layer = otel.tracing_layer().expect("tracing layer");
let subscriber = tracing_subscriber::registry().with(tracing_layer);
tracing::subscriber::with_default(subscriber, || {
let span = tracing::info_span!(
"trace-loopback-current-thread",
otel.name = "trace-loopback-current-thread",
otel.kind = "server",
rpc.system = "jsonrpc",
rpc.method = "trace-loopback-current-thread",
);
let _guard = span.enter();
tracing::info!("trace loopback event from current-thread tokio runtime");
});
otel.shutdown();
Ok::<(), String>(())
});
let _ = runtime_result_tx.send(result);
});
runtime_result_rx
.recv_timeout(Duration::from_secs(5))
.expect("current-thread runtime should complete")
.map_err(std::io::Error::other)?;
runtime_thread.join().expect("runtime thread");
server.join().expect("server join");
let captured = rx.recv_timeout(Duration::from_secs(1)).expect("captured");
let request = captured
.iter()
.find(|req| req.path == "/v1/traces")
.unwrap_or_else(|| {
let paths = captured
.iter()
.map(|req| req.path.as_str())
.collect::<Vec<_>>()
.join(", ");
panic!(
"missing /v1/traces request; got {}: {paths}",
captured.len()
);
});
let content_type = request
.content_type
.as_deref()
.unwrap_or("<missing content-type>");
assert!(
content_type.starts_with("application/json"),
"unexpected content-type: {content_type}"
);
let body = String::from_utf8_lossy(&request.body);
assert!(
body.contains("trace-loopback-current-thread"),
"expected span name not found; body prefix: {}",
&body.chars().take(2000).collect::<String>()
);
assert!(
body.contains("codex-cli"),
"expected service name not found; body prefix: {}",
&body.chars().take(2000).collect::<String>()
);
Ok(())
}