mirror of
https://github.com/openai/codex.git
synced 2026-05-02 02:17:22 +00:00
Compare commits
2 Commits
xli-codex/
...
dev/charle
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5fa172006e | ||
|
|
6e420986ce |
@@ -564,6 +564,19 @@ impl McpConnectionManager {
|
||||
.server_supports_sandbox_state_meta_capability)
|
||||
}
|
||||
|
||||
pub async fn server_supports_subspan_tracing_for_tool(
|
||||
&self,
|
||||
server: &str,
|
||||
tool: &str,
|
||||
) -> Result<bool> {
|
||||
Ok(self
|
||||
.client_by_name(server)
|
||||
.await?
|
||||
.subspan_tracing_capability
|
||||
.as_ref()
|
||||
.is_some_and(|capability| capability.supports_tool(tool)))
|
||||
}
|
||||
|
||||
/// List resources from the specified server.
|
||||
pub async fn list_resources(
|
||||
&self,
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
pub use connection_manager::McpConnectionManager;
|
||||
pub use rmcp_client::MCP_SANDBOX_STATE_META_CAPABILITY;
|
||||
pub use rmcp_client::MCP_SUBSPAN_TRACING_CAPABILITY;
|
||||
pub use rmcp_client::MCP_SUBSPAN_TRACING_TRANSPORT_STDERR_JSONL;
|
||||
pub use rmcp_client::MCP_SUBSPAN_TRACING_VERSION;
|
||||
pub use runtime::McpRuntimeEnvironment;
|
||||
pub use runtime::SandboxState;
|
||||
pub use tools::ToolInfo;
|
||||
|
||||
@@ -7,7 +7,9 @@
|
||||
//! [`crate::connection_manager`].
|
||||
|
||||
use std::borrow::Cow;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::env;
|
||||
use std::ffi::OsString;
|
||||
use std::sync::Arc;
|
||||
@@ -50,6 +52,8 @@ use codex_rmcp_client::ExecutorStdioServerLauncher;
|
||||
use codex_rmcp_client::LocalStdioServerLauncher;
|
||||
use codex_rmcp_client::RmcpClient;
|
||||
use codex_rmcp_client::StdioServerLauncher;
|
||||
use codex_rmcp_client::StdioServerTelemetry;
|
||||
use codex_rmcp_client::StdioServerTelemetrySink;
|
||||
use futures::future::BoxFuture;
|
||||
use futures::future::FutureExt;
|
||||
use futures::future::Shared;
|
||||
@@ -58,6 +62,7 @@ use rmcp::model::ElicitationCapability;
|
||||
use rmcp::model::FormElicitationCapability;
|
||||
use rmcp::model::Implementation;
|
||||
use rmcp::model::InitializeRequestParams;
|
||||
use rmcp::model::JsonObject;
|
||||
use rmcp::model::ProtocolVersion;
|
||||
use rmcp::model::Tool as RmcpTool;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -66,6 +71,12 @@ use tracing::warn;
|
||||
/// MCP server capability indicating that Codex should include [`SandboxState`]
|
||||
/// in tool-call request `_meta` under this key.
|
||||
pub const MCP_SANDBOX_STATE_META_CAPABILITY: &str = "codex/sandbox-state-meta";
|
||||
/// MCP server capability indicating that Codex should include W3C trace context
|
||||
/// in tool-call request `_meta` so the server can emit reconstructed child spans.
|
||||
/// See `codex-rs/docs/mcp_subspan_tracing.md` for the protocol contract.
|
||||
pub const MCP_SUBSPAN_TRACING_CAPABILITY: &str = "codex/subspan-tracing";
|
||||
pub const MCP_SUBSPAN_TRACING_TRANSPORT_STDERR_JSONL: &str = "stderr-jsonl";
|
||||
pub const MCP_SUBSPAN_TRACING_VERSION: u64 = 1;
|
||||
|
||||
pub(crate) const MCP_TOOLS_LIST_DURATION_METRIC: &str = "codex.mcp.tools.list.duration_ms";
|
||||
pub(crate) const MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC: &str =
|
||||
@@ -89,9 +100,52 @@ pub(crate) struct ManagedClient {
|
||||
pub(crate) tool_timeout: Option<Duration>,
|
||||
pub(crate) server_instructions: Option<String>,
|
||||
pub(crate) server_supports_sandbox_state_meta_capability: bool,
|
||||
pub(crate) subspan_tracing_capability: Option<McpSubspanTracingCapability>,
|
||||
pub(crate) codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub(crate) struct McpSubspanTracingCapability {
|
||||
tools: Option<HashSet<String>>,
|
||||
}
|
||||
|
||||
impl McpSubspanTracingCapability {
|
||||
fn from_experimental_capability(capability: &JsonObject) -> Option<Self> {
|
||||
if capability
|
||||
.get("version")
|
||||
.and_then(serde_json::Value::as_u64)
|
||||
!= Some(MCP_SUBSPAN_TRACING_VERSION)
|
||||
{
|
||||
return None;
|
||||
}
|
||||
|
||||
let supports_stderr_jsonl = capability
|
||||
.get("transports")
|
||||
.and_then(serde_json::Value::as_array)
|
||||
.is_some_and(|transports| {
|
||||
transports.iter().any(|transport| {
|
||||
transport.as_str() == Some(MCP_SUBSPAN_TRACING_TRANSPORT_STDERR_JSONL)
|
||||
})
|
||||
});
|
||||
if !supports_stderr_jsonl {
|
||||
return None;
|
||||
}
|
||||
|
||||
let tools = capability
|
||||
.get("tools")
|
||||
.and_then(serde_json::Value::as_object)
|
||||
.map(|tools| tools.keys().cloned().collect());
|
||||
|
||||
Some(Self { tools })
|
||||
}
|
||||
|
||||
pub(crate) fn supports_tool(&self, tool_name: &str) -> bool {
|
||||
self.tools
|
||||
.as_ref()
|
||||
.is_none_or(|tools| tools.contains(tool_name))
|
||||
}
|
||||
}
|
||||
|
||||
impl ManagedClient {
|
||||
fn listed_tools(&self) -> Vec<ToolInfo> {
|
||||
let total_start = Instant::now();
|
||||
@@ -152,7 +206,11 @@ impl AsyncManagedClient {
|
||||
.map(|tools| filter_tools(tools, &tool_filter));
|
||||
let startup_tool_filter = tool_filter;
|
||||
let startup_complete = Arc::new(AtomicBool::new(false));
|
||||
let subspan_tracing_enabled = Arc::new(AtomicBool::new(false));
|
||||
let subspan_tracing_stderr_jsonl_available =
|
||||
subspan_tracing_stderr_jsonl_available(&config, &runtime_environment);
|
||||
let startup_complete_for_fut = Arc::clone(&startup_complete);
|
||||
let subspan_tracing_enabled_for_fut = Arc::clone(&subspan_tracing_enabled);
|
||||
let cancel_token_for_fut = cancel_token.clone();
|
||||
let fut = async move {
|
||||
let outcome = match async {
|
||||
@@ -167,6 +225,7 @@ impl AsyncManagedClient {
|
||||
store_mode,
|
||||
runtime_environment,
|
||||
runtime_auth_provider,
|
||||
Arc::clone(&subspan_tracing_enabled_for_fut),
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
@@ -182,6 +241,8 @@ impl AsyncManagedClient {
|
||||
tx_event,
|
||||
elicitation_requests,
|
||||
codex_apps_tools_cache_context,
|
||||
subspan_tracing_enabled: subspan_tracing_enabled_for_fut,
|
||||
subspan_tracing_stderr_jsonl_available,
|
||||
},
|
||||
)
|
||||
.await
|
||||
@@ -462,12 +523,29 @@ async fn start_server_task(
|
||||
tx_event,
|
||||
elicitation_requests,
|
||||
codex_apps_tools_cache_context,
|
||||
subspan_tracing_enabled,
|
||||
subspan_tracing_stderr_jsonl_available,
|
||||
} = params;
|
||||
let elicitation = elicitation_capability_for_server(&server_name);
|
||||
let params = InitializeRequestParams {
|
||||
let client_experimental_capabilities = subspan_tracing_stderr_jsonl_available.then(|| {
|
||||
let mut subspan_tracing_capability = JsonObject::new();
|
||||
subspan_tracing_capability.insert(
|
||||
"version".to_string(),
|
||||
serde_json::json!(MCP_SUBSPAN_TRACING_VERSION),
|
||||
);
|
||||
subspan_tracing_capability.insert(
|
||||
"transports".to_string(),
|
||||
serde_json::json!([MCP_SUBSPAN_TRACING_TRANSPORT_STDERR_JSONL]),
|
||||
);
|
||||
BTreeMap::from([(
|
||||
MCP_SUBSPAN_TRACING_CAPABILITY.to_string(),
|
||||
subspan_tracing_capability,
|
||||
)])
|
||||
});
|
||||
let initialize_params = InitializeRequestParams {
|
||||
meta: None,
|
||||
capabilities: ClientCapabilities {
|
||||
experimental: None,
|
||||
experimental: client_experimental_capabilities,
|
||||
extensions: None,
|
||||
roots: None,
|
||||
sampling: None,
|
||||
@@ -488,7 +566,7 @@ async fn start_server_task(
|
||||
let send_elicitation = elicitation_requests.make_sender(server_name.clone(), tx_event);
|
||||
|
||||
let initialize_result = client
|
||||
.initialize(params, startup_timeout, send_elicitation)
|
||||
.initialize(initialize_params, startup_timeout, send_elicitation)
|
||||
.await
|
||||
.map_err(StartupOutcomeError::from)?;
|
||||
|
||||
@@ -498,6 +576,13 @@ async fn start_server_task(
|
||||
.as_ref()
|
||||
.and_then(|exp| exp.get(MCP_SANDBOX_STATE_META_CAPABILITY))
|
||||
.is_some();
|
||||
let subspan_tracing_capability = initialize_result
|
||||
.capabilities
|
||||
.experimental
|
||||
.as_ref()
|
||||
.and_then(|exp| exp.get(MCP_SUBSPAN_TRACING_CAPABILITY))
|
||||
.and_then(McpSubspanTracingCapability::from_experimental_capability);
|
||||
subspan_tracing_enabled.store(subspan_tracing_capability.is_some(), Ordering::Release);
|
||||
let list_start = Instant::now();
|
||||
let fetch_start = Instant::now();
|
||||
let tools = list_tools_for_client_uncached(
|
||||
@@ -534,6 +619,7 @@ async fn start_server_task(
|
||||
tool_filter,
|
||||
server_instructions: initialize_result.instructions,
|
||||
server_supports_sandbox_state_meta_capability,
|
||||
subspan_tracing_capability,
|
||||
codex_apps_tools_cache_context,
|
||||
};
|
||||
|
||||
@@ -547,6 +633,23 @@ struct StartServerTaskParams {
|
||||
tx_event: Sender<Event>,
|
||||
elicitation_requests: ElicitationRequestManager,
|
||||
codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
|
||||
subspan_tracing_enabled: Arc<AtomicBool>,
|
||||
subspan_tracing_stderr_jsonl_available: bool,
|
||||
}
|
||||
|
||||
fn subspan_tracing_stderr_jsonl_available(
|
||||
config: &McpServerConfig,
|
||||
runtime_environment: &McpRuntimeEnvironment,
|
||||
) -> bool {
|
||||
if !matches!(config.transport, McpServerTransportConfig::Stdio { .. }) {
|
||||
return false;
|
||||
}
|
||||
|
||||
match config.experimental_environment.as_deref() {
|
||||
None | Some("local") => true,
|
||||
Some("remote") => runtime_environment.environment().is_remote(),
|
||||
Some(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
async fn make_rmcp_client(
|
||||
@@ -555,6 +658,7 @@ async fn make_rmcp_client(
|
||||
store_mode: OAuthCredentialsStoreMode,
|
||||
runtime_environment: McpRuntimeEnvironment,
|
||||
runtime_auth_provider: Option<SharedAuthProvider>,
|
||||
subspan_tracing_enabled: Arc<AtomicBool>,
|
||||
) -> Result<RmcpClient, StartupOutcomeError> {
|
||||
let McpServerConfig {
|
||||
transport,
|
||||
@@ -607,9 +711,20 @@ async fn make_rmcp_client(
|
||||
// `RmcpClient` always sees a launched MCP stdio server. The
|
||||
// launcher hides whether that means a local child process or an
|
||||
// executor process whose stdin/stdout bytes cross the process API.
|
||||
RmcpClient::new_stdio_client(command_os, args_os, env_os, &env_vars, cwd, launcher)
|
||||
.await
|
||||
.map_err(|err| StartupOutcomeError::from(anyhow!(err)))
|
||||
RmcpClient::new_stdio_client(
|
||||
command_os,
|
||||
args_os,
|
||||
env_os,
|
||||
&env_vars,
|
||||
cwd,
|
||||
launcher,
|
||||
Some(subspan_tracing_telemetry_sink(
|
||||
server_name,
|
||||
subspan_tracing_enabled,
|
||||
)),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| StartupOutcomeError::from(anyhow!(err)))
|
||||
}
|
||||
McpServerTransportConfig::StreamableHttp {
|
||||
url,
|
||||
@@ -643,11 +758,40 @@ async fn make_rmcp_client(
|
||||
}
|
||||
}
|
||||
|
||||
fn subspan_tracing_telemetry_sink(
|
||||
server_name: &str,
|
||||
enabled: Arc<AtomicBool>,
|
||||
) -> StdioServerTelemetrySink {
|
||||
let server_name = server_name.to_string();
|
||||
Arc::new(move |telemetry: StdioServerTelemetry| {
|
||||
if !enabled.load(Ordering::Acquire) {
|
||||
return;
|
||||
}
|
||||
|
||||
if let Err(error) = codex_otel::emit_mcp_subspan_telemetry(telemetry.payload) {
|
||||
match error {
|
||||
codex_otel::StderrSpanTelemetryError::UnsupportedVersion
|
||||
| codex_otel::StderrSpanTelemetryError::UnsupportedType => {
|
||||
tracing::debug!(
|
||||
"ignoring unsupported MCP subspan telemetry from {server_name}: {error}"
|
||||
);
|
||||
}
|
||||
_ => {
|
||||
warn!("ignoring invalid MCP subspan telemetry from {server_name}: {error}");
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use codex_exec_server::Environment;
|
||||
use rmcp::model::JsonObject;
|
||||
use rmcp::model::Meta;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
fn tool_with_connector_meta() -> RmcpTool {
|
||||
RmcpTool {
|
||||
@@ -744,4 +888,97 @@ mod tests {
|
||||
assert!(meta.0.contains_key(key), "{key} should be preserved");
|
||||
}
|
||||
}
|
||||
|
||||
fn local_runtime_environment() -> McpRuntimeEnvironment {
|
||||
McpRuntimeEnvironment::new(
|
||||
Arc::new(Environment::default_for_tests()),
|
||||
PathBuf::from("/tmp"),
|
||||
)
|
||||
}
|
||||
|
||||
fn remote_runtime_environment() -> McpRuntimeEnvironment {
|
||||
McpRuntimeEnvironment::new(
|
||||
Arc::new(
|
||||
Environment::create_for_tests(Some("ws://executor.example".to_string()))
|
||||
.expect("remote environment"),
|
||||
),
|
||||
PathBuf::from("/tmp"),
|
||||
)
|
||||
}
|
||||
|
||||
fn mcp_server_config(value: serde_json::Value) -> McpServerConfig {
|
||||
serde_json::from_value(value).expect("mcp server config")
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn subspan_tracing_stderr_jsonl_is_available_only_when_stderr_is_observed() {
|
||||
assert!(subspan_tracing_stderr_jsonl_available(
|
||||
&mcp_server_config(serde_json::json!({"command": "node"})),
|
||||
&local_runtime_environment(),
|
||||
));
|
||||
|
||||
assert!(!subspan_tracing_stderr_jsonl_available(
|
||||
&mcp_server_config(serde_json::json!({"url": "https://mcp.example"})),
|
||||
&local_runtime_environment(),
|
||||
));
|
||||
|
||||
assert!(!subspan_tracing_stderr_jsonl_available(
|
||||
&mcp_server_config(serde_json::json!({
|
||||
"command": "node",
|
||||
"experimental_environment": "remote",
|
||||
})),
|
||||
&local_runtime_environment(),
|
||||
));
|
||||
|
||||
assert!(subspan_tracing_stderr_jsonl_available(
|
||||
&mcp_server_config(serde_json::json!({
|
||||
"command": "node",
|
||||
"experimental_environment": "remote",
|
||||
})),
|
||||
&remote_runtime_environment(),
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn subspan_tracing_capability_accepts_stderr_jsonl_transport_and_tool_filter() {
|
||||
let capability: JsonObject = serde_json::from_value(serde_json::json!({
|
||||
"version": 1,
|
||||
"transports": ["stderr-jsonl"],
|
||||
"tools": {
|
||||
"js": {
|
||||
"attributeProfile": "browser-use-v1"
|
||||
}
|
||||
}
|
||||
}))
|
||||
.expect("capability object");
|
||||
|
||||
let capability = McpSubspanTracingCapability::from_experimental_capability(&capability)
|
||||
.expect("valid capability");
|
||||
|
||||
assert!(capability.supports_tool("js"));
|
||||
assert!(!capability.supports_tool("other"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn subspan_tracing_capability_rejects_unknown_version_or_transport() {
|
||||
for capability in [
|
||||
serde_json::json!({
|
||||
"version": 2,
|
||||
"transports": ["stderr-jsonl"],
|
||||
}),
|
||||
serde_json::json!({
|
||||
"version": 1,
|
||||
"transports": ["events"],
|
||||
}),
|
||||
serde_json::json!({
|
||||
"version": 1,
|
||||
}),
|
||||
] {
|
||||
let capability: JsonObject =
|
||||
serde_json::from_value(capability).expect("capability object");
|
||||
assert!(
|
||||
McpSubspanTracingCapability::from_experimental_capability(&capability).is_none()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,6 +40,8 @@ use codex_config::types::AppToolApproval;
|
||||
use codex_features::Feature;
|
||||
use codex_hooks::PermissionRequestDecision;
|
||||
use codex_mcp::CODEX_APPS_MCP_SERVER_NAME;
|
||||
use codex_mcp::MCP_SUBSPAN_TRACING_CAPABILITY;
|
||||
use codex_mcp::MCP_SUBSPAN_TRACING_VERSION;
|
||||
use codex_mcp::SandboxState;
|
||||
use codex_mcp::declared_openai_file_input_param_names;
|
||||
use codex_mcp::mcp_permission_prompt_is_auto_approved;
|
||||
@@ -534,6 +536,9 @@ async fn execute_mcp_tool_call(
|
||||
rewritten_arguments: Option<JsonValue>,
|
||||
request_meta: Option<JsonValue>,
|
||||
) -> Result<CallToolResult, String> {
|
||||
let request_meta =
|
||||
augment_mcp_tool_request_meta_with_subspan_tracing(sess, server, tool_name, request_meta)
|
||||
.await;
|
||||
let request_meta =
|
||||
with_mcp_tool_call_thread_id_meta(request_meta, &sess.conversation_id.to_string());
|
||||
let request_meta =
|
||||
@@ -721,6 +726,7 @@ const MCP_TOOL_CODEX_APPS_META_KEY: &str = "_codex_apps";
|
||||
const MCP_TOOL_OPENAI_OUTPUT_TEMPLATE_META_KEY: &str = "openai/outputTemplate";
|
||||
const MCP_TOOL_UI_RESOURCE_URI_META_KEY: &str = "ui/resourceUri";
|
||||
const MCP_TOOL_THREAD_ID_META_KEY: &str = "threadId";
|
||||
const MCP_TOOL_SUBSPAN_TRACING_META_KEY: &str = MCP_SUBSPAN_TRACING_CAPABILITY;
|
||||
|
||||
fn custom_mcp_tool_approval_mode(
|
||||
turn_context: &TurnContext,
|
||||
@@ -780,6 +786,79 @@ fn build_mcp_tool_call_request_meta(
|
||||
(!request_meta.is_empty()).then_some(serde_json::Value::Object(request_meta))
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "MCP subspan metadata reads through the session-owned manager guard"
|
||||
)]
|
||||
async fn augment_mcp_tool_request_meta_with_subspan_tracing(
|
||||
sess: &Session,
|
||||
server: &str,
|
||||
tool_name: &str,
|
||||
meta: Option<serde_json::Value>,
|
||||
) -> Option<serde_json::Value> {
|
||||
let supports_subspan_tracing = sess
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.server_supports_subspan_tracing_for_tool(server, tool_name)
|
||||
.await
|
||||
.unwrap_or(false);
|
||||
augment_mcp_tool_request_meta_with_subspan_tracing_if_supported(supports_subspan_tracing, meta)
|
||||
}
|
||||
|
||||
fn augment_mcp_tool_request_meta_with_subspan_tracing_if_supported(
|
||||
supports_subspan_tracing: bool,
|
||||
meta: Option<serde_json::Value>,
|
||||
) -> Option<serde_json::Value> {
|
||||
if !supports_subspan_tracing {
|
||||
return meta;
|
||||
}
|
||||
|
||||
let Some(trace_context) = codex_otel::current_span_w3c_trace_context() else {
|
||||
return meta;
|
||||
};
|
||||
let Some(traceparent) = trace_context.traceparent else {
|
||||
return meta;
|
||||
};
|
||||
|
||||
let mut telemetry_meta = serde_json::Map::new();
|
||||
telemetry_meta.insert("enabled".to_string(), serde_json::Value::Bool(true));
|
||||
telemetry_meta.insert(
|
||||
"version".to_string(),
|
||||
serde_json::Value::Number(MCP_SUBSPAN_TRACING_VERSION.into()),
|
||||
);
|
||||
telemetry_meta.insert(
|
||||
"traceparent".to_string(),
|
||||
serde_json::Value::String(traceparent),
|
||||
);
|
||||
if let Some(tracestate) = trace_context.tracestate {
|
||||
telemetry_meta.insert(
|
||||
"tracestate".to_string(),
|
||||
serde_json::Value::String(tracestate),
|
||||
);
|
||||
}
|
||||
|
||||
match meta {
|
||||
Some(serde_json::Value::Object(mut map)) => {
|
||||
map.insert(
|
||||
MCP_TOOL_SUBSPAN_TRACING_META_KEY.to_string(),
|
||||
serde_json::Value::Object(telemetry_meta),
|
||||
);
|
||||
Some(serde_json::Value::Object(map))
|
||||
}
|
||||
None => {
|
||||
let mut map = serde_json::Map::new();
|
||||
map.insert(
|
||||
MCP_TOOL_SUBSPAN_TRACING_META_KEY.to_string(),
|
||||
serde_json::Value::Object(telemetry_meta),
|
||||
);
|
||||
Some(serde_json::Value::Object(map))
|
||||
}
|
||||
other => other,
|
||||
}
|
||||
}
|
||||
|
||||
fn with_mcp_tool_call_thread_id_meta(
|
||||
meta: Option<serde_json::Value>,
|
||||
thread_id: &str,
|
||||
|
||||
@@ -941,6 +941,83 @@ async fn codex_apps_tool_call_request_meta_includes_call_id_without_existing_cod
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn subspan_tracing_request_meta_includes_trace_context_when_supported() {
|
||||
use opentelemetry::trace::TracerProvider as _;
|
||||
use opentelemetry_sdk::trace::SdkTracerProvider;
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
let provider = SdkTracerProvider::builder().build();
|
||||
let tracer = provider.tracer("codex-core-mcp-tool-call-tests");
|
||||
let subscriber =
|
||||
tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer));
|
||||
let _guard = tracing::subscriber::set_default(subscriber);
|
||||
|
||||
let span = tracing::info_span!("mcp.tools.call");
|
||||
let _entered = span.enter();
|
||||
|
||||
let meta = augment_mcp_tool_request_meta_with_subspan_tracing_if_supported(
|
||||
/*supports_subspan_tracing*/ true,
|
||||
Some(serde_json::json!({
|
||||
"threadId": "thread-live",
|
||||
})),
|
||||
)
|
||||
.expect("meta");
|
||||
|
||||
let telemetry = meta
|
||||
.get(MCP_TOOL_SUBSPAN_TRACING_META_KEY)
|
||||
.expect("subspan tracing metadata");
|
||||
assert_eq!(
|
||||
telemetry.get("enabled"),
|
||||
Some(&serde_json::Value::Bool(true))
|
||||
);
|
||||
assert_eq!(
|
||||
telemetry.get("version"),
|
||||
Some(&serde_json::json!(MCP_SUBSPAN_TRACING_VERSION))
|
||||
);
|
||||
assert!(
|
||||
telemetry
|
||||
.get("traceparent")
|
||||
.and_then(serde_json::Value::as_str)
|
||||
.is_some_and(|traceparent| traceparent.starts_with("00-"))
|
||||
);
|
||||
assert_eq!(
|
||||
meta.get("threadId"),
|
||||
Some(&serde_json::json!("thread-live"))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn subspan_tracing_request_meta_is_only_added_when_supported_and_trace_context_exists() {
|
||||
assert_eq!(
|
||||
augment_mcp_tool_request_meta_with_subspan_tracing_if_supported(
|
||||
/*supports_subspan_tracing*/ true,
|
||||
Some(serde_json::json!({"threadId": "thread-live"})),
|
||||
),
|
||||
Some(serde_json::json!({"threadId": "thread-live"}))
|
||||
);
|
||||
|
||||
use opentelemetry::trace::TracerProvider as _;
|
||||
use opentelemetry_sdk::trace::SdkTracerProvider;
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
let provider = SdkTracerProvider::builder().build();
|
||||
let tracer = provider.tracer("codex-core-mcp-tool-call-tests");
|
||||
let subscriber =
|
||||
tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer));
|
||||
let _guard = tracing::subscriber::set_default(subscriber);
|
||||
let span = tracing::info_span!("mcp.tools.call");
|
||||
let _entered = span.enter();
|
||||
|
||||
assert_eq!(
|
||||
augment_mcp_tool_request_meta_with_subspan_tracing_if_supported(
|
||||
/*supports_subspan_tracing*/ false,
|
||||
Some(serde_json::json!({"threadId": "thread-live"})),
|
||||
),
|
||||
Some(serde_json::json!({"threadId": "thread-live"}))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mcp_tool_call_thread_id_meta_is_added_to_request_meta() {
|
||||
assert_eq!(
|
||||
|
||||
173
codex-rs/docs/mcp_subspan_tracing.md
Normal file
173
codex-rs/docs/mcp_subspan_tracing.md
Normal file
@@ -0,0 +1,173 @@
|
||||
# MCP Subspan Tracing [experimental]
|
||||
|
||||
This document describes Codex's experimental MCP client extension for ingesting child OpenTelemetry spans emitted by MCP servers.
|
||||
|
||||
- Status: experimental and subject to change without notice
|
||||
- Capability: `codex/subspan-tracing`
|
||||
- Supported version: `1`
|
||||
- Supported transport: `stderr-jsonl`
|
||||
|
||||
## Purpose
|
||||
|
||||
Codex creates an `mcp.tools.call` span around each MCP tool call. Some MCP servers perform meaningful nested work that is useful to inspect as child spans of that call. The `codex/subspan-tracing` capability lets a server opt in to receiving the active W3C trace context for a tool call and emitting sanitized span records back to Codex.
|
||||
|
||||
This is intended for stdio MCP servers whose stderr stream is observed by Codex. Telemetry records are out-of-band with respect to the MCP JSON-RPC stream and must not affect tool-call success or transport liveness.
|
||||
|
||||
## Capability Negotiation
|
||||
|
||||
Codex advertises client support during MCP `initialize` only when the active MCP transport can observe stderr telemetry:
|
||||
|
||||
```json
|
||||
{
|
||||
"capabilities": {
|
||||
"experimental": {
|
||||
"codex/subspan-tracing": {
|
||||
"version": 1,
|
||||
"transports": ["stderr-jsonl"]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
An MCP server opts in by returning a compatible experimental capability in its initialize result:
|
||||
|
||||
```json
|
||||
{
|
||||
"capabilities": {
|
||||
"experimental": {
|
||||
"codex/subspan-tracing": {
|
||||
"version": 1,
|
||||
"transports": ["stderr-jsonl"],
|
||||
"tools": {
|
||||
"js": {
|
||||
"attributeProfile": "browser-use-v1"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
If `tools` is present, Codex enables subspan tracing only for those tool names. If `tools` is omitted, Codex treats the capability as applying to every tool exposed by that server.
|
||||
|
||||
Unknown versions or transports are ignored.
|
||||
|
||||
## Tool Call Metadata
|
||||
|
||||
For a negotiated server/tool pair, Codex adds `_meta["codex/subspan-tracing"]` while it is inside the active `mcp.tools.call` span:
|
||||
|
||||
```json
|
||||
{
|
||||
"_meta": {
|
||||
"codex/subspan-tracing": {
|
||||
"enabled": true,
|
||||
"version": 1,
|
||||
"traceparent": "00-00000000000000000000000000000001-0000000000000002-01",
|
||||
"tracestate": "vendor=value"
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
`tracestate` is omitted when no tracestate is active. If tracing is not active, Codex does not add this metadata.
|
||||
|
||||
Servers should disable subspan emission for a tool call unless:
|
||||
|
||||
- `enabled` is `true`
|
||||
- `version` is `1`
|
||||
- `traceparent` is present and parseable
|
||||
|
||||
## Stderr Transport
|
||||
|
||||
For `stderr-jsonl`, each telemetry record is written to the MCP server process stderr as one line with this exact prefix:
|
||||
|
||||
```text
|
||||
@codex-telemetry
|
||||
```
|
||||
|
||||
The rest of the line is a single JSON object:
|
||||
|
||||
```text
|
||||
@codex-telemetry {"v":1,"type":"span","name":"browser_use.tab.goto",...}
|
||||
```
|
||||
|
||||
Normal stderr output must not use that prefix. Codex preserves ordinary stderr logging behavior for non-telemetry lines.
|
||||
|
||||
## Span Record Schema
|
||||
|
||||
Span records use schema version `v: 1`:
|
||||
|
||||
```json
|
||||
{
|
||||
"v": 1,
|
||||
"type": "span",
|
||||
"name": "browser_use.tab.goto",
|
||||
"trace_id": "00000000000000000000000000000001",
|
||||
"span_id": "0000000000000010",
|
||||
"parent_span_id": "0000000000000002",
|
||||
"trace_flags": "01",
|
||||
"tracestate": "vendor=value",
|
||||
"start_unix_nanos": 1000000000,
|
||||
"end_unix_nanos": 2000000000,
|
||||
"attrs": {
|
||||
"browser_use.url": "https://example.com",
|
||||
"browser_use.timeout_ms": 2500
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Required fields:
|
||||
|
||||
- `v`: must be `1`
|
||||
- `type`: must be `"span"`
|
||||
- `name`: allowlisted span name
|
||||
- `trace_id`: 32 lowercase or uppercase hex characters, nonzero
|
||||
- `span_id`: 16 lowercase or uppercase hex characters, nonzero
|
||||
- `parent_span_id`: 16 lowercase or uppercase hex characters, nonzero
|
||||
- `trace_flags`: 2 hex characters
|
||||
- `start_unix_nanos`: Unix timestamp in nanoseconds
|
||||
- `end_unix_nanos`: Unix timestamp in nanoseconds, greater than or equal to `start_unix_nanos`
|
||||
|
||||
Optional fields:
|
||||
|
||||
- `tracestate`: W3C tracestate header value
|
||||
- `attrs`: span attributes object
|
||||
|
||||
Codex also accepts `traceparent` for backward compatibility, but explicit IDs are the canonical protocol for reconstructed spans because they preserve parent-child relationships across records.
|
||||
|
||||
## Span IDs and Hierarchy
|
||||
|
||||
The first server-created span for a tool call should use:
|
||||
|
||||
- `trace_id` from the request `traceparent`
|
||||
- `parent_span_id` from the request `traceparent`
|
||||
- a newly generated `span_id`
|
||||
|
||||
Nested server spans should use the same `trace_id`, their own generated `span_id`, and the parent reconstructed span's `span_id` as `parent_span_id`.
|
||||
|
||||
## Sanitization
|
||||
|
||||
Servers must emit only sanitized, allowlisted attributes. Attribute values must be primitives:
|
||||
|
||||
- string
|
||||
- integer or float
|
||||
- boolean
|
||||
|
||||
Do not emit objects, arrays, cookies, credentials, auth headers, bearer tokens, full request/response payloads, arbitrary DOM text, or user-sensitive selectors. Codex applies its own allowlist and drops unsupported attributes, but servers should sanitize before writing records.
|
||||
|
||||
## Failure Behavior
|
||||
|
||||
Subspan telemetry is best effort:
|
||||
|
||||
- malformed telemetry lines are ignored
|
||||
- unsupported versions or record types are ignored
|
||||
- invalid span records are ignored or logged at warning level
|
||||
- telemetry write failures must not fail the MCP tool call
|
||||
- telemetry parser failures must not break MCP transport
|
||||
- no telemetry is emitted or reconstructed when Codex tracing is inactive
|
||||
|
||||
## Current Attribute Profile
|
||||
|
||||
`browser-use-v1` is the initial attribute profile used by Browser Use instrumentation. Codex currently allows Browser Use, Node REPL, and JS-related span names and attribute keys needed for that profile. New profiles should be added deliberately with their own allowlist changes and tests.
|
||||
@@ -2,6 +2,7 @@ pub(crate) mod config;
|
||||
mod events;
|
||||
pub(crate) mod metrics;
|
||||
pub(crate) mod provider;
|
||||
mod stderr_span_telemetry;
|
||||
pub(crate) mod trace_context;
|
||||
|
||||
mod otlp;
|
||||
@@ -23,6 +24,8 @@ pub use crate::metrics::runtime_metrics::RuntimeMetricsSummary;
|
||||
pub use crate::metrics::timer::Timer;
|
||||
pub use crate::metrics::*;
|
||||
pub use crate::provider::OtelProvider;
|
||||
pub use crate::stderr_span_telemetry::StderrSpanTelemetryError;
|
||||
pub use crate::stderr_span_telemetry::emit_mcp_subspan_telemetry;
|
||||
pub use crate::trace_context::context_from_w3c_trace_context;
|
||||
pub use crate::trace_context::current_span_trace_id;
|
||||
pub use crate::trace_context::current_span_w3c_trace_context;
|
||||
|
||||
756
codex-rs/otel/src/stderr_span_telemetry.rs
Normal file
756
codex-rs/otel/src/stderr_span_telemetry.rs
Normal file
@@ -0,0 +1,756 @@
|
||||
use std::time::Duration;
|
||||
use std::time::SystemTime;
|
||||
|
||||
use opentelemetry::Context;
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry::global;
|
||||
use opentelemetry::trace::Span as _;
|
||||
use opentelemetry::trace::SpanContext;
|
||||
use opentelemetry::trace::SpanId;
|
||||
use opentelemetry::trace::SpanKind;
|
||||
use opentelemetry::trace::TraceContextExt;
|
||||
use opentelemetry::trace::TraceFlags;
|
||||
use opentelemetry::trace::TraceId;
|
||||
use opentelemetry::trace::TraceState;
|
||||
use opentelemetry::trace::Tracer;
|
||||
use serde_json::Map;
|
||||
use serde_json::Value;
|
||||
|
||||
const MCP_SUBSPAN_TELEMETRY_TRACER_NAME: &str = "codex-mcp-subspan-stderr";
|
||||
const CURRENT_SCHEMA_VERSION: u64 = 1;
|
||||
const SPAN_RECORD_TYPE: &str = "span";
|
||||
const MAX_ATTRIBUTE_STRING_BYTES: usize = 1024;
|
||||
|
||||
const ALLOWED_SPAN_NAMES: &[&str] = &[
|
||||
"node_repl.js",
|
||||
"browser_use.playwright.dom_snapshot",
|
||||
"browser_use.tab.goto",
|
||||
"browser_use.tab.click",
|
||||
"browser_use.tab.type",
|
||||
"browser_use.tab.screenshot",
|
||||
"browser_use.cdp.execute",
|
||||
"browser_use.tab.wait_for_load_state",
|
||||
];
|
||||
|
||||
const ALLOWED_ATTRIBUTE_PREFIXES: &[&str] = &["browser_use.", "node_repl.", "js."];
|
||||
const ALLOWED_ATTRIBUTE_KEYS: &[&str] = &["error.type", "error.message"];
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum StderrSpanTelemetryError {
|
||||
#[error("telemetry payload must be a JSON object")]
|
||||
NotObject,
|
||||
#[error("unsupported telemetry schema version")]
|
||||
UnsupportedVersion,
|
||||
#[error("unsupported telemetry record type")]
|
||||
UnsupportedType,
|
||||
#[error("missing or invalid telemetry field `{0}`")]
|
||||
InvalidField(&'static str),
|
||||
#[error("unsupported telemetry span name")]
|
||||
UnsupportedSpanName,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
struct SpanTelemetryRecord {
|
||||
name: String,
|
||||
span_id: Option<SpanId>,
|
||||
trace_id: Option<TraceId>,
|
||||
parent_span_id: Option<SpanId>,
|
||||
trace_flags: Option<TraceFlags>,
|
||||
traceparent: Option<String>,
|
||||
tracestate: Option<String>,
|
||||
start_time: SystemTime,
|
||||
end_time: SystemTime,
|
||||
attributes: Vec<KeyValue>,
|
||||
}
|
||||
|
||||
pub fn emit_mcp_subspan_telemetry(payload: Value) -> Result<(), StderrSpanTelemetryError> {
|
||||
let record = parse_span_telemetry_record(payload)?;
|
||||
let tracer = global::tracer(MCP_SUBSPAN_TELEMETRY_TRACER_NAME);
|
||||
emit_span_telemetry_record_with_tracer(&tracer, &record)
|
||||
}
|
||||
|
||||
fn emit_span_telemetry_record_with_tracer<T>(
|
||||
tracer: &T,
|
||||
record: &SpanTelemetryRecord,
|
||||
) -> Result<(), StderrSpanTelemetryError>
|
||||
where
|
||||
T: Tracer,
|
||||
{
|
||||
let parent_context = record.parent_context()?;
|
||||
|
||||
let mut builder = tracer
|
||||
.span_builder(record.name.clone())
|
||||
.with_kind(SpanKind::Internal)
|
||||
.with_start_time(record.start_time)
|
||||
.with_attributes(record.attributes.clone());
|
||||
if let Some(span_id) = record.span_id {
|
||||
builder = builder.with_span_id(span_id);
|
||||
}
|
||||
let mut span = tracer.build_with_context(builder, &parent_context);
|
||||
span.end_with_timestamp(record.end_time);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl SpanTelemetryRecord {
|
||||
fn parent_context(&self) -> Result<Context, StderrSpanTelemetryError> {
|
||||
if let (Some(trace_id), Some(parent_span_id), Some(trace_flags)) =
|
||||
(self.trace_id, self.parent_span_id, self.trace_flags)
|
||||
{
|
||||
let trace_state = trace_state_from_header(self.tracestate.as_deref())?;
|
||||
let span_context = SpanContext::new(
|
||||
trace_id,
|
||||
parent_span_id,
|
||||
trace_flags,
|
||||
/*is_remote*/ true,
|
||||
trace_state,
|
||||
);
|
||||
return Ok(Context::new().with_remote_span_context(span_context));
|
||||
}
|
||||
|
||||
let Some(traceparent) = self.traceparent.as_deref() else {
|
||||
return Err(StderrSpanTelemetryError::InvalidField("traceparent"));
|
||||
};
|
||||
crate::trace_context::context_from_trace_headers(
|
||||
Some(traceparent),
|
||||
self.tracestate.as_deref(),
|
||||
)
|
||||
.ok_or(StderrSpanTelemetryError::InvalidField("traceparent"))
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_span_telemetry_record(
|
||||
payload: Value,
|
||||
) -> Result<SpanTelemetryRecord, StderrSpanTelemetryError> {
|
||||
let object = payload
|
||||
.as_object()
|
||||
.ok_or(StderrSpanTelemetryError::NotObject)?;
|
||||
|
||||
match object.get("v").and_then(Value::as_u64) {
|
||||
Some(CURRENT_SCHEMA_VERSION) => {}
|
||||
Some(_) => return Err(StderrSpanTelemetryError::UnsupportedVersion),
|
||||
None => return Err(StderrSpanTelemetryError::InvalidField("v")),
|
||||
}
|
||||
|
||||
match object.get("type").and_then(Value::as_str) {
|
||||
Some(SPAN_RECORD_TYPE) => {}
|
||||
Some(_) => return Err(StderrSpanTelemetryError::UnsupportedType),
|
||||
None => return Err(StderrSpanTelemetryError::InvalidField("type")),
|
||||
}
|
||||
|
||||
let name = required_string(object, "name")?.to_string();
|
||||
if !ALLOWED_SPAN_NAMES.contains(&name.as_str()) {
|
||||
return Err(StderrSpanTelemetryError::UnsupportedSpanName);
|
||||
}
|
||||
|
||||
let traceparent = optional_string(object, "traceparent").map(str::to_string);
|
||||
let span_id = optional_span_id_alias(object, &["span_id", "spanId"])?;
|
||||
let trace_id = optional_trace_id_alias(object, &["trace_id", "traceId"])?;
|
||||
let parent_span_id = optional_span_id_alias(object, &["parent_span_id", "parentSpanId"])?;
|
||||
let trace_flags = optional_trace_flags_alias(object, &["trace_flags", "traceFlags"])?;
|
||||
let tracestate = optional_string(object, "tracestate").map(str::to_string);
|
||||
if span_id.is_some() || trace_id.is_some() || parent_span_id.is_some() || trace_flags.is_some()
|
||||
{
|
||||
if span_id.is_none() {
|
||||
return Err(StderrSpanTelemetryError::InvalidField("span_id"));
|
||||
}
|
||||
if trace_id.is_none() {
|
||||
return Err(StderrSpanTelemetryError::InvalidField("trace_id"));
|
||||
}
|
||||
if parent_span_id.is_none() {
|
||||
return Err(StderrSpanTelemetryError::InvalidField("parent_span_id"));
|
||||
}
|
||||
if trace_flags.is_none() {
|
||||
return Err(StderrSpanTelemetryError::InvalidField("trace_flags"));
|
||||
}
|
||||
} else if traceparent.is_none() {
|
||||
return Err(StderrSpanTelemetryError::InvalidField("traceparent"));
|
||||
}
|
||||
|
||||
let start_time = timestamp_from_unix_nanos(required_u64_alias(
|
||||
object,
|
||||
&[
|
||||
"start_unix_nanos",
|
||||
"startTimeUnixNanos",
|
||||
"startTimeUnixNano",
|
||||
"start_time_unix_nanos",
|
||||
],
|
||||
"start_unix_nanos",
|
||||
)?)?;
|
||||
let end_time = timestamp_from_unix_nanos(required_u64_alias(
|
||||
object,
|
||||
&[
|
||||
"end_unix_nanos",
|
||||
"endTimeUnixNanos",
|
||||
"endTimeUnixNano",
|
||||
"end_time_unix_nanos",
|
||||
],
|
||||
"end_unix_nanos",
|
||||
)?)?;
|
||||
if end_time < start_time {
|
||||
return Err(StderrSpanTelemetryError::InvalidField("end_unix_nanos"));
|
||||
}
|
||||
|
||||
let attributes = object
|
||||
.get("attrs")
|
||||
.or_else(|| object.get("attributes"))
|
||||
.and_then(Value::as_object)
|
||||
.map(sanitized_attributes)
|
||||
.unwrap_or_default();
|
||||
|
||||
Ok(SpanTelemetryRecord {
|
||||
name,
|
||||
span_id,
|
||||
trace_id,
|
||||
parent_span_id,
|
||||
trace_flags,
|
||||
traceparent,
|
||||
tracestate,
|
||||
start_time,
|
||||
end_time,
|
||||
attributes,
|
||||
})
|
||||
}
|
||||
|
||||
fn required_string<'a>(
|
||||
object: &'a Map<String, Value>,
|
||||
key: &'static str,
|
||||
) -> Result<&'a str, StderrSpanTelemetryError> {
|
||||
object
|
||||
.get(key)
|
||||
.and_then(Value::as_str)
|
||||
.filter(|value| !value.is_empty())
|
||||
.ok_or(StderrSpanTelemetryError::InvalidField(key))
|
||||
}
|
||||
|
||||
fn optional_string<'a>(object: &'a Map<String, Value>, key: &str) -> Option<&'a str> {
|
||||
object
|
||||
.get(key)
|
||||
.and_then(Value::as_str)
|
||||
.filter(|value| !value.is_empty())
|
||||
}
|
||||
|
||||
fn optional_string_alias<'a>(
|
||||
object: &'a Map<String, Value>,
|
||||
keys: &[&'static str],
|
||||
) -> Option<(&'static str, &'a str)> {
|
||||
keys.iter()
|
||||
.find_map(|key| optional_string(object, key).map(|value| (*key, value)))
|
||||
}
|
||||
|
||||
fn optional_trace_id_alias(
|
||||
object: &Map<String, Value>,
|
||||
keys: &[&'static str],
|
||||
) -> Result<Option<TraceId>, StderrSpanTelemetryError> {
|
||||
let Some((key, value)) = optional_string_alias(object, keys) else {
|
||||
return Ok(None);
|
||||
};
|
||||
parse_trace_id(value)
|
||||
.map(Some)
|
||||
.map_err(|_| StderrSpanTelemetryError::InvalidField(key))
|
||||
}
|
||||
|
||||
fn optional_span_id_alias(
|
||||
object: &Map<String, Value>,
|
||||
keys: &[&'static str],
|
||||
) -> Result<Option<SpanId>, StderrSpanTelemetryError> {
|
||||
let Some((key, value)) = optional_string_alias(object, keys) else {
|
||||
return Ok(None);
|
||||
};
|
||||
parse_span_id(value)
|
||||
.map(Some)
|
||||
.map_err(|_| StderrSpanTelemetryError::InvalidField(key))
|
||||
}
|
||||
|
||||
fn optional_trace_flags_alias(
|
||||
object: &Map<String, Value>,
|
||||
keys: &[&'static str],
|
||||
) -> Result<Option<TraceFlags>, StderrSpanTelemetryError> {
|
||||
let Some((key, value)) = optional_string_alias(object, keys) else {
|
||||
return Ok(None);
|
||||
};
|
||||
parse_trace_flags(value)
|
||||
.map(Some)
|
||||
.map_err(|_| StderrSpanTelemetryError::InvalidField(key))
|
||||
}
|
||||
|
||||
fn parse_trace_id(value: &str) -> Result<TraceId, ()> {
|
||||
if !is_exact_hex(value, /*len*/ 32) {
|
||||
return Err(());
|
||||
}
|
||||
let trace_id = TraceId::from_hex(value).map_err(|_| ())?;
|
||||
if trace_id == TraceId::INVALID {
|
||||
return Err(());
|
||||
}
|
||||
Ok(trace_id)
|
||||
}
|
||||
|
||||
fn parse_span_id(value: &str) -> Result<SpanId, ()> {
|
||||
if !is_exact_hex(value, /*len*/ 16) {
|
||||
return Err(());
|
||||
}
|
||||
let span_id = SpanId::from_hex(value).map_err(|_| ())?;
|
||||
if span_id == SpanId::INVALID {
|
||||
return Err(());
|
||||
}
|
||||
Ok(span_id)
|
||||
}
|
||||
|
||||
fn parse_trace_flags(value: &str) -> Result<TraceFlags, ()> {
|
||||
if !is_exact_hex(value, /*len*/ 2) {
|
||||
return Err(());
|
||||
}
|
||||
u8::from_str_radix(value, 16)
|
||||
.map(TraceFlags::new)
|
||||
.map_err(|_| ())
|
||||
}
|
||||
|
||||
fn is_exact_hex(value: &str, len: usize) -> bool {
|
||||
value.len() == len && value.bytes().all(|byte| byte.is_ascii_hexdigit())
|
||||
}
|
||||
|
||||
fn trace_state_from_header(value: Option<&str>) -> Result<TraceState, StderrSpanTelemetryError> {
|
||||
let Some(value) = value else {
|
||||
return Ok(TraceState::default());
|
||||
};
|
||||
value
|
||||
.parse()
|
||||
.map_err(|_| StderrSpanTelemetryError::InvalidField("tracestate"))
|
||||
}
|
||||
|
||||
fn required_u64_alias(
|
||||
object: &Map<String, Value>,
|
||||
keys: &[&'static str],
|
||||
error_key: &'static str,
|
||||
) -> Result<u64, StderrSpanTelemetryError> {
|
||||
keys.iter()
|
||||
.find_map(|key| object.get(*key).and_then(u64_value))
|
||||
.ok_or(StderrSpanTelemetryError::InvalidField(error_key))
|
||||
}
|
||||
|
||||
fn u64_value(value: &Value) -> Option<u64> {
|
||||
value
|
||||
.as_u64()
|
||||
.or_else(|| value.as_str().and_then(|value| value.parse().ok()))
|
||||
}
|
||||
|
||||
fn timestamp_from_unix_nanos(nanos: u64) -> Result<SystemTime, StderrSpanTelemetryError> {
|
||||
let secs = nanos / 1_000_000_000;
|
||||
let sub_nanos = (nanos % 1_000_000_000) as u32;
|
||||
SystemTime::UNIX_EPOCH
|
||||
.checked_add(Duration::new(secs, sub_nanos))
|
||||
.ok_or(StderrSpanTelemetryError::InvalidField("timestamp"))
|
||||
}
|
||||
|
||||
fn sanitized_attributes(attrs: &Map<String, Value>) -> Vec<KeyValue> {
|
||||
attrs
|
||||
.iter()
|
||||
.filter(|(key, _)| is_allowed_attribute_key(key))
|
||||
.filter_map(|(key, value)| safe_attribute_value(key, value))
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn is_allowed_attribute_key(key: &str) -> bool {
|
||||
ALLOWED_ATTRIBUTE_KEYS.contains(&key)
|
||||
|| ALLOWED_ATTRIBUTE_PREFIXES
|
||||
.iter()
|
||||
.any(|prefix| key.starts_with(prefix))
|
||||
}
|
||||
|
||||
fn safe_attribute_value(key: &str, value: &Value) -> Option<KeyValue> {
|
||||
match value {
|
||||
Value::Bool(value) => Some(KeyValue::new(key.to_string(), *value)),
|
||||
Value::Number(value) => {
|
||||
if let Some(value) = value.as_i64() {
|
||||
Some(KeyValue::new(key.to_string(), value))
|
||||
} else if let Some(value) = value.as_u64().and_then(|value| i64::try_from(value).ok()) {
|
||||
Some(KeyValue::new(key.to_string(), value))
|
||||
} else {
|
||||
value
|
||||
.as_f64()
|
||||
.map(|value| KeyValue::new(key.to_string(), value))
|
||||
}
|
||||
}
|
||||
Value::String(value) => Some(KeyValue::new(
|
||||
key.to_string(),
|
||||
truncate_attribute_string(value).to_string(),
|
||||
)),
|
||||
Value::Null | Value::Array(_) | Value::Object(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn truncate_attribute_string(value: &str) -> &str {
|
||||
if value.len() <= MAX_ATTRIBUTE_STRING_BYTES {
|
||||
return value;
|
||||
}
|
||||
|
||||
let mut end = MAX_ATTRIBUTE_STRING_BYTES;
|
||||
while !value.is_char_boundary(end) {
|
||||
end = end.saturating_sub(1);
|
||||
}
|
||||
&value[..end]
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use opentelemetry::Value as OtelValue;
|
||||
use opentelemetry::trace::SpanId;
|
||||
use opentelemetry::trace::TraceId;
|
||||
use opentelemetry::trace::TracerProvider as _;
|
||||
use opentelemetry_sdk::trace::InMemorySpanExporter;
|
||||
use opentelemetry_sdk::trace::SdkTracerProvider;
|
||||
use opentelemetry_sdk::trace::SpanData;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
#[test]
|
||||
fn valid_span_telemetry_reconstructs_otel_span_with_sanitized_attrs() {
|
||||
let exporter = InMemorySpanExporter::default();
|
||||
let provider = SdkTracerProvider::builder()
|
||||
.with_simple_exporter(exporter.clone())
|
||||
.build();
|
||||
let tracer = provider.tracer("codex-otel-tests");
|
||||
let trace_id = "00000000000000000000000000000001";
|
||||
let parent_span_id = "0000000000000002";
|
||||
|
||||
let record = parse_span_telemetry_record(serde_json::json!({
|
||||
"v": 1,
|
||||
"type": "span",
|
||||
"name": "browser_use.tab.goto",
|
||||
"traceparent": format!("00-{trace_id}-{parent_span_id}-01"),
|
||||
"start_unix_nanos": 1_000_000_123u64,
|
||||
"end_unix_nanos": 2_000_000_456u64,
|
||||
"attrs": {
|
||||
"browser_use.url": "https://example.com",
|
||||
"browser_use.timeout_ms": 2500,
|
||||
"unknown.secret": "drop me",
|
||||
"browser_use.object": {"drop": true}
|
||||
}
|
||||
}))
|
||||
.expect("valid record");
|
||||
|
||||
emit_span_telemetry_record_with_tracer(&tracer, &record).expect("span emitted");
|
||||
provider.force_flush().expect("flush spans");
|
||||
let spans = exporter.get_finished_spans().expect("finished spans");
|
||||
assert_eq!(spans.len(), 1);
|
||||
let span = &spans[0];
|
||||
|
||||
assert_eq!(span.name.as_ref(), "browser_use.tab.goto");
|
||||
assert_eq!(
|
||||
span.span_context.trace_id(),
|
||||
TraceId::from_hex(trace_id).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
span.parent_span_id,
|
||||
SpanId::from_hex(parent_span_id).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
span.start_time,
|
||||
SystemTime::UNIX_EPOCH + Duration::new(1, 123)
|
||||
);
|
||||
assert_eq!(
|
||||
span.end_time,
|
||||
SystemTime::UNIX_EPOCH + Duration::new(2, 456)
|
||||
);
|
||||
|
||||
let attrs = span
|
||||
.attributes
|
||||
.iter()
|
||||
.map(|kv| (kv.key.as_str().to_string(), kv.value.clone()))
|
||||
.collect::<BTreeMap<_, _>>();
|
||||
assert_eq!(
|
||||
attrs.get("browser_use.url"),
|
||||
Some(&OtelValue::String("https://example.com".into()))
|
||||
);
|
||||
assert_eq!(
|
||||
attrs.get("browser_use.timeout_ms"),
|
||||
Some(&OtelValue::I64(2500))
|
||||
);
|
||||
assert!(!attrs.contains_key("unknown.secret"));
|
||||
assert!(!attrs.contains_key("browser_use.object"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn explicit_ids_reconstruct_span_and_parent_ids() {
|
||||
let exporter = InMemorySpanExporter::default();
|
||||
let provider = SdkTracerProvider::builder()
|
||||
.with_simple_exporter(exporter.clone())
|
||||
.build();
|
||||
let tracer = provider.tracer("codex-otel-tests");
|
||||
let trace_id = "00000000000000000000000000000001";
|
||||
let parent_span_id = "0000000000000002";
|
||||
let span_id = "0000000000000010";
|
||||
|
||||
let record = parse_span_telemetry_record(serde_json::json!({
|
||||
"v": 1,
|
||||
"type": "span",
|
||||
"name": "node_repl.js",
|
||||
"trace_id": trace_id,
|
||||
"span_id": span_id,
|
||||
"parent_span_id": parent_span_id,
|
||||
"trace_flags": "01",
|
||||
"tracestate": "vendor=value",
|
||||
"start_unix_nanos": 1_000_000_123u64,
|
||||
"end_unix_nanos": 2_000_000_456u64,
|
||||
}))
|
||||
.expect("valid record");
|
||||
|
||||
emit_span_telemetry_record_with_tracer(&tracer, &record).expect("span emitted");
|
||||
provider.force_flush().expect("flush spans");
|
||||
let spans = exporter.get_finished_spans().expect("finished spans");
|
||||
assert_eq!(spans.len(), 1);
|
||||
let span = &spans[0];
|
||||
|
||||
assert_eq!(
|
||||
span.span_context.trace_id(),
|
||||
TraceId::from_hex(trace_id).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
span.span_context.span_id(),
|
||||
SpanId::from_hex(span_id).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
span.parent_span_id,
|
||||
SpanId::from_hex(parent_span_id).unwrap()
|
||||
);
|
||||
assert!(span.span_context.trace_flags().is_sampled());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn camel_case_explicit_ids_reconstruct_span_and_parent_ids() {
|
||||
let exporter = InMemorySpanExporter::default();
|
||||
let provider = SdkTracerProvider::builder()
|
||||
.with_simple_exporter(exporter.clone())
|
||||
.build();
|
||||
let tracer = provider.tracer("codex-otel-tests");
|
||||
let trace_id = "00000000000000000000000000000001";
|
||||
let parent_span_id = "0000000000000002";
|
||||
let span_id = "0000000000000010";
|
||||
|
||||
let record = parse_span_telemetry_record(serde_json::json!({
|
||||
"v": 1,
|
||||
"type": "span",
|
||||
"name": "browser_use.tab.click",
|
||||
"traceId": trace_id,
|
||||
"spanId": span_id,
|
||||
"parentSpanId": parent_span_id,
|
||||
"traceFlags": "01",
|
||||
"start_unix_nanos": 1_000_000_123u64,
|
||||
"end_unix_nanos": 2_000_000_456u64,
|
||||
}))
|
||||
.expect("valid record");
|
||||
|
||||
emit_span_telemetry_record_with_tracer(&tracer, &record).expect("span emitted");
|
||||
provider.force_flush().expect("flush spans");
|
||||
let spans = exporter.get_finished_spans().expect("finished spans");
|
||||
assert_eq!(spans.len(), 1);
|
||||
let span = &spans[0];
|
||||
|
||||
assert_eq!(
|
||||
span.span_context.trace_id(),
|
||||
TraceId::from_hex(trace_id).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
span.span_context.span_id(),
|
||||
SpanId::from_hex(span_id).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
span.parent_span_id,
|
||||
SpanId::from_hex(parent_span_id).unwrap()
|
||||
);
|
||||
assert!(span.span_context.trace_flags().is_sampled());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn child_span_can_parent_to_previous_reconstructed_span_id() {
|
||||
let exporter = InMemorySpanExporter::default();
|
||||
let provider = SdkTracerProvider::builder()
|
||||
.with_simple_exporter(exporter.clone())
|
||||
.build();
|
||||
let tracer = provider.tracer("codex-otel-tests");
|
||||
let trace_id = "00000000000000000000000000000001";
|
||||
let mcp_span_id = "0000000000000002";
|
||||
let node_span_id = "0000000000000010";
|
||||
let child_span_id = "0000000000000011";
|
||||
|
||||
for payload in [
|
||||
serde_json::json!({
|
||||
"v": 1,
|
||||
"type": "span",
|
||||
"name": "node_repl.js",
|
||||
"trace_id": trace_id,
|
||||
"span_id": node_span_id,
|
||||
"parent_span_id": mcp_span_id,
|
||||
"trace_flags": "01",
|
||||
"start_unix_nanos": 1_000_000_000u64,
|
||||
"end_unix_nanos": 3_000_000_000u64,
|
||||
}),
|
||||
serde_json::json!({
|
||||
"v": 1,
|
||||
"type": "span",
|
||||
"name": "browser_use.tab.goto",
|
||||
"trace_id": trace_id,
|
||||
"span_id": child_span_id,
|
||||
"parent_span_id": node_span_id,
|
||||
"trace_flags": "01",
|
||||
"start_unix_nanos": 1_500_000_000u64,
|
||||
"end_unix_nanos": 2_000_000_000u64,
|
||||
}),
|
||||
] {
|
||||
let record = parse_span_telemetry_record(payload).expect("valid record");
|
||||
emit_span_telemetry_record_with_tracer(&tracer, &record).expect("span emitted");
|
||||
}
|
||||
|
||||
provider.force_flush().expect("flush spans");
|
||||
let spans = exporter.get_finished_spans().expect("finished spans");
|
||||
let node_span = find_span(&spans, "node_repl.js");
|
||||
let child_span = find_span(&spans, "browser_use.tab.goto");
|
||||
|
||||
assert_eq!(
|
||||
node_span.span_context.span_id(),
|
||||
SpanId::from_hex(node_span_id).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
child_span.span_context.span_id(),
|
||||
SpanId::from_hex(child_span_id).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
child_span.parent_span_id,
|
||||
SpanId::from_hex(node_span_id).unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn invalid_span_telemetry_is_rejected_without_emitting_span() {
|
||||
let exporter = InMemorySpanExporter::default();
|
||||
let provider = SdkTracerProvider::builder()
|
||||
.with_simple_exporter(exporter.clone())
|
||||
.build();
|
||||
let tracer = provider.tracer("codex-otel-tests");
|
||||
|
||||
let error = parse_span_telemetry_record(serde_json::json!({
|
||||
"v": 99,
|
||||
"type": "span",
|
||||
}))
|
||||
.expect_err("unsupported version");
|
||||
assert!(matches!(
|
||||
error,
|
||||
StderrSpanTelemetryError::UnsupportedVersion
|
||||
));
|
||||
|
||||
assert!(
|
||||
emit_span_telemetry_record_with_tracer(
|
||||
&tracer,
|
||||
&SpanTelemetryRecord {
|
||||
name: "browser_use.tab.goto".to_string(),
|
||||
span_id: None,
|
||||
trace_id: None,
|
||||
parent_span_id: None,
|
||||
trace_flags: None,
|
||||
traceparent: Some("not-a-traceparent".to_string()),
|
||||
tracestate: None,
|
||||
start_time: SystemTime::UNIX_EPOCH,
|
||||
end_time: SystemTime::UNIX_EPOCH,
|
||||
attributes: Vec::new(),
|
||||
},
|
||||
)
|
||||
.is_err()
|
||||
);
|
||||
provider.force_flush().expect("flush spans");
|
||||
assert!(
|
||||
exporter
|
||||
.get_finished_spans()
|
||||
.expect("finished spans")
|
||||
.is_empty()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn invalid_explicit_ids_are_rejected_without_emitting_span() {
|
||||
let exporter = InMemorySpanExporter::default();
|
||||
let provider = SdkTracerProvider::builder()
|
||||
.with_simple_exporter(exporter.clone())
|
||||
.build();
|
||||
let tracer = provider.tracer("codex-otel-tests");
|
||||
|
||||
for payload in [
|
||||
serde_json::json!({
|
||||
"v": 1,
|
||||
"type": "span",
|
||||
"name": "node_repl.js",
|
||||
"trace_id": "00000000000000000000000000000000",
|
||||
"span_id": "0000000000000010",
|
||||
"parent_span_id": "0000000000000002",
|
||||
"trace_flags": "01",
|
||||
"start_unix_nanos": 1_000_000_000u64,
|
||||
"end_unix_nanos": 2_000_000_000u64,
|
||||
}),
|
||||
serde_json::json!({
|
||||
"v": 1,
|
||||
"type": "span",
|
||||
"name": "node_repl.js",
|
||||
"trace_id": "00000000000000000000000000000001",
|
||||
"span_id": "0000000000000000",
|
||||
"parent_span_id": "0000000000000002",
|
||||
"trace_flags": "01",
|
||||
"start_unix_nanos": 1_000_000_000u64,
|
||||
"end_unix_nanos": 2_000_000_000u64,
|
||||
}),
|
||||
serde_json::json!({
|
||||
"v": 1,
|
||||
"type": "span",
|
||||
"name": "node_repl.js",
|
||||
"trace_id": "00000000000000000000000000000001",
|
||||
"span_id": "0000000000000010",
|
||||
"parent_span_id": "0002",
|
||||
"trace_flags": "01",
|
||||
"start_unix_nanos": 1_000_000_000u64,
|
||||
"end_unix_nanos": 2_000_000_000u64,
|
||||
}),
|
||||
serde_json::json!({
|
||||
"v": 1,
|
||||
"type": "span",
|
||||
"name": "node_repl.js",
|
||||
"trace_id": "00000000000000000000000000000001",
|
||||
"span_id": "0000000000000010",
|
||||
"parent_span_id": "0000000000000002",
|
||||
"trace_flags": "001",
|
||||
"start_unix_nanos": 1_000_000_000u64,
|
||||
"end_unix_nanos": 2_000_000_000u64,
|
||||
}),
|
||||
] {
|
||||
let error = parse_span_telemetry_record(payload).expect_err("invalid id");
|
||||
assert!(matches!(error, StderrSpanTelemetryError::InvalidField(_)));
|
||||
}
|
||||
|
||||
assert!(
|
||||
emit_mcp_subspan_telemetry(serde_json::json!({
|
||||
"v": 1,
|
||||
"type": "span",
|
||||
"name": "node_repl.js",
|
||||
"trace_id": "00000000000000000000000000000001",
|
||||
"span_id": "0000000000000010",
|
||||
"parent_span_id": "0000000000000002",
|
||||
"trace_flags": "zz",
|
||||
"start_unix_nanos": 1_000_000_000u64,
|
||||
"end_unix_nanos": 2_000_000_000u64,
|
||||
}))
|
||||
.is_err()
|
||||
);
|
||||
provider.force_flush().expect("flush spans");
|
||||
assert!(
|
||||
exporter
|
||||
.get_finished_spans()
|
||||
.expect("finished spans")
|
||||
.is_empty()
|
||||
);
|
||||
drop(tracer);
|
||||
}
|
||||
|
||||
fn find_span<'a>(spans: &'a [SpanData], name: &str) -> &'a SpanData {
|
||||
spans
|
||||
.iter()
|
||||
.find(|span| span.name == name)
|
||||
.unwrap_or_else(|| panic!("missing span {name}"))
|
||||
}
|
||||
}
|
||||
@@ -41,9 +41,11 @@ use serde_json::to_vec;
|
||||
use tokio::runtime::Handle;
|
||||
use tokio::sync::broadcast;
|
||||
use tracing::debug;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::stdio_server_launcher::StdioServerTelemetrySink;
|
||||
use crate::stdio_server_launcher::handle_stderr_line;
|
||||
|
||||
static PROCESS_COUNTER: AtomicUsize = AtomicUsize::new(1);
|
||||
|
||||
// Remote public implementation.
|
||||
@@ -78,6 +80,9 @@ pub(super) struct ExecutorProcessTransport {
|
||||
/// Buffered stderr bytes for diagnostic logging.
|
||||
stderr: Vec<u8>,
|
||||
|
||||
/// Optional sink for structured stderr telemetry lines.
|
||||
telemetry_sink: Option<StdioServerTelemetrySink>,
|
||||
|
||||
/// Whether the executor has reported process closure or a terminal
|
||||
/// subscription failure. Once closed, any remaining partial stdout line is
|
||||
/// flushed once and then rmcp receives EOF.
|
||||
@@ -95,7 +100,11 @@ pub(super) struct ExecutorProcessTransport {
|
||||
}
|
||||
|
||||
impl ExecutorProcessTransport {
|
||||
pub(super) fn new(process: Arc<dyn ExecProcess>, program_name: String) -> Self {
|
||||
pub(super) fn new(
|
||||
process: Arc<dyn ExecProcess>,
|
||||
program_name: String,
|
||||
telemetry_sink: Option<StdioServerTelemetrySink>,
|
||||
) -> Self {
|
||||
// Subscribe before returning the transport to rmcp. Some test servers
|
||||
// can emit output or exit quickly after `process/start`, and the
|
||||
// process event log will replay anything that landed before this
|
||||
@@ -107,6 +116,7 @@ impl ExecutorProcessTransport {
|
||||
program_name,
|
||||
stdout: Vec::new(),
|
||||
stderr: Vec::new(),
|
||||
telemetry_sink,
|
||||
closed: false,
|
||||
terminated: false,
|
||||
last_seq: 0,
|
||||
@@ -321,10 +331,10 @@ impl ExecutorProcessTransport {
|
||||
if line.last() == Some(&b'\r') {
|
||||
line.pop();
|
||||
}
|
||||
info!(
|
||||
"MCP server stderr ({}): {}",
|
||||
self.program_name,
|
||||
String::from_utf8_lossy(&line)
|
||||
handle_stderr_line(
|
||||
&self.program_name,
|
||||
&String::from_utf8_lossy(&line),
|
||||
self.telemetry_sink.as_ref(),
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -334,10 +344,10 @@ impl ExecutorProcessTransport {
|
||||
return;
|
||||
}
|
||||
let line = take(&mut self.stderr);
|
||||
info!(
|
||||
"MCP server stderr ({}): {}",
|
||||
self.program_name,
|
||||
String::from_utf8_lossy(&line)
|
||||
handle_stderr_line(
|
||||
&self.program_name,
|
||||
&String::from_utf8_lossy(&line),
|
||||
self.telemetry_sink.as_ref(),
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -35,3 +35,5 @@ pub use rmcp_client::ToolWithConnectorId;
|
||||
pub use stdio_server_launcher::ExecutorStdioServerLauncher;
|
||||
pub use stdio_server_launcher::LocalStdioServerLauncher;
|
||||
pub use stdio_server_launcher::StdioServerLauncher;
|
||||
pub use stdio_server_launcher::StdioServerTelemetry;
|
||||
pub use stdio_server_launcher::StdioServerTelemetrySink;
|
||||
|
||||
@@ -68,6 +68,7 @@ use crate::oauth::StoredOAuthTokens;
|
||||
use crate::stdio_server_launcher::StdioServerCommand;
|
||||
use crate::stdio_server_launcher::StdioServerLauncher;
|
||||
use crate::stdio_server_launcher::StdioServerProcessHandle;
|
||||
use crate::stdio_server_launcher::StdioServerTelemetrySink;
|
||||
use crate::stdio_server_launcher::StdioServerTransport;
|
||||
use crate::utils::apply_default_headers;
|
||||
use crate::utils::build_default_headers;
|
||||
@@ -282,9 +283,17 @@ impl RmcpClient {
|
||||
env_vars: &[McpServerEnvVar],
|
||||
cwd: Option<PathBuf>,
|
||||
launcher: Arc<dyn StdioServerLauncher>,
|
||||
telemetry_sink: Option<StdioServerTelemetrySink>,
|
||||
) -> io::Result<Self> {
|
||||
let transport_recipe = TransportRecipe::Stdio {
|
||||
command: StdioServerCommand::new(program, args, env, env_vars.to_vec(), cwd),
|
||||
command: StdioServerCommand::new(
|
||||
program,
|
||||
args,
|
||||
env,
|
||||
env_vars.to_vec(),
|
||||
cwd,
|
||||
telemetry_sink,
|
||||
),
|
||||
launcher,
|
||||
};
|
||||
let transport = Self::create_pending_transport(&transport_recipe)
|
||||
|
||||
@@ -82,8 +82,16 @@ pub struct StdioServerCommand {
|
||||
env: Option<HashMap<OsString, OsString>>,
|
||||
env_vars: Vec<McpServerEnvVar>,
|
||||
cwd: Option<PathBuf>,
|
||||
telemetry_sink: Option<StdioServerTelemetrySink>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct StdioServerTelemetry {
|
||||
pub payload: serde_json::Value,
|
||||
}
|
||||
|
||||
pub type StdioServerTelemetrySink = Arc<dyn Fn(StdioServerTelemetry) + Send + Sync>;
|
||||
|
||||
/// Client-side rmcp transport for a launched MCP stdio server.
|
||||
///
|
||||
/// The concrete process placement stays private to this module. `RmcpClient`
|
||||
@@ -149,6 +157,7 @@ impl StdioServerCommand {
|
||||
env: Option<HashMap<OsString, OsString>>,
|
||||
env_vars: Vec<McpServerEnvVar>,
|
||||
cwd: Option<PathBuf>,
|
||||
telemetry_sink: Option<StdioServerTelemetrySink>,
|
||||
) -> Self {
|
||||
Self {
|
||||
program,
|
||||
@@ -156,6 +165,7 @@ impl StdioServerCommand {
|
||||
env,
|
||||
env_vars,
|
||||
cwd,
|
||||
telemetry_sink,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -243,6 +253,7 @@ impl LocalStdioServerLauncher {
|
||||
env,
|
||||
env_vars,
|
||||
cwd,
|
||||
telemetry_sink,
|
||||
} = command;
|
||||
let program_name = program.to_string_lossy().into_owned();
|
||||
let envs = create_env_for_mcp_server(env, &env_vars).map_err(io::Error::other)?;
|
||||
@@ -276,7 +287,7 @@ impl LocalStdioServerLauncher {
|
||||
loop {
|
||||
match reader.next_line().await {
|
||||
Ok(Some(line)) => {
|
||||
info!("MCP server stderr ({program_name}): {line}");
|
||||
handle_stderr_line(&program_name, &line, telemetry_sink.as_ref());
|
||||
}
|
||||
Ok(None) => break,
|
||||
Err(error) => {
|
||||
@@ -479,6 +490,7 @@ impl ExecutorStdioServerLauncher {
|
||||
env,
|
||||
env_vars,
|
||||
cwd,
|
||||
telemetry_sink,
|
||||
} = command;
|
||||
let program_name = program.to_string_lossy().into_owned();
|
||||
let envs = create_env_overlay_for_remote_mcp_server(env, &env_vars);
|
||||
@@ -513,6 +525,7 @@ impl ExecutorStdioServerLauncher {
|
||||
inner: StdioServerTransportInner::Executor(ExecutorProcessTransport::new(
|
||||
started.process,
|
||||
program_name,
|
||||
telemetry_sink,
|
||||
)),
|
||||
process,
|
||||
})
|
||||
@@ -578,6 +591,38 @@ impl ExecutorStdioServerLauncher {
|
||||
}
|
||||
}
|
||||
|
||||
const CODEX_TELEMETRY_STDERR_PREFIX: &str = "@codex-telemetry ";
|
||||
|
||||
pub(super) fn handle_stderr_line(
|
||||
program_name: &str,
|
||||
line: &str,
|
||||
telemetry_sink: Option<&StdioServerTelemetrySink>,
|
||||
) {
|
||||
let Some(sink) = telemetry_sink else {
|
||||
info!("MCP server stderr ({program_name}): {line}");
|
||||
return;
|
||||
};
|
||||
|
||||
let Some(telemetry) = parse_stderr_telemetry_line(line) else {
|
||||
info!("MCP server stderr ({program_name}): {line}");
|
||||
return;
|
||||
};
|
||||
|
||||
match telemetry {
|
||||
Ok(telemetry) => sink(telemetry),
|
||||
Err(error) => {
|
||||
warn!("Failed to parse MCP server telemetry ({program_name}): {error}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_stderr_telemetry_line(
|
||||
line: &str,
|
||||
) -> Option<Result<StdioServerTelemetry, serde_json::Error>> {
|
||||
let payload = line.strip_prefix(CODEX_TELEMETRY_STDERR_PREFIX)?;
|
||||
Some(serde_json::from_str(payload).map(|payload| StdioServerTelemetry { payload }))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -651,4 +696,27 @@ mod tests {
|
||||
);
|
||||
assert!(!env.contains_key("UNREQUESTED_SECRET"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stderr_telemetry_parser_separates_prefixed_lines_from_normal_stderr() {
|
||||
assert!(parse_stderr_telemetry_line("ordinary stderr").is_none());
|
||||
|
||||
assert_eq!(
|
||||
parse_stderr_telemetry_line("@codex-telemetry {\"v\":1,\"type\":\"span\"}")
|
||||
.expect("telemetry line")
|
||||
.expect("valid telemetry"),
|
||||
StdioServerTelemetry {
|
||||
payload: serde_json::json!({
|
||||
"v": 1,
|
||||
"type": "span",
|
||||
}),
|
||||
}
|
||||
);
|
||||
|
||||
assert!(
|
||||
parse_stderr_telemetry_line("@codex-telemetry not-json")
|
||||
.expect("telemetry line")
|
||||
.is_err()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -116,6 +116,7 @@ async fn drop_kills_wrapper_process_group() -> Result<()> {
|
||||
&[],
|
||||
/*cwd*/ None,
|
||||
Arc::new(LocalStdioServerLauncher::new(std::env::current_dir()?)),
|
||||
/*telemetry_sink*/ None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -147,6 +148,7 @@ async fn shutdown_kills_initialized_stdio_server_with_in_flight_operation() -> R
|
||||
&[],
|
||||
/*cwd*/ None,
|
||||
Arc::new(LocalStdioServerLauncher::new(std::env::current_dir()?)),
|
||||
/*telemetry_sink*/ None,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
|
||||
@@ -64,6 +64,7 @@ async fn rmcp_client_can_list_and_read_resources() -> anyhow::Result<()> {
|
||||
&[],
|
||||
/*cwd*/ None,
|
||||
Arc::new(LocalStdioServerLauncher::new(std::env::current_dir()?)),
|
||||
/*telemetry_sink*/ None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user