mirror of
https://github.com/openai/codex.git
synced 2026-05-16 01:02:48 +00:00
Add Codex config for static trace span attributes and structured W3C tracestate field upserts. The config flows through OtelSettings so callers can attach trace metadata without touching every span call site. Apply span attributes with an SDK span processor so every exported trace span carries the configured metadata. Model tracestate as nested member fields so configured keys can be upserted while unrelated propagated state in the same member is preserved. Validate configured tracestate before installing provider-global state, including header-unsafe values the SDK does not reject by itself. This keeps Codex from propagating malformed trace context from config. Update the config schema, public docs, and OTLP loopback coverage for config parsing, span export, propagation, and invalid-header rejection.
644 lines
22 KiB
Rust
644 lines
22 KiB
Rust
use codex_otel::MetricsClient;
|
|
use codex_otel::MetricsConfig;
|
|
use codex_otel::OtelExporter;
|
|
use codex_otel::OtelHttpProtocol;
|
|
use codex_otel::OtelProvider;
|
|
use codex_otel::OtelSettings;
|
|
use codex_otel::Result;
|
|
use codex_otel::current_span_w3c_trace_context;
|
|
use codex_otel::set_parent_from_w3c_trace_context;
|
|
use codex_protocol::protocol::W3cTraceContext;
|
|
use std::collections::BTreeMap;
|
|
use std::collections::HashMap;
|
|
use std::io::Read as _;
|
|
use std::io::Write as _;
|
|
use std::net::TcpListener;
|
|
use std::net::TcpStream;
|
|
use std::path::PathBuf;
|
|
use std::sync::Mutex;
|
|
use std::sync::mpsc;
|
|
use std::thread;
|
|
use std::time::Duration;
|
|
use std::time::Instant;
|
|
use tracing_subscriber::layer::SubscriberExt;
|
|
|
|
static TRACE_CONTEXT_CONFIG_LOCK: Mutex<()> = Mutex::new(());
|
|
|
|
struct CapturedRequest {
|
|
path: String,
|
|
content_type: Option<String>,
|
|
body: Vec<u8>,
|
|
}
|
|
|
|
fn read_http_request(
|
|
stream: &mut TcpStream,
|
|
) -> std::io::Result<(String, HashMap<String, String>, Vec<u8>)> {
|
|
stream.set_read_timeout(Some(Duration::from_secs(2)))?;
|
|
let deadline = Instant::now() + Duration::from_secs(2);
|
|
|
|
let mut read_next = |buf: &mut [u8]| -> std::io::Result<usize> {
|
|
loop {
|
|
match stream.read(buf) {
|
|
Ok(n) => return Ok(n),
|
|
Err(err)
|
|
if err.kind() == std::io::ErrorKind::WouldBlock
|
|
|| err.kind() == std::io::ErrorKind::Interrupted =>
|
|
{
|
|
if Instant::now() >= deadline {
|
|
return Err(std::io::Error::new(
|
|
std::io::ErrorKind::TimedOut,
|
|
"timed out waiting for request data",
|
|
));
|
|
}
|
|
thread::sleep(Duration::from_millis(5));
|
|
}
|
|
Err(err) => return Err(err),
|
|
}
|
|
}
|
|
};
|
|
|
|
let mut buf = Vec::new();
|
|
let mut scratch = [0u8; 8192];
|
|
let header_end = loop {
|
|
let n = read_next(&mut scratch)?;
|
|
if n == 0 {
|
|
return Err(std::io::Error::new(
|
|
std::io::ErrorKind::UnexpectedEof,
|
|
"EOF before headers",
|
|
));
|
|
}
|
|
buf.extend_from_slice(&scratch[..n]);
|
|
if let Some(end) = buf.windows(4).position(|w| w == b"\r\n\r\n") {
|
|
break end;
|
|
}
|
|
if buf.len() > 1024 * 1024 {
|
|
return Err(std::io::Error::new(
|
|
std::io::ErrorKind::InvalidData,
|
|
"headers too large",
|
|
));
|
|
}
|
|
};
|
|
|
|
let headers_bytes = &buf[..header_end];
|
|
let mut body_bytes = buf[header_end + 4..].to_vec();
|
|
|
|
let headers_str = std::str::from_utf8(headers_bytes).map_err(|err| {
|
|
std::io::Error::new(
|
|
std::io::ErrorKind::InvalidData,
|
|
format!("headers not utf-8: {err}"),
|
|
)
|
|
})?;
|
|
let mut lines = headers_str.split("\r\n");
|
|
let start = lines.next().ok_or_else(|| {
|
|
std::io::Error::new(std::io::ErrorKind::InvalidData, "missing request line")
|
|
})?;
|
|
let mut parts = start.split_whitespace();
|
|
let _method = parts.next().unwrap_or_default();
|
|
let path = parts
|
|
.next()
|
|
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "missing path"))?
|
|
.to_string();
|
|
|
|
let mut headers = HashMap::new();
|
|
for line in lines {
|
|
let Some((k, v)) = line.split_once(':') else {
|
|
continue;
|
|
};
|
|
headers.insert(k.trim().to_ascii_lowercase(), v.trim().to_string());
|
|
}
|
|
|
|
if let Some(len) = headers
|
|
.get("content-length")
|
|
.and_then(|v| v.parse::<usize>().ok())
|
|
{
|
|
while body_bytes.len() < len {
|
|
let n = read_next(&mut scratch)?;
|
|
if n == 0 {
|
|
return Err(std::io::Error::new(
|
|
std::io::ErrorKind::UnexpectedEof,
|
|
"EOF before body complete",
|
|
));
|
|
}
|
|
body_bytes.extend_from_slice(&scratch[..n]);
|
|
if body_bytes.len() > len + 1024 * 1024 {
|
|
return Err(std::io::Error::new(
|
|
std::io::ErrorKind::InvalidData,
|
|
"body too large",
|
|
));
|
|
}
|
|
}
|
|
body_bytes.truncate(len);
|
|
}
|
|
|
|
Ok((path, headers, body_bytes))
|
|
}
|
|
|
|
fn write_http_response(stream: &mut TcpStream, status: &str) -> std::io::Result<()> {
|
|
let response = format!("HTTP/1.1 {status}\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
|
|
stream.write_all(response.as_bytes())?;
|
|
stream.flush()
|
|
}
|
|
|
|
#[test]
|
|
fn otlp_http_exporter_sends_metrics_to_collector() -> Result<()> {
|
|
let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
|
|
let addr = listener.local_addr().expect("local_addr");
|
|
listener.set_nonblocking(true).expect("set_nonblocking");
|
|
|
|
let (tx, rx) = mpsc::channel::<Vec<CapturedRequest>>();
|
|
let server = thread::spawn(move || {
|
|
let mut captured = Vec::new();
|
|
let deadline = Instant::now() + Duration::from_secs(3);
|
|
|
|
while Instant::now() < deadline {
|
|
match listener.accept() {
|
|
Ok((mut stream, _)) => {
|
|
let result = read_http_request(&mut stream);
|
|
let _ = write_http_response(&mut stream, "202 Accepted");
|
|
if let Ok((path, headers, body)) = result {
|
|
captured.push(CapturedRequest {
|
|
path,
|
|
content_type: headers.get("content-type").cloned(),
|
|
body,
|
|
});
|
|
}
|
|
}
|
|
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
|
|
thread::sleep(Duration::from_millis(10));
|
|
}
|
|
Err(_) => break,
|
|
}
|
|
}
|
|
|
|
let _ = tx.send(captured);
|
|
});
|
|
|
|
let metrics = MetricsClient::new(MetricsConfig::otlp(
|
|
"test",
|
|
"codex-cli",
|
|
env!("CARGO_PKG_VERSION"),
|
|
OtelExporter::OtlpHttp {
|
|
endpoint: format!("http://{addr}/v1/metrics"),
|
|
headers: HashMap::new(),
|
|
protocol: OtelHttpProtocol::Json,
|
|
tls: None,
|
|
},
|
|
))?;
|
|
|
|
metrics.counter("codex.turns", /*inc*/ 1, &[("source", "test")])?;
|
|
metrics.shutdown()?;
|
|
|
|
server.join().expect("server join");
|
|
let captured = rx.recv_timeout(Duration::from_secs(1)).expect("captured");
|
|
|
|
let request = captured
|
|
.iter()
|
|
.find(|req| req.path == "/v1/metrics")
|
|
.unwrap_or_else(|| {
|
|
let paths = captured
|
|
.iter()
|
|
.map(|req| req.path.as_str())
|
|
.collect::<Vec<_>>()
|
|
.join(", ");
|
|
panic!(
|
|
"missing /v1/metrics request; got {}: {paths}",
|
|
captured.len()
|
|
);
|
|
});
|
|
let content_type = request
|
|
.content_type
|
|
.as_deref()
|
|
.unwrap_or("<missing content-type>");
|
|
assert!(
|
|
content_type.starts_with("application/json"),
|
|
"unexpected content-type: {content_type}"
|
|
);
|
|
|
|
let body = String::from_utf8_lossy(&request.body);
|
|
assert!(
|
|
body.contains("codex.turns"),
|
|
"expected metric name not found; body prefix: {}",
|
|
&body.chars().take(2000).collect::<String>()
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[test]
|
|
fn otel_provider_rejects_header_unsafe_configured_tracestate() {
|
|
let result = OtelProvider::from(&OtelSettings {
|
|
environment: "test".to_string(),
|
|
service_name: "codex-cli".to_string(),
|
|
service_version: env!("CARGO_PKG_VERSION").to_string(),
|
|
codex_home: PathBuf::from("."),
|
|
exporter: OtelExporter::None,
|
|
trace_exporter: OtelExporter::OtlpHttp {
|
|
endpoint: "http://127.0.0.1:1/v1/traces".to_string(),
|
|
headers: HashMap::new(),
|
|
protocol: OtelHttpProtocol::Json,
|
|
tls: None,
|
|
},
|
|
metrics_exporter: OtelExporter::None,
|
|
runtime_metrics: false,
|
|
span_attributes: BTreeMap::new(),
|
|
tracestate: BTreeMap::from([(
|
|
"example".to_string(),
|
|
BTreeMap::from([("alpha".to_string(), "one\ntwo".to_string())]),
|
|
)]),
|
|
});
|
|
|
|
let Err(err) = result else {
|
|
panic!("expected header-unsafe configured tracestate to be rejected");
|
|
};
|
|
assert!(err.to_string().contains("configured tracestate value"));
|
|
}
|
|
|
|
#[test]
|
|
fn otlp_http_exporter_sends_traces_to_collector()
|
|
-> std::result::Result<(), Box<dyn std::error::Error>> {
|
|
let _trace_context_config_guard = TRACE_CONTEXT_CONFIG_LOCK
|
|
.lock()
|
|
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
|
let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
|
|
let addr = listener.local_addr().expect("local_addr");
|
|
listener.set_nonblocking(true).expect("set_nonblocking");
|
|
|
|
let (tx, rx) = mpsc::channel::<Vec<CapturedRequest>>();
|
|
let server = thread::spawn(move || {
|
|
let mut captured = Vec::new();
|
|
let deadline = Instant::now() + Duration::from_secs(3);
|
|
|
|
while Instant::now() < deadline {
|
|
match listener.accept() {
|
|
Ok((mut stream, _)) => {
|
|
let result = read_http_request(&mut stream);
|
|
let _ = write_http_response(&mut stream, "202 Accepted");
|
|
if let Ok((path, headers, body)) = result {
|
|
captured.push(CapturedRequest {
|
|
path,
|
|
content_type: headers.get("content-type").cloned(),
|
|
body,
|
|
});
|
|
}
|
|
}
|
|
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
|
|
thread::sleep(Duration::from_millis(10));
|
|
}
|
|
Err(_) => break,
|
|
}
|
|
}
|
|
|
|
let _ = tx.send(captured);
|
|
});
|
|
|
|
let otel = OtelProvider::from(&OtelSettings {
|
|
environment: "test".to_string(),
|
|
service_name: "codex-cli".to_string(),
|
|
service_version: env!("CARGO_PKG_VERSION").to_string(),
|
|
codex_home: PathBuf::from("."),
|
|
exporter: OtelExporter::None,
|
|
trace_exporter: OtelExporter::OtlpHttp {
|
|
endpoint: format!("http://{addr}/v1/traces"),
|
|
headers: HashMap::new(),
|
|
protocol: OtelHttpProtocol::Json,
|
|
tls: None,
|
|
},
|
|
metrics_exporter: OtelExporter::None,
|
|
runtime_metrics: false,
|
|
span_attributes: BTreeMap::from([(
|
|
"test.configured_attribute".to_string(),
|
|
"configured-value".to_string(),
|
|
)]),
|
|
tracestate: BTreeMap::from([(
|
|
"example".to_string(),
|
|
BTreeMap::from([
|
|
("alpha".to_string(), "one".to_string()),
|
|
("beta".to_string(), "two".to_string()),
|
|
]),
|
|
)]),
|
|
})?
|
|
.expect("otel provider");
|
|
let tracing_layer = otel.tracing_layer().expect("tracing layer");
|
|
let subscriber = tracing_subscriber::registry().with(tracing_layer);
|
|
|
|
let propagated_trace = tracing::subscriber::with_default(subscriber, || {
|
|
let span = tracing::info_span!(
|
|
"trace-loopback",
|
|
otel.name = "trace-loopback",
|
|
otel.kind = "server",
|
|
rpc.system = "jsonrpc",
|
|
rpc.method = "trace-loopback",
|
|
);
|
|
assert!(set_parent_from_w3c_trace_context(
|
|
&span,
|
|
&W3cTraceContext {
|
|
traceparent: Some(
|
|
"00-00000000000000000000000000000001-0000000000000002-01".to_string(),
|
|
),
|
|
tracestate: Some("example=alpha:zero;keep:yes,other=value".to_string()),
|
|
},
|
|
));
|
|
let _guard = span.enter();
|
|
let propagated_trace =
|
|
current_span_w3c_trace_context().expect("current span should have trace context");
|
|
tracing::info!("trace loopback event");
|
|
propagated_trace
|
|
});
|
|
otel.shutdown();
|
|
|
|
assert_eq!(
|
|
propagated_trace.tracestate.as_deref(),
|
|
Some("example=alpha:one;keep:yes;beta:two,other=value")
|
|
);
|
|
|
|
server.join().expect("server join");
|
|
let captured = rx.recv_timeout(Duration::from_secs(1)).expect("captured");
|
|
|
|
let request = captured
|
|
.iter()
|
|
.find(|req| req.path == "/v1/traces")
|
|
.unwrap_or_else(|| {
|
|
let paths = captured
|
|
.iter()
|
|
.map(|req| req.path.as_str())
|
|
.collect::<Vec<_>>()
|
|
.join(", ");
|
|
panic!(
|
|
"missing /v1/traces request; got {}: {paths}",
|
|
captured.len()
|
|
);
|
|
});
|
|
let content_type = request
|
|
.content_type
|
|
.as_deref()
|
|
.unwrap_or("<missing content-type>");
|
|
assert!(
|
|
content_type.starts_with("application/json"),
|
|
"unexpected content-type: {content_type}"
|
|
);
|
|
|
|
let body = String::from_utf8_lossy(&request.body);
|
|
assert!(
|
|
body.contains("trace-loopback"),
|
|
"expected span name not found; body prefix: {}",
|
|
&body.chars().take(2000).collect::<String>()
|
|
);
|
|
assert!(
|
|
body.contains("codex-cli"),
|
|
"expected service name not found; body prefix: {}",
|
|
&body.chars().take(2000).collect::<String>()
|
|
);
|
|
assert!(
|
|
body.contains("test.configured_attribute") && body.contains("configured-value"),
|
|
"expected configured span attribute not found; body prefix: {}",
|
|
&body.chars().take(2000).collect::<String>()
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn otlp_http_exporter_sends_traces_to_collector_in_tokio_runtime()
|
|
-> std::result::Result<(), Box<dyn std::error::Error>> {
|
|
let _trace_context_config_guard = TRACE_CONTEXT_CONFIG_LOCK
|
|
.lock()
|
|
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
|
let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
|
|
let addr = listener.local_addr().expect("local_addr");
|
|
listener.set_nonblocking(true).expect("set_nonblocking");
|
|
|
|
let (tx, rx) = mpsc::channel::<Vec<CapturedRequest>>();
|
|
let server = thread::spawn(move || {
|
|
let mut captured = Vec::new();
|
|
let deadline = Instant::now() + Duration::from_secs(3);
|
|
|
|
while Instant::now() < deadline {
|
|
match listener.accept() {
|
|
Ok((mut stream, _)) => {
|
|
let result = read_http_request(&mut stream);
|
|
let _ = write_http_response(&mut stream, "202 Accepted");
|
|
if let Ok((path, headers, body)) = result {
|
|
captured.push(CapturedRequest {
|
|
path,
|
|
content_type: headers.get("content-type").cloned(),
|
|
body,
|
|
});
|
|
}
|
|
}
|
|
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
|
|
thread::sleep(Duration::from_millis(10));
|
|
}
|
|
Err(_) => break,
|
|
}
|
|
}
|
|
|
|
let _ = tx.send(captured);
|
|
});
|
|
|
|
let otel = OtelProvider::from(&OtelSettings {
|
|
environment: "test".to_string(),
|
|
service_name: "codex-cli".to_string(),
|
|
service_version: env!("CARGO_PKG_VERSION").to_string(),
|
|
codex_home: PathBuf::from("."),
|
|
exporter: OtelExporter::None,
|
|
trace_exporter: OtelExporter::OtlpHttp {
|
|
endpoint: format!("http://{addr}/v1/traces"),
|
|
headers: HashMap::new(),
|
|
protocol: OtelHttpProtocol::Json,
|
|
tls: None,
|
|
},
|
|
metrics_exporter: OtelExporter::None,
|
|
runtime_metrics: false,
|
|
span_attributes: BTreeMap::new(),
|
|
tracestate: BTreeMap::new(),
|
|
})?
|
|
.expect("otel provider");
|
|
let tracing_layer = otel.tracing_layer().expect("tracing layer");
|
|
let subscriber = tracing_subscriber::registry().with(tracing_layer);
|
|
|
|
tracing::subscriber::with_default(subscriber, || {
|
|
let span = tracing::info_span!(
|
|
"trace-loopback-tokio",
|
|
otel.name = "trace-loopback-tokio",
|
|
otel.kind = "server",
|
|
rpc.system = "jsonrpc",
|
|
rpc.method = "trace-loopback-tokio",
|
|
);
|
|
let _guard = span.enter();
|
|
tracing::info!("trace loopback event from tokio runtime");
|
|
});
|
|
otel.shutdown();
|
|
|
|
server.join().expect("server join");
|
|
let captured = rx.recv_timeout(Duration::from_secs(1)).expect("captured");
|
|
|
|
let request = captured
|
|
.iter()
|
|
.find(|req| req.path == "/v1/traces")
|
|
.unwrap_or_else(|| {
|
|
let paths = captured
|
|
.iter()
|
|
.map(|req| req.path.as_str())
|
|
.collect::<Vec<_>>()
|
|
.join(", ");
|
|
panic!(
|
|
"missing /v1/traces request; got {}: {paths}",
|
|
captured.len()
|
|
);
|
|
});
|
|
let content_type = request
|
|
.content_type
|
|
.as_deref()
|
|
.unwrap_or("<missing content-type>");
|
|
assert!(
|
|
content_type.starts_with("application/json"),
|
|
"unexpected content-type: {content_type}"
|
|
);
|
|
|
|
let body = String::from_utf8_lossy(&request.body);
|
|
assert!(
|
|
body.contains("trace-loopback-tokio"),
|
|
"expected span name not found; body prefix: {}",
|
|
&body.chars().take(2000).collect::<String>()
|
|
);
|
|
assert!(
|
|
body.contains("codex-cli"),
|
|
"expected service name not found; body prefix: {}",
|
|
&body.chars().take(2000).collect::<String>()
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[test]
|
|
fn otlp_http_exporter_sends_traces_to_collector_in_current_thread_tokio_runtime()
|
|
-> std::result::Result<(), Box<dyn std::error::Error>> {
|
|
let _trace_context_config_guard = TRACE_CONTEXT_CONFIG_LOCK
|
|
.lock()
|
|
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
|
let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
|
|
let addr = listener.local_addr().expect("local_addr");
|
|
listener.set_nonblocking(true).expect("set_nonblocking");
|
|
|
|
let (tx, rx) = mpsc::channel::<Vec<CapturedRequest>>();
|
|
let server = thread::spawn(move || {
|
|
let mut captured = Vec::new();
|
|
let deadline = Instant::now() + Duration::from_secs(3);
|
|
|
|
while Instant::now() < deadline {
|
|
match listener.accept() {
|
|
Ok((mut stream, _)) => {
|
|
let result = read_http_request(&mut stream);
|
|
let _ = write_http_response(&mut stream, "202 Accepted");
|
|
if let Ok((path, headers, body)) = result {
|
|
captured.push(CapturedRequest {
|
|
path,
|
|
content_type: headers.get("content-type").cloned(),
|
|
body,
|
|
});
|
|
}
|
|
}
|
|
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
|
|
thread::sleep(Duration::from_millis(10));
|
|
}
|
|
Err(_) => break,
|
|
}
|
|
}
|
|
|
|
let _ = tx.send(captured);
|
|
});
|
|
|
|
let (runtime_result_tx, runtime_result_rx) = mpsc::channel::<std::result::Result<(), String>>();
|
|
let runtime_thread = thread::spawn(move || {
|
|
let runtime = tokio::runtime::Builder::new_current_thread()
|
|
.enable_all()
|
|
.build()
|
|
.expect("current-thread runtime");
|
|
|
|
let result = runtime.block_on(async move {
|
|
let otel = OtelProvider::from(&OtelSettings {
|
|
environment: "test".to_string(),
|
|
service_name: "codex-cli".to_string(),
|
|
service_version: env!("CARGO_PKG_VERSION").to_string(),
|
|
codex_home: PathBuf::from("."),
|
|
exporter: OtelExporter::None,
|
|
trace_exporter: OtelExporter::OtlpHttp {
|
|
endpoint: format!("http://{addr}/v1/traces"),
|
|
headers: HashMap::new(),
|
|
protocol: OtelHttpProtocol::Json,
|
|
tls: None,
|
|
},
|
|
metrics_exporter: OtelExporter::None,
|
|
runtime_metrics: false,
|
|
span_attributes: BTreeMap::new(),
|
|
tracestate: BTreeMap::new(),
|
|
})
|
|
.map_err(|err| err.to_string())?
|
|
.expect("otel provider");
|
|
let tracing_layer = otel.tracing_layer().expect("tracing layer");
|
|
let subscriber = tracing_subscriber::registry().with(tracing_layer);
|
|
|
|
tracing::subscriber::with_default(subscriber, || {
|
|
let span = tracing::info_span!(
|
|
"trace-loopback-current-thread",
|
|
otel.name = "trace-loopback-current-thread",
|
|
otel.kind = "server",
|
|
rpc.system = "jsonrpc",
|
|
rpc.method = "trace-loopback-current-thread",
|
|
);
|
|
let _guard = span.enter();
|
|
tracing::info!("trace loopback event from current-thread tokio runtime");
|
|
});
|
|
otel.shutdown();
|
|
Ok::<(), String>(())
|
|
});
|
|
let _ = runtime_result_tx.send(result);
|
|
});
|
|
|
|
runtime_result_rx
|
|
.recv_timeout(Duration::from_secs(5))
|
|
.expect("current-thread runtime should complete")
|
|
.map_err(std::io::Error::other)?;
|
|
runtime_thread.join().expect("runtime thread");
|
|
|
|
server.join().expect("server join");
|
|
let captured = rx.recv_timeout(Duration::from_secs(1)).expect("captured");
|
|
|
|
let request = captured
|
|
.iter()
|
|
.find(|req| req.path == "/v1/traces")
|
|
.unwrap_or_else(|| {
|
|
let paths = captured
|
|
.iter()
|
|
.map(|req| req.path.as_str())
|
|
.collect::<Vec<_>>()
|
|
.join(", ");
|
|
panic!(
|
|
"missing /v1/traces request; got {}: {paths}",
|
|
captured.len()
|
|
);
|
|
});
|
|
let content_type = request
|
|
.content_type
|
|
.as_deref()
|
|
.unwrap_or("<missing content-type>");
|
|
assert!(
|
|
content_type.starts_with("application/json"),
|
|
"unexpected content-type: {content_type}"
|
|
);
|
|
|
|
let body = String::from_utf8_lossy(&request.body);
|
|
assert!(
|
|
body.contains("trace-loopback-current-thread"),
|
|
"expected span name not found; body prefix: {}",
|
|
&body.chars().take(2000).collect::<String>()
|
|
);
|
|
assert!(
|
|
body.contains("codex-cli"),
|
|
"expected service name not found; body prefix: {}",
|
|
&body.chars().take(2000).collect::<String>()
|
|
);
|
|
|
|
Ok(())
|
|
}
|