Move image input metadata to turn spans

This commit is contained in:
charley-openai
2026-05-04 11:17:04 -07:00
parent dc6da4bca1
commit e1cf5abb38
9 changed files with 683 additions and 227 deletions

View File

@@ -1,5 +1,6 @@
use super::turn_context::image_generation_tool_auth_allowed;
use super::*;
use std::sync::OnceLock;
use std::sync::atomic::AtomicBool;
/// Spawn a review thread using the given prompt.
@@ -150,6 +151,8 @@ pub(super) async fn spawn_review_thread(
turn_metadata_state,
turn_skills: TurnSkillsContext::new(parent_turn_context.turn_skills.outcome.clone()),
turn_timing_state: Arc::new(TurnTimingState::default()),
turn_span: OnceLock::new(),
model_input_images: codex_otel::ModelInputImageTelemetry::default(),
server_model_warning_emitted: AtomicBool::new(false),
model_verification_emitted: AtomicBool::new(false),
};

View File

@@ -433,6 +433,11 @@ pub(crate) async fn run_turn(
.await
.for_prompt(&turn_context.model_info.input_modalities)
};
turn_context.session_telemetry.record_model_input_images(
&turn_context.turn_span(),
&turn_context.model_input_images,
&sampling_request_input,
);
let sampling_request_input_messages = sampling_request_input
.iter()

View File

