Compare commits

...

2 Commits

Author SHA1 Message Date
Owen Lin
4fa0c979bd update 2026-03-12 15:55:46 -07:00
Owen Lin
b1e3b1d08d feat(app-server, core): add more spans 2026-03-12 15:28:35 -07:00
15 changed files with 522 additions and 75 deletions

View File

@@ -1998,6 +1998,12 @@ impl CodexMessageProcessor {
service_name,
request_trace,
)
.instrument(tracing::info_span!(
"app_server.thread_start.create_thread",
otel.name = "app_server.thread_start.create_thread",
thread_start.dynamic_tool_count = core_dynamic_tool_count,
thread_start.persist_extended_history = persist_extended_history,
))
.await
{
Ok(new_conv) => {
@@ -2007,7 +2013,13 @@ impl CodexMessageProcessor {
session_configured,
..
} = new_conv;
let config_snapshot = thread.config_snapshot().await;
let config_snapshot = thread
.config_snapshot()
.instrument(tracing::info_span!(
"app_server.thread_start.config_snapshot",
otel.name = "app_server.thread_start.config_snapshot",
))
.await;
let mut thread = build_thread_from_snapshot(
thread_id,
&config_snapshot,
@@ -2023,6 +2035,11 @@ impl CodexMessageProcessor {
experimental_raw_events,
ApiVersion::V2,
)
.instrument(tracing::info_span!(
"app_server.thread_start.attach_listener",
otel.name = "app_server.thread_start.attach_listener",
thread_start.experimental_raw_events = experimental_raw_events,
))
.await,
thread_id,
request_id.connection_id,
@@ -2032,12 +2049,20 @@ impl CodexMessageProcessor {
listener_task_context
.thread_watch_manager
.upsert_thread_silently(thread.clone())
.instrument(tracing::info_span!(
"app_server.thread_start.upsert_thread",
otel.name = "app_server.thread_start.upsert_thread",
))
.await;
thread.status = resolve_thread_status(
listener_task_context
.thread_watch_manager
.loaded_status_for_thread(&thread.id)
.instrument(tracing::info_span!(
"app_server.thread_start.resolve_status",
otel.name = "app_server.thread_start.resolve_status",
))
.await,
false,
);
@@ -2056,12 +2081,20 @@ impl CodexMessageProcessor {
listener_task_context
.outgoing
.send_response(request_id, response)
.instrument(tracing::info_span!(
"app_server.thread_start.send_response",
otel.name = "app_server.thread_start.send_response",
))
.await;
let notif = ThreadStartedNotification { thread };
listener_task_context
.outgoing
.send_server_notification(ServerNotification::ThreadStarted(notif))
.instrument(tracing::info_span!(
"app_server.thread_start.notify_started",
otel.name = "app_server.thread_start.notify_started",
))
.await;
}
Err(err) => {

View File

@@ -158,6 +158,11 @@ impl TracingHarness {
self.tracing.exporter.reset();
}
/// Consumes the harness and tears down its processor.
///
/// Threads are shut down first; background tasks are drained afterwards.
async fn shutdown(self) {
self.processor.shutdown_threads().await;
// Drain only after thread shutdown has been awaited.
self.processor.drain_background_tasks().await;
}
async fn request<T>(&mut self, request: ClientRequest, trace: Option<W3cTraceContext>) -> T
where
T: serde::de::DeserializeOwned,
@@ -275,6 +280,18 @@ fn find_rpc_span_with_trace<'a>(
})
}
/// Returns the first exported span whose name equals `name`.
///
/// # Panics
///
/// Panics when no span carries that name; the message includes every exported
/// span rendered by `format_spans` to make the failure easy to diagnose.
fn find_span_by_name<'a>(spans: &'a [SpanData], name: &str) -> &'a SpanData {
    let Some(matching_span) = spans
        .iter()
        .find(|candidate| candidate.name.as_ref() == name)
    else {
        panic!(
            "missing span named {name}; exported spans:\n{}",
            format_spans(spans)
        );
    };
    matching_span
}
fn find_span_by_name_with_trace<'a>(
spans: &'a [SpanData],
name: &str,
@@ -445,8 +462,20 @@ async fn thread_start_jsonrpc_span_exports_server_span_and_parents_children() ->
..
} = RemoteTrace::new("00000000000000000000000000000011", "0000000000000022");
let _: ThreadStartResponse = harness.start_thread(2, Some(remote_trace)).await;
drop(harness.processor);
let _: ThreadStartResponse = harness.start_thread(2, None).await;
let untraced_spans = wait_for_exported_spans(harness.tracing, |spans| {
spans
.iter()
.any(|span| span.name.as_ref() == "app_server.thread_start.derive_config")
})
.await;
let untraced_derive_config_span =
find_span_by_name(&untraced_spans, "app_server.thread_start.derive_config");
assert_ne!(untraced_derive_config_span.parent_span_id, SpanId::INVALID);
harness.reset_tracing();
let _: ThreadStartResponse = harness.start_thread(3, Some(remote_trace)).await;
let spans = wait_for_exported_spans(harness.tracing, |spans| {
spans.iter().any(|span| {
span.span_kind == SpanKind::Server
@@ -454,10 +483,11 @@ async fn thread_start_jsonrpc_span_exports_server_span_and_parents_children() ->
&& span.span_context.trace_id() == remote_trace_id
}) && spans
.iter()
.any(|span| span.name.as_ref() == "thread_spawn")
.any(|span| span.name.as_ref() == "app_server.thread_start.notify_started")
})
.await;
let derive_config_span = find_span_by_name(&spans, "app_server.thread_start.derive_config");
let server_request_span =
find_rpc_span_with_trace(&spans, SpanKind::Server, "thread/start", remote_trace_id);
let thread_spawn_span = find_span_by_name_with_trace(&spans, "thread_spawn", remote_trace_id);
@@ -465,9 +495,21 @@ async fn thread_start_jsonrpc_span_exports_server_span_and_parents_children() ->
assert_eq!(server_request_span.name.as_ref(), "thread/start");
assert_eq!(server_request_span.span_context.trace_id(), remote_trace_id);
assert_ne!(server_request_span.span_context.span_id(), SpanId::INVALID);
assert_eq!(
derive_config_span.span_context.trace_id(),
remote_trace_id,
"thread/start startup spans did not inherit the inbound request trace"
);
assert_span_descends_from(&spans, derive_config_span, server_request_span);
assert_span_descends_from(&spans, thread_spawn_span, server_request_span);
assert_span_descends_from(&spans, session_init_span, server_request_span);
let default_model_span = find_span_by_name(&spans, "get_default_model");
let session_init_rollout_span = find_span_by_name(&spans, "session_init.rollout");
assert_span_descends_from(&spans, default_model_span, server_request_span);
assert_span_descends_from(&spans, session_init_rollout_span, server_request_span);
harness.shutdown().await;
Ok(())
}
@@ -511,34 +553,27 @@ async fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> {
)
.await;
let spans = wait_for_exported_spans(harness.tracing, |spans| {
spans
.iter()
.any(|span| span.name.as_ref() == "submission_dispatch")
&& spans
.iter()
.any(|span| span.name.as_ref() == "session_task.turn")
&& spans.iter().any(|span| span.name.as_ref() == "run_turn")
spans.iter().any(|span| {
span.span_kind == SpanKind::Server
&& span_attr(span, "rpc.method") == Some("turn/start")
&& span.span_context.trace_id() == remote_trace_id
}) && spans.iter().any(|span| {
span.name.as_ref() == "op.dispatch.user_input"
&& span.span_context.trace_id() == remote_trace_id
})
})
.await;
drop(harness.processor);
tokio::task::yield_now().await;
let server_request_span =
find_rpc_span_with_trace(&spans, SpanKind::Server, "turn/start", remote_trace_id);
let submission_dispatch_span =
find_span_by_name_with_trace(&spans, "submission_dispatch", remote_trace_id);
let session_task_turn_span =
find_span_by_name_with_trace(&spans, "session_task.turn", remote_trace_id);
let run_turn_span = find_span_by_name_with_trace(&spans, "run_turn", remote_trace_id);
let core_turn_span =
find_span_by_name_with_trace(&spans, "op.dispatch.user_input", remote_trace_id);
assert_eq!(server_request_span.parent_span_id, remote_parent_span_id);
assert!(server_request_span.parent_span_is_remote);
assert_eq!(server_request_span.span_context.trace_id(), remote_trace_id);
assert_span_descends_from(&spans, submission_dispatch_span, server_request_span);
assert_span_descends_from(&spans, session_task_turn_span, server_request_span);
assert_span_descends_from(&spans, run_turn_span, server_request_span);
assert_span_descends_from(&spans, session_task_turn_span, submission_dispatch_span);
assert_span_descends_from(&spans, run_turn_span, session_task_turn_span);
assert_span_descends_from(&spans, core_turn_span, server_request_span);
harness.shutdown().await;
Ok(())
}

