Compare commits

...

5 Commits

Author SHA1 Message Date
charley-openai
7993b1ad72 Fix clippy in otel log helper 2026-05-07 23:23:39 -07:00
charley-openai
b156621ba6 Fix otel image metadata test under Bazel 2026-05-07 22:40:54 -07:00
charley-openai
78a5f86f9d Remove stale session import 2026-05-07 22:40:54 -07:00
charley-openai
57a1e855f0 Move image input metadata to turn spans 2026-05-07 22:40:54 -07:00
charley-openai
2d514cf6d4 Add image input metadata to user prompt traces 2026-05-07 22:40:35 -07:00
11 changed files with 869 additions and 23 deletions

View File

@@ -353,7 +353,6 @@ use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::TokenUsageInfo;
use codex_protocol::protocol::WarningEvent;
use codex_protocol::user_input::UserInput;
use codex_tools::ToolEnvironmentMode;
use codex_tools::ToolsConfig;
use codex_tools::ToolsConfigParams;
use codex_utils_absolute_path::AbsolutePathBuf;

View File

@@ -1,5 +1,6 @@
use super::turn_context::image_generation_tool_auth_allowed;
use super::*;
use std::sync::OnceLock;
use std::sync::atomic::AtomicBool;
/// Spawn a review thread using the given prompt.
@@ -152,6 +153,8 @@ pub(super) async fn spawn_review_thread(
turn_metadata_state,
turn_skills: TurnSkillsContext::new(parent_turn_context.turn_skills.outcome.clone()),
turn_timing_state: Arc::new(TurnTimingState::default()),
turn_span: OnceLock::new(),
model_input_images: codex_otel::ModelInputImageTelemetry::default(),
server_model_warning_emitted: AtomicBool::new(false),
model_verification_emitted: AtomicBool::new(false),
};

View File

@@ -440,6 +440,11 @@ pub(crate) async fn run_turn(
.await
.for_prompt(&turn_context.model_info.input_modalities)
};
turn_context.session_telemetry.record_model_input_images(
&turn_context.turn_span(),
&turn_context.model_input_images,
&sampling_request_input,
);
let sampling_request_input_messages = sampling_request_input
.iter()

View File

