[codex] implement codex.api_request and codex.sse_event trace events

This commit is contained in:
Anton Panasenko
2025-09-11 12:35:53 -07:00
parent 4ac336bbc6
commit 089e367814
14 changed files with 452 additions and 254 deletions

30
codex-rs/Cargo.lock generated
View File

@@ -78,12 +78,6 @@ version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
[[package]]
name = "android-tzdata"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
[[package]]
name = "android_system_properties"
version = "0.1.5"
@@ -547,17 +541,16 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
[[package]]
name = "chrono"
version = "0.4.41"
version = "0.4.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d"
checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"serde",
"wasm-bindgen",
"windows-link",
"windows-link 0.2.0",
]
[[package]]
@@ -935,6 +928,9 @@ dependencies = [
name = "codex-otel"
version = "0.0.0"
dependencies = [
"chrono",
"codex-mcp-client",
"codex-protocol",
"opentelemetry",
"opentelemetry-http",
"opentelemetry-otlp",
@@ -5762,7 +5758,7 @@ checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3"
dependencies = [
"windows-implement",
"windows-interface",
"windows-link",
"windows-link 0.1.3",
"windows-result",
"windows-strings",
]
@@ -5795,13 +5791,19 @@ version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a"
[[package]]
name = "windows-link"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45e46c0661abb7180e7b9c281db115305d49ca1709ab8242adf09666d2173c65"
[[package]]
name = "windows-registry"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b8a9ed28765efc97bbc954883f4e6796c33a06546ebafacbabee9696967499e"
dependencies = [
"windows-link",
"windows-link 0.1.3",
"windows-result",
"windows-strings",
]
@@ -5812,7 +5814,7 @@ version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6"
dependencies = [
"windows-link",
"windows-link 0.1.3",
]
[[package]]
@@ -5821,7 +5823,7 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57"
dependencies = [
"windows-link",
"windows-link 0.1.3",
]
[[package]]

View File

@@ -10,8 +10,7 @@ use crate::model_family::ModelFamily;
use crate::openai_tools::create_tools_json_for_chat_completions_api;
use crate::util::backoff;
use bytes::Bytes;
use codex_otel::otel_provider::OtelProvider;
use codex_protocol::mcp_protocol::ConversationId;
use codex_otel::trace_manager::TraceManager;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ReasoningItemContent;
use codex_protocol::models::ResponseItem;
@@ -28,26 +27,15 @@ use tokio::sync::mpsc;
use tokio::time::timeout;
use tracing::Instrument;
use tracing::debug;
use tracing::info_span;
use tracing::trace;
/// Implementation for the classic Chat Completions API.
#[tracing::instrument(
skip_all,
fields(
conversation_id = %_conversation_id,
model = %_model,
model_slug = %model_family.slug,
model_provider = %provider.name,
),
)]
pub(crate) async fn stream_chat_completions(
_conversation_id: &ConversationId,
prompt: &Prompt,
_model: &String,
model_family: &ModelFamily,
client: &reqwest::Client,
provider: &ModelProviderInfo,
trace_manager: &TraceManager,
) -> Result<ResponseStream> {
// Build messages array
let mut messages = Vec::<serde_json::Value>::new();
@@ -300,42 +288,53 @@ pub(crate) async fn stream_chat_completions(
loop {
attempt += 1;
let request_span = trace_manager.request(&prompt.input);
let req_builder = provider.create_request_builder(client, &None).await?;
let request_span = info_span!("request").or_current();
let tracing_headers = OtelProvider::headers(&request_span);
let tracing_headers = TraceManager::headers(&request_span);
let res = req_builder
.header(reqwest::header::ACCEPT, "text/event-stream")
.headers(tracing_headers)
.json(&payload)
.send()
.instrument(request_span)
.instrument(request_span.span())
.await;
match res {
Ok(resp) if resp.status().is_success() => {
request_span.status_code(resp.status());
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
let stream = resp.bytes_stream().map_err(CodexErr::Reqwest);
tokio::spawn(process_chat_sse(
stream,
tx_event,
provider.stream_idle_timeout(),
trace_manager.clone(),
));
return Ok(ResponseStream { rx_event });
}
Ok(res) => {
let status = res.status();
let mut log_message = status.to_string();
request_span.status_code(status);
if !(status == StatusCode::TOO_MANY_REQUESTS || status.is_server_error()) {
let body = (res.text().await).unwrap_or_default();
if !body.is_empty() {
log_message = format!("{log_message}: {body}");
}
request_span.error(attempt, Some(status), &log_message);
return Err(CodexErr::UnexpectedStatus(status, body));
}
if attempt > max_retries {
request_span.error(attempt, Some(status), &log_message);
return Err(CodexErr::RetryLimit(status));
}
request_span.error(attempt, Some(status), &log_message);
let retry_after_secs = res
.headers()
.get(reqwest::header::RETRY_AFTER)
@@ -348,6 +347,8 @@ pub(crate) async fn stream_chat_completions(
tokio::time::sleep(delay).await;
}
Err(e) => {
let error_message = format!("{e:#}");
request_span.error(attempt, None, &error_message);
if attempt > max_retries {
return Err(e.into());
}
@@ -365,6 +366,7 @@ async fn process_chat_sse<S>(
stream: S,
tx_event: mpsc::Sender<Result<ResponseEvent>>,
idle_timeout: Duration,
trace_manager: TraceManager,
) where
S: Stream<Item = Result<Bytes>> + Unpin,
{
@@ -387,13 +389,15 @@ async fn process_chat_sse<S>(
let mut assistant_text = String::new();
let mut reasoning_text = String::new();
let sse_span = trace_manager.response();
loop {
let sse = match timeout(idle_timeout, stream.next()).await {
let sse = match timeout(idle_timeout, stream.next().instrument(sse_span.span())).await {
Ok(Some(Ok(ev))) => ev,
Ok(Some(Err(e))) => {
let _ = tx_event
.send(Err(CodexErr::Stream(e.to_string(), None)))
.await;
let error = e.to_string();
sse_span.error(error.as_str());
let _ = tx_event.send(Err(CodexErr::Stream(error, None))).await;
return;
}
Ok(None) => {
@@ -407,16 +411,17 @@ async fn process_chat_sse<S>(
return;
}
Err(_) => {
let error = "idle timeout waiting for SSE";
sse_span.error(error);
let _ = tx_event
.send(Err(CodexErr::Stream(
"idle timeout waiting for SSE".into(),
None,
)))
.send(Err(CodexErr::Stream(error.into(), None)))
.await;
return;
}
};
sse_span.body(sse.data.as_str());
// OpenAI Chat streaming sends a literal string "[DONE]" when finished.
if sse.data.trim() == "[DONE]" {
// Emit any finalized items before closing so downstream consumers receive
@@ -456,7 +461,11 @@ async fn process_chat_sse<S>(
// Parse JSON chunk
let chunk: serde_json::Value = match serde_json::from_str(&sse.data) {
Ok(v) => v,
Err(_) => continue,
Err(e) => {
let error = format!("Failed to parse SSE event: {e}, data: {}", &sse.data);
sse_span.error(error.as_str());
continue;
}
};
trace!("chat_completions received SSE chunk: {chunk:?}");

View File

@@ -18,9 +18,7 @@ use tokio::sync::mpsc;
use tokio::time::timeout;
use tokio_util::io::ReaderStream;
use tracing::Instrument;
use tracing::Span;
use tracing::debug;
use tracing::info_span;
use tracing::trace;
use tracing::warn;
@@ -46,7 +44,7 @@ use crate::openai_tools::create_tools_json_for_responses_api;
use crate::protocol::TokenUsage;
use crate::token_data::PlanType;
use crate::util::backoff;
use codex_otel::otel_provider::OtelProvider;
use codex_otel::trace_manager::TraceManager;
use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::models::ResponseItem;
@@ -73,6 +71,7 @@ struct Error {
pub struct ModelClient {
config: Arc<Config>,
auth_manager: Option<Arc<AuthManager>>,
trace_manager: TraceManager,
client: reqwest::Client,
provider: ModelProviderInfo,
conversation_id: ConversationId,
@@ -84,6 +83,7 @@ impl ModelClient {
pub fn new(
config: Arc<Config>,
auth_manager: Option<Arc<AuthManager>>,
trace_manager: TraceManager,
provider: ModelProviderInfo,
effort: Option<ReasoningEffortConfig>,
summary: ReasoningSummaryConfig,
@@ -94,6 +94,7 @@ impl ModelClient {
Self {
config,
auth_manager,
trace_manager,
client,
provider,
conversation_id,
@@ -117,27 +118,17 @@ impl ModelClient {
/// Dispatches to either the Responses or Chat implementation depending on
/// the provider config. Public callers always invoke `stream()` the
/// specialised helpers are private to avoid accidental misuse.
#[tracing::instrument(
skip(self),
fields(
conversation_id = %self.conversation_id,
model = %self.config.model,
model_slug = %self.config.model_family.slug,
model_provider = %self.config.model_provider.name,
),
)]
pub async fn stream(&self, prompt: &Prompt) -> Result<ResponseStream> {
match self.provider.wire_api {
WireApi::Responses => self.stream_responses(prompt).await,
WireApi::Chat => {
// Create the raw streaming connection first.
let response_stream = stream_chat_completions(
&self.conversation_id,
prompt,
&self.config.model,
&self.config.model_family,
&self.client,
&self.provider,
&self.trace_manager,
)
.await?;
@@ -170,20 +161,12 @@ impl ModelClient {
}
/// Implementation for the OpenAI *Responses* experimental API.
#[tracing::instrument(
skip_all,
fields(
conversation_id = %self.conversation_id,
model = %self.config.model,
model_slug = %self.config.model_family.slug,
model_provider = %self.config.model_provider.name,
),
)]
async fn stream_responses(&self, prompt: &Prompt) -> Result<ResponseStream> {
if let Some(path) = &*CODEX_RS_SSE_FIXTURE {
// short circuit for tests
warn!(path, "Streaming from fixture");
return stream_from_fixture(path, self.provider.clone()).await;
return stream_from_fixture(path, self.provider.clone(), self.trace_manager.clone())
.await;
}
let auth_manager = self.auth_manager.clone();
@@ -262,6 +245,8 @@ impl ModelClient {
payload_body.as_str()
);
let request_span = self.trace_manager.request(&input_with_instructions);
let mut req_builder = self
.provider
.create_request_builder(&self.client, &auth)
@@ -282,14 +267,11 @@ impl ModelClient {
req_builder = req_builder.header("chatgpt-account-id", account_id);
}
let request_span =
info_span!("request", request_id = tracing::field::Empty).or_current();
let tracing_headers = OtelProvider::headers(&request_span);
let tracing_headers = TraceManager::headers(&request_span);
req_builder = req_builder.headers(tracing_headers);
let res = req_builder.send().instrument(request_span.clone()).await;
let res = req_builder.send().instrument(request_span.span()).await;
if let Ok(resp) = &res {
let request_id = resp
@@ -305,11 +287,12 @@ impl ModelClient {
.map(|v| v.to_str().unwrap_or_default())
.unwrap_or_default()
);
request_span.record("request_id", request_id);
request_span.request_id(request_id);
}
match res {
Ok(resp) if resp.status().is_success() => {
request_span.status_code(resp.status());
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
// spawn task to process SSE
@@ -318,12 +301,14 @@ impl ModelClient {
stream,
tx_event,
self.provider.stream_idle_timeout(),
self.trace_manager.clone(),
));
return Ok(ResponseStream { rx_event });
}
Ok(res) => {
let status = res.status();
let mut log_message = status.to_string();
// Pull out RetryAfter header if present.
let retry_after_secs = res
@@ -339,19 +324,15 @@ impl ModelClient {
let _ = manager.refresh_token().await;
}
// The OpenAI Responses endpoint returns structured JSON bodies even for 4xx/5xx
// errors. When we bubble early with only the HTTP status the caller sees an opaque
// "unexpected status 400 Bad Request" which makes debugging nearly impossible.
// Instead, read (and include) the response text so higher layers and users see the
// exact error message (e.g. "Unknown parameter: 'input[0].metadata'"). The body is
// small and this branch only runs on error paths so the extra allocation is
// negligible.
if !(status == StatusCode::TOO_MANY_REQUESTS
|| status == StatusCode::UNAUTHORIZED
|| status.is_server_error())
{
// Surface the error body to callers. Use `unwrap_or_default` per Clippy.
let body = res.text().await.unwrap_or_default();
if !body.is_empty() {
log_message = format!("{log_message}: {body}");
}
request_span.error(attempt, Some(status), &log_message);
return Err(CodexErr::UnexpectedStatus(status, body));
}
@@ -359,24 +340,24 @@ impl ModelClient {
let body = res.json::<ErrorResponse>().await.ok();
if let Some(ErrorResponse { error }) = body {
if error.r#type.as_deref() == Some("usage_limit_reached") {
// Prefer the plan_type provided in the error message if present
// because it's more up to date than the one encoded in the auth
// token.
let plan_type = error
.plan_type
.or_else(|| auth.as_ref().and_then(|a| a.get_plan_type()));
let resets_in_seconds = error.resets_in_seconds;
request_span.error(attempt, Some(status), &log_message);
return Err(CodexErr::UsageLimitReached(UsageLimitReachedError {
plan_type,
resets_in_seconds,
}));
} else if error.r#type.as_deref() == Some("usage_not_included") {
request_span.error(attempt, Some(status), &log_message);
return Err(CodexErr::UsageNotIncluded);
}
}
}
if attempt > max_retries {
request_span.error(attempt, Some(status), &log_message);
if status == StatusCode::INTERNAL_SERVER_ERROR {
return Err(CodexErr::InternalServerError);
}
@@ -384,12 +365,16 @@ impl ModelClient {
return Err(CodexErr::RetryLimit(status));
}
request_span.error(attempt, Some(status), &log_message);
let delay = retry_after_secs
.map(|s| Duration::from_millis(s * 1_000))
.unwrap_or_else(|| backoff(attempt));
tokio::time::sleep(delay).await;
}
Err(e) => {
let error_message = format!("{e:#}");
request_span.error(attempt, None, &error_message);
if attempt > max_retries {
return Err(e.into());
}
@@ -404,6 +389,10 @@ impl ModelClient {
self.provider.clone()
}
pub fn get_trace_manager(&self) -> TraceManager {
self.trace_manager.clone()
}
/// Returns the currently configured model slug.
pub fn get_model(&self) -> String {
self.config.model.clone()
@@ -519,6 +508,7 @@ async fn process_sse<S>(
stream: S,
tx_event: mpsc::Sender<Result<ResponseEvent>>,
idle_timeout: Duration,
trace_manager: TraceManager,
) where
S: Stream<Item = Result<Bytes>> + Unpin,
{
@@ -529,12 +519,16 @@ async fn process_sse<S>(
let mut response_completed: Option<ResponseCompleted> = None;
let mut response_error: Option<CodexErr> = None;
let sse_span = trace_manager.response();
loop {
let sse = match timeout(idle_timeout, stream.next()).await {
let sse = match timeout(idle_timeout, stream.next().instrument(sse_span.span())).await {
Ok(Some(Ok(sse))) => sse,
Ok(Some(Err(e))) => {
debug!("SSE Error: {e:#}");
let event = CodexErr::Stream(e.to_string(), None);
let error = e.to_string();
sse_span.error(error.as_str());
let event = CodexErr::Stream(error, None);
let _ = tx_event.send(Err(event)).await;
return;
}
@@ -544,6 +538,21 @@ async fn process_sse<S>(
id: response_id,
usage,
}) => {
if let Some(token_usage) = &usage {
sse_span.token_usage(
token_usage.input_tokens,
token_usage.output_tokens,
token_usage
.input_tokens_details
.as_ref()
.map(|d| d.cached_tokens),
token_usage
.output_tokens_details
.as_ref()
.map(|d| d.reasoning_tokens),
token_usage.total_tokens,
);
}
let event = ResponseEvent::Completed {
response_id,
token_usage: usage.map(Into::into),
@@ -551,22 +560,27 @@ async fn process_sse<S>(
let _ = tx_event.send(Ok(event)).await;
}
None => {
let _ = tx_event
.send(Err(response_error.unwrap_or(CodexErr::Stream(
"stream closed before response.completed".into(),
None,
))))
.await;
let error = response_error.unwrap_or(CodexErr::Stream(
"stream closed before response.completed".into(),
None,
));
if let CodexErr::Stream(message, _) = &error {
sse_span.error(message.as_str());
}
let _ = tx_event.send(Err(error)).await;
}
}
return;
}
Err(_) => {
let error = "idle timeout waiting for SSE";
sse_span.error(error);
let _ = tx_event
.send(Err(CodexErr::Stream(
"idle timeout waiting for SSE".into(),
None,
)))
.send(Err(CodexErr::Stream(error.into(), None)))
.await;
return;
}
@@ -575,16 +589,18 @@ async fn process_sse<S>(
let raw = sse.data.clone();
trace!("SSE event: {}", raw);
sse_span.body(raw.as_str());
let event: SseEvent = match serde_json::from_str(&sse.data) {
Ok(event) => event,
Err(e) => {
debug!("Failed to parse SSE event: {e}, data: {}", &sse.data);
let error = format!("Failed to parse SSE event: {e}, data: {}", &sse.data);
sse_span.error(error.as_str());
debug!(error);
continue;
}
};
Span::current().record("otel.name", event.kind.as_str());
match event.kind.as_str() {
// Individual output item finalised. Forward immediately so the
// rest of the agent can stream assistant text/functions *live*
@@ -607,7 +623,9 @@ async fn process_sse<S>(
"response.output_item.done" => {
let Some(item_val) = event.item else { continue };
let Ok(item) = serde_json::from_value::<ResponseItem>(item_val) else {
debug!("failed to parse ResponseItem from output_item.done");
let error = "failed to parse ResponseItem from output_item.done";
debug!(error);
sse_span.error(error);
continue;
};
@@ -647,11 +665,14 @@ async fn process_sse<S>(
}
"response.failed" => {
if let Some(resp_val) = event.response {
let error_message = "response.failed event received";
response_error = Some(CodexErr::Stream(
"response.failed event received".to_string(),
error_message.to_string(),
None,
));
sse_span.error(error_message);
let error = resp_val.get("error");
if let Some(error) = error {
@@ -659,10 +680,13 @@ async fn process_sse<S>(
Ok(error) => {
let delay = try_parse_retry_after(&error);
let message = error.message.unwrap_or_default();
sse_span.error(message.as_str());
response_error = Some(CodexErr::Stream(message, delay));
}
Err(e) => {
debug!("failed to parse ErrorResponse: {e}");
let error_message = format!("failed to parse ErrorResponse: {e}");
debug!(error_message);
sse_span.error(error_message.as_str());
}
}
}
@@ -676,7 +700,9 @@ async fn process_sse<S>(
response_completed = Some(r);
}
Err(e) => {
debug!("failed to parse ResponseCompleted: {e}");
let error_message = format!("failed to parse ResponseCompleted: {e}");
debug!(error_message);
sse_span.error(error_message.as_str());
continue;
}
};
@@ -723,6 +749,7 @@ async fn process_sse<S>(
async fn stream_from_fixture(
path: impl AsRef<Path>,
provider: ModelProviderInfo,
trace_manager: TraceManager,
) -> Result<ResponseStream> {
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
let f = std::fs::File::open(path.as_ref())?;
@@ -741,6 +768,7 @@ async fn stream_from_fixture(
stream,
tx_event,
provider.stream_idle_timeout(),
trace_manager,
));
Ok(ResponseStream { rx_event })
}
@@ -796,6 +824,7 @@ mod tests {
async fn collect_events(
chunks: &[&[u8]],
provider: ModelProviderInfo,
trace_manager: TraceManager,
) -> Vec<Result<ResponseEvent>> {
let mut builder = IoBuilder::new();
for chunk in chunks {
@@ -805,7 +834,12 @@ mod tests {
let reader = builder.build();
let stream = ReaderStream::new(reader).map_err(CodexErr::Io);
let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent>>(16);
tokio::spawn(process_sse(stream, tx, provider.stream_idle_timeout()));
tokio::spawn(process_sse(
stream,
tx,
provider.stream_idle_timeout(),
trace_manager,
));
let mut events = Vec::new();
while let Some(ev) = rx.recv().await {
@@ -819,6 +853,7 @@ mod tests {
async fn run_sse(
events: Vec<serde_json::Value>,
provider: ModelProviderInfo,
trace_manager: TraceManager,
) -> Vec<ResponseEvent> {
let mut body = String::new();
for e in events {
@@ -835,7 +870,12 @@ mod tests {
let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent>>(8);
let stream = ReaderStream::new(std::io::Cursor::new(body)).map_err(CodexErr::Io);
tokio::spawn(process_sse(stream, tx, provider.stream_idle_timeout()));
tokio::spawn(process_sse(
stream,
tx,
provider.stream_idle_timeout(),
trace_manager,
));
let mut out = Vec::new();
while let Some(ev) = rx.recv().await {
@@ -895,9 +935,19 @@ mod tests {
requires_openai_auth: false,
};
let trace_manager = TraceManager::new(
ConversationId::new(),
"test",
"test",
None,
AuthMode::ChatGPT,
"test".to_string(),
);
let events = collect_events(
&[sse1.as_bytes(), sse2.as_bytes(), sse3.as_bytes()],
provider,
trace_manager,
)
.await;
@@ -955,7 +1005,16 @@ mod tests {
requires_openai_auth: false,
};
let events = collect_events(&[sse1.as_bytes()], provider).await;
let trace_manager = TraceManager::new(
ConversationId::new(),
"test",
"test",
None,
AuthMode::ChatGPT,
"test".to_string(),
);
let events = collect_events(&[sse1.as_bytes()], provider, trace_manager).await;
assert_eq!(events.len(), 2);
@@ -989,7 +1048,16 @@ mod tests {
requires_openai_auth: false,
};
let events = collect_events(&[sse1.as_bytes()], provider).await;
let trace_manager = TraceManager::new(
ConversationId::new(),
"test",
"test",
None,
AuthMode::ChatGPT,
"test".to_string(),
);
let events = collect_events(&[sse1.as_bytes()], provider, trace_manager).await;
assert_eq!(events.len(), 1);
@@ -1094,7 +1162,16 @@ mod tests {
requires_openai_auth: false,
};
let out = run_sse(evs, provider).await;
let trace_manager = TraceManager::new(
ConversationId::new(),
"test",
"test",
None,
AuthMode::ChatGPT,
"test".to_string(),
);
let out = run_sse(evs, provider, trace_manager).await;
assert_eq!(out.len(), case.expected_len, "case {}", case.name);
assert!(
(case.expect_first)(&out[0]),

View File

@@ -12,6 +12,7 @@ use crate::AuthManager;
use crate::client_common::REVIEW_PROMPT;
use crate::event_mapping::map_response_item_to_event_messages;
use crate::review_format::format_review_findings_block;
use crate::terminal;
use async_channel::Receiver;
use async_channel::Sender;
use codex_apply_patch::ApplyPatchAction;
@@ -34,12 +35,9 @@ use serde_json;
use tokio::sync::Mutex;
use tokio::sync::oneshot;
use tokio::task::AbortHandle;
use tracing::Instrument;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::info_span;
use tracing::instrument;
use tracing::trace;
use tracing::warn;
@@ -124,6 +122,7 @@ use crate::unified_exec::UnifiedExecSessionManager;
use crate::user_instructions::UserInstructions;
use crate::user_notification::UserNotification;
use crate::util::backoff;
use codex_otel::trace_manager::TraceManager;
use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::custom_prompts::CustomPrompt;
@@ -446,11 +445,21 @@ impl Session {
}
}
let trace_manager = TraceManager::new(
conversation_id,
config.model.as_str(),
config.model_family.slug.as_str(),
auth_manager.auth().and_then(|a| a.get_account_id()),
auth_manager.preferred_auth_method(),
terminal::user_agent(),
);
// Now that the conversation id is final (may have been updated by resume),
// construct the model client.
let client = ModelClient::new(
config.clone(),
Some(auth_manager.clone()),
trace_manager,
provider.clone(),
model_reasoning_effort,
model_reasoning_summary,
@@ -777,14 +786,6 @@ impl Session {
}
}
#[tracing::instrument(
skip_all,
fields(
conversation_id = %self.conversation_id,
sub_id = %exec_command_context.sub_id,
call_id = %exec_command_context.call_id
),
)]
async fn on_exec_command_begin(
&self,
turn_diff_tracker: &mut TurnDiffTracker,
@@ -827,14 +828,6 @@ impl Session {
self.send_event(event).await;
}
#[tracing::instrument(
skip_all,
fields(
conversation_id = %self.conversation_id,
sub_id = %sub_id,
call_id = %call_id
),
)]
async fn on_exec_command_end(
&self,
turn_diff_tracker: &mut TurnDiffTracker,
@@ -900,16 +893,6 @@ impl Session {
/// command even on error.
///
/// Returns the output of the exec tool call.
#[tracing::instrument(
skip_all,
fields(
conversation_id = %self.conversation_id,
sub_id = %begin_ctx.sub_id,
call_id = %begin_ctx.call_id,
sandbox_type = %exec_args.sandbox_type,
sandbox_policy = %exec_args.sandbox_policy,
),
)]
async fn run_exec_with_events<'a>(
&self,
turn_diff_tracker: &mut TurnDiffTracker,
@@ -1016,10 +999,6 @@ impl Session {
}
}
#[instrument(
skip(self, arguments),
fields(%server, %tool, timeout_ms = timeout.map(|t| t.as_millis()))
)]
pub async fn call_tool(
&self,
server: &str,
@@ -1252,9 +1231,15 @@ async fn submission_loop(
updated_config.model_context_window = Some(model_info.context_window);
}
let trace_manager = prev
.client
.get_trace_manager()
.with_model(updated_config.model.as_str(), updated_config.model_family.slug.as_str());
let client = ModelClient::new(
Arc::new(updated_config),
auth_manager,
trace_manager,
provider,
effective_effort,
effective_summary,
@@ -1340,11 +1325,17 @@ async fn submission_loop(
per_turn_config.model_context_window = Some(model_info.context_window);
}
let trace_manager = turn_context
.client
.get_trace_manager()
.with_model(per_turn_config.model.as_str(), per_turn_config.model_family.slug.as_str());
// Build a new client with per-turn reasoning settings.
// Reuse the same provider and session id; auth defaults to env/API key.
let client = ModelClient::new(
Arc::new(per_turn_config),
auth_manager,
trace_manager,
provider,
effort,
summary,
@@ -2050,15 +2041,6 @@ struct TurnRunResult {
total_token_usage: Option<TokenUsage>,
}
#[tracing::instrument(
skip_all,
fields(
session_id = %turn_context.client.get_conversation_id(),
model = %turn_context.client.get_model(),
model_slug = %turn_context.client.get_model_family().slug,
model_provider = %turn_context.client.get_provider().name,
),
)]
async fn try_run_turn(
sess: &Session,
turn_context: &TurnContext,
@@ -2135,20 +2117,10 @@ async fn try_run_turn(
let mut output = Vec::new();
loop {
let consuming_events_span = info_span!("consuming_events");
let _consuming_events_span_guard = consuming_events_span.enter();
let event_span = info_span!(
parent: &consuming_events_span,
"stream_next",
otel.name = tracing::field::Empty,
delta_len = tracing::field::Empty,
);
// Poll the next item from the model stream. We must inspect *both* Ok and Err
// cases so that transient stream failures (e.g., dropped SSE connection before
// `response.completed`) bubble up and trigger the caller's retry logic.
let event = stream.next().instrument(event_span.clone()).await;
let event = stream.next().await;
let Some(event) = event else {
// Channel closed without yielding a final Completed event or explicit error.
// Treat as a disconnected stream so the caller can retry.
@@ -2167,16 +2139,9 @@ async fn try_run_turn(
}
};
event_span.record("otel.name", event.to_string());
match event {
ResponseEvent::Created => {}
ResponseEvent::OutputItemDone(item) => {
let handle_response_span = info_span!(
parent: &consuming_events_span,
"handle_response",
otel.name = item.to_string(),
);
let response = handle_response_item(
sess,
turn_context,
@@ -2184,7 +2149,6 @@ async fn try_run_turn(
sub_id,
item.clone(),
)
.instrument(handle_response_span)
.await?;
output.push(ProcessedResponseItem { item, response });
}
@@ -2242,7 +2206,6 @@ async fn try_run_turn(
}
}
ResponseEvent::ReasoningSummaryDelta(delta) => {
event_span.record("delta_len", delta.len());
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }),
@@ -2257,7 +2220,6 @@ async fn try_run_turn(
sess.send_event(event).await;
}
ResponseEvent::ReasoningContentDelta(delta) => {
event_span.record("delta_len", delta.len());
if sess.show_raw_agent_reasoning {
let event = Event {
id: sub_id.to_string(),
@@ -2462,16 +2424,6 @@ async fn handle_unified_exec_tool_call(
}
}
#[tracing::instrument(
skip_all,
fields(
session_id = %turn_context.client.get_conversation_id(),
model = %turn_context.client.get_model(),
model_slug = %turn_context.client.get_model_family().slug,
model_provider = %turn_context.client.get_provider().name,
otel.name = %name,
),
)]
async fn handle_function_call(
sess: &Session,
turn_context: &TurnContext,
@@ -2669,15 +2621,6 @@ async fn handle_function_call(
}
}
#[tracing::instrument(
skip_all,
fields(
session_id = %turn_context.client.get_conversation_id(),
model = %turn_context.client.get_model(),
model_slug = %turn_context.client.get_model_family().slug,
model_provider = %turn_context.client.get_provider().name,
),
)]
async fn handle_custom_tool_call(
sess: &Session,
turn_context: &TurnContext,
@@ -2741,15 +2684,6 @@ fn to_exec_params(params: ShellToolCallParams, turn_context: &TurnContext) -> Ex
}
}
#[tracing::instrument(
skip_all,
fields(
session_id = %turn_context.client.get_conversation_id(),
model = %turn_context.client.get_model(),
model_slug = %turn_context.client.get_model_family().slug,
model_provider = %turn_context.client.get_provider().name,
),
)]
fn parse_container_exec_arguments(
arguments: String,
turn_context: &TurnContext,
@@ -2799,15 +2733,6 @@ fn maybe_translate_shell_command(
params
}
#[tracing::instrument(
skip_all,
fields(
session_id = %turn_context.client.get_conversation_id(),
model = %turn_context.client.get_model(),
model_slug = %turn_context.client.get_model_family().slug,
model_provider = %turn_context.client.get_provider().name,
),
)]
async fn handle_container_exec_with_params(
params: ExecParams,
sess: &Session,
@@ -3043,14 +2968,6 @@ async fn handle_container_exec_with_params(
}
}
#[tracing::instrument(
skip_all,
fields(
conversation_id = %sess.conversation_id,
sub_id = %exec_command_context.sub_id,
call_id = %exec_command_context.call_id
),
)]
async fn handle_sandbox_error(
turn_diff_tracker: &mut TurnDiffTracker,
params: ExecParams,

View File

@@ -80,13 +80,6 @@ pub struct StdoutStream {
pub tx_event: Sender<Event>,
}
#[tracing::instrument(
skip_all,
fields(
sandbox_type = %sandbox_type,
sandbox_policy = %sandbox_policy,
),
)]
pub async fn process_exec_tool_call(
params: ExecParams,
sandbox_type: SandboxType,
@@ -267,12 +260,6 @@ pub struct ExecToolCallOutput {
pub timed_out: bool,
}
#[tracing::instrument(
skip_all,
fields(
sandbox_policy = %sandbox_policy,
),
)]
async fn exec(
params: ExecParams,
sandbox_policy: &SandboxPolicy,

View File

@@ -13,12 +13,6 @@ use tokio::process::Child;
/// helper accepts a list of `--sandbox-permission`/`-s` flags mirroring the
/// public CLI. We convert the internal [`SandboxPolicy`] representation into
/// the equivalent CLI options.
#[tracing::instrument(
skip_all,
fields(
sandbox_policy = %sandbox_policy,
),
)]
pub async fn spawn_command_under_linux_sandbox<P>(
codex_linux_sandbox_exe: P,
command: Vec<String>,

View File

@@ -16,12 +16,6 @@ const MACOS_SEATBELT_BASE_POLICY: &str = include_str!("seatbelt_base_policy.sbpl
/// already has root access.
const MACOS_PATH_TO_SEATBELT_EXECUTABLE: &str = "/usr/bin/sandbox-exec";
#[tracing::instrument(
skip_all,
fields(
sandbox_policy = %sandbox_policy,
),
)]
pub async fn spawn_command_under_seatbelt(
command: Vec<String>,
command_cwd: PathBuf,

View File

@@ -11,6 +11,8 @@ use codex_core::ReasoningItemContent;
use codex_core::ResponseItem;
use codex_core::WireApi;
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
use codex_otel::trace_manager::TraceManager;
use codex_protocol::mcp_protocol::AuthMode;
use codex_protocol::mcp_protocol::ConversationId;
use core_test_support::load_default_config_for_test;
use futures::StreamExt;
@@ -70,13 +72,25 @@ async fn run_request(input: Vec<ResponseItem>) -> Value {
let summary = config.model_reasoning_summary;
let config = Arc::new(config);
let conversation_id = ConversationId::new();
let trace_manager = TraceManager::new(
conversation_id,
config.model.as_str(),
config.model_family.slug.as_str(),
None,
AuthMode::ChatGPT,
"test".to_string(),
);
let client = ModelClient::new(
Arc::clone(&config),
None,
trace_manager,
provider,
effort,
summary,
ConversationId::new(),
conversation_id,
);
let mut prompt = Prompt::default();

View File

@@ -8,6 +8,8 @@ use codex_core::ResponseEvent;
use codex_core::ResponseItem;
use codex_core::WireApi;
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
use codex_otel::trace_manager::TraceManager;
use codex_protocol::mcp_protocol::AuthMode;
use codex_protocol::mcp_protocol::ConversationId;
use core_test_support::load_default_config_for_test;
use futures::StreamExt;
@@ -63,13 +65,25 @@ async fn run_stream(sse_body: &str) -> Vec<ResponseEvent> {
let summary = config.model_reasoning_summary;
let config = Arc::new(config);
let conversation_id = ConversationId::new();
let trace_manager = TraceManager::new(
conversation_id,
config.model.as_str(),
config.model_family.slug.as_str(),
None,
AuthMode::ChatGPT,
"test".to_string(),
);
let client = ModelClient::new(
Arc::clone(&config),
None,
trace_manager,
provider,
effort,
summary,
ConversationId::new(),
conversation_id,
);
let mut prompt = Prompt::default();

View File

@@ -29,6 +29,9 @@ otel = [
[dependencies]
# Optional to keep build lean unless feature enabled
codex-protocol = { path = "../protocol" }
codex-mcp-client = { path = "../mcp-client" }
chrono = "0.4.42"
tracing = { version = "0.1" }
tracing-subscriber = { version = "0.3", features = ["registry", "fmt"], optional = true }
tracing-opentelemetry = { version = "0.31.0", optional = true }

View File

@@ -5,6 +5,7 @@ mod file_exporter;
#[cfg(feature = "otel")]
pub mod otel_provider;
pub mod trace_manager;
#[cfg(not(feature = "otel"))]
mod imp {

View File

@@ -6,16 +6,13 @@ use crate::file_exporter::FileExporter;
use crate::file_exporter::create_log_file;
use opentelemetry::KeyValue;
use opentelemetry::global;
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry::trace::TracerProvider;
use opentelemetry_http::HeaderInjector;
use opentelemetry_otlp::Protocol;
use opentelemetry_otlp::SpanExporter;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_otlp::WithHttpConfig;
use opentelemetry_otlp::WithTonicConfig;
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::Sampler;
use opentelemetry_sdk::trace::SdkTracerProvider;
use opentelemetry_sdk::trace::Tracer;
@@ -25,9 +22,7 @@ use reqwest::header::HeaderName;
use reqwest::header::HeaderValue;
use std::error::Error;
use tonic::metadata::MetadataMap;
use tracing::Span;
use tracing::debug;
use tracing_opentelemetry::OpenTelemetrySpanExt;
const ENV_ATTRIBUTE: &str = "env";
@@ -45,13 +40,6 @@ impl OtelProvider {
let _ = self.provider.shutdown();
}
pub fn headers(span: &Span) -> HeaderMap {
let mut injector = HeaderMap::new();
TraceContextPropagator::default()
.inject_context(&span.context(), &mut HeaderInjector(&mut injector));
injector
}
pub fn from(settings: &OtelSettings) -> Result<Option<Self>, Box<dyn Error>> {
if !settings.enabled {
return Ok(None);

View File

@@ -0,0 +1,198 @@
use chrono::SecondsFormat;
use chrono::Utc;
use codex_protocol::mcp_protocol::AuthMode;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::models::ResponseItem;
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry_http::HeaderInjector;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use reqwest::StatusCode;
use reqwest::header::HeaderMap;
use tracing::Span;
use tracing::info_span;
use tracing_opentelemetry::OpenTelemetrySpanExt;
/// Wrapper around a `codex.api_request` tracing span for one outbound model
/// API request. Conversation-level metadata is attached at creation; the
/// request-scoped fields (status, error, tokens, …) start `Empty` and are
/// filled in via the builder-style methods below.
pub struct RequestSpan(pub(crate) Span);
impl RequestSpan {
    /// Creates the `codex.api_request` span pre-populated from `metadata`.
    pub fn new(metadata: TraceMetadata) -> Self {
        let span = info_span!(
            "codex.api_request",
            conversation.id = %metadata.conversation_id,
            app.version = %metadata.app_version,
            auth_mode = %metadata.auth_mode,
            user.account_id = tracing::field::Empty,
            terminal.type = %metadata.terminal_type,
            event.timestamp = %timestamp(),
            model = %metadata.model,
            slug = %metadata.slug,
            prompt = tracing::field::Empty,
            http.response.status_code = tracing::field::Empty,
            error.message = tracing::field::Empty,
            attempt = tracing::field::Empty,
            input_tokens = tracing::field::Empty,
            output_tokens = tracing::field::Empty,
            cache_read_tokens = tracing::field::Empty,
            cache_creation_tokens = tracing::field::Empty,
            request_id = tracing::field::Empty,
        );
        if let Some(account_id) = &metadata.account_id {
            // The name must match the declared field exactly: `tracing`
            // silently drops `record` calls for undeclared field names, so
            // recording "account_id" here would be a no-op.
            span.record("user.account_id", account_id.as_str());
        }
        // Todo include prompt conditionally
        Self(span)
    }
    /// Records the provider-assigned request id.
    pub fn request_id(&self, request_id: &str) -> &Self {
        self.0.record("request_id", request_id);
        self
    }
    /// Records the HTTP response status (numeric, consistent with `error`).
    pub fn status_code(&self, status: StatusCode) -> &Self {
        self.0.record("http.response.status_code", status.as_u16());
        self
    }
    /// Records a failed attempt: attempt number, error message, and (when
    /// available) the HTTP status code.
    pub fn error(&self, attempt: u64, status: Option<StatusCode>, message: &str) -> &Self {
        self.0.record("attempt", attempt);
        self.0.record("error.message", message);
        if let Some(code) = status {
            self.0.record("http.response.status_code", code.as_u16());
        }
        self
    }
    /// Returns a handle to the underlying span (clones share the same span).
    pub fn span(&self) -> Span {
        self.0.clone()
    }
}
/// Wrapper around a `codex.sse_event` tracing span for one server-sent event
/// received on a streaming response. Token counts, errors, and the raw body
/// are recorded after creation via the builder-style methods.
pub struct SSESpan(pub(crate) Span);
impl SSESpan {
    /// Creates the `codex.sse_event` span pre-populated from `metadata`.
    pub fn new(metadata: TraceMetadata) -> Self {
        let span = info_span!(
            "codex.sse_event",
            conversation.id = %metadata.conversation_id,
            app.version = %metadata.app_version,
            auth_mode = %metadata.auth_mode,
            user.account_id = tracing::field::Empty,
            terminal.type = %metadata.terminal_type,
            event.timestamp = %timestamp(),
            model = %metadata.model,
            slug = %metadata.slug,
            error.message = tracing::field::Empty,
            input_token_count = tracing::field::Empty,
            output_token_count = tracing::field::Empty,
            cached_content_token_count = tracing::field::Empty,
            thoughts_token_count = tracing::field::Empty,
            reasoning_token_count = tracing::field::Empty,
            tool_token_count = tracing::field::Empty,
            response_body = tracing::field::Empty,
        );
        if let Some(account_id) = &metadata.account_id {
            // Must use the declared field name "user.account_id" — `tracing`
            // silently ignores records to names the span did not declare.
            span.record("user.account_id", account_id.as_str());
        }
        SSESpan(span)
    }
    /// Records the raw SSE payload body.
    pub fn body(&self, body: &str) -> &Self {
        self.0.record("response_body", body);
        self
    }
    /// Records token usage reported by the event. Optional counts are
    /// recorded as-is (`None` leaves no value).
    pub fn token_usage(
        &self,
        input_token_count: u64,
        output_token_count: u64,
        cached_token_count: Option<u64>,
        reasoning_token_count: Option<u64>,
        tool_token_count: u64,
    ) -> &Self {
        self.0.record("input_token_count", input_token_count);
        self.0.record("output_token_count", output_token_count);
        // Field is declared as `cached_content_token_count`; recording the
        // shorter "cached_token_count" would be silently dropped.
        self.0
            .record("cached_content_token_count", cached_token_count);
        self.0
            .record("reasoning_token_count", reasoning_token_count);
        self.0.record("tool_token_count", tool_token_count);
        self
    }
    /// Records an error message for this event.
    pub fn error(&self, error: &str) -> &Self {
        self.0.record("error.message", error);
        self
    }
    /// Returns a handle to the underlying span (clones share the same span).
    pub fn span(&self) -> Span {
        self.0.clone()
    }
}
/// Conversation-level attributes attached to every span produced by
/// [`TraceManager`] (both `codex.api_request` and `codex.sse_event`).
#[derive(Debug, Clone)]
pub struct TraceMetadata {
    // Identifier of the conversation these spans belong to.
    conversation_id: ConversationId,
    // How the client is authenticated (e.g. API key vs ChatGPT).
    auth_mode: AuthMode,
    // Account id, when known; recorded on spans only when `Some`.
    account_id: Option<String>,
    // Model name in use for the request.
    model: String,
    // Model family slug.
    slug: String,
    // Crate version baked in at compile time via `env!("CARGO_PKG_VERSION")`.
    app_version: &'static str,
    // Terminal/client type string supplied by the caller.
    terminal_type: String,
}
/// Factory for trace spans that share per-conversation metadata: hands out
/// `codex.api_request` spans for outbound requests and `codex.sse_event`
/// spans for streamed response events.
#[derive(Debug, Clone)]
pub struct TraceManager {
    metadata: TraceMetadata,
}
impl TraceManager {
    /// Builds a manager that stamps every span it creates with the given
    /// conversation-level attributes.
    pub fn new(
        conversation_id: ConversationId,
        model: &str,
        slug: &str,
        account_id: Option<String>,
        auth_mode: AuthMode,
        terminal_type: String,
    ) -> TraceManager {
        let metadata = TraceMetadata {
            conversation_id,
            auth_mode,
            account_id,
            model: model.to_owned(),
            slug: slug.to_owned(),
            app_version: env!("CARGO_PKG_VERSION"),
            terminal_type,
        };
        Self { metadata }
    }
    /// Returns a copy of this manager that reports a different model/slug,
    /// keeping all other metadata unchanged.
    pub fn with_model(&self, model: &str, slug: &str) -> Self {
        Self {
            metadata: TraceMetadata {
                model: model.to_owned(),
                slug: slug.to_owned(),
                ..self.metadata.clone()
            },
        }
    }
    /// Injects the span's trace context into a fresh set of HTTP headers so
    /// the downstream service can join the same trace.
    pub fn headers(span: &RequestSpan) -> HeaderMap {
        let mut headers = HeaderMap::new();
        let propagator = TraceContextPropagator::default();
        propagator.inject_context(&span.0.context(), &mut HeaderInjector(&mut headers));
        headers
    }
    /// Starts a `codex.api_request` span for an outgoing model request.
    pub fn request(&self, _prompt: &[ResponseItem]) -> RequestSpan {
        RequestSpan::new(self.metadata.clone())
    }
    /// Starts a `codex.sse_event` span for a streamed response event.
    pub fn response(&self) -> SSESpan {
        SSESpan::new(self.metadata.clone())
    }
}
/// Current UTC time as an RFC 3339 string with millisecond precision and a
/// trailing `Z` (e.g. `2025-09-11T12:00:00.123Z`).
fn timestamp() -> String {
    let now = Utc::now();
    now.to_rfc3339_opts(SecondsFormat::Millis, true)
}

View File

@@ -81,7 +81,7 @@ impl GitSha {
}
}
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, TS)]
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, Display, TS)]
#[serde(rename_all = "lowercase")]
pub enum AuthMode {
ApiKey,