feat(app-server): propagate app-server trace context into core (#13368)

### Summary
Propagate trace context originating at app-server RPC method handlers ->
codex core submission loop (so this includes spans such as `run_turn`!).
This implements PR 2 of the app-server tracing rollout.

This also removes the old lower-level env-based reparenting in core so
explicit request/submission ancestry wins instead of being overridden by
ambient `TRACEPARENT` state.

### What changed
- Added `trace: Option<W3cTraceContext>` to codex_protocol::Submission
- Taught `Codex::submit()` / `submit_with_id()` to automatically capture
the current span context when constructing or forwarding a submission
- Wrapped the core submission loop in a submission_dispatch span
parented from Submission.trace
- Warn on invalid submission trace carriers and ignore them cleanly
- Removed the old env-based downstream reparenting path in core task
execution
- Stopped OTEL provider init from implicitly attaching env trace context
process-wide
- Updated mcp-server Submission call sites for the new field

Added focused unit tests for:
- capturing trace context into Submission
- preferring `Submission.trace` when building the core dispatch span

### Why
PR 1 gave us consistent inbound request spans in app-server, but that
only covered the transport boundary. For long-running work like turns
and reviews, the important missing piece was preserving ancestry after
the request handler returns and core continues work on a different async
path.

This change makes that handoff explicit and keeps the parentage rules
simple:
- app-server request span sets the current context
- `Submission.trace` snapshots that context
- core restores it once, at the submission boundary
- deeper core spans inherit naturally

That also lets us stop relying on env-based reparenting for this path,
which was too ambient and could override explicit ancestry.
This commit is contained in:
Owen Lin
2026-03-03 17:03:45 -08:00
committed by GitHub
parent 0fbd84081b
commit 52521a5e40
12 changed files with 489 additions and 196 deletions

View File

@@ -70,6 +70,8 @@ use codex_hooks::HooksConfig;
use codex_network_proxy::NetworkProxy;
use codex_network_proxy::NetworkProxyAuditMetadata;
use codex_network_proxy::normalize_host;
use codex_otel::current_span_w3c_trace_context;
use codex_otel::set_parent_from_w3c_trace_context;
use codex_protocol::ThreadId;
use codex_protocol::approvals::ExecPolicyAmendment;
use codex_protocol::approvals::NetworkPolicyAmendment;
@@ -539,14 +541,21 @@ impl Codex {
/// Submit the `op` wrapped in a `Submission` with a unique ID.
pub async fn submit(&self, op: Op) -> CodexResult<String> {
let id = Uuid::now_v7().to_string();
let sub = Submission { id: id.clone(), op };
let sub = Submission {
id: id.clone(),
op,
trace: None,
};
self.submit_with_id(sub).await?;
Ok(id)
}
/// Use sparingly: prefer `submit()` so Codex is responsible for generating
/// unique IDs for each submission.
pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> {
pub async fn submit_with_id(&self, mut sub: Submission) -> CodexResult<()> {
if sub.trace.is_none() {
sub.trace = current_span_w3c_trace_context();
}
self.tx_sub
.send(sub)
.await
@@ -3686,176 +3695,230 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
// To break out of this loop, send Op::Shutdown.
while let Ok(sub) = rx_sub.recv().await {
debug!(?sub, "Submission");
match sub.op.clone() {
Op::Interrupt => {
handlers::interrupt(&sess).await;
}
Op::CleanBackgroundTerminals => {
handlers::clean_background_terminals(&sess).await;
}
Op::RealtimeConversationStart(params) => {
if let Err(err) =
handle_realtime_conversation_start(&sess, sub.id.clone(), params).await
{
sess.send_event_raw(Event {
id: sub.id.clone(),
msg: EventMsg::Error(ErrorEvent {
message: err.to_string(),
codex_error_info: Some(CodexErrorInfo::Other),
}),
})
.await;
let dispatch_span = submission_dispatch_span(&sub);
let should_exit = async {
match sub.op.clone() {
Op::Interrupt => {
handlers::interrupt(&sess).await;
false
}
}
Op::RealtimeConversationAudio(params) => {
handle_realtime_conversation_audio(&sess, sub.id.clone(), params).await;
}
Op::RealtimeConversationText(params) => {
handle_realtime_conversation_text(&sess, sub.id.clone(), params).await;
}
Op::RealtimeConversationClose => {
handle_realtime_conversation_close(&sess, sub.id.clone()).await;
}
Op::OverrideTurnContext {
cwd,
approval_policy,
sandbox_policy,
windows_sandbox_level,
model,
effort,
summary,
service_tier,
collaboration_mode,
personality,
} => {
let collaboration_mode = if let Some(collab_mode) = collaboration_mode {
collab_mode
} else {
let state = sess.state.lock().await;
state.session_configuration.collaboration_mode.with_updates(
model.clone(),
effort,
None,
Op::CleanBackgroundTerminals => {
handlers::clean_background_terminals(&sess).await;
false
}
Op::RealtimeConversationStart(params) => {
if let Err(err) =
handle_realtime_conversation_start(&sess, sub.id.clone(), params).await
{
sess.send_event_raw(Event {
id: sub.id.clone(),
msg: EventMsg::Error(ErrorEvent {
message: err.to_string(),
codex_error_info: Some(CodexErrorInfo::Other),
}),
})
.await;
}
false
}
Op::RealtimeConversationAudio(params) => {
handle_realtime_conversation_audio(&sess, sub.id.clone(), params).await;
false
}
Op::RealtimeConversationText(params) => {
handle_realtime_conversation_text(&sess, sub.id.clone(), params).await;
false
}
Op::RealtimeConversationClose => {
handle_realtime_conversation_close(&sess, sub.id.clone()).await;
false
}
Op::OverrideTurnContext {
cwd,
approval_policy,
sandbox_policy,
windows_sandbox_level,
model,
effort,
summary,
service_tier,
collaboration_mode,
personality,
} => {
let collaboration_mode = if let Some(collab_mode) = collaboration_mode {
collab_mode
} else {
let state = sess.state.lock().await;
state.session_configuration.collaboration_mode.with_updates(
model.clone(),
effort,
None,
)
};
handlers::override_turn_context(
&sess,
sub.id.clone(),
SessionSettingsUpdate {
cwd,
approval_policy,
sandbox_policy,
windows_sandbox_level,
collaboration_mode: Some(collaboration_mode),
reasoning_summary: summary,
service_tier,
personality,
..Default::default()
},
)
};
handlers::override_turn_context(
&sess,
sub.id.clone(),
SessionSettingsUpdate {
cwd,
approval_policy,
sandbox_policy,
windows_sandbox_level,
collaboration_mode: Some(collaboration_mode),
reasoning_summary: summary,
service_tier,
personality,
..Default::default()
},
)
.await;
}
Op::UserInput { .. } | Op::UserTurn { .. } => {
handlers::user_input_or_turn(&sess, sub.id.clone(), sub.op).await;
}
Op::ExecApproval {
id: approval_id,
turn_id,
decision,
} => {
handlers::exec_approval(&sess, approval_id, turn_id, decision).await;
}
Op::PatchApproval { id, decision } => {
handlers::patch_approval(&sess, id, decision).await;
}
Op::UserInputAnswer { id, response } => {
handlers::request_user_input_response(&sess, id, response).await;
}
Op::DynamicToolResponse { id, response } => {
handlers::dynamic_tool_response(&sess, id, response).await;
}
Op::AddToHistory { text } => {
handlers::add_to_history(&sess, &config, text).await;
}
Op::GetHistoryEntryRequest { offset, log_id } => {
handlers::get_history_entry_request(&sess, &config, sub.id.clone(), offset, log_id)
.await;
}
Op::ListMcpTools => {
handlers::list_mcp_tools(&sess, &config, sub.id.clone()).await;
}
Op::RefreshMcpServers { config } => {
handlers::refresh_mcp_servers(&sess, config).await;
}
Op::ReloadUserConfig => {
handlers::reload_user_config(&sess).await;
}
Op::ListCustomPrompts => {
handlers::list_custom_prompts(&sess, sub.id.clone()).await;
}
Op::ListSkills { cwds, force_reload } => {
handlers::list_skills(&sess, sub.id.clone(), cwds, force_reload).await;
}
Op::ListRemoteSkills {
hazelnut_scope,
product_surface,
enabled,
} => {
handlers::list_remote_skills(
&sess,
&config,
sub.id.clone(),
false
}
Op::UserInput { .. } | Op::UserTurn { .. } => {
handlers::user_input_or_turn(&sess, sub.id.clone(), sub.op).await;
false
}
Op::ExecApproval {
id: approval_id,
turn_id,
decision,
} => {
handlers::exec_approval(&sess, approval_id, turn_id, decision).await;
false
}
Op::PatchApproval { id, decision } => {
handlers::patch_approval(&sess, id, decision).await;
false
}
Op::UserInputAnswer { id, response } => {
handlers::request_user_input_response(&sess, id, response).await;
false
}
Op::DynamicToolResponse { id, response } => {
handlers::dynamic_tool_response(&sess, id, response).await;
false
}
Op::AddToHistory { text } => {
handlers::add_to_history(&sess, &config, text).await;
false
}
Op::GetHistoryEntryRequest { offset, log_id } => {
handlers::get_history_entry_request(
&sess,
&config,
sub.id.clone(),
offset,
log_id,
)
.await;
false
}
Op::ListMcpTools => {
handlers::list_mcp_tools(&sess, &config, sub.id.clone()).await;
false
}
Op::RefreshMcpServers { config } => {
handlers::refresh_mcp_servers(&sess, config).await;
false
}
Op::ReloadUserConfig => {
handlers::reload_user_config(&sess).await;
false
}
Op::ListCustomPrompts => {
handlers::list_custom_prompts(&sess, sub.id.clone()).await;
false
}
Op::ListSkills { cwds, force_reload } => {
handlers::list_skills(&sess, sub.id.clone(), cwds, force_reload).await;
false
}
Op::ListRemoteSkills {
hazelnut_scope,
product_surface,
enabled,
)
.await;
}
Op::DownloadRemoteSkill { hazelnut_id } => {
handlers::export_remote_skill(&sess, &config, sub.id.clone(), hazelnut_id).await;
}
Op::Undo => {
handlers::undo(&sess, sub.id.clone()).await;
}
Op::Compact => {
handlers::compact(&sess, sub.id.clone()).await;
}
Op::DropMemories => {
handlers::drop_memories(&sess, &config, sub.id.clone()).await;
}
Op::UpdateMemories => {
handlers::update_memories(&sess, &config, sub.id.clone()).await;
}
Op::ThreadRollback { num_turns } => {
handlers::thread_rollback(&sess, sub.id.clone(), num_turns).await;
}
Op::SetThreadName { name } => {
handlers::set_thread_name(&sess, sub.id.clone(), name).await;
}
Op::RunUserShellCommand { command } => {
handlers::run_user_shell_command(&sess, sub.id.clone(), command).await;
}
Op::ResolveElicitation {
server_name,
request_id,
decision,
} => {
handlers::resolve_elicitation(&sess, server_name, request_id, decision).await;
}
Op::Shutdown => {
if handlers::shutdown(&sess, sub.id.clone()).await {
break;
} => {
handlers::list_remote_skills(
&sess,
&config,
sub.id.clone(),
hazelnut_scope,
product_surface,
enabled,
)
.await;
false
}
Op::DownloadRemoteSkill { hazelnut_id } => {
handlers::export_remote_skill(&sess, &config, sub.id.clone(), hazelnut_id)
.await;
false
}
Op::Undo => {
handlers::undo(&sess, sub.id.clone()).await;
false
}
Op::Compact => {
handlers::compact(&sess, sub.id.clone()).await;
false
}
Op::DropMemories => {
handlers::drop_memories(&sess, &config, sub.id.clone()).await;
false
}
Op::UpdateMemories => {
handlers::update_memories(&sess, &config, sub.id.clone()).await;
false
}
Op::ThreadRollback { num_turns } => {
handlers::thread_rollback(&sess, sub.id.clone(), num_turns).await;
false
}
Op::SetThreadName { name } => {
handlers::set_thread_name(&sess, sub.id.clone(), name).await;
false
}
Op::RunUserShellCommand { command } => {
handlers::run_user_shell_command(&sess, sub.id.clone(), command).await;
false
}
Op::ResolveElicitation {
server_name,
request_id,
decision,
} => {
handlers::resolve_elicitation(&sess, server_name, request_id, decision).await;
false
}
Op::Shutdown => handlers::shutdown(&sess, sub.id.clone()).await,
Op::Review { review_request } => {
handlers::review(&sess, &config, sub.id.clone(), review_request).await;
false
}
_ => false, // Ignore unknown ops; enum is non_exhaustive to allow extensions.
}
Op::Review { review_request } => {
handlers::review(&sess, &config, sub.id.clone(), review_request).await;
}
_ => {} // Ignore unknown ops; enum is non_exhaustive to allow extensions.
}
.instrument(dispatch_span)
.await;
if should_exit {
break;
}
}
debug!("Agent loop exited");
}
fn submission_dispatch_span(sub: &Submission) -> tracing::Span {
let dispatch_span = info_span!("submission_dispatch", submission.id = sub.id.as_str());
if let Some(trace) = sub.trace.as_ref()
&& !set_parent_from_w3c_trace_context(&dispatch_span, trace)
{
warn!(
submission.id = sub.id.as_str(),
"ignoring invalid submission trace carrier"
);
}
dispatch_span
}
/// Operation handlers
mod handlers {
use crate::codex::Session;
@@ -6627,9 +6690,17 @@ mod tests {
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ModelsResponse;
use codex_protocol::protocol::Submission;
use codex_protocol::protocol::W3cTraceContext;
use opentelemetry::trace::TraceContextExt;
use opentelemetry::trace::TraceId;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_sdk::trace::SdkTracerProvider;
use std::path::Path;
use std::time::Duration;
use tokio::time::sleep;
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_subscriber::prelude::*;
use codex_protocol::mcp::CallToolResult as McpCallToolResult;
use pretty_assertions::assert_eq;
@@ -8105,6 +8176,12 @@ mod tests {
})
}
fn test_tracing_subscriber() -> impl tracing::Subscriber + Send + Sync {
let provider = SdkTracerProvider::builder().build();
let tracer = provider.tracer("codex-core-tests");
tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer))
}
async fn build_test_config(codex_home: &Path) -> Config {
ConfigBuilder::default()
.codex_home(codex_home.to_path_buf())
@@ -8432,6 +8509,86 @@ mod tests {
(session, turn_context)
}
#[tokio::test]
async fn submit_with_id_captures_current_span_trace_context() {
let (session, _turn_context) = make_session_and_context().await;
let (tx_sub, rx_sub) = async_channel::bounded(1);
let (_tx_event, rx_event) = async_channel::unbounded();
let (_agent_status_tx, agent_status) = watch::channel(AgentStatus::PendingInit);
let codex = Codex {
tx_sub,
rx_event,
agent_status,
session: Arc::new(session),
};
let subscriber = test_tracing_subscriber();
let _guard = tracing::subscriber::set_default(subscriber);
let request_parent = W3cTraceContext {
traceparent: Some("00-00000000000000000000000000000011-0000000000000022-01".into()),
tracestate: Some("vendor=value".into()),
};
let request_span = info_span!("app_server.request");
assert!(set_parent_from_w3c_trace_context(
&request_span,
&request_parent
));
let expected_trace = async {
let expected_trace =
current_span_w3c_trace_context().expect("current span should have trace context");
codex
.submit_with_id(Submission {
id: "sub-1".into(),
op: Op::Interrupt,
trace: None,
})
.await
.expect("submit should succeed");
expected_trace
}
.instrument(request_span)
.await;
let submitted = rx_sub.recv().await.expect("submission");
assert_eq!(submitted.trace, Some(expected_trace));
}
#[test]
fn submission_dispatch_span_prefers_submission_trace_context() {
let subscriber = test_tracing_subscriber();
let _guard = tracing::subscriber::set_default(subscriber);
let ambient_parent = W3cTraceContext {
traceparent: Some("00-00000000000000000000000000000033-0000000000000044-01".into()),
tracestate: None,
};
let ambient_span = info_span!("ambient");
assert!(set_parent_from_w3c_trace_context(
&ambient_span,
&ambient_parent
));
let submission_trace = W3cTraceContext {
traceparent: Some("00-00000000000000000000000000000055-0000000000000066-01".into()),
tracestate: Some("vendor=value".into()),
};
let dispatch_span = ambient_span.in_scope(|| {
submission_dispatch_span(&Submission {
id: "sub-1".into(),
op: Op::Interrupt,
trace: Some(submission_trace),
})
});
let trace_id = dispatch_span.context().span().span_context().trace_id();
assert_eq!(
trace_id,
TraceId::from_hex("00000000000000000000000000000055").expect("trace id")
);
}
pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
dynamic_tools: Vec<DynamicToolSpec>,
) -> (

View File

@@ -154,6 +154,7 @@ pub(crate) async fn run_codex_thread_one_shot(
.send(Submission {
id: "shutdown".to_string(),
op: Op::Shutdown {},
trace: None,
})
.await;
child_cancel.cancel();
@@ -298,11 +299,11 @@ async fn forward_ops(
cancel_token_ops: CancellationToken,
) {
loop {
let op: Op = match rx_ops.recv().or_cancel(&cancel_token_ops).await {
Ok(Ok(Submission { id: _, op })) => op,
let submission = match rx_ops.recv().or_cancel(&cancel_token_ops).await {
Ok(Ok(submission)) => submission,
Ok(Err(_)) | Err(_) => break,
};
let _ = codex.submit(op).await;
let _ = codex.submit_with_id(submission).await;
}
}
@@ -550,4 +551,47 @@ mod tests {
"expected Shutdown op after cancellation"
);
}
#[tokio::test]
async fn forward_ops_preserves_submission_trace_context() {
let (tx_sub, rx_sub) = bounded(SUBMISSION_CHANNEL_CAPACITY);
let (_tx_events, rx_events) = bounded(SUBMISSION_CHANNEL_CAPACITY);
let (_agent_status_tx, agent_status) = watch::channel(AgentStatus::PendingInit);
let (session, _ctx, _rx_evt) = crate::codex::make_session_and_context_with_rx().await;
let codex = Arc::new(Codex {
tx_sub,
rx_event: rx_events,
agent_status,
session,
});
let (tx_ops, rx_ops) = bounded(1);
let cancel = CancellationToken::new();
let forward = tokio::spawn(forward_ops(Arc::clone(&codex), rx_ops, cancel));
let submission = Submission {
id: "sub-1".to_string(),
op: Op::Interrupt,
trace: Some(codex_protocol::protocol::W3cTraceContext {
traceparent: Some(
"00-1234567890abcdef1234567890abcdef-1234567890abcdef-01".to_string(),
),
tracestate: Some("vendor=state".to_string()),
}),
};
tx_ops.send(submission.clone()).await.unwrap();
drop(tx_ops);
let forwarded = timeout(Duration::from_secs(1), rx_sub.recv())
.await
.expect("forward_ops hung")
.expect("forwarded submission missing");
assert_eq!(submission.id, forwarded.id);
assert_eq!(submission.op, forwarded.op);
assert_eq!(submission.trace, forwarded.trace);
timeout(Duration::from_secs(1), forward)
.await
.expect("forward_ops did not exit")
.expect("forward_ops join error");
}
}

View File

@@ -78,9 +78,6 @@ impl SessionTask for RegularTask {
let sess = session.clone_session();
let run_turn_span = trace_span!("run_turn");
sess.set_server_reasoning_included(false).await;
sess.services
.otel_manager
.apply_traceparent_parent(&run_turn_span);
let prewarmed_client_session = self.take_prewarmed_session().await;
run_turn(
sess,