From e1cf5abb385a9225dad791da648fac643b6cce95 Mon Sep 17 00:00:00 2001 From: charley-openai Date: Mon, 4 May 2026 11:17:04 -0700 Subject: [PATCH] Move image input metadata to turn spans --- codex-rs/core/src/session/review.rs | 3 + codex-rs/core/src/session/turn.rs | 5 + codex-rs/core/src/session/turn_context.rs | 23 ++ codex-rs/core/src/tasks/mod.rs | 15 +- codex-rs/core/tests/suite/otel.rs | 168 +++++++++- .../otel/src/events/image_input_telemetry.rs | 317 ++++++++---------- codex-rs/otel/src/events/session_telemetry.rs | 68 ++-- codex-rs/otel/src/lib.rs | 1 + .../tests/suite/otel_export_routing_policy.rs | 310 ++++++++++++++++- 9 files changed, 683 insertions(+), 227 deletions(-) diff --git a/codex-rs/core/src/session/review.rs b/codex-rs/core/src/session/review.rs index 67c1e33310..7ca5fb72ab 100644 --- a/codex-rs/core/src/session/review.rs +++ b/codex-rs/core/src/session/review.rs @@ -1,5 +1,6 @@ use super::turn_context::image_generation_tool_auth_allowed; use super::*; +use std::sync::OnceLock; use std::sync::atomic::AtomicBool; /// Spawn a review thread using the given prompt. @@ -150,6 +151,8 @@ pub(super) async fn spawn_review_thread( turn_metadata_state, turn_skills: TurnSkillsContext::new(parent_turn_context.turn_skills.outcome.clone()), turn_timing_state: Arc::new(TurnTimingState::default()), + turn_span: OnceLock::new(), + model_input_images: codex_otel::ModelInputImageTelemetry::default(), server_model_warning_emitted: AtomicBool::new(false), model_verification_emitted: AtomicBool::new(false), }; diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 75f4762c9e..7afb97a9da 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -433,6 +433,11 @@ pub(crate) async fn run_turn( .await .for_prompt(&turn_context.model_info.input_modalities) }; + turn_context.session_telemetry.record_model_input_images( + &turn_context.turn_span(), + &turn_context.model_input_images, + &sampling_request_input, + ); let sampling_request_input_messages = sampling_request_input .iter() diff --git a/codex-rs/core/src/session/turn_context.rs b/codex-rs/core/src/session/turn_context.rs index efd6afdb3a..1c4d29e7fc 100644 --- a/codex-rs/core/src/session/turn_context.rs +++ b/codex-rs/core/src/session/turn_context.rs @@ -9,6 +9,7 @@ use codex_sandboxing::compatibility_sandbox_policy_for_permission_profile; use codex_sandboxing::policy_transforms::effective_file_system_sandbox_policy; use codex_sandboxing::policy_transforms::effective_network_sandbox_policy; use codex_tools::ToolEnvironmentMode; +use std::sync::OnceLock; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; @@ -92,6 +93,8 @@ pub(crate) struct TurnContext { pub(crate) turn_metadata_state: Arc, pub(crate) turn_skills: TurnSkillsContext, pub(crate) turn_timing_state: Arc, + pub(crate) turn_span: OnceLock, + pub(crate) model_input_images: codex_otel::ModelInputImageTelemetry, pub(crate) server_model_warning_emitted: AtomicBool, pub(crate) model_verification_emitted: AtomicBool, } @@ -108,6 +111,17 @@ impl TurnContext { self.permission_profile.network_sandbox_policy() } + pub(crate) fn set_turn_span(&self, span: tracing::Span) { + let _ = self.turn_span.set(span); + } + + pub(crate) fn turn_span(&self) -> tracing::Span { + self.turn_span + .get() + .cloned() + .unwrap_or_else(tracing::Span::none) + } + pub(crate) fn sandbox_policy(&self) -> SandboxPolicy { let file_system_sandbox_policy = self.file_system_sandbox_policy(); let network_sandbox_policy = self.network_sandbox_policy(); @@ -232,6 +246,11 @@ impl TurnContext { &config.agent_roles, )); + let turn_span = OnceLock::new(); + if let Some(span) = self.turn_span.get() { + let _ = turn_span.set(span.clone()); + } + Self { sub_id: self.sub_id.clone(), trace_id: self.trace_id.clone(), @@ -274,6 +293,8 @@ impl TurnContext { turn_metadata_state: self.turn_metadata_state.clone(), turn_skills: self.turn_skills.clone(), turn_timing_state: Arc::clone(&self.turn_timing_state), + turn_span, + model_input_images: self.model_input_images.clone(), server_model_warning_emitted: AtomicBool::new( self.server_model_warning_emitted.load(Ordering::Relaxed), ), @@ -566,6 +587,8 @@ impl Session { turn_metadata_state, turn_skills: TurnSkillsContext::new(skills_outcome), turn_timing_state: Arc::new(TurnTimingState::default()), + turn_span: OnceLock::new(), + model_input_images: codex_otel::ModelInputImageTelemetry::default(), server_model_warning_emitted: AtomicBool::new(false), model_verification_emitted: AtomicBool::new(false), } diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index bb7a79f58e..43f922d177 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -360,10 +360,6 @@ impl Session { let turn = active.get_or_insert_with(ActiveTurn::default); debug_assert!(turn.tasks.is_empty()); let done_clone = Arc::clone(&done); - let session_ctx = Arc::new(SessionTaskContext::new(Arc::clone(self))); - let ctx = Arc::clone(&turn_context); - let task_for_run = Arc::clone(&task); - let task_cancellation_token = cancellation_token.child_token(); // Task-owned turn spans keep a core-owned span open for the // full task lifecycle after the submission dispatch span ends. let reasoning_effort = turn_context.effective_reasoning_effort_for_tracing(); @@ -380,7 +376,18 @@ impl Session { codex.turn.token_usage.output_tokens = field::Empty, codex.turn.token_usage.reasoning_output_tokens = field::Empty, codex.turn.token_usage.total_tokens = field::Empty, + codex.turn.model_input_image_count = field::Empty, + codex.turn.model_input_message_image_count = field::Empty, + codex.turn.model_input_tool_image_count = field::Empty, + codex.turn.model_input_image_types = field::Empty, + codex.turn.model_input_image_mime_types = field::Empty, + codex.turn.model_input_image_details = field::Empty, ); + turn_context.set_turn_span(task_span.clone()); + let session_ctx = Arc::new(SessionTaskContext::new(Arc::clone(self))); + let ctx = Arc::clone(&turn_context); + let task_for_run = Arc::clone(&task); + let task_cancellation_token = cancellation_token.child_token(); let handle = tokio::spawn( async move { let ctx_for_finish = Arc::clone(&ctx); diff --git a/codex-rs/core/tests/suite/otel.rs b/codex-rs/core/tests/suite/otel.rs index 5539599b24..42d4a2caba 100644 --- a/codex-rs/core/tests/suite/otel.rs +++ b/codex-rs/core/tests/suite/otel.rs @@ -1,3 +1,6 @@ +use anyhow::Result; +use codex_config::types::McpServerConfig; +use codex_config::types::McpServerTransportConfig; use codex_core::config::Constrained; use codex_features::Feature; use codex_protocol::models::PermissionProfile; @@ -11,6 +14,7 @@ use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; use core_test_support::responses::ev_custom_tool_call; use core_test_support::responses::ev_function_call; +use core_test_support::responses::ev_function_call_with_namespace; use core_test_support::responses::ev_local_shell_call; use core_test_support::responses::ev_message_item_added; use core_test_support::responses::ev_output_text_delta; @@ -24,10 +28,14 @@ use core_test_support::responses::mount_sse_once; use core_test_support::responses::sse; use core_test_support::responses::sse_response; use core_test_support::responses::start_mock_server; +use core_test_support::skip_if_no_network; +use core_test_support::stdio_server_bin; use core_test_support::test_codex::TestCodex; use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event; +use std::collections::HashMap; use std::sync::Mutex; +use std::time::Duration; use tracing::Level; use tracing_test::traced_test; @@ -565,7 +573,7 @@ async fn process_sse_emits_completed_telemetry() { } #[tokio::test(flavor = "current_thread")] -async fn turn_and_completed_response_spans_record_token_usage() { +async fn turn_span_records_token_usage_and_image_metadata() { let buffer: &'static Mutex> = Box::leak(Box::new(Mutex::new(Vec::new()))); let subscriber = tracing_subscriber::fmt() .with_level(true) @@ -603,6 +611,10 @@ async fn turn_and_completed_response_spans_record_token_usage() { .features .disable(Feature::GhostCommit) .expect("test config should allow feature update"); + config + .features + .disable(Feature::ShellSnapshot) + .expect("test config should allow feature update"); }) .build(&server) .await @@ -613,10 +625,15 @@ async fn turn_and_completed_response_spans_record_token_usage() { codex .submit(Op::UserInput { environments: None, - items: vec![UserInput::Text { - text: "hello".into(), - text_elements: Vec::new(), - }], + items: vec![ + UserInput::Text { + text: "hello".into(), + text_elements: Vec::new(), + }, + UserInput::Image { + image_url: "DATA:image/jpeg;BASE64,AAAA".to_string(), + }, + ], final_output_json_schema: None, responsesapi_client_metadata: None, }) @@ -650,11 +667,150 @@ async fn turn_and_completed_response_spans_record_token_usage() { && line.contains("codex.turn.token_usage.output_tokens=5") && line.contains("codex.turn.token_usage.reasoning_output_tokens=2") && line.contains("codex.turn.token_usage.total_tokens=9") + && line.contains("codex.turn.model_input_image_count=1") + && line.contains("codex.turn.model_input_message_image_count=1") + && line.contains("codex.turn.model_input_tool_image_count=0") + && line.contains("codex.turn.model_input_image_types=\"jpeg\"") + && line.contains("codex.turn.model_input_image_mime_types=\"image/jpeg\"") + && line.contains("\\\"source\\\":\\\"message\\\"") + && line.contains("\\\"byte_length\\\":3") }), - "missing regular turn span token usage\nlogs:\n{logs}" + "missing regular turn span token usage and image metadata\nlogs:\n{logs}" ); } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn turn_span_records_tool_output_image_metadata() -> Result<()> { + skip_if_no_network!(Ok(())); + + let buffer: &'static Mutex> = Box::leak(Box::new(Mutex::new(Vec::new()))); + let subscriber = tracing_subscriber::fmt() + .with_level(true) + .with_ansi(false) + .with_max_level(Level::TRACE) + .with_span_events(FmtSpan::FULL) + .with_writer(MockWriter::new(buffer)) + .finish(); + let _guard = tracing::subscriber::set_default(subscriber); + + let server = start_mock_server().await; + + let call_id = "rmcp-image-otel"; + let server_name = "rmcp"; + let namespace = format!("mcp__{server_name}__"); + let openai_png = "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/x8AAwMB/ee9bQAAAABJRU5ErkJggg=="; + + mount_sse_once( + &server, + sse(vec![ + ev_response_created("resp-1"), + ev_function_call_with_namespace(call_id, &namespace, "image", "{}"), + ev_completed("resp-1"), + ]), + ) + .await; + mount_sse_once( + &server, + sse(vec![ + ev_assistant_message("msg-1", "done"), + ev_completed("resp-2"), + ]), + ) + .await; + + let rmcp_test_server_bin = stdio_server_bin()?; + let fixture = test_codex() + .with_config(move |config| { + config + .features + .disable(Feature::GhostCommit) + .expect("test config should allow feature update"); + config + .features + .disable(Feature::ShellSnapshot) + .expect("test config should allow feature update"); + let mut servers = config.mcp_servers.get().clone(); + servers.insert( + server_name.to_string(), + McpServerConfig { + transport: McpServerTransportConfig::Stdio { + command: rmcp_test_server_bin, + args: Vec::new(), + env: Some(HashMap::from([( + "MCP_TEST_IMAGE_DATA_URL".to_string(), + openai_png.to_string(), + )])), + env_vars: Vec::new(), + cwd: None, + }, + experimental_environment: None, + enabled: true, + required: false, + supports_parallel_tool_calls: false, + disabled_reason: None, + startup_timeout_sec: Some(Duration::from_secs(10)), + tool_timeout_sec: None, + default_tools_approval_mode: None, + enabled_tools: None, + disabled_tools: None, + scopes: None, + oauth_resource: None, + tools: HashMap::new(), + }, + ); + config + .mcp_servers + .set(servers) + .expect("test mcp servers should accept any configuration"); + }) + .build(&server) + .await?; + let session_model = fixture.session_configured.model.clone(); + let permission_profile = PermissionProfile::read_only(); + let sandbox_policy = permission_profile.to_legacy_sandbox_policy(fixture.cwd.path())?; + + fixture + .codex + .submit(Op::UserTurn { + environments: None, + items: vec![UserInput::Text { + text: "call the rmcp image tool".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + cwd: fixture.cwd.path().to_path_buf(), + approval_policy: AskForApproval::Never, + approvals_reviewer: None, + sandbox_policy, + permission_profile: Some(permission_profile), + model: session_model, + effort: None, + summary: None, + service_tier: None, + collaboration_mode: None, + personality: None, + }) + .await?; + + wait_for_event(&fixture.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + let logs = String::from_utf8(buffer.lock().unwrap().clone()).unwrap(); + assert!( + logs.lines().any(|line| { + line.contains("turn{otel.name=\"session_task.turn\"") + && line.contains("codex.turn.model_input_image_count=1") + && line.contains("codex.turn.model_input_message_image_count=0") + && line.contains("codex.turn.model_input_tool_image_count=1") + && line.contains("codex.turn.model_input_image_types=\"png\"") + && line.contains("codex.turn.model_input_image_mime_types=\"image/png\"") + && line.contains("\\\"source\\\":\\\"tool_output\\\"") + }), + "missing tool output image metadata on turn span\nlogs:\n{logs}" + ); + + Ok(()) +} + #[tokio::test] async fn handle_responses_span_records_response_kind_and_tool_name() { let buffer: &'static Mutex> = Box::leak(Box::new(Mutex::new(Vec::new()))); diff --git a/codex-rs/otel/src/events/image_input_telemetry.rs b/codex-rs/otel/src/events/image_input_telemetry.rs index da55329a88..6d4a7e82ea 100644 --- a/codex-rs/otel/src/events/image_input_telemetry.rs +++ b/codex-rs/otel/src/events/image_input_telemetry.rs @@ -1,16 +1,72 @@ -use codex_protocol::user_input::UserInput; +use codex_protocol::models::ContentItem; +use codex_protocol::models::FunctionCallOutputContentItem; +use codex_protocol::models::ResponseItem; use serde::Serialize; use std::collections::BTreeSet; -use std::io::Read; use std::path::Path; - -const IMAGE_HEADER_READ_LIMIT: u64 = 64 * 1024; +use std::sync::Arc; +use std::sync::Mutex; #[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) struct ImageInputTelemetry { +pub(crate) struct ImageInputTelemetrySnapshot { pub(crate) details_json: String, - pub(crate) image_types: String, - pub(crate) mime_types: String, + pub(crate) image_types: Vec, + pub(crate) mime_types: Vec, + pub(crate) message_image_count: usize, + pub(crate) tool_output_image_count: usize, +} + +#[derive(Debug, Clone, Default)] +pub struct ModelInputImageTelemetry { + state: Arc>, +} + +impl ModelInputImageTelemetry { + pub(crate) fn record_items(&self, items: &[ResponseItem]) -> ImageInputTelemetrySnapshot { + let request = model_input_image_telemetry(items); + let mut state = match self.state.lock() { + Ok(guard) => guard, + Err(poisoned) => poisoned.into_inner(), + }; + state.accumulate(request); + state.snapshot() + } +} + +#[derive(Debug, Clone, Default, PartialEq, Eq)] +struct ImageInputTelemetryState { + details: Vec, + image_types: BTreeSet, + mime_types: BTreeSet, + message_image_count: usize, + tool_output_image_count: usize, +} + +impl ImageInputTelemetryState { + fn accumulate(&mut self, request: Self) { + self.message_image_count = self.message_image_count.max(request.message_image_count); + self.tool_output_image_count = self + .tool_output_image_count + .max(request.tool_output_image_count); + self.image_types.extend(request.image_types); + self.mime_types.extend(request.mime_types); + if request.details.len() > self.details.len() { + self.details = request.details; + } + } + + fn snapshot(&self) -> ImageInputTelemetrySnapshot { + let details_json = + serde_json::to_string(&self.details).unwrap_or_else(|_| "[]".to_string()); + + ImageInputTelemetrySnapshot { + details_json, + image_types: self.image_types.iter().cloned().collect(), + mime_types: self.mime_types.iter().cloned().collect(), + message_image_count: self.message_image_count, + tool_output_image_count: self.tool_output_image_count, + } + } } #[derive(Debug, Clone, PartialEq, Eq, Serialize)] @@ -21,117 +77,115 @@ struct ImageInputDetail { #[serde(skip_serializing_if = "Option::is_none")] mime_type: Option, #[serde(skip_serializing_if = "Option::is_none")] - width: Option, - #[serde(skip_serializing_if = "Option::is_none")] - height: Option, - #[serde(skip_serializing_if = "Option::is_none")] byte_length: Option, #[serde(skip_serializing_if = "Option::is_none")] extension: Option, } -pub(crate) fn image_input_telemetry(items: &[UserInput]) -> Option { - let details: Vec = items - .iter() - .filter_map(|item| match item { - UserInput::Image { image_url } => Some(remote_or_data_url_detail(image_url)), - UserInput::LocalImage { path } => Some(local_image_detail(path)), - UserInput::Text { .. } | UserInput::Skill { .. } | UserInput::Mention { .. } => None, - _ => None, - }) - .collect(); +fn model_input_image_telemetry(items: &[ResponseItem]) -> ImageInputTelemetryState { + let mut details: Vec = Vec::new(); + let mut message_image_count = 0; + let mut tool_output_image_count = 0; - if details.is_empty() { - return None; + for item in items { + match item { + ResponseItem::Message { content, .. } => { + for content_item in content { + if let ContentItem::InputImage { image_url, .. } = content_item { + message_image_count += 1; + details.push(image_url_detail("message", image_url)); + } + } + } + ResponseItem::FunctionCallOutput { output, .. } => { + if let Some(content_items) = output.content_items() { + tool_output_image_count += collect_tool_output_image_details( + "tool_output", + content_items, + &mut details, + ); + } + } + ResponseItem::CustomToolCallOutput { output, .. } => { + if let Some(content_items) = output.content_items() { + tool_output_image_count += collect_tool_output_image_details( + "custom_tool_output", + content_items, + &mut details, + ); + } + } + _ => {} + } } - let image_types = comma_join( + let image_types = collect_unique_strings( details .iter() .filter_map(|detail| detail.image_type.as_deref()), ); - let mime_types = comma_join( + let mime_types = collect_unique_strings( details .iter() .filter_map(|detail| detail.mime_type.as_deref()), ); - let details_json = serde_json::to_string(&details).ok()?; - Some(ImageInputTelemetry { - details_json, + ImageInputTelemetryState { + details, image_types, mime_types, - }) + message_image_count, + tool_output_image_count, + } } -fn remote_or_data_url_detail(image_url: &str) -> ImageInputDetail { - if let Some(detail) = data_url_detail(image_url) { +fn collect_tool_output_image_details( + source: &'static str, + content_items: &[FunctionCallOutputContentItem], + details: &mut Vec, +) -> usize { + let mut image_count = 0; + for content_item in content_items { + if let FunctionCallOutputContentItem::InputImage { image_url, .. } = content_item { + details.push(image_url_detail(source, image_url)); + image_count += 1; + } + } + image_count +} + +fn image_url_detail(source: &'static str, image_url: &str) -> ImageInputDetail { + if let Some(detail) = data_url_detail(source, image_url) { return detail; } let extension = safe_image_extension_from_url(image_url); let mime_type = extension.as_deref().and_then(mime_type_from_extension); ImageInputDetail { - source: "remote_url", + source, image_type: mime_type.as_deref().and_then(image_type_from_mime), mime_type, - width: None, - height: None, byte_length: None, extension, } } -fn data_url_detail(image_url: &str) -> Option { +fn data_url_detail(source: &'static str, image_url: &str) -> Option { let image_url = strip_data_scheme(image_url)?; let (metadata, payload) = image_url.split_once(',')?; let mut metadata_parts = metadata.split(';'); - let mime_type = metadata_parts.next()?.to_ascii_lowercase(); - if !mime_type.starts_with("image/") { - return None; - } + let mime_type = normalize_known_image_mime(metadata_parts.next()?)?; let is_base64 = metadata_parts.any(|part| part.eq_ignore_ascii_case("base64")); Some(ImageInputDetail { - source: "data_url", + source, image_type: image_type_from_mime(&mime_type), mime_type: Some(mime_type), - width: None, - height: None, byte_length: is_base64.then(|| base64_payload_byte_len(payload)), extension: None, }) } -fn local_image_detail(path: &Path) -> ImageInputDetail { - let extension = path - .extension() - .and_then(|extension| extension.to_str()) - .map(str::to_ascii_lowercase) - .filter(|extension| mime_type_from_extension(extension).is_some()); - - let byte_length = path.metadata().ok().map(|metadata| metadata.len()); - let header = read_image_header(path).unwrap_or_default(); - let header_info = image_info_from_header(&header); - let mime_type = header_info - .as_ref() - .map(|info| info.mime_type.to_string()) - .or_else(|| extension.as_deref().and_then(mime_type_from_extension)); - - ImageInputDetail { - source: "local_file", - image_type: mime_type - .as_deref() - .and_then(image_type_from_mime) - .or(extension.clone()), - mime_type, - width: header_info.as_ref().and_then(|info| info.width), - height: header_info.as_ref().and_then(|info| info.height), - byte_length, - extension, - } -} - fn strip_data_scheme(image_url: &str) -> Option<&str> { let scheme = image_url.get(..5)?; if !scheme.eq_ignore_ascii_case("data:") { @@ -140,104 +194,6 @@ fn strip_data_scheme(image_url: &str) -> Option<&str> { image_url.get(5..) } -fn read_image_header(path: &Path) -> std::io::Result> { - let file = std::fs::File::open(path)?; - let mut header = Vec::new(); - file.take(IMAGE_HEADER_READ_LIMIT) - .read_to_end(&mut header)?; - Ok(header) -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -struct HeaderImageInfo { - mime_type: &'static str, - width: Option, - height: Option, -} - -fn image_info_from_header(bytes: &[u8]) -> Option { - if bytes.starts_with(b"\x89PNG\r\n\x1a\n") && bytes.len() >= 24 { - return Some(HeaderImageInfo { - mime_type: "image/png", - width: Some(u32::from_be_bytes(bytes[16..20].try_into().ok()?)), - height: Some(u32::from_be_bytes(bytes[20..24].try_into().ok()?)), - }); - } - - if (bytes.starts_with(b"GIF87a") || bytes.starts_with(b"GIF89a")) && bytes.len() >= 10 { - return Some(HeaderImageInfo { - mime_type: "image/gif", - width: Some(u16::from_le_bytes(bytes[6..8].try_into().ok()?) as u32), - height: Some(u16::from_le_bytes(bytes[8..10].try_into().ok()?) as u32), - }); - } - - if bytes.starts_with(b"\xff\xd8") { - return jpeg_info_from_header(bytes); - } - - if bytes.len() >= 12 && bytes.starts_with(b"RIFF") && bytes[8..12] == *b"WEBP" { - return Some(HeaderImageInfo { - mime_type: "image/webp", - width: None, - height: None, - }); - } - - None -} - -fn jpeg_info_from_header(bytes: &[u8]) -> Option { - let mut i = 2; - while i + 4 <= bytes.len() { - if bytes[i] != 0xff { - i += 1; - continue; - } - while i < bytes.len() && bytes[i] == 0xff { - i += 1; - } - if i >= bytes.len() { - break; - } - - let marker = bytes[i]; - i += 1; - if marker == 0xd9 || marker == 0xda { - break; - } - if i + 2 > bytes.len() { - break; - } - - let segment_len = u16::from_be_bytes(bytes[i..i + 2].try_into().ok()?) as usize; - if segment_len < 2 || i + segment_len > bytes.len() { - break; - } - if is_jpeg_start_of_frame(marker) && segment_len >= 7 { - return Some(HeaderImageInfo { - mime_type: "image/jpeg", - width: Some(u16::from_be_bytes(bytes[i + 5..i + 7].try_into().ok()?) as u32), - height: Some(u16::from_be_bytes(bytes[i + 3..i + 5].try_into().ok()?) as u32), - }); - } - i += segment_len; - } - - Some(HeaderImageInfo { - mime_type: "image/jpeg", - width: None, - height: None, - }) -} - -fn is_jpeg_start_of_frame(marker: u8) -> bool { - matches!( - marker, - 0xc0 | 0xc1 | 0xc2 | 0xc3 | 0xc5 | 0xc6 | 0xc7 | 0xc9 | 0xca | 0xcb | 0xcd | 0xce | 0xcf - ) -} - fn safe_image_extension_from_url(url: &str) -> Option { let path = url.split(['?', '#']).next().unwrap_or(url); let extension = Path::new(path) @@ -248,6 +204,21 @@ fn safe_image_extension_from_url(url: &str) -> Option { Some(extension) } +fn normalize_known_image_mime(mime_type: &str) -> Option { + match mime_type.trim().to_ascii_lowercase().as_str() { + "image/png" => Some("image/png".to_string()), + "image/jpeg" | "image/jpg" => Some("image/jpeg".to_string()), + "image/gif" => Some("image/gif".to_string()), + "image/webp" => Some("image/webp".to_string()), + "image/bmp" => Some("image/bmp".to_string()), + "image/heic" => Some("image/heic".to_string()), + "image/heif" => Some("image/heif".to_string()), + "image/tiff" => Some("image/tiff".to_string()), + "image/svg+xml" => Some("image/svg+xml".to_string()), + _ => None, + } +} + fn mime_type_from_extension(extension: &str) -> Option { match extension { "png" => Some("image/png".to_string()), @@ -274,10 +245,6 @@ fn base64_payload_byte_len(payload: &str) -> u64 { ((trimmed.len() * 3) / 4) as u64 } -fn comma_join<'a>(values: impl Iterator) -> String { - values - .collect::>() - .into_iter() - .collect::>() - .join(",") +fn collect_unique_strings<'a>(values: impl Iterator) -> BTreeSet { + values.map(str::to_string).collect() } diff --git a/codex-rs/otel/src/events/session_telemetry.rs b/codex-rs/otel/src/events/session_telemetry.rs index 5e747b1380..53f47cbf55 100644 --- a/codex-rs/otel/src/events/session_telemetry.rs +++ b/codex-rs/otel/src/events/session_telemetry.rs @@ -1,6 +1,6 @@ use crate::TelemetryAuthMode; use crate::ToolDecisionSource; -use crate::events::image_input_telemetry::image_input_telemetry; +use crate::events::image_input_telemetry::ModelInputImageTelemetry; use crate::events::shared::log_and_trace_event; use crate::events::shared::log_event; use crate::events::shared::trace_event; @@ -856,7 +856,6 @@ impl SessionTelemetry { .iter() .filter(|item| matches!(item, UserInput::LocalImage { .. })) .count(); - let image_telemetry = image_input_telemetry(items); let prompt_to_log = if self.metadata.log_user_prompts { prompt.as_str() @@ -870,28 +869,49 @@ impl SessionTelemetry { prompt_length = %prompt.chars().count(), prompt = %prompt_to_log, ); - if let Some(image_telemetry) = image_telemetry { - trace_event!( - self, - event.name = "codex.user_prompt", - prompt_length = %prompt.chars().count(), - text_input_count = text_input_count as i64, - image_input_count = image_input_count as i64, - local_image_input_count = local_image_input_count as i64, - image_input_types = %image_telemetry.image_types, - image_input_mime_types = %image_telemetry.mime_types, - image_input_details = %image_telemetry.details_json, - ); - } else { - trace_event!( - self, - event.name = "codex.user_prompt", - prompt_length = %prompt.chars().count(), - text_input_count = text_input_count as i64, - image_input_count = image_input_count as i64, - local_image_input_count = local_image_input_count as i64, - ); - } + trace_event!( + self, + event.name = "codex.user_prompt", + prompt_length = %prompt.chars().count(), + text_input_count = text_input_count as i64, + image_input_count = image_input_count as i64, + local_image_input_count = local_image_input_count as i64, + ); + } + + pub fn record_model_input_images( + &self, + turn_span: &Span, + model_input_images: &ModelInputImageTelemetry, + items: &[ResponseItem], + ) { + let image_telemetry = model_input_images.record_items(items); + let total_image_count = + image_telemetry.message_image_count + image_telemetry.tool_output_image_count; + let image_types = image_telemetry.image_types.join(","); + let mime_types = image_telemetry.mime_types.join(","); + + turn_span.record( + "codex.turn.model_input_image_count", + total_image_count as i64, + ); + turn_span.record( + "codex.turn.model_input_message_image_count", + image_telemetry.message_image_count as i64, + ); + turn_span.record( + "codex.turn.model_input_tool_image_count", + image_telemetry.tool_output_image_count as i64, + ); + turn_span.record("codex.turn.model_input_image_types", image_types.as_str()); + turn_span.record( + "codex.turn.model_input_image_mime_types", + mime_types.as_str(), + ); + turn_span.record( + "codex.turn.model_input_image_details", + image_telemetry.details_json.as_str(), + ); } pub fn tool_decision( diff --git a/codex-rs/otel/src/lib.rs b/codex-rs/otel/src/lib.rs index 431ed331a0..e929478561 100644 --- a/codex-rs/otel/src/lib.rs +++ b/codex-rs/otel/src/lib.rs @@ -16,6 +16,7 @@ pub use crate::config::OtelHttpProtocol; pub use crate::config::OtelSettings; pub use crate::config::OtelTlsConfig; pub use crate::config::StatsigMetricsSettings; +pub use crate::events::image_input_telemetry::ModelInputImageTelemetry; pub use crate::events::session_telemetry::AuthEnvTelemetryMetadata; pub use crate::events::session_telemetry::SessionTelemetry; pub use crate::events::session_telemetry::SessionTelemetryMetadata; diff --git a/codex-rs/otel/tests/suite/otel_export_routing_policy.rs b/codex-rs/otel/tests/suite/otel_export_routing_policy.rs index 603a6ad2a7..c10e65d46a 100644 --- a/codex-rs/otel/tests/suite/otel_export_routing_policy.rs +++ b/codex-rs/otel/tests/suite/otel_export_routing_policy.rs @@ -1,4 +1,5 @@ use codex_otel::AuthEnvTelemetryMetadata; +use codex_otel::ModelInputImageTelemetry; use codex_otel::OtelProvider; use codex_otel::SessionTelemetry; use codex_otel::TelemetryAuthMode; @@ -22,6 +23,10 @@ use tracing_subscriber::layer::SubscriberExt; use codex_protocol::ThreadId; use codex_protocol::config_types::ReasoningSummary; +use codex_protocol::models::ContentItem; +use codex_protocol::models::FunctionCallOutputContentItem; +use codex_protocol::models::FunctionCallOutputPayload; +use codex_protocol::models::ResponseItem; use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionSource; @@ -42,6 +47,13 @@ fn span_event_attributes(event: &opentelemetry::trace::Event) -> BTreeMap BTreeMap { + span.attributes + .iter() + .map(|KeyValue { key, value, .. }| (key.as_str().to_string(), value.to_string())) + .collect() +} + fn any_value_to_string(value: &AnyValue) -> String { match value { AnyValue::Int(value) => value.to_string(), @@ -146,7 +158,7 @@ fn otel_export_routing_policy_routes_user_prompt_log_and_trace_events() { ); let root_span = tracing::info_span!("root"); let _root_guard = root_span.enter(); - manager.user_prompt(&[ + let items = [ UserInput::Text { text: "super secret prompt".to_string(), text_elements: Vec::new(), @@ -160,7 +172,53 @@ fn otel_export_routing_policy_routes_user_prompt_log_and_trace_events() { UserInput::LocalImage { path: local_png.clone(), }, - ]); + ]; + let turn_span = tracing::info_span!( + "turn", + otel.name = "session_task.turn", + codex.turn.model_input_image_count = tracing::field::Empty, + codex.turn.model_input_message_image_count = tracing::field::Empty, + codex.turn.model_input_tool_image_count = tracing::field::Empty, + codex.turn.model_input_image_types = tracing::field::Empty, + codex.turn.model_input_image_mime_types = tracing::field::Empty, + codex.turn.model_input_image_details = tracing::field::Empty, + ); + let model_input_images = ModelInputImageTelemetry::default(); + let model_input = vec![ + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ + ContentItem::InputText { + text: "super secret prompt".to_string(), + }, + ContentItem::InputImage { + image_url: "DATA:image/jpeg;BASE64,AAAA".to_string(), + detail: None, + }, + ContentItem::InputImage { + image_url: "https://example.com/image.customer-secret".to_string(), + detail: None, + }, + ], + phase: None, + }, + ResponseItem::FunctionCallOutput { + call_id: "call_1".to_string(), + output: FunctionCallOutputPayload::from_content_items(vec![ + FunctionCallOutputContentItem::InputText { + text: "page screenshot".to_string(), + }, + FunctionCallOutputContentItem::InputImage { + image_url: "data:image/png;base64,AAAA".to_string(), + detail: None, + }, + ]), + }, + ]; + manager.record_model_input_images(&turn_span, &model_input_images, &model_input); + manager.user_prompt(&items); + let _turn_guard = turn_span.enter(); }); logger_provider.force_flush().expect("flush logs"); @@ -184,8 +242,12 @@ fn otel_export_routing_policy_routes_user_prompt_log_and_trace_events() { ); let spans = span_exporter.get_finished_spans().expect("span export"); - assert_eq!(spans.len(), 1); - let span_events = &spans[0].events.events; + assert_eq!(spans.len(), 2); + let root_span = spans + .iter() + .find(|span| span.name == "root") + .expect("missing root span"); + let span_events = &root_span.events.events; assert_eq!(span_events.len(), 1); let prompt_trace_event = find_span_event_by_name_attr(span_events, "codex.user_prompt"); @@ -212,21 +274,48 @@ fn otel_export_routing_policy_routes_user_prompt_log_and_trace_events() { .map(String::as_str), Some("1") ); + assert!(!prompt_trace_attrs.contains_key("image_input_types")); + assert!(!prompt_trace_attrs.contains_key("image_input_mime_types")); + assert!(!prompt_trace_attrs.contains_key("image_input_details")); + + let turn_span = spans + .iter() + .find(|span| span.name == "session_task.turn") + .expect("missing turn span"); + let turn_attrs = span_attributes(turn_span); assert_eq!( - prompt_trace_attrs - .get("image_input_types") + turn_attrs + .get("codex.turn.model_input_image_count") + .map(String::as_str), + Some("3") + ); + assert_eq!( + turn_attrs + .get("codex.turn.model_input_message_image_count") + .map(String::as_str), + Some("2") + ); + assert_eq!( + turn_attrs + .get("codex.turn.model_input_tool_image_count") + .map(String::as_str), + Some("1") + ); + assert_eq!( + turn_attrs + .get("codex.turn.model_input_image_types") .map(String::as_str), Some("jpeg,png") ); assert_eq!( - prompt_trace_attrs - .get("image_input_mime_types") + turn_attrs + .get("codex.turn.model_input_image_mime_types") .map(String::as_str), Some("image/jpeg,image/png") ); let image_input_details: Value = serde_json::from_str( - prompt_trace_attrs - .get("image_input_details") + turn_attrs + .get("codex.turn.model_input_image_details") .expect("image input details"), ) .expect("image input details should be json"); @@ -234,27 +323,24 @@ fn otel_export_routing_policy_routes_user_prompt_log_and_trace_events() { image_input_details, json!([ { - "source": "data_url", + "source": "message", "image_type": "jpeg", "mime_type": "image/jpeg", "byte_length": 3 }, { - "source": "remote_url" + "source": "message" }, { - "source": "local_file", + "source": "tool_output", "image_type": "png", "mime_type": "image/png", - "width": 640, - "height": 480, - "byte_length": 24, - "extension": "png" + "byte_length": 3 } ]) ); assert!( - !prompt_trace_attrs["image_input_details"].contains("customer-secret"), + !turn_attrs["codex.turn.model_input_image_details"].contains("customer-secret"), "unknown remote URL suffixes should not be traced" ); assert!(!prompt_trace_attrs.contains_key("prompt")); @@ -522,6 +608,194 @@ fn otel_export_routing_policy_routes_auth_recovery_log_and_trace_events() { ); } +#[test] +fn record_model_input_images_accumulates_when_retry_has_no_images() { + let span_exporter = InMemorySpanExporter::default(); + let tracer_provider = SdkTracerProvider::builder() + .with_simple_exporter(span_exporter.clone()) + .build(); + let tracer = tracer_provider.tracer("model-input-image-retry-test"); + + let subscriber = + tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer)); + + tracing::subscriber::with_default(subscriber, || { + tracing::callsite::rebuild_interest_cache(); + let manager = SessionTelemetry::new( + ThreadId::new(), + "gpt-5.1", + "gpt-5.1", + Some("account-id".to_string()), + Some("engineer@example.com".to_string()), + Some(TelemetryAuthMode::Chatgpt), + "codex_exec".to_string(), + /*log_user_prompts*/ false, + "tty".to_string(), + SessionSource::Cli, + ); + let turn_span = tracing::info_span!( + "turn", + otel.name = "session_task.turn", + codex.turn.model_input_image_count = tracing::field::Empty, + codex.turn.model_input_message_image_count = tracing::field::Empty, + codex.turn.model_input_tool_image_count = tracing::field::Empty, + codex.turn.model_input_image_types = tracing::field::Empty, + codex.turn.model_input_image_mime_types = tracing::field::Empty, + codex.turn.model_input_image_details = tracing::field::Empty, + ); + let model_input_images = ModelInputImageTelemetry::default(); + manager.record_model_input_images( + &turn_span, + &model_input_images, + &[ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputImage { + image_url: "data:image/png;base64,AAAA".to_string(), + detail: None, + }], + phase: None, + }], + ); + manager.record_model_input_images( + &turn_span, + &model_input_images, + &[ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "retry after image sanitization".to_string(), + }], + phase: None, + }], + ); + let _turn_guard = turn_span.enter(); + }); + + tracer_provider.force_flush().expect("flush traces"); + + let spans = span_exporter.get_finished_spans().expect("span export"); + let turn_span = spans + .iter() + .find(|span| span.name == "session_task.turn") + .expect("missing turn span"); + let turn_attrs = span_attributes(turn_span); + assert_eq!( + turn_attrs + .get("codex.turn.model_input_image_count") + .map(String::as_str), + Some("1") + ); + assert_eq!( + turn_attrs + .get("codex.turn.model_input_message_image_count") + .map(String::as_str), + Some("1") + ); + assert_eq!( + turn_attrs + .get("codex.turn.model_input_tool_image_count") + .map(String::as_str), + Some("0") + ); + assert_eq!( + turn_attrs + .get("codex.turn.model_input_image_types") + .map(String::as_str), + Some("png") + ); + assert_eq!( + turn_attrs + .get("codex.turn.model_input_image_mime_types") + .map(String::as_str), + Some("image/png") + ); + let image_input_details: Value = serde_json::from_str( + turn_attrs + .get("codex.turn.model_input_image_details") + .expect("image input details"), + ) + .expect("image input details should be json"); + assert_eq!( + image_input_details, + json!([ + { + "source": "message", + "image_type": "png", + "mime_type": "image/png", + "byte_length": 3 + } + ]) + ); +} + +#[test] +fn record_model_input_images_records_empty_json_details_without_images() { + let span_exporter = InMemorySpanExporter::default(); + let tracer_provider = SdkTracerProvider::builder() + .with_simple_exporter(span_exporter.clone()) + .build(); + let tracer = tracer_provider.tracer("model-input-image-empty-test"); + + let subscriber = + tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer)); + + tracing::subscriber::with_default(subscriber, || { + tracing::callsite::rebuild_interest_cache(); + let manager = SessionTelemetry::new( + ThreadId::new(), + "gpt-5.1", + "gpt-5.1", + Some("account-id".to_string()), + Some("engineer@example.com".to_string()), + Some(TelemetryAuthMode::Chatgpt), + "codex_exec".to_string(), + /*log_user_prompts*/ false, + "tty".to_string(), + SessionSource::Cli, + ); + let turn_span = tracing::info_span!( + "turn", + otel.name = "session_task.turn", + codex.turn.model_input_image_count = tracing::field::Empty, + codex.turn.model_input_message_image_count = tracing::field::Empty, + codex.turn.model_input_tool_image_count = tracing::field::Empty, + codex.turn.model_input_image_types = tracing::field::Empty, + codex.turn.model_input_image_mime_types = tracing::field::Empty, + codex.turn.model_input_image_details = tracing::field::Empty, + ); + let model_input_images = ModelInputImageTelemetry::default(); + manager.record_model_input_images( + &turn_span, + &model_input_images, + &[ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "no images".to_string(), + }], + phase: None, + }], + ); + let _turn_guard = turn_span.enter(); + }); + + tracer_provider.force_flush().expect("flush traces"); + + let spans = span_exporter.get_finished_spans().expect("span export"); + let turn_span = spans + .iter() + .find(|span| span.name == "session_task.turn") + .expect("missing turn span"); + let turn_attrs = span_attributes(turn_span); + assert_eq!( + turn_attrs + .get("codex.turn.model_input_image_details") + .map(String::as_str), + Some("[]") + ); +} + #[test] fn otel_export_routing_policy_routes_api_request_auth_observability() { let log_exporter = InMemoryLogExporter::default();