Compare commits

...

3 Commits

Author SHA1 Message Date
Zach Murphy
80649cfb9d Allow CUA MCP subspan telemetry 2026-05-08 09:28:25 -07:00
charley-openai
6872504155 Gate MCP subspan tracing on stderr support 2026-05-04 09:33:19 -07:00
charley-openai
c38a29ddb9 Ingest node_repl stderr telemetry spans 2026-05-04 09:33:18 -07:00
14 changed files with 1562 additions and 18 deletions

View File

@@ -564,6 +564,19 @@ impl McpConnectionManager {
.server_supports_sandbox_state_meta_capability)
}
pub async fn server_supports_subspan_tracing_for_tool(
&self,
server: &str,
tool: &str,
) -> Result<bool> {
Ok(self
.client_by_name(server)
.await?
.subspan_tracing_capability
.as_ref()
.is_some_and(|capability| capability.supports_tool(tool)))
}
/// List resources from the specified server.
pub async fn list_resources(
&self,

View File

@@ -1,5 +1,8 @@
pub use connection_manager::McpConnectionManager;
pub use rmcp_client::MCP_SANDBOX_STATE_META_CAPABILITY;
pub use rmcp_client::MCP_SUBSPAN_TRACING_CAPABILITY;
pub use rmcp_client::MCP_SUBSPAN_TRACING_TRANSPORT_STDERR_JSONL;
pub use rmcp_client::MCP_SUBSPAN_TRACING_VERSION;
pub use runtime::McpRuntimeEnvironment;
pub use runtime::SandboxState;
pub use tools::ToolInfo;

View File

@@ -7,7 +7,9 @@
//! [`crate::connection_manager`].
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::env;
use std::ffi::OsString;
use std::sync::Arc;
@@ -50,6 +52,8 @@ use codex_rmcp_client::ExecutorStdioServerLauncher;
use codex_rmcp_client::LocalStdioServerLauncher;
use codex_rmcp_client::RmcpClient;
use codex_rmcp_client::StdioServerLauncher;
use codex_rmcp_client::StdioServerTelemetry;
use codex_rmcp_client::StdioServerTelemetrySink;
use futures::future::BoxFuture;
use futures::future::FutureExt;
use futures::future::Shared;
@@ -57,6 +61,7 @@ use rmcp::model::ClientCapabilities;
use rmcp::model::ElicitationCapability;
use rmcp::model::Implementation;
use rmcp::model::InitializeRequestParams;
use rmcp::model::JsonObject;
use rmcp::model::ProtocolVersion;
use rmcp::model::Tool as RmcpTool;
use tokio_util::sync::CancellationToken;
@@ -65,6 +70,12 @@ use tracing::warn;
/// MCP server capability indicating that Codex should include [`SandboxState`]
/// in tool-call request `_meta` under this key.
pub const MCP_SANDBOX_STATE_META_CAPABILITY: &str = "codex/sandbox-state-meta";
/// MCP server capability indicating that Codex should include W3C trace context
/// in tool-call request `_meta` so the server can emit reconstructed child spans.
/// See `codex-rs/docs/mcp_subspan_tracing.md` for the protocol contract.
pub const MCP_SUBSPAN_TRACING_CAPABILITY: &str = "codex/subspan-tracing";
pub const MCP_SUBSPAN_TRACING_TRANSPORT_STDERR_JSONL: &str = "stderr-jsonl";
pub const MCP_SUBSPAN_TRACING_VERSION: u64 = 1;
pub(crate) const MCP_TOOLS_LIST_DURATION_METRIC: &str = "codex.mcp.tools.list.duration_ms";
pub(crate) const MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC: &str =
@@ -88,9 +99,52 @@ pub(crate) struct ManagedClient {
pub(crate) tool_timeout: Option<Duration>,
pub(crate) server_instructions: Option<String>,
pub(crate) server_supports_sandbox_state_meta_capability: bool,
pub(crate) subspan_tracing_capability: Option<McpSubspanTracingCapability>,
pub(crate) codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct McpSubspanTracingCapability {
tools: Option<HashSet<String>>,
}
impl McpSubspanTracingCapability {
fn from_experimental_capability(capability: &JsonObject) -> Option<Self> {
if capability
.get("version")
.and_then(serde_json::Value::as_u64)
!= Some(MCP_SUBSPAN_TRACING_VERSION)
{
return None;
}
let supports_stderr_jsonl = capability
.get("transports")
.and_then(serde_json::Value::as_array)
.is_some_and(|transports| {
transports.iter().any(|transport| {
transport.as_str() == Some(MCP_SUBSPAN_TRACING_TRANSPORT_STDERR_JSONL)
})
});
if !supports_stderr_jsonl {
return None;
}
let tools = capability
.get("tools")
.and_then(serde_json::Value::as_object)
.map(|tools| tools.keys().cloned().collect());
Some(Self { tools })
}
pub(crate) fn supports_tool(&self, tool_name: &str) -> bool {
self.tools
.as_ref()
.is_none_or(|tools| tools.contains(tool_name))
}
}
impl ManagedClient {
fn listed_tools(&self) -> Vec<ToolInfo> {
let total_start = Instant::now();
@@ -151,7 +205,11 @@ impl AsyncManagedClient {
.map(|tools| filter_tools(tools, &tool_filter));
let startup_tool_filter = tool_filter;
let startup_complete = Arc::new(AtomicBool::new(false));
let subspan_tracing_enabled = Arc::new(AtomicBool::new(false));
let subspan_tracing_stderr_jsonl_available =
subspan_tracing_stderr_jsonl_available(&config, &runtime_environment);
let startup_complete_for_fut = Arc::clone(&startup_complete);
let subspan_tracing_enabled_for_fut = Arc::clone(&subspan_tracing_enabled);
let cancel_token_for_fut = cancel_token.clone();
let fut = async move {
let outcome = match async {
@@ -166,6 +224,7 @@ impl AsyncManagedClient {
store_mode,
runtime_environment,
runtime_auth_provider,
Arc::clone(&subspan_tracing_enabled_for_fut),
)
.await?,
);
@@ -181,6 +240,8 @@ impl AsyncManagedClient {
tx_event,
elicitation_requests,
codex_apps_tools_cache_context,
subspan_tracing_enabled: subspan_tracing_enabled_for_fut,
subspan_tracing_stderr_jsonl_available,
},
)
.await
@@ -456,12 +517,29 @@ async fn start_server_task(
tx_event,
elicitation_requests,
codex_apps_tools_cache_context,
subspan_tracing_enabled,
subspan_tracing_stderr_jsonl_available,
} = params;
let elicitation = elicitation_capability_for_server(&server_name);
let params = InitializeRequestParams {
let client_experimental_capabilities = subspan_tracing_stderr_jsonl_available.then(|| {
let mut subspan_tracing_capability = JsonObject::new();
subspan_tracing_capability.insert(
"version".to_string(),
serde_json::json!(MCP_SUBSPAN_TRACING_VERSION),
);
subspan_tracing_capability.insert(
"transports".to_string(),
serde_json::json!([MCP_SUBSPAN_TRACING_TRANSPORT_STDERR_JSONL]),
);
BTreeMap::from([(
MCP_SUBSPAN_TRACING_CAPABILITY.to_string(),
subspan_tracing_capability,
)])
});
let initialize_params = InitializeRequestParams {
meta: None,
capabilities: ClientCapabilities {
experimental: None,
experimental: client_experimental_capabilities,
extensions: None,
roots: None,
sampling: None,
@@ -482,7 +560,7 @@ async fn start_server_task(
let send_elicitation = elicitation_requests.make_sender(server_name.clone(), tx_event);
let initialize_result = client
.initialize(params, startup_timeout, send_elicitation)
.initialize(initialize_params, startup_timeout, send_elicitation)
.await
.map_err(StartupOutcomeError::from)?;
@@ -492,6 +570,13 @@ async fn start_server_task(
.as_ref()
.and_then(|exp| exp.get(MCP_SANDBOX_STATE_META_CAPABILITY))
.is_some();
let subspan_tracing_capability = initialize_result
.capabilities
.experimental
.as_ref()
.and_then(|exp| exp.get(MCP_SUBSPAN_TRACING_CAPABILITY))
.and_then(McpSubspanTracingCapability::from_experimental_capability);
subspan_tracing_enabled.store(subspan_tracing_capability.is_some(), Ordering::Release);
let list_start = Instant::now();
let fetch_start = Instant::now();
let tools = list_tools_for_client_uncached(
@@ -528,6 +613,7 @@ async fn start_server_task(
tool_filter,
server_instructions: initialize_result.instructions,
server_supports_sandbox_state_meta_capability,
subspan_tracing_capability,
codex_apps_tools_cache_context,
};
@@ -541,6 +627,23 @@ struct StartServerTaskParams {
tx_event: Sender<Event>,
elicitation_requests: ElicitationRequestManager,
codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
subspan_tracing_enabled: Arc<AtomicBool>,
subspan_tracing_stderr_jsonl_available: bool,
}
fn subspan_tracing_stderr_jsonl_available(
config: &McpServerConfig,
runtime_environment: &McpRuntimeEnvironment,
) -> bool {
if !matches!(config.transport, McpServerTransportConfig::Stdio { .. }) {
return false;
}
match config.experimental_environment.as_deref() {
None | Some("local") => true,
Some("remote") => runtime_environment.environment().is_remote(),
Some(_) => false,
}
}
async fn make_rmcp_client(
@@ -549,6 +652,7 @@ async fn make_rmcp_client(
store_mode: OAuthCredentialsStoreMode,
runtime_environment: McpRuntimeEnvironment,
runtime_auth_provider: Option<SharedAuthProvider>,
subspan_tracing_enabled: Arc<AtomicBool>,
) -> Result<RmcpClient, StartupOutcomeError> {
let McpServerConfig {
transport,
@@ -601,9 +705,20 @@ async fn make_rmcp_client(
// `RmcpClient` always sees a launched MCP stdio server. The
// launcher hides whether that means a local child process or an
// executor process whose stdin/stdout bytes cross the process API.
RmcpClient::new_stdio_client(command_os, args_os, env_os, &env_vars, cwd, launcher)
.await
.map_err(|err| StartupOutcomeError::from(anyhow!(err)))
RmcpClient::new_stdio_client(
command_os,
args_os,
env_os,
&env_vars,
cwd,
launcher,
Some(subspan_tracing_telemetry_sink(
server_name,
subspan_tracing_enabled,
)),
)
.await
.map_err(|err| StartupOutcomeError::from(anyhow!(err)))
}
McpServerTransportConfig::StreamableHttp {
url,
@@ -637,11 +752,40 @@ async fn make_rmcp_client(
}
}
fn subspan_tracing_telemetry_sink(
server_name: &str,
enabled: Arc<AtomicBool>,
) -> StdioServerTelemetrySink {
let server_name = server_name.to_string();
Arc::new(move |telemetry: StdioServerTelemetry| {
if !enabled.load(Ordering::Acquire) {
return;
}
if let Err(error) = codex_otel::emit_mcp_subspan_telemetry(telemetry.payload) {
match error {
codex_otel::StderrSpanTelemetryError::UnsupportedVersion
| codex_otel::StderrSpanTelemetryError::UnsupportedType => {
tracing::debug!(
"ignoring unsupported MCP subspan telemetry from {server_name}: {error}"
);
}
_ => {
warn!("ignoring invalid MCP subspan telemetry from {server_name}: {error}");
}
}
}
})
}
#[cfg(test)]
mod tests {
use super::*;
use codex_exec_server::Environment;
use rmcp::model::JsonObject;
use rmcp::model::Meta;
use std::path::PathBuf;
use std::sync::Arc;
fn tool_with_connector_meta() -> RmcpTool {
RmcpTool {
@@ -738,4 +882,97 @@ mod tests {
assert!(meta.0.contains_key(key), "{key} should be preserved");
}
}
fn local_runtime_environment() -> McpRuntimeEnvironment {
McpRuntimeEnvironment::new(
Arc::new(Environment::default_for_tests()),
PathBuf::from("/tmp"),
)
}
fn remote_runtime_environment() -> McpRuntimeEnvironment {
McpRuntimeEnvironment::new(
Arc::new(
Environment::create_for_tests(Some("ws://executor.example".to_string()))
.expect("remote environment"),
),
PathBuf::from("/tmp"),
)
}
fn mcp_server_config(value: serde_json::Value) -> McpServerConfig {
serde_json::from_value(value).expect("mcp server config")
}
#[tokio::test]
async fn subspan_tracing_stderr_jsonl_is_available_only_when_stderr_is_observed() {
assert!(subspan_tracing_stderr_jsonl_available(
&mcp_server_config(serde_json::json!({"command": "node"})),
&local_runtime_environment(),
));
assert!(!subspan_tracing_stderr_jsonl_available(
&mcp_server_config(serde_json::json!({"url": "https://mcp.example"})),
&local_runtime_environment(),
));
assert!(!subspan_tracing_stderr_jsonl_available(
&mcp_server_config(serde_json::json!({
"command": "node",
"experimental_environment": "remote",
})),
&local_runtime_environment(),
));
assert!(subspan_tracing_stderr_jsonl_available(
&mcp_server_config(serde_json::json!({
"command": "node",
"experimental_environment": "remote",
})),
&remote_runtime_environment(),
));
}
#[test]
fn subspan_tracing_capability_accepts_stderr_jsonl_transport_and_tool_filter() {
let capability: JsonObject = serde_json::from_value(serde_json::json!({
"version": 1,
"transports": ["stderr-jsonl"],
"tools": {
"js": {
"attributeProfile": "browser-use-v1"
}
}
}))
.expect("capability object");
let capability = McpSubspanTracingCapability::from_experimental_capability(&capability)
.expect("valid capability");
assert!(capability.supports_tool("js"));
assert!(!capability.supports_tool("other"));
}
#[test]
fn subspan_tracing_capability_rejects_unknown_version_or_transport() {
for capability in [
serde_json::json!({
"version": 2,
"transports": ["stderr-jsonl"],
}),
serde_json::json!({
"version": 1,
"transports": ["events"],
}),
serde_json::json!({
"version": 1,
}),
] {
let capability: JsonObject =
serde_json::from_value(capability).expect("capability object");
assert!(
McpSubspanTracingCapability::from_experimental_capability(&capability).is_none()
);
}
}
}

View File

@@ -40,6 +40,8 @@ use codex_config::types::AppToolApproval;
use codex_features::Feature;
use codex_hooks::PermissionRequestDecision;
use codex_mcp::CODEX_APPS_MCP_SERVER_NAME;
use codex_mcp::MCP_SUBSPAN_TRACING_CAPABILITY;
use codex_mcp::MCP_SUBSPAN_TRACING_VERSION;
use codex_mcp::McpPermissionPromptAutoApproveContext;
use codex_mcp::SandboxState;
use codex_mcp::declared_openai_file_input_param_names;
@@ -545,6 +547,9 @@ async fn execute_mcp_tool_call(
rewritten_arguments: Option<JsonValue>,
request_meta: Option<JsonValue>,
) -> Result<CallToolResult, String> {
let request_meta =
augment_mcp_tool_request_meta_with_subspan_tracing(sess, server, tool_name, request_meta)
.await;
let request_meta =
with_mcp_tool_call_thread_id_meta(request_meta, &sess.conversation_id.to_string());
let request_meta =
@@ -837,6 +842,7 @@ const MCP_TOOL_CODEX_APPS_META_KEY: &str = "_codex_apps";
const MCP_TOOL_OPENAI_OUTPUT_TEMPLATE_META_KEY: &str = "openai/outputTemplate";
const MCP_TOOL_UI_RESOURCE_URI_META_KEY: &str = "ui/resourceUri";
const MCP_TOOL_THREAD_ID_META_KEY: &str = "threadId";
const MCP_TOOL_SUBSPAN_TRACING_META_KEY: &str = MCP_SUBSPAN_TRACING_CAPABILITY;
async fn custom_mcp_tool_approval_mode(
sess: &Session,
@@ -919,6 +925,79 @@ fn build_mcp_tool_call_request_meta(
(!request_meta.is_empty()).then_some(serde_json::Value::Object(request_meta))
}
#[expect(
clippy::await_holding_invalid_type,
reason = "MCP subspan metadata reads through the session-owned manager guard"
)]
async fn augment_mcp_tool_request_meta_with_subspan_tracing(
sess: &Session,
server: &str,
tool_name: &str,
meta: Option<serde_json::Value>,
) -> Option<serde_json::Value> {
let supports_subspan_tracing = sess
.services
.mcp_connection_manager
.read()
.await
.server_supports_subspan_tracing_for_tool(server, tool_name)
.await
.unwrap_or(false);
augment_mcp_tool_request_meta_with_subspan_tracing_if_supported(supports_subspan_tracing, meta)
}
fn augment_mcp_tool_request_meta_with_subspan_tracing_if_supported(
supports_subspan_tracing: bool,
meta: Option<serde_json::Value>,
) -> Option<serde_json::Value> {
if !supports_subspan_tracing {
return meta;
}
let Some(trace_context) = codex_otel::current_span_w3c_trace_context() else {
return meta;
};
let Some(traceparent) = trace_context.traceparent else {
return meta;
};
let mut telemetry_meta = serde_json::Map::new();
telemetry_meta.insert("enabled".to_string(), serde_json::Value::Bool(true));
telemetry_meta.insert(
"version".to_string(),
serde_json::Value::Number(MCP_SUBSPAN_TRACING_VERSION.into()),
);
telemetry_meta.insert(
"traceparent".to_string(),
serde_json::Value::String(traceparent),
);
if let Some(tracestate) = trace_context.tracestate {
telemetry_meta.insert(
"tracestate".to_string(),
serde_json::Value::String(tracestate),
);
}
match meta {
Some(serde_json::Value::Object(mut map)) => {
map.insert(
MCP_TOOL_SUBSPAN_TRACING_META_KEY.to_string(),
serde_json::Value::Object(telemetry_meta),
);
Some(serde_json::Value::Object(map))
}
None => {
let mut map = serde_json::Map::new();
map.insert(
MCP_TOOL_SUBSPAN_TRACING_META_KEY.to_string(),
serde_json::Value::Object(telemetry_meta),
);
Some(serde_json::Value::Object(map))
}
other => other,
}
}
fn with_mcp_tool_call_thread_id_meta(
meta: Option<serde_json::Value>,
thread_id: &str,

View File

@@ -1034,6 +1034,83 @@ async fn codex_apps_tool_call_request_meta_includes_call_id_without_existing_cod
);
}
#[test]
fn subspan_tracing_request_meta_includes_trace_context_when_supported() {
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_sdk::trace::SdkTracerProvider;
use tracing_subscriber::prelude::*;
let provider = SdkTracerProvider::builder().build();
let tracer = provider.tracer("codex-core-mcp-tool-call-tests");
let subscriber =
tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer));
let _guard = tracing::subscriber::set_default(subscriber);
let span = tracing::info_span!("mcp.tools.call");
let _entered = span.enter();
let meta = augment_mcp_tool_request_meta_with_subspan_tracing_if_supported(
/*supports_subspan_tracing*/ true,
Some(serde_json::json!({
"threadId": "thread-live",
})),
)
.expect("meta");
let telemetry = meta
.get(MCP_TOOL_SUBSPAN_TRACING_META_KEY)
.expect("subspan tracing metadata");
assert_eq!(
telemetry.get("enabled"),
Some(&serde_json::Value::Bool(true))
);
assert_eq!(
telemetry.get("version"),
Some(&serde_json::json!(MCP_SUBSPAN_TRACING_VERSION))
);
assert!(
telemetry
.get("traceparent")
.and_then(serde_json::Value::as_str)
.is_some_and(|traceparent| traceparent.starts_with("00-"))
);
assert_eq!(
meta.get("threadId"),
Some(&serde_json::json!("thread-live"))
);
}
#[test]
fn subspan_tracing_request_meta_is_only_added_when_supported_and_trace_context_exists() {
assert_eq!(
augment_mcp_tool_request_meta_with_subspan_tracing_if_supported(
/*supports_subspan_tracing*/ true,
Some(serde_json::json!({"threadId": "thread-live"})),
),
Some(serde_json::json!({"threadId": "thread-live"}))
);
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_sdk::trace::SdkTracerProvider;
use tracing_subscriber::prelude::*;
let provider = SdkTracerProvider::builder().build();
let tracer = provider.tracer("codex-core-mcp-tool-call-tests");
let subscriber =
tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer));
let _guard = tracing::subscriber::set_default(subscriber);
let span = tracing::info_span!("mcp.tools.call");
let _entered = span.enter();
assert_eq!(
augment_mcp_tool_request_meta_with_subspan_tracing_if_supported(
/*supports_subspan_tracing*/ false,
Some(serde_json::json!({"threadId": "thread-live"})),
),
Some(serde_json::json!({"threadId": "thread-live"}))
);
}
#[test]
fn mcp_tool_call_thread_id_meta_is_added_to_request_meta() {
assert_eq!(

View File

@@ -0,0 +1,176 @@
# MCP Subspan Tracing [experimental]
This document describes Codex's experimental MCP client extension for ingesting child OpenTelemetry spans emitted by MCP servers.
- Status: experimental and subject to change without notice
- Capability: `codex/subspan-tracing`
- Supported version: `1`
- Supported transport: `stderr-jsonl`
## Purpose
Codex creates an `mcp.tools.call` span around each MCP tool call. Some MCP servers perform meaningful nested work that is useful to inspect as child spans of that call. The `codex/subspan-tracing` capability lets a server opt in to receiving the active W3C trace context for a tool call and emitting sanitized span records back to Codex.
This is intended for stdio MCP servers whose stderr stream is observed by Codex. Telemetry records are out-of-band with respect to the MCP JSON-RPC stream and must not affect tool-call success or transport liveness.
## Capability Negotiation
Codex advertises client support during MCP `initialize` only when the active MCP transport can observe stderr telemetry:
```json
{
"capabilities": {
"experimental": {
"codex/subspan-tracing": {
"version": 1,
"transports": ["stderr-jsonl"]
}
}
}
}
```
An MCP server opts in by returning a compatible experimental capability in its initialize result:
```json
{
"capabilities": {
"experimental": {
"codex/subspan-tracing": {
"version": 1,
"transports": ["stderr-jsonl"],
"tools": {
"js": {
"attributeProfile": "browser-use-v1"
},
"get_state": {
"attributeProfile": "cua-v1"
}
}
}
}
}
}
```
If `tools` is present, Codex enables subspan tracing only for those tool names. If `tools` is omitted, Codex treats the capability as applying to every tool exposed by that server.
Unknown versions or transports are ignored.
## Tool Call Metadata
For a negotiated server/tool pair, Codex adds `_meta["codex/subspan-tracing"]` while it is inside the active `mcp.tools.call` span:
```json
{
"_meta": {
"codex/subspan-tracing": {
"enabled": true,
"version": 1,
"traceparent": "00-00000000000000000000000000000001-0000000000000002-01",
"tracestate": "vendor=value"
}
}
}
```
`tracestate` is omitted when no tracestate is active. If tracing is not active, Codex does not add this metadata.
Servers should disable subspan emission for a tool call unless:
- `enabled` is `true`
- `version` is `1`
- `traceparent` is present and parseable
## Stderr Transport
For `stderr-jsonl`, each telemetry record is written to the MCP server process stderr as one line with this exact prefix:
```text
@codex-telemetry
```
The rest of the line is a single JSON object:
```text
@codex-telemetry {"v":1,"type":"span","name":"browser_use.tab.goto",...}
```
Normal stderr output must not use that prefix. Codex preserves ordinary stderr logging behavior for non-telemetry lines.
## Span Record Schema
Span records use schema version `v: 1`:
```json
{
"v": 1,
"type": "span",
"name": "browser_use.tab.goto",
"trace_id": "00000000000000000000000000000001",
"span_id": "0000000000000010",
"parent_span_id": "0000000000000002",
"trace_flags": "01",
"tracestate": "vendor=value",
"start_unix_nanos": 1000000000,
"end_unix_nanos": 2000000000,
"attrs": {
"browser_use.url": "https://example.com",
"browser_use.timeout_ms": 2500
}
}
```
Required fields:
- `v`: must be `1`
- `type`: must be `"span"`
- `name`: allowlisted span name
- `trace_id`: 32 lowercase or uppercase hex characters, nonzero
- `span_id`: 16 lowercase or uppercase hex characters, nonzero
- `parent_span_id`: 16 lowercase or uppercase hex characters, nonzero
- `trace_flags`: 2 hex characters
- `start_unix_nanos`: Unix timestamp in nanoseconds
- `end_unix_nanos`: Unix timestamp in nanoseconds, greater than or equal to `start_unix_nanos`
Optional fields:
- `tracestate`: W3C tracestate header value
- `attrs`: span attributes object
Codex also accepts `traceparent` for backward compatibility, but explicit IDs are the canonical protocol for reconstructed spans because they preserve parent-child relationships across records.
## Span IDs and Hierarchy
The first server-created span for a tool call should use:
- `trace_id` from the request `traceparent`
- `parent_span_id` from the request `traceparent`
- a newly generated `span_id`
Nested server spans should use the same `trace_id`, their own generated `span_id`, and the parent reconstructed span's `span_id` as `parent_span_id`.
## Sanitization
Servers must emit only sanitized, allowlisted attributes. Attribute values must be primitives:
- string
- integer or float
- boolean
Do not emit objects, arrays, cookies, credentials, auth headers, bearer tokens, full request/response payloads, arbitrary DOM text, or user-sensitive selectors. Codex applies its own allowlist and drops unsupported attributes, but servers should sanitize before writing records.
## Failure Behavior
Subspan telemetry is best effort:
- malformed telemetry lines are ignored
- unsupported versions or record types are ignored
- invalid span records are ignored or logged at warning level
- telemetry write failures must not fail the MCP tool call
- telemetry parser failures must not break MCP transport
- no telemetry is emitted or reconstructed when Codex tracing is inactive
## Current Attribute Profile
`browser-use-v1` is used by Browser Use instrumentation. `cua-v1` is used by Computer Use instrumentation for state capture, accessibility capture, action execution, and result-building spans. Codex currently allows the Browser Use, Node REPL, JS, and CUA span names and attribute keys needed for those profiles. New profiles should be added deliberately with their own allowlist changes and tests.

View File

@@ -2,6 +2,7 @@ pub(crate) mod config;
mod events;
pub(crate) mod metrics;
pub(crate) mod provider;
mod stderr_span_telemetry;
pub(crate) mod trace_context;
mod otlp;
@@ -24,6 +25,8 @@ pub use crate::metrics::runtime_metrics::RuntimeMetricsSummary;
pub use crate::metrics::timer::Timer;
pub use crate::metrics::*;
pub use crate::provider::OtelProvider;
pub use crate::stderr_span_telemetry::StderrSpanTelemetryError;
pub use crate::stderr_span_telemetry::emit_mcp_subspan_telemetry;
pub use crate::trace_context::context_from_w3c_trace_context;
pub use crate::trace_context::current_span_trace_id;
pub use crate::trace_context::current_span_w3c_trace_context;

View File

@@ -0,0 +1,864 @@
use std::time::Duration;
use std::time::SystemTime;
use opentelemetry::Context;
use opentelemetry::KeyValue;
use opentelemetry::global;
use opentelemetry::trace::Span as _;
use opentelemetry::trace::SpanContext;
use opentelemetry::trace::SpanId;
use opentelemetry::trace::SpanKind;
use opentelemetry::trace::TraceContextExt;
use opentelemetry::trace::TraceFlags;
use opentelemetry::trace::TraceId;
use opentelemetry::trace::TraceState;
use opentelemetry::trace::Tracer;
use serde_json::Map;
use serde_json::Value;
const MCP_SUBSPAN_TELEMETRY_TRACER_NAME: &str = "codex-mcp-subspan-stderr";
const CURRENT_SCHEMA_VERSION: u64 = 1;
const SPAN_RECORD_TYPE: &str = "span";
const MAX_ATTRIBUTE_STRING_BYTES: usize = 1024;
const ALLOWED_SPAN_NAMES: &[&str] = &[
"node_repl.js",
"browser_use.playwright.dom_snapshot",
"browser_use.tab.goto",
"browser_use.tab.click",
"browser_use.tab.type",
"browser_use.tab.screenshot",
"browser_use.cdp.execute",
"browser_use.tab.wait_for_load_state",
"cua.mcp.list_apps",
"cua.mcp.start_using_app",
"cua.mcp.activate_app",
"cua.mcp.deactivate_app",
"cua.mcp.get_state",
"cua.mcp.click",
"cua.mcp.perform_secondary_action",
"cua.mcp.set_value",
"cua.mcp.scroll",
"cua.mcp.drag",
"cua.mcp.press_key",
"cua.mcp.type_text",
"cua.action.execute_batch",
"cua.state.capture_frame",
"cua.state.capture_accessibility",
"cua.state.build_result",
];
const ALLOWED_ATTRIBUTE_PREFIXES: &[&str] = &["browser_use.", "node_repl.", "js."];
const ALLOWED_ATTRIBUTE_KEYS: &[&str] = &[
"error.type",
"error.message",
"cua.tool.name",
"cua.success",
"cua.action.count",
"cua.action.kind",
"cua.state.has_window",
"cua.screenshot.base64_bytes",
"cua.display.width",
"cua.display.height",
"cua.accessibility.text_bytes",
];
#[derive(Debug, thiserror::Error)]
pub enum StderrSpanTelemetryError {
#[error("telemetry payload must be a JSON object")]
NotObject,
#[error("unsupported telemetry schema version")]
UnsupportedVersion,
#[error("unsupported telemetry record type")]
UnsupportedType,
#[error("missing or invalid telemetry field `{0}`")]
InvalidField(&'static str),
#[error("unsupported telemetry span name")]
UnsupportedSpanName,
}
#[derive(Debug, Clone, PartialEq)]
struct SpanTelemetryRecord {
name: String,
span_id: Option<SpanId>,
trace_id: Option<TraceId>,
parent_span_id: Option<SpanId>,
trace_flags: Option<TraceFlags>,
traceparent: Option<String>,
tracestate: Option<String>,
start_time: SystemTime,
end_time: SystemTime,
attributes: Vec<KeyValue>,
}
pub fn emit_mcp_subspan_telemetry(payload: Value) -> Result<(), StderrSpanTelemetryError> {
let record = parse_span_telemetry_record(payload)?;
let tracer = global::tracer(MCP_SUBSPAN_TELEMETRY_TRACER_NAME);
emit_span_telemetry_record_with_tracer(&tracer, &record)
}
fn emit_span_telemetry_record_with_tracer<T>(
tracer: &T,
record: &SpanTelemetryRecord,
) -> Result<(), StderrSpanTelemetryError>
where
T: Tracer,
{
let parent_context = record.parent_context()?;
let mut builder = tracer
.span_builder(record.name.clone())
.with_kind(SpanKind::Internal)
.with_start_time(record.start_time)
.with_attributes(record.attributes.clone());
if let Some(span_id) = record.span_id {
builder = builder.with_span_id(span_id);
}
let mut span = tracer.build_with_context(builder, &parent_context);
span.end_with_timestamp(record.end_time);
Ok(())
}
impl SpanTelemetryRecord {
fn parent_context(&self) -> Result<Context, StderrSpanTelemetryError> {
if let (Some(trace_id), Some(parent_span_id), Some(trace_flags)) =
(self.trace_id, self.parent_span_id, self.trace_flags)
{
let trace_state = trace_state_from_header(self.tracestate.as_deref())?;
let span_context = SpanContext::new(
trace_id,
parent_span_id,
trace_flags,
/*is_remote*/ true,
trace_state,
);
return Ok(Context::new().with_remote_span_context(span_context));
}
let Some(traceparent) = self.traceparent.as_deref() else {
return Err(StderrSpanTelemetryError::InvalidField("traceparent"));
};
crate::trace_context::context_from_trace_headers(
Some(traceparent),
self.tracestate.as_deref(),
)
.ok_or(StderrSpanTelemetryError::InvalidField("traceparent"))
}
}
fn parse_span_telemetry_record(
payload: Value,
) -> Result<SpanTelemetryRecord, StderrSpanTelemetryError> {
let object = payload
.as_object()
.ok_or(StderrSpanTelemetryError::NotObject)?;
match object.get("v").and_then(Value::as_u64) {
Some(CURRENT_SCHEMA_VERSION) => {}
Some(_) => return Err(StderrSpanTelemetryError::UnsupportedVersion),
None => return Err(StderrSpanTelemetryError::InvalidField("v")),
}
match object.get("type").and_then(Value::as_str) {
Some(SPAN_RECORD_TYPE) => {}
Some(_) => return Err(StderrSpanTelemetryError::UnsupportedType),
None => return Err(StderrSpanTelemetryError::InvalidField("type")),
}
let name = required_string(object, "name")?.to_string();
if !ALLOWED_SPAN_NAMES.contains(&name.as_str()) {
return Err(StderrSpanTelemetryError::UnsupportedSpanName);
}
let traceparent = optional_string(object, "traceparent").map(str::to_string);
let span_id = optional_span_id_alias(object, &["span_id", "spanId"])?;
let trace_id = optional_trace_id_alias(object, &["trace_id", "traceId"])?;
let parent_span_id = optional_span_id_alias(object, &["parent_span_id", "parentSpanId"])?;
let trace_flags = optional_trace_flags_alias(object, &["trace_flags", "traceFlags"])?;
let tracestate = optional_string(object, "tracestate").map(str::to_string);
if span_id.is_some() || trace_id.is_some() || parent_span_id.is_some() || trace_flags.is_some()
{
if span_id.is_none() {
return Err(StderrSpanTelemetryError::InvalidField("span_id"));
}
if trace_id.is_none() {
return Err(StderrSpanTelemetryError::InvalidField("trace_id"));
}
if parent_span_id.is_none() {
return Err(StderrSpanTelemetryError::InvalidField("parent_span_id"));
}
if trace_flags.is_none() {
return Err(StderrSpanTelemetryError::InvalidField("trace_flags"));
}
} else if traceparent.is_none() {
return Err(StderrSpanTelemetryError::InvalidField("traceparent"));
}
let start_time = timestamp_from_unix_nanos(required_u64_alias(
object,
&[
"start_unix_nanos",
"startTimeUnixNanos",
"startTimeUnixNano",
"start_time_unix_nanos",
],
"start_unix_nanos",
)?)?;
let end_time = timestamp_from_unix_nanos(required_u64_alias(
object,
&[
"end_unix_nanos",
"endTimeUnixNanos",
"endTimeUnixNano",
"end_time_unix_nanos",
],
"end_unix_nanos",
)?)?;
if end_time < start_time {
return Err(StderrSpanTelemetryError::InvalidField("end_unix_nanos"));
}
let attributes = object
.get("attrs")
.or_else(|| object.get("attributes"))
.and_then(Value::as_object)
.map(sanitized_attributes)
.unwrap_or_default();
Ok(SpanTelemetryRecord {
name,
span_id,
trace_id,
parent_span_id,
trace_flags,
traceparent,
tracestate,
start_time,
end_time,
attributes,
})
}
fn required_string<'a>(
object: &'a Map<String, Value>,
key: &'static str,
) -> Result<&'a str, StderrSpanTelemetryError> {
object
.get(key)
.and_then(Value::as_str)
.filter(|value| !value.is_empty())
.ok_or(StderrSpanTelemetryError::InvalidField(key))
}
fn optional_string<'a>(object: &'a Map<String, Value>, key: &str) -> Option<&'a str> {
object
.get(key)
.and_then(Value::as_str)
.filter(|value| !value.is_empty())
}
fn optional_string_alias<'a>(
object: &'a Map<String, Value>,
keys: &[&'static str],
) -> Option<(&'static str, &'a str)> {
keys.iter()
.find_map(|key| optional_string(object, key).map(|value| (*key, value)))
}
fn optional_trace_id_alias(
object: &Map<String, Value>,
keys: &[&'static str],
) -> Result<Option<TraceId>, StderrSpanTelemetryError> {
let Some((key, value)) = optional_string_alias(object, keys) else {
return Ok(None);
};
parse_trace_id(value)
.map(Some)
.map_err(|_| StderrSpanTelemetryError::InvalidField(key))
}
fn optional_span_id_alias(
object: &Map<String, Value>,
keys: &[&'static str],
) -> Result<Option<SpanId>, StderrSpanTelemetryError> {
let Some((key, value)) = optional_string_alias(object, keys) else {
return Ok(None);
};
parse_span_id(value)
.map(Some)
.map_err(|_| StderrSpanTelemetryError::InvalidField(key))
}
fn optional_trace_flags_alias(
object: &Map<String, Value>,
keys: &[&'static str],
) -> Result<Option<TraceFlags>, StderrSpanTelemetryError> {
let Some((key, value)) = optional_string_alias(object, keys) else {
return Ok(None);
};
parse_trace_flags(value)
.map(Some)
.map_err(|_| StderrSpanTelemetryError::InvalidField(key))
}
fn parse_trace_id(value: &str) -> Result<TraceId, ()> {
if !is_exact_hex(value, /*len*/ 32) {
return Err(());
}
let trace_id = TraceId::from_hex(value).map_err(|_| ())?;
if trace_id == TraceId::INVALID {
return Err(());
}
Ok(trace_id)
}
fn parse_span_id(value: &str) -> Result<SpanId, ()> {
if !is_exact_hex(value, /*len*/ 16) {
return Err(());
}
let span_id = SpanId::from_hex(value).map_err(|_| ())?;
if span_id == SpanId::INVALID {
return Err(());
}
Ok(span_id)
}
fn parse_trace_flags(value: &str) -> Result<TraceFlags, ()> {
if !is_exact_hex(value, /*len*/ 2) {
return Err(());
}
u8::from_str_radix(value, 16)
.map(TraceFlags::new)
.map_err(|_| ())
}
fn is_exact_hex(value: &str, len: usize) -> bool {
value.len() == len && value.bytes().all(|byte| byte.is_ascii_hexdigit())
}
fn trace_state_from_header(value: Option<&str>) -> Result<TraceState, StderrSpanTelemetryError> {
let Some(value) = value else {
return Ok(TraceState::default());
};
value
.parse()
.map_err(|_| StderrSpanTelemetryError::InvalidField("tracestate"))
}
fn required_u64_alias(
object: &Map<String, Value>,
keys: &[&'static str],
error_key: &'static str,
) -> Result<u64, StderrSpanTelemetryError> {
keys.iter()
.find_map(|key| object.get(*key).and_then(u64_value))
.ok_or(StderrSpanTelemetryError::InvalidField(error_key))
}
fn u64_value(value: &Value) -> Option<u64> {
value
.as_u64()
.or_else(|| value.as_str().and_then(|value| value.parse().ok()))
}
fn timestamp_from_unix_nanos(nanos: u64) -> Result<SystemTime, StderrSpanTelemetryError> {
let secs = nanos / 1_000_000_000;
let sub_nanos = (nanos % 1_000_000_000) as u32;
SystemTime::UNIX_EPOCH
.checked_add(Duration::new(secs, sub_nanos))
.ok_or(StderrSpanTelemetryError::InvalidField("timestamp"))
}
fn sanitized_attributes(attrs: &Map<String, Value>) -> Vec<KeyValue> {
attrs
.iter()
.filter(|(key, _)| is_allowed_attribute_key(key))
.filter_map(|(key, value)| safe_attribute_value(key, value))
.collect()
}
fn is_allowed_attribute_key(key: &str) -> bool {
ALLOWED_ATTRIBUTE_KEYS.contains(&key)
|| ALLOWED_ATTRIBUTE_PREFIXES
.iter()
.any(|prefix| key.starts_with(prefix))
}
fn safe_attribute_value(key: &str, value: &Value) -> Option<KeyValue> {
match value {
Value::Bool(value) => Some(KeyValue::new(key.to_string(), *value)),
Value::Number(value) => {
if let Some(value) = value.as_i64() {
Some(KeyValue::new(key.to_string(), value))
} else if let Some(value) = value.as_u64().and_then(|value| i64::try_from(value).ok()) {
Some(KeyValue::new(key.to_string(), value))
} else {
value
.as_f64()
.map(|value| KeyValue::new(key.to_string(), value))
}
}
Value::String(value) => Some(KeyValue::new(
key.to_string(),
truncate_attribute_string(value).to_string(),
)),
Value::Null | Value::Array(_) | Value::Object(_) => None,
}
}
fn truncate_attribute_string(value: &str) -> &str {
if value.len() <= MAX_ATTRIBUTE_STRING_BYTES {
return value;
}
let mut end = MAX_ATTRIBUTE_STRING_BYTES;
while !value.is_char_boundary(end) {
end = end.saturating_sub(1);
}
&value[..end]
}
#[cfg(test)]
mod tests {
use super::*;
use opentelemetry::Value as OtelValue;
use opentelemetry::trace::SpanId;
use opentelemetry::trace::TraceId;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_sdk::trace::InMemorySpanExporter;
use opentelemetry_sdk::trace::SdkTracerProvider;
use opentelemetry_sdk::trace::SpanData;
use pretty_assertions::assert_eq;
use std::collections::BTreeMap;
#[test]
fn valid_span_telemetry_reconstructs_otel_span_with_sanitized_attrs() {
let exporter = InMemorySpanExporter::default();
let provider = SdkTracerProvider::builder()
.with_simple_exporter(exporter.clone())
.build();
let tracer = provider.tracer("codex-otel-tests");
let trace_id = "00000000000000000000000000000001";
let parent_span_id = "0000000000000002";
let record = parse_span_telemetry_record(serde_json::json!({
"v": 1,
"type": "span",
"name": "browser_use.tab.goto",
"traceparent": format!("00-{trace_id}-{parent_span_id}-01"),
"start_unix_nanos": 1_000_000_123u64,
"end_unix_nanos": 2_000_000_456u64,
"attrs": {
"browser_use.url": "https://example.com",
"browser_use.timeout_ms": 2500,
"unknown.secret": "drop me",
"browser_use.object": {"drop": true}
}
}))
.expect("valid record");
emit_span_telemetry_record_with_tracer(&tracer, &record).expect("span emitted");
provider.force_flush().expect("flush spans");
let spans = exporter.get_finished_spans().expect("finished spans");
assert_eq!(spans.len(), 1);
let span = &spans[0];
assert_eq!(span.name.as_ref(), "browser_use.tab.goto");
assert_eq!(
span.span_context.trace_id(),
TraceId::from_hex(trace_id).unwrap()
);
assert_eq!(
span.parent_span_id,
SpanId::from_hex(parent_span_id).unwrap()
);
assert_eq!(
span.start_time,
SystemTime::UNIX_EPOCH + Duration::new(1, 123)
);
assert_eq!(
span.end_time,
SystemTime::UNIX_EPOCH + Duration::new(2, 456)
);
let attrs = span
.attributes
.iter()
.map(|kv| (kv.key.as_str().to_string(), kv.value.clone()))
.collect::<BTreeMap<_, _>>();
assert_eq!(
attrs.get("browser_use.url"),
Some(&OtelValue::String("https://example.com".into()))
);
assert_eq!(
attrs.get("browser_use.timeout_ms"),
Some(&OtelValue::I64(2500))
);
assert!(!attrs.contains_key("unknown.secret"));
assert!(!attrs.contains_key("browser_use.object"));
}
#[test]
fn cua_span_telemetry_reconstructs_otel_span_with_sanitized_attrs() {
let exporter = InMemorySpanExporter::default();
let provider = SdkTracerProvider::builder()
.with_simple_exporter(exporter.clone())
.build();
let tracer = provider.tracer("codex-otel-tests");
let trace_id = "00000000000000000000000000000001";
let parent_span_id = "0000000000000002";
let record = parse_span_telemetry_record(serde_json::json!({
"v": 1,
"type": "span",
"name": "cua.state.build_result",
"traceparent": format!("00-{trace_id}-{parent_span_id}-01"),
"start_unix_nanos": 1_000_000_123u64,
"end_unix_nanos": 2_000_000_456u64,
"attrs": {
"cua.screenshot.base64_bytes": 212000,
"cua.display.width": 1280,
"cua.display.height": 720,
"cua.accessibility.text_bytes": 4096,
"cua.success": true,
"cua.sensitive_text": "drop me",
"browser_use.object": {"drop": true}
}
}))
.expect("valid record");
emit_span_telemetry_record_with_tracer(&tracer, &record).expect("span emitted");
provider.force_flush().expect("flush spans");
let spans = exporter.get_finished_spans().expect("finished spans");
assert_eq!(spans.len(), 1);
let span = &spans[0];
let attrs = span
.attributes
.iter()
.map(|kv| (kv.key.as_str().to_string(), kv.value.clone()))
.collect::<BTreeMap<_, _>>();
assert_eq!(span.name.as_ref(), "cua.state.build_result");
assert_eq!(
attrs,
BTreeMap::from([
(
"cua.accessibility.text_bytes".to_string(),
OtelValue::I64(4096),
),
("cua.display.height".to_string(), OtelValue::I64(720)),
("cua.display.width".to_string(), OtelValue::I64(1280)),
(
"cua.screenshot.base64_bytes".to_string(),
OtelValue::I64(212000),
),
("cua.success".to_string(), OtelValue::Bool(true)),
])
);
}
#[test]
fn stale_cua_get_screenshot_span_is_rejected() {
let error = parse_span_telemetry_record(serde_json::json!({
"v": 1,
"type": "span",
"name": "cua.mcp.get_screenshot",
"trace_id": "00000000000000000000000000000001",
"span_id": "0000000000000010",
"parent_span_id": "0000000000000002",
"trace_flags": "01",
"start_unix_nanos": 1_000_000_000u64,
"end_unix_nanos": 2_000_000_000u64,
}))
.expect_err("unsupported stale CUA span");
assert!(matches!(
error,
StderrSpanTelemetryError::UnsupportedSpanName
));
}
#[test]
fn explicit_ids_reconstruct_span_and_parent_ids() {
let exporter = InMemorySpanExporter::default();
let provider = SdkTracerProvider::builder()
.with_simple_exporter(exporter.clone())
.build();
let tracer = provider.tracer("codex-otel-tests");
let trace_id = "00000000000000000000000000000001";
let parent_span_id = "0000000000000002";
let span_id = "0000000000000010";
let record = parse_span_telemetry_record(serde_json::json!({
"v": 1,
"type": "span",
"name": "node_repl.js",
"trace_id": trace_id,
"span_id": span_id,
"parent_span_id": parent_span_id,
"trace_flags": "01",
"tracestate": "vendor=value",
"start_unix_nanos": 1_000_000_123u64,
"end_unix_nanos": 2_000_000_456u64,
}))
.expect("valid record");
emit_span_telemetry_record_with_tracer(&tracer, &record).expect("span emitted");
provider.force_flush().expect("flush spans");
let spans = exporter.get_finished_spans().expect("finished spans");
assert_eq!(spans.len(), 1);
let span = &spans[0];
assert_eq!(
span.span_context.trace_id(),
TraceId::from_hex(trace_id).unwrap()
);
assert_eq!(
span.span_context.span_id(),
SpanId::from_hex(span_id).unwrap()
);
assert_eq!(
span.parent_span_id,
SpanId::from_hex(parent_span_id).unwrap()
);
assert!(span.span_context.trace_flags().is_sampled());
}
#[test]
fn camel_case_explicit_ids_reconstruct_span_and_parent_ids() {
let exporter = InMemorySpanExporter::default();
let provider = SdkTracerProvider::builder()
.with_simple_exporter(exporter.clone())
.build();
let tracer = provider.tracer("codex-otel-tests");
let trace_id = "00000000000000000000000000000001";
let parent_span_id = "0000000000000002";
let span_id = "0000000000000010";
let record = parse_span_telemetry_record(serde_json::json!({
"v": 1,
"type": "span",
"name": "browser_use.tab.click",
"traceId": trace_id,
"spanId": span_id,
"parentSpanId": parent_span_id,
"traceFlags": "01",
"start_unix_nanos": 1_000_000_123u64,
"end_unix_nanos": 2_000_000_456u64,
}))
.expect("valid record");
emit_span_telemetry_record_with_tracer(&tracer, &record).expect("span emitted");
provider.force_flush().expect("flush spans");
let spans = exporter.get_finished_spans().expect("finished spans");
assert_eq!(spans.len(), 1);
let span = &spans[0];
assert_eq!(
span.span_context.trace_id(),
TraceId::from_hex(trace_id).unwrap()
);
assert_eq!(
span.span_context.span_id(),
SpanId::from_hex(span_id).unwrap()
);
assert_eq!(
span.parent_span_id,
SpanId::from_hex(parent_span_id).unwrap()
);
assert!(span.span_context.trace_flags().is_sampled());
}
#[test]
fn child_span_can_parent_to_previous_reconstructed_span_id() {
let exporter = InMemorySpanExporter::default();
let provider = SdkTracerProvider::builder()
.with_simple_exporter(exporter.clone())
.build();
let tracer = provider.tracer("codex-otel-tests");
let trace_id = "00000000000000000000000000000001";
let mcp_span_id = "0000000000000002";
let node_span_id = "0000000000000010";
let child_span_id = "0000000000000011";
for payload in [
serde_json::json!({
"v": 1,
"type": "span",
"name": "node_repl.js",
"trace_id": trace_id,
"span_id": node_span_id,
"parent_span_id": mcp_span_id,
"trace_flags": "01",
"start_unix_nanos": 1_000_000_000u64,
"end_unix_nanos": 3_000_000_000u64,
}),
serde_json::json!({
"v": 1,
"type": "span",
"name": "browser_use.tab.goto",
"trace_id": trace_id,
"span_id": child_span_id,
"parent_span_id": node_span_id,
"trace_flags": "01",
"start_unix_nanos": 1_500_000_000u64,
"end_unix_nanos": 2_000_000_000u64,
}),
] {
let record = parse_span_telemetry_record(payload).expect("valid record");
emit_span_telemetry_record_with_tracer(&tracer, &record).expect("span emitted");
}
provider.force_flush().expect("flush spans");
let spans = exporter.get_finished_spans().expect("finished spans");
let node_span = find_span(&spans, "node_repl.js");
let child_span = find_span(&spans, "browser_use.tab.goto");
assert_eq!(
node_span.span_context.span_id(),
SpanId::from_hex(node_span_id).unwrap()
);
assert_eq!(
child_span.span_context.span_id(),
SpanId::from_hex(child_span_id).unwrap()
);
assert_eq!(
child_span.parent_span_id,
SpanId::from_hex(node_span_id).unwrap()
);
}
#[test]
fn invalid_span_telemetry_is_rejected_without_emitting_span() {
let exporter = InMemorySpanExporter::default();
let provider = SdkTracerProvider::builder()
.with_simple_exporter(exporter.clone())
.build();
let tracer = provider.tracer("codex-otel-tests");
let error = parse_span_telemetry_record(serde_json::json!({
"v": 99,
"type": "span",
}))
.expect_err("unsupported version");
assert!(matches!(
error,
StderrSpanTelemetryError::UnsupportedVersion
));
assert!(
emit_span_telemetry_record_with_tracer(
&tracer,
&SpanTelemetryRecord {
name: "browser_use.tab.goto".to_string(),
span_id: None,
trace_id: None,
parent_span_id: None,
trace_flags: None,
traceparent: Some("not-a-traceparent".to_string()),
tracestate: None,
start_time: SystemTime::UNIX_EPOCH,
end_time: SystemTime::UNIX_EPOCH,
attributes: Vec::new(),
},
)
.is_err()
);
provider.force_flush().expect("flush spans");
assert!(
exporter
.get_finished_spans()
.expect("finished spans")
.is_empty()
);
}
#[test]
fn invalid_explicit_ids_are_rejected_without_emitting_span() {
let exporter = InMemorySpanExporter::default();
let provider = SdkTracerProvider::builder()
.with_simple_exporter(exporter.clone())
.build();
let tracer = provider.tracer("codex-otel-tests");
for payload in [
serde_json::json!({
"v": 1,
"type": "span",
"name": "node_repl.js",
"trace_id": "00000000000000000000000000000000",
"span_id": "0000000000000010",
"parent_span_id": "0000000000000002",
"trace_flags": "01",
"start_unix_nanos": 1_000_000_000u64,
"end_unix_nanos": 2_000_000_000u64,
}),
serde_json::json!({
"v": 1,
"type": "span",
"name": "node_repl.js",
"trace_id": "00000000000000000000000000000001",
"span_id": "0000000000000000",
"parent_span_id": "0000000000000002",
"trace_flags": "01",
"start_unix_nanos": 1_000_000_000u64,
"end_unix_nanos": 2_000_000_000u64,
}),
serde_json::json!({
"v": 1,
"type": "span",
"name": "node_repl.js",
"trace_id": "00000000000000000000000000000001",
"span_id": "0000000000000010",
"parent_span_id": "0002",
"trace_flags": "01",
"start_unix_nanos": 1_000_000_000u64,
"end_unix_nanos": 2_000_000_000u64,
}),
serde_json::json!({
"v": 1,
"type": "span",
"name": "node_repl.js",
"trace_id": "00000000000000000000000000000001",
"span_id": "0000000000000010",
"parent_span_id": "0000000000000002",
"trace_flags": "001",
"start_unix_nanos": 1_000_000_000u64,
"end_unix_nanos": 2_000_000_000u64,
}),
] {
let error = parse_span_telemetry_record(payload).expect_err("invalid id");
assert!(matches!(error, StderrSpanTelemetryError::InvalidField(_)));
}
assert!(
emit_mcp_subspan_telemetry(serde_json::json!({
"v": 1,
"type": "span",
"name": "node_repl.js",
"trace_id": "00000000000000000000000000000001",
"span_id": "0000000000000010",
"parent_span_id": "0000000000000002",
"trace_flags": "zz",
"start_unix_nanos": 1_000_000_000u64,
"end_unix_nanos": 2_000_000_000u64,
}))
.is_err()
);
provider.force_flush().expect("flush spans");
assert!(
exporter
.get_finished_spans()
.expect("finished spans")
.is_empty()
);
drop(tracer);
}
fn find_span<'a>(spans: &'a [SpanData], name: &str) -> &'a SpanData {
spans
.iter()
.find(|span| span.name == name)
.unwrap_or_else(|| panic!("missing span {name}"))
}
}

View File

@@ -41,9 +41,11 @@ use serde_json::to_vec;
use tokio::runtime::Handle;
use tokio::sync::broadcast;
use tracing::debug;
use tracing::info;
use tracing::warn;
use crate::stdio_server_launcher::StdioServerTelemetrySink;
use crate::stdio_server_launcher::handle_stderr_line;
static PROCESS_COUNTER: AtomicUsize = AtomicUsize::new(1);
// Remote public implementation.
@@ -78,6 +80,9 @@ pub(super) struct ExecutorProcessTransport {
/// Buffered stderr bytes for diagnostic logging.
stderr: Vec<u8>,
/// Optional sink for structured stderr telemetry lines.
telemetry_sink: Option<StdioServerTelemetrySink>,
/// Whether the executor has reported process closure or a terminal
/// subscription failure. Once closed, any remaining partial stdout line is
/// flushed once and then rmcp receives EOF.
@@ -95,7 +100,11 @@ pub(super) struct ExecutorProcessTransport {
}
impl ExecutorProcessTransport {
pub(super) fn new(process: Arc<dyn ExecProcess>, program_name: String) -> Self {
pub(super) fn new(
process: Arc<dyn ExecProcess>,
program_name: String,
telemetry_sink: Option<StdioServerTelemetrySink>,
) -> Self {
// Subscribe before returning the transport to rmcp. Some test servers
// can emit output or exit quickly after `process/start`, and the
// process event log will replay anything that landed before this
@@ -107,6 +116,7 @@ impl ExecutorProcessTransport {
program_name,
stdout: Vec::new(),
stderr: Vec::new(),
telemetry_sink,
closed: false,
terminated: false,
last_seq: 0,
@@ -321,10 +331,10 @@ impl ExecutorProcessTransport {
if line.last() == Some(&b'\r') {
line.pop();
}
info!(
"MCP server stderr ({}): {}",
self.program_name,
String::from_utf8_lossy(&line)
handle_stderr_line(
&self.program_name,
&String::from_utf8_lossy(&line),
self.telemetry_sink.as_ref(),
);
}
}
@@ -334,10 +344,10 @@ impl ExecutorProcessTransport {
return;
}
let line = take(&mut self.stderr);
info!(
"MCP server stderr ({}): {}",
self.program_name,
String::from_utf8_lossy(&line)
handle_stderr_line(
&self.program_name,
&String::from_utf8_lossy(&line),
self.telemetry_sink.as_ref(),
);
}

View File

@@ -35,3 +35,5 @@ pub use rmcp_client::ToolWithConnectorId;
pub use stdio_server_launcher::ExecutorStdioServerLauncher;
pub use stdio_server_launcher::LocalStdioServerLauncher;
pub use stdio_server_launcher::StdioServerLauncher;
pub use stdio_server_launcher::StdioServerTelemetry;
pub use stdio_server_launcher::StdioServerTelemetrySink;

View File

@@ -68,6 +68,7 @@ use crate::oauth::StoredOAuthTokens;
use crate::stdio_server_launcher::StdioServerCommand;
use crate::stdio_server_launcher::StdioServerLauncher;
use crate::stdio_server_launcher::StdioServerProcessHandle;
use crate::stdio_server_launcher::StdioServerTelemetrySink;
use crate::stdio_server_launcher::StdioServerTransport;
use crate::utils::apply_default_headers;
use crate::utils::build_default_headers;
@@ -282,9 +283,17 @@ impl RmcpClient {
env_vars: &[McpServerEnvVar],
cwd: Option<PathBuf>,
launcher: Arc<dyn StdioServerLauncher>,
telemetry_sink: Option<StdioServerTelemetrySink>,
) -> io::Result<Self> {
let transport_recipe = TransportRecipe::Stdio {
command: StdioServerCommand::new(program, args, env, env_vars.to_vec(), cwd),
command: StdioServerCommand::new(
program,
args,
env,
env_vars.to_vec(),
cwd,
telemetry_sink,
),
launcher,
};
let transport = Self::create_pending_transport(&transport_recipe)

View File

@@ -82,8 +82,16 @@ pub struct StdioServerCommand {
env: Option<HashMap<OsString, OsString>>,
env_vars: Vec<McpServerEnvVar>,
cwd: Option<PathBuf>,
telemetry_sink: Option<StdioServerTelemetrySink>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct StdioServerTelemetry {
pub payload: serde_json::Value,
}
pub type StdioServerTelemetrySink = Arc<dyn Fn(StdioServerTelemetry) + Send + Sync>;
/// Client-side rmcp transport for a launched MCP stdio server.
///
/// The concrete process placement stays private to this module. `RmcpClient`
@@ -149,6 +157,7 @@ impl StdioServerCommand {
env: Option<HashMap<OsString, OsString>>,
env_vars: Vec<McpServerEnvVar>,
cwd: Option<PathBuf>,
telemetry_sink: Option<StdioServerTelemetrySink>,
) -> Self {
Self {
program,
@@ -156,6 +165,7 @@ impl StdioServerCommand {
env,
env_vars,
cwd,
telemetry_sink,
}
}
}
@@ -243,6 +253,7 @@ impl LocalStdioServerLauncher {
env,
env_vars,
cwd,
telemetry_sink,
} = command;
let program_name = program.to_string_lossy().into_owned();
let envs = create_env_for_mcp_server(env, &env_vars).map_err(io::Error::other)?;
@@ -276,7 +287,7 @@ impl LocalStdioServerLauncher {
loop {
match reader.next_line().await {
Ok(Some(line)) => {
info!("MCP server stderr ({program_name}): {line}");
handle_stderr_line(&program_name, &line, telemetry_sink.as_ref());
}
Ok(None) => break,
Err(error) => {
@@ -479,6 +490,7 @@ impl ExecutorStdioServerLauncher {
env,
env_vars,
cwd,
telemetry_sink,
} = command;
let program_name = program.to_string_lossy().into_owned();
let envs = create_env_overlay_for_remote_mcp_server(env, &env_vars);
@@ -513,6 +525,7 @@ impl ExecutorStdioServerLauncher {
inner: StdioServerTransportInner::Executor(ExecutorProcessTransport::new(
started.process,
program_name,
telemetry_sink,
)),
process,
})
@@ -578,6 +591,38 @@ impl ExecutorStdioServerLauncher {
}
}
const CODEX_TELEMETRY_STDERR_PREFIX: &str = "@codex-telemetry ";
pub(super) fn handle_stderr_line(
program_name: &str,
line: &str,
telemetry_sink: Option<&StdioServerTelemetrySink>,
) {
let Some(sink) = telemetry_sink else {
info!("MCP server stderr ({program_name}): {line}");
return;
};
let Some(telemetry) = parse_stderr_telemetry_line(line) else {
info!("MCP server stderr ({program_name}): {line}");
return;
};
match telemetry {
Ok(telemetry) => sink(telemetry),
Err(error) => {
warn!("Failed to parse MCP server telemetry ({program_name}): {error}");
}
}
}
fn parse_stderr_telemetry_line(
line: &str,
) -> Option<Result<StdioServerTelemetry, serde_json::Error>> {
let payload = line.strip_prefix(CODEX_TELEMETRY_STDERR_PREFIX)?;
Some(serde_json::from_str(payload).map(|payload| StdioServerTelemetry { payload }))
}
#[cfg(test)]
mod tests {
use super::*;
@@ -651,4 +696,27 @@ mod tests {
);
assert!(!env.contains_key("UNREQUESTED_SECRET"));
}
#[test]
fn stderr_telemetry_parser_separates_prefixed_lines_from_normal_stderr() {
assert!(parse_stderr_telemetry_line("ordinary stderr").is_none());
assert_eq!(
parse_stderr_telemetry_line("@codex-telemetry {\"v\":1,\"type\":\"span\"}")
.expect("telemetry line")
.expect("valid telemetry"),
StdioServerTelemetry {
payload: serde_json::json!({
"v": 1,
"type": "span",
}),
}
);
assert!(
parse_stderr_telemetry_line("@codex-telemetry not-json")
.expect("telemetry line")
.is_err()
);
}
}

View File

@@ -116,6 +116,7 @@ async fn drop_kills_wrapper_process_group() -> Result<()> {
&[],
/*cwd*/ None,
Arc::new(LocalStdioServerLauncher::new(std::env::current_dir()?)),
/*telemetry_sink*/ None,
)
.await?;
@@ -147,6 +148,7 @@ async fn shutdown_kills_initialized_stdio_server_with_in_flight_operation() -> R
&[],
/*cwd*/ None,
Arc::new(LocalStdioServerLauncher::new(std::env::current_dir()?)),
/*telemetry_sink*/ None,
)
.await?,
);

View File

@@ -64,6 +64,7 @@ async fn rmcp_client_can_list_and_read_resources() -> anyhow::Result<()> {
&[],
/*cwd*/ None,
Arc::new(LocalStdioServerLauncher::new(std::env::current_dir()?)),
/*telemetry_sink*/ None,
)
.await?;