mirror of
https://github.com/openai/codex.git
synced 2026-03-13 10:13:49 +00:00
Compare commits
2 Commits
message_ro
...
codex/webs
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4fa0c979bd | ||
|
|
b1e3b1d08d |
@@ -1998,6 +1998,12 @@ impl CodexMessageProcessor {
|
||||
service_name,
|
||||
request_trace,
|
||||
)
|
||||
.instrument(tracing::info_span!(
|
||||
"app_server.thread_start.create_thread",
|
||||
otel.name = "app_server.thread_start.create_thread",
|
||||
thread_start.dynamic_tool_count = core_dynamic_tool_count,
|
||||
thread_start.persist_extended_history = persist_extended_history,
|
||||
))
|
||||
.await
|
||||
{
|
||||
Ok(new_conv) => {
|
||||
@@ -2007,7 +2013,13 @@ impl CodexMessageProcessor {
|
||||
session_configured,
|
||||
..
|
||||
} = new_conv;
|
||||
let config_snapshot = thread.config_snapshot().await;
|
||||
let config_snapshot = thread
|
||||
.config_snapshot()
|
||||
.instrument(tracing::info_span!(
|
||||
"app_server.thread_start.config_snapshot",
|
||||
otel.name = "app_server.thread_start.config_snapshot",
|
||||
))
|
||||
.await;
|
||||
let mut thread = build_thread_from_snapshot(
|
||||
thread_id,
|
||||
&config_snapshot,
|
||||
@@ -2023,6 +2035,11 @@ impl CodexMessageProcessor {
|
||||
experimental_raw_events,
|
||||
ApiVersion::V2,
|
||||
)
|
||||
.instrument(tracing::info_span!(
|
||||
"app_server.thread_start.attach_listener",
|
||||
otel.name = "app_server.thread_start.attach_listener",
|
||||
thread_start.experimental_raw_events = experimental_raw_events,
|
||||
))
|
||||
.await,
|
||||
thread_id,
|
||||
request_id.connection_id,
|
||||
@@ -2032,12 +2049,20 @@ impl CodexMessageProcessor {
|
||||
listener_task_context
|
||||
.thread_watch_manager
|
||||
.upsert_thread_silently(thread.clone())
|
||||
.instrument(tracing::info_span!(
|
||||
"app_server.thread_start.upsert_thread",
|
||||
otel.name = "app_server.thread_start.upsert_thread",
|
||||
))
|
||||
.await;
|
||||
|
||||
thread.status = resolve_thread_status(
|
||||
listener_task_context
|
||||
.thread_watch_manager
|
||||
.loaded_status_for_thread(&thread.id)
|
||||
.instrument(tracing::info_span!(
|
||||
"app_server.thread_start.resolve_status",
|
||||
otel.name = "app_server.thread_start.resolve_status",
|
||||
))
|
||||
.await,
|
||||
false,
|
||||
);
|
||||
@@ -2056,12 +2081,20 @@ impl CodexMessageProcessor {
|
||||
listener_task_context
|
||||
.outgoing
|
||||
.send_response(request_id, response)
|
||||
.instrument(tracing::info_span!(
|
||||
"app_server.thread_start.send_response",
|
||||
otel.name = "app_server.thread_start.send_response",
|
||||
))
|
||||
.await;
|
||||
|
||||
let notif = ThreadStartedNotification { thread };
|
||||
listener_task_context
|
||||
.outgoing
|
||||
.send_server_notification(ServerNotification::ThreadStarted(notif))
|
||||
.instrument(tracing::info_span!(
|
||||
"app_server.thread_start.notify_started",
|
||||
otel.name = "app_server.thread_start.notify_started",
|
||||
))
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
|
||||
@@ -158,6 +158,11 @@ impl TracingHarness {
|
||||
self.tracing.exporter.reset();
|
||||
}
|
||||
|
||||
async fn shutdown(self) {
|
||||
self.processor.shutdown_threads().await;
|
||||
self.processor.drain_background_tasks().await;
|
||||
}
|
||||
|
||||
async fn request<T>(&mut self, request: ClientRequest, trace: Option<W3cTraceContext>) -> T
|
||||
where
|
||||
T: serde::de::DeserializeOwned,
|
||||
@@ -275,6 +280,18 @@ fn find_rpc_span_with_trace<'a>(
|
||||
})
|
||||
}
|
||||
|
||||
fn find_span_by_name<'a>(spans: &'a [SpanData], name: &str) -> &'a SpanData {
|
||||
spans
|
||||
.iter()
|
||||
.find(|span| span.name.as_ref() == name)
|
||||
.unwrap_or_else(|| {
|
||||
panic!(
|
||||
"missing span named {name}; exported spans:\n{}",
|
||||
format_spans(spans)
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
fn find_span_by_name_with_trace<'a>(
|
||||
spans: &'a [SpanData],
|
||||
name: &str,
|
||||
@@ -445,8 +462,20 @@ async fn thread_start_jsonrpc_span_exports_server_span_and_parents_children() ->
|
||||
..
|
||||
} = RemoteTrace::new("00000000000000000000000000000011", "0000000000000022");
|
||||
|
||||
let _: ThreadStartResponse = harness.start_thread(2, Some(remote_trace)).await;
|
||||
drop(harness.processor);
|
||||
let _: ThreadStartResponse = harness.start_thread(2, None).await;
|
||||
let untraced_spans = wait_for_exported_spans(harness.tracing, |spans| {
|
||||
spans
|
||||
.iter()
|
||||
.any(|span| span.name.as_ref() == "app_server.thread_start.derive_config")
|
||||
})
|
||||
.await;
|
||||
let untraced_derive_config_span =
|
||||
find_span_by_name(&untraced_spans, "app_server.thread_start.derive_config");
|
||||
assert_ne!(untraced_derive_config_span.parent_span_id, SpanId::INVALID);
|
||||
|
||||
harness.reset_tracing();
|
||||
|
||||
let _: ThreadStartResponse = harness.start_thread(3, Some(remote_trace)).await;
|
||||
let spans = wait_for_exported_spans(harness.tracing, |spans| {
|
||||
spans.iter().any(|span| {
|
||||
span.span_kind == SpanKind::Server
|
||||
@@ -454,10 +483,11 @@ async fn thread_start_jsonrpc_span_exports_server_span_and_parents_children() ->
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
}) && spans
|
||||
.iter()
|
||||
.any(|span| span.name.as_ref() == "thread_spawn")
|
||||
.any(|span| span.name.as_ref() == "app_server.thread_start.notify_started")
|
||||
})
|
||||
.await;
|
||||
|
||||
let derive_config_span = find_span_by_name(&spans, "app_server.thread_start.derive_config");
|
||||
let server_request_span =
|
||||
find_rpc_span_with_trace(&spans, SpanKind::Server, "thread/start", remote_trace_id);
|
||||
let thread_spawn_span = find_span_by_name_with_trace(&spans, "thread_spawn", remote_trace_id);
|
||||
@@ -465,9 +495,21 @@ async fn thread_start_jsonrpc_span_exports_server_span_and_parents_children() ->
|
||||
assert_eq!(server_request_span.name.as_ref(), "thread/start");
|
||||
assert_eq!(server_request_span.span_context.trace_id(), remote_trace_id);
|
||||
assert_ne!(server_request_span.span_context.span_id(), SpanId::INVALID);
|
||||
assert_eq!(
|
||||
derive_config_span.span_context.trace_id(),
|
||||
remote_trace_id,
|
||||
"thread/start startup spans did not inherit the inbound request trace"
|
||||
);
|
||||
assert_span_descends_from(&spans, derive_config_span, server_request_span);
|
||||
assert_span_descends_from(&spans, thread_spawn_span, server_request_span);
|
||||
assert_span_descends_from(&spans, session_init_span, server_request_span);
|
||||
|
||||
let default_model_span = find_span_by_name(&spans, "get_default_model");
|
||||
let session_init_rollout_span = find_span_by_name(&spans, "session_init.rollout");
|
||||
assert_span_descends_from(&spans, default_model_span, server_request_span);
|
||||
assert_span_descends_from(&spans, session_init_rollout_span, server_request_span);
|
||||
harness.shutdown().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -511,34 +553,27 @@ async fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> {
|
||||
)
|
||||
.await;
|
||||
let spans = wait_for_exported_spans(harness.tracing, |spans| {
|
||||
spans
|
||||
.iter()
|
||||
.any(|span| span.name.as_ref() == "submission_dispatch")
|
||||
&& spans
|
||||
.iter()
|
||||
.any(|span| span.name.as_ref() == "session_task.turn")
|
||||
&& spans.iter().any(|span| span.name.as_ref() == "run_turn")
|
||||
spans.iter().any(|span| {
|
||||
span.span_kind == SpanKind::Server
|
||||
&& span_attr(span, "rpc.method") == Some("turn/start")
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
}) && spans.iter().any(|span| {
|
||||
span.name.as_ref() == "op.dispatch.user_input"
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
})
|
||||
})
|
||||
.await;
|
||||
drop(harness.processor);
|
||||
tokio::task::yield_now().await;
|
||||
|
||||
let server_request_span =
|
||||
find_rpc_span_with_trace(&spans, SpanKind::Server, "turn/start", remote_trace_id);
|
||||
let submission_dispatch_span =
|
||||
find_span_by_name_with_trace(&spans, "submission_dispatch", remote_trace_id);
|
||||
let session_task_turn_span =
|
||||
find_span_by_name_with_trace(&spans, "session_task.turn", remote_trace_id);
|
||||
let run_turn_span = find_span_by_name_with_trace(&spans, "run_turn", remote_trace_id);
|
||||
let core_turn_span =
|
||||
find_span_by_name_with_trace(&spans, "op.dispatch.user_input", remote_trace_id);
|
||||
|
||||
assert_eq!(server_request_span.parent_span_id, remote_parent_span_id);
|
||||
assert!(server_request_span.parent_span_is_remote);
|
||||
assert_eq!(server_request_span.span_context.trace_id(), remote_trace_id);
|
||||
assert_span_descends_from(&spans, submission_dispatch_span, server_request_span);
|
||||
assert_span_descends_from(&spans, session_task_turn_span, server_request_span);
|
||||
assert_span_descends_from(&spans, run_turn_span, server_request_span);
|
||||
assert_span_descends_from(&spans, session_task_turn_span, submission_dispatch_span);
|
||||
assert_span_descends_from(&spans, run_turn_span, session_task_turn_span);
|
||||
assert_span_descends_from(&spans, core_turn_span, server_request_span);
|
||||
harness.shutdown().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
|
||||
use codex_protocol::protocol::RateLimitSnapshot;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
use codex_protocol::protocol::W3cTraceContext;
|
||||
use futures::Stream;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
@@ -179,6 +180,7 @@ impl From<&ResponsesApiRequest> for ResponseCreateWsRequest {
|
||||
text: request.text.clone(),
|
||||
generate: None,
|
||||
client_metadata: None,
|
||||
trace: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -207,6 +209,8 @@ pub struct ResponseCreateWsRequest {
|
||||
pub generate: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub client_metadata: Option<HashMap<String, String>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub trace: Option<W3cTraceContext>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
|
||||
@@ -21,6 +21,7 @@ use http::Method;
|
||||
use serde_json::Value;
|
||||
use std::sync::Arc;
|
||||
use std::sync::OnceLock;
|
||||
use tracing::instrument;
|
||||
|
||||
pub struct ResponsesClient<T: HttpTransport, A: AuthProvider> {
|
||||
session: EndpointSession<T, A>,
|
||||
@@ -55,6 +56,16 @@ impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "responses.stream_request",
|
||||
level = "info",
|
||||
skip_all,
|
||||
fields(
|
||||
transport = "responses_http",
|
||||
http.method = "POST",
|
||||
api.path = "responses"
|
||||
)
|
||||
)]
|
||||
pub async fn stream_request(
|
||||
&self,
|
||||
request: ResponsesApiRequest,
|
||||
@@ -90,6 +101,17 @@ impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
|
||||
"responses"
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "responses.stream",
|
||||
level = "info",
|
||||
skip_all,
|
||||
fields(
|
||||
transport = "responses_http",
|
||||
http.method = "POST",
|
||||
api.path = "responses",
|
||||
turn.has_state = turn_state.is_some()
|
||||
)
|
||||
)]
|
||||
pub async fn stream(
|
||||
&self,
|
||||
body: Value,
|
||||
|
||||
@@ -33,9 +33,12 @@ use tokio_tungstenite::WebSocketStream;
|
||||
use tokio_tungstenite::tungstenite::Error as WsError;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
|
||||
use tracing::Instrument;
|
||||
use tracing::Span;
|
||||
use tracing::debug;
|
||||
use tracing::error;
|
||||
use tracing::info;
|
||||
use tracing::instrument;
|
||||
use tracing::trace;
|
||||
use tungstenite::extensions::ExtensionsConfig;
|
||||
use tungstenite::extensions::compression::deflate::DeflateConfig;
|
||||
@@ -200,6 +203,12 @@ impl ResponsesWebsocketConnection {
|
||||
self.stream.lock().await.is_none()
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "responses_websocket.stream_request",
|
||||
level = "info",
|
||||
skip_all,
|
||||
fields(transport = "responses_websocket", api.path = "responses")
|
||||
)]
|
||||
pub async fn stream_request(
|
||||
&self,
|
||||
request: ResponsesWsRequest,
|
||||
@@ -216,48 +225,52 @@ impl ResponsesWebsocketConnection {
|
||||
ApiError::Stream(format!("failed to encode websocket request: {err}"))
|
||||
})?;
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Some(model) = server_model {
|
||||
let _ = tx_event.send(Ok(ResponseEvent::ServerModel(model))).await;
|
||||
}
|
||||
if let Some(etag) = models_etag {
|
||||
let _ = tx_event.send(Ok(ResponseEvent::ModelsEtag(etag))).await;
|
||||
}
|
||||
if server_reasoning_included {
|
||||
let _ = tx_event
|
||||
.send(Ok(ResponseEvent::ServerReasoningIncluded(true)))
|
||||
.await;
|
||||
}
|
||||
let mut guard = stream.lock().await;
|
||||
let result = {
|
||||
let Some(ws_stream) = guard.as_mut() else {
|
||||
let current_span = Span::current();
|
||||
tokio::spawn(
|
||||
async move {
|
||||
if let Some(model) = server_model {
|
||||
let _ = tx_event.send(Ok(ResponseEvent::ServerModel(model))).await;
|
||||
}
|
||||
if let Some(etag) = models_etag {
|
||||
let _ = tx_event.send(Ok(ResponseEvent::ModelsEtag(etag))).await;
|
||||
}
|
||||
if server_reasoning_included {
|
||||
let _ = tx_event
|
||||
.send(Err(ApiError::Stream(
|
||||
"websocket connection is closed".to_string(),
|
||||
)))
|
||||
.send(Ok(ResponseEvent::ServerReasoningIncluded(true)))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
let mut guard = stream.lock().await;
|
||||
let result = {
|
||||
let Some(ws_stream) = guard.as_mut() else {
|
||||
let _ = tx_event
|
||||
.send(Err(ApiError::Stream(
|
||||
"websocket connection is closed".to_string(),
|
||||
)))
|
||||
.await;
|
||||
return;
|
||||
};
|
||||
|
||||
run_websocket_response_stream(
|
||||
ws_stream,
|
||||
tx_event.clone(),
|
||||
request_body,
|
||||
idle_timeout,
|
||||
telemetry,
|
||||
)
|
||||
.await
|
||||
};
|
||||
|
||||
run_websocket_response_stream(
|
||||
ws_stream,
|
||||
tx_event.clone(),
|
||||
request_body,
|
||||
idle_timeout,
|
||||
telemetry,
|
||||
)
|
||||
.await
|
||||
};
|
||||
|
||||
if let Err(err) = result {
|
||||
// A terminal stream error should reach the caller immediately. Waiting for a
|
||||
// graceful close handshake here can stall indefinitely and mask the error.
|
||||
let failed_stream = guard.take();
|
||||
drop(guard);
|
||||
drop(failed_stream);
|
||||
let _ = tx_event.send(Err(err)).await;
|
||||
if let Err(err) = result {
|
||||
// A terminal stream error should reach the caller immediately. Waiting for a
|
||||
// graceful close handshake here can stall indefinitely and mask the error.
|
||||
let failed_stream = guard.take();
|
||||
drop(guard);
|
||||
drop(failed_stream);
|
||||
let _ = tx_event.send(Err(err)).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
.instrument(current_span),
|
||||
);
|
||||
|
||||
Ok(ResponseStream { rx_event })
|
||||
}
|
||||
@@ -273,6 +286,12 @@ impl<A: AuthProvider> ResponsesWebsocketClient<A> {
|
||||
Self { provider, auth }
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "responses_websocket.connect",
|
||||
level = "info",
|
||||
skip_all,
|
||||
fields(transport = "responses_websocket", api.path = "responses")
|
||||
)]
|
||||
pub async fn connect(
|
||||
&self,
|
||||
extra_headers: HeaderMap,
|
||||
|
||||
@@ -12,6 +12,7 @@ use http::HeaderMap;
|
||||
use http::Method;
|
||||
use serde_json::Value;
|
||||
use std::sync::Arc;
|
||||
use tracing::instrument;
|
||||
|
||||
pub(crate) struct EndpointSession<T: HttpTransport, A: AuthProvider> {
|
||||
transport: T,
|
||||
@@ -68,6 +69,12 @@ impl<T: HttpTransport, A: AuthProvider> EndpointSession<T, A> {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "endpoint_session.execute_with",
|
||||
level = "info",
|
||||
skip_all,
|
||||
fields(http.method = %method, api.path = path)
|
||||
)]
|
||||
pub(crate) async fn execute_with<C>(
|
||||
&self,
|
||||
method: Method,
|
||||
@@ -96,6 +103,12 @@ impl<T: HttpTransport, A: AuthProvider> EndpointSession<T, A> {
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "endpoint_session.stream_with",
|
||||
level = "info",
|
||||
skip_all,
|
||||
fields(http.method = %method, api.path = path)
|
||||
)]
|
||||
pub(crate) async fn stream_with<C>(
|
||||
&self,
|
||||
method: Method,
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
use http::Error as HttpError;
|
||||
use http::HeaderMap;
|
||||
use http::HeaderName;
|
||||
use http::HeaderValue;
|
||||
use opentelemetry::global;
|
||||
use opentelemetry::propagation::Injector;
|
||||
use reqwest::IntoUrl;
|
||||
use reqwest::Method;
|
||||
use reqwest::Response;
|
||||
use reqwest::header::HeaderMap;
|
||||
use reqwest::header::HeaderName;
|
||||
use reqwest::header::HeaderValue;
|
||||
use serde::Serialize;
|
||||
use std::fmt::Display;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -58,6 +58,7 @@ use codex_api::create_text_param_for_request;
|
||||
use codex_api::error::ApiError;
|
||||
use codex_api::requests::responses::Compression;
|
||||
use codex_otel::SessionTelemetry;
|
||||
use codex_otel::current_span_w3c_trace_context;
|
||||
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
|
||||
@@ -80,6 +81,7 @@ use tokio::sync::oneshot;
|
||||
use tokio::sync::oneshot::error::TryRecvError;
|
||||
use tokio_tungstenite::tungstenite::Error;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
use tracing::instrument;
|
||||
use tracing::trace;
|
||||
use tracing::warn;
|
||||
|
||||
@@ -692,6 +694,18 @@ impl ModelClientSession {
|
||||
Ok(())
|
||||
}
|
||||
/// Returns a websocket connection for this turn.
|
||||
#[instrument(
|
||||
name = "model_client.websocket_connection",
|
||||
level = "info",
|
||||
skip_all,
|
||||
fields(
|
||||
model = %self.client.state.provider.name,
|
||||
wire_api = ?self.client.state.provider.wire_api,
|
||||
transport = "responses_websocket",
|
||||
api.path = "responses",
|
||||
turn.has_metadata_header = turn_metadata_header.is_some()
|
||||
)
|
||||
)]
|
||||
async fn websocket_connection(
|
||||
&mut self,
|
||||
session_telemetry: &SessionTelemetry,
|
||||
@@ -749,6 +763,19 @@ impl ModelClientSession {
|
||||
/// Handles SSE fixtures, reasoning summaries, verbosity, and the
|
||||
/// `text` controls used for output schemas.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(
|
||||
name = "model_client.stream_responses_api",
|
||||
level = "info",
|
||||
skip_all,
|
||||
fields(
|
||||
model = %model_info.slug,
|
||||
wire_api = ?self.client.state.provider.wire_api,
|
||||
transport = "responses_http",
|
||||
http.method = "POST",
|
||||
api.path = "responses",
|
||||
turn.has_metadata_header = turn_metadata_header.is_some()
|
||||
)
|
||||
)]
|
||||
async fn stream_responses_api(
|
||||
&self,
|
||||
prompt: &Prompt,
|
||||
@@ -816,6 +843,19 @@ impl ModelClientSession {
|
||||
|
||||
/// Streams a turn via the Responses API over WebSocket transport.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(
|
||||
name = "model_client.stream_responses_websocket",
|
||||
level = "info",
|
||||
skip_all,
|
||||
fields(
|
||||
model = %model_info.slug,
|
||||
wire_api = ?self.client.state.provider.wire_api,
|
||||
transport = "responses_websocket",
|
||||
api.path = "responses",
|
||||
turn.has_metadata_header = turn_metadata_header.is_some(),
|
||||
websocket.warmup = warmup
|
||||
)
|
||||
)]
|
||||
async fn stream_responses_websocket(
|
||||
&mut self,
|
||||
prompt: &Prompt,
|
||||
@@ -847,6 +887,7 @@ impl ModelClientSession {
|
||||
)?;
|
||||
let mut ws_payload = ResponseCreateWsRequest {
|
||||
client_metadata: build_ws_client_metadata(turn_metadata_header),
|
||||
trace: current_span_w3c_trace_context(),
|
||||
..ResponseCreateWsRequest::from(&request)
|
||||
};
|
||||
if warmup {
|
||||
|
||||
@@ -583,7 +583,6 @@ impl Codex {
|
||||
let session_source_clone = session_configuration.session_source.clone();
|
||||
let (agent_status_tx, agent_status_rx) = watch::channel(AgentStatus::PendingInit);
|
||||
|
||||
let session_init_span = info_span!("session_init");
|
||||
let session = Session::new(
|
||||
session_configuration,
|
||||
config.clone(),
|
||||
@@ -600,7 +599,6 @@ impl Codex {
|
||||
file_watcher,
|
||||
agent_control,
|
||||
)
|
||||
.instrument(session_init_span)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!("Failed to create session: {e:#}");
|
||||
@@ -1335,6 +1333,7 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(name = "session_init", level = "info", skip_all)]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn new(
|
||||
mut session_configuration: SessionConfiguration,
|
||||
@@ -1426,18 +1425,29 @@ impl Session {
|
||||
.await?;
|
||||
Ok((Some(rollout_recorder), state_db_ctx))
|
||||
}
|
||||
};
|
||||
}
|
||||
.instrument(info_span!(
|
||||
"session_init.rollout",
|
||||
otel.name = "session_init.rollout",
|
||||
session_init.ephemeral = config.ephemeral,
|
||||
));
|
||||
|
||||
let is_subagent = matches!(
|
||||
session_configuration.session_source,
|
||||
SessionSource::SubAgent(_)
|
||||
);
|
||||
let history_meta_fut = async {
|
||||
if matches!(
|
||||
session_configuration.session_source,
|
||||
SessionSource::SubAgent(_)
|
||||
) {
|
||||
if is_subagent {
|
||||
(0, 0)
|
||||
} else {
|
||||
crate::message_history::history_metadata(&config).await
|
||||
}
|
||||
};
|
||||
}
|
||||
.instrument(info_span!(
|
||||
"session_init.history_metadata",
|
||||
otel.name = "session_init.history_metadata",
|
||||
session_init.is_subagent = is_subagent,
|
||||
));
|
||||
let auth_manager_clone = Arc::clone(&auth_manager);
|
||||
let config_for_mcp = Arc::clone(&config);
|
||||
let mcp_manager_for_mcp = Arc::clone(&mcp_manager);
|
||||
@@ -1450,7 +1460,11 @@ impl Session {
|
||||
)
|
||||
.await;
|
||||
(auth, mcp_servers, auth_statuses)
|
||||
};
|
||||
}
|
||||
.instrument(info_span!(
|
||||
"session_init.auth_mcp",
|
||||
otel.name = "session_init.auth_mcp",
|
||||
));
|
||||
|
||||
// Join all independent futures.
|
||||
let (
|
||||
@@ -1608,7 +1622,12 @@ impl Session {
|
||||
tx
|
||||
};
|
||||
let thread_name =
|
||||
match session_index::find_thread_name_by_id(&config.codex_home, &conversation_id).await
|
||||
match session_index::find_thread_name_by_id(&config.codex_home, &conversation_id)
|
||||
.instrument(info_span!(
|
||||
"session_init.thread_name_lookup",
|
||||
otel.name = "session_init.thread_name_lookup",
|
||||
))
|
||||
.await
|
||||
{
|
||||
Ok(name) => name,
|
||||
Err(err) => {
|
||||
@@ -1658,6 +1677,12 @@ impl Session {
|
||||
managed_network_requirements_enabled,
|
||||
network_proxy_audit_metadata,
|
||||
)
|
||||
.instrument(info_span!(
|
||||
"session_init.network_proxy",
|
||||
otel.name = "session_init.network_proxy",
|
||||
session_init.managed_network_requirements_enabled =
|
||||
managed_network_requirements_enabled,
|
||||
))
|
||||
.await?;
|
||||
(Some(network_proxy), Some(session_network_proxy))
|
||||
} else {
|
||||
@@ -1807,6 +1832,8 @@ impl Session {
|
||||
.map(|(name, _)| name.clone())
|
||||
.collect();
|
||||
required_mcp_servers.sort();
|
||||
let enabled_mcp_server_count = mcp_servers.len();
|
||||
let required_mcp_server_count = required_mcp_servers.len();
|
||||
let tool_plugin_provenance = mcp_manager.tool_plugin_provenance(config.as_ref());
|
||||
{
|
||||
let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await;
|
||||
@@ -1824,6 +1851,12 @@ impl Session {
|
||||
codex_apps_tools_cache_key(auth),
|
||||
tool_plugin_provenance,
|
||||
)
|
||||
.instrument(info_span!(
|
||||
"session_init.mcp_manager_init",
|
||||
otel.name = "session_init.mcp_manager_init",
|
||||
session_init.enabled_mcp_server_count = enabled_mcp_server_count,
|
||||
session_init.required_mcp_server_count = required_mcp_server_count,
|
||||
))
|
||||
.await;
|
||||
{
|
||||
let mut manager_guard = sess.services.mcp_connection_manager.write().await;
|
||||
@@ -1843,6 +1876,11 @@ impl Session {
|
||||
.read()
|
||||
.await
|
||||
.required_startup_failures(&required_mcp_servers)
|
||||
.instrument(info_span!(
|
||||
"session_init.required_mcp_wait",
|
||||
otel.name = "session_init.required_mcp_wait",
|
||||
session_init.required_mcp_server_count = required_mcp_server_count,
|
||||
))
|
||||
.await;
|
||||
if !failures.is_empty() {
|
||||
let details = failures
|
||||
@@ -4259,11 +4297,23 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
|
||||
}
|
||||
|
||||
fn submission_dispatch_span(sub: &Submission) -> tracing::Span {
|
||||
let op_name = sub.op.kind();
|
||||
let span_name = format!("op.dispatch.{op_name}");
|
||||
let dispatch_span = match &sub.op {
|
||||
Op::RealtimeConversationAudio(_) => {
|
||||
debug_span!("submission_dispatch", submission.id = sub.id.as_str())
|
||||
debug_span!(
|
||||
"submission_dispatch",
|
||||
otel.name = span_name.as_str(),
|
||||
submission.id = sub.id.as_str(),
|
||||
codex.op = op_name
|
||||
)
|
||||
}
|
||||
_ => info_span!("submission_dispatch", submission.id = sub.id.as_str()),
|
||||
_ => info_span!(
|
||||
"submission_dispatch",
|
||||
otel.name = span_name.as_str(),
|
||||
submission.id = sub.id.as_str(),
|
||||
codex.op = op_name
|
||||
),
|
||||
};
|
||||
if let Some(trace) = sub.trace.as_ref()
|
||||
&& !set_parent_from_w3c_trace_context(&dispatch_span, trace)
|
||||
|
||||
@@ -2495,6 +2495,34 @@ fn submission_dispatch_span_uses_debug_for_realtime_audio() {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn op_kind_distinguishes_turn_ops() {
|
||||
assert_eq!(
|
||||
Op::OverrideTurnContext {
|
||||
cwd: None,
|
||||
approval_policy: None,
|
||||
sandbox_policy: None,
|
||||
windows_sandbox_level: None,
|
||||
model: None,
|
||||
effort: None,
|
||||
summary: None,
|
||||
service_tier: None,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
}
|
||||
.kind(),
|
||||
"override_turn_context"
|
||||
);
|
||||
assert_eq!(
|
||||
Op::UserInput {
|
||||
items: vec![],
|
||||
final_output_json_schema: None,
|
||||
}
|
||||
.kind(),
|
||||
"user_input"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_task_turn_span_inherits_dispatch_trace_context() {
|
||||
struct TraceCaptureTask {
|
||||
|
||||
@@ -28,6 +28,7 @@ use codex_protocol::protocol::SandboxPolicy;
|
||||
use thiserror::Error;
|
||||
use tokio::fs;
|
||||
use tokio::task::spawn_blocking;
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::bash::parse_shell_lc_plain_commands;
|
||||
use crate::bash::parse_shell_lc_single_command_prefix;
|
||||
@@ -187,6 +188,7 @@ impl ExecPolicyManager {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "info", skip_all)]
|
||||
pub(crate) async fn load(config_stack: &ConfigLayerStack) -> Result<Self, ExecPolicyError> {
|
||||
let (policy, warning) = load_exec_policy_with_warning(config_stack).await?;
|
||||
if let Some(err) = warning.as_ref() {
|
||||
|
||||
@@ -26,6 +26,7 @@ use tokio::sync::TryLockError;
|
||||
use tokio::time::timeout;
|
||||
use tracing::error;
|
||||
use tracing::info;
|
||||
use tracing::instrument;
|
||||
|
||||
const MODEL_CACHE_FILE: &str = "models_cache.json";
|
||||
const DEFAULT_MODEL_CACHE_TTL: Duration = Duration::from_secs(300);
|
||||
@@ -102,6 +103,7 @@ impl ModelsManager {
|
||||
/// List all available models, refreshing according to the specified strategy.
|
||||
///
|
||||
/// Returns model presets sorted by priority and filtered by auth mode and visibility.
|
||||
#[instrument(level = "info", skip(self))]
|
||||
pub async fn list_models(&self, refresh_strategy: RefreshStrategy) -> Vec<ModelPreset> {
|
||||
if let Err(err) = self.refresh_available_models(refresh_strategy).await {
|
||||
error!("failed to refresh available models: {err}");
|
||||
@@ -137,6 +139,7 @@ impl ModelsManager {
|
||||
///
|
||||
/// If `model` is provided, returns it directly. Otherwise selects the default based on
|
||||
/// auth mode and available models.
|
||||
#[instrument(level = "info", skip(self))]
|
||||
pub async fn get_default_model(
|
||||
&self,
|
||||
model: &Option<String>,
|
||||
@@ -160,6 +163,7 @@ impl ModelsManager {
|
||||
|
||||
// todo(aibrahim): look if we can tighten it to pub(crate)
|
||||
/// Look up model metadata, applying remote overrides and config adjustments.
|
||||
#[instrument(level = "info", skip(self, config))]
|
||||
pub async fn get_model_info(&self, model: &str, config: &Config) -> ModelInfo {
|
||||
let remote_models = self.get_remote_models().await;
|
||||
Self::construct_model_info_from_candidates(model, &remote_models, config)
|
||||
|
||||
@@ -32,6 +32,7 @@ use std::path::PathBuf;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use toml::Value as TomlValue;
|
||||
use tracing::error;
|
||||
use tracing::instrument;
|
||||
|
||||
pub(crate) const HIERARCHICAL_AGENTS_MESSAGE: &str =
|
||||
include_str!("../hierarchical_agents_message.md");
|
||||
@@ -80,6 +81,7 @@ fn render_js_repl_instructions(config: &Config) -> Option<String> {
|
||||
|
||||
/// Combines `Config::instructions` and `AGENTS.md` (if present) into a single
|
||||
/// string of instructions.
|
||||
#[instrument(level = "info", skip_all)]
|
||||
pub(crate) async fn get_user_instructions(
|
||||
config: &Config,
|
||||
skills: Option<&[SkillMetadata]>,
|
||||
|
||||
@@ -11,6 +11,7 @@ use codex_core::features::Feature;
|
||||
use codex_core::ws_version_from_features;
|
||||
use codex_otel::SessionTelemetry;
|
||||
use codex_otel::TelemetryAuthMode;
|
||||
use codex_otel::current_span_w3c_trace_context;
|
||||
use codex_otel::metrics::MetricsClient;
|
||||
use codex_otel::metrics::MetricsConfig;
|
||||
use codex_protocol::ThreadId;
|
||||
@@ -25,6 +26,7 @@ use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::W3cTraceContext;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::load_default_config_for_test;
|
||||
use core_test_support::responses::WebSocketConnectionConfig;
|
||||
@@ -38,12 +40,20 @@ use core_test_support::skip_if_no_network;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use futures::StreamExt;
|
||||
use opentelemetry::global;
|
||||
use opentelemetry::trace::TracerProvider;
|
||||
use opentelemetry_sdk::metrics::InMemoryMetricExporter;
|
||||
use opentelemetry_sdk::propagation::TraceContextPropagator;
|
||||
use opentelemetry_sdk::trace::SdkTracerProvider;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use serial_test::serial;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tempfile::TempDir;
|
||||
use tracing::Instrument;
|
||||
use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_subscriber::util::SubscriberInitExt;
|
||||
use tracing_test::traced_test;
|
||||
|
||||
const MODEL: &str = "gpt-5.2-codex";
|
||||
@@ -51,6 +61,31 @@ const OPENAI_BETA_HEADER: &str = "OpenAI-Beta";
|
||||
const WS_V2_BETA_HEADER_VALUE: &str = "responses_websockets=2026-02-06";
|
||||
const X_CLIENT_REQUEST_ID_HEADER: &str = "x-client-request-id";
|
||||
|
||||
fn trace_id(traceparent: &str) -> &str {
|
||||
traceparent
|
||||
.split('-')
|
||||
.nth(1)
|
||||
.expect("traceparent missing trace id")
|
||||
}
|
||||
|
||||
fn assert_request_trace_matches(body: &serde_json::Value, expected_trace: &W3cTraceContext) {
|
||||
let trace = body["trace"].as_object().expect("missing trace payload");
|
||||
let actual_traceparent = trace
|
||||
.get("traceparent")
|
||||
.and_then(serde_json::Value::as_str)
|
||||
.expect("missing traceparent");
|
||||
let expected_traceparent = expected_trace
|
||||
.traceparent
|
||||
.as_deref()
|
||||
.expect("missing expected traceparent");
|
||||
|
||||
assert_eq!(trace_id(actual_traceparent), trace_id(expected_traceparent));
|
||||
assert_eq!(
|
||||
trace.get("tracestate").and_then(serde_json::Value::as_str),
|
||||
expected_trace.tracestate.as_deref()
|
||||
);
|
||||
}
|
||||
|
||||
struct WebsocketTestHarness {
|
||||
_codex_home: TempDir,
|
||||
client: ModelClient,
|
||||
@@ -98,6 +133,124 @@ async fn responses_websocket_streams_request() {
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
#[serial]
|
||||
async fn responses_websocket_reuses_connection_with_per_turn_trace_payloads() {
|
||||
skip_if_no_network!();
|
||||
|
||||
global::set_text_map_propagator(TraceContextPropagator::new());
|
||||
|
||||
let provider = SdkTracerProvider::builder().build();
|
||||
let tracer = provider.tracer("client-websocket-test");
|
||||
let subscriber =
|
||||
tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer));
|
||||
let _guard = subscriber.set_default();
|
||||
|
||||
let server = start_websocket_server(vec![vec![
|
||||
vec![ev_response_created("resp-1"), ev_completed("resp-1")],
|
||||
vec![ev_response_created("resp-2"), ev_completed("resp-2")],
|
||||
]])
|
||||
.await;
|
||||
|
||||
let harness = websocket_harness(&server).await;
|
||||
let prompt_one = prompt_with_input(vec![message_item("hello")]);
|
||||
let prompt_two = prompt_with_input(vec![message_item("again")]);
|
||||
|
||||
let first_trace = {
|
||||
let mut client_session = harness.client.new_session();
|
||||
async {
|
||||
let expected_trace =
|
||||
current_span_w3c_trace_context().expect("current span should have trace context");
|
||||
stream_until_complete(&mut client_session, &harness, &prompt_one).await;
|
||||
expected_trace
|
||||
}
|
||||
.instrument(tracing::info_span!("client.websocket.turn_one"))
|
||||
.await
|
||||
};
|
||||
|
||||
let second_trace = {
|
||||
let mut client_session = harness.client.new_session();
|
||||
async {
|
||||
let expected_trace =
|
||||
current_span_w3c_trace_context().expect("current span should have trace context");
|
||||
stream_until_complete(&mut client_session, &harness, &prompt_two).await;
|
||||
expected_trace
|
||||
}
|
||||
.instrument(tracing::info_span!("client.websocket.turn_two"))
|
||||
.await
|
||||
};
|
||||
|
||||
assert_eq!(server.handshakes().len(), 1);
|
||||
let connection = server.single_connection();
|
||||
assert_eq!(connection.len(), 2);
|
||||
|
||||
let first_request = connection
|
||||
.first()
|
||||
.expect("missing first request")
|
||||
.body_json();
|
||||
let second_request = connection
|
||||
.get(1)
|
||||
.expect("missing second request")
|
||||
.body_json();
|
||||
assert_request_trace_matches(&first_request, &first_trace);
|
||||
assert_request_trace_matches(&second_request, &second_trace);
|
||||
|
||||
let first_traceparent = first_request["trace"]["traceparent"]
|
||||
.as_str()
|
||||
.expect("missing first traceparent");
|
||||
let second_traceparent = second_request["trace"]["traceparent"]
|
||||
.as_str()
|
||||
.expect("missing second traceparent");
|
||||
assert_ne!(trace_id(first_traceparent), trace_id(second_traceparent));
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
#[serial]
|
||||
async fn responses_websocket_preconnect_does_not_replace_turn_trace_payload() {
|
||||
skip_if_no_network!();
|
||||
|
||||
global::set_text_map_propagator(TraceContextPropagator::new());
|
||||
|
||||
let provider = SdkTracerProvider::builder().build();
|
||||
let tracer = provider.tracer("client-websocket-test");
|
||||
let subscriber =
|
||||
tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer));
|
||||
let _guard = subscriber.set_default();
|
||||
|
||||
let server = start_websocket_server(vec![vec![vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_completed("resp-1"),
|
||||
]]])
|
||||
.await;
|
||||
|
||||
let harness = websocket_harness(&server).await;
|
||||
let mut client_session = harness.client.new_session();
|
||||
client_session
|
||||
.preconnect_websocket(&harness.session_telemetry, &harness.model_info)
|
||||
.await
|
||||
.expect("websocket preconnect failed");
|
||||
let prompt = prompt_with_input(vec![message_item("hello")]);
|
||||
|
||||
let expected_trace = async {
|
||||
let expected_trace =
|
||||
current_span_w3c_trace_context().expect("current span should have trace context");
|
||||
stream_until_complete(&mut client_session, &harness, &prompt).await;
|
||||
expected_trace
|
||||
}
|
||||
.instrument(tracing::info_span!("client.websocket.request"))
|
||||
.await;
|
||||
|
||||
assert_eq!(server.handshakes().len(), 1);
|
||||
let connection = server.single_connection();
|
||||
assert_eq!(connection.len(), 1);
|
||||
let request = connection.first().expect("missing request").body_json();
|
||||
assert_request_trace_matches(&request, &expected_trace);
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_preconnect_reuses_connection() {
|
||||
skip_if_no_network!();
|
||||
|
||||
@@ -478,6 +478,47 @@ pub enum Op {
|
||||
ListModels,
|
||||
}
|
||||
|
||||
impl Op {
|
||||
pub fn kind(&self) -> &'static str {
|
||||
match self {
|
||||
Self::Interrupt => "interrupt",
|
||||
Self::CleanBackgroundTerminals => "clean_background_terminals",
|
||||
Self::RealtimeConversationStart(_) => "realtime_conversation_start",
|
||||
Self::RealtimeConversationAudio(_) => "realtime_conversation_audio",
|
||||
Self::RealtimeConversationText(_) => "realtime_conversation_text",
|
||||
Self::RealtimeConversationClose => "realtime_conversation_close",
|
||||
Self::UserInput { .. } => "user_input",
|
||||
Self::UserTurn { .. } => "user_turn",
|
||||
Self::OverrideTurnContext { .. } => "override_turn_context",
|
||||
Self::ExecApproval { .. } => "exec_approval",
|
||||
Self::PatchApproval { .. } => "patch_approval",
|
||||
Self::ResolveElicitation { .. } => "resolve_elicitation",
|
||||
Self::UserInputAnswer { .. } => "user_input_answer",
|
||||
Self::RequestPermissionsResponse { .. } => "request_permissions_response",
|
||||
Self::DynamicToolResponse { .. } => "dynamic_tool_response",
|
||||
Self::AddToHistory { .. } => "add_to_history",
|
||||
Self::GetHistoryEntryRequest { .. } => "get_history_entry_request",
|
||||
Self::ListMcpTools => "list_mcp_tools",
|
||||
Self::RefreshMcpServers { .. } => "refresh_mcp_servers",
|
||||
Self::ReloadUserConfig => "reload_user_config",
|
||||
Self::ListCustomPrompts => "list_custom_prompts",
|
||||
Self::ListSkills { .. } => "list_skills",
|
||||
Self::ListRemoteSkills { .. } => "list_remote_skills",
|
||||
Self::DownloadRemoteSkill { .. } => "download_remote_skill",
|
||||
Self::Compact => "compact",
|
||||
Self::DropMemories => "drop_memories",
|
||||
Self::UpdateMemories => "update_memories",
|
||||
Self::SetThreadName { .. } => "set_thread_name",
|
||||
Self::Undo => "undo",
|
||||
Self::ThreadRollback { .. } => "thread_rollback",
|
||||
Self::Review { .. } => "review",
|
||||
Self::Shutdown => "shutdown",
|
||||
Self::RunUserShellCommand { .. } => "run_user_shell_command",
|
||||
Self::ListModels => "list_models",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Determines the conditions under which the user is consulted to approve
|
||||
/// running the command proposed by Codex.
|
||||
#[derive(
|
||||
|
||||
Reference in New Issue
Block a user