@@ -11,6 +11,8 @@ use codex_protocol::protocol::TurnEnvironmentSelection;
use codex_sandboxing::compatibility_sandbox_policy_for_permission_profile;
use codex_sandboxing::policy_transforms::effective_file_system_sandbox_policy;
use codex_sandboxing::policy_transforms::effective_network_sandbox_policy;
use codex_tools::ToolEnvironmentMode;
use std::sync::OnceLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
@@ -95,6 +97,8 @@ pub(crate) struct TurnContext {
pub(crate) turn_metadata_state: Arc<TurnMetadataState>,
pub(crate) turn_skills: TurnSkillsContext,
pub(crate) turn_timing_state: Arc<TurnTimingState>,
pub(crate) turn_span: OnceLock<tracing::Span>,
pub(crate) model_input_images: codex_otel::ModelInputImageTelemetry,
pub(crate) server_model_warning_emitted: AtomicBool,
pub(crate) model_verification_emitted: AtomicBool,
}
@@ -111,6 +115,17 @@ impl TurnContext {
self.permission_profile.network_sandbox_policy()
}
pub(crate) fn set_turn_span(&self, span: tracing::Span) {
let _ = self.turn_span.set(span);
}
pub(crate) fn turn_span(&self) -> tracing::Span {
self.turn_span
.get()
.cloned()
.unwrap_or_else(tracing::Span::none)
}
pub(crate) fn sandbox_policy(&self) -> SandboxPolicy {
let file_system_sandbox_policy = self.file_system_sandbox_policy();
let network_sandbox_policy = self.network_sandbox_policy();
@@ -235,6 +250,11 @@ impl TurnContext {
&config.agent_roles,
));
let turn_span = OnceLock::new();
if let Some(span) = self.turn_span.get() {
let _ = turn_span.set(span.clone());
}
Self {
sub_id: self.sub_id.clone(),
trace_id: self.trace_id.clone(),
@@ -278,6 +298,8 @@ impl TurnContext {
turn_metadata_state: self.turn_metadata_state.clone(),
turn_skills: self.turn_skills.clone(),
turn_timing_state: Arc::clone(&self.turn_timing_state),
turn_span,
model_input_images: self.model_input_images.clone(),
server_model_warning_emitted: AtomicBool::new(
self.server_model_warning_emitted.load(Ordering::Relaxed),
),
@@ -573,6 +595,8 @@ impl Session {
turn_metadata_state,
turn_skills: TurnSkillsContext::new(skills_outcome),
turn_timing_state: Arc::new(TurnTimingState::default()),
turn_span: OnceLock::new(),
model_input_images: codex_otel::ModelInputImageTelemetry::default(),
server_model_warning_emitted: AtomicBool::new(false),
model_verification_emitted: AtomicBool::new(false),
}

View File

@@ -360,10 +360,6 @@ impl Session {
let turn = active.get_or_insert_with(ActiveTurn::default);
debug_assert!(turn.tasks.is_empty());
let done_clone = Arc::clone(&done);
let session_ctx = Arc::new(SessionTaskContext::new(Arc::clone(self)));
let ctx = Arc::clone(&turn_context);
let task_for_run = Arc::clone(&task);
let task_cancellation_token = cancellation_token.child_token();
// Task-owned turn spans keep a core-owned span open for the
// full task lifecycle after the submission dispatch span ends.
let reasoning_effort = turn_context.effective_reasoning_effort_for_tracing();
@@ -380,7 +376,18 @@ impl Session {
codex.turn.token_usage.output_tokens = field::Empty,
codex.turn.token_usage.reasoning_output_tokens = field::Empty,
codex.turn.token_usage.total_tokens = field::Empty,
codex.turn.model_input_image_count = field::Empty,
codex.turn.model_input_message_image_count = field::Empty,
codex.turn.model_input_tool_image_count = field::Empty,
codex.turn.model_input_image_types = field::Empty,
codex.turn.model_input_image_mime_types = field::Empty,
codex.turn.model_input_image_details = field::Empty,
);
turn_context.set_turn_span(task_span.clone());
let session_ctx = Arc::new(SessionTaskContext::new(Arc::clone(self)));
let ctx = Arc::clone(&turn_context);
let task_for_run = Arc::clone(&task);
let task_cancellation_token = cancellation_token.child_token();
let handle = tokio::spawn(
async move {
let ctx_for_finish = Arc::clone(&ctx);

View File

@@ -1,3 +1,6 @@
use anyhow::Result;
use codex_config::types::McpServerConfig;
use codex_config::types::McpServerTransportConfig;
use codex_core::config::Constrained;
use codex_features::Feature;
use codex_protocol::models::PermissionProfile;
@@ -11,6 +14,7 @@ use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_custom_tool_call;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_function_call_with_namespace;
use core_test_support::responses::ev_local_shell_call;
use core_test_support::responses::ev_message_item_added;
use core_test_support::responses::ev_output_text_delta;
@@ -24,10 +28,15 @@ use core_test_support::responses::mount_sse_once;
use core_test_support::responses::sse;
use core_test_support::responses::sse_response;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::stdio_server_bin;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use tracing::Level;
use tracing_test::traced_test;
@@ -72,6 +81,29 @@ fn assert_empty_mcp_tool_fields(line: &str) -> Result<(), String> {
Ok(())
}
async fn assert_log_line_eventually(
buffer: &Mutex<Vec<u8>>,
failure_message: &str,
matches: impl Fn(&str) -> bool,
) {
let start = Instant::now();
loop {
let bytes = match buffer.lock() {
Ok(guard) => guard.clone(),
Err(poisoned) => poisoned.into_inner().clone(),
};
let logs = String::from_utf8_lossy(&bytes);
if logs.lines().any(&matches) || start.elapsed() > Duration::from_secs(5) {
assert!(
logs.lines().any(&matches),
"{failure_message}\nlogs:\n{logs}"
);
return;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
#[test]
fn extract_log_field_handles_empty_bare_values() {
let line = "event.name=\"codex.tool_result\" mcp_server= mcp_server_origin=";
@@ -565,7 +597,7 @@ async fn process_sse_emits_completed_telemetry() {
}
#[tokio::test(flavor = "current_thread")]
async fn turn_and_completed_response_spans_record_token_usage() {
async fn turn_span_records_token_usage_and_image_metadata() {
let buffer: &'static Mutex<Vec<u8>> = Box::leak(Box::new(Mutex::new(Vec::new())));
let subscriber = tracing_subscriber::fmt()
.with_level(true)
@@ -603,6 +635,10 @@ async fn turn_and_completed_response_spans_record_token_usage() {
.features
.disable(Feature::GhostCommit)
.expect("test config should allow feature update");
config
.features
.disable(Feature::ShellSnapshot)
.expect("test config should allow feature update");
})
.build(&server)
.await
@@ -613,10 +649,15 @@ async fn turn_and_completed_response_spans_record_token_usage() {
codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: "hello".into(),
text_elements: Vec::new(),
}],
items: vec![
UserInput::Text {
text: "hello".into(),
text_elements: Vec::new(),
},
UserInput::Image {
image_url: "DATA:image/jpeg;BASE64,AAAA".to_string(),
},
],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
@@ -640,8 +681,10 @@ async fn turn_and_completed_response_spans_record_token_usage() {
}),
"missing completed response span token usage\nlogs:\n{logs}"
);
assert!(
logs.lines().any(|line| {
assert_log_line_eventually(
buffer,
"missing regular turn span token usage and image metadata",
|line| {
line.contains("turn{otel.name=\"session_task.turn\"")
&& line.contains("codex.turn.reasoning_effort=high")
&& line.contains("codex.turn.token_usage.input_tokens=3")
@@ -650,9 +693,149 @@ async fn turn_and_completed_response_spans_record_token_usage() {
&& line.contains("codex.turn.token_usage.output_tokens=5")
&& line.contains("codex.turn.token_usage.reasoning_output_tokens=2")
&& line.contains("codex.turn.token_usage.total_tokens=9")
}),
"missing regular turn span token usage\nlogs:\n{logs}"
);
&& line.contains("codex.turn.model_input_image_count=1")
&& line.contains("codex.turn.model_input_message_image_count=1")
&& line.contains("codex.turn.model_input_tool_image_count=0")
&& line.contains("codex.turn.model_input_image_types=\"jpeg\"")
&& line.contains("codex.turn.model_input_image_mime_types=\"image/jpeg\"")
&& line.contains("\\\"source\\\":\\\"message\\\"")
&& line.contains("\\\"byte_length\\\":3")
},
)
.await;
}
#[tokio::test(flavor = "current_thread")]
async fn turn_span_records_tool_output_image_metadata() -> Result<()> {
skip_if_no_network!(Ok(()));
let buffer: &'static Mutex<Vec<u8>> = Box::leak(Box::new(Mutex::new(Vec::new())));
let subscriber = tracing_subscriber::fmt()
.with_level(true)
.with_ansi(false)
.with_max_level(Level::TRACE)
.with_span_events(FmtSpan::FULL)
.with_writer(MockWriter::new(buffer))
.finish();
let _guard = tracing::subscriber::set_default(subscriber);
let server = start_mock_server().await;
let call_id = "rmcp-image-otel";
let server_name = "rmcp";
let namespace = format!("mcp__{server_name}__");
let openai_png = "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/x8AAwMB/ee9bQAAAABJRU5ErkJggg==";
mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-1"),
ev_function_call_with_namespace(call_id, &namespace, "image", "{}"),
ev_completed("resp-1"),
]),
)
.await;
mount_sse_once(
&server,
sse(vec![
ev_assistant_message("msg-1", "done"),
ev_completed("resp-2"),
]),
)
.await;
let rmcp_test_server_bin = stdio_server_bin()?;
let fixture = test_codex()
.with_config(move |config| {
config
.features
.disable(Feature::GhostCommit)
.expect("test config should allow feature update");
config
.features
.disable(Feature::ShellSnapshot)
.expect("test config should allow feature update");
let mut servers = config.mcp_servers.get().clone();
servers.insert(
server_name.to_string(),
McpServerConfig {
transport: McpServerTransportConfig::Stdio {
command: rmcp_test_server_bin,
args: Vec::new(),
env: Some(HashMap::from([(
"MCP_TEST_IMAGE_DATA_URL".to_string(),
openai_png.to_string(),
)])),
env_vars: Vec::new(),
cwd: None,
},
experimental_environment: None,
enabled: true,
required: false,
supports_parallel_tool_calls: false,
disabled_reason: None,
startup_timeout_sec: Some(Duration::from_secs(10)),
tool_timeout_sec: None,
default_tools_approval_mode: None,
enabled_tools: None,
disabled_tools: None,
scopes: None,
oauth_resource: None,
tools: HashMap::new(),
},
);
config
.mcp_servers
.set(servers)
.expect("test mcp servers should accept any configuration");
})
.build(&server)
.await?;
let session_model = fixture.session_configured.model.clone();
let permission_profile = PermissionProfile::read_only();
let sandbox_policy = permission_profile.to_legacy_sandbox_policy(fixture.cwd.path())?;
fixture
.codex
.submit(Op::UserTurn {
environments: None,
items: vec![UserInput::Text {
text: "call the rmcp image tool".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: fixture.cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
approvals_reviewer: None,
sandbox_policy,
permission_profile: Some(permission_profile),
model: session_model,
effort: None,
summary: None,
service_tier: None,
collaboration_mode: None,
personality: None,
})
.await?;
wait_for_event(&fixture.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_log_line_eventually(
buffer,
"missing tool output image metadata on turn span",
|line| {
line.contains("turn{otel.name=\"session_task.turn\"")
&& line.contains("codex.turn.model_input_image_count=1")
&& line.contains("codex.turn.model_input_message_image_count=0")
&& line.contains("codex.turn.model_input_tool_image_count=1")
&& line.contains("codex.turn.model_input_image_types=\"png\"")
&& line.contains("codex.turn.model_input_image_mime_types=\"image/png\"")
&& line.contains("\\\"source\\\":\\\"tool_output\\\"")
},
)
.await;
Ok(())
}
#[tokio::test]

View File

@@ -0,0 +1,250 @@
use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputContentItem;
use codex_protocol::models::ResponseItem;
use serde::Serialize;
use std::collections::BTreeSet;
use std::path::Path;
use std::sync::Arc;
use std::sync::Mutex;
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ImageInputTelemetrySnapshot {
pub(crate) details_json: String,
pub(crate) image_types: Vec<String>,
pub(crate) mime_types: Vec<String>,
pub(crate) message_image_count: usize,
pub(crate) tool_output_image_count: usize,
}
#[derive(Debug, Clone, Default)]
pub struct ModelInputImageTelemetry {
state: Arc<Mutex<ImageInputTelemetryState>>,
}
impl ModelInputImageTelemetry {
pub(crate) fn record_items(&self, items: &[ResponseItem]) -> ImageInputTelemetrySnapshot {
let request = model_input_image_telemetry(items);
let mut state = match self.state.lock() {
Ok(guard) => guard,
Err(poisoned) => poisoned.into_inner(),
};
state.accumulate(request);
state.snapshot()
}
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct ImageInputTelemetryState {
details: Vec<ImageInputDetail>,
image_types: BTreeSet<String>,
mime_types: BTreeSet<String>,
message_image_count: usize,
tool_output_image_count: usize,
}
impl ImageInputTelemetryState {
fn accumulate(&mut self, request: Self) {
self.message_image_count = self.message_image_count.max(request.message_image_count);
self.tool_output_image_count = self
.tool_output_image_count
.max(request.tool_output_image_count);
self.image_types.extend(request.image_types);
self.mime_types.extend(request.mime_types);
if request.details.len() > self.details.len() {
self.details = request.details;
}
}
fn snapshot(&self) -> ImageInputTelemetrySnapshot {
let details_json =
serde_json::to_string(&self.details).unwrap_or_else(|_| "[]".to_string());
ImageInputTelemetrySnapshot {
details_json,
image_types: self.image_types.iter().cloned().collect(),
mime_types: self.mime_types.iter().cloned().collect(),
message_image_count: self.message_image_count,
tool_output_image_count: self.tool_output_image_count,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
struct ImageInputDetail {
source: &'static str,
#[serde(skip_serializing_if = "Option::is_none")]
image_type: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
mime_type: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
byte_length: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
extension: Option<String>,
}
fn model_input_image_telemetry(items: &[ResponseItem]) -> ImageInputTelemetryState {
let mut details: Vec<ImageInputDetail> = Vec::new();
let mut message_image_count = 0;
let mut tool_output_image_count = 0;
for item in items {
match item {
ResponseItem::Message { content, .. } => {
for content_item in content {
if let ContentItem::InputImage { image_url, .. } = content_item {
message_image_count += 1;
details.push(image_url_detail("message", image_url));
}
}
}
ResponseItem::FunctionCallOutput { output, .. } => {
if let Some(content_items) = output.content_items() {
tool_output_image_count += collect_tool_output_image_details(
"tool_output",
content_items,
&mut details,
);
}
}
ResponseItem::CustomToolCallOutput { output, .. } => {
if let Some(content_items) = output.content_items() {
tool_output_image_count += collect_tool_output_image_details(
"custom_tool_output",
content_items,
&mut details,
);
}
}
_ => {}
}
}
let image_types = collect_unique_strings(
details
.iter()
.filter_map(|detail| detail.image_type.as_deref()),
);
let mime_types = collect_unique_strings(
details
.iter()
.filter_map(|detail| detail.mime_type.as_deref()),
);
ImageInputTelemetryState {
details,
image_types,
mime_types,
message_image_count,
tool_output_image_count,
}
}
fn collect_tool_output_image_details(
source: &'static str,
content_items: &[FunctionCallOutputContentItem],
details: &mut Vec<ImageInputDetail>,
) -> usize {
let mut image_count = 0;
for content_item in content_items {
if let FunctionCallOutputContentItem::InputImage { image_url, .. } = content_item {
details.push(image_url_detail(source, image_url));
image_count += 1;
}
}
image_count
}
fn image_url_detail(source: &'static str, image_url: &str) -> ImageInputDetail {
if let Some(detail) = data_url_detail(source, image_url) {
return detail;
}
let extension = safe_image_extension_from_url(image_url);
let mime_type = extension.as_deref().and_then(mime_type_from_extension);
ImageInputDetail {
source,
image_type: mime_type.as_deref().and_then(image_type_from_mime),
mime_type,
byte_length: None,
extension,
}
}
fn data_url_detail(source: &'static str, image_url: &str) -> Option<ImageInputDetail> {
let image_url = strip_data_scheme(image_url)?;
let (metadata, payload) = image_url.split_once(',')?;
let mut metadata_parts = metadata.split(';');
let mime_type = normalize_known_image_mime(metadata_parts.next()?)?;
let is_base64 = metadata_parts.any(|part| part.eq_ignore_ascii_case("base64"));
Some(ImageInputDetail {
source,
image_type: image_type_from_mime(&mime_type),
mime_type: Some(mime_type),
byte_length: is_base64.then(|| base64_payload_byte_len(payload)),
extension: None,
})
}
fn strip_data_scheme(image_url: &str) -> Option<&str> {
let scheme = image_url.get(..5)?;
if !scheme.eq_ignore_ascii_case("data:") {
return None;
}
image_url.get(5..)
}
fn safe_image_extension_from_url(url: &str) -> Option<String> {
let path = url.split(['?', '#']).next().unwrap_or(url);
let extension = Path::new(path)
.extension()
.and_then(|extension| extension.to_str())
.map(str::to_ascii_lowercase)?;
mime_type_from_extension(&extension)?;
Some(extension)
}
fn normalize_known_image_mime(mime_type: &str) -> Option<String> {
match mime_type.trim().to_ascii_lowercase().as_str() {
"image/png" => Some("image/png".to_string()),
"image/jpeg" | "image/jpg" => Some("image/jpeg".to_string()),
"image/gif" => Some("image/gif".to_string()),
"image/webp" => Some("image/webp".to_string()),
"image/bmp" => Some("image/bmp".to_string()),
"image/heic" => Some("image/heic".to_string()),
"image/heif" => Some("image/heif".to_string()),
"image/tiff" => Some("image/tiff".to_string()),
"image/svg+xml" => Some("image/svg+xml".to_string()),
_ => None,
}
}
fn mime_type_from_extension(extension: &str) -> Option<String> {
match extension {
"png" => Some("image/png".to_string()),
"jpg" | "jpeg" => Some("image/jpeg".to_string()),
"gif" => Some("image/gif".to_string()),
"webp" => Some("image/webp".to_string()),
"bmp" => Some("image/bmp".to_string()),
"heic" => Some("image/heic".to_string()),
"heif" => Some("image/heif".to_string()),
"tif" | "tiff" => Some("image/tiff".to_string()),
"svg" => Some("image/svg+xml".to_string()),
_ => None,
}
}
fn image_type_from_mime(mime_type: &str) -> Option<String> {
mime_type
.strip_prefix("image/")
.map(str::to_ascii_lowercase)
}
fn base64_payload_byte_len(payload: &str) -> u64 {
let trimmed = payload.trim_end_matches('=');
((trimmed.len() * 3) / 4) as u64
}
fn collect_unique_strings<'a>(values: impl Iterator<Item = &'a str>) -> BTreeSet<String> {
values.map(str::to_string).collect()
}

View File

@@ -1,2 +1,3 @@
pub(crate) mod image_input_telemetry;
pub(crate) mod session_telemetry;
pub(crate) mod shared;

View File

@@ -1,5 +1,6 @@
use crate::TelemetryAuthMode;
use crate::ToolDecisionSource;
use crate::events::image_input_telemetry::ModelInputImageTelemetry;
use crate::events::shared::log_and_trace_event;
use crate::events::shared::log_event;
use crate::events::shared::trace_event;
@@ -878,6 +879,41 @@ impl SessionTelemetry {
);
}
pub fn record_model_input_images(
&self,
turn_span: &Span,
model_input_images: &ModelInputImageTelemetry,
items: &[ResponseItem],
) {
let image_telemetry = model_input_images.record_items(items);
let total_image_count =
image_telemetry.message_image_count + image_telemetry.tool_output_image_count;
let image_types = image_telemetry.image_types.join(",");
let mime_types = image_telemetry.mime_types.join(",");
turn_span.record(
"codex.turn.model_input_image_count",
total_image_count as i64,
);
turn_span.record(
"codex.turn.model_input_message_image_count",
image_telemetry.message_image_count as i64,
);
turn_span.record(
"codex.turn.model_input_tool_image_count",
image_telemetry.tool_output_image_count as i64,
);
turn_span.record("codex.turn.model_input_image_types", image_types.as_str());
turn_span.record(
"codex.turn.model_input_image_mime_types",
mime_types.as_str(),
);
turn_span.record(
"codex.turn.model_input_image_details",
image_telemetry.details_json.as_str(),
);
}
pub fn tool_decision(
&self,
tool_name: &str,

View File

@@ -17,6 +17,7 @@ pub use crate::config::OtelSettings;
pub use crate::config::OtelTlsConfig;
pub use crate::config::StatsigMetricsSettings;
pub use crate::config::validate_span_attributes;
pub use crate::events::image_input_telemetry::ModelInputImageTelemetry;
pub use crate::events::session_telemetry::AuthEnvTelemetryMetadata;
pub use crate::events::session_telemetry::SessionTelemetry;
pub use crate::events::session_telemetry::SessionTelemetryMetadata;

View File

@@ -1,4 +1,5 @@
use codex_otel::AuthEnvTelemetryMetadata;
use codex_otel::ModelInputImageTelemetry;
use codex_otel::OtelProvider;
use codex_otel::SessionTelemetry;
use codex_otel::TelemetryAuthMode;
@@ -11,15 +12,21 @@ use opentelemetry_sdk::logs::SdkLoggerProvider;
use opentelemetry_sdk::trace::InMemorySpanExporter;
use opentelemetry_sdk::trace::SdkTracerProvider;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::fs;
use tracing_subscriber::Layer;
use tracing_subscriber::filter::filter_fn;
use tracing_subscriber::layer::SubscriberExt;
use codex_protocol::ThreadId;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputContentItem;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionSource;
@@ -40,6 +47,13 @@ fn span_event_attributes(event: &opentelemetry::trace::Event) -> BTreeMap<String
.collect()
}
fn span_attributes(span: &opentelemetry_sdk::trace::SpanData) -> BTreeMap<String, String> {
span.attributes
.iter()
.map(|KeyValue { key, value, .. }| (key.as_str().to_string(), value.to_string()))
.collect()
}
fn any_value_to_string(value: &AnyValue) -> String {
match value {
AnyValue::Int(value) => value.to_string(),
@@ -93,6 +107,18 @@ fn auth_env_metadata() -> AuthEnvTelemetryMetadata {
#[test]
fn otel_export_routing_policy_routes_user_prompt_log_and_trace_events() {
let local_png = std::env::temp_dir().join(format!(
"codex-otel-user-prompt-image-{}.png",
std::process::id()
));
fs::write(
&local_png,
[
0x89, b'P', b'N', b'G', b'\r', b'\n', 0x1a, b'\n', 0x00, 0x00, 0x00, 0x0d, b'I', b'H',
b'D', b'R', 0x00, 0x00, 0x02, 0x80, 0x00, 0x00, 0x01, 0xe0,
],
)
.expect("write local png header");
let log_exporter = InMemoryLogExporter::default();
let logger_provider = SdkLoggerProvider::builder()
.with_simple_exporter(log_exporter.clone())
@@ -132,18 +158,67 @@ fn otel_export_routing_policy_routes_user_prompt_log_and_trace_events() {
);
let root_span = tracing::info_span!("root");
let _root_guard = root_span.enter();
manager.user_prompt(&[
let items = [
UserInput::Text {
text: "super secret prompt".to_string(),
text_elements: Vec::new(),
},
UserInput::Image {
image_url: "https://example.com/image.png".to_string(),
image_url: "DATA:image/jpeg;BASE64,AAAA".to_string(),
},
UserInput::Image {
image_url: "https://example.com/image.customer-secret".to_string(),
},
UserInput::LocalImage {
path: PathBuf::from("/tmp/secret.png"),
path: local_png.clone(),
},
]);
];
let turn_span = tracing::info_span!(
"turn",
otel.name = "session_task.turn",
codex.turn.model_input_image_count = tracing::field::Empty,
codex.turn.model_input_message_image_count = tracing::field::Empty,
codex.turn.model_input_tool_image_count = tracing::field::Empty,
codex.turn.model_input_image_types = tracing::field::Empty,
codex.turn.model_input_image_mime_types = tracing::field::Empty,
codex.turn.model_input_image_details = tracing::field::Empty,
);
let model_input_images = ModelInputImageTelemetry::default();
let model_input = vec![
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![
ContentItem::InputText {
text: "super secret prompt".to_string(),
},
ContentItem::InputImage {
image_url: "DATA:image/jpeg;BASE64,AAAA".to_string(),
detail: None,
},
ContentItem::InputImage {
image_url: "https://example.com/image.customer-secret".to_string(),
detail: None,
},
],
phase: None,
},
ResponseItem::FunctionCallOutput {
call_id: "call_1".to_string(),
output: FunctionCallOutputPayload::from_content_items(vec![
FunctionCallOutputContentItem::InputText {
text: "page screenshot".to_string(),
},
FunctionCallOutputContentItem::InputImage {
image_url: "data:image/png;base64,AAAA".to_string(),
detail: None,
},
]),
},
];
manager.record_model_input_images(&turn_span, &model_input_images, &model_input);
manager.user_prompt(&items);
let _turn_guard = turn_span.enter();
});
logger_provider.force_flush().expect("flush logs");
@@ -167,8 +242,12 @@ fn otel_export_routing_policy_routes_user_prompt_log_and_trace_events() {
);
let spans = span_exporter.get_finished_spans().expect("span export");
assert_eq!(spans.len(), 1);
let span_events = &spans[0].events.events;
assert_eq!(spans.len(), 2);
let root_span = spans
.iter()
.find(|span| span.name == "root")
.expect("missing root span");
let span_events = &root_span.events.events;
assert_eq!(span_events.len(), 1);
let prompt_trace_event = find_span_event_by_name_attr(span_events, "codex.user_prompt");
@@ -187,7 +266,7 @@ fn otel_export_routing_policy_routes_user_prompt_log_and_trace_events() {
prompt_trace_attrs
.get("image_input_count")
.map(String::as_str),
Some("1")
Some("2")
);
assert_eq!(
prompt_trace_attrs
@@ -195,9 +274,79 @@ fn otel_export_routing_policy_routes_user_prompt_log_and_trace_events() {
.map(String::as_str),
Some("1")
);
assert!(!prompt_trace_attrs.contains_key("image_input_types"));
assert!(!prompt_trace_attrs.contains_key("image_input_mime_types"));
assert!(!prompt_trace_attrs.contains_key("image_input_details"));
let turn_span = spans
.iter()
.find(|span| span.name == "session_task.turn")
.expect("missing turn span");
let turn_attrs = span_attributes(turn_span);
assert_eq!(
turn_attrs
.get("codex.turn.model_input_image_count")
.map(String::as_str),
Some("3")
);
assert_eq!(
turn_attrs
.get("codex.turn.model_input_message_image_count")
.map(String::as_str),
Some("2")
);
assert_eq!(
turn_attrs
.get("codex.turn.model_input_tool_image_count")
.map(String::as_str),
Some("1")
);
assert_eq!(
turn_attrs
.get("codex.turn.model_input_image_types")
.map(String::as_str),
Some("jpeg,png")
);
assert_eq!(
turn_attrs
.get("codex.turn.model_input_image_mime_types")
.map(String::as_str),
Some("image/jpeg,image/png")
);
let image_input_details: Value = serde_json::from_str(
turn_attrs
.get("codex.turn.model_input_image_details")
.expect("image input details"),
)
.expect("image input details should be json");
assert_eq!(
image_input_details,
json!([
{
"source": "message",
"image_type": "jpeg",
"mime_type": "image/jpeg",
"byte_length": 3
},
{
"source": "message"
},
{
"source": "tool_output",
"image_type": "png",
"mime_type": "image/png",
"byte_length": 3
}
])
);
assert!(
!turn_attrs["codex.turn.model_input_image_details"].contains("customer-secret"),
"unknown remote URL suffixes should not be traced"
);
assert!(!prompt_trace_attrs.contains_key("prompt"));
assert!(!prompt_trace_attrs.contains_key("user.email"));
assert!(!prompt_trace_attrs.contains_key("user.account_id"));
fs::remove_file(local_png).expect("remove local png header");
}
#[test]
@@ -459,6 +608,194 @@ fn otel_export_routing_policy_routes_auth_recovery_log_and_trace_events() {
);
}
#[test]
fn record_model_input_images_accumulates_when_retry_has_no_images() {
let span_exporter = InMemorySpanExporter::default();
let tracer_provider = SdkTracerProvider::builder()
.with_simple_exporter(span_exporter.clone())
.build();
let tracer = tracer_provider.tracer("model-input-image-retry-test");
let subscriber =
tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer));
tracing::subscriber::with_default(subscriber, || {
tracing::callsite::rebuild_interest_cache();
let manager = SessionTelemetry::new(
ThreadId::new(),
"gpt-5.1",
"gpt-5.1",
Some("account-id".to_string()),
Some("engineer@example.com".to_string()),
Some(TelemetryAuthMode::Chatgpt),
"codex_exec".to_string(),
/*log_user_prompts*/ false,
"tty".to_string(),
SessionSource::Cli,
);
let turn_span = tracing::info_span!(
"turn",
otel.name = "session_task.turn",
codex.turn.model_input_image_count = tracing::field::Empty,
codex.turn.model_input_message_image_count = tracing::field::Empty,
codex.turn.model_input_tool_image_count = tracing::field::Empty,
codex.turn.model_input_image_types = tracing::field::Empty,
codex.turn.model_input_image_mime_types = tracing::field::Empty,
codex.turn.model_input_image_details = tracing::field::Empty,
);
let model_input_images = ModelInputImageTelemetry::default();
manager.record_model_input_images(
&turn_span,
&model_input_images,
&[ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputImage {
image_url: "data:image/png;base64,AAAA".to_string(),
detail: None,
}],
phase: None,
}],
);
manager.record_model_input_images(
&turn_span,
&model_input_images,
&[ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "retry after image sanitization".to_string(),
}],
phase: None,
}],
);
let _turn_guard = turn_span.enter();
});
tracer_provider.force_flush().expect("flush traces");
let spans = span_exporter.get_finished_spans().expect("span export");
let turn_span = spans
.iter()
.find(|span| span.name == "session_task.turn")
.expect("missing turn span");
let turn_attrs = span_attributes(turn_span);
assert_eq!(
turn_attrs
.get("codex.turn.model_input_image_count")
.map(String::as_str),
Some("1")
);
assert_eq!(
turn_attrs
.get("codex.turn.model_input_message_image_count")
.map(String::as_str),
Some("1")
);
assert_eq!(
turn_attrs
.get("codex.turn.model_input_tool_image_count")
.map(String::as_str),
Some("0")
);
assert_eq!(
turn_attrs
.get("codex.turn.model_input_image_types")
.map(String::as_str),
Some("png")
);
assert_eq!(
turn_attrs
.get("codex.turn.model_input_image_mime_types")
.map(String::as_str),
Some("image/png")
);
let image_input_details: Value = serde_json::from_str(
turn_attrs
.get("codex.turn.model_input_image_details")
.expect("image input details"),
)
.expect("image input details should be json");
assert_eq!(
image_input_details,
json!([
{
"source": "message",
"image_type": "png",
"mime_type": "image/png",
"byte_length": 3
}
])
);
}
#[test]
fn record_model_input_images_records_empty_json_details_without_images() {
let span_exporter = InMemorySpanExporter::default();
let tracer_provider = SdkTracerProvider::builder()
.with_simple_exporter(span_exporter.clone())
.build();
let tracer = tracer_provider.tracer("model-input-image-empty-test");
let subscriber =
tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer));
tracing::subscriber::with_default(subscriber, || {
tracing::callsite::rebuild_interest_cache();
let manager = SessionTelemetry::new(
ThreadId::new(),
"gpt-5.1",
"gpt-5.1",
Some("account-id".to_string()),
Some("engineer@example.com".to_string()),
Some(TelemetryAuthMode::Chatgpt),
"codex_exec".to_string(),
/*log_user_prompts*/ false,
"tty".to_string(),
SessionSource::Cli,
);
let turn_span = tracing::info_span!(
"turn",
otel.name = "session_task.turn",
codex.turn.model_input_image_count = tracing::field::Empty,
codex.turn.model_input_message_image_count = tracing::field::Empty,
codex.turn.model_input_tool_image_count = tracing::field::Empty,
codex.turn.model_input_image_types = tracing::field::Empty,
codex.turn.model_input_image_mime_types = tracing::field::Empty,
codex.turn.model_input_image_details = tracing::field::Empty,
);
let model_input_images = ModelInputImageTelemetry::default();
manager.record_model_input_images(
&turn_span,
&model_input_images,
&[ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "no images".to_string(),
}],
phase: None,
}],
);
let _turn_guard = turn_span.enter();
});
tracer_provider.force_flush().expect("flush traces");
let spans = span_exporter.get_finished_spans().expect("span export");
let turn_span = spans
.iter()
.find(|span| span.name == "session_task.turn")
.expect("missing turn span");
let turn_attrs = span_attributes(turn_span);
assert_eq!(
turn_attrs
.get("codex.turn.model_input_image_details")
.map(String::as_str),
Some("[]")
);
}
#[test]
fn otel_export_routing_policy_routes_api_request_auth_observability() {
let log_exporter = InMemoryLogExporter::default();