View File

@@ -5,6 +5,7 @@ use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::protocol::RateLimitSnapshot;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::W3cTraceContext;
use futures::Stream;
use serde::Deserialize;
use serde::Serialize;
@@ -179,6 +180,7 @@ impl From<&ResponsesApiRequest> for ResponseCreateWsRequest {
text: request.text.clone(),
generate: None,
client_metadata: None,
trace: None,
}
}
}
@@ -207,6 +209,8 @@ pub struct ResponseCreateWsRequest {
pub generate: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub client_metadata: Option<HashMap<String, String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub trace: Option<W3cTraceContext>,
}
#[derive(Debug, Serialize)]

View File

@@ -21,6 +21,7 @@ use http::Method;
use serde_json::Value;
use std::sync::Arc;
use std::sync::OnceLock;
use tracing::instrument;
pub struct ResponsesClient<T: HttpTransport, A: AuthProvider> {
session: EndpointSession<T, A>,
@@ -55,6 +56,16 @@ impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
}
}
#[instrument(
name = "responses.stream_request",
level = "info",
skip_all,
fields(
transport = "responses_http",
http.method = "POST",
api.path = "responses"
)
)]
pub async fn stream_request(
&self,
request: ResponsesApiRequest,
@@ -90,6 +101,17 @@ impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
"responses"
}
#[instrument(
name = "responses.stream",
level = "info",
skip_all,
fields(
transport = "responses_http",
http.method = "POST",
api.path = "responses",
turn.has_state = turn_state.is_some()
)
)]
pub async fn stream(
&self,
body: Value,

View File

@@ -33,9 +33,12 @@ use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::tungstenite::Error as WsError;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tracing::Instrument;
use tracing::Span;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::instrument;
use tracing::trace;
use tungstenite::extensions::ExtensionsConfig;
use tungstenite::extensions::compression::deflate::DeflateConfig;
@@ -200,6 +203,12 @@ impl ResponsesWebsocketConnection {
self.stream.lock().await.is_none()
}
#[instrument(
name = "responses_websocket.stream_request",
level = "info",
skip_all,
fields(transport = "responses_websocket", api.path = "responses")
)]
pub async fn stream_request(
&self,
request: ResponsesWsRequest,
@@ -216,48 +225,52 @@ impl ResponsesWebsocketConnection {
ApiError::Stream(format!("failed to encode websocket request: {err}"))
})?;
tokio::spawn(async move {
if let Some(model) = server_model {
let _ = tx_event.send(Ok(ResponseEvent::ServerModel(model))).await;
}
if let Some(etag) = models_etag {
let _ = tx_event.send(Ok(ResponseEvent::ModelsEtag(etag))).await;
}
if server_reasoning_included {
let _ = tx_event
.send(Ok(ResponseEvent::ServerReasoningIncluded(true)))
.await;
}
let mut guard = stream.lock().await;
let result = {
let Some(ws_stream) = guard.as_mut() else {
let current_span = Span::current();
tokio::spawn(
async move {
if let Some(model) = server_model {
let _ = tx_event.send(Ok(ResponseEvent::ServerModel(model))).await;
}
if let Some(etag) = models_etag {
let _ = tx_event.send(Ok(ResponseEvent::ModelsEtag(etag))).await;
}
if server_reasoning_included {
let _ = tx_event
.send(Err(ApiError::Stream(
"websocket connection is closed".to_string(),
)))
.send(Ok(ResponseEvent::ServerReasoningIncluded(true)))
.await;
return;
}
let mut guard = stream.lock().await;
let result = {
let Some(ws_stream) = guard.as_mut() else {
let _ = tx_event
.send(Err(ApiError::Stream(
"websocket connection is closed".to_string(),
)))
.await;
return;
};
run_websocket_response_stream(
ws_stream,
tx_event.clone(),
request_body,
idle_timeout,
telemetry,
)
.await
};
run_websocket_response_stream(
ws_stream,
tx_event.clone(),
request_body,
idle_timeout,
telemetry,
)
.await
};
if let Err(err) = result {
// A terminal stream error should reach the caller immediately. Waiting for a
// graceful close handshake here can stall indefinitely and mask the error.
let failed_stream = guard.take();
drop(guard);
drop(failed_stream);
let _ = tx_event.send(Err(err)).await;
if let Err(err) = result {
// A terminal stream error should reach the caller immediately. Waiting for a
// graceful close handshake here can stall indefinitely and mask the error.
let failed_stream = guard.take();
drop(guard);
drop(failed_stream);
let _ = tx_event.send(Err(err)).await;
}
}
});
.instrument(current_span),
);
Ok(ResponseStream { rx_event })
}
@@ -273,6 +286,12 @@ impl<A: AuthProvider> ResponsesWebsocketClient<A> {
Self { provider, auth }
}
#[instrument(
name = "responses_websocket.connect",
level = "info",
skip_all,
fields(transport = "responses_websocket", api.path = "responses")
)]
pub async fn connect(
&self,
extra_headers: HeaderMap,

View File

@@ -12,6 +12,7 @@ use http::HeaderMap;
use http::Method;
use serde_json::Value;
use std::sync::Arc;
use tracing::instrument;
pub(crate) struct EndpointSession<T: HttpTransport, A: AuthProvider> {
transport: T,
@@ -68,6 +69,12 @@ impl<T: HttpTransport, A: AuthProvider> EndpointSession<T, A> {
.await
}
#[instrument(
name = "endpoint_session.execute_with",
level = "info",
skip_all,
fields(http.method = %method, api.path = path)
)]
pub(crate) async fn execute_with<C>(
&self,
method: Method,
@@ -96,6 +103,12 @@ impl<T: HttpTransport, A: AuthProvider> EndpointSession<T, A> {
Ok(response)
}
#[instrument(
name = "endpoint_session.stream_with",
level = "info",
skip_all,
fields(http.method = %method, api.path = path)
)]
pub(crate) async fn stream_with<C>(
&self,
method: Method,

View File

@@ -1,12 +1,12 @@
use http::Error as HttpError;
use http::HeaderMap;
use http::HeaderName;
use http::HeaderValue;
use opentelemetry::global;
use opentelemetry::propagation::Injector;
use reqwest::IntoUrl;
use reqwest::Method;
use reqwest::Response;
use reqwest::header::HeaderMap;
use reqwest::header::HeaderName;
use reqwest::header::HeaderValue;
use serde::Serialize;
use std::fmt::Display;
use std::time::Duration;

View File

@@ -58,6 +58,7 @@ use codex_api::create_text_param_for_request;
use codex_api::error::ApiError;
use codex_api::requests::responses::Compression;
use codex_otel::SessionTelemetry;
use codex_otel::current_span_w3c_trace_context;
use codex_protocol::ThreadId;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
@@ -80,6 +81,7 @@ use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio_tungstenite::tungstenite::Error;
use tokio_tungstenite::tungstenite::Message;
use tracing::instrument;
use tracing::trace;
use tracing::warn;
@@ -692,6 +694,18 @@ impl ModelClientSession {
Ok(())
}
/// Returns a websocket connection for this turn.
#[instrument(
name = "model_client.websocket_connection",
level = "info",
skip_all,
fields(
model = %self.client.state.provider.name,
wire_api = ?self.client.state.provider.wire_api,
transport = "responses_websocket",
api.path = "responses",
turn.has_metadata_header = turn_metadata_header.is_some()
)
)]
async fn websocket_connection(
&mut self,
session_telemetry: &SessionTelemetry,
@@ -749,6 +763,19 @@ impl ModelClientSession {
/// Handles SSE fixtures, reasoning summaries, verbosity, and the
/// `text` controls used for output schemas.
#[allow(clippy::too_many_arguments)]
#[instrument(
name = "model_client.stream_responses_api",
level = "info",
skip_all,
fields(
model = %model_info.slug,
wire_api = ?self.client.state.provider.wire_api,
transport = "responses_http",
http.method = "POST",
api.path = "responses",
turn.has_metadata_header = turn_metadata_header.is_some()
)
)]
async fn stream_responses_api(
&self,
prompt: &Prompt,
@@ -816,6 +843,19 @@ impl ModelClientSession {
/// Streams a turn via the Responses API over WebSocket transport.
#[allow(clippy::too_many_arguments)]
#[instrument(
name = "model_client.stream_responses_websocket",
level = "info",
skip_all,
fields(
model = %model_info.slug,
wire_api = ?self.client.state.provider.wire_api,
transport = "responses_websocket",
api.path = "responses",
turn.has_metadata_header = turn_metadata_header.is_some(),
websocket.warmup = warmup
)
)]
async fn stream_responses_websocket(
&mut self,
prompt: &Prompt,
@@ -847,6 +887,7 @@ impl ModelClientSession {
)?;
let mut ws_payload = ResponseCreateWsRequest {
client_metadata: build_ws_client_metadata(turn_metadata_header),
trace: current_span_w3c_trace_context(),
..ResponseCreateWsRequest::from(&request)
};
if warmup {

View File

@@ -583,7 +583,6 @@ impl Codex {
let session_source_clone = session_configuration.session_source.clone();
let (agent_status_tx, agent_status_rx) = watch::channel(AgentStatus::PendingInit);
let session_init_span = info_span!("session_init");
let session = Session::new(
session_configuration,
config.clone(),
@@ -600,7 +599,6 @@ impl Codex {
file_watcher,
agent_control,
)
.instrument(session_init_span)
.await
.map_err(|e| {
error!("Failed to create session: {e:#}");
@@ -1335,6 +1333,7 @@ impl Session {
}
}
#[instrument(name = "session_init", level = "info", skip_all)]
#[allow(clippy::too_many_arguments)]
async fn new(
mut session_configuration: SessionConfiguration,
@@ -1426,18 +1425,29 @@ impl Session {
.await?;
Ok((Some(rollout_recorder), state_db_ctx))
}
};
}
.instrument(info_span!(
"session_init.rollout",
otel.name = "session_init.rollout",
session_init.ephemeral = config.ephemeral,
));
let is_subagent = matches!(
session_configuration.session_source,
SessionSource::SubAgent(_)
);
let history_meta_fut = async {
if matches!(
session_configuration.session_source,
SessionSource::SubAgent(_)
) {
if is_subagent {
(0, 0)
} else {
crate::message_history::history_metadata(&config).await
}
};
}
.instrument(info_span!(
"session_init.history_metadata",
otel.name = "session_init.history_metadata",
session_init.is_subagent = is_subagent,
));
let auth_manager_clone = Arc::clone(&auth_manager);
let config_for_mcp = Arc::clone(&config);
let mcp_manager_for_mcp = Arc::clone(&mcp_manager);
@@ -1450,7 +1460,11 @@ impl Session {
)
.await;
(auth, mcp_servers, auth_statuses)
};
}
.instrument(info_span!(
"session_init.auth_mcp",
otel.name = "session_init.auth_mcp",
));
// Join all independent futures.
let (
@@ -1608,7 +1622,12 @@ impl Session {
tx
};
let thread_name =
match session_index::find_thread_name_by_id(&config.codex_home, &conversation_id).await
match session_index::find_thread_name_by_id(&config.codex_home, &conversation_id)
.instrument(info_span!(
"session_init.thread_name_lookup",
otel.name = "session_init.thread_name_lookup",
))
.await
{
Ok(name) => name,
Err(err) => {
@@ -1658,6 +1677,12 @@ impl Session {
managed_network_requirements_enabled,
network_proxy_audit_metadata,
)
.instrument(info_span!(
"session_init.network_proxy",
otel.name = "session_init.network_proxy",
session_init.managed_network_requirements_enabled =
managed_network_requirements_enabled,
))
.await?;
(Some(network_proxy), Some(session_network_proxy))
} else {
@@ -1807,6 +1832,8 @@ impl Session {
.map(|(name, _)| name.clone())
.collect();
required_mcp_servers.sort();
let enabled_mcp_server_count = mcp_servers.len();
let required_mcp_server_count = required_mcp_servers.len();
let tool_plugin_provenance = mcp_manager.tool_plugin_provenance(config.as_ref());
{
let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await;
@@ -1824,6 +1851,12 @@ impl Session {
codex_apps_tools_cache_key(auth),
tool_plugin_provenance,
)
.instrument(info_span!(
"session_init.mcp_manager_init",
otel.name = "session_init.mcp_manager_init",
session_init.enabled_mcp_server_count = enabled_mcp_server_count,
session_init.required_mcp_server_count = required_mcp_server_count,
))
.await;
{
let mut manager_guard = sess.services.mcp_connection_manager.write().await;
@@ -1843,6 +1876,11 @@ impl Session {
.read()
.await
.required_startup_failures(&required_mcp_servers)
.instrument(info_span!(
"session_init.required_mcp_wait",
otel.name = "session_init.required_mcp_wait",
session_init.required_mcp_server_count = required_mcp_server_count,
))
.await;
if !failures.is_empty() {
let details = failures
@@ -4259,11 +4297,23 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
}
fn submission_dispatch_span(sub: &Submission) -> tracing::Span {
let op_name = sub.op.kind();
let span_name = format!("op.dispatch.{op_name}");
let dispatch_span = match &sub.op {
Op::RealtimeConversationAudio(_) => {
debug_span!("submission_dispatch", submission.id = sub.id.as_str())
debug_span!(
"submission_dispatch",
otel.name = span_name.as_str(),
submission.id = sub.id.as_str(),
codex.op = op_name
)
}
_ => info_span!("submission_dispatch", submission.id = sub.id.as_str()),
_ => info_span!(
"submission_dispatch",
otel.name = span_name.as_str(),
submission.id = sub.id.as_str(),
codex.op = op_name
),
};
if let Some(trace) = sub.trace.as_ref()
&& !set_parent_from_w3c_trace_context(&dispatch_span, trace)

View File

@@ -2495,6 +2495,34 @@ fn submission_dispatch_span_uses_debug_for_realtime_audio() {
);
}
/// Checks that `Op::kind` reports the expected label for two turn-related ops.
#[test]
fn op_kind_distinguishes_turn_ops() {
    // Every override is left unset: only the variant should influence the label.
    let override_turn_context = Op::OverrideTurnContext {
        cwd: None,
        approval_policy: None,
        sandbox_policy: None,
        windows_sandbox_level: None,
        model: None,
        effort: None,
        summary: None,
        service_tier: None,
        collaboration_mode: None,
        personality: None,
    };
    let user_input = Op::UserInput {
        items: vec![],
        final_output_json_schema: None,
    };

    assert_eq!(override_turn_context.kind(), "override_turn_context");
    assert_eq!(user_input.kind(), "user_input");
}
#[tokio::test]
async fn spawn_task_turn_span_inherits_dispatch_trace_context() {
struct TraceCaptureTask {

View File

@@ -28,6 +28,7 @@ use codex_protocol::protocol::SandboxPolicy;
use thiserror::Error;
use tokio::fs;
use tokio::task::spawn_blocking;
use tracing::instrument;
use crate::bash::parse_shell_lc_plain_commands;
use crate::bash::parse_shell_lc_single_command_prefix;
@@ -187,6 +188,7 @@ impl ExecPolicyManager {
}
}
#[instrument(level = "info", skip_all)]
pub(crate) async fn load(config_stack: &ConfigLayerStack) -> Result<Self, ExecPolicyError> {
let (policy, warning) = load_exec_policy_with_warning(config_stack).await?;
if let Some(err) = warning.as_ref() {

View File

@@ -26,6 +26,7 @@ use tokio::sync::TryLockError;
use tokio::time::timeout;
use tracing::error;
use tracing::info;
use tracing::instrument;
const MODEL_CACHE_FILE: &str = "models_cache.json";
const DEFAULT_MODEL_CACHE_TTL: Duration = Duration::from_secs(300);
@@ -102,6 +103,7 @@ impl ModelsManager {
/// List all available models, refreshing according to the specified strategy.
///
/// Returns model presets sorted by priority and filtered by auth mode and visibility.
#[instrument(level = "info", skip(self))]
pub async fn list_models(&self, refresh_strategy: RefreshStrategy) -> Vec<ModelPreset> {
if let Err(err) = self.refresh_available_models(refresh_strategy).await {
error!("failed to refresh available models: {err}");
@@ -137,6 +139,7 @@ impl ModelsManager {
///
/// If `model` is provided, returns it directly. Otherwise selects the default based on
/// auth mode and available models.
#[instrument(level = "info", skip(self))]
pub async fn get_default_model(
&self,
model: &Option<String>,
@@ -160,6 +163,7 @@ impl ModelsManager {
// todo(aibrahim): look if we can tighten it to pub(crate)
/// Look up model metadata, applying remote overrides and config adjustments.
#[instrument(level = "info", skip(self, config))]
pub async fn get_model_info(&self, model: &str, config: &Config) -> ModelInfo {
let remote_models = self.get_remote_models().await;
Self::construct_model_info_from_candidates(model, &remote_models, config)

View File

@@ -32,6 +32,7 @@ use std::path::PathBuf;
use tokio::io::AsyncReadExt;
use toml::Value as TomlValue;
use tracing::error;
use tracing::instrument;
pub(crate) const HIERARCHICAL_AGENTS_MESSAGE: &str =
include_str!("../hierarchical_agents_message.md");
@@ -80,6 +81,7 @@ fn render_js_repl_instructions(config: &Config) -> Option<String> {
/// Combines `Config::instructions` and `AGENTS.md` (if present) into a single
/// string of instructions.
#[instrument(level = "info", skip_all)]
pub(crate) async fn get_user_instructions(
config: &Config,
skills: Option<&[SkillMetadata]>,

View File

@@ -11,6 +11,7 @@ use codex_core::features::Feature;
use codex_core::ws_version_from_features;
use codex_otel::SessionTelemetry;
use codex_otel::TelemetryAuthMode;
use codex_otel::current_span_w3c_trace_context;
use codex_otel::metrics::MetricsClient;
use codex_otel::metrics::MetricsConfig;
use codex_protocol::ThreadId;
@@ -25,6 +26,7 @@ use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::W3cTraceContext;
use codex_protocol::user_input::UserInput;
use core_test_support::load_default_config_for_test;
use core_test_support::responses::WebSocketConnectionConfig;
@@ -38,12 +40,20 @@ use core_test_support::skip_if_no_network;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use futures::StreamExt;
use opentelemetry::global;
use opentelemetry::trace::TracerProvider;
use opentelemetry_sdk::metrics::InMemoryMetricExporter;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::SdkTracerProvider;
use pretty_assertions::assert_eq;
use serde_json::json;
use serial_test::serial;
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir;
use tracing::Instrument;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_test::traced_test;
const MODEL: &str = "gpt-5.2-codex";
@@ -51,6 +61,31 @@ const OPENAI_BETA_HEADER: &str = "OpenAI-Beta";
const WS_V2_BETA_HEADER_VALUE: &str = "responses_websockets=2026-02-06";
const X_CLIENT_REQUEST_ID_HEADER: &str = "x-client-request-id";
/// Extracts the trace-id field (the second `-`-separated segment) from a
/// `traceparent` header value.
///
/// # Panics
///
/// Panics if `traceparent` contains no `-` separator, i.e. has no second segment.
fn trace_id(traceparent: &str) -> &str {
    let mut segments = traceparent.split('-');
    // The first segment precedes the trace id and is not needed here.
    let _leading = segments.next();
    match segments.next() {
        Some(id) => id,
        None => panic!("traceparent missing trace id"),
    }
}
/// Asserts that the `trace` object of a request body agrees with `expected_trace`.
///
/// Only the trace-id segment of `traceparent` is compared (extracted with
/// `trace_id`); `tracestate` is compared as an optional string.
///
/// # Panics
///
/// Panics if `body["trace"]` is not an object, if either side lacks a
/// `traceparent`, or if any comparison fails.
fn assert_request_trace_matches(body: &serde_json::Value, expected_trace: &W3cTraceContext) {
    let payload = body["trace"].as_object().expect("missing trace payload");

    // Compare trace ids only, not the full traceparent strings.
    let sent_traceparent = payload
        .get("traceparent")
        .and_then(|value| value.as_str())
        .expect("missing traceparent");
    let wanted_traceparent = expected_trace
        .traceparent
        .as_deref()
        .expect("missing expected traceparent");
    assert_eq!(trace_id(sent_traceparent), trace_id(wanted_traceparent));

    let sent_tracestate = payload.get("tracestate").and_then(|value| value.as_str());
    let wanted_tracestate = expected_trace.tracestate.as_deref();
    assert_eq!(sent_tracestate, wanted_tracestate);
}
struct WebsocketTestHarness {
_codex_home: TempDir,
client: ModelClient,
@@ -98,6 +133,124 @@ async fn responses_websocket_streams_request() {
server.shutdown().await;
}
/// Two turns, each run inside its own span and its own client session, must
/// share a single websocket handshake while each request carries the trace
/// context of the span it was issued under.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn responses_websocket_reuses_connection_with_per_turn_trace_payloads() {
skip_if_no_network!();
// Process-global propagator; NOTE(review): presumably the reason this test is `#[serial]` — confirm.
global::set_text_map_propagator(TraceContextPropagator::new());
let provider = SdkTracerProvider::builder().build();
let tracer = provider.tracer("client-websocket-test");
let subscriber =
tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer));
// Keep the guard alive for the whole test so the subscriber stays installed.
let _guard = subscriber.set_default();
// Two scripted responses; the assertions below expect both on one connection.
let server = start_websocket_server(vec![vec![
vec![ev_response_created("resp-1"), ev_completed("resp-1")],
vec![ev_response_created("resp-2"), ev_completed("resp-2")],
]])
.await;
let harness = websocket_harness(&server).await;
let prompt_one = prompt_with_input(vec![message_item("hello")]);
let prompt_two = prompt_with_input(vec![message_item("again")]);
// Turn one: capture the span's trace context inside the instrumented future,
// before streaming, so it can be compared with what the server received.
let first_trace = {
let mut client_session = harness.client.new_session();
async {
let expected_trace =
current_span_w3c_trace_context().expect("current span should have trace context");
stream_until_complete(&mut client_session, &harness, &prompt_one).await;
expected_trace
}
.instrument(tracing::info_span!("client.websocket.turn_one"))
.await
};
// Turn two: same procedure with a fresh session and a different span.
let second_trace = {
let mut client_session = harness.client.new_session();
async {
let expected_trace =
current_span_w3c_trace_context().expect("current span should have trace context");
stream_until_complete(&mut client_session, &harness, &prompt_two).await;
expected_trace
}
.instrument(tracing::info_span!("client.websocket.turn_two"))
.await
};
// One handshake carrying two requests: the connection was reused.
assert_eq!(server.handshakes().len(), 1);
let connection = server.single_connection();
assert_eq!(connection.len(), 2);
let first_request = connection
.first()
.expect("missing first request")
.body_json();
let second_request = connection
.get(1)
.expect("missing second request")
.body_json();
// Each request's trace payload must match the context captured for its turn.
assert_request_trace_matches(&first_request, &first_trace);
assert_request_trace_matches(&second_request, &second_trace);
let first_traceparent = first_request["trace"]["traceparent"]
.as_str()
.expect("missing first traceparent");
let second_traceparent = second_request["trace"]["traceparent"]
.as_str()
.expect("missing second traceparent");
// The two turns must not share a trace id.
assert_ne!(trace_id(first_traceparent), trace_id(second_traceparent));
server.shutdown().await;
}
/// A websocket preconnected outside any turn span must still send the trace
/// context of the span under which the request is later streamed.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn responses_websocket_preconnect_does_not_replace_turn_trace_payload() {
skip_if_no_network!();
// Process-global propagator; NOTE(review): presumably the reason this test is `#[serial]` — confirm.
global::set_text_map_propagator(TraceContextPropagator::new());
let provider = SdkTracerProvider::builder().build();
let tracer = provider.tracer("client-websocket-test");
let subscriber =
tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer));
// Keep the guard alive for the whole test so the subscriber stays installed.
let _guard = subscriber.set_default();
let server = start_websocket_server(vec![vec![vec![
ev_response_created("resp-1"),
ev_completed("resp-1"),
]]])
.await;
let harness = websocket_harness(&server).await;
let mut client_session = harness.client.new_session();
// Preconnect happens before the request span below is entered.
client_session
.preconnect_websocket(&harness.session_telemetry, &harness.model_info)
.await
.expect("websocket preconnect failed");
let prompt = prompt_with_input(vec![message_item("hello")]);
// Capture the request span's trace context inside the instrumented future,
// then stream on the already-preconnected session.
let expected_trace = async {
let expected_trace =
current_span_w3c_trace_context().expect("current span should have trace context");
stream_until_complete(&mut client_session, &harness, &prompt).await;
expected_trace
}
.instrument(tracing::info_span!("client.websocket.request"))
.await;
// Exactly one handshake and one request: the preconnected socket was used.
assert_eq!(server.handshakes().len(), 1);
let connection = server.single_connection();
assert_eq!(connection.len(), 1);
let request = connection.first().expect("missing request").body_json();
// The payload must reflect the request span's trace context.
assert_request_trace_matches(&request, &expected_trace);
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_preconnect_reuses_connection() {
skip_if_no_network!();

View File

@@ -478,6 +478,47 @@ pub enum Op {
ListModels,
}
impl Op {
/// Returns a static, snake_case label naming this operation's variant.
///
/// Any payload carried by the variant is ignored; only the variant itself
/// determines the returned string.
pub fn kind(&self) -> &'static str {
// One arm per variant with no wildcard.
match self {
Self::Interrupt => "interrupt",
Self::CleanBackgroundTerminals => "clean_background_terminals",
Self::RealtimeConversationStart(_) => "realtime_conversation_start",
Self::RealtimeConversationAudio(_) => "realtime_conversation_audio",
Self::RealtimeConversationText(_) => "realtime_conversation_text",
Self::RealtimeConversationClose => "realtime_conversation_close",
Self::UserInput { .. } => "user_input",
Self::UserTurn { .. } => "user_turn",
Self::OverrideTurnContext { .. } => "override_turn_context",
Self::ExecApproval { .. } => "exec_approval",
Self::PatchApproval { .. } => "patch_approval",
Self::ResolveElicitation { .. } => "resolve_elicitation",
Self::UserInputAnswer { .. } => "user_input_answer",
Self::RequestPermissionsResponse { .. } => "request_permissions_response",
Self::DynamicToolResponse { .. } => "dynamic_tool_response",
Self::AddToHistory { .. } => "add_to_history",
Self::GetHistoryEntryRequest { .. } => "get_history_entry_request",
Self::ListMcpTools => "list_mcp_tools",
Self::RefreshMcpServers { .. } => "refresh_mcp_servers",
Self::ReloadUserConfig => "reload_user_config",
Self::ListCustomPrompts => "list_custom_prompts",
Self::ListSkills { .. } => "list_skills",
Self::ListRemoteSkills { .. } => "list_remote_skills",
Self::DownloadRemoteSkill { .. } => "download_remote_skill",
Self::Compact => "compact",
Self::DropMemories => "drop_memories",
Self::UpdateMemories => "update_memories",
Self::SetThreadName { .. } => "set_thread_name",
Self::Undo => "undo",
Self::ThreadRollback { .. } => "thread_rollback",
Self::Review { .. } => "review",
Self::Shutdown => "shutdown",
Self::RunUserShellCommand { .. } => "run_user_shell_command",
Self::ListModels => "list_models",
}
}
}
/// Determines the conditions under which the user is consulted to approve
/// running the command proposed by Codex.
#[derive(