mirror of
https://github.com/openai/codex.git
synced 2026-04-23 22:24:57 +00:00
Compare commits
1 Commits
starr/exec
...
fjord/orig
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0771a56078 |
@@ -10,6 +10,7 @@ use codex_login::token_data::PlanType;
|
||||
use http::HeaderMap;
|
||||
use serde::Deserialize;
|
||||
use serde_json::Value;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::auth::CodexAuth;
|
||||
use crate::error::CodexErr;
|
||||
@@ -18,6 +19,12 @@ use crate::error::UnexpectedResponseError;
|
||||
use crate::error::UsageLimitReachedError;
|
||||
use crate::model_provider_info::ModelProviderInfo;
|
||||
|
||||
/// Which inline-image limit an upstream bad-request response reported.
///
/// Produced by the `inline_image_request_limit_*` classifiers in this module;
/// the classifiers visible here set exactly one flag per observation.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) struct InlineImageRequestLimitBadRequestObservation {
    /// Set when the error message reports that the total image data exceeded
    /// the byte limit.
    pub(crate) bytes_exceeded: bool,
    /// Set when the error `code` or `type` is `max_images_per_request`.
    pub(crate) images_exceeded: bool,
}
|
||||
|
||||
pub(crate) fn map_api_error(err: ApiError) -> CodexErr {
|
||||
match err {
|
||||
ApiError::ContextWindowExceeded => CodexErr::ContextWindowExceeded,
|
||||
@@ -63,6 +70,17 @@ pub(crate) fn map_api_error(err: ApiError) -> CodexErr {
|
||||
.contains("The image data you provided does not represent a valid image")
|
||||
{
|
||||
CodexErr::InvalidImageRequest()
|
||||
} else if let Some(observation) =
|
||||
inline_image_request_limit_bad_request_observation(&body_text)
|
||||
{
|
||||
warn!(
|
||||
response_status = %status,
|
||||
bytes_exceeded = observation.bytes_exceeded,
|
||||
images_exceeded = observation.images_exceeded,
|
||||
response_body = %body_text,
|
||||
"responses request rejected by upstream inline image limit"
|
||||
);
|
||||
CodexErr::InvalidRequest(body_text)
|
||||
} else {
|
||||
CodexErr::InvalidRequest(body_text)
|
||||
}
|
||||
@@ -138,6 +156,59 @@ fn extract_request_tracking_id(headers: Option<&HeaderMap>) -> Option<String> {
|
||||
extract_request_id(headers).or_else(|| extract_header(headers, CF_RAY_HEADER))
|
||||
}
|
||||
|
||||
pub(crate) fn inline_image_request_limit_bad_request_observation(
|
||||
body: &str,
|
||||
) -> Option<InlineImageRequestLimitBadRequestObservation> {
|
||||
if let Ok(error) = serde_json::from_str::<BadRequestErrorResponse>(body) {
|
||||
return inline_image_request_limit_observation(
|
||||
&error.error.message,
|
||||
error.error.code.as_deref(),
|
||||
error.error.error_type.as_deref(),
|
||||
);
|
||||
}
|
||||
|
||||
inline_image_request_limit_observation_from_message(body)
|
||||
}
|
||||
|
||||
pub(crate) fn inline_image_request_limit_observation(
|
||||
message: &str,
|
||||
code: Option<&str>,
|
||||
error_type: Option<&str>,
|
||||
) -> Option<InlineImageRequestLimitBadRequestObservation> {
|
||||
if matches!(
|
||||
(code, error_type),
|
||||
(Some("max_images_per_request"), _) | (_, Some("max_images_per_request"))
|
||||
) {
|
||||
return Some(InlineImageRequestLimitBadRequestObservation {
|
||||
bytes_exceeded: false,
|
||||
images_exceeded: true,
|
||||
});
|
||||
}
|
||||
|
||||
inline_image_request_limit_observation_from_message(message)
|
||||
}
|
||||
|
||||
fn inline_image_request_limit_observation_from_message(
|
||||
message: &str,
|
||||
) -> Option<InlineImageRequestLimitBadRequestObservation> {
|
||||
let bytes_exceeded = matches_inline_image_byte_limit_message(message);
|
||||
if !bytes_exceeded {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(InlineImageRequestLimitBadRequestObservation {
|
||||
bytes_exceeded,
|
||||
images_exceeded: false,
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns `true` when `message` has the form
/// `Total image data in 'input' exceeds the <digits> byte limit…`.
///
/// The message must start with that exact prefix, and the text between the
/// prefix and the first ` byte limit` must be a non-empty run of ASCII digits.
/// Anything after ` byte limit` is ignored.
fn matches_inline_image_byte_limit_message(message: &str) -> bool {
    const PREFIX: &str = "Total image data in 'input' exceeds the ";
    const LIMIT_SUFFIX: &str = " byte limit";

    let Some(after_prefix) = message.strip_prefix(PREFIX) else {
        return false;
    };
    let Some(suffix_at) = after_prefix.find(LIMIT_SUFFIX) else {
        return false;
    };

    let limit = &after_prefix[..suffix_at];
    !limit.is_empty() && limit.bytes().all(|byte| byte.is_ascii_digit())
}
|
||||
|
||||
fn extract_request_id(headers: Option<&HeaderMap>) -> Option<String> {
|
||||
extract_header(headers, REQUEST_ID_HEADER)
|
||||
.or_else(|| extract_header(headers, OAI_REQUEST_ID_HEADER))
|
||||
@@ -201,6 +272,19 @@ struct UsageErrorResponse {
|
||||
error: UsageErrorBody,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct BadRequestErrorResponse {
|
||||
error: BadRequestErrorBody,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct BadRequestErrorBody {
|
||||
message: String,
|
||||
#[serde(rename = "type")]
|
||||
error_type: Option<String>,
|
||||
code: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct UsageErrorBody {
|
||||
#[serde(rename = "type")]
|
||||
|
||||
@@ -131,6 +131,65 @@ fn map_api_error_extracts_identity_auth_details_from_headers() {
|
||||
assert_eq!(err.identity_error_code.as_deref(), Some("token_expired"));
|
||||
}
|
||||
|
||||
/// The short byte-limit wording is classified as a byte-limit rejection.
#[test]
fn inline_image_request_limit_bad_request_matches_byte_limit_copy() {
    let body = "Total image data in 'input' exceeds the 536870912 byte limit.";
    let expected = Some(InlineImageRequestLimitBadRequestObservation {
        bytes_exceeded: true,
        images_exceeded: false,
    });

    assert_eq!(
        inline_image_request_limit_bad_request_observation(body),
        expected
    );
}
|
||||
|
||||
/// The longer byte-limit wording (with trailing request details) is also
/// classified as a byte-limit rejection.
#[test]
fn inline_image_request_limit_bad_request_matches_live_byte_limit_copy() {
    let body = "Total image data in 'input' exceeds the 536870912 byte limit for a single /v1/responses request.";
    let expected = Some(InlineImageRequestLimitBadRequestObservation {
        bytes_exceeded: true,
        images_exceeded: false,
    });

    assert_eq!(
        inline_image_request_limit_bad_request_observation(body),
        expected
    );
}
|
||||
|
||||
/// A structured error whose `type`/`code` is `max_images_per_request` is
/// classified as an image-count rejection.
#[test]
fn inline_image_request_limit_bad_request_matches_structured_image_count_error() {
    let body = r#"{"error":{"message":"Invalid request.","type":"max_images_per_request","param":null,"code":"max_images_per_request"}}"#;
    let expected = Some(InlineImageRequestLimitBadRequestObservation {
        bytes_exceeded: false,
        images_exceeded: true,
    });

    assert_eq!(
        inline_image_request_limit_bad_request_observation(body),
        expected
    );
}
|
||||
|
||||
/// Image-count wording without a structured code is not classified: image
/// count violations are recognised from `code`/`type` only.
#[test]
fn inline_image_request_limit_bad_request_ignores_message_only_image_count_copy() {
    let body = "This request contains 1501 images, which exceeds the 1500 image limit for a single Responses API request.";

    assert_eq!(
        inline_image_request_limit_bad_request_observation(body),
        None
    );
}
|
||||
|
||||
/// Bad requests unrelated to inline images produce no observation.
#[test]
fn inline_image_request_limit_bad_request_ignores_other_bad_requests() {
    let body = "Request body is missing required field: input";

    assert_eq!(
        inline_image_request_limit_bad_request_observation(body),
        None
    );
}
|
||||
|
||||
#[test]
|
||||
fn core_auth_provider_reports_when_auth_header_will_attach() {
|
||||
let auth = CoreAuthProvider {
|
||||
|
||||
@@ -32,6 +32,7 @@ use std::sync::atomic::Ordering;
|
||||
|
||||
use crate::api_bridge::CoreAuthProvider;
|
||||
use crate::api_bridge::auth_provider_from_auth;
|
||||
use crate::api_bridge::inline_image_request_limit_bad_request_observation;
|
||||
use crate::api_bridge::map_api_error;
|
||||
use crate::auth::UnauthorizedRecovery;
|
||||
use crate::auth_env_telemetry::AuthEnvTelemetry;
|
||||
@@ -61,6 +62,7 @@ use codex_api::error::ApiError;
|
||||
use codex_api::requests::responses::Compression;
|
||||
use codex_api::response_create_client_metadata;
|
||||
use codex_otel::SessionTelemetry;
|
||||
use codex_otel::WellKnownApiRequestError;
|
||||
use codex_otel::current_span_w3c_trace_context;
|
||||
|
||||
use codex_protocol::ThreadId;
|
||||
@@ -1674,6 +1676,22 @@ fn api_error_http_status(error: &ApiError) -> Option<u16> {
|
||||
}
|
||||
}
|
||||
|
||||
fn upstream_inline_image_request_limit_observation_from_transport_error(
|
||||
error: &TransportError,
|
||||
) -> Option<crate::api_bridge::InlineImageRequestLimitBadRequestObservation> {
|
||||
let TransportError::Http {
|
||||
status,
|
||||
body: Some(body_text),
|
||||
..
|
||||
} = error
|
||||
else {
|
||||
return None;
|
||||
};
|
||||
if *status != StatusCode::BAD_REQUEST {
|
||||
return None;
|
||||
}
|
||||
inline_image_request_limit_bad_request_observation(body_text)
|
||||
}
|
||||
struct ApiTelemetry {
|
||||
session_telemetry: SessionTelemetry,
|
||||
auth_context: AuthRequestTelemetryContext,
|
||||
@@ -1710,6 +1728,17 @@ impl RequestTelemetry for ApiTelemetry {
|
||||
let debug = error
|
||||
.map(extract_response_debug_context)
|
||||
.unwrap_or_default();
|
||||
let well_known_error = match error
|
||||
.and_then(upstream_inline_image_request_limit_observation_from_transport_error)
|
||||
{
|
||||
Some(observation) if observation.images_exceeded => {
|
||||
WellKnownApiRequestError::TooManyImages
|
||||
}
|
||||
Some(observation) if observation.bytes_exceeded => {
|
||||
WellKnownApiRequestError::RequestSizeExceeded
|
||||
}
|
||||
Some(_) | None => WellKnownApiRequestError::None,
|
||||
};
|
||||
self.session_telemetry.record_api_request(
|
||||
attempt,
|
||||
status,
|
||||
@@ -1725,6 +1754,7 @@ impl RequestTelemetry for ApiTelemetry {
|
||||
debug.cf_ray.as_deref(),
|
||||
debug.auth_error.as_deref(),
|
||||
debug.auth_error_code.as_deref(),
|
||||
well_known_error,
|
||||
);
|
||||
emit_feedback_request_tags_with_auth_env(
|
||||
&FeedbackRequestTags {
|
||||
|
||||
@@ -2,13 +2,34 @@ use super::AuthRequestTelemetryContext;
|
||||
use super::ModelClient;
|
||||
use super::PendingUnauthorizedRetry;
|
||||
use super::UnauthorizedRecoveryExecution;
|
||||
use crate::client_common::Prompt;
|
||||
use codex_otel::SessionTelemetry;
|
||||
use codex_otel::WellKnownApiRequestError;
|
||||
use codex_otel::metrics::MetricsClient;
|
||||
use codex_otel::metrics::MetricsConfig;
|
||||
use codex_otel::metrics::names::API_CALL_COUNT_METRIC;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
|
||||
use codex_protocol::models::BaseInstructions;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::openai_models::ModelInfo;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry_sdk::metrics::InMemoryMetricExporter;
|
||||
use opentelemetry_sdk::metrics::data::AggregatedMetrics;
|
||||
use opentelemetry_sdk::metrics::data::Metric;
|
||||
use opentelemetry_sdk::metrics::data::MetricData;
|
||||
use opentelemetry_sdk::metrics::data::ResourceMetrics;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use std::collections::BTreeMap;
|
||||
use wiremock::Mock;
|
||||
use wiremock::MockServer;
|
||||
use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
|
||||
fn test_model_client(session_source: SessionSource) -> ModelClient {
|
||||
let provider = crate::model_provider_info::create_oss_provider_with_base_url(
|
||||
@@ -72,6 +93,51 @@ fn test_session_telemetry() -> SessionTelemetry {
|
||||
)
|
||||
}
|
||||
|
||||
fn test_session_telemetry_with_metrics() -> SessionTelemetry {
|
||||
let exporter = InMemoryMetricExporter::default();
|
||||
let metrics = MetricsClient::new(
|
||||
MetricsConfig::in_memory("test", "codex-core", env!("CARGO_PKG_VERSION"), exporter)
|
||||
.with_runtime_reader(),
|
||||
)
|
||||
.expect("in-memory metrics client");
|
||||
test_session_telemetry().with_metrics_without_metadata_tags(metrics)
|
||||
}
|
||||
|
||||
/// Returns the first metric called `name` across all scopes of the snapshot.
///
/// # Panics
///
/// Panics when no metric with that name exists.
fn find_metric<'a>(resource_metrics: &'a ResourceMetrics, name: &str) -> &'a Metric {
    resource_metrics
        .scope_metrics()
        .flat_map(|scope| scope.metrics())
        .find(|candidate| candidate.name() == name)
        .unwrap_or_else(|| panic!("metric {name} missing"))
}
|
||||
|
||||
fn attributes_to_map<'a>(
|
||||
attributes: impl Iterator<Item = &'a KeyValue>,
|
||||
) -> BTreeMap<String, String> {
|
||||
attributes
|
||||
.map(|kv| (kv.key.as_str().to_string(), kv.value.as_str().to_string()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn metric_point(resource_metrics: &ResourceMetrics, name: &str) -> (BTreeMap<String, String>, u64) {
|
||||
let metric = find_metric(resource_metrics, name);
|
||||
match metric.data() {
|
||||
AggregatedMetrics::U64(data) => match data {
|
||||
MetricData::Sum(sum) => {
|
||||
let points: Vec<_> = sum.data_points().collect();
|
||||
assert_eq!(points.len(), 1);
|
||||
let point = points[0];
|
||||
(attributes_to_map(point.attributes()), point.value())
|
||||
}
|
||||
_ => panic!("unexpected counter aggregation"),
|
||||
},
|
||||
_ => panic!("unexpected counter data type"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn build_subagent_headers_sets_other_subagent_label() {
|
||||
let client = test_model_client(SessionSource::SubAgent(SubAgentSource::Other(
|
||||
@@ -120,3 +186,78 @@ fn auth_request_telemetry_context_tracks_attached_auth_and_retry_phase() {
|
||||
assert_eq!(auth_context.recovery_mode, Some("managed"));
|
||||
assert_eq!(auth_context.recovery_phase, Some("refresh_token"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn compact_conversation_history_emits_metric_for_upstream_inline_image_limit_rejection() {
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/v1/responses/compact"))
|
||||
.respond_with(ResponseTemplate::new(400).set_body_json(json!({
|
||||
"error": {
|
||||
"message": "Invalid request.",
|
||||
"type": "max_images_per_request",
|
||||
"param": null,
|
||||
"code": "max_images_per_request"
|
||||
}
|
||||
})))
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let provider = crate::model_provider_info::create_oss_provider_with_base_url(
|
||||
&format!("{}/v1", server.uri()),
|
||||
crate::model_provider_info::WireApi::Responses,
|
||||
);
|
||||
let client = ModelClient::new(
|
||||
/*auth_manager*/ None,
|
||||
ThreadId::new(),
|
||||
provider,
|
||||
SessionSource::Cli,
|
||||
/*model_verbosity*/ None,
|
||||
/*enable_request_compression*/ false,
|
||||
/*include_timing_metrics*/ false,
|
||||
/*beta_features_header*/ None,
|
||||
);
|
||||
let session_telemetry = test_session_telemetry_with_metrics();
|
||||
let prompt = Prompt {
|
||||
input: vec![ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::InputImage {
|
||||
image_url: "https://example.com/one.png".to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
}],
|
||||
base_instructions: BaseInstructions::default(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let err = client
|
||||
.compact_conversation_history(
|
||||
&prompt,
|
||||
&test_model_info(),
|
||||
/*effort*/ None,
|
||||
ReasoningSummaryConfig::Auto,
|
||||
&session_telemetry,
|
||||
)
|
||||
.await
|
||||
.expect_err("compact request should be rejected upstream");
|
||||
assert!(matches!(err, crate::error::CodexErr::InvalidRequest(_)));
|
||||
|
||||
let snapshot = session_telemetry
|
||||
.snapshot_metrics()
|
||||
.expect("runtime metrics snapshot");
|
||||
let (api_attrs, api_value) = metric_point(&snapshot, API_CALL_COUNT_METRIC);
|
||||
assert_eq!(api_value, 1);
|
||||
assert_eq!(
|
||||
api_attrs,
|
||||
BTreeMap::from([
|
||||
("status".to_string(), "400".to_string()),
|
||||
("success".to_string(), "false".to_string()),
|
||||
(
|
||||
"well_known_error".to_string(),
|
||||
WellKnownApiRequestError::TooManyImages.as_str().to_string(),
|
||||
),
|
||||
])
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use crate::TelemetryAuthMode;
|
||||
use crate::ToolDecisionSource;
|
||||
use crate::WellKnownApiRequestError;
|
||||
use crate::events::shared::log_and_trace_event;
|
||||
use crate::events::shared::log_event;
|
||||
use crate::events::shared::trace_event;
|
||||
@@ -9,6 +10,7 @@ use crate::metrics::MetricsError;
|
||||
use crate::metrics::Result as MetricsResult;
|
||||
use crate::metrics::names::API_CALL_COUNT_METRIC;
|
||||
use crate::metrics::names::API_CALL_DURATION_METRIC;
|
||||
use crate::metrics::names::INLINE_IMAGE_REQUEST_LIMIT_METRIC;
|
||||
use crate::metrics::names::PROFILE_USAGE_METRIC;
|
||||
use crate::metrics::names::RESPONSES_API_ENGINE_IAPI_TBT_DURATION_METRIC;
|
||||
use crate::metrics::names::RESPONSES_API_ENGINE_IAPI_TTFT_DURATION_METRIC;
|
||||
@@ -62,6 +64,63 @@ const RESPONSES_API_ENGINE_IAPI_TTFT_FIELD: &str = "engine_iapi_ttft_total_ms";
|
||||
const RESPONSES_API_ENGINE_SERVICE_TTFT_FIELD: &str = "engine_service_ttft_total_ms";
|
||||
const RESPONSES_API_ENGINE_IAPI_TBT_FIELD: &str = "engine_iapi_tbt_across_engine_calls_ms";
|
||||
const RESPONSES_API_ENGINE_SERVICE_TBT_FIELD: &str = "engine_service_tbt_across_engine_calls_ms";
|
||||
/// Value of the `outcome` tag attached to the inline-image request-limit
/// counter when the limit violation was reported by the upstream service.
const INLINE_IMAGE_REQUEST_LIMIT_OUTCOME_UPSTREAM_REJECTED: &str = "upstream_rejected";
|
||||
|
||||
/// Which inline-image limit a failed response event reported.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
struct InlineImageRequestLimitObservation {
    /// Set when the error message reports that the total image data exceeded
    /// the byte limit.
    bytes_exceeded: bool,
    /// Set when the error `code` or `type` is `max_images_per_request`.
    images_exceeded: bool,
}
|
||||
|
||||
/// Renders a boolean as the metric tag value `"true"` or `"false"`.
fn bool_metric_tag(value: bool) -> &'static str {
    match value {
        true => "true",
        false => "false",
    }
}
|
||||
|
||||
/// Returns `true` when `message` has the form
/// `Total image data in 'input' exceeds the <digits> byte limit…`.
///
/// The message must start with that exact prefix, and the text between the
/// prefix and the first ` byte limit` must be a non-empty run of ASCII digits.
fn matches_inline_image_byte_limit_message(message: &str) -> bool {
    let Some(rest) = message.strip_prefix("Total image data in 'input' exceeds the ") else {
        return false;
    };

    match rest.split_once(" byte limit") {
        Some((limit, _)) => !limit.is_empty() && limit.bytes().all(|byte| byte.is_ascii_digit()),
        None => false,
    }
}
|
||||
|
||||
fn inline_image_request_limit_observation_from_error_fields(
|
||||
message: Option<&str>,
|
||||
code: Option<&str>,
|
||||
error_type: Option<&str>,
|
||||
) -> Option<InlineImageRequestLimitObservation> {
|
||||
if matches!(
|
||||
(code, error_type),
|
||||
(Some("max_images_per_request"), _) | (_, Some("max_images_per_request"))
|
||||
) {
|
||||
return Some(InlineImageRequestLimitObservation {
|
||||
bytes_exceeded: false,
|
||||
images_exceeded: true,
|
||||
});
|
||||
}
|
||||
|
||||
if message.is_some_and(matches_inline_image_byte_limit_message) {
|
||||
return Some(InlineImageRequestLimitObservation {
|
||||
bytes_exceeded: true,
|
||||
images_exceeded: false,
|
||||
});
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
fn inline_image_request_limit_observation_from_event_json(
|
||||
event_json: &serde_json::Value,
|
||||
) -> Option<InlineImageRequestLimitObservation> {
|
||||
let error = event_json
|
||||
.get("response")
|
||||
.and_then(|response| response.get("error"))
|
||||
.or_else(|| event_json.get("error"))?;
|
||||
inline_image_request_limit_observation_from_error_fields(
|
||||
error.get("message").and_then(serde_json::Value::as_str),
|
||||
error.get("code").and_then(serde_json::Value::as_str),
|
||||
error.get("type").and_then(serde_json::Value::as_str),
|
||||
)
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, PartialEq, Eq)]
|
||||
pub struct AuthEnvTelemetryMetadata {
|
||||
@@ -98,6 +157,30 @@ pub struct SessionTelemetry {
|
||||
}
|
||||
|
||||
impl SessionTelemetry {
|
||||
fn record_upstream_inline_image_request_limit_observation(
|
||||
&self,
|
||||
observation: InlineImageRequestLimitObservation,
|
||||
) {
|
||||
self.counter(
|
||||
INLINE_IMAGE_REQUEST_LIMIT_METRIC,
|
||||
/*inc*/ 1,
|
||||
&[
|
||||
(
|
||||
"outcome",
|
||||
INLINE_IMAGE_REQUEST_LIMIT_OUTCOME_UPSTREAM_REJECTED,
|
||||
),
|
||||
(
|
||||
"bytes_exceeded",
|
||||
bool_metric_tag(observation.bytes_exceeded),
|
||||
),
|
||||
(
|
||||
"images_exceeded",
|
||||
bool_metric_tag(observation.images_exceeded),
|
||||
),
|
||||
],
|
||||
);
|
||||
}
|
||||
|
||||
pub fn with_auth_env(mut self, auth_env: AuthEnvTelemetryMetadata) -> Self {
|
||||
self.metadata.auth_env = auth_env;
|
||||
self
|
||||
@@ -382,6 +465,7 @@ impl SessionTelemetry {
|
||||
/*cf_ray*/ None,
|
||||
/*auth_error*/ None,
|
||||
/*auth_error_code*/ None,
|
||||
WellKnownApiRequestError::None,
|
||||
);
|
||||
|
||||
response
|
||||
@@ -404,21 +488,31 @@ impl SessionTelemetry {
|
||||
cf_ray: Option<&str>,
|
||||
auth_error: Option<&str>,
|
||||
auth_error_code: Option<&str>,
|
||||
well_known_error: WellKnownApiRequestError,
|
||||
) {
|
||||
let success = status.is_some_and(|code| (200..=299).contains(&code)) && error.is_none();
|
||||
let success_str = if success { "true" } else { "false" };
|
||||
let status_str = status
|
||||
.map(|code| code.to_string())
|
||||
.unwrap_or_else(|| "none".to_string());
|
||||
let well_known_error_str = well_known_error.as_str();
|
||||
self.counter(
|
||||
API_CALL_COUNT_METRIC,
|
||||
/*inc*/ 1,
|
||||
&[("status", status_str.as_str()), ("success", success_str)],
|
||||
&[
|
||||
("status", status_str.as_str()),
|
||||
("success", success_str),
|
||||
("well_known_error", well_known_error_str),
|
||||
],
|
||||
);
|
||||
self.record_duration(
|
||||
API_CALL_DURATION_METRIC,
|
||||
duration,
|
||||
&[("status", status_str.as_str()), ("success", success_str)],
|
||||
&[
|
||||
("status", status_str.as_str()),
|
||||
("success", success_str),
|
||||
("well_known_error", well_known_error_str),
|
||||
],
|
||||
);
|
||||
log_and_trace_event!(
|
||||
self,
|
||||
@@ -444,6 +538,7 @@ impl SessionTelemetry {
|
||||
auth.cf_ray = cf_ray,
|
||||
auth.error = auth_error,
|
||||
auth.error_code = auth_error_code,
|
||||
well_known_error = well_known_error_str,
|
||||
},
|
||||
log: {},
|
||||
trace: {},
|
||||
@@ -603,6 +698,13 @@ impl SessionTelemetry {
|
||||
self.record_responses_websocket_timing_metrics(&value);
|
||||
}
|
||||
if kind.as_deref() == Some("response.failed") {
|
||||
if let Some(observation) =
|
||||
inline_image_request_limit_observation_from_event_json(&value)
|
||||
{
|
||||
self.record_upstream_inline_image_request_limit_observation(
|
||||
observation,
|
||||
);
|
||||
}
|
||||
success = false;
|
||||
error_message = value
|
||||
.get("response")
|
||||
@@ -683,6 +785,13 @@ impl SessionTelemetry {
|
||||
} else {
|
||||
match serde_json::from_str::<serde_json::Value>(&sse.data) {
|
||||
Ok(error) if sse.event == "response.failed" => {
|
||||
if let Some(observation) =
|
||||
inline_image_request_limit_observation_from_event_json(&error)
|
||||
{
|
||||
self.record_upstream_inline_image_request_limit_observation(
|
||||
observation,
|
||||
);
|
||||
}
|
||||
self.sse_event_failed(Some(&sse.event), duration, &error);
|
||||
}
|
||||
Ok(content) if sse.event == "response.output_item.done" => {
|
||||
@@ -1083,6 +1192,10 @@ impl SessionTelemetry {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "session_telemetry_tests.rs"]
|
||||
mod tests;
|
||||
|
||||
fn duration_from_ms_value(value: Option<&serde_json::Value>) -> Option<Duration> {
|
||||
let value = value?;
|
||||
let ms = value
|
||||
|
||||
147
codex-rs/otel/src/events/session_telemetry_tests.rs
Normal file
147
codex-rs/otel/src/events/session_telemetry_tests.rs
Normal file
@@ -0,0 +1,147 @@
|
||||
use super::AuthEnvTelemetryMetadata;
|
||||
use super::SessionTelemetry;
|
||||
use crate::TelemetryAuthMode;
|
||||
use crate::metrics::MetricsClient;
|
||||
use crate::metrics::MetricsConfig;
|
||||
use crate::metrics::names::INLINE_IMAGE_REQUEST_LIMIT_METRIC;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use eventsource_stream::Event as StreamEvent;
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry_sdk::metrics::InMemoryMetricExporter;
|
||||
use opentelemetry_sdk::metrics::data::AggregatedMetrics;
|
||||
use opentelemetry_sdk::metrics::data::Metric;
|
||||
use opentelemetry_sdk::metrics::data::MetricData;
|
||||
use opentelemetry_sdk::metrics::data::ResourceMetrics;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::collections::BTreeMap;
|
||||
use std::time::Duration;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
|
||||
fn auth_env_metadata() -> AuthEnvTelemetryMetadata {
|
||||
AuthEnvTelemetryMetadata {
|
||||
openai_api_key_env_present: true,
|
||||
codex_api_key_env_present: false,
|
||||
codex_api_key_env_enabled: true,
|
||||
provider_env_key_name: Some("configured".to_string()),
|
||||
provider_env_key_present: Some(true),
|
||||
refresh_token_url_override_present: true,
|
||||
}
|
||||
}
|
||||
|
||||
fn test_session_telemetry_with_metrics() -> SessionTelemetry {
|
||||
let exporter = InMemoryMetricExporter::default();
|
||||
let metrics = MetricsClient::new(
|
||||
MetricsConfig::in_memory("test", "codex-otel", env!("CARGO_PKG_VERSION"), exporter)
|
||||
.with_runtime_reader(),
|
||||
)
|
||||
.expect("in-memory metrics client");
|
||||
SessionTelemetry::new(
|
||||
ThreadId::new(),
|
||||
"gpt-test",
|
||||
"gpt-test",
|
||||
Some("account-id".to_string()),
|
||||
/*account_email*/ None,
|
||||
Some(TelemetryAuthMode::ApiKey),
|
||||
"test-originator".to_string(),
|
||||
/*log_user_prompts*/ false,
|
||||
"test-terminal".to_string(),
|
||||
SessionSource::Cli,
|
||||
)
|
||||
.with_auth_env(auth_env_metadata())
|
||||
.with_metrics_without_metadata_tags(metrics)
|
||||
}
|
||||
|
||||
/// Returns the first metric called `name` across all scopes of the snapshot.
///
/// # Panics
///
/// Panics when no metric with that name exists.
fn find_metric<'a>(resource_metrics: &'a ResourceMetrics, name: &str) -> &'a Metric {
    resource_metrics
        .scope_metrics()
        .flat_map(|scope| scope.metrics())
        .find(|candidate| candidate.name() == name)
        .unwrap_or_else(|| panic!("metric {name} missing"))
}
|
||||
|
||||
fn attributes_to_map<'a>(
|
||||
attributes: impl Iterator<Item = &'a KeyValue>,
|
||||
) -> BTreeMap<String, String> {
|
||||
attributes
|
||||
.map(|kv| (kv.key.as_str().to_string(), kv.value.as_str().to_string()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn metric_point(resource_metrics: &ResourceMetrics, name: &str) -> (BTreeMap<String, String>, u64) {
|
||||
let metric = find_metric(resource_metrics, name);
|
||||
match metric.data() {
|
||||
AggregatedMetrics::U64(data) => match data {
|
||||
MetricData::Sum(sum) => {
|
||||
let points: Vec<_> = sum.data_points().collect();
|
||||
assert_eq!(points.len(), 1);
|
||||
let point = points[0];
|
||||
(attributes_to_map(point.attributes()), point.value())
|
||||
}
|
||||
_ => panic!("unexpected counter aggregation"),
|
||||
},
|
||||
_ => panic!("unexpected counter data type"),
|
||||
}
|
||||
}
|
||||
|
||||
/// A `response.failed` SSE event carrying `max_images_per_request` increments
/// the inline-image limit counter with the image-count tags.
#[test]
fn log_sse_event_records_inline_image_limit_metric_for_response_failed() {
    let session_telemetry = test_session_telemetry_with_metrics();
    let failed_event = StreamEvent {
        event: "response.failed".to_string(),
        data: r#"{"type":"response.failed","response":{"error":{"code":"max_images_per_request","message":"Invalid request."}}}"#
            .to_string(),
        id: String::new(),
        retry: None,
    };
    let sse_response: std::result::Result<
        Option<std::result::Result<StreamEvent, eventsource_stream::EventStreamError<&str>>>,
        tokio::time::error::Elapsed,
    > = Ok(Some(Ok(failed_event)));

    session_telemetry.log_sse_event(&sse_response, Duration::from_millis(25));

    let snapshot = session_telemetry
        .snapshot_metrics()
        .expect("runtime metrics snapshot");
    let (attrs, value) = metric_point(&snapshot, INLINE_IMAGE_REQUEST_LIMIT_METRIC);
    assert_eq!(value, 1);

    let expected_attrs: BTreeMap<String, String> = [
        ("bytes_exceeded", "false"),
        ("images_exceeded", "true"),
        ("outcome", "upstream_rejected"),
    ]
    .into_iter()
    .map(|(key, tag)| (key.to_string(), tag.to_string()))
    .collect();
    assert_eq!(attrs, expected_attrs);
}
|
||||
|
||||
/// A `response.failed` websocket text frame carrying `max_images_per_request`
/// increments the inline-image limit counter with the image-count tags.
#[test]
fn record_websocket_event_records_inline_image_limit_metric_for_response_failed() {
    let session_telemetry = test_session_telemetry_with_metrics();
    let failed_frame = Message::Text(
        r#"{"type":"response.failed","response":{"error":{"code":"max_images_per_request","message":"Invalid request."}}}"#
            .into(),
    );
    let websocket_response: std::result::Result<
        Option<std::result::Result<Message, tokio_tungstenite::tungstenite::Error>>,
        codex_api::ApiError,
    > = Ok(Some(Ok(failed_frame)));

    session_telemetry.record_websocket_event(&websocket_response, Duration::from_millis(25));

    let snapshot = session_telemetry
        .snapshot_metrics()
        .expect("runtime metrics snapshot");
    let (attrs, value) = metric_point(&snapshot, INLINE_IMAGE_REQUEST_LIMIT_METRIC);
    assert_eq!(value, 1);

    let expected_attrs: BTreeMap<String, String> = [
        ("bytes_exceeded", "false"),
        ("images_exceeded", "true"),
        ("outcome", "upstream_rejected"),
    ]
    .into_iter()
    .map(|(key, tag)| (key.to_string(), tag.to_string()))
    .collect();
    assert_eq!(attrs, expected_attrs);
}
|
||||
@@ -43,6 +43,24 @@ pub enum TelemetryAuthMode {
|
||||
Chatgpt,
|
||||
}
|
||||
|
||||
/// A recognised category of API request failure, recorded as the
/// `well_known_error` attribute on API request telemetry.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum WellKnownApiRequestError {
    /// The request did not fail in a recognised way.
    #[default]
    None,
    /// The request was rejected for containing too many images.
    TooManyImages,
    /// The request was rejected for exceeding a size limit.
    RequestSizeExceeded,
}

impl WellKnownApiRequestError {
    /// Returns the stable string used as the telemetry attribute value for
    /// this variant.
    pub fn as_str(self) -> &'static str {
        match self {
            WellKnownApiRequestError::None => "None",
            WellKnownApiRequestError::TooManyImages => "TooManyImages",
            WellKnownApiRequestError::RequestSizeExceeded => "RequestSizeExceeded",
        }
    }
}
|
||||
|
||||
impl From<codex_app_server_protocol::AuthMode> for TelemetryAuthMode {
|
||||
fn from(mode: codex_app_server_protocol::AuthMode) -> Self {
|
||||
match mode {
|
||||
|
||||
@@ -9,6 +9,7 @@ pub const WEBSOCKET_REQUEST_COUNT_METRIC: &str = "codex.websocket.request";
|
||||
pub const WEBSOCKET_REQUEST_DURATION_METRIC: &str = "codex.websocket.request.duration_ms";
|
||||
pub const WEBSOCKET_EVENT_COUNT_METRIC: &str = "codex.websocket.event";
|
||||
pub const WEBSOCKET_EVENT_DURATION_METRIC: &str = "codex.websocket.event.duration_ms";
|
||||
/// Name of the counter incremented when a response is rejected because of an
/// inline image limit.
pub const INLINE_IMAGE_REQUEST_LIMIT_METRIC: &str = "codex.responses.inline_image_limit";
|
||||
pub const RESPONSES_API_OVERHEAD_DURATION_METRIC: &str = "codex.responses_api_overhead.duration_ms";
|
||||
pub const RESPONSES_API_INFERENCE_TIME_DURATION_METRIC: &str =
|
||||
"codex.responses_api_inference_time.duration_ms";
|
||||
|
||||
@@ -2,6 +2,7 @@ use codex_otel::AuthEnvTelemetryMetadata;
|
||||
use codex_otel::OtelProvider;
|
||||
use codex_otel::SessionTelemetry;
|
||||
use codex_otel::TelemetryAuthMode;
|
||||
use codex_otel::WellKnownApiRequestError;
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry::logs::AnyValue;
|
||||
use opentelemetry::trace::TracerProvider as _;
|
||||
@@ -527,6 +528,7 @@ fn otel_export_routing_policy_routes_api_request_auth_observability() {
|
||||
Some("ray-401"),
|
||||
Some("missing_authorization_header"),
|
||||
Some("token_expired"),
|
||||
WellKnownApiRequestError::None,
|
||||
);
|
||||
});
|
||||
|
||||
@@ -584,6 +586,12 @@ fn otel_export_routing_policy_routes_api_request_auth_observability() {
|
||||
request_log_attrs.get("endpoint").map(String::as_str),
|
||||
Some("/responses")
|
||||
);
|
||||
assert_eq!(
|
||||
request_log_attrs
|
||||
.get("well_known_error")
|
||||
.map(String::as_str),
|
||||
Some("None")
|
||||
);
|
||||
assert_eq!(
|
||||
request_log_attrs.get("auth.error").map(String::as_str),
|
||||
Some("missing_authorization_header")
|
||||
@@ -636,6 +644,12 @@ fn otel_export_routing_policy_routes_api_request_auth_observability() {
|
||||
request_trace_attrs.get("endpoint").map(String::as_str),
|
||||
Some("/responses")
|
||||
);
|
||||
assert_eq!(
|
||||
request_trace_attrs
|
||||
.get("well_known_error")
|
||||
.map(String::as_str),
|
||||
Some("None")
|
||||
);
|
||||
assert_eq!(
|
||||
request_trace_attrs
|
||||
.get("auth.env_openai_api_key_present")
|
||||
|
||||
@@ -2,6 +2,7 @@ use codex_otel::RuntimeMetricTotals;
|
||||
use codex_otel::RuntimeMetricsSummary;
|
||||
use codex_otel::SessionTelemetry;
|
||||
use codex_otel::TelemetryAuthMode;
|
||||
use codex_otel::WellKnownApiRequestError;
|
||||
use codex_otel::metrics::MetricsClient;
|
||||
use codex_otel::metrics::MetricsConfig;
|
||||
use codex_otel::metrics::Result;
|
||||
@@ -62,6 +63,7 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<(
|
||||
/*cf_ray*/ None,
|
||||
/*auth_error*/ None,
|
||||
/*auth_error_code*/ None,
|
||||
WellKnownApiRequestError::None,
|
||||
);
|
||||
manager.record_websocket_request(
|
||||
Duration::from_millis(400),
|
||||
|
||||
Reference in New Issue
Block a user