mirror of
https://github.com/openai/codex.git
synced 2026-03-13 02:03:59 +00:00
Compare commits
2 Commits
dev/cc/mul
...
ccy/codex-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
79df29b534 | ||
|
|
212d93765c |
2
codex-rs/Cargo.lock
generated
2
codex-rs/Cargo.lock
generated
@@ -1439,6 +1439,7 @@ dependencies = [
|
||||
"codex-utils-cargo-bin",
|
||||
"codex-utils-cli",
|
||||
"codex-utils-json-to-toml",
|
||||
"codex-utils-pty",
|
||||
"core_test_support",
|
||||
"futures",
|
||||
"opentelemetry",
|
||||
@@ -2438,7 +2439,6 @@ dependencies = [
|
||||
"anyhow",
|
||||
"chrono",
|
||||
"clap",
|
||||
"codex-otel",
|
||||
"codex-protocol",
|
||||
"dirs",
|
||||
"log",
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use base64::Engine;
|
||||
use chrono::DateTime;
|
||||
use chrono::Utc;
|
||||
use codex_api::AuthProvider as ApiAuthProvider;
|
||||
@@ -7,6 +8,7 @@ use codex_api::rate_limits::parse_promo_message;
|
||||
use codex_api::rate_limits::parse_rate_limit_for_limit;
|
||||
use http::HeaderMap;
|
||||
use serde::Deserialize;
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::auth::CodexAuth;
|
||||
use crate::error::CodexErr;
|
||||
@@ -30,6 +32,8 @@ pub(crate) fn map_api_error(err: ApiError) -> CodexErr {
|
||||
url: None,
|
||||
cf_ray: None,
|
||||
request_id: None,
|
||||
identity_authorization_error: None,
|
||||
identity_error_code: None,
|
||||
}),
|
||||
ApiError::InvalidRequest { message } => CodexErr::InvalidRequest(message),
|
||||
ApiError::Transport(transport) => match transport {
|
||||
@@ -98,6 +102,11 @@ pub(crate) fn map_api_error(err: ApiError) -> CodexErr {
|
||||
url,
|
||||
cf_ray: extract_header(headers.as_ref(), CF_RAY_HEADER),
|
||||
request_id: extract_request_id(headers.as_ref()),
|
||||
identity_authorization_error: extract_header(
|
||||
headers.as_ref(),
|
||||
X_OPENAI_AUTHORIZATION_ERROR_HEADER,
|
||||
),
|
||||
identity_error_code: extract_x_error_json_code(headers.as_ref()),
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -118,6 +127,8 @@ const ACTIVE_LIMIT_HEADER: &str = "x-codex-active-limit";
|
||||
const REQUEST_ID_HEADER: &str = "x-request-id";
|
||||
const OAI_REQUEST_ID_HEADER: &str = "x-oai-request-id";
|
||||
const CF_RAY_HEADER: &str = "cf-ray";
|
||||
const X_OPENAI_AUTHORIZATION_ERROR_HEADER: &str = "x-openai-authorization-error";
|
||||
const X_ERROR_JSON_HEADER: &str = "x-error-json";
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "api_bridge_tests.rs"]
|
||||
@@ -140,6 +151,19 @@ fn extract_header(headers: Option<&HeaderMap>, name: &str) -> Option<String> {
|
||||
})
|
||||
}
|
||||
|
||||
fn extract_x_error_json_code(headers: Option<&HeaderMap>) -> Option<String> {
|
||||
let encoded = extract_header(headers, X_ERROR_JSON_HEADER)?;
|
||||
let decoded = base64::engine::general_purpose::STANDARD
|
||||
.decode(encoded)
|
||||
.ok()?;
|
||||
let parsed = serde_json::from_slice::<Value>(&decoded).ok()?;
|
||||
parsed
|
||||
.get("error")
|
||||
.and_then(|error| error.get("code"))
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
}
|
||||
|
||||
pub(crate) fn auth_provider_from_auth(
|
||||
auth: Option<CodexAuth>,
|
||||
provider: &ModelProviderInfo,
|
||||
@@ -191,6 +215,22 @@ pub(crate) struct CoreAuthProvider {
|
||||
account_id: Option<String>,
|
||||
}
|
||||
|
||||
impl CoreAuthProvider {
|
||||
pub(crate) fn auth_header_attached(&self) -> bool {
|
||||
self.token
|
||||
.as_ref()
|
||||
.is_some_and(|token| http::HeaderValue::from_str(&format!("Bearer {token}")).is_ok())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn for_test(token: Option<&str>, account_id: Option<&str>) -> Self {
|
||||
Self {
|
||||
token: token.map(str::to_string),
|
||||
account_id: account_id.map(str::to_string),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ApiAuthProvider for CoreAuthProvider {
|
||||
fn bearer_token(&self) -> Option<String> {
|
||||
self.token.clone()
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use super::*;
|
||||
use base64::Engine;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
@@ -94,3 +95,48 @@ fn map_api_error_does_not_fallback_limit_name_to_limit_id() {
|
||||
None
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn map_api_error_extracts_identity_auth_details_from_headers() {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(REQUEST_ID_HEADER, http::HeaderValue::from_static("req-401"));
|
||||
headers.insert(CF_RAY_HEADER, http::HeaderValue::from_static("ray-401"));
|
||||
headers.insert(
|
||||
X_OPENAI_AUTHORIZATION_ERROR_HEADER,
|
||||
http::HeaderValue::from_static("missing_authorization_header"),
|
||||
);
|
||||
let x_error_json =
|
||||
base64::engine::general_purpose::STANDARD.encode(r#"{"error":{"code":"token_expired"}}"#);
|
||||
headers.insert(
|
||||
X_ERROR_JSON_HEADER,
|
||||
http::HeaderValue::from_str(&x_error_json).expect("valid x-error-json header"),
|
||||
);
|
||||
|
||||
let err = map_api_error(ApiError::Transport(TransportError::Http {
|
||||
status: http::StatusCode::UNAUTHORIZED,
|
||||
url: Some("https://chatgpt.com/backend-api/codex/models".to_string()),
|
||||
headers: Some(headers),
|
||||
body: Some(r#"{"detail":"Unauthorized"}"#.to_string()),
|
||||
}));
|
||||
|
||||
let CodexErr::UnexpectedStatus(err) = err else {
|
||||
panic!("expected CodexErr::UnexpectedStatus, got {err:?}");
|
||||
};
|
||||
assert_eq!(err.request_id.as_deref(), Some("req-401"));
|
||||
assert_eq!(err.cf_ray.as_deref(), Some("ray-401"));
|
||||
assert_eq!(
|
||||
err.identity_authorization_error.as_deref(),
|
||||
Some("missing_authorization_header")
|
||||
);
|
||||
assert_eq!(err.identity_error_code.as_deref(), Some("token_expired"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn core_auth_provider_reports_when_auth_header_will_attach() {
|
||||
let auth = CoreAuthProvider {
|
||||
token: Some("access-token".to_string()),
|
||||
account_id: None,
|
||||
};
|
||||
|
||||
assert!(auth.auth_header_attached());
|
||||
}
|
||||
|
||||
@@ -874,6 +874,17 @@ pub struct UnauthorizedRecovery {
|
||||
mode: UnauthorizedRecoveryMode,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub struct UnauthorizedRecoveryStepResult {
|
||||
auth_state_changed: Option<bool>,
|
||||
}
|
||||
|
||||
impl UnauthorizedRecoveryStepResult {
|
||||
pub fn auth_state_changed(&self) -> Option<bool> {
|
||||
self.auth_state_changed
|
||||
}
|
||||
}
|
||||
|
||||
impl UnauthorizedRecovery {
|
||||
fn new(manager: Arc<AuthManager>) -> Self {
|
||||
let cached_auth = manager.auth_cached();
|
||||
@@ -917,7 +928,46 @@ impl UnauthorizedRecovery {
|
||||
!matches!(self.step, UnauthorizedRecoveryStep::Done)
|
||||
}
|
||||
|
||||
pub async fn next(&mut self) -> Result<(), RefreshTokenError> {
|
||||
pub fn unavailable_reason(&self) -> &'static str {
|
||||
if !self
|
||||
.manager
|
||||
.auth_cached()
|
||||
.as_ref()
|
||||
.is_some_and(CodexAuth::is_chatgpt_auth)
|
||||
{
|
||||
return "not_chatgpt_auth";
|
||||
}
|
||||
|
||||
if self.mode == UnauthorizedRecoveryMode::External
|
||||
&& !self.manager.has_external_auth_refresher()
|
||||
{
|
||||
return "no_external_refresher";
|
||||
}
|
||||
|
||||
if matches!(self.step, UnauthorizedRecoveryStep::Done) {
|
||||
return "recovery_exhausted";
|
||||
}
|
||||
|
||||
"ready"
|
||||
}
|
||||
|
||||
pub fn mode_name(&self) -> &'static str {
|
||||
match self.mode {
|
||||
UnauthorizedRecoveryMode::Managed => "managed",
|
||||
UnauthorizedRecoveryMode::External => "external",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn step_name(&self) -> &'static str {
|
||||
match self.step {
|
||||
UnauthorizedRecoveryStep::Reload => "reload",
|
||||
UnauthorizedRecoveryStep::RefreshToken => "refresh_token",
|
||||
UnauthorizedRecoveryStep::ExternalRefresh => "external_refresh",
|
||||
UnauthorizedRecoveryStep::Done => "done",
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn next(&mut self) -> Result<UnauthorizedRecoveryStepResult, RefreshTokenError> {
|
||||
if !self.has_next() {
|
||||
return Err(RefreshTokenError::Permanent(RefreshTokenFailedError::new(
|
||||
RefreshTokenFailedReason::Other,
|
||||
@@ -931,8 +981,17 @@ impl UnauthorizedRecovery {
|
||||
.manager
|
||||
.reload_if_account_id_matches(self.expected_account_id.as_deref())
|
||||
{
|
||||
ReloadOutcome::ReloadedChanged | ReloadOutcome::ReloadedNoChange => {
|
||||
ReloadOutcome::ReloadedChanged => {
|
||||
self.step = UnauthorizedRecoveryStep::RefreshToken;
|
||||
return Ok(UnauthorizedRecoveryStepResult {
|
||||
auth_state_changed: Some(true),
|
||||
});
|
||||
}
|
||||
ReloadOutcome::ReloadedNoChange => {
|
||||
self.step = UnauthorizedRecoveryStep::RefreshToken;
|
||||
return Ok(UnauthorizedRecoveryStepResult {
|
||||
auth_state_changed: Some(false),
|
||||
});
|
||||
}
|
||||
ReloadOutcome::Skipped => {
|
||||
self.step = UnauthorizedRecoveryStep::Done;
|
||||
@@ -946,16 +1005,24 @@ impl UnauthorizedRecovery {
|
||||
UnauthorizedRecoveryStep::RefreshToken => {
|
||||
self.manager.refresh_token_from_authority().await?;
|
||||
self.step = UnauthorizedRecoveryStep::Done;
|
||||
return Ok(UnauthorizedRecoveryStepResult {
|
||||
auth_state_changed: Some(true),
|
||||
});
|
||||
}
|
||||
UnauthorizedRecoveryStep::ExternalRefresh => {
|
||||
self.manager
|
||||
.refresh_external_auth(ExternalAuthRefreshReason::Unauthorized)
|
||||
.await?;
|
||||
self.step = UnauthorizedRecoveryStep::Done;
|
||||
return Ok(UnauthorizedRecoveryStepResult {
|
||||
auth_state_changed: Some(true),
|
||||
});
|
||||
}
|
||||
UnauthorizedRecoveryStep::Done => {}
|
||||
}
|
||||
Ok(())
|
||||
Ok(UnauthorizedRecoveryStepResult {
|
||||
auth_state_changed: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ use codex_protocol::config_types::ForcedLoginMethod;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde::Serialize;
|
||||
use serde_json::json;
|
||||
use std::sync::Arc;
|
||||
use tempfile::tempdir;
|
||||
|
||||
#[tokio::test]
|
||||
@@ -171,6 +172,33 @@ fn logout_removes_auth_file() -> Result<(), std::io::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unauthorized_recovery_reports_mode_and_step_names() {
|
||||
let dir = tempdir().unwrap();
|
||||
let manager = AuthManager::shared(
|
||||
dir.path().to_path_buf(),
|
||||
false,
|
||||
AuthCredentialsStoreMode::File,
|
||||
);
|
||||
let managed = UnauthorizedRecovery {
|
||||
manager: Arc::clone(&manager),
|
||||
step: UnauthorizedRecoveryStep::Reload,
|
||||
expected_account_id: None,
|
||||
mode: UnauthorizedRecoveryMode::Managed,
|
||||
};
|
||||
assert_eq!(managed.mode_name(), "managed");
|
||||
assert_eq!(managed.step_name(), "reload");
|
||||
|
||||
let external = UnauthorizedRecovery {
|
||||
manager,
|
||||
step: UnauthorizedRecoveryStep::ExternalRefresh,
|
||||
expected_account_id: None,
|
||||
mode: UnauthorizedRecoveryMode::External,
|
||||
};
|
||||
assert_eq!(external.mode_name(), "external");
|
||||
assert_eq!(external.step_name(), "external_refresh");
|
||||
}
|
||||
|
||||
struct AuthFileParams {
|
||||
openai_api_key: Option<String>,
|
||||
chatgpt_plan_type: Option<String>,
|
||||
|
||||
@@ -75,6 +75,7 @@ use http::HeaderValue;
|
||||
use http::StatusCode as HttpStatusCode;
|
||||
use reqwest::StatusCode;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::sync::oneshot::error::TryRecvError;
|
||||
@@ -91,11 +92,18 @@ use crate::client_common::ResponseEvent;
|
||||
use crate::client_common::ResponseStream;
|
||||
use crate::config::Config;
|
||||
use crate::default_client::build_reqwest_client;
|
||||
use crate::default_client::current_residency_header_telemetry;
|
||||
use crate::endpoint_config_telemetry::EndpointConfigTelemetry;
|
||||
use crate::endpoint_config_telemetry::EndpointConfigTelemetrySource;
|
||||
use crate::error::CodexErr;
|
||||
use crate::error::Result;
|
||||
use crate::flags::CODEX_RS_SSE_FIXTURE;
|
||||
use crate::model_provider_info::ModelProviderInfo;
|
||||
use crate::model_provider_info::WireApi;
|
||||
use crate::response_debug_context::extract_response_debug_context;
|
||||
use crate::response_debug_context::extract_response_debug_context_from_api_error;
|
||||
use crate::response_debug_context::telemetry_api_error_message;
|
||||
use crate::response_debug_context::telemetry_transport_error_message;
|
||||
use crate::tools::spec::create_tools_json_for_responses_api;
|
||||
|
||||
pub const OPENAI_BETA_HEADER: &str = "OpenAI-Beta";
|
||||
@@ -104,6 +112,9 @@ pub const X_CODEX_TURN_METADATA_HEADER: &str = "x-codex-turn-metadata";
|
||||
pub const X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER: &str =
|
||||
"x-responsesapi-include-timing-metrics";
|
||||
const RESPONSES_WEBSOCKETS_V2_BETA_HEADER_VALUE: &str = "responses_websockets=2026-02-06";
|
||||
const RESPONSES_ENDPOINT: &str = "/responses";
|
||||
const RESPONSES_COMPACT_ENDPOINT: &str = "/responses/compact";
|
||||
const MEMORIES_SUMMARIZE_ENDPOINT: &str = "/memories/trace_summarize";
|
||||
|
||||
pub fn ws_version_from_features(config: &Config) -> bool {
|
||||
config
|
||||
@@ -123,6 +134,7 @@ struct ModelClientState {
|
||||
auth_manager: Option<Arc<AuthManager>>,
|
||||
conversation_id: ThreadId,
|
||||
provider: ModelProviderInfo,
|
||||
endpoint_telemetry_source: EndpointConfigTelemetrySource,
|
||||
session_source: SessionSource,
|
||||
model_verbosity: Option<VerbosityConfig>,
|
||||
responses_websockets_enabled_by_feature: bool,
|
||||
@@ -141,6 +153,25 @@ struct CurrentClientSetup {
|
||||
auth: Option<CodexAuth>,
|
||||
api_provider: codex_api::Provider,
|
||||
api_auth: CoreAuthProvider,
|
||||
endpoint_telemetry: EndpointConfigTelemetry,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
struct RequestRouteTelemetry {
|
||||
endpoint: &'static str,
|
||||
residency_header_attached: bool,
|
||||
residency_header_value: Option<&'static str>,
|
||||
}
|
||||
|
||||
impl RequestRouteTelemetry {
|
||||
fn for_endpoint(endpoint: &'static str) -> Self {
|
||||
let residency = current_residency_header_telemetry();
|
||||
Self {
|
||||
endpoint,
|
||||
residency_header_attached: residency.attached,
|
||||
residency_header_value: residency.value,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A session-scoped client for model-provider API calls.
|
||||
@@ -223,12 +254,71 @@ impl ModelClient {
|
||||
enable_request_compression: bool,
|
||||
include_timing_metrics: bool,
|
||||
beta_features_header: Option<String>,
|
||||
) -> Self {
|
||||
let endpoint_telemetry_source =
|
||||
EndpointConfigTelemetrySource::for_provider_without_id(&provider);
|
||||
Self::new_with_endpoint_telemetry_source(
|
||||
auth_manager,
|
||||
conversation_id,
|
||||
provider,
|
||||
endpoint_telemetry_source,
|
||||
session_source,
|
||||
model_verbosity,
|
||||
responses_websockets_enabled_by_feature,
|
||||
enable_request_compression,
|
||||
include_timing_metrics,
|
||||
beta_features_header,
|
||||
)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new_with_provider_id(
|
||||
auth_manager: Option<Arc<AuthManager>>,
|
||||
conversation_id: ThreadId,
|
||||
provider_id: &str,
|
||||
provider: ModelProviderInfo,
|
||||
session_source: SessionSource,
|
||||
model_verbosity: Option<VerbosityConfig>,
|
||||
responses_websockets_enabled_by_feature: bool,
|
||||
enable_request_compression: bool,
|
||||
include_timing_metrics: bool,
|
||||
beta_features_header: Option<String>,
|
||||
) -> Self {
|
||||
let endpoint_telemetry_source =
|
||||
EndpointConfigTelemetrySource::for_provider(provider_id, &provider);
|
||||
Self::new_with_endpoint_telemetry_source(
|
||||
auth_manager,
|
||||
conversation_id,
|
||||
provider,
|
||||
endpoint_telemetry_source,
|
||||
session_source,
|
||||
model_verbosity,
|
||||
responses_websockets_enabled_by_feature,
|
||||
enable_request_compression,
|
||||
include_timing_metrics,
|
||||
beta_features_header,
|
||||
)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn new_with_endpoint_telemetry_source(
|
||||
auth_manager: Option<Arc<AuthManager>>,
|
||||
conversation_id: ThreadId,
|
||||
provider: ModelProviderInfo,
|
||||
endpoint_telemetry_source: EndpointConfigTelemetrySource,
|
||||
session_source: SessionSource,
|
||||
model_verbosity: Option<VerbosityConfig>,
|
||||
responses_websockets_enabled_by_feature: bool,
|
||||
enable_request_compression: bool,
|
||||
include_timing_metrics: bool,
|
||||
beta_features_header: Option<String>,
|
||||
) -> Self {
|
||||
Self {
|
||||
state: Arc::new(ModelClientState {
|
||||
auth_manager,
|
||||
conversation_id,
|
||||
provider,
|
||||
endpoint_telemetry_source,
|
||||
session_source,
|
||||
model_verbosity,
|
||||
responses_websockets_enabled_by_feature,
|
||||
@@ -288,7 +378,15 @@ impl ModelClient {
|
||||
}
|
||||
let client_setup = self.current_client_setup().await?;
|
||||
let transport = ReqwestTransport::new(build_reqwest_client());
|
||||
let request_telemetry = Self::build_request_telemetry(session_telemetry);
|
||||
let request_telemetry = Self::build_request_telemetry(
|
||||
session_telemetry,
|
||||
AuthRequestTelemetryContext::new(
|
||||
&client_setup.api_auth,
|
||||
PendingUnauthorizedRetry::default(),
|
||||
),
|
||||
client_setup.endpoint_telemetry,
|
||||
RequestRouteTelemetry::for_endpoint(RESPONSES_COMPACT_ENDPOINT),
|
||||
);
|
||||
let client =
|
||||
ApiCompactClient::new(transport, client_setup.api_provider, client_setup.api_auth)
|
||||
.with_telemetry(Some(request_telemetry));
|
||||
@@ -329,7 +427,15 @@ impl ModelClient {
|
||||
|
||||
let client_setup = self.current_client_setup().await?;
|
||||
let transport = ReqwestTransport::new(build_reqwest_client());
|
||||
let request_telemetry = Self::build_request_telemetry(session_telemetry);
|
||||
let request_telemetry = Self::build_request_telemetry(
|
||||
session_telemetry,
|
||||
AuthRequestTelemetryContext::new(
|
||||
&client_setup.api_auth,
|
||||
PendingUnauthorizedRetry::default(),
|
||||
),
|
||||
client_setup.endpoint_telemetry,
|
||||
RequestRouteTelemetry::for_endpoint(MEMORIES_SUMMARIZE_ENDPOINT),
|
||||
);
|
||||
let client =
|
||||
ApiMemoriesClient::new(transport, client_setup.api_provider, client_setup.api_auth)
|
||||
.with_telemetry(Some(request_telemetry));
|
||||
@@ -369,8 +475,18 @@ impl ModelClient {
|
||||
}
|
||||
|
||||
/// Builds request telemetry for unary API calls (e.g., Compact endpoint).
|
||||
fn build_request_telemetry(session_telemetry: &SessionTelemetry) -> Arc<dyn RequestTelemetry> {
|
||||
let telemetry = Arc::new(ApiTelemetry::new(session_telemetry.clone()));
|
||||
fn build_request_telemetry(
|
||||
session_telemetry: &SessionTelemetry,
|
||||
auth_context: AuthRequestTelemetryContext,
|
||||
endpoint_telemetry: EndpointConfigTelemetry,
|
||||
request_route_telemetry: RequestRouteTelemetry,
|
||||
) -> Arc<dyn RequestTelemetry> {
|
||||
let telemetry = Arc::new(ApiTelemetry::new(
|
||||
session_telemetry.clone(),
|
||||
auth_context,
|
||||
endpoint_telemetry,
|
||||
request_route_telemetry,
|
||||
));
|
||||
let request_telemetry: Arc<dyn RequestTelemetry> = telemetry;
|
||||
request_telemetry
|
||||
}
|
||||
@@ -406,10 +522,15 @@ impl ModelClient {
|
||||
.provider
|
||||
.to_api_provider(auth.as_ref().map(CodexAuth::auth_mode))?;
|
||||
let api_auth = auth_provider_from_auth(auth.clone(), &self.state.provider)?;
|
||||
let endpoint_telemetry = self
|
||||
.state
|
||||
.endpoint_telemetry_source
|
||||
.classify(api_provider.base_url.as_str());
|
||||
Ok(CurrentClientSetup {
|
||||
auth,
|
||||
api_provider,
|
||||
api_auth,
|
||||
endpoint_telemetry,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -417,6 +538,7 @@ impl ModelClient {
|
||||
///
|
||||
/// Both startup prewarm and in-turn `needs_new` reconnects call this path so handshake
|
||||
/// behavior remains consistent across both flows.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn connect_websocket(
|
||||
&self,
|
||||
session_telemetry: &SessionTelemetry,
|
||||
@@ -424,17 +546,63 @@ impl ModelClient {
|
||||
api_auth: CoreAuthProvider,
|
||||
turn_state: Option<Arc<OnceLock<String>>>,
|
||||
turn_metadata_header: Option<&str>,
|
||||
auth_context: AuthRequestTelemetryContext,
|
||||
endpoint_telemetry: EndpointConfigTelemetry,
|
||||
request_route_telemetry: RequestRouteTelemetry,
|
||||
) -> std::result::Result<ApiWebSocketConnection, ApiError> {
|
||||
let headers = self.build_websocket_headers(turn_state.as_ref(), turn_metadata_header);
|
||||
let websocket_telemetry = ModelClientSession::build_websocket_telemetry(session_telemetry);
|
||||
ApiWebSocketResponsesClient::new(api_provider, api_auth)
|
||||
let start = Instant::now();
|
||||
let result = ApiWebSocketResponsesClient::new(api_provider, api_auth)
|
||||
.connect(
|
||||
headers,
|
||||
crate::default_client::default_headers(),
|
||||
turn_state,
|
||||
Some(websocket_telemetry),
|
||||
)
|
||||
.await
|
||||
.await;
|
||||
let error_message = result.as_ref().err().map(telemetry_api_error_message);
|
||||
let response_debug = result
|
||||
.as_ref()
|
||||
.err()
|
||||
.map(extract_response_debug_context_from_api_error)
|
||||
.unwrap_or_default();
|
||||
let status = result.as_ref().err().and_then(api_error_http_status);
|
||||
session_telemetry.record_websocket_connect(
|
||||
start.elapsed(),
|
||||
status,
|
||||
error_message.as_deref(),
|
||||
auth_context.auth_header_attached,
|
||||
auth_context.retry_after_unauthorized,
|
||||
auth_context.recovery_mode,
|
||||
auth_context.recovery_phase,
|
||||
request_route_telemetry.endpoint,
|
||||
request_route_telemetry.residency_header_attached,
|
||||
request_route_telemetry.residency_header_value,
|
||||
endpoint_telemetry.base_url_origin,
|
||||
endpoint_telemetry.host_class,
|
||||
endpoint_telemetry.base_url_source,
|
||||
endpoint_telemetry.base_url_is_default,
|
||||
response_debug.request_id.as_deref(),
|
||||
response_debug.cf_ray.as_deref(),
|
||||
response_debug.auth_error.as_deref(),
|
||||
response_debug.auth_error_code.as_deref(),
|
||||
response_debug.error_body_class,
|
||||
response_debug.safe_error_message,
|
||||
);
|
||||
if status == Some(StatusCode::UNAUTHORIZED.as_u16()) && response_debug.geo_denial_detected {
|
||||
session_telemetry.record_geo_denial(
|
||||
request_route_telemetry.endpoint,
|
||||
request_route_telemetry.residency_header_attached,
|
||||
request_route_telemetry.residency_header_value,
|
||||
status,
|
||||
response_debug.request_id.as_deref(),
|
||||
response_debug.cf_ray.as_deref(),
|
||||
response_debug.error_body_class.unwrap_or_default(),
|
||||
response_debug.safe_error_message,
|
||||
);
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
/// Builds websocket handshake headers for both prewarm and turn-time reconnect.
|
||||
@@ -677,6 +845,11 @@ impl ModelClientSession {
|
||||
"failed to build websocket prewarm client setup: {err}"
|
||||
))
|
||||
})?;
|
||||
let auth_context = AuthRequestTelemetryContext::new(
|
||||
&client_setup.api_auth,
|
||||
PendingUnauthorizedRetry::default(),
|
||||
);
|
||||
let endpoint_telemetry = client_setup.endpoint_telemetry;
|
||||
|
||||
let connection = self
|
||||
.client
|
||||
@@ -686,12 +859,16 @@ impl ModelClientSession {
|
||||
client_setup.api_auth,
|
||||
Some(Arc::clone(&self.turn_state)),
|
||||
None,
|
||||
auth_context,
|
||||
endpoint_telemetry,
|
||||
RequestRouteTelemetry::for_endpoint(RESPONSES_ENDPOINT),
|
||||
)
|
||||
.await?;
|
||||
self.websocket_session.connection = Some(connection);
|
||||
Ok(())
|
||||
}
|
||||
/// Returns a websocket connection for this turn.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn websocket_connection(
|
||||
&mut self,
|
||||
session_telemetry: &SessionTelemetry,
|
||||
@@ -699,6 +876,9 @@ impl ModelClientSession {
|
||||
api_auth: CoreAuthProvider,
|
||||
turn_metadata_header: Option<&str>,
|
||||
options: &ApiResponsesOptions,
|
||||
auth_context: AuthRequestTelemetryContext,
|
||||
endpoint_telemetry: EndpointConfigTelemetry,
|
||||
request_route_telemetry: RequestRouteTelemetry,
|
||||
) -> std::result::Result<&ApiWebSocketConnection, ApiError> {
|
||||
let needs_new = match self.websocket_session.connection.as_ref() {
|
||||
Some(conn) => conn.is_closed().await,
|
||||
@@ -720,6 +900,9 @@ impl ModelClientSession {
|
||||
api_auth,
|
||||
Some(turn_state),
|
||||
turn_metadata_header,
|
||||
auth_context,
|
||||
endpoint_telemetry,
|
||||
request_route_telemetry,
|
||||
)
|
||||
.await?;
|
||||
self.websocket_session.connection = Some(new_conn);
|
||||
@@ -774,11 +957,18 @@ impl ModelClientSession {
|
||||
let mut auth_recovery = auth_manager
|
||||
.as_ref()
|
||||
.map(super::auth::AuthManager::unauthorized_recovery);
|
||||
let mut pending_retry = PendingUnauthorizedRetry::default();
|
||||
loop {
|
||||
let client_setup = self.client.current_client_setup().await?;
|
||||
let transport = ReqwestTransport::new(build_reqwest_client());
|
||||
let (request_telemetry, sse_telemetry) =
|
||||
Self::build_streaming_telemetry(session_telemetry);
|
||||
let request_auth_context =
|
||||
AuthRequestTelemetryContext::new(&client_setup.api_auth, pending_retry);
|
||||
let (request_telemetry, sse_telemetry) = Self::build_streaming_telemetry(
|
||||
session_telemetry,
|
||||
request_auth_context,
|
||||
client_setup.endpoint_telemetry,
|
||||
RequestRouteTelemetry::for_endpoint(RESPONSES_ENDPOINT),
|
||||
);
|
||||
let compression = self.responses_request_compression(client_setup.auth.as_ref());
|
||||
let options = self.build_responses_options(turn_metadata_header, compression);
|
||||
|
||||
@@ -806,7 +996,14 @@ impl ModelClientSession {
|
||||
Err(ApiError::Transport(
|
||||
unauthorized_transport @ TransportError::Http { status, .. },
|
||||
)) if status == StatusCode::UNAUTHORIZED => {
|
||||
handle_unauthorized(unauthorized_transport, &mut auth_recovery).await?;
|
||||
pending_retry = PendingUnauthorizedRetry::from_recovery(
|
||||
handle_unauthorized(
|
||||
unauthorized_transport,
|
||||
&mut auth_recovery,
|
||||
session_telemetry,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
Err(err) => return Err(map_api_error(err)),
|
||||
@@ -832,8 +1029,11 @@ impl ModelClientSession {
|
||||
let mut auth_recovery = auth_manager
|
||||
.as_ref()
|
||||
.map(super::auth::AuthManager::unauthorized_recovery);
|
||||
let mut pending_retry = PendingUnauthorizedRetry::default();
|
||||
loop {
|
||||
let client_setup = self.client.current_client_setup().await?;
|
||||
let request_auth_context =
|
||||
AuthRequestTelemetryContext::new(&client_setup.api_auth, pending_retry);
|
||||
let compression = self.responses_request_compression(client_setup.auth.as_ref());
|
||||
|
||||
let options = self.build_responses_options(turn_metadata_header, compression);
|
||||
@@ -860,6 +1060,9 @@ impl ModelClientSession {
|
||||
client_setup.api_auth,
|
||||
turn_metadata_header,
|
||||
&options,
|
||||
request_auth_context,
|
||||
client_setup.endpoint_telemetry,
|
||||
RequestRouteTelemetry::for_endpoint(RESPONSES_ENDPOINT),
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -872,7 +1075,14 @@ impl ModelClientSession {
|
||||
Err(ApiError::Transport(
|
||||
unauthorized_transport @ TransportError::Http { status, .. },
|
||||
)) if status == StatusCode::UNAUTHORIZED => {
|
||||
handle_unauthorized(unauthorized_transport, &mut auth_recovery).await?;
|
||||
pending_retry = PendingUnauthorizedRetry::from_recovery(
|
||||
handle_unauthorized(
|
||||
unauthorized_transport,
|
||||
&mut auth_recovery,
|
||||
session_telemetry,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
Err(err) => return Err(map_api_error(err)),
|
||||
@@ -902,8 +1112,16 @@ impl ModelClientSession {
|
||||
/// Builds request and SSE telemetry for streaming API calls.
|
||||
fn build_streaming_telemetry(
|
||||
session_telemetry: &SessionTelemetry,
|
||||
auth_context: AuthRequestTelemetryContext,
|
||||
endpoint_telemetry: EndpointConfigTelemetry,
|
||||
request_route_telemetry: RequestRouteTelemetry,
|
||||
) -> (Arc<dyn RequestTelemetry>, Arc<dyn SseTelemetry>) {
|
||||
let telemetry = Arc::new(ApiTelemetry::new(session_telemetry.clone()));
|
||||
let telemetry = Arc::new(ApiTelemetry::new(
|
||||
session_telemetry.clone(),
|
||||
auth_context,
|
||||
endpoint_telemetry,
|
||||
request_route_telemetry,
|
||||
));
|
||||
let request_telemetry: Arc<dyn RequestTelemetry> = telemetry.clone();
|
||||
let sse_telemetry: Arc<dyn SseTelemetry> = telemetry;
|
||||
(request_telemetry, sse_telemetry)
|
||||
@@ -913,7 +1131,12 @@ impl ModelClientSession {
|
||||
fn build_websocket_telemetry(
|
||||
session_telemetry: &SessionTelemetry,
|
||||
) -> Arc<dyn WebsocketTelemetry> {
|
||||
let telemetry = Arc::new(ApiTelemetry::new(session_telemetry.clone()));
|
||||
let telemetry = Arc::new(ApiTelemetry::new(
|
||||
session_telemetry.clone(),
|
||||
AuthRequestTelemetryContext::default(),
|
||||
EndpointConfigTelemetry::default(),
|
||||
RequestRouteTelemetry::for_endpoint(RESPONSES_ENDPOINT),
|
||||
));
|
||||
let websocket_telemetry: Arc<dyn WebsocketTelemetry> = telemetry;
|
||||
websocket_telemetry
|
||||
}
|
||||
@@ -1185,30 +1408,155 @@ where
|
||||
///
|
||||
/// When refresh succeeds, the caller should retry the API call; otherwise
|
||||
/// the mapped `CodexErr` is returned to the caller.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
struct UnauthorizedRecoveryExecution {
|
||||
mode: &'static str,
|
||||
phase: &'static str,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Default)]
|
||||
struct PendingUnauthorizedRetry {
|
||||
retry_after_unauthorized: bool,
|
||||
recovery_mode: Option<&'static str>,
|
||||
recovery_phase: Option<&'static str>,
|
||||
}
|
||||
|
||||
impl PendingUnauthorizedRetry {
|
||||
fn from_recovery(recovery: UnauthorizedRecoveryExecution) -> Self {
|
||||
Self {
|
||||
retry_after_unauthorized: true,
|
||||
recovery_mode: Some(recovery.mode),
|
||||
recovery_phase: Some(recovery.phase),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Default)]
|
||||
struct AuthRequestTelemetryContext {
|
||||
auth_header_attached: bool,
|
||||
retry_after_unauthorized: bool,
|
||||
recovery_mode: Option<&'static str>,
|
||||
recovery_phase: Option<&'static str>,
|
||||
}
|
||||
|
||||
impl AuthRequestTelemetryContext {
|
||||
fn new(api_auth: &CoreAuthProvider, retry: PendingUnauthorizedRetry) -> Self {
|
||||
Self {
|
||||
auth_header_attached: api_auth.auth_header_attached(),
|
||||
retry_after_unauthorized: retry.retry_after_unauthorized,
|
||||
recovery_mode: retry.recovery_mode,
|
||||
recovery_phase: retry.recovery_phase,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_unauthorized(
|
||||
transport: TransportError,
|
||||
auth_recovery: &mut Option<UnauthorizedRecovery>,
|
||||
) -> Result<()> {
|
||||
session_telemetry: &SessionTelemetry,
|
||||
) -> Result<UnauthorizedRecoveryExecution> {
|
||||
let debug = extract_response_debug_context(&transport);
|
||||
if let Some(recovery) = auth_recovery
|
||||
&& recovery.has_next()
|
||||
{
|
||||
let mode = recovery.mode_name();
|
||||
let phase = recovery.step_name();
|
||||
return match recovery.next().await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(RefreshTokenError::Permanent(failed)) => Err(CodexErr::RefreshTokenFailed(failed)),
|
||||
Err(RefreshTokenError::Transient(other)) => Err(CodexErr::Io(other)),
|
||||
Ok(step_result) => {
|
||||
session_telemetry.record_auth_recovery(
|
||||
mode,
|
||||
phase,
|
||||
"recovery_succeeded",
|
||||
debug.request_id.as_deref(),
|
||||
debug.cf_ray.as_deref(),
|
||||
debug.auth_error.as_deref(),
|
||||
debug.auth_error_code.as_deref(),
|
||||
None,
|
||||
step_result.auth_state_changed(),
|
||||
);
|
||||
Ok(UnauthorizedRecoveryExecution { mode, phase })
|
||||
}
|
||||
Err(RefreshTokenError::Permanent(failed)) => {
|
||||
session_telemetry.record_auth_recovery(
|
||||
mode,
|
||||
phase,
|
||||
"recovery_failed_permanent",
|
||||
debug.request_id.as_deref(),
|
||||
debug.cf_ray.as_deref(),
|
||||
debug.auth_error.as_deref(),
|
||||
debug.auth_error_code.as_deref(),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
Err(CodexErr::RefreshTokenFailed(failed))
|
||||
}
|
||||
Err(RefreshTokenError::Transient(other)) => {
|
||||
session_telemetry.record_auth_recovery(
|
||||
mode,
|
||||
phase,
|
||||
"recovery_failed_transient",
|
||||
debug.request_id.as_deref(),
|
||||
debug.cf_ray.as_deref(),
|
||||
debug.auth_error.as_deref(),
|
||||
debug.auth_error_code.as_deref(),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
Err(CodexErr::Io(other))
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
let (mode, phase, recovery_reason) = match auth_recovery.as_ref() {
|
||||
Some(recovery) => (
|
||||
recovery.mode_name(),
|
||||
recovery.step_name(),
|
||||
Some(recovery.unavailable_reason()),
|
||||
),
|
||||
None => ("none", "none", Some("auth_manager_missing")),
|
||||
};
|
||||
session_telemetry.record_auth_recovery(
|
||||
mode,
|
||||
phase,
|
||||
"recovery_not_run",
|
||||
debug.request_id.as_deref(),
|
||||
debug.cf_ray.as_deref(),
|
||||
debug.auth_error.as_deref(),
|
||||
debug.auth_error_code.as_deref(),
|
||||
recovery_reason,
|
||||
None,
|
||||
);
|
||||
|
||||
Err(map_api_error(ApiError::Transport(transport)))
|
||||
}
|
||||
|
||||
fn api_error_http_status(error: &ApiError) -> Option<u16> {
|
||||
match error {
|
||||
ApiError::Transport(TransportError::Http { status, .. }) => Some(status.as_u16()),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
struct ApiTelemetry {
|
||||
session_telemetry: SessionTelemetry,
|
||||
auth_context: AuthRequestTelemetryContext,
|
||||
endpoint_telemetry: EndpointConfigTelemetry,
|
||||
request_route_telemetry: RequestRouteTelemetry,
|
||||
}
|
||||
|
||||
impl ApiTelemetry {
|
||||
fn new(session_telemetry: SessionTelemetry) -> Self {
|
||||
Self { session_telemetry }
|
||||
fn new(
|
||||
session_telemetry: SessionTelemetry,
|
||||
auth_context: AuthRequestTelemetryContext,
|
||||
endpoint_telemetry: EndpointConfigTelemetry,
|
||||
request_route_telemetry: RequestRouteTelemetry,
|
||||
) -> Self {
|
||||
Self {
|
||||
session_telemetry,
|
||||
auth_context,
|
||||
endpoint_telemetry,
|
||||
request_route_telemetry,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1220,13 +1568,46 @@ impl RequestTelemetry for ApiTelemetry {
|
||||
error: Option<&TransportError>,
|
||||
duration: Duration,
|
||||
) {
|
||||
let error_message = error.map(std::string::ToString::to_string);
|
||||
let error_message = error.map(telemetry_transport_error_message);
|
||||
let status = status.map(|s| s.as_u16());
|
||||
let debug = error
|
||||
.map(extract_response_debug_context)
|
||||
.unwrap_or_default();
|
||||
self.session_telemetry.record_api_request(
|
||||
attempt,
|
||||
status.map(|s| s.as_u16()),
|
||||
status,
|
||||
error_message.as_deref(),
|
||||
duration,
|
||||
self.auth_context.auth_header_attached,
|
||||
self.auth_context.retry_after_unauthorized,
|
||||
self.auth_context.recovery_mode,
|
||||
self.auth_context.recovery_phase,
|
||||
self.request_route_telemetry.endpoint,
|
||||
self.request_route_telemetry.residency_header_attached,
|
||||
self.request_route_telemetry.residency_header_value,
|
||||
self.endpoint_telemetry.base_url_origin,
|
||||
self.endpoint_telemetry.host_class,
|
||||
self.endpoint_telemetry.base_url_source,
|
||||
self.endpoint_telemetry.base_url_is_default,
|
||||
debug.request_id.as_deref(),
|
||||
debug.cf_ray.as_deref(),
|
||||
debug.auth_error.as_deref(),
|
||||
debug.auth_error_code.as_deref(),
|
||||
debug.error_body_class,
|
||||
debug.safe_error_message,
|
||||
);
|
||||
if status == Some(StatusCode::UNAUTHORIZED.as_u16()) && debug.geo_denial_detected {
|
||||
self.session_telemetry.record_geo_denial(
|
||||
self.request_route_telemetry.endpoint,
|
||||
self.request_route_telemetry.residency_header_attached,
|
||||
self.request_route_telemetry.residency_header_value,
|
||||
status,
|
||||
debug.request_id.as_deref(),
|
||||
debug.cf_ray.as_deref(),
|
||||
debug.error_body_class.unwrap_or_default(),
|
||||
debug.safe_error_message,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1245,7 +1626,7 @@ impl SseTelemetry for ApiTelemetry {
|
||||
|
||||
impl WebsocketTelemetry for ApiTelemetry {
|
||||
fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>) {
|
||||
let error_message = error.map(std::string::ToString::to_string);
|
||||
let error_message = error.map(telemetry_api_error_message);
|
||||
self.session_telemetry
|
||||
.record_websocket_request(duration, error_message.as_deref());
|
||||
}
|
||||
|
||||
@@ -1,10 +1,18 @@
|
||||
use super::AuthRequestTelemetryContext;
|
||||
use super::ModelClient;
|
||||
use super::PendingUnauthorizedRetry;
|
||||
use super::UnauthorizedRecoveryExecution;
|
||||
use crate::endpoint_config_telemetry::EndpointConfigTelemetrySource;
|
||||
use crate::model_provider_info::LMSTUDIO_OSS_PROVIDER_ID;
|
||||
use crate::response_debug_context::extract_response_debug_context;
|
||||
use codex_api::TransportError;
|
||||
use codex_otel::SessionTelemetry;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::openai_models::ModelInfo;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use pretty_assertions::assert_eq;
|
||||
use reqwest::StatusCode;
|
||||
use serde_json::json;
|
||||
|
||||
fn test_model_client(session_source: SessionSource) -> ModelClient {
|
||||
@@ -25,6 +33,47 @@ fn test_model_client(session_source: SessionSource) -> ModelClient {
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn model_client_new_requires_explicit_provider_id_for_builtin_endpoint_defaults() {
|
||||
let provider = crate::model_provider_info::create_oss_provider_with_base_url(
|
||||
"http://localhost:1234/v1",
|
||||
crate::model_provider_info::WireApi::Responses,
|
||||
);
|
||||
|
||||
let client = ModelClient::new(
|
||||
None,
|
||||
ThreadId::new(),
|
||||
provider.clone(),
|
||||
SessionSource::Cli,
|
||||
None,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
None,
|
||||
);
|
||||
let client_with_provider_id = ModelClient::new_with_provider_id(
|
||||
None,
|
||||
ThreadId::new(),
|
||||
LMSTUDIO_OSS_PROVIDER_ID,
|
||||
provider,
|
||||
SessionSource::Cli,
|
||||
None,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
None,
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
client.state.endpoint_telemetry_source,
|
||||
EndpointConfigTelemetrySource::new("config_toml", false)
|
||||
);
|
||||
assert_eq!(
|
||||
client_with_provider_id.state.endpoint_telemetry_source,
|
||||
EndpointConfigTelemetrySource::new("default", true)
|
||||
);
|
||||
}
|
||||
|
||||
fn test_model_info() -> ModelInfo {
|
||||
serde_json::from_value(json!({
|
||||
"slug": "gpt-test",
|
||||
@@ -94,3 +143,52 @@ async fn summarize_memories_returns_empty_for_empty_input() {
|
||||
.expect("empty summarize request should succeed");
|
||||
assert_eq!(output.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn extract_response_debug_context_decodes_identity_headers() {
|
||||
let mut headers = http::HeaderMap::new();
|
||||
headers.insert(
|
||||
"x-oai-request-id",
|
||||
http::HeaderValue::from_static("req-401"),
|
||||
);
|
||||
headers.insert("cf-ray", http::HeaderValue::from_static("ray-401"));
|
||||
headers.insert(
|
||||
"x-openai-authorization-error",
|
||||
http::HeaderValue::from_static("missing_authorization_header"),
|
||||
);
|
||||
headers.insert(
|
||||
"x-error-json",
|
||||
http::HeaderValue::from_static("eyJlcnJvciI6eyJjb2RlIjoidG9rZW5fZXhwaXJlZCJ9fQ=="),
|
||||
);
|
||||
|
||||
let context = extract_response_debug_context(&TransportError::Http {
|
||||
status: StatusCode::UNAUTHORIZED,
|
||||
url: Some("https://chatgpt.com/backend-api/codex/models".to_string()),
|
||||
headers: Some(headers),
|
||||
body: Some(r#"{"detail":"Unauthorized"}"#.to_string()),
|
||||
});
|
||||
|
||||
assert_eq!(context.request_id.as_deref(), Some("req-401"));
|
||||
assert_eq!(context.cf_ray.as_deref(), Some("ray-401"));
|
||||
assert_eq!(
|
||||
context.auth_error.as_deref(),
|
||||
Some("missing_authorization_header")
|
||||
);
|
||||
assert_eq!(context.auth_error_code.as_deref(), Some("token_expired"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn auth_request_telemetry_context_tracks_attached_auth_and_retry_phase() {
|
||||
let auth_context = AuthRequestTelemetryContext::new(
|
||||
&crate::api_bridge::CoreAuthProvider::for_test(Some("access-token"), Some("workspace-123")),
|
||||
PendingUnauthorizedRetry::from_recovery(UnauthorizedRecoveryExecution {
|
||||
mode: "managed",
|
||||
phase: "refresh_token",
|
||||
}),
|
||||
);
|
||||
|
||||
assert!(auth_context.auth_header_attached);
|
||||
assert!(auth_context.retry_after_unauthorized);
|
||||
assert_eq!(auth_context.recovery_mode, Some("managed"));
|
||||
assert_eq!(auth_context.recovery_phase, Some("refresh_token"));
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ use crate::compact::should_use_remote_compact_task;
|
||||
use crate::compact_remote::run_inline_remote_auto_compact_task;
|
||||
use crate::config::ManagedFeatures;
|
||||
use crate::connectors;
|
||||
use crate::endpoint_config_telemetry::resolve_endpoint_config_telemetry_source;
|
||||
use crate::exec_policy::ExecPolicyManager;
|
||||
use crate::features::FEATURES;
|
||||
use crate::features::Feature;
|
||||
@@ -1510,7 +1511,8 @@ impl Session {
|
||||
}
|
||||
|
||||
let auth = auth.as_ref();
|
||||
let auth_mode = auth.map(CodexAuth::auth_mode).map(TelemetryAuthMode::from);
|
||||
let provider_auth_mode = auth.map(CodexAuth::auth_mode);
|
||||
let auth_mode = provider_auth_mode.map(TelemetryAuthMode::from);
|
||||
let account_id = auth.and_then(CodexAuth::get_account_id);
|
||||
let account_email = auth.and_then(CodexAuth::get_account_email);
|
||||
let originator = crate::default_client::originator().value;
|
||||
@@ -1555,9 +1557,22 @@ impl Session {
|
||||
},
|
||||
)],
|
||||
);
|
||||
let endpoint_telemetry_source = resolve_endpoint_config_telemetry_source(
|
||||
config.as_ref(),
|
||||
session_configuration.session_source.clone(),
|
||||
);
|
||||
let conversation_start_endpoint_telemetry = config
|
||||
.model_provider
|
||||
.to_api_provider(provider_auth_mode)
|
||||
.map(|provider| endpoint_telemetry_source.classify(provider.base_url.as_str()))
|
||||
.unwrap_or_else(|_| endpoint_telemetry_source.redacted_unknown());
|
||||
|
||||
session_telemetry.conversation_starts(
|
||||
config.model_provider.name.as_str(),
|
||||
conversation_start_endpoint_telemetry.base_url_origin,
|
||||
conversation_start_endpoint_telemetry.host_class,
|
||||
conversation_start_endpoint_telemetry.base_url_source,
|
||||
conversation_start_endpoint_telemetry.base_url_is_default,
|
||||
session_configuration.collaboration_mode.reasoning_effort(),
|
||||
config
|
||||
.model_reasoning_summary
|
||||
@@ -1723,10 +1738,11 @@ impl Session {
|
||||
network_proxy,
|
||||
network_approval: Arc::clone(&network_approval),
|
||||
state_db: state_db_ctx.clone(),
|
||||
model_client: ModelClient::new(
|
||||
model_client: ModelClient::new_with_endpoint_telemetry_source(
|
||||
Some(Arc::clone(&auth_manager)),
|
||||
conversation_id,
|
||||
session_configuration.provider.clone(),
|
||||
endpoint_telemetry_source,
|
||||
session_configuration.session_source.clone(),
|
||||
config.model_verbosity,
|
||||
ws_version_from_features(config.as_ref()),
|
||||
|
||||
@@ -28,6 +28,12 @@ pub const DEFAULT_ORIGINATOR: &str = "codex_cli_rs";
|
||||
pub const CODEX_INTERNAL_ORIGINATOR_OVERRIDE_ENV_VAR: &str = "CODEX_INTERNAL_ORIGINATOR_OVERRIDE";
|
||||
pub const RESIDENCY_HEADER_NAME: &str = "x-openai-internal-codex-residency";
|
||||
|
||||
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
|
||||
pub struct ResidencyHeaderTelemetry {
|
||||
pub attached: bool,
|
||||
pub value: Option<&'static str>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Originator {
|
||||
pub value: String,
|
||||
@@ -87,6 +93,20 @@ pub fn set_default_client_residency_requirement(enforce_residency: Option<Reside
|
||||
*guard = enforce_residency;
|
||||
}
|
||||
|
||||
pub fn current_residency_header_telemetry() -> ResidencyHeaderTelemetry {
|
||||
let Ok(guard) = REQUIREMENTS_RESIDENCY.read() else {
|
||||
tracing::warn!("Failed to acquire requirements residency lock");
|
||||
return ResidencyHeaderTelemetry::default();
|
||||
};
|
||||
let Some(requirement) = guard.as_ref() else {
|
||||
return ResidencyHeaderTelemetry::default();
|
||||
};
|
||||
ResidencyHeaderTelemetry {
|
||||
attached: true,
|
||||
value: Some(residency_header_value(*requirement)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn originator() -> Originator {
|
||||
if let Ok(guard) = ORIGINATOR.read()
|
||||
&& let Some(originator) = guard.as_ref()
|
||||
@@ -203,14 +223,20 @@ pub fn default_headers() -> HeaderMap {
|
||||
&& let Some(requirement) = guard.as_ref()
|
||||
&& !headers.contains_key(RESIDENCY_HEADER_NAME)
|
||||
{
|
||||
let value = match requirement {
|
||||
ResidencyRequirement::Us => HeaderValue::from_static("us"),
|
||||
};
|
||||
headers.insert(RESIDENCY_HEADER_NAME, value);
|
||||
headers.insert(
|
||||
RESIDENCY_HEADER_NAME,
|
||||
HeaderValue::from_static(residency_header_value(*requirement)),
|
||||
);
|
||||
}
|
||||
headers
|
||||
}
|
||||
|
||||
fn residency_header_value(requirement: ResidencyRequirement) -> &'static str {
|
||||
match requirement {
|
||||
ResidencyRequirement::Us => "us",
|
||||
}
|
||||
}
|
||||
|
||||
fn is_sandboxed() -> bool {
|
||||
std::env::var(CODEX_SANDBOX_ENV_VAR).as_deref() == Ok("seatbelt")
|
||||
}
|
||||
|
||||
@@ -87,6 +87,23 @@ async fn test_create_client_sets_default_headers() {
|
||||
set_default_client_residency_requirement(None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn current_residency_header_telemetry_reports_attached_value() {
|
||||
set_default_client_residency_requirement(Some(ResidencyRequirement::Us));
|
||||
assert_eq!(
|
||||
current_residency_header_telemetry(),
|
||||
ResidencyHeaderTelemetry {
|
||||
attached: true,
|
||||
value: Some("us"),
|
||||
}
|
||||
);
|
||||
set_default_client_residency_requirement(None);
|
||||
assert_eq!(
|
||||
current_residency_header_telemetry(),
|
||||
ResidencyHeaderTelemetry::default()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_invalid_suffix_is_sanitized() {
|
||||
let prefix = "codex_cli_rs/0.0.0";
|
||||
|
||||
327
codex-rs/core/src/endpoint_config_telemetry.rs
Normal file
327
codex-rs/core/src/endpoint_config_telemetry.rs
Normal file
@@ -0,0 +1,327 @@
|
||||
use crate::config::Config;
|
||||
use crate::model_provider_info::LMSTUDIO_OSS_PROVIDER_ID;
|
||||
use crate::model_provider_info::ModelProviderInfo;
|
||||
use crate::model_provider_info::OLLAMA_OSS_PROVIDER_ID;
|
||||
use codex_app_server_protocol::ConfigLayerSource;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use reqwest::Url;
|
||||
|
||||
const BASE_URL_ORIGIN_CHATGPT: &str = "chatgpt.com";
|
||||
const BASE_URL_ORIGIN_OPENAI_API: &str = "api.openai.com";
|
||||
const BASE_URL_ORIGIN_OPENROUTER: &str = "openrouter.ai";
|
||||
const BASE_URL_ORIGIN_CUSTOM: &str = "custom";
|
||||
|
||||
const HOST_CLASS_OPENAI_CHATGPT: &str = "openai_chatgpt";
|
||||
const HOST_CLASS_OPENAI_API: &str = "openai_api";
|
||||
const HOST_CLASS_KNOWN_THIRD_PARTY: &str = "known_third_party";
|
||||
const HOST_CLASS_CUSTOM_UNKNOWN: &str = "custom_unknown";
|
||||
|
||||
const BASE_URL_SOURCE_DEFAULT: &str = "default";
|
||||
const BASE_URL_SOURCE_ENV: &str = "env";
|
||||
const BASE_URL_SOURCE_CONFIG_TOML: &str = "config_toml";
|
||||
const BASE_URL_SOURCE_IDE_SETTINGS: &str = "ide_settings";
|
||||
const BASE_URL_SOURCE_MANAGED_CONFIG: &str = "managed_config";
|
||||
const BASE_URL_SOURCE_SESSION_FLAGS: &str = "session_flags";
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub(crate) struct EndpointConfigTelemetrySource {
|
||||
pub(crate) base_url_source: &'static str,
|
||||
pub(crate) base_url_is_default: bool,
|
||||
}
|
||||
|
||||
impl EndpointConfigTelemetrySource {
|
||||
pub(crate) const fn new(base_url_source: &'static str, base_url_is_default: bool) -> Self {
|
||||
Self {
|
||||
base_url_source,
|
||||
base_url_is_default,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn classify(self, base_url: &str) -> EndpointConfigTelemetry {
|
||||
let (base_url_origin, host_class) = classify_base_url(base_url);
|
||||
EndpointConfigTelemetry {
|
||||
base_url_origin,
|
||||
host_class,
|
||||
base_url_source: self.base_url_source,
|
||||
base_url_is_default: self.base_url_is_default,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) const fn redacted_unknown(self) -> EndpointConfigTelemetry {
|
||||
EndpointConfigTelemetry {
|
||||
base_url_origin: BASE_URL_ORIGIN_CUSTOM,
|
||||
host_class: HOST_CLASS_CUSTOM_UNKNOWN,
|
||||
base_url_source: self.base_url_source,
|
||||
base_url_is_default: self.base_url_is_default,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn for_provider(
|
||||
provider_id: &str,
|
||||
provider: &ModelProviderInfo,
|
||||
) -> EndpointConfigTelemetrySource {
|
||||
endpoint_source_from_provider_defaults(provider_id, provider)
|
||||
}
|
||||
|
||||
pub(crate) fn for_provider_without_id(provider: &ModelProviderInfo) -> Self {
|
||||
let base_url_is_default = provider.base_url.is_none();
|
||||
let base_url_source = if base_url_is_default {
|
||||
BASE_URL_SOURCE_DEFAULT
|
||||
} else {
|
||||
BASE_URL_SOURCE_CONFIG_TOML
|
||||
};
|
||||
EndpointConfigTelemetrySource::new(base_url_source, base_url_is_default)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub(crate) struct EndpointConfigTelemetry {
|
||||
pub(crate) base_url_origin: &'static str,
|
||||
pub(crate) host_class: &'static str,
|
||||
pub(crate) base_url_source: &'static str,
|
||||
pub(crate) base_url_is_default: bool,
|
||||
}
|
||||
|
||||
impl Default for EndpointConfigTelemetry {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
base_url_origin: BASE_URL_ORIGIN_CUSTOM,
|
||||
host_class: HOST_CLASS_CUSTOM_UNKNOWN,
|
||||
base_url_source: BASE_URL_SOURCE_DEFAULT,
|
||||
base_url_is_default: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn resolve_endpoint_config_telemetry_source(
|
||||
config: &Config,
|
||||
session_source: SessionSource,
|
||||
) -> EndpointConfigTelemetrySource {
|
||||
let origins = config.config_layer_stack.origins();
|
||||
let key = format!("model_providers.{}.base_url", config.model_provider_id);
|
||||
if let Some(origin) = origins.get(&key) {
|
||||
return endpoint_source_from_layer(&origin.name, session_source);
|
||||
}
|
||||
|
||||
endpoint_source_from_provider_defaults(
|
||||
config.model_provider_id.as_str(),
|
||||
&config.model_provider,
|
||||
)
|
||||
}
|
||||
|
||||
fn endpoint_source_from_layer(
|
||||
layer: &ConfigLayerSource,
|
||||
session_source: SessionSource,
|
||||
) -> EndpointConfigTelemetrySource {
|
||||
let base_url_source = match layer {
|
||||
ConfigLayerSource::SessionFlags => match session_source {
|
||||
SessionSource::VSCode | SessionSource::Mcp => BASE_URL_SOURCE_IDE_SETTINGS,
|
||||
SessionSource::Cli
|
||||
| SessionSource::Exec
|
||||
| SessionSource::SubAgent(_)
|
||||
| SessionSource::Unknown => BASE_URL_SOURCE_SESSION_FLAGS,
|
||||
},
|
||||
ConfigLayerSource::User { .. } | ConfigLayerSource::Project { .. } => {
|
||||
BASE_URL_SOURCE_CONFIG_TOML
|
||||
}
|
||||
ConfigLayerSource::System { .. }
|
||||
| ConfigLayerSource::Mdm { .. }
|
||||
| ConfigLayerSource::LegacyManagedConfigTomlFromFile { .. }
|
||||
| ConfigLayerSource::LegacyManagedConfigTomlFromMdm => BASE_URL_SOURCE_MANAGED_CONFIG,
|
||||
};
|
||||
|
||||
EndpointConfigTelemetrySource::new(base_url_source, false)
|
||||
}
|
||||
|
||||
fn endpoint_source_from_provider_defaults(
|
||||
provider_id: &str,
|
||||
provider: &ModelProviderInfo,
|
||||
) -> EndpointConfigTelemetrySource {
|
||||
let env_source = match provider_id {
|
||||
"openai" => env_var_present("OPENAI_BASE_URL"),
|
||||
OLLAMA_OSS_PROVIDER_ID | LMSTUDIO_OSS_PROVIDER_ID => {
|
||||
env_var_present("CODEX_OSS_BASE_URL") || env_var_present("CODEX_OSS_PORT")
|
||||
}
|
||||
_ => false,
|
||||
};
|
||||
if env_source {
|
||||
return EndpointConfigTelemetrySource::new(BASE_URL_SOURCE_ENV, false);
|
||||
}
|
||||
|
||||
let base_url_is_default = match provider_id {
|
||||
"openai" => provider.base_url.is_none(),
|
||||
OLLAMA_OSS_PROVIDER_ID | LMSTUDIO_OSS_PROVIDER_ID => true,
|
||||
_ => provider.base_url.is_none(),
|
||||
};
|
||||
if base_url_is_default {
|
||||
return EndpointConfigTelemetrySource::new(BASE_URL_SOURCE_DEFAULT, true);
|
||||
}
|
||||
|
||||
EndpointConfigTelemetrySource::new(BASE_URL_SOURCE_CONFIG_TOML, false)
|
||||
}
|
||||
|
||||
fn env_var_present(name: &str) -> bool {
|
||||
std::env::var(name)
|
||||
.ok()
|
||||
.is_some_and(|value| !value.trim().is_empty())
|
||||
}
|
||||
|
||||
fn classify_base_url(base_url: &str) -> (&'static str, &'static str) {
|
||||
let Ok(url) = Url::parse(base_url) else {
|
||||
return (BASE_URL_ORIGIN_CUSTOM, HOST_CLASS_CUSTOM_UNKNOWN);
|
||||
};
|
||||
let Some(host) = url.host_str().map(str::to_ascii_lowercase) else {
|
||||
return (BASE_URL_ORIGIN_CUSTOM, HOST_CLASS_CUSTOM_UNKNOWN);
|
||||
};
|
||||
|
||||
if matches!(host.as_str(), "chatgpt.com" | "chat.openai.com") {
|
||||
if is_chatgpt_codex_path(url.path()) {
|
||||
return (BASE_URL_ORIGIN_CHATGPT, HOST_CLASS_OPENAI_CHATGPT);
|
||||
}
|
||||
return (BASE_URL_ORIGIN_CHATGPT, HOST_CLASS_CUSTOM_UNKNOWN);
|
||||
}
|
||||
|
||||
if host == BASE_URL_ORIGIN_OPENAI_API {
|
||||
return (BASE_URL_ORIGIN_OPENAI_API, HOST_CLASS_OPENAI_API);
|
||||
}
|
||||
|
||||
if host == BASE_URL_ORIGIN_OPENROUTER || host.ends_with(".openrouter.ai") {
|
||||
return (BASE_URL_ORIGIN_OPENROUTER, HOST_CLASS_KNOWN_THIRD_PARTY);
|
||||
}
|
||||
|
||||
(BASE_URL_ORIGIN_CUSTOM, HOST_CLASS_CUSTOM_UNKNOWN)
|
||||
}
|
||||
|
||||
fn is_chatgpt_codex_path(path: &str) -> bool {
|
||||
path == "/backend-api/codex" || path.starts_with("/backend-api/codex/")
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::EndpointConfigTelemetry;
|
||||
use super::EndpointConfigTelemetrySource;
|
||||
use super::endpoint_source_from_layer;
|
||||
use super::endpoint_source_from_provider_defaults;
|
||||
use crate::model_provider_info::WireApi;
|
||||
use crate::model_provider_info::create_oss_provider_with_base_url;
|
||||
use codex_app_server_protocol::ConfigLayerSource;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
fn provider(base_url: Option<&str>) -> crate::ModelProviderInfo {
|
||||
crate::ModelProviderInfo {
|
||||
name: "test-provider".to_string(),
|
||||
base_url: base_url.map(str::to_string),
|
||||
env_key: None,
|
||||
env_key_instructions: None,
|
||||
experimental_bearer_token: None,
|
||||
wire_api: crate::WireApi::Responses,
|
||||
query_params: None,
|
||||
http_headers: None,
|
||||
env_http_headers: None,
|
||||
request_max_retries: None,
|
||||
stream_max_retries: None,
|
||||
stream_idle_timeout_ms: None,
|
||||
requires_openai_auth: true,
|
||||
supports_websockets: true,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn endpoint_config_telemetry_classifies_known_hosts_without_logging_custom_values() {
|
||||
let source = EndpointConfigTelemetrySource::new("config_toml", false);
|
||||
|
||||
assert_eq!(
|
||||
source.classify("https://chatgpt.com/backend-api/codex"),
|
||||
EndpointConfigTelemetry {
|
||||
base_url_origin: "chatgpt.com",
|
||||
host_class: "openai_chatgpt",
|
||||
base_url_source: "config_toml",
|
||||
base_url_is_default: false,
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
source.classify("https://api.openai.com/v1"),
|
||||
EndpointConfigTelemetry {
|
||||
base_url_origin: "api.openai.com",
|
||||
host_class: "openai_api",
|
||||
base_url_source: "config_toml",
|
||||
base_url_is_default: false,
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
source.classify("https://openrouter.ai/api/v1"),
|
||||
EndpointConfigTelemetry {
|
||||
base_url_origin: "openrouter.ai",
|
||||
host_class: "known_third_party",
|
||||
base_url_source: "config_toml",
|
||||
base_url_is_default: false,
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
source.classify("https://private.example.internal/v1?token=secret"),
|
||||
EndpointConfigTelemetry {
|
||||
base_url_origin: "custom",
|
||||
host_class: "custom_unknown",
|
||||
base_url_source: "config_toml",
|
||||
base_url_is_default: false,
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
source.classify("https://chatgpt.com/api/codex"),
|
||||
EndpointConfigTelemetry {
|
||||
base_url_origin: "chatgpt.com",
|
||||
host_class: "custom_unknown",
|
||||
base_url_source: "config_toml",
|
||||
base_url_is_default: false,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn endpoint_config_telemetry_source_maps_layers_and_defaults() {
|
||||
assert_eq!(
|
||||
endpoint_source_from_layer(&ConfigLayerSource::SessionFlags, SessionSource::VSCode),
|
||||
EndpointConfigTelemetrySource::new("ide_settings", false)
|
||||
);
|
||||
assert_eq!(
|
||||
endpoint_source_from_layer(
|
||||
&ConfigLayerSource::Project {
|
||||
dot_codex_folder: AbsolutePathBuf::try_from(std::path::PathBuf::from(
|
||||
"/tmp/project/.codex",
|
||||
))
|
||||
.expect("absolute path"),
|
||||
},
|
||||
SessionSource::Cli,
|
||||
),
|
||||
EndpointConfigTelemetrySource::new("config_toml", false)
|
||||
);
|
||||
assert_eq!(
|
||||
endpoint_source_from_provider_defaults("openai", &provider(None)),
|
||||
EndpointConfigTelemetrySource::new("default", true)
|
||||
);
|
||||
assert_eq!(
|
||||
endpoint_source_from_provider_defaults(
|
||||
"custom",
|
||||
&provider(Some("https://example.com/v1"))
|
||||
),
|
||||
EndpointConfigTelemetrySource::new("config_toml", false)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn endpoint_config_telemetry_source_requires_explicit_provider_id_for_builtin_oss_defaults() {
|
||||
let provider =
|
||||
create_oss_provider_with_base_url("http://localhost:1234/v1", WireApi::Responses);
|
||||
|
||||
assert_eq!(
|
||||
EndpointConfigTelemetrySource::for_provider("lmstudio", &provider),
|
||||
EndpointConfigTelemetrySource::new("default", true)
|
||||
);
|
||||
assert_eq!(
|
||||
EndpointConfigTelemetrySource::for_provider_without_id(&provider),
|
||||
EndpointConfigTelemetrySource::new("config_toml", false)
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -292,6 +292,8 @@ pub struct UnexpectedResponseError {
|
||||
pub url: Option<String>,
|
||||
pub cf_ray: Option<String>,
|
||||
pub request_id: Option<String>,
|
||||
pub identity_authorization_error: Option<String>,
|
||||
pub identity_error_code: Option<String>,
|
||||
}
|
||||
|
||||
const CLOUDFLARE_BLOCKED_MESSAGE: &str =
|
||||
@@ -346,6 +348,12 @@ impl UnexpectedResponseError {
|
||||
if let Some(id) = &self.request_id {
|
||||
message.push_str(&format!(", request id: {id}"));
|
||||
}
|
||||
if let Some(auth_error) = &self.identity_authorization_error {
|
||||
message.push_str(&format!(", auth error: {auth_error}"));
|
||||
}
|
||||
if let Some(error_code) = &self.identity_error_code {
|
||||
message.push_str(&format!(", auth error code: {error_code}"));
|
||||
}
|
||||
|
||||
Some(message)
|
||||
}
|
||||
@@ -368,6 +376,12 @@ impl std::fmt::Display for UnexpectedResponseError {
|
||||
if let Some(id) = &self.request_id {
|
||||
message.push_str(&format!(", request id: {id}"));
|
||||
}
|
||||
if let Some(auth_error) = &self.identity_authorization_error {
|
||||
message.push_str(&format!(", auth error: {auth_error}"));
|
||||
}
|
||||
if let Some(error_code) = &self.identity_error_code {
|
||||
message.push_str(&format!(", auth error code: {error_code}"));
|
||||
}
|
||||
write!(f, "{message}")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -328,6 +328,8 @@ fn unexpected_status_cloudflare_html_is_simplified() {
|
||||
url: Some("http://example.com/blocked".to_string()),
|
||||
cf_ray: Some("ray-id".to_string()),
|
||||
request_id: None,
|
||||
identity_authorization_error: None,
|
||||
identity_error_code: None,
|
||||
};
|
||||
let status = StatusCode::FORBIDDEN.to_string();
|
||||
let url = "http://example.com/blocked";
|
||||
@@ -345,6 +347,8 @@ fn unexpected_status_non_html_is_unchanged() {
|
||||
url: Some("http://example.com/plain".to_string()),
|
||||
cf_ray: None,
|
||||
request_id: None,
|
||||
identity_authorization_error: None,
|
||||
identity_error_code: None,
|
||||
};
|
||||
let status = StatusCode::FORBIDDEN.to_string();
|
||||
let url = "http://example.com/plain";
|
||||
@@ -363,6 +367,8 @@ fn unexpected_status_prefers_error_message_when_present() {
|
||||
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
|
||||
cf_ray: None,
|
||||
request_id: Some("req-123".to_string()),
|
||||
identity_authorization_error: None,
|
||||
identity_error_code: None,
|
||||
};
|
||||
let status = StatusCode::UNAUTHORIZED.to_string();
|
||||
assert_eq!(
|
||||
@@ -382,6 +388,8 @@ fn unexpected_status_truncates_long_body_with_ellipsis() {
|
||||
url: Some("http://example.com/long".to_string()),
|
||||
cf_ray: None,
|
||||
request_id: Some("req-long".to_string()),
|
||||
identity_authorization_error: None,
|
||||
identity_error_code: None,
|
||||
};
|
||||
let status = StatusCode::BAD_GATEWAY.to_string();
|
||||
let expected_body = format!("{}...", "x".repeat(UNEXPECTED_RESPONSE_BODY_MAX_BYTES));
|
||||
@@ -401,6 +409,8 @@ fn unexpected_status_includes_cf_ray_and_request_id() {
|
||||
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
|
||||
cf_ray: Some("9c81f9f18f2fa49d-LHR".to_string()),
|
||||
request_id: Some("req-xyz".to_string()),
|
||||
identity_authorization_error: None,
|
||||
identity_error_code: None,
|
||||
};
|
||||
let status = StatusCode::UNAUTHORIZED.to_string();
|
||||
assert_eq!(
|
||||
@@ -411,6 +421,26 @@ fn unexpected_status_includes_cf_ray_and_request_id() {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unexpected_status_includes_identity_auth_details() {
|
||||
let err = UnexpectedResponseError {
|
||||
status: StatusCode::UNAUTHORIZED,
|
||||
body: "plain text error".to_string(),
|
||||
url: Some("https://chatgpt.com/backend-api/codex/models".to_string()),
|
||||
cf_ray: Some("9daa94119a96d1e1-ICN".to_string()),
|
||||
request_id: Some("req-auth".to_string()),
|
||||
identity_authorization_error: Some("missing_authorization_header".to_string()),
|
||||
identity_error_code: Some("token_expired".to_string()),
|
||||
};
|
||||
let status = StatusCode::UNAUTHORIZED.to_string();
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
format!(
|
||||
"unexpected status {status}: plain text error, url: https://chatgpt.com/backend-api/codex/models, cf-ray: 9daa94119a96d1e1-ICN, request id: req-auth, auth error: missing_authorization_header, auth error code: token_expired"
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn usage_limit_reached_includes_hours_and_minutes() {
|
||||
let base = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap();
|
||||
|
||||
@@ -31,6 +31,7 @@ pub mod connectors;
|
||||
mod context_manager;
|
||||
mod contextual_user_message;
|
||||
pub mod custom_prompts;
|
||||
mod endpoint_config_telemetry;
|
||||
pub mod env;
|
||||
mod environment_context;
|
||||
pub mod error;
|
||||
@@ -86,6 +87,7 @@ pub use model_provider_info::WireApi;
|
||||
pub use model_provider_info::built_in_model_providers;
|
||||
pub use model_provider_info::create_oss_provider_with_base_url;
|
||||
mod event_mapping;
|
||||
mod response_debug_context;
|
||||
pub mod review_format;
|
||||
pub mod review_prompts;
|
||||
mod seatbelt_permissions;
|
||||
|
||||
@@ -5,14 +5,21 @@ use crate::auth::AuthManager;
|
||||
use crate::auth::AuthMode;
|
||||
use crate::config::Config;
|
||||
use crate::default_client::build_reqwest_client;
|
||||
use crate::default_client::current_residency_header_telemetry;
|
||||
use crate::endpoint_config_telemetry::EndpointConfigTelemetrySource;
|
||||
use crate::error::CodexErr;
|
||||
use crate::error::Result as CoreResult;
|
||||
use crate::model_provider_info::ModelProviderInfo;
|
||||
use crate::models_manager::collaboration_mode_presets::CollaborationModesConfig;
|
||||
use crate::models_manager::collaboration_mode_presets::builtin_collaboration_mode_presets;
|
||||
use crate::models_manager::model_info;
|
||||
use crate::response_debug_context::extract_response_debug_context;
|
||||
use crate::response_debug_context::telemetry_transport_error_message;
|
||||
use codex_api::ModelsClient;
|
||||
use codex_api::RequestTelemetry;
|
||||
use codex_api::ReqwestTransport;
|
||||
use codex_api::TransportError;
|
||||
use codex_otel::TelemetryAuthMode;
|
||||
use codex_protocol::config_types::CollaborationModeMask;
|
||||
use codex_protocol::openai_models::ModelInfo;
|
||||
use codex_protocol::openai_models::ModelPreset;
|
||||
@@ -30,6 +37,116 @@ use tracing::info;
|
||||
const MODEL_CACHE_FILE: &str = "models_cache.json";
|
||||
const DEFAULT_MODEL_CACHE_TTL: Duration = Duration::from_secs(300);
|
||||
const MODELS_REFRESH_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
const MODELS_ENDPOINT: &str = "/models";
|
||||
const OPENAI_PROVIDER_ID: &str = "openai";
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ModelsRequestTelemetry {
|
||||
auth_mode: Option<String>,
|
||||
residency_header_attached: bool,
|
||||
residency_header_value: Option<&'static str>,
|
||||
base_url_origin: &'static str,
|
||||
host_class: &'static str,
|
||||
base_url_source: &'static str,
|
||||
base_url_is_default: bool,
|
||||
}
|
||||
|
||||
impl RequestTelemetry for ModelsRequestTelemetry {
|
||||
fn on_request(
|
||||
&self,
|
||||
attempt: u64,
|
||||
status: Option<http::StatusCode>,
|
||||
error: Option<&TransportError>,
|
||||
duration: Duration,
|
||||
) {
|
||||
let error_message = error.map(telemetry_transport_error_message);
|
||||
let response_debug = error
|
||||
.map(extract_response_debug_context)
|
||||
.unwrap_or_default();
|
||||
let status = status.map(|status| status.as_u16());
|
||||
tracing::event!(
|
||||
target: "codex_otel.log_only",
|
||||
tracing::Level::INFO,
|
||||
event.name = "codex.api_request",
|
||||
duration_ms = %duration.as_millis(),
|
||||
http.response.status_code = status,
|
||||
error.message = error_message.as_deref(),
|
||||
attempt = attempt,
|
||||
endpoint = MODELS_ENDPOINT,
|
||||
residency_header_attached = self.residency_header_attached,
|
||||
residency_header_value = self.residency_header_value,
|
||||
base_url_origin = self.base_url_origin,
|
||||
host_class = self.host_class,
|
||||
base_url_source = self.base_url_source,
|
||||
base_url_is_default = self.base_url_is_default,
|
||||
auth.request_id = response_debug.request_id.as_deref(),
|
||||
auth.cf_ray = response_debug.cf_ray.as_deref(),
|
||||
auth.error = response_debug.auth_error.as_deref(),
|
||||
auth.error_code = response_debug.auth_error_code.as_deref(),
|
||||
error_body_class = response_debug.error_body_class,
|
||||
safe_error_message = response_debug.safe_error_message,
|
||||
auth_mode = self.auth_mode.as_deref(),
|
||||
);
|
||||
tracing::event!(
|
||||
target: "codex_otel.trace_safe",
|
||||
tracing::Level::INFO,
|
||||
event.name = "codex.api_request",
|
||||
duration_ms = %duration.as_millis(),
|
||||
http.response.status_code = status,
|
||||
error.message = error_message.as_deref(),
|
||||
attempt = attempt,
|
||||
endpoint = MODELS_ENDPOINT,
|
||||
residency_header_attached = self.residency_header_attached,
|
||||
residency_header_value = self.residency_header_value,
|
||||
base_url_origin = self.base_url_origin,
|
||||
host_class = self.host_class,
|
||||
base_url_source = self.base_url_source,
|
||||
base_url_is_default = self.base_url_is_default,
|
||||
auth.request_id = response_debug.request_id.as_deref(),
|
||||
auth.cf_ray = response_debug.cf_ray.as_deref(),
|
||||
auth.error = response_debug.auth_error.as_deref(),
|
||||
auth.error_code = response_debug.auth_error_code.as_deref(),
|
||||
error_body_class = response_debug.error_body_class,
|
||||
safe_error_message = response_debug.safe_error_message,
|
||||
auth_mode = self.auth_mode.as_deref(),
|
||||
);
|
||||
|
||||
if status == Some(http::StatusCode::UNAUTHORIZED.as_u16())
|
||||
&& response_debug.geo_denial_detected
|
||||
{
|
||||
tracing::event!(
|
||||
target: "codex_otel.log_only",
|
||||
tracing::Level::INFO,
|
||||
event.name = "codex.geo_denial",
|
||||
geo_denial_detected = true,
|
||||
request_id = response_debug.request_id.as_deref(),
|
||||
cf_ray = response_debug.cf_ray.as_deref(),
|
||||
endpoint = MODELS_ENDPOINT,
|
||||
auth_mode = self.auth_mode.as_deref(),
|
||||
residency_header_attached = self.residency_header_attached,
|
||||
residency_header_value = self.residency_header_value,
|
||||
http_status = status,
|
||||
error_body_class = response_debug.error_body_class.unwrap_or_default(),
|
||||
safe_error_message = response_debug.safe_error_message,
|
||||
);
|
||||
tracing::event!(
|
||||
target: "codex_otel.trace_safe",
|
||||
tracing::Level::INFO,
|
||||
event.name = "codex.geo_denial",
|
||||
geo_denial_detected = true,
|
||||
request_id = response_debug.request_id.as_deref(),
|
||||
cf_ray = response_debug.cf_ray.as_deref(),
|
||||
endpoint = MODELS_ENDPOINT,
|
||||
auth_mode = self.auth_mode.as_deref(),
|
||||
residency_header_attached = self.residency_header_attached,
|
||||
residency_header_value = self.residency_header_value,
|
||||
http_status = status,
|
||||
error_body_class = response_debug.error_body_class.unwrap_or_default(),
|
||||
safe_error_message = response_debug.safe_error_message,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Strategy for refreshing available models.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
@@ -285,7 +402,21 @@ impl ModelsManager {
|
||||
let api_provider = self.provider.to_api_provider(auth_mode)?;
|
||||
let api_auth = auth_provider_from_auth(auth.clone(), &self.provider)?;
|
||||
let transport = ReqwestTransport::new(build_reqwest_client());
|
||||
let client = ModelsClient::new(transport, api_provider, api_auth);
|
||||
let endpoint_telemetry =
|
||||
EndpointConfigTelemetrySource::for_provider(OPENAI_PROVIDER_ID, &self.provider)
|
||||
.classify(api_provider.base_url.as_str());
|
||||
let residency = current_residency_header_telemetry();
|
||||
let request_telemetry: Arc<dyn RequestTelemetry> = Arc::new(ModelsRequestTelemetry {
|
||||
auth_mode: auth_mode.map(|mode| TelemetryAuthMode::from(mode).to_string()),
|
||||
residency_header_attached: residency.attached,
|
||||
residency_header_value: residency.value,
|
||||
base_url_origin: endpoint_telemetry.base_url_origin,
|
||||
host_class: endpoint_telemetry.host_class,
|
||||
base_url_source: endpoint_telemetry.base_url_source,
|
||||
base_url_is_default: endpoint_telemetry.base_url_is_default,
|
||||
});
|
||||
let client = ModelsClient::new(transport, api_provider, api_auth)
|
||||
.with_telemetry(Some(request_telemetry));
|
||||
|
||||
let client_version = crate::models_manager::client_version_to_whole();
|
||||
let (models, etag) = timeout(
|
||||
|
||||
232
codex-rs/core/src/response_debug_context.rs
Normal file
232
codex-rs/core/src/response_debug_context.rs
Normal file
@@ -0,0 +1,232 @@
|
||||
use base64::Engine;
|
||||
use codex_api::TransportError;
|
||||
use codex_api::error::ApiError;
|
||||
|
||||
const REQUEST_ID_HEADER: &str = "x-request-id";
|
||||
const OAI_REQUEST_ID_HEADER: &str = "x-oai-request-id";
|
||||
const CF_RAY_HEADER: &str = "cf-ray";
|
||||
const AUTH_ERROR_HEADER: &str = "x-openai-authorization-error";
|
||||
const X_ERROR_JSON_HEADER: &str = "x-error-json";
|
||||
const WORKSPACE_NOT_AUTHORIZED_IN_REGION_MESSAGE: &str =
|
||||
"Workspace is not authorized in this region.";
|
||||
pub(crate) const WORKSPACE_NOT_AUTHORIZED_IN_REGION_CLASS: &str =
|
||||
"workspace_not_authorized_in_region";
|
||||
const MAX_ERROR_BODY_BYTES: usize = 1000;
|
||||
|
||||
#[derive(Debug, Default, Clone, PartialEq, Eq)]
|
||||
pub(crate) struct ResponseDebugContext {
|
||||
pub(crate) request_id: Option<String>,
|
||||
pub(crate) cf_ray: Option<String>,
|
||||
pub(crate) auth_error: Option<String>,
|
||||
pub(crate) auth_error_code: Option<String>,
|
||||
pub(crate) safe_error_message: Option<&'static str>,
|
||||
pub(crate) error_body_class: Option<&'static str>,
|
||||
pub(crate) geo_denial_detected: bool,
|
||||
}
|
||||
|
||||
pub(crate) fn extract_response_debug_context(transport: &TransportError) -> ResponseDebugContext {
|
||||
let mut context = ResponseDebugContext::default();
|
||||
|
||||
let TransportError::Http { headers, body, .. } = transport else {
|
||||
return context;
|
||||
};
|
||||
|
||||
let extract_header = |name: &str| {
|
||||
headers
|
||||
.as_ref()
|
||||
.and_then(|headers| headers.get(name))
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.map(str::to_string)
|
||||
};
|
||||
|
||||
context.request_id =
|
||||
extract_header(REQUEST_ID_HEADER).or_else(|| extract_header(OAI_REQUEST_ID_HEADER));
|
||||
context.cf_ray = extract_header(CF_RAY_HEADER);
|
||||
context.auth_error = extract_header(AUTH_ERROR_HEADER);
|
||||
context.auth_error_code = extract_header(X_ERROR_JSON_HEADER).and_then(|encoded| {
|
||||
let decoded = base64::engine::general_purpose::STANDARD
|
||||
.decode(encoded)
|
||||
.ok()?;
|
||||
let parsed = serde_json::from_slice::<serde_json::Value>(&decoded).ok()?;
|
||||
parsed
|
||||
.get("error")
|
||||
.and_then(|error| error.get("code"))
|
||||
.and_then(serde_json::Value::as_str)
|
||||
.map(str::to_string)
|
||||
});
|
||||
let error_body = extract_error_body(body.as_deref());
|
||||
context.safe_error_message = error_body
|
||||
.as_deref()
|
||||
.and_then(allowlisted_error_body_message);
|
||||
context.error_body_class = error_body.as_deref().and_then(classify_error_body_message);
|
||||
context.geo_denial_detected =
|
||||
context.error_body_class == Some(WORKSPACE_NOT_AUTHORIZED_IN_REGION_CLASS);
|
||||
|
||||
context
|
||||
}
|
||||
|
||||
pub(crate) fn extract_response_debug_context_from_api_error(
|
||||
error: &ApiError,
|
||||
) -> ResponseDebugContext {
|
||||
match error {
|
||||
ApiError::Transport(transport) => extract_response_debug_context(transport),
|
||||
_ => ResponseDebugContext::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn telemetry_transport_error_message(error: &TransportError) -> String {
|
||||
match error {
|
||||
TransportError::Http { status, .. } => format!("http {}", status.as_u16()),
|
||||
TransportError::RetryLimit => "retry limit reached".to_string(),
|
||||
TransportError::Timeout => "timeout".to_string(),
|
||||
TransportError::Network(_) => "network error".to_string(),
|
||||
TransportError::Build(_) => "request build error".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn telemetry_api_error_message(error: &ApiError) -> String {
|
||||
match error {
|
||||
ApiError::Transport(transport) => telemetry_transport_error_message(transport),
|
||||
ApiError::Api { status, .. } => format!("api error {}", status.as_u16()),
|
||||
ApiError::Stream(_) => "stream error".to_string(),
|
||||
ApiError::ContextWindowExceeded => "context window exceeded".to_string(),
|
||||
ApiError::QuotaExceeded => "quota exceeded".to_string(),
|
||||
ApiError::UsageNotIncluded => "usage not included".to_string(),
|
||||
ApiError::Retryable { .. } => "retryable error".to_string(),
|
||||
ApiError::RateLimit(_) => "rate limit".to_string(),
|
||||
ApiError::InvalidRequest { .. } => "invalid request".to_string(),
|
||||
ApiError::ServerOverloaded => "server overloaded".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
fn extract_error_body(body: Option<&str>) -> Option<String> {
|
||||
let body = body?;
|
||||
if let Some(message) = extract_error_message(body) {
|
||||
return Some(message);
|
||||
}
|
||||
|
||||
let trimmed = body.trim();
|
||||
if trimmed.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(truncate_with_ellipsis(trimmed, MAX_ERROR_BODY_BYTES))
|
||||
}
|
||||
|
||||
fn extract_error_message(body: &str) -> Option<String> {
|
||||
let json = serde_json::from_str::<serde_json::Value>(body).ok()?;
|
||||
let message = json
|
||||
.get("error")
|
||||
.and_then(|error| error.get("message"))
|
||||
.and_then(serde_json::Value::as_str)?;
|
||||
let message = message.trim();
|
||||
if message.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(message.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
fn classify_error_body_message(message: &str) -> Option<&'static str> {
|
||||
if message == WORKSPACE_NOT_AUTHORIZED_IN_REGION_MESSAGE {
|
||||
Some(WORKSPACE_NOT_AUTHORIZED_IN_REGION_CLASS)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn allowlisted_error_body_message(message: &str) -> Option<&'static str> {
|
||||
if message == WORKSPACE_NOT_AUTHORIZED_IN_REGION_MESSAGE {
|
||||
Some(WORKSPACE_NOT_AUTHORIZED_IN_REGION_MESSAGE)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn truncate_with_ellipsis(input: &str, max_bytes: usize) -> String {
|
||||
if input.len() <= max_bytes {
|
||||
return input.to_string();
|
||||
}
|
||||
|
||||
let ellipsis = "...";
|
||||
let keep = max_bytes.saturating_sub(ellipsis.len());
|
||||
let mut truncated = String::new();
|
||||
let mut used = 0usize;
|
||||
for ch in input.chars() {
|
||||
let len = ch.len_utf8();
|
||||
if used + len > keep {
|
||||
break;
|
||||
}
|
||||
truncated.push(ch);
|
||||
used += len;
|
||||
}
|
||||
truncated.push_str(ellipsis);
|
||||
truncated
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::ResponseDebugContext;
|
||||
use super::WORKSPACE_NOT_AUTHORIZED_IN_REGION_CLASS;
|
||||
use super::extract_response_debug_context;
|
||||
use super::telemetry_api_error_message;
|
||||
use super::telemetry_transport_error_message;
|
||||
use codex_api::TransportError;
|
||||
use codex_api::error::ApiError;
|
||||
use http::HeaderMap;
|
||||
use http::HeaderValue;
|
||||
use http::StatusCode;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn extract_response_debug_context_decodes_geo_denial_details() {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("x-oai-request-id", HeaderValue::from_static("req-geo"));
|
||||
headers.insert("cf-ray", HeaderValue::from_static("ray-geo"));
|
||||
headers.insert(
|
||||
"x-error-json",
|
||||
HeaderValue::from_static(
|
||||
"eyJlcnJvciI6eyJjb2RlIjoid29ya3NwYWNlX25vdF9hdXRob3JpemVkX2luX3JlZ2lvbiJ9fQ==",
|
||||
),
|
||||
);
|
||||
|
||||
let context = extract_response_debug_context(&TransportError::Http {
|
||||
status: StatusCode::UNAUTHORIZED,
|
||||
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
|
||||
headers: Some(headers),
|
||||
body: Some(
|
||||
r#"{"error":{"message":"Workspace is not authorized in this region."},"status":401}"#
|
||||
.to_string(),
|
||||
),
|
||||
});
|
||||
|
||||
assert_eq!(
|
||||
context,
|
||||
ResponseDebugContext {
|
||||
request_id: Some("req-geo".to_string()),
|
||||
cf_ray: Some("ray-geo".to_string()),
|
||||
auth_error: None,
|
||||
auth_error_code: Some("workspace_not_authorized_in_region".to_string()),
|
||||
safe_error_message: Some("Workspace is not authorized in this region."),
|
||||
error_body_class: Some(WORKSPACE_NOT_AUTHORIZED_IN_REGION_CLASS),
|
||||
geo_denial_detected: true,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn telemetry_error_messages_omit_http_bodies() {
|
||||
let transport = TransportError::Http {
|
||||
status: StatusCode::UNAUTHORIZED,
|
||||
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
|
||||
headers: None,
|
||||
body: Some(r#"{"error":{"message":"secret token leaked"}}"#.to_string()),
|
||||
};
|
||||
|
||||
assert_eq!(telemetry_transport_error_message(&transport), "http 401");
|
||||
assert_eq!(
|
||||
telemetry_api_error_message(&ApiError::Transport(transport)),
|
||||
"http 401"
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -295,6 +295,10 @@ impl SessionTelemetry {
|
||||
pub fn conversation_starts(
|
||||
&self,
|
||||
provider_name: &str,
|
||||
base_url_origin: &str,
|
||||
host_class: &str,
|
||||
base_url_source: &str,
|
||||
base_url_is_default: bool,
|
||||
reasoning_effort: Option<ReasoningEffort>,
|
||||
reasoning_summary: ReasoningSummary,
|
||||
context_window: Option<i64>,
|
||||
@@ -309,6 +313,10 @@ impl SessionTelemetry {
|
||||
common: {
|
||||
event.name = "codex.conversation_starts",
|
||||
provider_name = %provider_name,
|
||||
base_url_origin = base_url_origin,
|
||||
host_class = host_class,
|
||||
base_url_source = base_url_source,
|
||||
base_url_is_default = base_url_is_default,
|
||||
reasoning_effort = reasoning_effort.map(|e| e.to_string()),
|
||||
reasoning_summary = %reasoning_summary,
|
||||
context_window = context_window,
|
||||
@@ -340,17 +348,57 @@ impl SessionTelemetry {
|
||||
Ok(response) => (Some(response.status().as_u16()), None),
|
||||
Err(error) => (error.status().map(|s| s.as_u16()), Some(error.to_string())),
|
||||
};
|
||||
self.record_api_request(attempt, status, error.as_deref(), duration);
|
||||
self.record_api_request(
|
||||
attempt,
|
||||
status,
|
||||
error.as_deref(),
|
||||
duration,
|
||||
false,
|
||||
false,
|
||||
None,
|
||||
None,
|
||||
"unknown",
|
||||
false,
|
||||
None,
|
||||
"custom",
|
||||
"custom_unknown",
|
||||
"default",
|
||||
false,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
);
|
||||
|
||||
response
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn record_api_request(
|
||||
&self,
|
||||
attempt: u64,
|
||||
status: Option<u16>,
|
||||
error: Option<&str>,
|
||||
duration: Duration,
|
||||
auth_header_attached: bool,
|
||||
retry_after_unauthorized: bool,
|
||||
recovery_mode: Option<&str>,
|
||||
recovery_phase: Option<&str>,
|
||||
endpoint: &str,
|
||||
residency_header_attached: bool,
|
||||
residency_header_value: Option<&str>,
|
||||
base_url_origin: &str,
|
||||
host_class: &str,
|
||||
base_url_source: &str,
|
||||
base_url_is_default: bool,
|
||||
request_id: Option<&str>,
|
||||
cf_ray: Option<&str>,
|
||||
auth_error: Option<&str>,
|
||||
auth_error_code: Option<&str>,
|
||||
error_body_class: Option<&str>,
|
||||
safe_error_message: Option<&str>,
|
||||
) {
|
||||
let success = status.is_some_and(|code| (200..=299).contains(&code)) && error.is_none();
|
||||
let success_str = if success { "true" } else { "false" };
|
||||
@@ -375,6 +423,114 @@ impl SessionTelemetry {
|
||||
http.response.status_code = status,
|
||||
error.message = error,
|
||||
attempt = attempt,
|
||||
auth.header_attached = auth_header_attached,
|
||||
auth.retry_after_unauthorized = retry_after_unauthorized,
|
||||
auth.recovery_mode = recovery_mode,
|
||||
auth.recovery_phase = recovery_phase,
|
||||
endpoint = endpoint,
|
||||
residency_header_attached = residency_header_attached,
|
||||
residency_header_value = residency_header_value,
|
||||
base_url_origin = base_url_origin,
|
||||
host_class = host_class,
|
||||
base_url_source = base_url_source,
|
||||
base_url_is_default = base_url_is_default,
|
||||
auth.request_id = request_id,
|
||||
auth.cf_ray = cf_ray,
|
||||
auth.error = auth_error,
|
||||
auth.error_code = auth_error_code,
|
||||
error_body_class = error_body_class,
|
||||
safe_error_message = safe_error_message,
|
||||
},
|
||||
log: {},
|
||||
trace: {},
|
||||
);
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn record_websocket_connect(
|
||||
&self,
|
||||
duration: Duration,
|
||||
status: Option<u16>,
|
||||
error: Option<&str>,
|
||||
auth_header_attached: bool,
|
||||
retry_after_unauthorized: bool,
|
||||
recovery_mode: Option<&str>,
|
||||
recovery_phase: Option<&str>,
|
||||
endpoint: &str,
|
||||
residency_header_attached: bool,
|
||||
residency_header_value: Option<&str>,
|
||||
base_url_origin: &str,
|
||||
host_class: &str,
|
||||
base_url_source: &str,
|
||||
base_url_is_default: bool,
|
||||
request_id: Option<&str>,
|
||||
cf_ray: Option<&str>,
|
||||
auth_error: Option<&str>,
|
||||
auth_error_code: Option<&str>,
|
||||
error_body_class: Option<&str>,
|
||||
safe_error_message: Option<&str>,
|
||||
) {
|
||||
let success = error.is_none()
|
||||
&& status
|
||||
.map(|code| (200..=299).contains(&code))
|
||||
.unwrap_or(true);
|
||||
let success_str = if success { "true" } else { "false" };
|
||||
log_and_trace_event!(
|
||||
self,
|
||||
common: {
|
||||
event.name = "codex.websocket_connect",
|
||||
duration_ms = %duration.as_millis(),
|
||||
http.response.status_code = status,
|
||||
success = success_str,
|
||||
error.message = error,
|
||||
auth.header_attached = auth_header_attached,
|
||||
auth.retry_after_unauthorized = retry_after_unauthorized,
|
||||
auth.recovery_mode = recovery_mode,
|
||||
auth.recovery_phase = recovery_phase,
|
||||
endpoint = endpoint,
|
||||
residency_header_attached = residency_header_attached,
|
||||
residency_header_value = residency_header_value,
|
||||
base_url_origin = base_url_origin,
|
||||
host_class = host_class,
|
||||
base_url_source = base_url_source,
|
||||
base_url_is_default = base_url_is_default,
|
||||
auth.request_id = request_id,
|
||||
auth.cf_ray = cf_ray,
|
||||
auth.error = auth_error,
|
||||
auth.error_code = auth_error_code,
|
||||
error_body_class = error_body_class,
|
||||
safe_error_message = safe_error_message,
|
||||
},
|
||||
log: {},
|
||||
trace: {},
|
||||
);
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn record_geo_denial(
|
||||
&self,
|
||||
endpoint: &str,
|
||||
residency_header_attached: bool,
|
||||
residency_header_value: Option<&str>,
|
||||
http_status: Option<u16>,
|
||||
request_id: Option<&str>,
|
||||
cf_ray: Option<&str>,
|
||||
error_body_class: &str,
|
||||
safe_error_message: Option<&str>,
|
||||
) {
|
||||
log_and_trace_event!(
|
||||
self,
|
||||
common: {
|
||||
event.name = "codex.geo_denial",
|
||||
geo_denial_detected = true,
|
||||
request_id = request_id,
|
||||
cf_ray = cf_ray,
|
||||
endpoint = endpoint,
|
||||
residency_header_attached = residency_header_attached,
|
||||
residency_header_value = residency_header_value,
|
||||
http_status = http_status,
|
||||
error_body_class = error_body_class,
|
||||
safe_error_message = safe_error_message,
|
||||
},
|
||||
log: {},
|
||||
trace: {},
|
||||
@@ -406,6 +562,38 @@ impl SessionTelemetry {
|
||||
);
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn record_auth_recovery(
|
||||
&self,
|
||||
mode: &str,
|
||||
step: &str,
|
||||
outcome: &str,
|
||||
request_id: Option<&str>,
|
||||
cf_ray: Option<&str>,
|
||||
auth_error: Option<&str>,
|
||||
auth_error_code: Option<&str>,
|
||||
recovery_reason: Option<&str>,
|
||||
auth_state_changed: Option<bool>,
|
||||
) {
|
||||
log_and_trace_event!(
|
||||
self,
|
||||
common: {
|
||||
event.name = "codex.auth_recovery",
|
||||
auth.mode = mode,
|
||||
auth.step = step,
|
||||
auth.outcome = outcome,
|
||||
auth.request_id = request_id,
|
||||
auth.cf_ray = cf_ray,
|
||||
auth.error = auth_error,
|
||||
auth.error_code = auth_error_code,
|
||||
auth.recovery_reason = recovery_reason,
|
||||
auth.state_changed = auth_state_changed,
|
||||
},
|
||||
log: {},
|
||||
trace: {},
|
||||
);
|
||||
}
|
||||
|
||||
pub fn record_websocket_event(
|
||||
&self,
|
||||
result: &Result<
|
||||
|
||||
@@ -297,3 +297,662 @@ fn otel_export_routing_policy_routes_tool_result_log_and_trace_events() {
|
||||
assert!(!tool_trace_attrs.contains_key("mcp_server"));
|
||||
assert!(!tool_trace_attrs.contains_key("mcp_server_origin"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn otel_export_routing_policy_routes_auth_recovery_log_and_trace_events() {
|
||||
let log_exporter = InMemoryLogExporter::default();
|
||||
let logger_provider = SdkLoggerProvider::builder()
|
||||
.with_simple_exporter(log_exporter.clone())
|
||||
.build();
|
||||
let span_exporter = InMemorySpanExporter::default();
|
||||
let tracer_provider = SdkTracerProvider::builder()
|
||||
.with_simple_exporter(span_exporter.clone())
|
||||
.build();
|
||||
let tracer = tracer_provider.tracer("sink-split-test");
|
||||
|
||||
let subscriber = tracing_subscriber::registry()
|
||||
.with(
|
||||
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
|
||||
&logger_provider,
|
||||
)
|
||||
.with_filter(filter_fn(OtelProvider::log_export_filter)),
|
||||
)
|
||||
.with(
|
||||
tracing_opentelemetry::layer()
|
||||
.with_tracer(tracer)
|
||||
.with_filter(filter_fn(OtelProvider::trace_export_filter)),
|
||||
);
|
||||
|
||||
tracing::subscriber::with_default(subscriber, || {
|
||||
tracing::callsite::rebuild_interest_cache();
|
||||
let manager = SessionTelemetry::new(
|
||||
ThreadId::new(),
|
||||
"gpt-5.1",
|
||||
"gpt-5.1",
|
||||
Some("account-id".to_string()),
|
||||
Some("engineer@example.com".to_string()),
|
||||
Some(TelemetryAuthMode::Chatgpt),
|
||||
"codex_exec".to_string(),
|
||||
true,
|
||||
"tty".to_string(),
|
||||
SessionSource::Cli,
|
||||
);
|
||||
let root_span = tracing::info_span!("root");
|
||||
let _root_guard = root_span.enter();
|
||||
manager.record_auth_recovery(
|
||||
"managed",
|
||||
"reload",
|
||||
"recovery_succeeded",
|
||||
Some("req-401"),
|
||||
Some("ray-401"),
|
||||
Some("missing_authorization_header"),
|
||||
Some("token_expired"),
|
||||
None,
|
||||
Some(true),
|
||||
);
|
||||
});
|
||||
|
||||
logger_provider.force_flush().expect("flush logs");
|
||||
tracer_provider.force_flush().expect("flush traces");
|
||||
|
||||
let logs = log_exporter.get_emitted_logs().expect("log export");
|
||||
let recovery_log = find_log_by_event_name(&logs, "codex.auth_recovery");
|
||||
let recovery_log_attrs = log_attributes(&recovery_log.record);
|
||||
assert_eq!(
|
||||
recovery_log_attrs.get("auth.mode").map(String::as_str),
|
||||
Some("managed")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_log_attrs.get("auth.step").map(String::as_str),
|
||||
Some("reload")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_log_attrs.get("auth.outcome").map(String::as_str),
|
||||
Some("recovery_succeeded")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_log_attrs
|
||||
.get("auth.request_id")
|
||||
.map(String::as_str),
|
||||
Some("req-401")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_log_attrs.get("auth.cf_ray").map(String::as_str),
|
||||
Some("ray-401")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_log_attrs.get("auth.error").map(String::as_str),
|
||||
Some("missing_authorization_header")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_log_attrs
|
||||
.get("auth.error_code")
|
||||
.map(String::as_str),
|
||||
Some("token_expired")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_log_attrs
|
||||
.get("auth.state_changed")
|
||||
.map(String::as_str),
|
||||
Some("true")
|
||||
);
|
||||
|
||||
let spans = span_exporter.get_finished_spans().expect("span export");
|
||||
assert_eq!(spans.len(), 1);
|
||||
let span_events = &spans[0].events.events;
|
||||
assert_eq!(span_events.len(), 1);
|
||||
|
||||
let recovery_trace_event = find_span_event_by_name_attr(span_events, "codex.auth_recovery");
|
||||
let recovery_trace_attrs = span_event_attributes(recovery_trace_event);
|
||||
assert_eq!(
|
||||
recovery_trace_attrs.get("auth.mode").map(String::as_str),
|
||||
Some("managed")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_trace_attrs.get("auth.step").map(String::as_str),
|
||||
Some("reload")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_trace_attrs.get("auth.outcome").map(String::as_str),
|
||||
Some("recovery_succeeded")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_trace_attrs
|
||||
.get("auth.request_id")
|
||||
.map(String::as_str),
|
||||
Some("req-401")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_trace_attrs.get("auth.cf_ray").map(String::as_str),
|
||||
Some("ray-401")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_trace_attrs.get("auth.error").map(String::as_str),
|
||||
Some("missing_authorization_header")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_trace_attrs
|
||||
.get("auth.error_code")
|
||||
.map(String::as_str),
|
||||
Some("token_expired")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_trace_attrs
|
||||
.get("auth.state_changed")
|
||||
.map(String::as_str),
|
||||
Some("true")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn otel_export_routing_policy_routes_api_request_auth_observability() {
|
||||
let log_exporter = InMemoryLogExporter::default();
|
||||
let logger_provider = SdkLoggerProvider::builder()
|
||||
.with_simple_exporter(log_exporter.clone())
|
||||
.build();
|
||||
let span_exporter = InMemorySpanExporter::default();
|
||||
let tracer_provider = SdkTracerProvider::builder()
|
||||
.with_simple_exporter(span_exporter.clone())
|
||||
.build();
|
||||
let tracer = tracer_provider.tracer("sink-split-test");
|
||||
|
||||
let subscriber = tracing_subscriber::registry()
|
||||
.with(
|
||||
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
|
||||
&logger_provider,
|
||||
)
|
||||
.with_filter(filter_fn(OtelProvider::log_export_filter)),
|
||||
)
|
||||
.with(
|
||||
tracing_opentelemetry::layer()
|
||||
.with_tracer(tracer)
|
||||
.with_filter(filter_fn(OtelProvider::trace_export_filter)),
|
||||
);
|
||||
|
||||
tracing::subscriber::with_default(subscriber, || {
|
||||
tracing::callsite::rebuild_interest_cache();
|
||||
let manager = SessionTelemetry::new(
|
||||
ThreadId::new(),
|
||||
"gpt-5.1",
|
||||
"gpt-5.1",
|
||||
Some("account-id".to_string()),
|
||||
Some("engineer@example.com".to_string()),
|
||||
Some(TelemetryAuthMode::Chatgpt),
|
||||
"codex_exec".to_string(),
|
||||
true,
|
||||
"tty".to_string(),
|
||||
SessionSource::Cli,
|
||||
);
|
||||
let root_span = tracing::info_span!("root");
|
||||
let _root_guard = root_span.enter();
|
||||
manager.record_api_request(
|
||||
1,
|
||||
Some(401),
|
||||
Some("http 401"),
|
||||
std::time::Duration::from_millis(42),
|
||||
true,
|
||||
true,
|
||||
Some("managed"),
|
||||
Some("refresh_token"),
|
||||
"/responses",
|
||||
true,
|
||||
Some("us"),
|
||||
"chatgpt.com",
|
||||
"openai_chatgpt",
|
||||
"ide_settings",
|
||||
false,
|
||||
Some("req-401"),
|
||||
Some("ray-401"),
|
||||
Some("missing_authorization_header"),
|
||||
Some("token_expired"),
|
||||
Some("workspace_not_authorized_in_region"),
|
||||
Some("Workspace is not authorized in this region."),
|
||||
);
|
||||
});
|
||||
|
||||
logger_provider.force_flush().expect("flush logs");
|
||||
tracer_provider.force_flush().expect("flush traces");
|
||||
|
||||
let logs = log_exporter.get_emitted_logs().expect("log export");
|
||||
let request_log = find_log_by_event_name(&logs, "codex.api_request");
|
||||
let request_log_attrs = log_attributes(&request_log.record);
|
||||
assert_eq!(
|
||||
request_log_attrs
|
||||
.get("auth.header_attached")
|
||||
.map(String::as_str),
|
||||
Some("true")
|
||||
);
|
||||
assert_eq!(
|
||||
request_log_attrs
|
||||
.get("auth.retry_after_unauthorized")
|
||||
.map(String::as_str),
|
||||
Some("true")
|
||||
);
|
||||
assert_eq!(
|
||||
request_log_attrs
|
||||
.get("auth.recovery_mode")
|
||||
.map(String::as_str),
|
||||
Some("managed")
|
||||
);
|
||||
assert_eq!(
|
||||
request_log_attrs
|
||||
.get("auth.recovery_phase")
|
||||
.map(String::as_str),
|
||||
Some("refresh_token")
|
||||
);
|
||||
assert_eq!(
|
||||
request_log_attrs.get("endpoint").map(String::as_str),
|
||||
Some("/responses")
|
||||
);
|
||||
assert_eq!(
|
||||
request_log_attrs
|
||||
.get("residency_header_attached")
|
||||
.map(String::as_str),
|
||||
Some("true")
|
||||
);
|
||||
assert_eq!(
|
||||
request_log_attrs
|
||||
.get("residency_header_value")
|
||||
.map(String::as_str),
|
||||
Some("us")
|
||||
);
|
||||
assert_eq!(
|
||||
request_log_attrs.get("base_url_origin").map(String::as_str),
|
||||
Some("chatgpt.com")
|
||||
);
|
||||
assert_eq!(
|
||||
request_log_attrs.get("host_class").map(String::as_str),
|
||||
Some("openai_chatgpt")
|
||||
);
|
||||
assert_eq!(
|
||||
request_log_attrs.get("base_url_source").map(String::as_str),
|
||||
Some("ide_settings")
|
||||
);
|
||||
assert_eq!(
|
||||
request_log_attrs
|
||||
.get("base_url_is_default")
|
||||
.map(String::as_str),
|
||||
Some("false")
|
||||
);
|
||||
assert_eq!(
|
||||
request_log_attrs.get("auth.error").map(String::as_str),
|
||||
Some("missing_authorization_header")
|
||||
);
|
||||
assert_eq!(
|
||||
request_log_attrs
|
||||
.get("error_body_class")
|
||||
.map(String::as_str),
|
||||
Some("workspace_not_authorized_in_region")
|
||||
);
|
||||
assert_eq!(
|
||||
request_log_attrs
|
||||
.get("safe_error_message")
|
||||
.map(String::as_str),
|
||||
Some("Workspace is not authorized in this region.")
|
||||
);
|
||||
assert!(!request_log_attrs.contains_key("error_body"));
|
||||
|
||||
let spans = span_exporter.get_finished_spans().expect("span export");
|
||||
let request_trace_event =
|
||||
find_span_event_by_name_attr(&spans[0].events.events, "codex.api_request");
|
||||
let request_trace_attrs = span_event_attributes(request_trace_event);
|
||||
assert_eq!(
|
||||
request_trace_attrs
|
||||
.get("auth.header_attached")
|
||||
.map(String::as_str),
|
||||
Some("true")
|
||||
);
|
||||
assert_eq!(
|
||||
request_trace_attrs
|
||||
.get("auth.retry_after_unauthorized")
|
||||
.map(String::as_str),
|
||||
Some("true")
|
||||
);
|
||||
assert_eq!(
|
||||
request_trace_attrs
|
||||
.get("base_url_origin")
|
||||
.map(String::as_str),
|
||||
Some("chatgpt.com")
|
||||
);
|
||||
assert_eq!(
|
||||
request_trace_attrs.get("endpoint").map(String::as_str),
|
||||
Some("/responses")
|
||||
);
|
||||
assert_eq!(
|
||||
request_trace_attrs
|
||||
.get("safe_error_message")
|
||||
.map(String::as_str),
|
||||
Some("Workspace is not authorized in this region.")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn otel_export_routing_policy_routes_websocket_connect_auth_observability() {
|
||||
let log_exporter = InMemoryLogExporter::default();
|
||||
let logger_provider = SdkLoggerProvider::builder()
|
||||
.with_simple_exporter(log_exporter.clone())
|
||||
.build();
|
||||
let span_exporter = InMemorySpanExporter::default();
|
||||
let tracer_provider = SdkTracerProvider::builder()
|
||||
.with_simple_exporter(span_exporter.clone())
|
||||
.build();
|
||||
let tracer = tracer_provider.tracer("sink-split-test");
|
||||
|
||||
let subscriber = tracing_subscriber::registry()
|
||||
.with(
|
||||
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
|
||||
&logger_provider,
|
||||
)
|
||||
.with_filter(filter_fn(OtelProvider::log_export_filter)),
|
||||
)
|
||||
.with(
|
||||
tracing_opentelemetry::layer()
|
||||
.with_tracer(tracer)
|
||||
.with_filter(filter_fn(OtelProvider::trace_export_filter)),
|
||||
);
|
||||
|
||||
tracing::subscriber::with_default(subscriber, || {
|
||||
tracing::callsite::rebuild_interest_cache();
|
||||
let manager = SessionTelemetry::new(
|
||||
ThreadId::new(),
|
||||
"gpt-5.1",
|
||||
"gpt-5.1",
|
||||
Some("account-id".to_string()),
|
||||
Some("engineer@example.com".to_string()),
|
||||
Some(TelemetryAuthMode::Chatgpt),
|
||||
"codex_exec".to_string(),
|
||||
true,
|
||||
"tty".to_string(),
|
||||
SessionSource::Cli,
|
||||
);
|
||||
let root_span = tracing::info_span!("root");
|
||||
let _root_guard = root_span.enter();
|
||||
manager.record_websocket_connect(
|
||||
std::time::Duration::from_millis(17),
|
||||
Some(401),
|
||||
Some("http 401"),
|
||||
true,
|
||||
true,
|
||||
Some("managed"),
|
||||
Some("reload"),
|
||||
"/responses",
|
||||
true,
|
||||
Some("us"),
|
||||
"openrouter.ai",
|
||||
"known_third_party",
|
||||
"config_toml",
|
||||
false,
|
||||
Some("req-ws-401"),
|
||||
Some("ray-ws-401"),
|
||||
Some("missing_authorization_header"),
|
||||
Some("token_expired"),
|
||||
Some("workspace_not_authorized_in_region"),
|
||||
Some("Workspace is not authorized in this region."),
|
||||
);
|
||||
});
|
||||
|
||||
logger_provider.force_flush().expect("flush logs");
|
||||
tracer_provider.force_flush().expect("flush traces");
|
||||
|
||||
let logs = log_exporter.get_emitted_logs().expect("log export");
|
||||
let connect_log = find_log_by_event_name(&logs, "codex.websocket_connect");
|
||||
let connect_log_attrs = log_attributes(&connect_log.record);
|
||||
assert_eq!(
|
||||
connect_log_attrs
|
||||
.get("auth.header_attached")
|
||||
.map(String::as_str),
|
||||
Some("true")
|
||||
);
|
||||
assert_eq!(
|
||||
connect_log_attrs.get("auth.error").map(String::as_str),
|
||||
Some("missing_authorization_header")
|
||||
);
|
||||
assert_eq!(
|
||||
connect_log_attrs.get("base_url_origin").map(String::as_str),
|
||||
Some("openrouter.ai")
|
||||
);
|
||||
assert_eq!(
|
||||
connect_log_attrs.get("host_class").map(String::as_str),
|
||||
Some("known_third_party")
|
||||
);
|
||||
assert_eq!(
|
||||
connect_log_attrs.get("endpoint").map(String::as_str),
|
||||
Some("/responses")
|
||||
);
|
||||
assert_eq!(
|
||||
connect_log_attrs
|
||||
.get("residency_header_value")
|
||||
.map(String::as_str),
|
||||
Some("us")
|
||||
);
|
||||
assert_eq!(
|
||||
connect_log_attrs
|
||||
.get("safe_error_message")
|
||||
.map(String::as_str),
|
||||
Some("Workspace is not authorized in this region.")
|
||||
);
|
||||
|
||||
let spans = span_exporter.get_finished_spans().expect("span export");
|
||||
let connect_trace_event =
|
||||
find_span_event_by_name_attr(&spans[0].events.events, "codex.websocket_connect");
|
||||
let connect_trace_attrs = span_event_attributes(connect_trace_event);
|
||||
assert_eq!(
|
||||
connect_trace_attrs
|
||||
.get("auth.recovery_phase")
|
||||
.map(String::as_str),
|
||||
Some("reload")
|
||||
);
|
||||
assert_eq!(
|
||||
connect_trace_attrs
|
||||
.get("base_url_source")
|
||||
.map(String::as_str),
|
||||
Some("config_toml")
|
||||
);
|
||||
assert_eq!(
|
||||
connect_trace_attrs
|
||||
.get("error_body_class")
|
||||
.map(String::as_str),
|
||||
Some("workspace_not_authorized_in_region")
|
||||
);
|
||||
assert_eq!(
|
||||
connect_trace_attrs
|
||||
.get("safe_error_message")
|
||||
.map(String::as_str),
|
||||
Some("Workspace is not authorized in this region.")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn otel_export_routing_policy_routes_geo_denial_log_and_trace_events() {
|
||||
let log_exporter = InMemoryLogExporter::default();
|
||||
let logger_provider = SdkLoggerProvider::builder()
|
||||
.with_simple_exporter(log_exporter.clone())
|
||||
.build();
|
||||
let span_exporter = InMemorySpanExporter::default();
|
||||
let tracer_provider = SdkTracerProvider::builder()
|
||||
.with_simple_exporter(span_exporter.clone())
|
||||
.build();
|
||||
let tracer = tracer_provider.tracer("sink-split-test");
|
||||
|
||||
let subscriber = tracing_subscriber::registry()
|
||||
.with(
|
||||
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
|
||||
&logger_provider,
|
||||
)
|
||||
.with_filter(filter_fn(OtelProvider::log_export_filter)),
|
||||
)
|
||||
.with(
|
||||
tracing_opentelemetry::layer()
|
||||
.with_tracer(tracer)
|
||||
.with_filter(filter_fn(OtelProvider::trace_export_filter)),
|
||||
);
|
||||
|
||||
tracing::subscriber::with_default(subscriber, || {
|
||||
tracing::callsite::rebuild_interest_cache();
|
||||
let manager = SessionTelemetry::new(
|
||||
ThreadId::new(),
|
||||
"gpt-5.1",
|
||||
"gpt-5.1",
|
||||
Some("account-id".to_string()),
|
||||
Some("engineer@example.com".to_string()),
|
||||
Some(TelemetryAuthMode::Chatgpt),
|
||||
"codex_exec".to_string(),
|
||||
true,
|
||||
"tty".to_string(),
|
||||
SessionSource::Cli,
|
||||
);
|
||||
let root_span = tracing::info_span!("root");
|
||||
let _root_guard = root_span.enter();
|
||||
manager.record_geo_denial(
|
||||
"/responses",
|
||||
true,
|
||||
Some("us"),
|
||||
Some(401),
|
||||
Some("req-geo"),
|
||||
Some("ray-geo"),
|
||||
"workspace_not_authorized_in_region",
|
||||
Some("Workspace is not authorized in this region."),
|
||||
);
|
||||
});
|
||||
|
||||
logger_provider.force_flush().expect("flush logs");
|
||||
tracer_provider.force_flush().expect("flush traces");
|
||||
|
||||
let logs = log_exporter.get_emitted_logs().expect("log export");
|
||||
let geo_log = find_log_by_event_name(&logs, "codex.geo_denial");
|
||||
let geo_log_attrs = log_attributes(&geo_log.record);
|
||||
assert_eq!(
|
||||
geo_log_attrs.get("geo_denial_detected").map(String::as_str),
|
||||
Some("true")
|
||||
);
|
||||
assert_eq!(
|
||||
geo_log_attrs.get("request_id").map(String::as_str),
|
||||
Some("req-geo")
|
||||
);
|
||||
assert_eq!(
|
||||
geo_log_attrs.get("endpoint").map(String::as_str),
|
||||
Some("/responses")
|
||||
);
|
||||
assert_eq!(
|
||||
geo_log_attrs
|
||||
.get("residency_header_value")
|
||||
.map(String::as_str),
|
||||
Some("us")
|
||||
);
|
||||
assert_eq!(
|
||||
geo_log_attrs.get("error_body_class").map(String::as_str),
|
||||
Some("workspace_not_authorized_in_region")
|
||||
);
|
||||
assert_eq!(
|
||||
geo_log_attrs.get("safe_error_message").map(String::as_str),
|
||||
Some("Workspace is not authorized in this region.")
|
||||
);
|
||||
assert!(!geo_log_attrs.contains_key("error_body"));
|
||||
|
||||
let spans = span_exporter.get_finished_spans().expect("span export");
|
||||
let geo_trace_event = find_span_event_by_name_attr(&spans[0].events.events, "codex.geo_denial");
|
||||
let geo_trace_attrs = span_event_attributes(geo_trace_event);
|
||||
assert_eq!(
|
||||
geo_trace_attrs.get("cf_ray").map(String::as_str),
|
||||
Some("ray-geo")
|
||||
);
|
||||
assert_eq!(
|
||||
geo_trace_attrs.get("http_status").map(String::as_str),
|
||||
Some("401")
|
||||
);
|
||||
assert_eq!(
|
||||
geo_trace_attrs
|
||||
.get("safe_error_message")
|
||||
.map(String::as_str),
|
||||
Some("Workspace is not authorized in this region.")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn otel_export_routing_policy_routes_conversation_start_endpoint_config() {
|
||||
let log_exporter = InMemoryLogExporter::default();
|
||||
let logger_provider = SdkLoggerProvider::builder()
|
||||
.with_simple_exporter(log_exporter.clone())
|
||||
.build();
|
||||
let span_exporter = InMemorySpanExporter::default();
|
||||
let tracer_provider = SdkTracerProvider::builder()
|
||||
.with_simple_exporter(span_exporter.clone())
|
||||
.build();
|
||||
let tracer = tracer_provider.tracer("sink-split-test");
|
||||
|
||||
let subscriber = tracing_subscriber::registry()
|
||||
.with(
|
||||
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
|
||||
&logger_provider,
|
||||
)
|
||||
.with_filter(filter_fn(OtelProvider::log_export_filter)),
|
||||
)
|
||||
.with(
|
||||
tracing_opentelemetry::layer()
|
||||
.with_tracer(tracer)
|
||||
.with_filter(filter_fn(OtelProvider::trace_export_filter)),
|
||||
);
|
||||
|
||||
tracing::subscriber::with_default(subscriber, || {
|
||||
tracing::callsite::rebuild_interest_cache();
|
||||
let manager = SessionTelemetry::new(
|
||||
ThreadId::new(),
|
||||
"gpt-5.1",
|
||||
"gpt-5.1",
|
||||
Some("account-id".to_string()),
|
||||
Some("engineer@example.com".to_string()),
|
||||
Some(TelemetryAuthMode::Chatgpt),
|
||||
"codex_exec".to_string(),
|
||||
true,
|
||||
"tty".to_string(),
|
||||
SessionSource::Cli,
|
||||
);
|
||||
let root_span = tracing::info_span!("root");
|
||||
let _root_guard = root_span.enter();
|
||||
manager.conversation_starts(
|
||||
"OpenAI",
|
||||
"custom",
|
||||
"custom_unknown",
|
||||
"env",
|
||||
false,
|
||||
None,
|
||||
codex_protocol::config_types::ReasoningSummary::Auto,
|
||||
None,
|
||||
None,
|
||||
codex_protocol::protocol::AskForApproval::OnRequest,
|
||||
codex_protocol::protocol::SandboxPolicy::new_read_only_policy(),
|
||||
Vec::new(),
|
||||
None,
|
||||
);
|
||||
});
|
||||
|
||||
logger_provider.force_flush().expect("flush logs");
|
||||
tracer_provider.force_flush().expect("flush traces");
|
||||
|
||||
let logs = log_exporter.get_emitted_logs().expect("log export");
|
||||
let start_log = find_log_by_event_name(&logs, "codex.conversation_starts");
|
||||
let start_log_attrs = log_attributes(&start_log.record);
|
||||
assert_eq!(
|
||||
start_log_attrs.get("base_url_origin").map(String::as_str),
|
||||
Some("custom")
|
||||
);
|
||||
assert_eq!(
|
||||
start_log_attrs.get("host_class").map(String::as_str),
|
||||
Some("custom_unknown")
|
||||
);
|
||||
assert_eq!(
|
||||
start_log_attrs.get("base_url_source").map(String::as_str),
|
||||
Some("env")
|
||||
);
|
||||
|
||||
let spans = span_exporter.get_finished_spans().expect("span export");
|
||||
let start_trace_event =
|
||||
find_span_event_by_name_attr(&spans[0].events.events, "codex.conversation_starts");
|
||||
let start_trace_attrs = span_event_attributes(start_trace_event);
|
||||
assert_eq!(
|
||||
start_trace_attrs
|
||||
.get("base_url_is_default")
|
||||
.map(String::as_str),
|
||||
Some("false")
|
||||
);
|
||||
}
|
||||
|
||||
@@ -47,7 +47,29 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<(
|
||||
None,
|
||||
None,
|
||||
);
|
||||
manager.record_api_request(1, Some(200), None, Duration::from_millis(300));
|
||||
manager.record_api_request(
|
||||
1,
|
||||
Some(200),
|
||||
None,
|
||||
Duration::from_millis(300),
|
||||
false,
|
||||
false,
|
||||
None,
|
||||
None,
|
||||
"/responses",
|
||||
false,
|
||||
None,
|
||||
"api.openai.com",
|
||||
"openai_api",
|
||||
"default",
|
||||
true,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
);
|
||||
manager.record_websocket_request(Duration::from_millis(400), None);
|
||||
let sse_response: std::result::Result<
|
||||
Option<std::result::Result<StreamEvent, eventsource_stream::EventStreamError<&str>>>,
|
||||
|
||||
Reference in New Issue
Block a user