Compare commits

...

2 Commits

Author SHA1 Message Date
charley-openai
5fa172006e Gate MCP subspan tracing on stderr support 2026-05-01 16:21:19 -07:00
charley-openai
6e420986ce Ingest node_repl stderr telemetry spans 2026-04-30 18:03:16 -07:00
14 changed files with 1451 additions and 18 deletions

View File

@@ -564,6 +564,19 @@ impl McpConnectionManager {
.server_supports_sandbox_state_meta_capability)
}
pub async fn server_supports_subspan_tracing_for_tool(
&self,
server: &str,
tool: &str,
) -> Result<bool> {
Ok(self
.client_by_name(server)
.await?
.subspan_tracing_capability
.as_ref()
.is_some_and(|capability| capability.supports_tool(tool)))
}
/// List resources from the specified server.
pub async fn list_resources(
&self,

View File

@@ -1,5 +1,8 @@
pub use connection_manager::McpConnectionManager;
pub use rmcp_client::MCP_SANDBOX_STATE_META_CAPABILITY;
pub use rmcp_client::MCP_SUBSPAN_TRACING_CAPABILITY;
pub use rmcp_client::MCP_SUBSPAN_TRACING_TRANSPORT_STDERR_JSONL;
pub use rmcp_client::MCP_SUBSPAN_TRACING_VERSION;
pub use runtime::McpRuntimeEnvironment;
pub use runtime::SandboxState;
pub use tools::ToolInfo;

View File

@@ -7,7 +7,9 @@
//! [`crate::connection_manager`].
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::env;
use std::ffi::OsString;
use std::sync::Arc;
@@ -50,6 +52,8 @@ use codex_rmcp_client::ExecutorStdioServerLauncher;
use codex_rmcp_client::LocalStdioServerLauncher;
use codex_rmcp_client::RmcpClient;
use codex_rmcp_client::StdioServerLauncher;
use codex_rmcp_client::StdioServerTelemetry;
use codex_rmcp_client::StdioServerTelemetrySink;
use futures::future::BoxFuture;
use futures::future::FutureExt;
use futures::future::Shared;
@@ -58,6 +62,7 @@ use rmcp::model::ElicitationCapability;
use rmcp::model::FormElicitationCapability;
use rmcp::model::Implementation;
use rmcp::model::InitializeRequestParams;
use rmcp::model::JsonObject;
use rmcp::model::ProtocolVersion;
use rmcp::model::Tool as RmcpTool;
use tokio_util::sync::CancellationToken;
@@ -66,6 +71,12 @@ use tracing::warn;
/// MCP server capability indicating that Codex should include [`SandboxState`]
/// in tool-call request `_meta` under this key.
pub const MCP_SANDBOX_STATE_META_CAPABILITY: &str = "codex/sandbox-state-meta";
/// MCP server capability indicating that Codex should include W3C trace context
/// in tool-call request `_meta` so the server can emit reconstructed child spans.
/// See `codex-rs/docs/mcp_subspan_tracing.md` for the protocol contract.
pub const MCP_SUBSPAN_TRACING_CAPABILITY: &str = "codex/subspan-tracing";
pub const MCP_SUBSPAN_TRACING_TRANSPORT_STDERR_JSONL: &str = "stderr-jsonl";
pub const MCP_SUBSPAN_TRACING_VERSION: u64 = 1;
pub(crate) const MCP_TOOLS_LIST_DURATION_METRIC: &str = "codex.mcp.tools.list.duration_ms";
pub(crate) const MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC: &str =
@@ -89,9 +100,52 @@ pub(crate) struct ManagedClient {
pub(crate) tool_timeout: Option<Duration>,
pub(crate) server_instructions: Option<String>,
pub(crate) server_supports_sandbox_state_meta_capability: bool,
pub(crate) subspan_tracing_capability: Option<McpSubspanTracingCapability>,
pub(crate) codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct McpSubspanTracingCapability {
tools: Option<HashSet<String>>,
}
impl McpSubspanTracingCapability {
fn from_experimental_capability(capability: &JsonObject) -> Option<Self> {
if capability
.get("version")
.and_then(serde_json::Value::as_u64)
!= Some(MCP_SUBSPAN_TRACING_VERSION)
{
return None;
}
let supports_stderr_jsonl = capability
.get("transports")
.and_then(serde_json::Value::as_array)
.is_some_and(|transports| {
transports.iter().any(|transport| {
transport.as_str() == Some(MCP_SUBSPAN_TRACING_TRANSPORT_STDERR_JSONL)
})
});
if !supports_stderr_jsonl {
return None;
}
let tools = capability
.get("tools")
.and_then(serde_json::Value::as_object)
.map(|tools| tools.keys().cloned().collect());
Some(Self { tools })
}
pub(crate) fn supports_tool(&self, tool_name: &str) -> bool {
self.tools
.as_ref()
.is_none_or(|tools| tools.contains(tool_name))
}
}
impl ManagedClient {
fn listed_tools(&self) -> Vec<ToolInfo> {
let total_start = Instant::now();
@@ -152,7 +206,11 @@ impl AsyncManagedClient {
.map(|tools| filter_tools(tools, &tool_filter));
let startup_tool_filter = tool_filter;
let startup_complete = Arc::new(AtomicBool::new(false));
let subspan_tracing_enabled = Arc::new(AtomicBool::new(false));
let subspan_tracing_stderr_jsonl_available =
subspan_tracing_stderr_jsonl_available(&config, &runtime_environment);
let startup_complete_for_fut = Arc::clone(&startup_complete);
let subspan_tracing_enabled_for_fut = Arc::clone(&subspan_tracing_enabled);
let cancel_token_for_fut = cancel_token.clone();
let fut = async move {
let outcome = match async {
@@ -167,6 +225,7 @@ impl AsyncManagedClient {
store_mode,
runtime_environment,
runtime_auth_provider,
Arc::clone(&subspan_tracing_enabled_for_fut),
)
.await?,
);
@@ -182,6 +241,8 @@ impl AsyncManagedClient {
tx_event,
elicitation_requests,
codex_apps_tools_cache_context,
subspan_tracing_enabled: subspan_tracing_enabled_for_fut,
subspan_tracing_stderr_jsonl_available,
},
)
.await
@@ -462,12 +523,29 @@ async fn start_server_task(
tx_event,
elicitation_requests,
codex_apps_tools_cache_context,
subspan_tracing_enabled,
subspan_tracing_stderr_jsonl_available,
} = params;
let elicitation = elicitation_capability_for_server(&server_name);
let params = InitializeRequestParams {
let client_experimental_capabilities = subspan_tracing_stderr_jsonl_available.then(|| {
let mut subspan_tracing_capability = JsonObject::new();
subspan_tracing_capability.insert(
"version".to_string(),
serde_json::json!(MCP_SUBSPAN_TRACING_VERSION),
);
subspan_tracing_capability.insert(
"transports".to_string(),
serde_json::json!([MCP_SUBSPAN_TRACING_TRANSPORT_STDERR_JSONL]),
);
BTreeMap::from([(
MCP_SUBSPAN_TRACING_CAPABILITY.to_string(),
subspan_tracing_capability,
)])
});
let initialize_params = InitializeRequestParams {
meta: None,
capabilities: ClientCapabilities {
experimental: None,
experimental: client_experimental_capabilities,
extensions: None,
roots: None,
sampling: None,
@@ -488,7 +566,7 @@ async fn start_server_task(
let send_elicitation = elicitation_requests.make_sender(server_name.clone(), tx_event);
let initialize_result = client
.initialize(params, startup_timeout, send_elicitation)
.initialize(initialize_params, startup_timeout, send_elicitation)
.await
.map_err(StartupOutcomeError::from)?;
@@ -498,6 +576,13 @@ async fn start_server_task(
.as_ref()
.and_then(|exp| exp.get(MCP_SANDBOX_STATE_META_CAPABILITY))
.is_some();
let subspan_tracing_capability = initialize_result
.capabilities
.experimental
.as_ref()
.and_then(|exp| exp.get(MCP_SUBSPAN_TRACING_CAPABILITY))
.and_then(McpSubspanTracingCapability::from_experimental_capability);
subspan_tracing_enabled.store(subspan_tracing_capability.is_some(), Ordering::Release);
let list_start = Instant::now();
let fetch_start = Instant::now();
let tools = list_tools_for_client_uncached(
@@ -534,6 +619,7 @@ async fn start_server_task(
tool_filter,
server_instructions: initialize_result.instructions,
server_supports_sandbox_state_meta_capability,
subspan_tracing_capability,
codex_apps_tools_cache_context,
};
@@ -547,6 +633,23 @@ struct StartServerTaskParams {
tx_event: Sender<Event>,
elicitation_requests: ElicitationRequestManager,
codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
subspan_tracing_enabled: Arc<AtomicBool>,
subspan_tracing_stderr_jsonl_available: bool,
}
fn subspan_tracing_stderr_jsonl_available(
config: &McpServerConfig,
runtime_environment: &McpRuntimeEnvironment,
) -> bool {
if !matches!(config.transport, McpServerTransportConfig::Stdio { .. }) {
return false;
}
match config.experimental_environment.as_deref() {
None | Some("local") => true,
Some("remote") => runtime_environment.environment().is_remote(),
Some(_) => false,
}
}
async fn make_rmcp_client(
@@ -555,6 +658,7 @@ async fn make_rmcp_client(
store_mode: OAuthCredentialsStoreMode,
runtime_environment: McpRuntimeEnvironment,
runtime_auth_provider: Option<SharedAuthProvider>,
subspan_tracing_enabled: Arc<AtomicBool>,
) -> Result<RmcpClient, StartupOutcomeError> {
let McpServerConfig {
transport,
@@ -607,9 +711,20 @@ async fn make_rmcp_client(
// `RmcpClient` always sees a launched MCP stdio server. The
// launcher hides whether that means a local child process or an
// executor process whose stdin/stdout bytes cross the process API.
RmcpClient::new_stdio_client(command_os, args_os, env_os, &env_vars, cwd, launcher)
.await
.map_err(|err| StartupOutcomeError::from(anyhow!(err)))
RmcpClient::new_stdio_client(
command_os,
args_os,
env_os,
&env_vars,
cwd,
launcher,
Some(subspan_tracing_telemetry_sink(
server_name,
subspan_tracing_enabled,
)),
)
.await
.map_err(|err| StartupOutcomeError::from(anyhow!(err)))
}
McpServerTransportConfig::StreamableHttp {
url,
@@ -643,11 +758,40 @@ async fn make_rmcp_client(
}
}
fn subspan_tracing_telemetry_sink(
server_name: &str,
enabled: Arc<AtomicBool>,
) -> StdioServerTelemetrySink {
let server_name = server_name.to_string();
Arc::new(move |telemetry: StdioServerTelemetry| {
if !enabled.load(Ordering::Acquire) {
return;
}
if let Err(error) = codex_otel::emit_mcp_subspan_telemetry(telemetry.payload) {
match error {
codex_otel::StderrSpanTelemetryError::UnsupportedVersion
| codex_otel::StderrSpanTelemetryError::UnsupportedType => {
tracing::debug!(
"ignoring unsupported MCP subspan telemetry from {server_name}: {error}"
);
}
_ => {
warn!("ignoring invalid MCP subspan telemetry from {server_name}: {error}");
}
}
}
})
}
#[cfg(test)]
mod tests {
use super::*;
use codex_exec_server::Environment;
use rmcp::model::JsonObject;
use rmcp::model::Meta;
use std::path::PathBuf;
use std::sync::Arc;
fn tool_with_connector_meta() -> RmcpTool {
RmcpTool {
@@ -744,4 +888,97 @@ mod tests {
assert!(meta.0.contains_key(key), "{key} should be preserved");
}
}
fn local_runtime_environment() -> McpRuntimeEnvironment {
McpRuntimeEnvironment::new(
Arc::new(Environment::default_for_tests()),
PathBuf::from("/tmp"),
)
}
fn remote_runtime_environment() -> McpRuntimeEnvironment {
McpRuntimeEnvironment::new(
Arc::new(
Environment::create_for_tests(Some("ws://executor.example".to_string()))
.expect("remote environment"),
),
PathBuf::from("/tmp"),
)
}
fn mcp_server_config(value: serde_json::Value) -> McpServerConfig {
serde_json::from_value(value).expect("mcp server config")
}
#[tokio::test]
async fn subspan_tracing_stderr_jsonl_is_available_only_when_stderr_is_observed() {
assert!(subspan_tracing_stderr_jsonl_available(
&mcp_server_config(serde_json::json!({"command": "node"})),
&local_runtime_environment(),
));
assert!(!subspan_tracing_stderr_jsonl_available(
&mcp_server_config(serde_json::json!({"url": "https://mcp.example"})),
&local_runtime_environment(),
));
assert!(!subspan_tracing_stderr_jsonl_available(
&mcp_server_config(serde_json::json!({
"command": "node",
"experimental_environment": "remote",
})),
&local_runtime_environment(),
));
assert!(subspan_tracing_stderr_jsonl_available(
&mcp_server_config(serde_json::json!({
"command": "node",
"experimental_environment": "remote",
})),
&remote_runtime_environment(),
));
}
#[test]
fn subspan_tracing_capability_accepts_stderr_jsonl_transport_and_tool_filter() {
let capability: JsonObject = serde_json::from_value(serde_json::json!({
"version": 1,
"transports": ["stderr-jsonl"],
"tools": {
"js": {
"attributeProfile": "browser-use-v1"
}
}
}))
.expect("capability object");
let capability = McpSubspanTracingCapability::from_experimental_capability(&capability)
.expect("valid capability");
assert!(capability.supports_tool("js"));
assert!(!capability.supports_tool("other"));
}
#[test]
fn subspan_tracing_capability_rejects_unknown_version_or_transport() {
for capability in [
serde_json::json!({
"version": 2,
"transports": ["stderr-jsonl"],
}),
serde_json::json!({
"version": 1,
"transports": ["events"],
}),
serde_json::json!({
"version": 1,
}),
] {
let capability: JsonObject =
serde_json::from_value(capability).expect("capability object");
assert!(
McpSubspanTracingCapability::from_experimental_capability(&capability).is_none()
);
}
}
}

View File

@@ -40,6 +40,8 @@ use codex_config::types::AppToolApproval;
use codex_features::Feature;
use codex_hooks::PermissionRequestDecision;
use codex_mcp::CODEX_APPS_MCP_SERVER_NAME;
use codex_mcp::MCP_SUBSPAN_TRACING_CAPABILITY;
use codex_mcp::MCP_SUBSPAN_TRACING_VERSION;
use codex_mcp::SandboxState;
use codex_mcp::declared_openai_file_input_param_names;
use codex_mcp::mcp_permission_prompt_is_auto_approved;
@@ -534,6 +536,9 @@ async fn execute_mcp_tool_call(
rewritten_arguments: Option<JsonValue>,
request_meta: Option<JsonValue>,
) -> Result<CallToolResult, String> {
let request_meta =
augment_mcp_tool_request_meta_with_subspan_tracing(sess, server, tool_name, request_meta)
.await;
let request_meta =
with_mcp_tool_call_thread_id_meta(request_meta, &sess.conversation_id.to_string());
let request_meta =
@@ -721,6 +726,7 @@ const MCP_TOOL_CODEX_APPS_META_KEY: &str = "_codex_apps";
const MCP_TOOL_OPENAI_OUTPUT_TEMPLATE_META_KEY: &str = "openai/outputTemplate";
const MCP_TOOL_UI_RESOURCE_URI_META_KEY: &str = "ui/resourceUri";
const MCP_TOOL_THREAD_ID_META_KEY: &str = "threadId";
const MCP_TOOL_SUBSPAN_TRACING_META_KEY: &str = MCP_SUBSPAN_TRACING_CAPABILITY;
fn custom_mcp_tool_approval_mode(
turn_context: &TurnContext,
@@ -780,6 +786,79 @@ fn build_mcp_tool_call_request_meta(
(!request_meta.is_empty()).then_some(serde_json::Value::Object(request_meta))
}
#[expect(
clippy::await_holding_invalid_type,
reason = "MCP subspan metadata reads through the session-owned manager guard"
)]
async fn augment_mcp_tool_request_meta_with_subspan_tracing(
sess: &Session,
server: &str,
tool_name: &str,
meta: Option<serde_json::Value>,
) -> Option<serde_json::Value> {
let supports_subspan_tracing = sess
.services
.mcp_connection_manager
.read()
.await
.server_supports_subspan_tracing_for_tool(server, tool_name)
.await
.unwrap_or(false);
augment_mcp_tool_request_meta_with_subspan_tracing_if_supported(supports_subspan_tracing, meta)
}
fn augment_mcp_tool_request_meta_with_subspan_tracing_if_supported(
supports_subspan_tracing: bool,
meta: Option<serde_json::Value>,
) -> Option<serde_json::Value> {
if !supports_subspan_tracing {
return meta;
}
let Some(trace_context) = codex_otel::current_span_w3c_trace_context() else {
return meta;
};
let Some(traceparent) = trace_context.traceparent else {
return meta;
};
let mut telemetry_meta = serde_json::Map::new();
telemetry_meta.insert("enabled".to_string(), serde_json::Value::Bool(true));
telemetry_meta.insert(
"version".to_string(),
serde_json::Value::Number(MCP_SUBSPAN_TRACING_VERSION.into()),
);
telemetry_meta.insert(
"traceparent".to_string(),
serde_json::Value::String(traceparent),
);
if let Some(tracestate) = trace_context.tracestate {
telemetry_meta.insert(
"tracestate".to_string(),
serde_json::Value::String(tracestate),
);
}
match meta {
Some(serde_json::Value::Object(mut map)) => {
map.insert(
MCP_TOOL_SUBSPAN_TRACING_META_KEY.to_string(),
serde_json::Value::Object(telemetry_meta),
);
Some(serde_json::Value::Object(map))
}
None => {
let mut map = serde_json::Map::new();
map.insert(
MCP_TOOL_SUBSPAN_TRACING_META_KEY.to_string(),
serde_json::Value::Object(telemetry_meta),
);
Some(serde_json::Value::Object(map))
}
other => other,
}
}
fn with_mcp_tool_call_thread_id_meta(
meta: Option<serde_json::Value>,
thread_id: &str,

View File

@@ -941,6 +941,83 @@ async fn codex_apps_tool_call_request_meta_includes_call_id_without_existing_cod
);
}
#[test]
fn subspan_tracing_request_meta_includes_trace_context_when_supported() {
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_sdk::trace::SdkTracerProvider;
use tracing_subscriber::prelude::*;
let provider = SdkTracerProvider::builder().build();
let tracer = provider.tracer("codex-core-mcp-tool-call-tests");
let subscriber =
tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer));
let _guard = tracing::subscriber::set_default(subscriber);
let span = tracing::info_span!("mcp.tools.call");
let _entered = span.enter();
let meta = augment_mcp_tool_request_meta_with_subspan_tracing_if_supported(
/*supports_subspan_tracing*/ true,
Some(serde_json::json!({
"threadId": "thread-live",
})),
)
.expect("meta");
let telemetry = meta
.get(MCP_TOOL_SUBSPAN_TRACING_META_KEY)
.expect("subspan tracing metadata");
assert_eq!(
telemetry.get("enabled"),
Some(&serde_json::Value::Bool(true))
);
assert_eq!(
telemetry.get("version"),
Some(&serde_json::json!(MCP_SUBSPAN_TRACING_VERSION))
);
assert!(
telemetry
.get("traceparent")
.and_then(serde_json::Value::as_str)
.is_some_and(|traceparent| traceparent.starts_with("00-"))
);
assert_eq!(
meta.get("threadId"),
Some(&serde_json::json!("thread-live"))
);
}
#[test]
fn subspan_tracing_request_meta_is_only_added_when_supported_and_trace_context_exists() {
assert_eq!(
augment_mcp_tool_request_meta_with_subspan_tracing_if_supported(
/*supports_subspan_tracing*/ true,
Some(serde_json::json!({"threadId": "thread-live"})),
),
Some(serde_json::json!({"threadId": "thread-live"}))
);
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_sdk::trace::SdkTracerProvider;
use tracing_subscriber::prelude::*;
let provider = SdkTracerProvider::builder().build();
let tracer = provider.tracer("codex-core-mcp-tool-call-tests");
let subscriber =
tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer));
let _guard = tracing::subscriber::set_default(subscriber);
let span = tracing::info_span!("mcp.tools.call");
let _entered = span.enter();
assert_eq!(
augment_mcp_tool_request_meta_with_subspan_tracing_if_supported(
/*supports_subspan_tracing*/ false,
Some(serde_json::json!({"threadId": "thread-live"})),
),
Some(serde_json::json!({"threadId": "thread-live"}))
);
}
#[test]
fn mcp_tool_call_thread_id_meta_is_added_to_request_meta() {
assert_eq!(

View File

@@ -0,0 +1,173 @@
# MCP Subspan Tracing [experimental]
This document describes Codex's experimental MCP client extension for ingesting child OpenTelemetry spans emitted by MCP servers.
- Status: experimental and subject to change without notice
- Capability: `codex/subspan-tracing`
- Supported version: `1`
- Supported transport: `stderr-jsonl`
## Purpose
Codex creates an `mcp.tools.call` span around each MCP tool call. Some MCP servers perform meaningful nested work that is useful to inspect as child spans of that call. The `codex/subspan-tracing` capability lets a server opt in to receiving the active W3C trace context for a tool call and emitting sanitized span records back to Codex.
This is intended for stdio MCP servers whose stderr stream is observed by Codex. Telemetry records are out-of-band with respect to the MCP JSON-RPC stream and must not affect tool-call success or transport liveness.
## Capability Negotiation
Codex advertises client support during MCP `initialize` only when the active MCP transport can observe stderr telemetry:
```json
{
"capabilities": {
"experimental": {
"codex/subspan-tracing": {
"version": 1,
"transports": ["stderr-jsonl"]
}
}
}
}
```
An MCP server opts in by returning a compatible experimental capability in its initialize result:
```json
{
"capabilities": {
"experimental": {
"codex/subspan-tracing": {
"version": 1,
"transports": ["stderr-jsonl"],
"tools": {
"js": {
"attributeProfile": "browser-use-v1"
}
}
}
}
}
}
```
If `tools` is present, Codex enables subspan tracing only for those tool names. If `tools` is omitted, Codex treats the capability as applying to every tool exposed by that server.
Unknown versions or transports are ignored.
## Tool Call Metadata
For a negotiated server/tool pair, Codex adds `_meta["codex/subspan-tracing"]` while it is inside the active `mcp.tools.call` span:
```json
{
"_meta": {
"codex/subspan-tracing": {
"enabled": true,
"version": 1,
"traceparent": "00-00000000000000000000000000000001-0000000000000002-01",
"tracestate": "vendor=value"
}
}
}
```
`tracestate` is omitted when no tracestate is active. If tracing is not active, Codex does not add this metadata.
Servers should disable subspan emission for a tool call unless:
- `enabled` is `true`
- `version` is `1`
- `traceparent` is present and parseable
## Stderr Transport
For `stderr-jsonl`, each telemetry record is written to the MCP server process stderr as one line with this exact prefix:
```text
@codex-telemetry
```
The rest of the line is a single JSON object:
```text
@codex-telemetry {"v":1,"type":"span","name":"browser_use.tab.goto",...}
```
Normal stderr output must not use that prefix. Codex preserves ordinary stderr logging behavior for non-telemetry lines.
## Span Record Schema
Span records use schema version `v: 1`:
```json
{
"v": 1,
"type": "span",
"name": "browser_use.tab.goto",
"trace_id": "00000000000000000000000000000001",
"span_id": "0000000000000010",
"parent_span_id": "0000000000000002",
"trace_flags": "01",
"tracestate": "vendor=value",
"start_unix_nanos": 1000000000,
"end_unix_nanos": 2000000000,
"attrs": {
"browser_use.url": "https://example.com",
"browser_use.timeout_ms": 2500
}
}
```
Required fields:
- `v`: must be `1`
- `type`: must be `"span"`
- `name`: allowlisted span name
- `trace_id`: 32 lowercase or uppercase hex characters, nonzero
- `span_id`: 16 lowercase or uppercase hex characters, nonzero
- `parent_span_id`: 16 lowercase or uppercase hex characters, nonzero
- `trace_flags`: 2 hex characters
- `start_unix_nanos`: Unix timestamp in nanoseconds
- `end_unix_nanos`: Unix timestamp in nanoseconds, greater than or equal to `start_unix_nanos`
Optional fields:
- `tracestate`: W3C tracestate header value
- `attrs`: span attributes object
Codex also accepts `traceparent` for backward compatibility, but explicit IDs are the canonical protocol for reconstructed spans because they preserve parent-child relationships across records.
## Span IDs and Hierarchy
The first server-created span for a tool call should use:
- `trace_id` from the request `traceparent`
- `parent_span_id` from the request `traceparent`
- a newly generated `span_id`
Nested server spans should use the same `trace_id`, their own generated `span_id`, and the parent reconstructed span's `span_id` as `parent_span_id`.
## Sanitization
Servers must emit only sanitized, allowlisted attributes. Attribute values must be primitives:
- string
- integer or float
- boolean
Do not emit objects, arrays, cookies, credentials, auth headers, bearer tokens, full request/response payloads, arbitrary DOM text, or user-sensitive selectors. Codex applies its own allowlist and drops unsupported attributes, but servers should sanitize before writing records.
## Failure Behavior
Subspan telemetry is best effort:
- malformed telemetry lines are ignored
- unsupported versions or record types are ignored
- invalid span records are ignored or logged at warning level
- telemetry write failures must not fail the MCP tool call
- telemetry parser failures must not break MCP transport
- no telemetry is emitted or reconstructed when Codex tracing is inactive
## Current Attribute Profile
`browser-use-v1` is the initial attribute profile used by Browser Use instrumentation. Codex currently allows Browser Use, Node REPL, and JS-related span names and attribute keys needed for that profile. New profiles should be added deliberately with their own allowlist changes and tests.

View File

@@ -2,6 +2,7 @@ pub(crate) mod config;
mod events;
pub(crate) mod metrics;
pub(crate) mod provider;
mod stderr_span_telemetry;
pub(crate) mod trace_context;
mod otlp;
@@ -23,6 +24,8 @@ pub use crate::metrics::runtime_metrics::RuntimeMetricsSummary;
pub use crate::metrics::timer::Timer;
pub use crate::metrics::*;
pub use crate::provider::OtelProvider;
pub use crate::stderr_span_telemetry::StderrSpanTelemetryError;
pub use crate::stderr_span_telemetry::emit_mcp_subspan_telemetry;
pub use crate::trace_context::context_from_w3c_trace_context;
pub use crate::trace_context::current_span_trace_id;
pub use crate::trace_context::current_span_w3c_trace_context;

View File

@@ -0,0 +1,756 @@
use std::time::Duration;
use std::time::SystemTime;
use opentelemetry::Context;
use opentelemetry::KeyValue;
use opentelemetry::global;
use opentelemetry::trace::Span as _;
use opentelemetry::trace::SpanContext;
use opentelemetry::trace::SpanId;
use opentelemetry::trace::SpanKind;
use opentelemetry::trace::TraceContextExt;
use opentelemetry::trace::TraceFlags;
use opentelemetry::trace::TraceId;
use opentelemetry::trace::TraceState;
use opentelemetry::trace::Tracer;
use serde_json::Map;
use serde_json::Value;
const MCP_SUBSPAN_TELEMETRY_TRACER_NAME: &str = "codex-mcp-subspan-stderr";
const CURRENT_SCHEMA_VERSION: u64 = 1;
const SPAN_RECORD_TYPE: &str = "span";
const MAX_ATTRIBUTE_STRING_BYTES: usize = 1024;
const ALLOWED_SPAN_NAMES: &[&str] = &[
"node_repl.js",
"browser_use.playwright.dom_snapshot",
"browser_use.tab.goto",
"browser_use.tab.click",
"browser_use.tab.type",
"browser_use.tab.screenshot",
"browser_use.cdp.execute",
"browser_use.tab.wait_for_load_state",
];
const ALLOWED_ATTRIBUTE_PREFIXES: &[&str] = &["browser_use.", "node_repl.", "js."];
const ALLOWED_ATTRIBUTE_KEYS: &[&str] = &["error.type", "error.message"];
#[derive(Debug, thiserror::Error)]
pub enum StderrSpanTelemetryError {
#[error("telemetry payload must be a JSON object")]
NotObject,
#[error("unsupported telemetry schema version")]
UnsupportedVersion,
#[error("unsupported telemetry record type")]
UnsupportedType,
#[error("missing or invalid telemetry field `{0}`")]
InvalidField(&'static str),
#[error("unsupported telemetry span name")]
UnsupportedSpanName,
}
#[derive(Debug, Clone, PartialEq)]
struct SpanTelemetryRecord {
name: String,
span_id: Option<SpanId>,
trace_id: Option<TraceId>,
parent_span_id: Option<SpanId>,
trace_flags: Option<TraceFlags>,
traceparent: Option<String>,
tracestate: Option<String>,
start_time: SystemTime,
end_time: SystemTime,
attributes: Vec<KeyValue>,
}
pub fn emit_mcp_subspan_telemetry(payload: Value) -> Result<(), StderrSpanTelemetryError> {
let record = parse_span_telemetry_record(payload)?;
let tracer = global::tracer(MCP_SUBSPAN_TELEMETRY_TRACER_NAME);
emit_span_telemetry_record_with_tracer(&tracer, &record)
}
fn emit_span_telemetry_record_with_tracer<T>(
tracer: &T,
record: &SpanTelemetryRecord,
) -> Result<(), StderrSpanTelemetryError>
where
T: Tracer,
{
let parent_context = record.parent_context()?;
let mut builder = tracer
.span_builder(record.name.clone())
.with_kind(SpanKind::Internal)
.with_start_time(record.start_time)
.with_attributes(record.attributes.clone());
if let Some(span_id) = record.span_id {
builder = builder.with_span_id(span_id);
}
let mut span = tracer.build_with_context(builder, &parent_context);
span.end_with_timestamp(record.end_time);
Ok(())
}
impl SpanTelemetryRecord {
fn parent_context(&self) -> Result<Context, StderrSpanTelemetryError> {
if let (Some(trace_id), Some(parent_span_id), Some(trace_flags)) =
(self.trace_id, self.parent_span_id, self.trace_flags)
{
let trace_state = trace_state_from_header(self.tracestate.as_deref())?;
let span_context = SpanContext::new(
trace_id,
parent_span_id,
trace_flags,
/*is_remote*/ true,
trace_state,
);
return Ok(Context::new().with_remote_span_context(span_context));
}
let Some(traceparent) = self.traceparent.as_deref() else {
return Err(StderrSpanTelemetryError::InvalidField("traceparent"));
};
crate::trace_context::context_from_trace_headers(
Some(traceparent),
self.tracestate.as_deref(),
)
.ok_or(StderrSpanTelemetryError::InvalidField("traceparent"))
}
}
fn parse_span_telemetry_record(
payload: Value,
) -> Result<SpanTelemetryRecord, StderrSpanTelemetryError> {
let object = payload
.as_object()
.ok_or(StderrSpanTelemetryError::NotObject)?;
match object.get("v").and_then(Value::as_u64) {
Some(CURRENT_SCHEMA_VERSION) => {}
Some(_) => return Err(StderrSpanTelemetryError::UnsupportedVersion),
None => return Err(StderrSpanTelemetryError::InvalidField("v")),
}
match object.get("type").and_then(Value::as_str) {
Some(SPAN_RECORD_TYPE) => {}
Some(_) => return Err(StderrSpanTelemetryError::UnsupportedType),
None => return Err(StderrSpanTelemetryError::InvalidField("type")),
}
let name = required_string(object, "name")?.to_string();
if !ALLOWED_SPAN_NAMES.contains(&name.as_str()) {
return Err(StderrSpanTelemetryError::UnsupportedSpanName);
}
let traceparent = optional_string(object, "traceparent").map(str::to_string);
let span_id = optional_span_id_alias(object, &["span_id", "spanId"])?;
let trace_id = optional_trace_id_alias(object, &["trace_id", "traceId"])?;
let parent_span_id = optional_span_id_alias(object, &["parent_span_id", "parentSpanId"])?;
let trace_flags = optional_trace_flags_alias(object, &["trace_flags", "traceFlags"])?;
let tracestate = optional_string(object, "tracestate").map(str::to_string);
if span_id.is_some() || trace_id.is_some() || parent_span_id.is_some() || trace_flags.is_some()
{
if span_id.is_none() {
return Err(StderrSpanTelemetryError::InvalidField("span_id"));
}
if trace_id.is_none() {
return Err(StderrSpanTelemetryError::InvalidField("trace_id"));
}
if parent_span_id.is_none() {
return Err(StderrSpanTelemetryError::InvalidField("parent_span_id"));
}
if trace_flags.is_none() {
return Err(StderrSpanTelemetryError::InvalidField("trace_flags"));
}
} else if traceparent.is_none() {
return Err(StderrSpanTelemetryError::InvalidField("traceparent"));
}
let start_time = timestamp_from_unix_nanos(required_u64_alias(
object,
&[
"start_unix_nanos",
"startTimeUnixNanos",
"startTimeUnixNano",
"start_time_unix_nanos",
],
"start_unix_nanos",
)?)?;
let end_time = timestamp_from_unix_nanos(required_u64_alias(
object,
&[
"end_unix_nanos",
"endTimeUnixNanos",
"endTimeUnixNano",
"end_time_unix_nanos",
],
"end_unix_nanos",
)?)?;
if end_time < start_time {
return Err(StderrSpanTelemetryError::InvalidField("end_unix_nanos"));
}
let attributes = object
.get("attrs")
.or_else(|| object.get("attributes"))
.and_then(Value::as_object)
.map(sanitized_attributes)
.unwrap_or_default();
Ok(SpanTelemetryRecord {
name,
span_id,
trace_id,
parent_span_id,
trace_flags,
traceparent,
tracestate,
start_time,
end_time,
attributes,
})
}
fn required_string<'a>(
object: &'a Map<String, Value>,
key: &'static str,
) -> Result<&'a str, StderrSpanTelemetryError> {
object
.get(key)
.and_then(Value::as_str)
.filter(|value| !value.is_empty())
.ok_or(StderrSpanTelemetryError::InvalidField(key))
}
fn optional_string<'a>(object: &'a Map<String, Value>, key: &str) -> Option<&'a str> {
object
.get(key)
.and_then(Value::as_str)
.filter(|value| !value.is_empty())
}
fn optional_string_alias<'a>(
object: &'a Map<String, Value>,
keys: &[&'static str],
) -> Option<(&'static str, &'a str)> {
keys.iter()
.find_map(|key| optional_string(object, key).map(|value| (*key, value)))
}
fn optional_trace_id_alias(
object: &Map<String, Value>,
keys: &[&'static str],
) -> Result<Option<TraceId>, StderrSpanTelemetryError> {
let Some((key, value)) = optional_string_alias(object, keys) else {
return Ok(None);
};
parse_trace_id(value)
.map(Some)
.map_err(|_| StderrSpanTelemetryError::InvalidField(key))
}
fn optional_span_id_alias(
object: &Map<String, Value>,
keys: &[&'static str],
) -> Result<Option<SpanId>, StderrSpanTelemetryError> {
let Some((key, value)) = optional_string_alias(object, keys) else {
return Ok(None);
};
parse_span_id(value)
.map(Some)
.map_err(|_| StderrSpanTelemetryError::InvalidField(key))
}
fn optional_trace_flags_alias(
object: &Map<String, Value>,
keys: &[&'static str],
) -> Result<Option<TraceFlags>, StderrSpanTelemetryError> {
let Some((key, value)) = optional_string_alias(object, keys) else {
return Ok(None);
};
parse_trace_flags(value)
.map(Some)
.map_err(|_| StderrSpanTelemetryError::InvalidField(key))
}
fn parse_trace_id(value: &str) -> Result<TraceId, ()> {
if !is_exact_hex(value, /*len*/ 32) {
return Err(());
}
let trace_id = TraceId::from_hex(value).map_err(|_| ())?;
if trace_id == TraceId::INVALID {
return Err(());
}
Ok(trace_id)
}
fn parse_span_id(value: &str) -> Result<SpanId, ()> {
if !is_exact_hex(value, /*len*/ 16) {
return Err(());
}
let span_id = SpanId::from_hex(value).map_err(|_| ())?;
if span_id == SpanId::INVALID {
return Err(());
}
Ok(span_id)
}
fn parse_trace_flags(value: &str) -> Result<TraceFlags, ()> {
if !is_exact_hex(value, /*len*/ 2) {
return Err(());
}
u8::from_str_radix(value, 16)
.map(TraceFlags::new)
.map_err(|_| ())
}
fn is_exact_hex(value: &str, len: usize) -> bool {
value.len() == len && value.bytes().all(|byte| byte.is_ascii_hexdigit())
}
fn trace_state_from_header(value: Option<&str>) -> Result<TraceState, StderrSpanTelemetryError> {
let Some(value) = value else {
return Ok(TraceState::default());
};
value
.parse()
.map_err(|_| StderrSpanTelemetryError::InvalidField("tracestate"))
}
fn required_u64_alias(
object: &Map<String, Value>,
keys: &[&'static str],
error_key: &'static str,
) -> Result<u64, StderrSpanTelemetryError> {
keys.iter()
.find_map(|key| object.get(*key).and_then(u64_value))
.ok_or(StderrSpanTelemetryError::InvalidField(error_key))
}
fn u64_value(value: &Value) -> Option<u64> {
value
.as_u64()
.or_else(|| value.as_str().and_then(|value| value.parse().ok()))
}
fn timestamp_from_unix_nanos(nanos: u64) -> Result<SystemTime, StderrSpanTelemetryError> {
let secs = nanos / 1_000_000_000;
let sub_nanos = (nanos % 1_000_000_000) as u32;
SystemTime::UNIX_EPOCH
.checked_add(Duration::new(secs, sub_nanos))
.ok_or(StderrSpanTelemetryError::InvalidField("timestamp"))
}
fn sanitized_attributes(attrs: &Map<String, Value>) -> Vec<KeyValue> {
attrs
.iter()
.filter(|(key, _)| is_allowed_attribute_key(key))
.filter_map(|(key, value)| safe_attribute_value(key, value))
.collect()
}
fn is_allowed_attribute_key(key: &str) -> bool {
ALLOWED_ATTRIBUTE_KEYS.contains(&key)
|| ALLOWED_ATTRIBUTE_PREFIXES
.iter()
.any(|prefix| key.starts_with(prefix))
}
fn safe_attribute_value(key: &str, value: &Value) -> Option<KeyValue> {
match value {
Value::Bool(value) => Some(KeyValue::new(key.to_string(), *value)),
Value::Number(value) => {
if let Some(value) = value.as_i64() {
Some(KeyValue::new(key.to_string(), value))
} else if let Some(value) = value.as_u64().and_then(|value| i64::try_from(value).ok()) {
Some(KeyValue::new(key.to_string(), value))
} else {
value
.as_f64()
.map(|value| KeyValue::new(key.to_string(), value))
}
}
Value::String(value) => Some(KeyValue::new(
key.to_string(),
truncate_attribute_string(value).to_string(),
)),
Value::Null | Value::Array(_) | Value::Object(_) => None,
}
}
fn truncate_attribute_string(value: &str) -> &str {
if value.len() <= MAX_ATTRIBUTE_STRING_BYTES {
return value;
}
let mut end = MAX_ATTRIBUTE_STRING_BYTES;
while !value.is_char_boundary(end) {
end = end.saturating_sub(1);
}
&value[..end]
}
#[cfg(test)]
mod tests {
use super::*;
use opentelemetry::Value as OtelValue;
use opentelemetry::trace::SpanId;
use opentelemetry::trace::TraceId;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_sdk::trace::InMemorySpanExporter;
use opentelemetry_sdk::trace::SdkTracerProvider;
use opentelemetry_sdk::trace::SpanData;
use pretty_assertions::assert_eq;
use std::collections::BTreeMap;
#[test]
fn valid_span_telemetry_reconstructs_otel_span_with_sanitized_attrs() {
let exporter = InMemorySpanExporter::default();
let provider = SdkTracerProvider::builder()
.with_simple_exporter(exporter.clone())
.build();
let tracer = provider.tracer("codex-otel-tests");
let trace_id = "00000000000000000000000000000001";
let parent_span_id = "0000000000000002";
let record = parse_span_telemetry_record(serde_json::json!({
"v": 1,
"type": "span",
"name": "browser_use.tab.goto",
"traceparent": format!("00-{trace_id}-{parent_span_id}-01"),
"start_unix_nanos": 1_000_000_123u64,
"end_unix_nanos": 2_000_000_456u64,
"attrs": {
"browser_use.url": "https://example.com",
"browser_use.timeout_ms": 2500,
"unknown.secret": "drop me",
"browser_use.object": {"drop": true}
}
}))
.expect("valid record");
emit_span_telemetry_record_with_tracer(&tracer, &record).expect("span emitted");
provider.force_flush().expect("flush spans");
let spans = exporter.get_finished_spans().expect("finished spans");
assert_eq!(spans.len(), 1);
let span = &spans[0];
assert_eq!(span.name.as_ref(), "browser_use.tab.goto");
assert_eq!(
span.span_context.trace_id(),
TraceId::from_hex(trace_id).unwrap()
);
assert_eq!(
span.parent_span_id,
SpanId::from_hex(parent_span_id).unwrap()
);
assert_eq!(
span.start_time,
SystemTime::UNIX_EPOCH + Duration::new(1, 123)
);
assert_eq!(
span.end_time,
SystemTime::UNIX_EPOCH + Duration::new(2, 456)
);
let attrs = span
.attributes
.iter()
.map(|kv| (kv.key.as_str().to_string(), kv.value.clone()))
.collect::<BTreeMap<_, _>>();
assert_eq!(
attrs.get("browser_use.url"),
Some(&OtelValue::String("https://example.com".into()))
);
assert_eq!(
attrs.get("browser_use.timeout_ms"),
Some(&OtelValue::I64(2500))
);
assert!(!attrs.contains_key("unknown.secret"));
assert!(!attrs.contains_key("browser_use.object"));
}
#[test]
fn explicit_ids_reconstruct_span_and_parent_ids() {
let exporter = InMemorySpanExporter::default();
let provider = SdkTracerProvider::builder()
.with_simple_exporter(exporter.clone())
.build();
let tracer = provider.tracer("codex-otel-tests");
let trace_id = "00000000000000000000000000000001";
let parent_span_id = "0000000000000002";
let span_id = "0000000000000010";
let record = parse_span_telemetry_record(serde_json::json!({
"v": 1,
"type": "span",
"name": "node_repl.js",
"trace_id": trace_id,
"span_id": span_id,
"parent_span_id": parent_span_id,
"trace_flags": "01",
"tracestate": "vendor=value",
"start_unix_nanos": 1_000_000_123u64,
"end_unix_nanos": 2_000_000_456u64,
}))
.expect("valid record");
emit_span_telemetry_record_with_tracer(&tracer, &record).expect("span emitted");
provider.force_flush().expect("flush spans");
let spans = exporter.get_finished_spans().expect("finished spans");
assert_eq!(spans.len(), 1);
let span = &spans[0];
assert_eq!(
span.span_context.trace_id(),
TraceId::from_hex(trace_id).unwrap()
);
assert_eq!(
span.span_context.span_id(),
SpanId::from_hex(span_id).unwrap()
);
assert_eq!(
span.parent_span_id,
SpanId::from_hex(parent_span_id).unwrap()
);
assert!(span.span_context.trace_flags().is_sampled());
}
#[test]
fn camel_case_explicit_ids_reconstruct_span_and_parent_ids() {
let exporter = InMemorySpanExporter::default();
let provider = SdkTracerProvider::builder()
.with_simple_exporter(exporter.clone())
.build();
let tracer = provider.tracer("codex-otel-tests");
let trace_id = "00000000000000000000000000000001";
let parent_span_id = "0000000000000002";
let span_id = "0000000000000010";
let record = parse_span_telemetry_record(serde_json::json!({
"v": 1,
"type": "span",
"name": "browser_use.tab.click",
"traceId": trace_id,
"spanId": span_id,
"parentSpanId": parent_span_id,
"traceFlags": "01",
"start_unix_nanos": 1_000_000_123u64,
"end_unix_nanos": 2_000_000_456u64,
}))
.expect("valid record");
emit_span_telemetry_record_with_tracer(&tracer, &record).expect("span emitted");
provider.force_flush().expect("flush spans");
let spans = exporter.get_finished_spans().expect("finished spans");
assert_eq!(spans.len(), 1);
let span = &spans[0];
assert_eq!(
span.span_context.trace_id(),
TraceId::from_hex(trace_id).unwrap()
);
assert_eq!(
span.span_context.span_id(),
SpanId::from_hex(span_id).unwrap()
);
assert_eq!(
span.parent_span_id,
SpanId::from_hex(parent_span_id).unwrap()
);
assert!(span.span_context.trace_flags().is_sampled());
}
#[test]
fn child_span_can_parent_to_previous_reconstructed_span_id() {
let exporter = InMemorySpanExporter::default();
let provider = SdkTracerProvider::builder()
.with_simple_exporter(exporter.clone())
.build();
let tracer = provider.tracer("codex-otel-tests");
let trace_id = "00000000000000000000000000000001";
let mcp_span_id = "0000000000000002";
let node_span_id = "0000000000000010";
let child_span_id = "0000000000000011";
for payload in [
serde_json::json!({
"v": 1,
"type": "span",
"name": "node_repl.js",
"trace_id": trace_id,
"span_id": node_span_id,
"parent_span_id": mcp_span_id,
"trace_flags": "01",
"start_unix_nanos": 1_000_000_000u64,
"end_unix_nanos": 3_000_000_000u64,
}),
serde_json::json!({
"v": 1,
"type": "span",
"name": "browser_use.tab.goto",
"trace_id": trace_id,
"span_id": child_span_id,
"parent_span_id": node_span_id,
"trace_flags": "01",
"start_unix_nanos": 1_500_000_000u64,
"end_unix_nanos": 2_000_000_000u64,
}),
] {
let record = parse_span_telemetry_record(payload).expect("valid record");
emit_span_telemetry_record_with_tracer(&tracer, &record).expect("span emitted");
}
provider.force_flush().expect("flush spans");
let spans = exporter.get_finished_spans().expect("finished spans");
let node_span = find_span(&spans, "node_repl.js");
let child_span = find_span(&spans, "browser_use.tab.goto");
assert_eq!(
node_span.span_context.span_id(),
SpanId::from_hex(node_span_id).unwrap()
);
assert_eq!(
child_span.span_context.span_id(),
SpanId::from_hex(child_span_id).unwrap()
);
assert_eq!(
child_span.parent_span_id,
SpanId::from_hex(node_span_id).unwrap()
);
}
#[test]
fn invalid_span_telemetry_is_rejected_without_emitting_span() {
let exporter = InMemorySpanExporter::default();
let provider = SdkTracerProvider::builder()
.with_simple_exporter(exporter.clone())
.build();
let tracer = provider.tracer("codex-otel-tests");
let error = parse_span_telemetry_record(serde_json::json!({
"v": 99,
"type": "span",
}))
.expect_err("unsupported version");
assert!(matches!(
error,
StderrSpanTelemetryError::UnsupportedVersion
));
assert!(
emit_span_telemetry_record_with_tracer(
&tracer,
&SpanTelemetryRecord {
name: "browser_use.tab.goto".to_string(),
span_id: None,
trace_id: None,
parent_span_id: None,
trace_flags: None,
traceparent: Some("not-a-traceparent".to_string()),
tracestate: None,
start_time: SystemTime::UNIX_EPOCH,
end_time: SystemTime::UNIX_EPOCH,
attributes: Vec::new(),
},
)
.is_err()
);
provider.force_flush().expect("flush spans");
assert!(
exporter
.get_finished_spans()
.expect("finished spans")
.is_empty()
);
}
#[test]
fn invalid_explicit_ids_are_rejected_without_emitting_span() {
let exporter = InMemorySpanExporter::default();
let provider = SdkTracerProvider::builder()
.with_simple_exporter(exporter.clone())
.build();
let tracer = provider.tracer("codex-otel-tests");
for payload in [
serde_json::json!({
"v": 1,
"type": "span",
"name": "node_repl.js",
"trace_id": "00000000000000000000000000000000",
"span_id": "0000000000000010",
"parent_span_id": "0000000000000002",
"trace_flags": "01",
"start_unix_nanos": 1_000_000_000u64,
"end_unix_nanos": 2_000_000_000u64,
}),
serde_json::json!({
"v": 1,
"type": "span",
"name": "node_repl.js",
"trace_id": "00000000000000000000000000000001",
"span_id": "0000000000000000",
"parent_span_id": "0000000000000002",
"trace_flags": "01",
"start_unix_nanos": 1_000_000_000u64,
"end_unix_nanos": 2_000_000_000u64,
}),
serde_json::json!({
"v": 1,
"type": "span",
"name": "node_repl.js",
"trace_id": "00000000000000000000000000000001",
"span_id": "0000000000000010",
"parent_span_id": "0002",
"trace_flags": "01",
"start_unix_nanos": 1_000_000_000u64,
"end_unix_nanos": 2_000_000_000u64,
}),
serde_json::json!({
"v": 1,
"type": "span",
"name": "node_repl.js",
"trace_id": "00000000000000000000000000000001",
"span_id": "0000000000000010",
"parent_span_id": "0000000000000002",
"trace_flags": "001",
"start_unix_nanos": 1_000_000_000u64,
"end_unix_nanos": 2_000_000_000u64,
}),
] {
let error = parse_span_telemetry_record(payload).expect_err("invalid id");
assert!(matches!(error, StderrSpanTelemetryError::InvalidField(_)));
}
assert!(
emit_mcp_subspan_telemetry(serde_json::json!({
"v": 1,
"type": "span",
"name": "node_repl.js",
"trace_id": "00000000000000000000000000000001",
"span_id": "0000000000000010",
"parent_span_id": "0000000000000002",
"trace_flags": "zz",
"start_unix_nanos": 1_000_000_000u64,
"end_unix_nanos": 2_000_000_000u64,
}))
.is_err()
);
provider.force_flush().expect("flush spans");
assert!(
exporter
.get_finished_spans()
.expect("finished spans")
.is_empty()
);
drop(tracer);
}
fn find_span<'a>(spans: &'a [SpanData], name: &str) -> &'a SpanData {
spans
.iter()
.find(|span| span.name == name)
.unwrap_or_else(|| panic!("missing span {name}"))
}
}

View File

@@ -41,9 +41,11 @@ use serde_json::to_vec;
use tokio::runtime::Handle;
use tokio::sync::broadcast;
use tracing::debug;
use tracing::info;
use tracing::warn;
use crate::stdio_server_launcher::StdioServerTelemetrySink;
use crate::stdio_server_launcher::handle_stderr_line;
static PROCESS_COUNTER: AtomicUsize = AtomicUsize::new(1);
// Remote public implementation.
@@ -78,6 +80,9 @@ pub(super) struct ExecutorProcessTransport {
/// Buffered stderr bytes for diagnostic logging.
stderr: Vec<u8>,
/// Optional sink for structured stderr telemetry lines.
telemetry_sink: Option<StdioServerTelemetrySink>,
/// Whether the executor has reported process closure or a terminal
/// subscription failure. Once closed, any remaining partial stdout line is
/// flushed once and then rmcp receives EOF.
@@ -95,7 +100,11 @@ pub(super) struct ExecutorProcessTransport {
}
impl ExecutorProcessTransport {
pub(super) fn new(process: Arc<dyn ExecProcess>, program_name: String) -> Self {
pub(super) fn new(
process: Arc<dyn ExecProcess>,
program_name: String,
telemetry_sink: Option<StdioServerTelemetrySink>,
) -> Self {
// Subscribe before returning the transport to rmcp. Some test servers
// can emit output or exit quickly after `process/start`, and the
// process event log will replay anything that landed before this
@@ -107,6 +116,7 @@ impl ExecutorProcessTransport {
program_name,
stdout: Vec::new(),
stderr: Vec::new(),
telemetry_sink,
closed: false,
terminated: false,
last_seq: 0,
@@ -321,10 +331,10 @@ impl ExecutorProcessTransport {
if line.last() == Some(&b'\r') {
line.pop();
}
info!(
"MCP server stderr ({}): {}",
self.program_name,
String::from_utf8_lossy(&line)
handle_stderr_line(
&self.program_name,
&String::from_utf8_lossy(&line),
self.telemetry_sink.as_ref(),
);
}
}
@@ -334,10 +344,10 @@ impl ExecutorProcessTransport {
return;
}
let line = take(&mut self.stderr);
info!(
"MCP server stderr ({}): {}",
self.program_name,
String::from_utf8_lossy(&line)
handle_stderr_line(
&self.program_name,
&String::from_utf8_lossy(&line),
self.telemetry_sink.as_ref(),
);
}

View File

@@ -35,3 +35,5 @@ pub use rmcp_client::ToolWithConnectorId;
pub use stdio_server_launcher::ExecutorStdioServerLauncher;
pub use stdio_server_launcher::LocalStdioServerLauncher;
pub use stdio_server_launcher::StdioServerLauncher;
pub use stdio_server_launcher::StdioServerTelemetry;
pub use stdio_server_launcher::StdioServerTelemetrySink;

View File

@@ -68,6 +68,7 @@ use crate::oauth::StoredOAuthTokens;
use crate::stdio_server_launcher::StdioServerCommand;
use crate::stdio_server_launcher::StdioServerLauncher;
use crate::stdio_server_launcher::StdioServerProcessHandle;
use crate::stdio_server_launcher::StdioServerTelemetrySink;
use crate::stdio_server_launcher::StdioServerTransport;
use crate::utils::apply_default_headers;
use crate::utils::build_default_headers;
@@ -282,9 +283,17 @@ impl RmcpClient {
env_vars: &[McpServerEnvVar],
cwd: Option<PathBuf>,
launcher: Arc<dyn StdioServerLauncher>,
telemetry_sink: Option<StdioServerTelemetrySink>,
) -> io::Result<Self> {
let transport_recipe = TransportRecipe::Stdio {
command: StdioServerCommand::new(program, args, env, env_vars.to_vec(), cwd),
command: StdioServerCommand::new(
program,
args,
env,
env_vars.to_vec(),
cwd,
telemetry_sink,
),
launcher,
};
let transport = Self::create_pending_transport(&transport_recipe)

View File

@@ -82,8 +82,16 @@ pub struct StdioServerCommand {
env: Option<HashMap<OsString, OsString>>,
env_vars: Vec<McpServerEnvVar>,
cwd: Option<PathBuf>,
telemetry_sink: Option<StdioServerTelemetrySink>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct StdioServerTelemetry {
pub payload: serde_json::Value,
}
pub type StdioServerTelemetrySink = Arc<dyn Fn(StdioServerTelemetry) + Send + Sync>;
/// Client-side rmcp transport for a launched MCP stdio server.
///
/// The concrete process placement stays private to this module. `RmcpClient`
@@ -149,6 +157,7 @@ impl StdioServerCommand {
env: Option<HashMap<OsString, OsString>>,
env_vars: Vec<McpServerEnvVar>,
cwd: Option<PathBuf>,
telemetry_sink: Option<StdioServerTelemetrySink>,
) -> Self {
Self {
program,
@@ -156,6 +165,7 @@ impl StdioServerCommand {
env,
env_vars,
cwd,
telemetry_sink,
}
}
}
@@ -243,6 +253,7 @@ impl LocalStdioServerLauncher {
env,
env_vars,
cwd,
telemetry_sink,
} = command;
let program_name = program.to_string_lossy().into_owned();
let envs = create_env_for_mcp_server(env, &env_vars).map_err(io::Error::other)?;
@@ -276,7 +287,7 @@ impl LocalStdioServerLauncher {
loop {
match reader.next_line().await {
Ok(Some(line)) => {
info!("MCP server stderr ({program_name}): {line}");
handle_stderr_line(&program_name, &line, telemetry_sink.as_ref());
}
Ok(None) => break,
Err(error) => {
@@ -479,6 +490,7 @@ impl ExecutorStdioServerLauncher {
env,
env_vars,
cwd,
telemetry_sink,
} = command;
let program_name = program.to_string_lossy().into_owned();
let envs = create_env_overlay_for_remote_mcp_server(env, &env_vars);
@@ -513,6 +525,7 @@ impl ExecutorStdioServerLauncher {
inner: StdioServerTransportInner::Executor(ExecutorProcessTransport::new(
started.process,
program_name,
telemetry_sink,
)),
process,
})
@@ -578,6 +591,38 @@ impl ExecutorStdioServerLauncher {
}
}
const CODEX_TELEMETRY_STDERR_PREFIX: &str = "@codex-telemetry ";
pub(super) fn handle_stderr_line(
program_name: &str,
line: &str,
telemetry_sink: Option<&StdioServerTelemetrySink>,
) {
let Some(sink) = telemetry_sink else {
info!("MCP server stderr ({program_name}): {line}");
return;
};
let Some(telemetry) = parse_stderr_telemetry_line(line) else {
info!("MCP server stderr ({program_name}): {line}");
return;
};
match telemetry {
Ok(telemetry) => sink(telemetry),
Err(error) => {
warn!("Failed to parse MCP server telemetry ({program_name}): {error}");
}
}
}
fn parse_stderr_telemetry_line(
line: &str,
) -> Option<Result<StdioServerTelemetry, serde_json::Error>> {
let payload = line.strip_prefix(CODEX_TELEMETRY_STDERR_PREFIX)?;
Some(serde_json::from_str(payload).map(|payload| StdioServerTelemetry { payload }))
}
#[cfg(test)]
mod tests {
use super::*;
@@ -651,4 +696,27 @@ mod tests {
);
assert!(!env.contains_key("UNREQUESTED_SECRET"));
}
#[test]
fn stderr_telemetry_parser_separates_prefixed_lines_from_normal_stderr() {
assert!(parse_stderr_telemetry_line("ordinary stderr").is_none());
assert_eq!(
parse_stderr_telemetry_line("@codex-telemetry {\"v\":1,\"type\":\"span\"}")
.expect("telemetry line")
.expect("valid telemetry"),
StdioServerTelemetry {
payload: serde_json::json!({
"v": 1,
"type": "span",
}),
}
);
assert!(
parse_stderr_telemetry_line("@codex-telemetry not-json")
.expect("telemetry line")
.is_err()
);
}
}

View File

@@ -116,6 +116,7 @@ async fn drop_kills_wrapper_process_group() -> Result<()> {
&[],
/*cwd*/ None,
Arc::new(LocalStdioServerLauncher::new(std::env::current_dir()?)),
/*telemetry_sink*/ None,
)
.await?;
@@ -147,6 +148,7 @@ async fn shutdown_kills_initialized_stdio_server_with_in_flight_operation() -> R
&[],
/*cwd*/ None,
Arc::new(LocalStdioServerLauncher::new(std::env::current_dir()?)),
/*telemetry_sink*/ None,
)
.await?,
);

View File

@@ -64,6 +64,7 @@ async fn rmcp_client_can_list_and_read_resources() -> anyhow::Result<()> {
&[],
/*cwd*/ None,
Arc::new(LocalStdioServerLauncher::new(std::env::current_dir()?)),
/*telemetry_sink*/ None,
)
.await?;