[codex] produce events codex.api_request, codex.sse_event, codex.user_prompt, codex.tool_decision

This commit is contained in:
Anton Panasenko
2025-09-12 10:21:45 -07:00
parent 6b6ccd6223
commit dbe7529cd4
21 changed files with 420 additions and 793 deletions

16
codex-rs/Cargo.lock generated
View File

@@ -786,6 +786,7 @@ dependencies = [
"codex-protocol",
"core_test_support",
"libc",
"opentelemetry-appender-tracing",
"owo-colors",
"predicates",
"serde_json",
@@ -932,6 +933,7 @@ dependencies = [
"codex-mcp-client",
"codex-protocol",
"opentelemetry",
"opentelemetry-appender-tracing",
"opentelemetry-http",
"opentelemetry-otlp",
"opentelemetry-proto",
@@ -1014,6 +1016,7 @@ dependencies = [
"libc",
"mcp-types",
"once_cell",
"opentelemetry-appender-tracing",
"owo-colors",
"path-clean",
"pathdiff",
@@ -1034,7 +1037,6 @@ dependencies = [
"tokio-stream",
"tracing",
"tracing-appender",
"tracing-opentelemetry",
"tracing-subscriber",
"unicode-segmentation",
"unicode-width 0.1.14",
@@ -3240,6 +3242,18 @@ dependencies = [
"tracing",
]
[[package]]
name = "opentelemetry-appender-tracing"
version = "0.30.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e68f63eca5fad47e570e00e893094fc17be959c80c79a7d6ec1abdd5ae6ffc16"
dependencies = [
"opentelemetry",
"tracing",
"tracing-core",
"tracing-subscriber",
]
[[package]]
name = "opentelemetry-http"
version = "0.30.0"

View File

@@ -1,4 +1,5 @@
use std::time::Duration;
use std::time::Instant;
use crate::ModelProviderInfo;
use crate::client_common::Prompt;
@@ -10,7 +11,7 @@ use crate::model_family::ModelFamily;
use crate::openai_tools::create_tools_json_for_chat_completions_api;
use crate::util::backoff;
use bytes::Bytes;
use codex_otel::trace_manager::TraceManager;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ReasoningItemContent;
use codex_protocol::models::ResponseItem;
@@ -25,7 +26,6 @@ use std::task::Context;
use std::task::Poll;
use tokio::sync::mpsc;
use tokio::time::timeout;
use tracing::Instrument;
use tracing::debug;
use tracing::trace;
@@ -35,7 +35,7 @@ pub(crate) async fn stream_chat_completions(
model_family: &ModelFamily,
client: &reqwest::Client,
provider: &ModelProviderInfo,
trace_manager: &TraceManager,
otel_event_manager: &OtelEventManager,
) -> Result<ResponseStream> {
// Build messages array
let mut messages = Vec::<serde_json::Value>::new();
@@ -277,64 +277,62 @@ pub(crate) async fn stream_chat_completions(
"tools": tools_json,
});
debug!(
"POST to {}: {}",
provider.get_full_url(&None),
serde_json::to_string_pretty(&payload).unwrap_or_default()
);
let payload_str = serde_json::to_string_pretty(&payload).unwrap_or_default();
debug!("POST to {}: {}", provider.get_full_url(&None), payload_str);
let mut attempt = 0;
let max_retries = provider.request_max_retries();
loop {
attempt += 1;
let request_span = trace_manager.request(&prompt.input);
let req_builder = provider.create_request_builder(client, &None).await?;
let tracing_headers = TraceManager::headers(&request_span);
let start = Instant::now();
let res = req_builder
.header(reqwest::header::ACCEPT, "text/event-stream")
.headers(tracing_headers)
.json(&payload)
.send()
.instrument(request_span.span())
.await;
let request_id = if let Ok(resp) = &res {
Some(
resp.headers()
.get("x-request-id")
.map(|v| v.to_str().unwrap_or_default())
.unwrap_or_default()
.to_string(),
)
} else {
None
};
otel_event_manager.request(request_id, attempt, start.elapsed(), &res);
match res {
Ok(resp) if resp.status().is_success() => {
request_span.status_code(resp.status());
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
let stream = resp.bytes_stream().map_err(CodexErr::Reqwest);
tokio::spawn(process_chat_sse(
stream,
tx_event,
provider.stream_idle_timeout(),
trace_manager.clone(),
otel_event_manager.clone(),
));
return Ok(ResponseStream { rx_event });
}
Ok(res) => {
let status = res.status();
let mut log_message = status.to_string();
request_span.status_code(status);
if !(status == StatusCode::TOO_MANY_REQUESTS || status.is_server_error()) {
let body = (res.text().await).unwrap_or_default();
if !body.is_empty() {
log_message = format!("{log_message}: {body}");
}
request_span.error(attempt, Some(status), &log_message);
return Err(CodexErr::UnexpectedStatus(status, body));
}
if attempt > max_retries {
request_span.error(attempt, Some(status), &log_message);
return Err(CodexErr::RetryLimit(status));
}
request_span.error(attempt, Some(status), &log_message);
let retry_after_secs = res
.headers()
.get(reqwest::header::RETRY_AFTER)
@@ -347,8 +345,6 @@ pub(crate) async fn stream_chat_completions(
tokio::time::sleep(delay).await;
}
Err(e) => {
let error_message = format!("{e:#}");
request_span.error(attempt, None, &error_message);
if attempt > max_retries {
return Err(e.into());
}
@@ -366,7 +362,7 @@ async fn process_chat_sse<S>(
stream: S,
tx_event: mpsc::Sender<Result<ResponseEvent>>,
idle_timeout: Duration,
trace_manager: TraceManager,
otel_event_manager: OtelEventManager,
) where
S: Stream<Item = Result<Bytes>> + Unpin,
{
@@ -389,14 +385,13 @@ async fn process_chat_sse<S>(
let mut assistant_text = String::new();
let mut reasoning_text = String::new();
let sse_span = trace_manager.response();
loop {
let sse = match timeout(idle_timeout, stream.next().instrument(sse_span.span())).await {
let start = Instant::now();
let sse = match timeout(idle_timeout, stream.next()).await {
Ok(Some(Ok(ev))) => ev,
Ok(Some(Err(e))) => {
let error = e.to_string();
sse_span.error(error.as_str());
otel_event_manager.sse_event_failed(None, start.elapsed(), error.as_str());
let _ = tx_event.send(Err(CodexErr::Stream(error, None))).await;
return;
}
@@ -412,7 +407,7 @@ async fn process_chat_sse<S>(
}
Err(_) => {
let error = "idle timeout waiting for SSE";
sse_span.error(error);
otel_event_manager.sse_event_failed(None, start.elapsed(), error);
let _ = tx_event
.send(Err(CodexErr::Stream(error.into(), None)))
.await;
@@ -420,10 +415,9 @@ async fn process_chat_sse<S>(
}
};
sse_span.body(sse.data.as_str());
// OpenAI Chat streaming sends a literal string "[DONE]" when finished.
if sse.data.trim() == "[DONE]" {
otel_event_manager.sse_event(sse.event, start.elapsed());
// Emit any finalized items before closing so downstream consumers receive
// terminal events for both assistant content and raw reasoning.
if !assistant_text.is_empty() {
@@ -463,11 +457,12 @@ async fn process_chat_sse<S>(
Ok(v) => v,
Err(e) => {
let error = format!("Failed to parse SSE event: {e}, data: {}", &sse.data);
sse_span.error(error.as_str());
otel_event_manager.sse_event_failed(None, start.elapsed(), error.as_str());
continue;
}
};
trace!("chat_completions received SSE chunk: {chunk:?}");
otel_event_manager.sse_event(sse.event, start.elapsed());
let choice_opt = chunk.get("choices").and_then(|c| c.get(0));

View File

@@ -2,6 +2,7 @@ use std::io::BufRead;
use std::path::Path;
use std::sync::OnceLock;
use std::time::Duration;
use std::time::Instant;
use crate::AuthManager;
use bytes::Bytes;
@@ -17,7 +18,6 @@ use serde_json::Value;
use tokio::sync::mpsc;
use tokio::time::timeout;
use tokio_util::io::ReaderStream;
use tracing::Instrument;
use tracing::debug;
use tracing::trace;
use tracing::warn;
@@ -44,7 +44,7 @@ use crate::openai_tools::create_tools_json_for_responses_api;
use crate::protocol::TokenUsage;
use crate::token_data::PlanType;
use crate::util::backoff;
use codex_otel::trace_manager::TraceManager;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::models::ResponseItem;
@@ -71,7 +71,7 @@ struct Error {
pub struct ModelClient {
config: Arc<Config>,
auth_manager: Option<Arc<AuthManager>>,
trace_manager: TraceManager,
otel_event_manager: OtelEventManager,
client: reqwest::Client,
provider: ModelProviderInfo,
conversation_id: ConversationId,
@@ -83,7 +83,7 @@ impl ModelClient {
pub fn new(
config: Arc<Config>,
auth_manager: Option<Arc<AuthManager>>,
trace_manager: TraceManager,
otel_event_manager: OtelEventManager,
provider: ModelProviderInfo,
effort: Option<ReasoningEffortConfig>,
summary: ReasoningSummaryConfig,
@@ -94,7 +94,7 @@ impl ModelClient {
Self {
config,
auth_manager,
trace_manager,
otel_event_manager,
client,
provider,
conversation_id,
@@ -128,7 +128,7 @@ impl ModelClient {
&self.config.model_family,
&self.client,
&self.provider,
&self.trace_manager,
&self.otel_event_manager,
)
.await?;
@@ -165,8 +165,12 @@ impl ModelClient {
if let Some(path) = &*CODEX_RS_SSE_FIXTURE {
// short circuit for tests
warn!(path, "Streaming from fixture");
return stream_from_fixture(path, self.provider.clone(), self.trace_manager.clone())
.await;
return stream_from_fixture(
path,
self.provider.clone(),
self.otel_event_manager.clone(),
)
.await;
}
let auth_manager = self.auth_manager.clone();
@@ -245,8 +249,6 @@ impl ModelClient {
payload_body.as_str()
);
let request_span = self.trace_manager.request(&input_with_instructions);
let mut req_builder = self
.provider
.create_request_builder(&self.client, &auth)
@@ -267,13 +269,10 @@ impl ModelClient {
req_builder = req_builder.header("chatgpt-account-id", account_id);
}
let tracing_headers = TraceManager::headers(&request_span);
let start = Instant::now();
let res = req_builder.send().await;
req_builder = req_builder.headers(tracing_headers);
let res = req_builder.send().instrument(request_span.span()).await;
if let Ok(resp) = &res {
let request_id = if let Ok(resp) = &res {
let request_id = resp
.headers()
.get("x-request-id")
@@ -287,12 +286,16 @@ impl ModelClient {
.map(|v| v.to_str().unwrap_or_default())
.unwrap_or_default()
);
request_span.request_id(request_id);
}
Some(request_id.to_string())
} else {
None
};
self.otel_event_manager
.request(request_id, attempt, start.elapsed(), &res);
match res {
Ok(resp) if resp.status().is_success() => {
request_span.status_code(resp.status());
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
// spawn task to process SSE
@@ -301,14 +304,13 @@ impl ModelClient {
stream,
tx_event,
self.provider.stream_idle_timeout(),
self.trace_manager.clone(),
self.otel_event_manager.clone(),
));
return Ok(ResponseStream { rx_event });
}
Ok(res) => {
let status = res.status();
let mut log_message = status.to_string();
// Pull out RetryAfter header if present.
let retry_after_secs = res
@@ -329,10 +331,6 @@ impl ModelClient {
|| status.is_server_error())
{
let body = res.text().await.unwrap_or_default();
if !body.is_empty() {
log_message = format!("{log_message}: {body}");
}
request_span.error(attempt, Some(status), &log_message);
return Err(CodexErr::UnexpectedStatus(status, body));
}
@@ -344,20 +342,17 @@ impl ModelClient {
.plan_type
.or_else(|| auth.as_ref().and_then(|a| a.get_plan_type()));
let resets_in_seconds = error.resets_in_seconds;
request_span.error(attempt, Some(status), &log_message);
return Err(CodexErr::UsageLimitReached(UsageLimitReachedError {
plan_type,
resets_in_seconds,
}));
} else if error.r#type.as_deref() == Some("usage_not_included") {
request_span.error(attempt, Some(status), &log_message);
return Err(CodexErr::UsageNotIncluded);
}
}
}
if attempt > max_retries {
request_span.error(attempt, Some(status), &log_message);
if status == StatusCode::INTERNAL_SERVER_ERROR {
return Err(CodexErr::InternalServerError);
}
@@ -365,16 +360,12 @@ impl ModelClient {
return Err(CodexErr::RetryLimit(status));
}
request_span.error(attempt, Some(status), &log_message);
let delay = retry_after_secs
.map(|s| Duration::from_millis(s * 1_000))
.unwrap_or_else(|| backoff(attempt));
tokio::time::sleep(delay).await;
}
Err(e) => {
let error_message = format!("{e:#}");
request_span.error(attempt, None, &error_message);
if attempt > max_retries {
return Err(e.into());
}
@@ -389,8 +380,8 @@ impl ModelClient {
self.provider.clone()
}
pub fn get_trace_manager(&self) -> TraceManager {
self.trace_manager.clone()
pub fn get_otel_event_manager(&self) -> OtelEventManager {
self.otel_event_manager.clone()
}
/// Returns the currently configured model slug.
@@ -508,7 +499,7 @@ async fn process_sse<S>(
stream: S,
tx_event: mpsc::Sender<Result<ResponseEvent>>,
idle_timeout: Duration,
trace_manager: TraceManager,
otel_event_manager: OtelEventManager,
) where
S: Stream<Item = Result<Bytes>> + Unpin,
{
@@ -519,15 +510,14 @@ async fn process_sse<S>(
let mut response_completed: Option<ResponseCompleted> = None;
let mut response_error: Option<CodexErr> = None;
let sse_span = trace_manager.response();
loop {
let sse = match timeout(idle_timeout, stream.next().instrument(sse_span.span())).await {
let start = Instant::now();
let sse = match timeout(idle_timeout, stream.next()).await {
Ok(Some(Ok(sse))) => sse,
Ok(Some(Err(e))) => {
debug!("SSE Error: {e:#}");
let error = e.to_string();
sse_span.error(error.as_str());
otel_event_manager.sse_event_failed(None, start.elapsed(), error.as_str());
let event = CodexErr::Stream(error, None);
let _ = tx_event.send(Err(event)).await;
return;
@@ -539,7 +529,8 @@ async fn process_sse<S>(
usage,
}) => {
if let Some(token_usage) = &usage {
sse_span.token_usage(
otel_event_manager.sse_event_completed(
start.elapsed(),
token_usage.input_tokens,
token_usage.output_tokens,
token_usage
@@ -566,7 +557,11 @@ async fn process_sse<S>(
));
if let CodexErr::Stream(message, _) = &error {
sse_span.error(message.as_str());
otel_event_manager.sse_event_failed(
None,
start.elapsed(),
message.as_str(),
);
}
let _ = tx_event.send(Err(error)).await;
@@ -577,7 +572,7 @@ async fn process_sse<S>(
Err(_) => {
let error = "idle timeout waiting for SSE";
sse_span.error(error);
otel_event_manager.sse_event_failed(None, start.elapsed(), error);
let _ = tx_event
.send(Err(CodexErr::Stream(error.into(), None)))
@@ -589,13 +584,11 @@ async fn process_sse<S>(
let raw = sse.data.clone();
trace!("SSE event: {}", raw);
sse_span.body(raw.as_str());
let event: SseEvent = match serde_json::from_str(&sse.data) {
Ok(event) => event,
Err(e) => {
let error = format!("Failed to parse SSE event: {e}, data: {}", &sse.data);
sse_span.error(error.as_str());
otel_event_manager.sse_event_failed(None, start.elapsed(), error.as_str());
debug!(error);
continue;
}
@@ -621,20 +614,26 @@ async fn process_sse<S>(
// The fix is to forward the incremental events *as they come* and
// drop the duplicated list inside `response.completed`.
"response.output_item.done" => {
let Some(item_val) = event.item else { continue };
let Some(item_val) = event.item else {
otel_event_manager.sse_event(event.kind, start.elapsed());
continue
};
let Ok(item) = serde_json::from_value::<ResponseItem>(item_val) else {
let error = "failed to parse ResponseItem from output_item.done";
debug!(error);
sse_span.error(error);
otel_event_manager.sse_event_failed(Some(event.kind), start.elapsed(), error);
continue;
};
otel_event_manager.sse_event(event.kind, start.elapsed());
let event = ResponseEvent::OutputItemDone(item);
if tx_event.send(Ok(event)).await.is_err() {
return;
}
}
"response.output_text.delta" => {
otel_event_manager.sse_event(event.kind, start.elapsed());
if let Some(delta) = event.delta {
let event = ResponseEvent::OutputTextDelta(delta);
if tx_event.send(Ok(event)).await.is_err() {
@@ -643,6 +642,7 @@ async fn process_sse<S>(
}
}
"response.reasoning_summary_text.delta" => {
otel_event_manager.sse_event(event.kind, start.elapsed());
if let Some(delta) = event.delta {
let event = ResponseEvent::ReasoningSummaryDelta(delta);
if tx_event.send(Ok(event)).await.is_err() {
@@ -651,6 +651,7 @@ async fn process_sse<S>(
}
}
"response.reasoning_text.delta" => {
otel_event_manager.sse_event(event.kind, start.elapsed());
if let Some(delta) = event.delta {
let event = ResponseEvent::ReasoningContentDelta(delta);
if tx_event.send(Ok(event)).await.is_err() {
@@ -659,6 +660,7 @@ async fn process_sse<S>(
}
}
"response.created" => {
otel_event_manager.sse_event(event.kind, start.elapsed());
if event.response.is_some() {
let _ = tx_event.send(Ok(ResponseEvent::Created {})).await;
}
@@ -671,8 +673,6 @@ async fn process_sse<S>(
None,
));
sse_span.error(error_message);
let error = resp_val.get("error");
if let Some(error) = error {
@@ -680,15 +680,17 @@ async fn process_sse<S>(
Ok(error) => {
let delay = try_parse_retry_after(&error);
let message = error.message.unwrap_or_default();
sse_span.error(message.as_str());
otel_event_manager.sse_event_failed(Some(event.kind), start.elapsed(), message.as_str());
response_error = Some(CodexErr::Stream(message, delay));
}
Err(e) => {
let error_message = format!("failed to parse ErrorResponse: {e}");
debug!(error_message);
sse_span.error(error_message.as_str());
otel_event_manager.sse_event_failed(Some(event.kind), start.elapsed(), error_message.as_str());
}
}
} else {
otel_event_manager.sse_event_failed(Some(event.kind), start.elapsed(), error_message);
}
}
}
@@ -702,7 +704,7 @@ async fn process_sse<S>(
Err(e) => {
let error_message = format!("failed to parse ResponseCompleted: {e}");
debug!(error_message);
sse_span.error(error_message.as_str());
otel_event_manager.sse_event_failed(Some(event.kind), start.elapsed(), error_message.as_str());
continue;
}
};
@@ -715,6 +717,7 @@ async fn process_sse<S>(
| "response.in_progress"
| "response.output_text.done" => {}
"response.output_item.added" => {
otel_event_manager.sse_event(event.kind, start.elapsed());
if let Some(item) = event.item.as_ref() {
// Detect web_search_call begin and forward a synthetic event upstream.
if let Some(ty) = item.get("type").and_then(|v| v.as_str())
@@ -734,13 +737,18 @@ async fn process_sse<S>(
}
"response.reasoning_summary_part.added" => {
// Boundary between reasoning summary sections (e.g., titles).
otel_event_manager.sse_event(event.kind, start.elapsed());
let event = ResponseEvent::ReasoningSummaryPartAdded;
if tx_event.send(Ok(event)).await.is_err() {
return;
}
}
"response.reasoning_summary_text.done" => {}
_ => {}
"response.reasoning_summary_text.done" => {
otel_event_manager.sse_event(event.kind, start.elapsed());
}
_ => {
otel_event_manager.sse_event(event.kind, start.elapsed());
}
}
}
}
@@ -749,7 +757,7 @@ async fn process_sse<S>(
async fn stream_from_fixture(
path: impl AsRef<Path>,
provider: ModelProviderInfo,
trace_manager: TraceManager,
otel_event_manager: OtelEventManager,
) -> Result<ResponseStream> {
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
let f = std::fs::File::open(path.as_ref())?;
@@ -768,7 +776,7 @@ async fn stream_from_fixture(
stream,
tx_event,
provider.stream_idle_timeout(),
trace_manager,
otel_event_manager,
));
Ok(ResponseStream { rx_event })
}
@@ -824,7 +832,7 @@ mod tests {
async fn collect_events(
chunks: &[&[u8]],
provider: ModelProviderInfo,
trace_manager: TraceManager,
otel_event_manager: OtelEventManager,
) -> Vec<Result<ResponseEvent>> {
let mut builder = IoBuilder::new();
for chunk in chunks {
@@ -838,7 +846,7 @@ mod tests {
stream,
tx,
provider.stream_idle_timeout(),
trace_manager,
otel_event_manager,
));
let mut events = Vec::new();
@@ -853,7 +861,7 @@ mod tests {
async fn run_sse(
events: Vec<serde_json::Value>,
provider: ModelProviderInfo,
trace_manager: TraceManager,
otel_event_manager: OtelEventManager,
) -> Vec<ResponseEvent> {
let mut body = String::new();
for e in events {
@@ -874,7 +882,7 @@ mod tests {
stream,
tx,
provider.stream_idle_timeout(),
trace_manager,
otel_event_manager,
));
let mut out = Vec::new();
@@ -935,7 +943,7 @@ mod tests {
requires_openai_auth: false,
};
let trace_manager = TraceManager::new(
let otel_event_manager = OtelEventManager::new(
ConversationId::new(),
"test",
"test",
@@ -948,7 +956,7 @@ mod tests {
let events = collect_events(
&[sse1.as_bytes(), sse2.as_bytes(), sse3.as_bytes()],
provider,
trace_manager,
otel_event_manager,
)
.await;
@@ -1006,7 +1014,7 @@ mod tests {
requires_openai_auth: false,
};
let trace_manager = TraceManager::new(
let otel_event_manager = OtelEventManager::new(
ConversationId::new(),
"test",
"test",
@@ -1016,7 +1024,7 @@ mod tests {
"test".to_string(),
);
let events = collect_events(&[sse1.as_bytes()], provider, trace_manager).await;
let events = collect_events(&[sse1.as_bytes()], provider, otel_event_manager).await;
assert_eq!(events.len(), 2);
@@ -1050,7 +1058,7 @@ mod tests {
requires_openai_auth: false,
};
let trace_manager = TraceManager::new(
let otel_event_manager = OtelEventManager::new(
ConversationId::new(),
"test",
"test",
@@ -1060,7 +1068,7 @@ mod tests {
"test".to_string(),
);
let events = collect_events(&[sse1.as_bytes()], provider, trace_manager).await;
let events = collect_events(&[sse1.as_bytes()], provider, otel_event_manager).await;
assert_eq!(events.len(), 1);
@@ -1165,7 +1173,7 @@ mod tests {
requires_openai_auth: false,
};
let trace_manager = TraceManager::new(
let otel_event_manager = OtelEventManager::new(
ConversationId::new(),
"test",
"test",
@@ -1175,7 +1183,7 @@ mod tests {
"test".to_string(),
);
let out = run_sse(evs, provider, trace_manager).await;
let out = run_sse(evs, provider, otel_event_manager).await;
assert_eq!(out.len(), case.expected_len, "case {}", case.name);
assert!(
(case.expect_first)(&out[0]),

View File

@@ -122,7 +122,9 @@ use crate::unified_exec::UnifiedExecSessionManager;
use crate::user_instructions::UserInstructions;
use crate::user_notification::UserNotification;
use crate::util::backoff;
use codex_otel::trace_manager::{ToolDecisionOutcome, ToolDecisionSource, TraceManager};
use codex_otel::otel_event_manager::OtelEventManager;
use codex_otel::otel_event_manager::ToolDecisionOutcome;
use codex_otel::otel_event_manager::ToolDecisionSource;
use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::custom_prompts::CustomPrompt;
@@ -445,7 +447,7 @@ impl Session {
}
}
let trace_manager = TraceManager::new(
let otel_event_manager = OtelEventManager::new(
conversation_id,
config.model.as_str(),
config.model_family.slug.as_str(),
@@ -460,7 +462,7 @@ impl Session {
let client = ModelClient::new(
config.clone(),
Some(auth_manager.clone()),
trace_manager,
otel_event_manager,
provider.clone(),
model_reasoning_effort,
model_reasoning_summary,
@@ -1182,6 +1184,13 @@ impl AgentTask {
}
}
#[tracing::instrument(
name = "codex.conversation",
skip_all,
fields(
conversation.id = %sess.conversation_id,
)
)]
async fn submission_loop(
sess: Arc<Session>,
turn_context: TurnContext,
@@ -1232,7 +1241,7 @@ async fn submission_loop(
updated_config.model_context_window = Some(model_info.context_window);
}
let trace_manager = prev.client.get_trace_manager().with_model(
let otel_event_manager = prev.client.get_otel_event_manager().with_model(
updated_config.model.as_str(),
updated_config.model_family.slug.as_str(),
);
@@ -1240,7 +1249,7 @@ async fn submission_loop(
let client = ModelClient::new(
Arc::new(updated_config),
auth_manager,
trace_manager,
otel_event_manager,
provider,
effective_effort,
effective_summary,
@@ -1291,7 +1300,10 @@ async fn submission_loop(
}
}
Op::UserInput { items } => {
let _ = turn_context.client.get_trace_manager().user_prompt(&items);
turn_context
.client
.get_otel_event_manager()
.user_prompt(&items);
// attempt to inject input into current task
if let Err(items) = sess.inject_input(items).await {
// no current task, spawn a new one
@@ -1309,7 +1321,10 @@ async fn submission_loop(
effort,
summary,
} => {
let _ = turn_context.client.get_trace_manager().user_prompt(&items);
turn_context
.client
.get_otel_event_manager()
.user_prompt(&items);
// attempt to inject input into current task
if let Err(items) = sess.inject_input(items).await {
// Derive a fresh TurnContext for this turn using the provided overrides.
@@ -1328,17 +1343,18 @@ async fn submission_loop(
per_turn_config.model_context_window = Some(model_info.context_window);
}
let trace_manager = turn_context.client.get_trace_manager().with_model(
per_turn_config.model.as_str(),
per_turn_config.model_family.slug.as_str(),
);
let otel_event_manager =
turn_context.client.get_otel_event_manager().with_model(
per_turn_config.model.as_str(),
per_turn_config.model_family.slug.as_str(),
);
// Build a new client with perturn reasoning settings.
// Reuse the same provider and session id; auth defaults to env/API key.
let client = ModelClient::new(
Arc::new(per_turn_config),
auth_manager,
trace_manager,
otel_event_manager,
provider,
effort,
summary,
@@ -2749,7 +2765,7 @@ async fn handle_container_exec_with_params(
sub_id: String,
call_id: String,
) -> ResponseInputItem {
let trace_manager = turn_context.client.get_trace_manager();
let otel_event_manager = turn_context.client.get_otel_event_manager();
let mut auto_approved_via_user = false;
if params.with_escalated_permissions.unwrap_or(false)
@@ -2868,11 +2884,7 @@ async fn handle_container_exec_with_params(
ToolDecisionSource::Config
};
trace_manager.tool_decision(
tool_name,
ToolDecisionOutcome::Accept,
source,
);
otel_event_manager.tool_decision(tool_name, ToolDecisionOutcome::Accept, source);
sandbox_type
}
@@ -2888,14 +2900,14 @@ async fn handle_container_exec_with_params(
.await;
match rx_approve.await.unwrap_or_default() {
ReviewDecision::Approved => {
trace_manager.tool_decision(
otel_event_manager.tool_decision(
tool_name,
ToolDecisionOutcome::Accept,
ToolDecisionSource::UserTemporary,
);
}
ReviewDecision::ApprovedForSession => {
trace_manager.tool_decision(
otel_event_manager.tool_decision(
tool_name,
ToolDecisionOutcome::Accept,
ToolDecisionSource::UserForSession,
@@ -2903,7 +2915,7 @@ async fn handle_container_exec_with_params(
sess.add_approved_command(params.command.clone()).await;
}
ReviewDecision::Denied => {
trace_manager.tool_decision(
otel_event_manager.tool_decision(
tool_name,
ToolDecisionOutcome::Reject,
ToolDecisionSource::UserReject,
@@ -2917,7 +2929,7 @@ async fn handle_container_exec_with_params(
};
}
ReviewDecision::Abort => {
trace_manager.tool_decision(
otel_event_manager.tool_decision(
tool_name,
ToolDecisionOutcome::Reject,
ToolDecisionSource::UserAbort,
@@ -2938,7 +2950,7 @@ async fn handle_container_exec_with_params(
SandboxType::None
}
SafetyCheck::Reject { reason } => {
trace_manager.tool_decision(
otel_event_manager.tool_decision(
tool_name,
ToolDecisionOutcome::Reject,
ToolDecisionSource::Config,

View File

@@ -1065,31 +1065,16 @@ impl Config {
use crate::config_types::OtelConfig;
use crate::config_types::OtelConfigToml;
use crate::config_types::OtelExporterKind;
use crate::config_types::OtelSampler;
let t: OtelConfigToml = cfg.otel.unwrap_or_default();
let enabled = t.enabled.unwrap_or(false);
let log_user_prompt = t.log_user_prompt.unwrap_or(false);
let environment = t
.environment
.unwrap_or(DEFAULT_OTEL_ENVIRONMENT.to_string());
if !enabled {
OtelConfig {
enabled,
log_user_prompt,
environment,
sampler: OtelSampler::AlwaysOn,
exporter: OtelExporterKind::None,
}
} else {
let sampler = t.sampler.unwrap_or(OtelSampler::AlwaysOn);
let exporter = t.exporter.unwrap_or(OtelExporterKind::OtlpFile);
OtelConfig {
enabled,
log_user_prompt,
environment,
sampler,
exporter,
}
let exporter = t.exporter.unwrap_or(OtelExporterKind::None);
OtelConfig {
log_user_prompt,
environment,
exporter,
}
},
};
@@ -1671,11 +1656,9 @@ model_verbosity = "high"
disable_paste_burst: false,
tui_notifications: Default::default(),
otel: crate::config_types::OtelConfig {
enabled: false,
log_user_prompt: false,
environment: DEFAULT_OTEL_ENVIRONMENT.to_string(),
exporter: crate::config_types::OtelExporterKind::None,
sampler: crate::config_types::OtelSampler::AlwaysOn,
},
},
o3_profile_config
@@ -1736,11 +1719,9 @@ model_verbosity = "high"
disable_paste_burst: false,
tui_notifications: Default::default(),
otel: crate::config_types::OtelConfig {
enabled: false,
log_user_prompt: false,
environment: DEFAULT_OTEL_ENVIRONMENT.to_string(),
exporter: crate::config_types::OtelExporterKind::None,
sampler: crate::config_types::OtelSampler::AlwaysOn,
},
};
@@ -1816,11 +1797,9 @@ model_verbosity = "high"
disable_paste_burst: false,
tui_notifications: Default::default(),
otel: crate::config_types::OtelConfig {
enabled: false,
log_user_prompt: false,
environment: DEFAULT_OTEL_ENVIRONMENT.to_string(),
exporter: crate::config_types::OtelExporterKind::None,
sampler: crate::config_types::OtelSampler::AlwaysOn,
},
};
@@ -1882,11 +1861,9 @@ model_verbosity = "high"
disable_paste_burst: false,
tui_notifications: Default::default(),
otel: crate::config_types::OtelConfig {
enabled: false,
log_user_prompt: false,
environment: DEFAULT_OTEL_ENVIRONMENT.to_string(),
exporter: crate::config_types::OtelExporterKind::None,
sampler: crate::config_types::OtelSampler::AlwaysOn,
},
};

View File

@@ -92,7 +92,6 @@ pub enum OtelHttpProtocol {
#[serde(rename_all = "kebab-case")]
pub enum OtelExporterKind {
None,
OtlpFile,
OtlpHttp {
endpoint: String,
headers: HashMap<String, String>,
@@ -104,28 +103,15 @@ pub enum OtelExporterKind {
},
}
#[derive(Deserialize, Debug, Clone, PartialEq)]
#[serde(rename_all = "kebab-case")]
pub enum OtelSampler {
AlwaysOn,
TraceIdRatioBased { ratio: f64 },
}
/// OTEL settings loaded from config.toml. Fields are optional so we can apply defaults.
#[derive(Deserialize, Debug, Clone, PartialEq, Default)]
pub struct OtelConfigToml {
/// Enable or disable OTEL entirely. Defaults to false.
pub enabled: Option<bool>,
/// Log user prompt in traces
pub log_user_prompt: Option<bool>,
/// Mark traces with environment (dev, staging, prod, test). Defaults to dev.
pub environment: Option<String>,
/// Sampler strategy, defaults to AlwaysOn
pub sampler: Option<OtelSampler>,
/// Exporter to use. Defaults to `otlp-file`.
pub exporter: Option<OtelExporterKind>,
}
@@ -133,10 +119,8 @@ pub struct OtelConfigToml {
/// Effective OTEL settings after defaults are applied.
#[derive(Debug, Clone, PartialEq)]
pub struct OtelConfig {
pub enabled: bool,
pub log_user_prompt: bool,
pub environment: String,
pub sampler: OtelSampler,
pub exporter: OtelExporterKind,
}

View File

@@ -1,10 +1,8 @@
use crate::config::Config;
use crate::config_types::OtelExporterKind as Kind;
use crate::config_types::OtelHttpProtocol as Protocol;
use crate::config_types::OtelSampler as Sampler;
use codex_otel::config::OtelExporter;
use codex_otel::config::OtelHttpProtocol;
use codex_otel::config::OtelSampler;
use codex_otel::config::OtelSettings;
use codex_otel::otel_provider::OtelProvider;
use std::error::Error;
@@ -20,7 +18,6 @@ pub fn build_provider(
) -> Result<Option<OtelProvider>, Box<dyn Error>> {
let exporter = match &config.otel.exporter {
Kind::None => OtelExporter::None,
Kind::OtlpFile => OtelExporter::OtlpFile,
Kind::OtlpHttp {
endpoint,
headers,
@@ -49,18 +46,11 @@ pub fn build_provider(
},
};
let sampler = match config.otel.sampler {
Sampler::AlwaysOn => OtelSampler::AlwaysOn,
Sampler::TraceIdRatioBased { ratio } => OtelSampler::TraceIdRatioBased(ratio),
};
OtelProvider::from(&OtelSettings {
enabled: config.otel.enabled,
service_name: SERVICE_NAME.to_string(),
service_version: service_version.to_string(),
codex_home: config.codex_home.clone(),
environment: config.otel.environment.to_string(),
sampler,
exporter,
})
}
@@ -71,5 +61,5 @@ pub fn build_provider(
/// - use our naming convention (names starting with "codex.")
/// - originate from our crates (targets starting with "codex_")
pub fn codex_export_filter(meta: &tracing::Metadata<'_>) -> bool {
meta.target().starts_with("codex_")
meta.target().starts_with("codex_otel")
}

View File

@@ -11,7 +11,7 @@ use codex_core::ReasoningItemContent;
use codex_core::ResponseItem;
use codex_core::WireApi;
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
use codex_otel::trace_manager::TraceManager;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_protocol::mcp_protocol::AuthMode;
use codex_protocol::mcp_protocol::ConversationId;
use core_test_support::load_default_config_for_test;
@@ -74,7 +74,7 @@ async fn run_request(input: Vec<ResponseItem>) -> Value {
let conversation_id = ConversationId::new();
let trace_manager = TraceManager::new(
let trace_manager = OtelEventManager::new(
conversation_id,
config.model.as_str(),
config.model_family.slug.as_str(),

View File

@@ -8,7 +8,7 @@ use codex_core::ResponseEvent;
use codex_core::ResponseItem;
use codex_core::WireApi;
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
use codex_otel::trace_manager::TraceManager;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_protocol::mcp_protocol::AuthMode;
use codex_protocol::mcp_protocol::ConversationId;
use core_test_support::load_default_config_for_test;
@@ -67,7 +67,7 @@ async fn run_stream(sse_body: &str) -> Vec<ResponseEvent> {
let conversation_id = ConversationId::new();
let trace_manager = TraceManager::new(
let trace_manager = OtelEventManager::new(
conversation_id,
config.model.as_str(),
config.model_family.slug.as_str(),

View File

@@ -38,8 +38,9 @@ tokio = { version = "1", features = [
"signal",
] }
tracing = { version = "0.1.41", features = ["log"] }
tracing-opentelemetry = "0.31.0"
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
tracing-opentelemetry = "0.31.0"
opentelemetry-appender-tracing = "0.30.1"
[dev-dependencies]
assert_cmd = "2"

View File

@@ -3,10 +3,6 @@ mod event_processor;
mod event_processor_with_human_output;
mod event_processor_with_json_output;
use std::io::IsTerminal;
use std::io::Read;
use std::path::PathBuf;
use crate::event_processor::CodexStatus;
use crate::event_processor::EventProcessor;
pub use cli::Cli;
@@ -16,7 +12,6 @@ use codex_core::ConversationManager;
use codex_core::NewConversation;
use codex_core::config::Config;
use codex_core::config::ConfigOverrides;
use codex_core::config_types::OtelExporterKind;
use codex_core::git_info::get_git_repo_root;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::Event;
@@ -28,10 +23,13 @@ use codex_ollama::DEFAULT_OSS_MODEL;
use codex_protocol::config_types::SandboxMode;
use event_processor_with_human_output::EventProcessorWithHumanOutput;
use event_processor_with_json_output::EventProcessorWithJsonOutput;
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use std::io::IsTerminal;
use std::io::Read;
use std::path::PathBuf;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::prelude::*;
@@ -183,10 +181,6 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
#[allow(clippy::print_stderr)]
let otel = match otel {
Ok(otel) => otel,
Err(e) if config.otel.exporter == OtelExporterKind::OtlpFile => {
eprintln!("Could not create trace log file: {e}");
std::process::exit(1);
}
Err(e) => {
eprintln!("Could not create otel exporter: {e}");
std::process::exit(1);
@@ -194,8 +188,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
};
if let Some(provider) = otel {
let tracer = provider.tracer();
let otel_layer = OpenTelemetryLayer::new(tracer).with_filter(
let otel_layer = OpenTelemetryTracingBridge::new(&provider.logger).with_filter(
tracing_subscriber::filter::filter_fn(codex_core::otel_init::codex_export_filter),
);

View File

@@ -35,10 +35,11 @@ chrono = "0.4.42"
tracing = { version = "0.1" }
tracing-subscriber = { version = "0.3", features = ["registry", "fmt"], optional = true }
tracing-opentelemetry = { version = "0.31.0", optional = true }
opentelemetry = { version = "0.30.0", features = ["trace"], optional = true }
opentelemetry_sdk = { version = "0.30.0", features = ["trace", "rt-tokio"], optional = true }
opentelemetry = { version = "0.30.0", features = ["logs"], optional = true }
opentelemetry_sdk = { version = "0.30.0", features = ["logs", "rt-tokio"], optional = true }
opentelemetry-otlp = { version = "0.30.0", features = ["grpc-tonic", "http-proto", "http-json", "trace", "reqwest", "reqwest-rustls"], optional = true }
opentelemetry-proto = { version = "0.30.0", features = ["gen-tonic"], optional = true }
opentelemetry-appender-tracing = { version = "0.30.0" }
tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros"], optional = true }
tonic = { version = "0.13.1", optional = true }
serde = { version = "1", features = ["derive"] }

View File

@@ -3,21 +3,13 @@ use std::path::PathBuf;
#[derive(Clone, Debug)]
pub struct OtelSettings {
pub enabled: bool,
pub environment: String,
pub service_name: String,
pub service_version: String,
pub codex_home: PathBuf,
pub sampler: OtelSampler,
pub exporter: OtelExporter,
}
#[derive(Clone, Debug)]
pub enum OtelSampler {
AlwaysOn,
TraceIdRatioBased(f64),
}
#[derive(Clone, Debug)]
pub enum OtelHttpProtocol {
/// HTTP protocol with binary protobuf
@@ -29,7 +21,6 @@ pub enum OtelHttpProtocol {
#[derive(Clone, Debug)]
pub enum OtelExporter {
None,
OtlpFile,
OtlpGrpc {
endpoint: String,
headers: HashMap<String, String>,

View File

@@ -1,111 +0,0 @@
use crate::config::OtelSettings;
use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::error::OTelSdkError;
use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::trace::SpanData;
use opentelemetry_sdk::trace::SpanExporter;
use std::fmt::Debug;
use std::fs;
use std::fs::File;
use std::io::Error as IoError;
use std::io::LineWriter;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use time::OffsetDateTime;
use time::format_description::FormatItem;
use time::macros::format_description;
use uuid::Uuid;
const TRACES_SUBDIR: &str = "traces";
#[derive(Debug)]
pub(crate) struct FileExporter<W: Write + Send + Debug> {
writer: Arc<Mutex<LineWriter<W>>>,
resource: Resource,
}
impl<W: Write + Send + Debug> FileExporter<W> {
pub(crate) fn new(writer: W, resource: Resource) -> Self {
Self {
writer: Arc::new(Mutex::new(LineWriter::new(writer))),
resource,
}
}
}
impl<W: Write + Send + Debug> SpanExporter for FileExporter<W> {
async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
let resource = ResourceAttributesWithSchema::from(&self.resource);
let resource_spans = group_spans_by_resource_and_scope(batch, &resource);
let req = ExportTraceServiceRequest { resource_spans };
let mut writer = match self.writer.lock() {
Ok(f) => f,
Err(e) => {
return Err(OTelSdkError::InternalFailure(e.to_string()));
}
};
serde_json::to_writer(writer.get_mut(), &req)
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))
.and(writeln!(writer).map_err(|e| OTelSdkError::InternalFailure(e.to_string())))
}
fn shutdown_with_timeout(&mut self, _timeout: Duration) -> OTelSdkResult {
self.force_flush()
}
fn force_flush(&mut self) -> OTelSdkResult {
let mut writer = self
.writer
.lock()
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
writer
.flush()
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))
}
fn set_resource(&mut self, resource: &Resource) {
self.resource = resource.clone();
}
}
pub(crate) fn create_log_file(settings: &OtelSettings) -> std::io::Result<(File, PathBuf)> {
let run_id = Uuid::new_v4();
// Resolve ~/.codex/traces/YYYY/MM/DD and create it if missing.
let timestamp = OffsetDateTime::now_local()
.map_err(|e| IoError::other(format!("failed to get local time: {e}")))?;
let mut dir = settings.codex_home.clone();
dir.push(TRACES_SUBDIR);
dir.push(timestamp.year().to_string());
dir.push(format!("{:02}", u8::from(timestamp.month())));
dir.push(format!("{:02}", timestamp.day()));
fs::create_dir_all(&dir)?;
// Custom format for YYYY-MM-DDThh-mm-ss. Use `-` instead of `:` for
// compatibility with filesystems that do not allow colons in filenames.
let format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
let date_str = timestamp
.format(format)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
let filename = format!("trace-{date_str}-{run_id}.jsonl");
let path = dir.join(filename);
let file = std::fs::OpenOptions::new()
.append(true)
.create(true)
.open(&path)?;
Ok((file, path))
}

View File

@@ -1,11 +1,8 @@
pub mod config;
#[cfg(feature = "otel")]
mod file_exporter;
pub mod otel_event_manager;
#[cfg(feature = "otel")]
pub mod otel_provider;
pub mod trace_manager;
#[cfg(not(feature = "otel"))]
mod imp {

View File

@@ -0,0 +1,230 @@
use chrono::SecondsFormat;
use chrono::Utc;
use codex_protocol::mcp_protocol::AuthMode;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::protocol::InputItem;
use reqwest::Error;
use reqwest::Response;
use serde::Serialize;
use std::time::Duration;
use strum_macros::Display;
#[derive(Debug, Clone, Serialize, Display)]
#[serde(rename_all = "snake_case")]
pub enum ToolDecisionOutcome {
Accept,
Reject,
}
#[derive(Debug, Clone, Serialize, Display)]
#[serde(rename_all = "snake_case")]
pub enum ToolDecisionSource {
Config,
UserForSession,
UserTemporary,
UserAbort,
UserReject,
}
#[derive(Debug, Clone)]
pub struct OtelEventMetadata {
conversation_id: ConversationId,
auth_mode: AuthMode,
account_id: Option<String>,
model: String,
slug: String,
log_user_prompts: bool,
app_version: &'static str,
terminal_type: String,
}
#[derive(Debug, Clone)]
pub struct OtelEventManager {
metadata: OtelEventMetadata,
}
impl OtelEventManager {
pub fn new(
conversation_id: ConversationId,
model: &str,
slug: &str,
account_id: Option<String>,
auth_mode: AuthMode,
log_user_prompts: bool,
terminal_type: String,
) -> OtelEventManager {
Self {
metadata: OtelEventMetadata {
conversation_id,
auth_mode,
account_id,
model: model.to_owned(),
slug: slug.to_owned(),
log_user_prompts,
app_version: env!("CARGO_PKG_VERSION"),
terminal_type,
},
}
}
pub fn with_model(&self, model: &str, slug: &str) -> Self {
let mut manager = self.clone();
manager.metadata.model = model.to_owned();
manager.metadata.slug = slug.to_owned();
manager
}
pub fn request(
&self,
request_id: Option<String>,
attempt: u64,
duration: Duration,
response: &Result<Response, Error>,
) {
let (status, error) = match response {
Ok(response) => (Some(response.status().as_u16()), None),
Err(error) => (error.status().map(|s| s.as_u16()), Some(error.to_string())),
};
tracing::event!(
tracing::Level::INFO,
event.name = "codex.api_request",
event.timestamp = %timestamp(),
conversation.id = %self.metadata.conversation_id,
app.version = %self.metadata.app_version,
auth_mode = %self.metadata.auth_mode,
user.account_id = self.metadata.account_id,
terminal.type = %self.metadata.terminal_type,
model = %self.metadata.model,
slug = %self.metadata.slug,
request_id = request_id,
duration_ms = %duration.as_millis(),
http.response.status_code = status,
error.message = error,
attempt = attempt,
);
}
pub fn sse_event(&self, kind: String, duration: Duration) {
tracing::event!(
tracing::Level::INFO,
event.name = "codex.sse_event",
event.timestamp = %timestamp(),
event.kind = %kind,
conversation.id = %self.metadata.conversation_id,
app.version = %self.metadata.app_version,
auth_mode = %self.metadata.auth_mode,
user.account_id = self.metadata.account_id,
terminal.type = %self.metadata.terminal_type,
model = %self.metadata.model,
slug = %self.metadata.slug,
duration_ms = %duration.as_millis(),
);
}
pub fn sse_event_failed(&self, kind: Option<String>, duration: Duration, error: &str) {
tracing::event!(
tracing::Level::INFO,
event.name = "codex.sse_event",
event.timestamp = %timestamp(),
event.kind = kind,
conversation.id = %self.metadata.conversation_id,
app.version = %self.metadata.app_version,
auth_mode = %self.metadata.auth_mode,
user.account_id = self.metadata.account_id,
terminal.type = %self.metadata.terminal_type,
model = %self.metadata.model,
slug = %self.metadata.slug,
duration_ms = %duration.as_millis(),
error.message = %error,
);
}
pub fn sse_event_completed(
&self,
duration: Duration,
input_token_count: u64,
output_token_count: u64,
cached_token_count: Option<u64>,
reasoning_token_count: Option<u64>,
tool_token_count: u64,
) {
tracing::event!(
tracing::Level::INFO,
event.name = "codex.sse_event",
event.timestamp = %timestamp(),
event.kind = "response.completed",
conversation.id = %self.metadata.conversation_id,
app.version = %self.metadata.app_version,
auth_mode = %self.metadata.auth_mode,
user.account_id = self.metadata.account_id,
terminal.type = %self.metadata.terminal_type,
model = %self.metadata.model,
slug = %self.metadata.slug,
duration_ms = %duration.as_millis(),
input_token_count = %input_token_count,
output_token_count = %output_token_count,
cached_token_count = cached_token_count,
reasoning_token_count = reasoning_token_count,
tool_token_count = %tool_token_count,
);
}
pub fn user_prompt(&self, items: &[InputItem]) {
let prompt = items
.iter()
.flat_map(|item| match item {
InputItem::Text { text } => Some(text.as_str()),
_ => None,
})
.collect::<String>();
let prompt_to_log = if self.metadata.log_user_prompts {
prompt.as_str()
} else {
"[REDACTED]"
};
tracing::event!(
tracing::Level::INFO,
event.name = "codex.user_prompt",
event.timestamp = %timestamp(),
conversation.id = %self.metadata.conversation_id,
app.version = %self.metadata.app_version,
auth_mode = %self.metadata.auth_mode,
user.account_id = self.metadata.account_id,
terminal.type = %self.metadata.terminal_type,
model = %self.metadata.model,
slug = %self.metadata.slug,
prompt_length = %prompt.chars().count(),
prompt = %prompt_to_log,
);
}
pub fn tool_decision(
&self,
tool_name: &str,
outcome: ToolDecisionOutcome,
source: ToolDecisionSource,
) {
tracing::event!(
tracing::Level::INFO,
event.name = "codex.tool_decision",
event.timestamp = %timestamp(),
conversation.id = %self.metadata.conversation_id,
app.version = %self.metadata.app_version,
auth_mode = %self.metadata.auth_mode,
user.account_id = self.metadata.account_id,
terminal.type = %self.metadata.terminal_type,
model = %self.metadata.model,
slug = %self.metadata.slug,
tool_name = %tool_name,
decision = outcome.to_string(),
source = source.to_string(),
);
}
}
fn timestamp() -> String {
Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true)
}

View File

@@ -1,21 +1,14 @@
use crate::config::OtelExporter;
use crate::config::OtelHttpProtocol;
use crate::config::OtelSampler;
use crate::config::OtelSettings;
use crate::file_exporter::FileExporter;
use crate::file_exporter::create_log_file;
use opentelemetry::KeyValue;
use opentelemetry::global;
use opentelemetry::trace::TracerProvider;
use opentelemetry_otlp::LogExporter;
use opentelemetry_otlp::Protocol;
use opentelemetry_otlp::SpanExporter;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_otlp::WithHttpConfig;
use opentelemetry_otlp::WithTonicConfig;
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::trace::Sampler;
use opentelemetry_sdk::trace::SdkTracerProvider;
use opentelemetry_sdk::trace::Tracer;
use opentelemetry_sdk::logs::SdkLoggerProvider;
use opentelemetry_semantic_conventions as semconv;
use reqwest::header::HeaderMap;
use reqwest::header::HeaderName;
@@ -27,29 +20,15 @@ use tracing::debug;
const ENV_ATTRIBUTE: &str = "env";
pub struct OtelProvider {
pub name: String,
pub provider: SdkTracerProvider,
pub logger: SdkLoggerProvider,
}
impl OtelProvider {
pub fn tracer(&self) -> Tracer {
self.provider.tracer(self.name.clone())
}
pub fn shutdown(&self) {
let _ = self.provider.shutdown();
let _ = self.logger.shutdown();
}
pub fn from(settings: &OtelSettings) -> Result<Option<Self>, Box<dyn Error>> {
if !settings.enabled {
return Ok(None);
}
let sampler = match settings.sampler {
OtelSampler::AlwaysOn => Sampler::AlwaysOn,
OtelSampler::TraceIdRatioBased(ratio) => Sampler::TraceIdRatioBased(ratio),
};
let resource = Resource::builder()
.with_service_name(settings.service_name.clone())
.with_attributes(vec![
@@ -61,21 +40,12 @@ impl OtelProvider {
])
.build();
let mut builder = SdkTracerProvider::builder()
.with_resource(resource.clone())
.with_sampler(sampler);
let mut builder = SdkLoggerProvider::builder().with_resource(resource.clone());
match &settings.exporter {
OtelExporter::None => {
debug!("No exporter enabled in OTLP settings.");
}
OtelExporter::OtlpFile => {
let (log_file, log_path) = create_log_file(settings)?;
debug!("Using OTLP File exporter: {}", log_path.display());
let exporter = FileExporter::new(log_file, resource);
builder = builder.with_batch_exporter(exporter);
return Ok(None);
}
OtelExporter::OtlpGrpc { endpoint, headers } => {
debug!("Using OTLP Grpc exporter: {}", endpoint);
@@ -89,7 +59,7 @@ impl OtelProvider {
}
}
let exporter = SpanExporter::builder()
let exporter = LogExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.with_metadata(MetadataMap::from_headers(header_map))
@@ -109,7 +79,7 @@ impl OtelProvider {
OtelHttpProtocol::Json => Protocol::HttpJson,
};
let exporter = SpanExporter::builder()
let exporter = LogExporter::builder()
.with_http()
.with_endpoint(endpoint)
.with_protocol(protocol)
@@ -120,19 +90,14 @@ impl OtelProvider {
}
}
let provider = builder.build();
global::set_tracer_provider(provider.clone());
Ok(Some(Self {
name: settings.service_name.clone(),
provider,
logger: builder.build(),
}))
}
}
impl Drop for OtelProvider {
fn drop(&mut self) {
let _ = self.provider.shutdown();
let _ = self.logger.shutdown();
}
}

View File

@@ -1,303 +0,0 @@
use chrono::SecondsFormat;
use chrono::Utc;
use codex_protocol::mcp_protocol::AuthMode;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::InputItem;
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry_http::HeaderInjector;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use reqwest::StatusCode;
use reqwest::header::HeaderMap;
use serde::Serialize;
use tracing::Span;
use tracing::info_span;
use tracing_opentelemetry::OpenTelemetrySpanExt;
use strum_macros::Display;
pub struct RequestSpan(pub(crate) Span);
impl RequestSpan {
pub fn new(metadata: TraceMetadata) -> Self {
let span = info_span!(
"codex.api_request",
conversation.id = %metadata.conversation_id,
app.version = %metadata.app_version,
auth_mode = %metadata.auth_mode,
user.account_id = tracing::field::Empty,
terminal.type = %metadata.terminal_type,
event.timestamp = %timestamp(),
model = %metadata.model,
slug = %metadata.slug,
prompt = tracing::field::Empty,
http.response.status_code = tracing::field::Empty,
error.message = tracing::field::Empty,
attempt = tracing::field::Empty,
input_tokens = tracing::field::Empty,
output_tokens = tracing::field::Empty,
cache_read_tokens = tracing::field::Empty,
cache_creation_tokens = tracing::field::Empty,
request_id = tracing::field::Empty,
);
if let Some(account_id) = &metadata.account_id {
span.record("account_id", account_id);
}
// Todo include prompt conditionally
Self(span)
}
pub fn request_id(&self, request_id: &str) -> &Self {
self.0.record("request_id", request_id);
self
}
pub fn status_code(&self, status: StatusCode) -> &Self {
self.0.record("http.response.status_code", status.as_str());
self
}
pub fn error(&self, attempt: u64, status: Option<StatusCode>, message: &str) -> &Self {
self.0.record("attempt", attempt);
self.0.record("error.message", message);
if let Some(code) = status {
self.0.record("http.response.status_code", code.as_u16());
}
self
}
pub fn span(&self) -> Span {
self.0.clone()
}
}
pub struct SSESpan(pub(crate) Span);
impl SSESpan {
pub fn new(metadata: TraceMetadata) -> Self {
let span = info_span!(
"codex.sse_event",
conversation.id = %metadata.conversation_id,
app.version = %metadata.app_version,
auth_mode = %metadata.auth_mode,
user.account_id = tracing::field::Empty,
terminal.type = %metadata.terminal_type,
event.timestamp = %timestamp(),
model = %metadata.model,
slug = %metadata.slug,
error.message = tracing::field::Empty,
input_token_count = tracing::field::Empty,
output_token_count = tracing::field::Empty,
cached_content_token_count = tracing::field::Empty,
thoughts_token_count = tracing::field::Empty,
reasoning_token_count = tracing::field::Empty,
tool_token_count = tracing::field::Empty,
response_body = tracing::field::Empty,
);
if let Some(account_id) = &metadata.account_id {
span.record("account_id", account_id);
}
SSESpan(span)
}
pub fn body(&self, body: &str) -> &Self {
self.0.record("response_body", body);
self
}
pub fn token_usage(
&self,
input_token_count: u64,
output_token_count: u64,
cached_token_count: Option<u64>,
reasoning_token_count: Option<u64>,
tool_token_count: u64,
) -> &Self {
self.0.record("input_token_count", input_token_count);
self.0.record("output_token_count", output_token_count);
self.0.record("cached_token_count", cached_token_count);
self.0
.record("reasoning_token_count", reasoning_token_count);
self.0.record("tool_token_count", tool_token_count);
self
}
pub fn error(&self, error: &str) -> &Self {
self.0.record("error.message", error);
self
}
pub fn span(&self) -> Span {
self.0.clone()
}
}
#[derive(Debug, Clone, Serialize, Display)]
#[serde(rename_all = "snake_case")]
pub enum ToolDecisionOutcome {
Accept,
Reject,
}
#[derive(Debug, Clone, Serialize, Display)]
#[serde(rename_all = "snake_case")]
pub enum ToolDecisionSource {
Config,
UserForSession,
UserTemporary,
UserAbort,
UserReject,
}
pub struct ToolDecisionSpan(pub(crate) Span);
impl ToolDecisionSpan {
pub fn new(
metadata: TraceMetadata,
tool_name: &str,
outcome: ToolDecisionOutcome,
source: ToolDecisionSource,
) -> Self {
let span = info_span!(
"codex.tool_decision",
session.id = %metadata.conversation_id,
app.version = %metadata.app_version,
user.account_id = tracing::field::Empty,
terminal.type = %metadata.terminal_type,
event.timestamp = %timestamp(),
tool_name = %tool_name,
decision = outcome.to_string(),
source = source.to_string(),
);
if let Some(account_id) = &metadata.account_id {
span.record("user.account_id", account_id);
}
ToolDecisionSpan(span)
}
pub fn span(&self) -> Span {
self.0.clone()
}
}
pub struct UserPromptSpan(pub(crate) Span);
impl UserPromptSpan {
pub fn new(metadata: TraceMetadata, prompt: &str) -> Self {
let prompt_to_log = if metadata.log_user_prompts {
prompt
} else {
"[REDACTED]"
};
let span = info_span!(
"codex.user_prompt",
session.id = %metadata.conversation_id,
app.version = %metadata.app_version,
user.account_id = tracing::field::Empty,
terminal.type = %metadata.terminal_type,
event.timestamp = %timestamp(),
prompt_length = %prompt.chars().count(),
prompt = %prompt_to_log,
);
if let Some(account_id) = &metadata.account_id {
span.record("user.account_id", account_id);
}
Self(span)
}
pub fn span(&self) -> Span {
self.0.clone()
}
}
#[derive(Debug, Clone)]
pub struct TraceMetadata {
conversation_id: ConversationId,
auth_mode: AuthMode,
account_id: Option<String>,
model: String,
slug: String,
log_user_prompts: bool,
app_version: &'static str,
terminal_type: String,
}
#[derive(Debug, Clone)]
pub struct TraceManager {
metadata: TraceMetadata,
}
impl TraceManager {
pub fn new(
conversation_id: ConversationId,
model: &str,
slug: &str,
account_id: Option<String>,
auth_mode: AuthMode,
log_user_prompts: bool,
terminal_type: String,
) -> TraceManager {
Self {
metadata: TraceMetadata {
conversation_id,
auth_mode,
account_id,
model: model.to_owned(),
slug: slug.to_owned(),
log_user_prompts,
app_version: env!("CARGO_PKG_VERSION"),
terminal_type,
},
}
}
pub fn with_model(&self, model: &str, slug: &str) -> Self {
let mut manager = self.clone();
manager.metadata.model = model.to_owned();
manager.metadata.slug = slug.to_owned();
manager
}
pub fn headers(span: &RequestSpan) -> HeaderMap {
let mut injector = HeaderMap::new();
TraceContextPropagator::default()
.inject_context(&span.0.context(), &mut HeaderInjector(&mut injector));
injector
}
pub fn request(&self, _prompt: &[ResponseItem]) -> RequestSpan {
RequestSpan::new(self.metadata.clone())
}
pub fn response(&self) -> SSESpan {
SSESpan::new(self.metadata.clone())
}
pub fn user_prompt(&self, items: &[InputItem]) -> UserPromptSpan {
let prompt = items
.iter()
.flat_map(|item| match item {
InputItem::Text { text } => Some(text.as_str()),
_ => None,
})
.collect::<String>();
UserPromptSpan::new(self.metadata.clone(), prompt.as_ref())
}
pub fn tool_decision(&self, tool_name: &str, outcome: ToolDecisionOutcome, source: ToolDecisionSource) -> ToolDecisionSpan {
ToolDecisionSpan::new(self.metadata.clone(), tool_name, outcome, source)
}
}
fn timestamp() -> String {
Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true)
}

View File

@@ -1,113 +0,0 @@
#![cfg(feature = "otel")]
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use codex_otel::config::OtelExporter;
use codex_otel::config::OtelSampler;
use codex_otel::config::OtelSettings;
use codex_otel::otel_provider::OtelProvider;
use tempfile::TempDir;
use tracing_subscriber::prelude::*;
use walkdir::WalkDir;
fn latest_trace_file(dir: &Path) -> Option<PathBuf> {
let traces_dir = dir.join("traces");
WalkDir::new(&traces_dir)
.into_iter()
.filter_map(Result::ok)
.filter(|e| e.file_type().is_file())
.filter_map(|e| {
let mtime = e.metadata().ok()?.modified().ok()?;
Some((mtime, e.into_path()))
})
.max_by_key(|(mtime, _)| *mtime)
.map(|(_, path)| path)
}
fn settings(codex_home: PathBuf) -> OtelSettings {
OtelSettings {
enabled: true,
environment: "test".to_string(),
service_name: "codex-test".to_string(),
service_version: "0.0.0".to_string(),
codex_home,
sampler: OtelSampler::AlwaysOn,
exporter: OtelExporter::OtlpFile,
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn file_exporter_writes_span_json() {
let tmp = TempDir::new().expect("temp dir");
let codex_home = tmp.path().to_path_buf();
let provider = OtelProvider::from(&settings(codex_home.clone()))
.unwrap()
.expect("otel provider");
let tracer = provider.tracer();
let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
let subscriber = tracing_subscriber::registry().with(otel_layer);
tracing::subscriber::with_default(subscriber, || {
let span = tracing::info_span!("test.span", test_id = %"123");
let _entered = span.entered();
});
drop(provider);
let file = latest_trace_file(&codex_home).expect("traces file should exist");
let contents = fs::read_to_string(&file).expect("read traces file");
assert!(!contents.is_empty(), "traces file should not be empty");
let first_line = contents.lines().next().expect("at least one line");
let v: serde_json::Value = serde_json::from_str(first_line).expect("valid json line");
let span_name = v["resourceSpans"][0]["scopeSpans"][0]["spans"][0]["name"]
.as_str()
.unwrap_or("");
assert_eq!(span_name, "test.span");
assert!(v["resourceSpans"][0]["scopeSpans"][0]["spans"][0]["traceId"].is_string());
assert!(v["resourceSpans"][0]["scopeSpans"][0]["spans"][0]["spanId"].is_string());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn session_span_flushes_on_shutdown() {
let tmp = TempDir::new().expect("temp dir");
let codex_home = tmp.path().to_path_buf();
let provider = OtelProvider::from(&settings(codex_home.clone()))
.unwrap()
.expect("otel provider");
let tracer = provider.tracer();
let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
let subscriber = tracing_subscriber::registry().with(otel_layer);
tracing::subscriber::with_default(subscriber, || {
let span = tracing::info_span!("codex.session", test_id = %"123");
drop(span);
});
drop(provider);
let file = latest_trace_file(&codex_home).expect("traces file should exist");
let contents = fs::read_to_string(&file).expect("read traces file");
let mut found = false;
for line in contents.lines() {
let v: serde_json::Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(_) => continue,
};
if let Some(name) = v["resourceSpans"][0]["scopeSpans"][0]["spans"][0]["name"].as_str()
&& name == "codex.session"
{
found = true;
break;
}
}
assert!(found, "expected a codex.session span to be exported");
}

View File

@@ -81,7 +81,7 @@ tracing = { version = "0.1.41", features = ["log"] }
tracing-appender = "0.2.3"
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
pulldown-cmark = "0.10"
tracing-opentelemetry = { version = "0.31.0" }
opentelemetry-appender-tracing = "0.30.1"
unicode-segmentation = "1.12.0"
unicode-width = "0.1"
url = "2"

View File

@@ -22,6 +22,7 @@ use codex_core::protocol::SandboxPolicy;
use codex_ollama::DEFAULT_OSS_MODEL;
use codex_protocol::config_types::SandboxMode;
use codex_protocol::mcp_protocol::AuthMode;
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use std::fs::OpenOptions;
use std::path::PathBuf;
use tracing::error;
@@ -245,10 +246,6 @@ pub async fn run_main(
#[allow(clippy::print_stderr)]
let otel = match otel {
Ok(otel) => otel,
Err(e) if config.otel.exporter == OtelExporterKind::OtlpFile => {
eprintln!("Could not create trace log file: {e}");
std::process::exit(1);
}
Err(e) => {
eprintln!("Could not create otel exporter: {e}");
std::process::exit(1);
@@ -256,8 +253,7 @@ pub async fn run_main(
};
if let Some(provider) = otel.as_ref() {
let tracer = provider.tracer();
let otel_layer = tracing_opentelemetry::OpenTelemetryLayer::new(tracer).with_filter(
let otel_layer = OpenTelemetryTracingBridge::new(&provider.logger).with_filter(
tracing_subscriber::filter::filter_fn(codex_core::otel_init::codex_export_filter),
);