@@ -9,6 +9,7 @@ use codex_sandboxing::compatibility_sandbox_policy_for_permission_profile;
use codex_sandboxing::policy_transforms::effective_file_system_sandbox_policy;
use codex_sandboxing::policy_transforms::effective_network_sandbox_policy;
use codex_tools::ToolEnvironmentMode;
use std::sync::OnceLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
@@ -92,6 +93,8 @@ pub(crate) struct TurnContext {
pub(crate) turn_metadata_state: Arc<TurnMetadataState>,
pub(crate) turn_skills: TurnSkillsContext,
pub(crate) turn_timing_state: Arc<TurnTimingState>,
pub(crate) turn_span: OnceLock<tracing::Span>,
pub(crate) model_input_images: codex_otel::ModelInputImageTelemetry,
pub(crate) server_model_warning_emitted: AtomicBool,
pub(crate) model_verification_emitted: AtomicBool,
}
@@ -108,6 +111,17 @@ impl TurnContext {
self.permission_profile.network_sandbox_policy()
}
/// Installs `span` as the span for this turn.
///
/// The backing `OnceLock` holds a single value: when a span is already stored
/// it is kept, and the `span` passed here is discarded.
pub(crate) fn set_turn_span(&self, span: tracing::Span) {
    // `set` returns the rejected value as `Err` when the cell is already
    // filled; dropping that result means the first span stored is retained.
    drop(self.turn_span.set(span));
}
/// Returns a clone of the span stored for this turn.
///
/// When no span has been stored yet, `tracing::Span::none()` is returned
/// instead.
pub(crate) fn turn_span(&self) -> tracing::Span {
    match self.turn_span.get() {
        Some(span) => span.clone(),
        None => tracing::Span::none(),
    }
}
pub(crate) fn sandbox_policy(&self) -> SandboxPolicy {
let file_system_sandbox_policy = self.file_system_sandbox_policy();
let network_sandbox_policy = self.network_sandbox_policy();
@@ -232,6 +246,11 @@ impl TurnContext {
&config.agent_roles,
));
let turn_span = OnceLock::new();
if let Some(span) = self.turn_span.get() {
let _ = turn_span.set(span.clone());
}
Self {
sub_id: self.sub_id.clone(),
trace_id: self.trace_id.clone(),
@@ -274,6 +293,8 @@ impl TurnContext {
turn_metadata_state: self.turn_metadata_state.clone(),
turn_skills: self.turn_skills.clone(),
turn_timing_state: Arc::clone(&self.turn_timing_state),
turn_span,
model_input_images: self.model_input_images.clone(),
server_model_warning_emitted: AtomicBool::new(
self.server_model_warning_emitted.load(Ordering::Relaxed),
),
@@ -566,6 +587,8 @@ impl Session {
turn_metadata_state,
turn_skills: TurnSkillsContext::new(skills_outcome),
turn_timing_state: Arc::new(TurnTimingState::default()),
turn_span: OnceLock::new(),
model_input_images: codex_otel::ModelInputImageTelemetry::default(),
server_model_warning_emitted: AtomicBool::new(false),
model_verification_emitted: AtomicBool::new(false),
}

View File

@@ -360,10 +360,6 @@ impl Session {
let turn = active.get_or_insert_with(ActiveTurn::default);
debug_assert!(turn.tasks.is_empty());
let done_clone = Arc::clone(&done);
let session_ctx = Arc::new(SessionTaskContext::new(Arc::clone(self)));
let ctx = Arc::clone(&turn_context);
let task_for_run = Arc::clone(&task);
let task_cancellation_token = cancellation_token.child_token();
// Task-owned turn spans keep a core-owned span open for the
// full task lifecycle after the submission dispatch span ends.
let reasoning_effort = turn_context.effective_reasoning_effort_for_tracing();
@@ -380,7 +376,18 @@ impl Session {
codex.turn.token_usage.output_tokens = field::Empty,
codex.turn.token_usage.reasoning_output_tokens = field::Empty,
codex.turn.token_usage.total_tokens = field::Empty,
codex.turn.model_input_image_count = field::Empty,
codex.turn.model_input_message_image_count = field::Empty,
codex.turn.model_input_tool_image_count = field::Empty,
codex.turn.model_input_image_types = field::Empty,
codex.turn.model_input_image_mime_types = field::Empty,
codex.turn.model_input_image_details = field::Empty,
);
turn_context.set_turn_span(task_span.clone());
let session_ctx = Arc::new(SessionTaskContext::new(Arc::clone(self)));
let ctx = Arc::clone(&turn_context);
let task_for_run = Arc::clone(&task);
let task_cancellation_token = cancellation_token.child_token();
let handle = tokio::spawn(
async move {
let ctx_for_finish = Arc::clone(&ctx);

View File

@@ -1,3 +1,6 @@
use anyhow::Result;
use codex_config::types::McpServerConfig;
use codex_config::types::McpServerTransportConfig;
use codex_core::config::Constrained;
use codex_features::Feature;
use codex_protocol::models::PermissionProfile;
@@ -11,6 +14,7 @@ use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_custom_tool_call;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_function_call_with_namespace;
use core_test_support::responses::ev_local_shell_call;
use core_test_support::responses::ev_message_item_added;
use core_test_support::responses::ev_output_text_delta;
@@ -24,10 +28,14 @@ use core_test_support::responses::mount_sse_once;
use core_test_support::responses::sse;
use core_test_support::responses::sse_response;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::stdio_server_bin;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::Duration;
use tracing::Level;
use tracing_test::traced_test;
@@ -565,7 +573,7 @@ async fn process_sse_emits_completed_telemetry() {
}
#[tokio::test(flavor = "current_thread")]
async fn turn_and_completed_response_spans_record_token_usage() {
async fn turn_span_records_token_usage_and_image_metadata() {
let buffer: &'static Mutex<Vec<u8>> = Box::leak(Box::new(Mutex::new(Vec::new())));
let subscriber = tracing_subscriber::fmt()
.with_level(true)
@@ -603,6 +611,10 @@ async fn turn_and_completed_response_spans_record_token_usage() {
.features
.disable(Feature::GhostCommit)
.expect("test config should allow feature update");
config
.features
.disable(Feature::ShellSnapshot)
.expect("test config should allow feature update");
})
.build(&server)
.await
@@ -613,10 +625,15 @@ async fn turn_and_completed_response_spans_record_token_usage() {
codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: "hello".into(),
text_elements: Vec::new(),
}],
items: vec![
UserInput::Text {
text: "hello".into(),
text_elements: Vec::new(),
},
UserInput::Image {
image_url: "DATA:image/jpeg;BASE64,AAAA".to_string(),
},
],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
@@ -650,11 +667,150 @@ async fn turn_and_completed_response_spans_record_token_usage() {
&& line.contains("codex.turn.token_usage.output_tokens=5")
&& line.contains("codex.turn.token_usage.reasoning_output_tokens=2")
&& line.contains("codex.turn.token_usage.total_tokens=9")
&& line.contains("codex.turn.model_input_image_count=1")
&& line.contains("codex.turn.model_input_message_image_count=1")
&& line.contains("codex.turn.model_input_tool_image_count=0")
&& line.contains("codex.turn.model_input_image_types=\"jpeg\"")
&& line.contains("codex.turn.model_input_image_mime_types=\"image/jpeg\"")
&& line.contains("\\\"source\\\":\\\"message\\\"")
&& line.contains("\\\"byte_length\\\":3")
}),
"missing regular turn span token usage\nlogs:\n{logs}"
"missing regular turn span token usage and image metadata\nlogs:\n{logs}"
);
}
/// End-to-end check that an image returned by an MCP tool call is reported on
/// the `session_task.turn` span as a tool-output image (and not as a message
/// image).
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn turn_span_records_tool_output_image_metadata() -> Result<()> {
skip_if_no_network!(Ok(()));
// Capture formatted tracing output (including span lifecycle events) into a
// leaked in-memory buffer so the log lines can be inspected after the turn.
let buffer: &'static Mutex<Vec<u8>> = Box::leak(Box::new(Mutex::new(Vec::new())));
let subscriber = tracing_subscriber::fmt()
.with_level(true)
.with_ansi(false)
.with_max_level(Level::TRACE)
.with_span_events(FmtSpan::FULL)
.with_writer(MockWriter::new(buffer))
.finish();
let _guard = tracing::subscriber::set_default(subscriber);
let server = start_mock_server().await;
let call_id = "rmcp-image-otel";
let server_name = "rmcp";
let namespace = format!("mcp__{server_name}__");
let openai_png = "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/x8AAwMB/ee9bQAAAABJRU5ErkJggg==";
// First mocked model response: ask for the namespaced `image` tool.
mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-1"),
ev_function_call_with_namespace(call_id, &namespace, "image", "{}"),
ev_completed("resp-1"),
]),
)
.await;
// Second mocked model response: a plain assistant message that ends the turn.
mount_sse_once(
&server,
sse(vec![
ev_assistant_message("msg-1", "done"),
ev_completed("resp-2"),
]),
)
.await;
let rmcp_test_server_bin = stdio_server_bin()?;
let fixture = test_codex()
.with_config(move |config| {
config
.features
.disable(Feature::GhostCommit)
.expect("test config should allow feature update");
config
.features
.disable(Feature::ShellSnapshot)
.expect("test config should allow feature update");
// Register the stdio MCP test server under `server_name`.
// NOTE(review): presumably the test server returns the data URL passed via
// MCP_TEST_IMAGE_DATA_URL from its `image` tool — confirm against the
// test server implementation.
let mut servers = config.mcp_servers.get().clone();
servers.insert(
server_name.to_string(),
McpServerConfig {
transport: McpServerTransportConfig::Stdio {
command: rmcp_test_server_bin,
args: Vec::new(),
env: Some(HashMap::from([(
"MCP_TEST_IMAGE_DATA_URL".to_string(),
openai_png.to_string(),
)])),
env_vars: Vec::new(),
cwd: None,
},
experimental_environment: None,
enabled: true,
required: false,
supports_parallel_tool_calls: false,
disabled_reason: None,
startup_timeout_sec: Some(Duration::from_secs(10)),
tool_timeout_sec: None,
default_tools_approval_mode: None,
enabled_tools: None,
disabled_tools: None,
scopes: None,
oauth_resource: None,
tools: HashMap::new(),
},
);
config
.mcp_servers
.set(servers)
.expect("test mcp servers should accept any configuration");
})
.build(&server)
.await?;
let session_model = fixture.session_configured.model.clone();
let permission_profile = PermissionProfile::read_only();
let sandbox_policy = permission_profile.to_legacy_sandbox_policy(fixture.cwd.path())?;
// Submit a text-only user turn, so any image counted below can only have
// come from the tool output.
fixture
.codex
.submit(Op::UserTurn {
environments: None,
items: vec![UserInput::Text {
text: "call the rmcp image tool".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: fixture.cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
approvals_reviewer: None,
sandbox_policy,
permission_profile: Some(permission_profile),
model: session_model,
effort: None,
summary: None,
service_tier: None,
collaboration_mode: None,
personality: None,
})
.await?;
wait_for_event(&fixture.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let logs = String::from_utf8(buffer.lock().unwrap().clone()).unwrap();
// A single log line for the turn span must carry all of the expected image
// fields: one image in total, zero from messages, one from tool output.
assert!(
logs.lines().any(|line| {
line.contains("turn{otel.name=\"session_task.turn\"")
&& line.contains("codex.turn.model_input_image_count=1")
&& line.contains("codex.turn.model_input_message_image_count=0")
&& line.contains("codex.turn.model_input_tool_image_count=1")
&& line.contains("codex.turn.model_input_image_types=\"png\"")
&& line.contains("codex.turn.model_input_image_mime_types=\"image/png\"")
&& line.contains("\\\"source\\\":\\\"tool_output\\\"")
}),
"missing tool output image metadata on turn span\nlogs:\n{logs}"
);
Ok(())
}
#[tokio::test]
async fn handle_responses_span_records_response_kind_and_tool_name() {
let buffer: &'static Mutex<Vec<u8>> = Box::leak(Box::new(Mutex::new(Vec::new())));

View File

@@ -1,16 +1,72 @@
use codex_protocol::user_input::UserInput;
use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputContentItem;
use codex_protocol::models::ResponseItem;
use serde::Serialize;
use std::collections::BTreeSet;
use std::io::Read;
use std::path::Path;
const IMAGE_HEADER_READ_LIMIT: u64 = 64 * 1024;
use std::sync::Arc;
use std::sync::Mutex;
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ImageInputTelemetry {
pub(crate) struct ImageInputTelemetrySnapshot {
pub(crate) details_json: String,
pub(crate) image_types: String,
pub(crate) mime_types: String,
pub(crate) image_types: Vec<String>,
pub(crate) mime_types: Vec<String>,
pub(crate) message_image_count: usize,
pub(crate) tool_output_image_count: usize,
}
/// Cloneable handle to accumulated model-input image telemetry.
///
/// The state lives behind an `Arc<Mutex<_>>`, so every clone of a handle reads
/// and updates the same underlying totals.
#[derive(Debug, Clone, Default)]
pub struct ModelInputImageTelemetry {
    state: Arc<Mutex<ImageInputTelemetryState>>,
}

impl ModelInputImageTelemetry {
    /// Scans `items` for images, merges what was found into the shared state,
    /// and returns a snapshot of the merged totals.
    pub(crate) fn record_items(&self, items: &[ResponseItem]) -> ImageInputTelemetrySnapshot {
        // Scan the request before taking the lock so the lock is held only for
        // the merge and the snapshot.
        let request = model_input_image_telemetry(items);
        // A poisoned mutex is recovered rather than propagated: the guard is
        // taken out of the error and used as-is.
        let mut merged = self
            .state
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        merged.accumulate(request);
        merged.snapshot()
    }
}
/// Image totals merged across successive model requests.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct ImageInputTelemetryState {
    details: Vec<ImageInputDetail>,
    image_types: BTreeSet<String>,
    mime_types: BTreeSet<String>,
    message_image_count: usize,
    tool_output_image_count: usize,
}

impl ImageInputTelemetryState {
    /// Merges the telemetry of one request into the running state.
    ///
    /// Each field is merged by its own rule: the two counters keep the larger
    /// of the old and new value, the type sets are unioned, and `details` is
    /// replaced only when the request carries strictly more entries. A request
    /// with fewer images therefore never lowers what was already recorded.
    /// Because the rules are independent, the counters and `details` are not
    /// guaranteed to come from the same request.
    fn accumulate(&mut self, request: Self) {
        let Self {
            details,
            image_types,
            mime_types,
            message_image_count,
            tool_output_image_count,
        } = request;

        self.message_image_count = self.message_image_count.max(message_image_count);
        self.tool_output_image_count = self.tool_output_image_count.max(tool_output_image_count);
        self.image_types.extend(image_types);
        self.mime_types.extend(mime_types);
        if details.len() > self.details.len() {
            self.details = details;
        }
    }

    /// Produces an owned snapshot of the current state.
    ///
    /// `details` is serialized to JSON; if serialization fails the snapshot
    /// carries the literal `"[]"` instead.
    fn snapshot(&self) -> ImageInputTelemetrySnapshot {
        let details_json = match serde_json::to_string(&self.details) {
            Ok(json) => json,
            Err(_) => "[]".to_string(),
        };
        ImageInputTelemetrySnapshot {
            details_json,
            image_types: self.image_types.iter().map(String::clone).collect(),
            mime_types: self.mime_types.iter().map(String::clone).collect(),
            message_image_count: self.message_image_count,
            tool_output_image_count: self.tool_output_image_count,
        }
    }
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
@@ -21,117 +77,115 @@ struct ImageInputDetail {
#[serde(skip_serializing_if = "Option::is_none")]
mime_type: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
width: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
height: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
byte_length: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
extension: Option<String>,
}
pub(crate) fn image_input_telemetry(items: &[UserInput]) -> Option<ImageInputTelemetry> {
let details: Vec<ImageInputDetail> = items
.iter()
.filter_map(|item| match item {
UserInput::Image { image_url } => Some(remote_or_data_url_detail(image_url)),
UserInput::LocalImage { path } => Some(local_image_detail(path)),
UserInput::Text { .. } | UserInput::Skill { .. } | UserInput::Mention { .. } => None,
_ => None,
})
.collect();
fn model_input_image_telemetry(items: &[ResponseItem]) -> ImageInputTelemetryState {
let mut details: Vec<ImageInputDetail> = Vec::new();
let mut message_image_count = 0;
let mut tool_output_image_count = 0;
if details.is_empty() {
return None;
for item in items {
match item {
ResponseItem::Message { content, .. } => {
for content_item in content {
if let ContentItem::InputImage { image_url, .. } = content_item {
message_image_count += 1;
details.push(image_url_detail("message", image_url));
}
}
}
ResponseItem::FunctionCallOutput { output, .. } => {
if let Some(content_items) = output.content_items() {
tool_output_image_count += collect_tool_output_image_details(
"tool_output",
content_items,
&mut details,
);
}
}
ResponseItem::CustomToolCallOutput { output, .. } => {
if let Some(content_items) = output.content_items() {
tool_output_image_count += collect_tool_output_image_details(
"custom_tool_output",
content_items,
&mut details,
);
}
}
_ => {}
}
}
let image_types = comma_join(
let image_types = collect_unique_strings(
details
.iter()
.filter_map(|detail| detail.image_type.as_deref()),
);
let mime_types = comma_join(
let mime_types = collect_unique_strings(
details
.iter()
.filter_map(|detail| detail.mime_type.as_deref()),
);
let details_json = serde_json::to_string(&details).ok()?;
Some(ImageInputTelemetry {
details_json,
ImageInputTelemetryState {
details,
image_types,
mime_types,
})
message_image_count,
tool_output_image_count,
}
}
fn remote_or_data_url_detail(image_url: &str) -> ImageInputDetail {
if let Some(detail) = data_url_detail(image_url) {
/// Appends one detail entry to `details` for every image in `content_items`,
/// labelling each with `source`, and returns how many entries were appended.
fn collect_tool_output_image_details(
    source: &'static str,
    content_items: &[FunctionCallOutputContentItem],
    details: &mut Vec<ImageInputDetail>,
) -> usize {
    let already_collected = details.len();
    details.extend(content_items.iter().filter_map(|content_item| {
        match content_item {
            FunctionCallOutputContentItem::InputImage { image_url, .. } => {
                Some(image_url_detail(source, image_url))
            }
            _ => None,
        }
    }));
    // Everything pushed past the starting length is an image from this call.
    details.len() - already_collected
}
fn image_url_detail(source: &'static str, image_url: &str) -> ImageInputDetail {
if let Some(detail) = data_url_detail(source, image_url) {
return detail;
}
let extension = safe_image_extension_from_url(image_url);
let mime_type = extension.as_deref().and_then(mime_type_from_extension);
ImageInputDetail {
source: "remote_url",
source,
image_type: mime_type.as_deref().and_then(image_type_from_mime),
mime_type,
width: None,
height: None,
byte_length: None,
extension,
}
}
fn data_url_detail(image_url: &str) -> Option<ImageInputDetail> {
fn data_url_detail(source: &'static str, image_url: &str) -> Option<ImageInputDetail> {
let image_url = strip_data_scheme(image_url)?;
let (metadata, payload) = image_url.split_once(',')?;
let mut metadata_parts = metadata.split(';');
let mime_type = metadata_parts.next()?.to_ascii_lowercase();
if !mime_type.starts_with("image/") {
return None;
}
let mime_type = normalize_known_image_mime(metadata_parts.next()?)?;
let is_base64 = metadata_parts.any(|part| part.eq_ignore_ascii_case("base64"));
Some(ImageInputDetail {
source: "data_url",
source,
image_type: image_type_from_mime(&mime_type),
mime_type: Some(mime_type),
width: None,
height: None,
byte_length: is_base64.then(|| base64_payload_byte_len(payload)),
extension: None,
})
}
/// Builds the telemetry detail for an image stored on the local filesystem.
///
/// The MIME type is taken from the file's leading bytes when they match a
/// known signature, and otherwise from the file extension. Filesystem errors
/// are not reported: the affected fields are simply left unset.
fn local_image_detail(path: &Path) -> ImageInputDetail {
    // Keep the extension only when it maps to a known image MIME type.
    let extension = path
        .extension()
        .and_then(|raw| raw.to_str())
        .map(|raw| raw.to_ascii_lowercase())
        .filter(|lowered| mime_type_from_extension(lowered).is_some());
    let byte_length = match path.metadata() {
        Ok(metadata) => Some(metadata.len()),
        Err(_) => None,
    };
    // An unreadable file is treated like an empty header.
    let header = read_image_header(path).unwrap_or_else(|_| Vec::new());
    let header_info = image_info_from_header(&header);
    let mime_type = match header_info {
        Some(info) => Some(info.mime_type.to_string()),
        None => extension.as_deref().and_then(mime_type_from_extension),
    };
    let image_type = mime_type
        .as_deref()
        .and_then(image_type_from_mime)
        .or_else(|| extension.clone());

    ImageInputDetail {
        source: "local_file",
        image_type,
        mime_type,
        width: header_info.and_then(|info| info.width),
        height: header_info.and_then(|info| info.height),
        byte_length,
        extension,
    }
}
fn strip_data_scheme(image_url: &str) -> Option<&str> {
let scheme = image_url.get(..5)?;
if !scheme.eq_ignore_ascii_case("data:") {
@@ -140,104 +194,6 @@ fn strip_data_scheme(image_url: &str) -> Option<&str> {
image_url.get(5..)
}
/// Reads at most `IMAGE_HEADER_READ_LIMIT` bytes from the start of `path`.
///
/// # Errors
///
/// Returns the underlying I/O error when the file cannot be opened or read.
fn read_image_header(path: &Path) -> std::io::Result<Vec<u8>> {
    let mut header = Vec::new();
    std::fs::File::open(path)?
        .take(IMAGE_HEADER_READ_LIMIT)
        .read_to_end(&mut header)?;
    Ok(header)
}
/// Image facts derived from the leading bytes of a file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct HeaderImageInfo {
/// MIME type implied by the matched file signature.
mime_type: &'static str,
/// Pixel width, when the header parser extracted one.
width: Option<u32>,
/// Pixel height, when the header parser extracted one.
height: Option<u32>,
}
/// Identifies an image format from its leading bytes and, where the code
/// below knows the layout, reads the pixel dimensions.
///
/// Returns `None` when none of the PNG, GIF, JPEG or WebP checks match.
fn image_info_from_header(bytes: &[u8]) -> Option<HeaderImageInfo> {
// PNG: 8-byte signature; width and height are read as big-endian u32 values
// at byte offsets 16 and 20.
// NOTE(review): this presumes the first chunk is IHDR — the chunk type is not
// checked here.
if bytes.starts_with(b"\x89PNG\r\n\x1a\n") && bytes.len() >= 24 {
return Some(HeaderImageInfo {
mime_type: "image/png",
width: Some(u32::from_be_bytes(bytes[16..20].try_into().ok()?)),
height: Some(u32::from_be_bytes(bytes[20..24].try_into().ok()?)),
});
}
// GIF: either version tag; width and height are little-endian u16 values at
// byte offsets 6 and 8.
if (bytes.starts_with(b"GIF87a") || bytes.starts_with(b"GIF89a")) && bytes.len() >= 10 {
return Some(HeaderImageInfo {
mime_type: "image/gif",
width: Some(u16::from_le_bytes(bytes[6..8].try_into().ok()?) as u32),
height: Some(u16::from_le_bytes(bytes[8..10].try_into().ok()?) as u32),
});
}
// JPEG: the 0xFF 0xD8 prefix is enough to classify; dimensions are looked up
// by the segment walker.
if bytes.starts_with(b"\xff\xd8") {
return jpeg_info_from_header(bytes);
}
// WebP: "RIFF" at offset 0 and "WEBP" at offset 8. Dimensions are not parsed.
if bytes.len() >= 12 && bytes.starts_with(b"RIFF") && bytes[8..12] == *b"WEBP" {
return Some(HeaderImageInfo {
mime_type: "image/webp",
width: None,
height: None,
});
}
None
}
/// Walks JPEG marker segments in `bytes` looking for a start-of-frame segment
/// and returns its dimensions.
///
/// The result always reports `image/jpeg`; `width` and `height` are `None`
/// when no start-of-frame segment is found before the scan stops. The first
/// two bytes are skipped without being inspected, so the caller is expected to
/// have verified the JPEG prefix already.
fn jpeg_info_from_header(bytes: &[u8]) -> Option<HeaderImageInfo> {
// Start just past the two-byte prefix.
let mut i = 2;
// Continue only while a marker (2 bytes) plus a length field (2 bytes) could
// still fit.
while i + 4 <= bytes.len() {
// Resynchronize: skip anything that is not the 0xFF marker prefix.
if bytes[i] != 0xff {
i += 1;
continue;
}
// Consume the whole run of 0xFF bytes; the marker code is the first byte
// after the run.
while i < bytes.len() && bytes[i] == 0xff {
i += 1;
}
if i >= bytes.len() {
break;
}
let marker = bytes[i];
i += 1;
// 0xD9 (end of image) and 0xDA (start of scan): no frame header can follow,
// so stop scanning.
if marker == 0xd9 || marker == 0xda {
break;
}
if i + 2 > bytes.len() {
break;
}
// The big-endian segment length counts its own two bytes, so values below 2
// are invalid, and a segment running past the buffer is truncated.
let segment_len = u16::from_be_bytes(bytes[i..i + 2].try_into().ok()?) as usize;
if segment_len < 2 || i + segment_len > bytes.len() {
break;
}
// Frame header layout relative to `i`: length (2 bytes), one byte skipped,
// height at +3..+5, width at +5..+7 — hence the minimum length of 7.
if is_jpeg_start_of_frame(marker) && segment_len >= 7 {
return Some(HeaderImageInfo {
mime_type: "image/jpeg",
width: Some(u16::from_be_bytes(bytes[i + 5..i + 7].try_into().ok()?) as u32),
height: Some(u16::from_be_bytes(bytes[i + 3..i + 5].try_into().ok()?) as u32),
});
}
// Not a frame header: jump to the next segment.
i += segment_len;
}
// No frame header was found in the available bytes; report the format only.
Some(HeaderImageInfo {
mime_type: "image/jpeg",
width: None,
height: None,
})
}
/// Returns `true` when `marker` is one of the JPEG start-of-frame codes.
///
/// These are the codes `0xC0..=0xCF` except `0xC4`, `0xC8` and `0xCC`, which
/// share the range but are not frame headers.
fn is_jpeg_start_of_frame(marker: u8) -> bool {
    match marker {
        0xc4 | 0xc8 | 0xcc => false,
        0xc0..=0xcf => true,
        _ => false,
    }
}
fn safe_image_extension_from_url(url: &str) -> Option<String> {
let path = url.split(['?', '#']).next().unwrap_or(url);
let extension = Path::new(path)
@@ -248,6 +204,21 @@ fn safe_image_extension_from_url(url: &str) -> Option<String> {
Some(extension)
}
/// Maps a MIME type string onto its canonical form if it is a recognised
/// image type.
///
/// Matching ignores surrounding whitespace and ASCII case. `image/jpg` is
/// folded into `image/jpeg`. Anything outside the recognised list yields
/// `None`.
fn normalize_known_image_mime(mime_type: &str) -> Option<String> {
    let lowered = mime_type.trim().to_ascii_lowercase();
    let canonical = match lowered.as_str() {
        // Non-standard alias seen in the wild; report it under the real name.
        "image/jpeg" | "image/jpg" => "image/jpeg",
        known @ ("image/png"
        | "image/gif"
        | "image/webp"
        | "image/bmp"
        | "image/heic"
        | "image/heif"
        | "image/tiff"
        | "image/svg+xml") => known,
        _ => return None,
    };
    Some(canonical.to_string())
}
fn mime_type_from_extension(extension: &str) -> Option<String> {
match extension {
"png" => Some("image/png".to_string()),
@@ -274,10 +245,6 @@ fn base64_payload_byte_len(payload: &str) -> u64 {
((trimmed.len() * 3) / 4) as u64
}
fn comma_join<'a>(values: impl Iterator<Item = &'a str>) -> String {
values
.collect::<BTreeSet<_>>()
.into_iter()
.collect::<Vec<_>>()
.join(",")
/// Copies every string yielded by `values` into an ordered set, dropping
/// duplicates.
fn collect_unique_strings<'a>(values: impl Iterator<Item = &'a str>) -> BTreeSet<String> {
    let mut unique = BTreeSet::new();
    for value in values {
        unique.insert(value.to_owned());
    }
    unique
}

View File

@@ -1,6 +1,6 @@
use crate::TelemetryAuthMode;
use crate::ToolDecisionSource;
use crate::events::image_input_telemetry::image_input_telemetry;
use crate::events::image_input_telemetry::ModelInputImageTelemetry;
use crate::events::shared::log_and_trace_event;
use crate::events::shared::log_event;
use crate::events::shared::trace_event;
@@ -856,7 +856,6 @@ impl SessionTelemetry {
.iter()
.filter(|item| matches!(item, UserInput::LocalImage { .. }))
.count();
let image_telemetry = image_input_telemetry(items);
let prompt_to_log = if self.metadata.log_user_prompts {
prompt.as_str()
@@ -870,28 +869,49 @@ impl SessionTelemetry {
prompt_length = %prompt.chars().count(),
prompt = %prompt_to_log,
);
if let Some(image_telemetry) = image_telemetry {
trace_event!(
self,
event.name = "codex.user_prompt",
prompt_length = %prompt.chars().count(),
text_input_count = text_input_count as i64,
image_input_count = image_input_count as i64,
local_image_input_count = local_image_input_count as i64,
image_input_types = %image_telemetry.image_types,
image_input_mime_types = %image_telemetry.mime_types,
image_input_details = %image_telemetry.details_json,
);
} else {
trace_event!(
self,
event.name = "codex.user_prompt",
prompt_length = %prompt.chars().count(),
text_input_count = text_input_count as i64,
image_input_count = image_input_count as i64,
local_image_input_count = local_image_input_count as i64,
);
}
trace_event!(
self,
event.name = "codex.user_prompt",
prompt_length = %prompt.chars().count(),
text_input_count = text_input_count as i64,
image_input_count = image_input_count as i64,
local_image_input_count = local_image_input_count as i64,
);
}
/// Merges the images found in `items` into `model_input_images` and writes the
/// merged totals onto `turn_span`.
///
/// Six fields are recorded, all under the `codex.turn.model_input_` prefix:
/// the total, message and tool image counts, the comma-joined image types and
/// MIME types, and the JSON-encoded per-image details.
///
/// NOTE(review): the fields are expected to have been declared on `turn_span`
/// when it was created (e.g. as `field::Empty`) — confirm at the span's
/// construction site.
pub fn record_model_input_images(
    &self,
    turn_span: &Span,
    model_input_images: &ModelInputImageTelemetry,
    items: &[ResponseItem],
) {
    let snapshot = model_input_images.record_items(items);
    let message_images = snapshot.message_image_count;
    let tool_images = snapshot.tool_output_image_count;
    let joined_image_types = snapshot.image_types.join(",");
    let joined_mime_types = snapshot.mime_types.join(",");

    turn_span.record(
        "codex.turn.model_input_image_count",
        (message_images + tool_images) as i64,
    );
    turn_span.record(
        "codex.turn.model_input_message_image_count",
        message_images as i64,
    );
    turn_span.record(
        "codex.turn.model_input_tool_image_count",
        tool_images as i64,
    );
    turn_span.record(
        "codex.turn.model_input_image_types",
        joined_image_types.as_str(),
    );
    turn_span.record(
        "codex.turn.model_input_image_mime_types",
        joined_mime_types.as_str(),
    );
    turn_span.record(
        "codex.turn.model_input_image_details",
        snapshot.details_json.as_str(),
    );
}
pub fn tool_decision(

View File

@@ -16,6 +16,7 @@ pub use crate::config::OtelHttpProtocol;
pub use crate::config::OtelSettings;
pub use crate::config::OtelTlsConfig;
pub use crate::config::StatsigMetricsSettings;
pub use crate::events::image_input_telemetry::ModelInputImageTelemetry;
pub use crate::events::session_telemetry::AuthEnvTelemetryMetadata;
pub use crate::events::session_telemetry::SessionTelemetry;
pub use crate::events::session_telemetry::SessionTelemetryMetadata;

View File

@@ -1,4 +1,5 @@
use codex_otel::AuthEnvTelemetryMetadata;
use codex_otel::ModelInputImageTelemetry;
use codex_otel::OtelProvider;
use codex_otel::SessionTelemetry;
use codex_otel::TelemetryAuthMode;
@@ -22,6 +23,10 @@ use tracing_subscriber::layer::SubscriberExt;
use codex_protocol::ThreadId;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputContentItem;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionSource;
@@ -42,6 +47,13 @@ fn span_event_attributes(event: &opentelemetry::trace::Event) -> BTreeMap<String
.collect()
}
/// Flattens the attributes of an exported span into a sorted map from key to
/// stringified value. If a key appears more than once, the last value wins.
fn span_attributes(span: &opentelemetry_sdk::trace::SpanData) -> BTreeMap<String, String> {
    let mut attributes = BTreeMap::new();
    for KeyValue { key, value, .. } in &span.attributes {
        attributes.insert(key.as_str().to_string(), value.to_string());
    }
    attributes
}
fn any_value_to_string(value: &AnyValue) -> String {
match value {
AnyValue::Int(value) => value.to_string(),
@@ -146,7 +158,7 @@ fn otel_export_routing_policy_routes_user_prompt_log_and_trace_events() {
);
let root_span = tracing::info_span!("root");
let _root_guard = root_span.enter();
manager.user_prompt(&[
let items = [
UserInput::Text {
text: "super secret prompt".to_string(),
text_elements: Vec::new(),
@@ -160,7 +172,53 @@ fn otel_export_routing_policy_routes_user_prompt_log_and_trace_events() {
UserInput::LocalImage {
path: local_png.clone(),
},
]);
];
let turn_span = tracing::info_span!(
"turn",
otel.name = "session_task.turn",
codex.turn.model_input_image_count = tracing::field::Empty,
codex.turn.model_input_message_image_count = tracing::field::Empty,
codex.turn.model_input_tool_image_count = tracing::field::Empty,
codex.turn.model_input_image_types = tracing::field::Empty,
codex.turn.model_input_image_mime_types = tracing::field::Empty,
codex.turn.model_input_image_details = tracing::field::Empty,
);
let model_input_images = ModelInputImageTelemetry::default();
let model_input = vec![
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![
ContentItem::InputText {
text: "super secret prompt".to_string(),
},
ContentItem::InputImage {
image_url: "DATA:image/jpeg;BASE64,AAAA".to_string(),
detail: None,
},
ContentItem::InputImage {
image_url: "https://example.com/image.customer-secret".to_string(),
detail: None,
},
],
phase: None,
},
ResponseItem::FunctionCallOutput {
call_id: "call_1".to_string(),
output: FunctionCallOutputPayload::from_content_items(vec![
FunctionCallOutputContentItem::InputText {
text: "page screenshot".to_string(),
},
FunctionCallOutputContentItem::InputImage {
image_url: "data:image/png;base64,AAAA".to_string(),
detail: None,
},
]),
},
];
manager.record_model_input_images(&turn_span, &model_input_images, &model_input);
manager.user_prompt(&items);
let _turn_guard = turn_span.enter();
});
logger_provider.force_flush().expect("flush logs");
@@ -184,8 +242,12 @@ fn otel_export_routing_policy_routes_user_prompt_log_and_trace_events() {
);
let spans = span_exporter.get_finished_spans().expect("span export");
assert_eq!(spans.len(), 1);
let span_events = &spans[0].events.events;
assert_eq!(spans.len(), 2);
let root_span = spans
.iter()
.find(|span| span.name == "root")
.expect("missing root span");
let span_events = &root_span.events.events;
assert_eq!(span_events.len(), 1);
let prompt_trace_event = find_span_event_by_name_attr(span_events, "codex.user_prompt");
@@ -212,21 +274,48 @@ fn otel_export_routing_policy_routes_user_prompt_log_and_trace_events() {
.map(String::as_str),
Some("1")
);
assert!(!prompt_trace_attrs.contains_key("image_input_types"));
assert!(!prompt_trace_attrs.contains_key("image_input_mime_types"));
assert!(!prompt_trace_attrs.contains_key("image_input_details"));
let turn_span = spans
.iter()
.find(|span| span.name == "session_task.turn")
.expect("missing turn span");
let turn_attrs = span_attributes(turn_span);
assert_eq!(
prompt_trace_attrs
.get("image_input_types")
turn_attrs
.get("codex.turn.model_input_image_count")
.map(String::as_str),
Some("3")
);
assert_eq!(
turn_attrs
.get("codex.turn.model_input_message_image_count")
.map(String::as_str),
Some("2")
);
assert_eq!(
turn_attrs
.get("codex.turn.model_input_tool_image_count")
.map(String::as_str),
Some("1")
);
assert_eq!(
turn_attrs
.get("codex.turn.model_input_image_types")
.map(String::as_str),
Some("jpeg,png")
);
assert_eq!(
prompt_trace_attrs
.get("image_input_mime_types")
turn_attrs
.get("codex.turn.model_input_image_mime_types")
.map(String::as_str),
Some("image/jpeg,image/png")
);
let image_input_details: Value = serde_json::from_str(
prompt_trace_attrs
.get("image_input_details")
turn_attrs
.get("codex.turn.model_input_image_details")
.expect("image input details"),
)
.expect("image input details should be json");
@@ -234,27 +323,24 @@ fn otel_export_routing_policy_routes_user_prompt_log_and_trace_events() {
image_input_details,
json!([
{
"source": "data_url",
"source": "message",
"image_type": "jpeg",
"mime_type": "image/jpeg",
"byte_length": 3
},
{
"source": "remote_url"
"source": "message"
},
{
"source": "local_file",
"source": "tool_output",
"image_type": "png",
"mime_type": "image/png",
"width": 640,
"height": 480,
"byte_length": 24,
"extension": "png"
"byte_length": 3
}
])
);
assert!(
!prompt_trace_attrs["image_input_details"].contains("customer-secret"),
!turn_attrs["codex.turn.model_input_image_details"].contains("customer-secret"),
"unknown remote URL suffixes should not be traced"
);
assert!(!prompt_trace_attrs.contains_key("prompt"));
@@ -522,6 +608,194 @@ fn otel_export_routing_policy_routes_auth_recovery_log_and_trace_events() {
);
}
/// Records model-input image telemetry twice against the same turn span:
/// first for a request carrying one inline PNG, then for a text-only request.
/// The exported span must still describe the single image from the first call.
#[test]
fn record_model_input_images_accumulates_when_retry_has_no_images() {
    // Route spans through an in-memory exporter so they can be inspected below.
    let exporter = InMemorySpanExporter::default();
    let provider = SdkTracerProvider::builder()
        .with_simple_exporter(exporter.clone())
        .build();
    let otel_layer = tracing_opentelemetry::layer()
        .with_tracer(provider.tracer("model-input-image-retry-test"));
    let subscriber = tracing_subscriber::registry().with(otel_layer);

    tracing::subscriber::with_default(subscriber, || {
        tracing::callsite::rebuild_interest_cache();

        let telemetry = SessionTelemetry::new(
            ThreadId::new(),
            "gpt-5.1",
            "gpt-5.1",
            Some("account-id".to_string()),
            Some("engineer@example.com".to_string()),
            Some(TelemetryAuthMode::Chatgpt),
            "codex_exec".to_string(),
            /*log_user_prompts*/ false,
            "tty".to_string(),
            SessionSource::Cli,
        );

        // Every image field is declared up front as `Empty` so it can be
        // recorded on the span after creation.
        let span = tracing::info_span!(
            "turn",
            otel.name = "session_task.turn",
            codex.turn.model_input_image_count = tracing::field::Empty,
            codex.turn.model_input_message_image_count = tracing::field::Empty,
            codex.turn.model_input_tool_image_count = tracing::field::Empty,
            codex.turn.model_input_image_types = tracing::field::Empty,
            codex.turn.model_input_image_mime_types = tracing::field::Empty,
            codex.turn.model_input_image_details = tracing::field::Empty,
        );
        let image_state = ModelInputImageTelemetry::default();

        // First request: a user message with one inline PNG data URL.
        let request_with_image = [ResponseItem::Message {
            id: None,
            role: "user".to_string(),
            content: vec![ContentItem::InputImage {
                image_url: "data:image/png;base64,AAAA".to_string(),
                detail: None,
            }],
            phase: None,
        }];
        // Second request: the same turn, but with text only.
        let request_without_image = [ResponseItem::Message {
            id: None,
            role: "user".to_string(),
            content: vec![ContentItem::InputText {
                text: "retry after image sanitization".to_string(),
            }],
            phase: None,
        }];

        telemetry.record_model_input_images(&span, &image_state, &request_with_image);
        telemetry.record_model_input_images(&span, &image_state, &request_without_image);

        // NOTE(review): the span is entered only after recording; presumably
        // this is just to exercise enter/exit before it closes — confirm.
        let _turn_guard = span.enter();
    });

    provider.force_flush().expect("flush traces");
    let finished = exporter.get_finished_spans().expect("span export");
    let exported_turn = finished
        .iter()
        .find(|candidate| candidate.name == "session_task.turn")
        .expect("missing turn span");
    let attrs = span_attributes(exported_turn);

    // Scalar attributes must reflect only the image from the first request.
    let expected_scalars = [
        ("codex.turn.model_input_image_count", "1"),
        ("codex.turn.model_input_message_image_count", "1"),
        ("codex.turn.model_input_tool_image_count", "0"),
        ("codex.turn.model_input_image_types", "png"),
        ("codex.turn.model_input_image_mime_types", "image/png"),
    ];
    for (key, expected) in expected_scalars {
        assert_eq!(
            attrs.get(key).map(String::as_str),
            Some(expected),
            "unexpected value for span attribute {}",
            key
        );
    }

    // The details attribute is a JSON array with one entry per recorded image.
    let raw_details = attrs
        .get("codex.turn.model_input_image_details")
        .expect("image input details");
    let details: Value =
        serde_json::from_str(raw_details).expect("image input details should be json");
    assert_eq!(
        details,
        json!([
            {
                "source": "message",
                "image_type": "png",
                "mime_type": "image/png",
                "byte_length": 3
            }
        ])
    );
}
/// Records model-input image telemetry for a request that contains no images
/// and checks that the details attribute on the exported turn span is the
/// literal JSON empty array.
#[test]
fn record_model_input_images_records_empty_json_details_without_images() {
    // Route spans through an in-memory exporter so they can be inspected below.
    let exporter = InMemorySpanExporter::default();
    let provider = SdkTracerProvider::builder()
        .with_simple_exporter(exporter.clone())
        .build();
    let otel_layer = tracing_opentelemetry::layer()
        .with_tracer(provider.tracer("model-input-image-empty-test"));
    let subscriber = tracing_subscriber::registry().with(otel_layer);

    tracing::subscriber::with_default(subscriber, || {
        tracing::callsite::rebuild_interest_cache();

        let telemetry = SessionTelemetry::new(
            ThreadId::new(),
            "gpt-5.1",
            "gpt-5.1",
            Some("account-id".to_string()),
            Some("engineer@example.com".to_string()),
            Some(TelemetryAuthMode::Chatgpt),
            "codex_exec".to_string(),
            /*log_user_prompts*/ false,
            "tty".to_string(),
            SessionSource::Cli,
        );

        // Every image field is declared up front as `Empty` so it can be
        // recorded on the span after creation.
        let span = tracing::info_span!(
            "turn",
            otel.name = "session_task.turn",
            codex.turn.model_input_image_count = tracing::field::Empty,
            codex.turn.model_input_message_image_count = tracing::field::Empty,
            codex.turn.model_input_tool_image_count = tracing::field::Empty,
            codex.turn.model_input_image_types = tracing::field::Empty,
            codex.turn.model_input_image_mime_types = tracing::field::Empty,
            codex.turn.model_input_image_details = tracing::field::Empty,
        );
        let image_state = ModelInputImageTelemetry::default();

        // A single text-only user message: nothing image-related to record.
        let text_only_request = [ResponseItem::Message {
            id: None,
            role: "user".to_string(),
            content: vec![ContentItem::InputText {
                text: "no images".to_string(),
            }],
            phase: None,
        }];
        telemetry.record_model_input_images(&span, &image_state, &text_only_request);

        // NOTE(review): the span is entered only after recording; presumably
        // this is just to exercise enter/exit before it closes — confirm.
        let _turn_guard = span.enter();
    });

    provider.force_flush().expect("flush traces");
    let finished = exporter.get_finished_spans().expect("span export");
    let exported_turn = finished
        .iter()
        .find(|candidate| candidate.name == "session_task.turn")
        .expect("missing turn span");
    let attrs = span_attributes(exported_turn);

    // With no images the attribute must still be present, as an empty array.
    let recorded_details = attrs
        .get("codex.turn.model_input_image_details")
        .map(String::as_str);
    assert_eq!(recorded_details, Some("[]"));
}
#[test]
fn otel_export_routing_policy_routes_api_request_auth_observability() {
let log_exporter = InMemoryLogExporter::default();