Compare commits

...

2 Commits

Author SHA1 Message Date
Colin Young
6b6253e48a [Codex][Codex CLI] Harden auth-failure telemetry delivery
Co-authored-by: Codex <noreply@openai.com>
2026-04-09 10:56:10 -07:00
Colin Young
2b01b05bb9 [Codex][Codex CLI] Emit auth-failure Sentry telemetry without feedback
Add automatic auth-failure Sentry events for request-level and refresh-token failures, including the early /oauth/token path hit by short-lived CLI commands. Also allow a local DSN override so the non-/feedback path can be validated end to end with a real CLI 401 repro.

Co-authored-by: Codex <noreply@openai.com>
2026-03-31 12:19:47 -07:00
21 changed files with 1922 additions and 126 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -2259,6 +2259,7 @@ dependencies = [
"codex-core",
"codex-exec-server",
"codex-features",
"codex-feedback",
"codex-protocol",
"codex-shell-command",
"codex-utils-cli",

View File

@@ -434,7 +434,6 @@ pub async fn run_main_with_transport(
})?
}
};
if let Ok(Some(err)) = check_execpolicy_for_warnings(&config.config_layer_stack).await {
let (path, range) = exec_policy_warning_location(&err);
let message = ConfigWarningNotification {

View File

@@ -60,7 +60,6 @@ pub fn extract_chatgpt_account_id(token: &str) -> Option<String> {
}
pub async fn load_auth_manager() -> Option<AuthManager> {
// TODO: pass in cli overrides once cloud tasks properly support them.
let config = Config::load_with_cli_overrides(Vec::new()).await.ok()?;
Some(AuthManager::new(
config.codex_home,

View File

@@ -105,6 +105,7 @@ use crate::flags::CODEX_RS_SSE_FIXTURE;
use crate::model_provider_info::ModelProviderInfo;
use crate::model_provider_info::WireApi;
use crate::provider_auth::auth_manager_for_provider;
use crate::response_debug_context::ResponseDebugContext;
use crate::response_debug_context::extract_response_debug_context;
use crate::response_debug_context::extract_response_debug_context_from_api_error;
use crate::response_debug_context::telemetry_api_error_message;
@@ -112,6 +113,7 @@ use crate::response_debug_context::telemetry_transport_error_message;
use crate::util::FeedbackRequestTags;
use crate::util::emit_feedback_auth_recovery_tags;
use crate::util::emit_feedback_request_tags_with_auth_env;
use crate::util::emit_sentry_auth_failure_event_with_auth_env;
pub const OPENAI_BETA_HEADER: &str = "OpenAI-Beta";
pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
@@ -363,10 +365,12 @@ impl ModelClient {
AuthRequestTelemetryContext::new(
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
&client_setup.api_auth,
/*has_followup_unauthorized_retry*/ false,
PendingUnauthorizedRetry::default(),
),
RequestRouteTelemetry::for_endpoint(RESPONSES_COMPACT_ENDPOINT),
self.state.auth_env_telemetry.clone(),
self.state.provider.should_emit_sentry_auth_failures(),
);
let client =
ApiCompactClient::new(transport, client_setup.api_provider, client_setup.api_auth)
@@ -432,10 +436,12 @@ impl ModelClient {
AuthRequestTelemetryContext::new(
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
&client_setup.api_auth,
/*has_followup_unauthorized_retry*/ false,
PendingUnauthorizedRetry::default(),
),
RequestRouteTelemetry::for_endpoint(MEMORIES_SUMMARIZE_ENDPOINT),
self.state.auth_env_telemetry.clone(),
self.state.provider.should_emit_sentry_auth_failures(),
);
let client =
ApiMemoriesClient::new(transport, client_setup.api_provider, client_setup.api_auth)
@@ -481,12 +487,14 @@ impl ModelClient {
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
auth_env_telemetry: AuthEnvTelemetry,
emit_sentry_auth_failures: bool,
) -> Arc<dyn RequestTelemetry> {
let telemetry = Arc::new(ApiTelemetry::new(
session_telemetry.clone(),
auth_context,
request_route_telemetry,
auth_env_telemetry,
emit_sentry_auth_failures,
));
let request_telemetry: Arc<dyn RequestTelemetry> = telemetry;
request_telemetry
@@ -567,6 +575,7 @@ impl ModelClient {
auth_context,
request_route_telemetry,
self.state.auth_env_telemetry.clone(),
self.state.provider.should_emit_sentry_auth_failures(),
);
let websocket_connect_timeout = self.state.provider.websocket_connect_timeout();
let start = Instant::now();
@@ -631,6 +640,33 @@ impl ModelClient {
},
&self.state.auth_env_telemetry,
);
emit_sentry_auth_failure_if_unauthorized(
&FeedbackRequestTags {
endpoint: request_route_telemetry.endpoint,
auth_header_attached: auth_context.auth_header_attached,
auth_header_name: auth_context.auth_header_name,
auth_mode: auth_context.auth_mode,
auth_retry_after_unauthorized: Some(auth_context.retry_after_unauthorized),
auth_recovery_mode: auth_context.recovery_mode,
auth_recovery_phase: auth_context.recovery_phase,
auth_connection_reused: Some(false),
auth_request_id: response_debug.request_id.as_deref(),
auth_cf_ray: response_debug.cf_ray.as_deref(),
auth_error: response_debug.auth_error.as_deref(),
auth_error_code: response_debug.auth_error_code.as_deref(),
auth_recovery_followup_success: auth_context
.retry_after_unauthorized
.then_some(result.is_ok()),
auth_recovery_followup_status: auth_context
.retry_after_unauthorized
.then_some(status)
.flatten(),
},
&self.state.auth_env_telemetry,
self.state.provider.should_emit_sentry_auth_failures(),
status,
auth_context.has_followup_unauthorized_retry,
);
result
}
@@ -878,6 +914,7 @@ impl ModelClientSession {
let auth_context = AuthRequestTelemetryContext::new(
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
&client_setup.api_auth,
/*has_followup_unauthorized_retry*/ false,
PendingUnauthorizedRetry::default(),
);
let connection = self
@@ -1028,11 +1065,15 @@ impl ModelClientSession {
.map(super::auth::AuthManager::unauthorized_recovery);
let mut pending_retry = PendingUnauthorizedRetry::default();
loop {
let has_followup_unauthorized_retry = auth_recovery
.as_ref()
.is_some_and(UnauthorizedRecovery::has_next);
let client_setup = self.client.current_client_setup().await?;
let transport = ReqwestTransport::new(build_reqwest_client());
let request_auth_context = AuthRequestTelemetryContext::new(
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
&client_setup.api_auth,
has_followup_unauthorized_retry,
pending_retry,
);
let (request_telemetry, sse_telemetry) = Self::build_streaming_telemetry(
@@ -1040,6 +1081,10 @@ impl ModelClientSession {
request_auth_context,
RequestRouteTelemetry::for_endpoint(RESPONSES_ENDPOINT),
self.client.state.auth_env_telemetry.clone(),
self.client
.state
.provider
.should_emit_sentry_auth_failures(),
);
let compression = self.responses_request_compression(client_setup.auth.as_ref());
let options = self.build_responses_options(turn_metadata_header, compression);
@@ -1068,14 +1113,42 @@ impl ModelClientSession {
Err(ApiError::Transport(
unauthorized_transport @ TransportError::Http { status, .. },
)) if status == StatusCode::UNAUTHORIZED => {
pending_retry = PendingUnauthorizedRetry::from_recovery(
handle_unauthorized(
unauthorized_transport,
&mut auth_recovery,
session_telemetry,
)
.await?,
);
let attempted_recovery = auth_recovery.as_ref().and_then(|recovery| {
recovery
.has_next()
.then(|| (recovery.mode_name(), recovery.step_name()))
});
let debug = extract_response_debug_context(&unauthorized_transport);
match handle_unauthorized(
unauthorized_transport,
&mut auth_recovery,
session_telemetry,
)
.await
{
Ok(recovery) => {
pending_retry = PendingUnauthorizedRetry::from_recovery(recovery);
}
Err(err) => {
if request_auth_context.has_followup_unauthorized_retry {
emit_terminal_auth_failure_after_failed_recovery(
AuthRequestTelemetryContext {
recovery_mode: attempted_recovery.map(|(mode, _)| mode),
recovery_phase: attempted_recovery.map(|(_, phase)| phase),
..request_auth_context
},
RequestRouteTelemetry::for_endpoint(RESPONSES_ENDPOINT),
&self.client.state.auth_env_telemetry,
&debug,
self.client
.state
.provider
.should_emit_sentry_auth_failures(),
);
}
return Err(err);
}
}
continue;
}
Err(err) => return Err(map_api_error(err)),
@@ -1117,10 +1190,14 @@ impl ModelClientSession {
.map(super::auth::AuthManager::unauthorized_recovery);
let mut pending_retry = PendingUnauthorizedRetry::default();
loop {
let has_followup_unauthorized_retry = auth_recovery
.as_ref()
.is_some_and(UnauthorizedRecovery::has_next);
let client_setup = self.client.current_client_setup().await?;
let request_auth_context = AuthRequestTelemetryContext::new(
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
&client_setup.api_auth,
has_followup_unauthorized_retry,
pending_retry,
);
let compression = self.responses_request_compression(client_setup.auth.as_ref());
@@ -1168,14 +1245,42 @@ impl ModelClientSession {
Err(ApiError::Transport(
unauthorized_transport @ TransportError::Http { status, .. },
)) if status == StatusCode::UNAUTHORIZED => {
pending_retry = PendingUnauthorizedRetry::from_recovery(
handle_unauthorized(
unauthorized_transport,
&mut auth_recovery,
session_telemetry,
)
.await?,
);
let attempted_recovery = auth_recovery.as_ref().and_then(|recovery| {
recovery
.has_next()
.then(|| (recovery.mode_name(), recovery.step_name()))
});
let debug = extract_response_debug_context(&unauthorized_transport);
match handle_unauthorized(
unauthorized_transport,
&mut auth_recovery,
session_telemetry,
)
.await
{
Ok(recovery) => {
pending_retry = PendingUnauthorizedRetry::from_recovery(recovery);
}
Err(err) => {
if request_auth_context.has_followup_unauthorized_retry {
emit_terminal_auth_failure_after_failed_recovery(
AuthRequestTelemetryContext {
recovery_mode: attempted_recovery.map(|(mode, _)| mode),
recovery_phase: attempted_recovery.map(|(_, phase)| phase),
..request_auth_context
},
RequestRouteTelemetry::for_endpoint(RESPONSES_ENDPOINT),
&self.client.state.auth_env_telemetry,
&debug,
self.client
.state
.provider
.should_emit_sentry_auth_failures(),
);
}
return Err(err);
}
}
continue;
}
Err(err) => return Err(map_api_error(err)),
@@ -1205,12 +1310,14 @@ impl ModelClientSession {
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
auth_env_telemetry: AuthEnvTelemetry,
emit_sentry_auth_failures: bool,
) -> (Arc<dyn RequestTelemetry>, Arc<dyn SseTelemetry>) {
let telemetry = Arc::new(ApiTelemetry::new(
session_telemetry.clone(),
auth_context,
request_route_telemetry,
auth_env_telemetry,
emit_sentry_auth_failures,
));
let request_telemetry: Arc<dyn RequestTelemetry> = telemetry.clone();
let sse_telemetry: Arc<dyn SseTelemetry> = telemetry;
@@ -1223,12 +1330,14 @@ impl ModelClientSession {
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
auth_env_telemetry: AuthEnvTelemetry,
emit_sentry_auth_failures: bool,
) -> Arc<dyn WebsocketTelemetry> {
let telemetry = Arc::new(ApiTelemetry::new(
session_telemetry.clone(),
auth_context,
request_route_telemetry,
auth_env_telemetry,
emit_sentry_auth_failures,
));
let websocket_telemetry: Arc<dyn WebsocketTelemetry> = telemetry;
websocket_telemetry
@@ -1522,6 +1631,7 @@ struct AuthRequestTelemetryContext {
auth_mode: Option<&'static str>,
auth_header_attached: bool,
auth_header_name: Option<&'static str>,
has_followup_unauthorized_retry: bool,
retry_after_unauthorized: bool,
recovery_mode: Option<&'static str>,
recovery_phase: Option<&'static str>,
@@ -1531,6 +1641,7 @@ impl AuthRequestTelemetryContext {
fn new(
auth_mode: Option<AuthMode>,
api_auth: &CoreAuthProvider,
has_followup_unauthorized_retry: bool,
retry: PendingUnauthorizedRetry,
) -> Self {
Self {
@@ -1540,6 +1651,7 @@ impl AuthRequestTelemetryContext {
}),
auth_header_attached: api_auth.auth_header_attached(),
auth_header_name: api_auth.auth_header_name(),
has_followup_unauthorized_retry,
retry_after_unauthorized: retry.retry_after_unauthorized,
recovery_mode: retry.recovery_mode,
recovery_phase: retry.recovery_phase,
@@ -1685,6 +1797,7 @@ struct ApiTelemetry {
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
auth_env_telemetry: AuthEnvTelemetry,
emit_sentry_auth_failures: bool,
}
impl ApiTelemetry {
@@ -1693,16 +1806,68 @@ impl ApiTelemetry {
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
auth_env_telemetry: AuthEnvTelemetry,
emit_sentry_auth_failures: bool,
) -> Self {
Self {
session_telemetry,
auth_context,
request_route_telemetry,
auth_env_telemetry,
emit_sentry_auth_failures,
}
}
}
fn emit_sentry_auth_failure_if_unauthorized(
feedback_tags: &FeedbackRequestTags<'_>,
auth_env_telemetry: &AuthEnvTelemetry,
emit_sentry_auth_failures: bool,
status: Option<u16>,
has_followup_unauthorized_retry: bool,
) {
if status == Some(http::StatusCode::UNAUTHORIZED.as_u16())
&& !has_followup_unauthorized_retry
&& emit_sentry_auth_failures
{
emit_sentry_auth_failure_event_with_auth_env(feedback_tags, auth_env_telemetry);
}
}
fn emit_terminal_auth_failure_after_failed_recovery(
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
auth_env_telemetry: &AuthEnvTelemetry,
debug: &ResponseDebugContext,
emit_sentry_auth_failures: bool,
) {
if auth_context.recovery_phase == Some("refresh_token") {
return;
}
let feedback_tags = FeedbackRequestTags {
endpoint: request_route_telemetry.endpoint,
auth_header_attached: auth_context.auth_header_attached,
auth_header_name: auth_context.auth_header_name,
auth_mode: auth_context.auth_mode,
auth_retry_after_unauthorized: Some(false),
auth_recovery_mode: auth_context.recovery_mode,
auth_recovery_phase: auth_context.recovery_phase,
auth_connection_reused: None,
auth_request_id: debug.request_id.as_deref(),
auth_cf_ray: debug.cf_ray.as_deref(),
auth_error: debug.auth_error.as_deref(),
auth_error_code: debug.auth_error_code.as_deref(),
auth_recovery_followup_success: None,
auth_recovery_followup_status: None,
};
emit_sentry_auth_failure_if_unauthorized(
&feedback_tags,
auth_env_telemetry,
emit_sentry_auth_failures,
Some(StatusCode::UNAUTHORIZED.as_u16()),
/*has_followup_unauthorized_retry*/ false,
);
}
impl RequestTelemetry for ApiTelemetry {
fn on_request(
&self,
@@ -1732,31 +1897,36 @@ impl RequestTelemetry for ApiTelemetry {
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
);
emit_feedback_request_tags_with_auth_env(
&FeedbackRequestTags {
endpoint: self.request_route_telemetry.endpoint,
auth_header_attached: self.auth_context.auth_header_attached,
auth_header_name: self.auth_context.auth_header_name,
auth_mode: self.auth_context.auth_mode,
auth_retry_after_unauthorized: Some(self.auth_context.retry_after_unauthorized),
auth_recovery_mode: self.auth_context.recovery_mode,
auth_recovery_phase: self.auth_context.recovery_phase,
auth_connection_reused: None,
auth_request_id: debug.request_id.as_deref(),
auth_cf_ray: debug.cf_ray.as_deref(),
auth_error: debug.auth_error.as_deref(),
auth_error_code: debug.auth_error_code.as_deref(),
auth_recovery_followup_success: self
.auth_context
.retry_after_unauthorized
.then_some(error.is_none()),
auth_recovery_followup_status: self
.auth_context
.retry_after_unauthorized
.then_some(status)
.flatten(),
},
let feedback_tags = FeedbackRequestTags {
endpoint: self.request_route_telemetry.endpoint,
auth_header_attached: self.auth_context.auth_header_attached,
auth_header_name: self.auth_context.auth_header_name,
auth_mode: self.auth_context.auth_mode,
auth_retry_after_unauthorized: Some(self.auth_context.retry_after_unauthorized),
auth_recovery_mode: self.auth_context.recovery_mode,
auth_recovery_phase: self.auth_context.recovery_phase,
auth_connection_reused: None,
auth_request_id: debug.request_id.as_deref(),
auth_cf_ray: debug.cf_ray.as_deref(),
auth_error: debug.auth_error.as_deref(),
auth_error_code: debug.auth_error_code.as_deref(),
auth_recovery_followup_success: self
.auth_context
.retry_after_unauthorized
.then_some(error.is_none()),
auth_recovery_followup_status: self
.auth_context
.retry_after_unauthorized
.then_some(status)
.flatten(),
};
emit_feedback_request_tags_with_auth_env(&feedback_tags, &self.auth_env_telemetry);
emit_sentry_auth_failure_if_unauthorized(
&feedback_tags,
&self.auth_env_telemetry,
self.emit_sentry_auth_failures,
status,
self.auth_context.has_followup_unauthorized_retry,
);
}
}
@@ -1786,31 +1956,36 @@ impl WebsocketTelemetry for ApiTelemetry {
error_message.as_deref(),
connection_reused,
);
emit_feedback_request_tags_with_auth_env(
&FeedbackRequestTags {
endpoint: self.request_route_telemetry.endpoint,
auth_header_attached: self.auth_context.auth_header_attached,
auth_header_name: self.auth_context.auth_header_name,
auth_mode: self.auth_context.auth_mode,
auth_retry_after_unauthorized: Some(self.auth_context.retry_after_unauthorized),
auth_recovery_mode: self.auth_context.recovery_mode,
auth_recovery_phase: self.auth_context.recovery_phase,
auth_connection_reused: Some(connection_reused),
auth_request_id: debug.request_id.as_deref(),
auth_cf_ray: debug.cf_ray.as_deref(),
auth_error: debug.auth_error.as_deref(),
auth_error_code: debug.auth_error_code.as_deref(),
auth_recovery_followup_success: self
.auth_context
.retry_after_unauthorized
.then_some(error.is_none()),
auth_recovery_followup_status: self
.auth_context
.retry_after_unauthorized
.then_some(status)
.flatten(),
},
let feedback_tags = FeedbackRequestTags {
endpoint: self.request_route_telemetry.endpoint,
auth_header_attached: self.auth_context.auth_header_attached,
auth_header_name: self.auth_context.auth_header_name,
auth_mode: self.auth_context.auth_mode,
auth_retry_after_unauthorized: Some(self.auth_context.retry_after_unauthorized),
auth_recovery_mode: self.auth_context.recovery_mode,
auth_recovery_phase: self.auth_context.recovery_phase,
auth_connection_reused: Some(connection_reused),
auth_request_id: debug.request_id.as_deref(),
auth_cf_ray: debug.cf_ray.as_deref(),
auth_error: debug.auth_error.as_deref(),
auth_error_code: debug.auth_error_code.as_deref(),
auth_recovery_followup_success: self
.auth_context
.retry_after_unauthorized
.then_some(error.is_none()),
auth_recovery_followup_status: self
.auth_context
.retry_after_unauthorized
.then_some(status)
.flatten(),
};
emit_feedback_request_tags_with_auth_env(&feedback_tags, &self.auth_env_telemetry);
emit_sentry_auth_failure_if_unauthorized(
&feedback_tags,
&self.auth_env_telemetry,
self.emit_sentry_auth_failures,
status,
self.auth_context.has_followup_unauthorized_retry && !connection_reused,
);
}

View File

@@ -1,14 +1,27 @@
use super::ApiTelemetry;
use super::AuthRequestTelemetryContext;
use super::ModelClient;
use super::PendingUnauthorizedRetry;
use super::RequestRouteTelemetry;
use super::UnauthorizedRecoveryExecution;
use super::emit_terminal_auth_failure_after_failed_recovery;
use crate::auth_env_telemetry::AuthEnvTelemetry;
use crate::response_debug_context::ResponseDebugContext;
use codex_api::ApiError;
use codex_api::RequestTelemetry;
use codex_api::WebsocketTelemetry;
use codex_otel::SessionTelemetry;
use codex_protocol::ThreadId;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use http::StatusCode;
use pretty_assertions::assert_eq;
use serde_json::json;
use serial_test::serial;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::Mutex;
fn test_model_client(session_source: SessionSource) -> ModelClient {
let provider = crate::model_provider_info::create_oss_provider_with_base_url(
@@ -72,6 +85,37 @@ fn test_session_telemetry() -> SessionTelemetry {
)
}
fn empty_auth_env_telemetry() -> AuthEnvTelemetry {
AuthEnvTelemetry {
openai_api_key_env_present: false,
codex_api_key_env_present: false,
codex_api_key_env_enabled: false,
provider_env_key_name: None,
provider_env_key_present: None,
refresh_token_url_override_present: false,
}
}
type AuthFailureReportCollector = Arc<Mutex<Vec<BTreeMap<String, String>>>>;
fn install_auth_failure_report_collector() -> (
AuthFailureReportCollector,
crate::auth::AuthFailureReporterGuard,
) {
let reported = Arc::new(Mutex::new(Vec::new()));
let guard = crate::auth::set_auth_failure_reporter({
let reported = Arc::clone(&reported);
Arc::new(move |fields| {
reported
.lock()
.expect("report collector poisoned")
.push(fields);
true
})
});
(reported, guard)
}
#[test]
fn build_subagent_headers_sets_other_subagent_label() {
let client = test_model_client(SessionSource::SubAgent(SubAgentSource::Other(
@@ -107,6 +151,7 @@ fn auth_request_telemetry_context_tracks_attached_auth_and_retry_phase() {
let auth_context = AuthRequestTelemetryContext::new(
Some(crate::auth::AuthMode::Chatgpt),
&crate::api_bridge::CoreAuthProvider::for_test(Some("access-token"), Some("workspace-123")),
/*has_followup_unauthorized_retry*/ true,
PendingUnauthorizedRetry::from_recovery(UnauthorizedRecoveryExecution {
mode: "managed",
phase: "refresh_token",
@@ -120,3 +165,291 @@ fn auth_request_telemetry_context_tracks_attached_auth_and_retry_phase() {
assert_eq!(auth_context.recovery_mode, Some("managed"));
assert_eq!(auth_context.recovery_phase, Some("refresh_token"));
}
#[test]
#[serial(auth_failure_reporter)]
fn api_telemetry_skips_auth_failure_while_followup_retry_remains() {
let (reported, _reporter_guard) = install_auth_failure_report_collector();
let telemetry = ApiTelemetry::new(
test_session_telemetry(),
AuthRequestTelemetryContext {
auth_mode: Some("Chatgpt"),
auth_header_attached: true,
auth_header_name: Some("authorization"),
has_followup_unauthorized_retry: true,
retry_after_unauthorized: false,
recovery_mode: Some("managed"),
recovery_phase: Some("refresh_token"),
},
RequestRouteTelemetry::for_endpoint("/responses"),
empty_auth_env_telemetry(),
/*emit_sentry_auth_failures*/ true,
);
telemetry.on_request(
/*attempt*/ 1,
Some(StatusCode::UNAUTHORIZED),
/*error*/ None,
Default::default(),
);
assert!(reported.lock().unwrap().is_empty());
}
#[test]
#[serial(auth_failure_reporter)]
fn api_telemetry_emits_auth_failure_once_no_followup_retry_remains() {
let (reported, _reporter_guard) = install_auth_failure_report_collector();
let telemetry = ApiTelemetry::new(
test_session_telemetry(),
AuthRequestTelemetryContext {
auth_mode: Some("Chatgpt"),
auth_header_attached: true,
auth_header_name: Some("authorization"),
has_followup_unauthorized_retry: false,
retry_after_unauthorized: true,
recovery_mode: Some("managed"),
recovery_phase: Some("refresh_token"),
},
RequestRouteTelemetry::for_endpoint("/responses"),
empty_auth_env_telemetry(),
/*emit_sentry_auth_failures*/ true,
);
telemetry.on_request(
/*attempt*/ 2,
Some(StatusCode::UNAUTHORIZED),
/*error*/ None,
Default::default(),
);
let reported = reported.lock().expect("report collector poisoned");
assert_eq!(reported.len(), 1);
assert_eq!(
reported[0].get("endpoint").map(String::as_str),
Some("/responses")
);
assert_eq!(
reported[0]
.get("auth_retry_after_unauthorized")
.map(String::as_str),
Some("true")
);
}
#[test]
#[serial(auth_failure_reporter)]
fn api_telemetry_skips_sentry_auth_failure_for_non_openai_provider() {
let (reported, _reporter_guard) = install_auth_failure_report_collector();
let telemetry = ApiTelemetry::new(
test_session_telemetry(),
AuthRequestTelemetryContext {
auth_mode: Some("Chatgpt"),
auth_header_attached: true,
auth_header_name: Some("authorization"),
has_followup_unauthorized_retry: true,
retry_after_unauthorized: true,
recovery_mode: Some("managed"),
recovery_phase: Some("refresh_token"),
},
RequestRouteTelemetry::for_endpoint("/responses"),
empty_auth_env_telemetry(),
/*emit_sentry_auth_failures*/ false,
);
telemetry.on_request(
/*attempt*/ 1,
Some(StatusCode::UNAUTHORIZED),
/*error*/ None,
Default::default(),
);
assert!(reported.lock().unwrap().is_empty());
}
#[test]
#[serial(auth_failure_reporter)]
fn websocket_handshake_failure_emits_auth_failure_when_no_followup_retry_remains() {
let (reported, _reporter_guard) = install_auth_failure_report_collector();
let telemetry = ApiTelemetry::new(
test_session_telemetry(),
AuthRequestTelemetryContext {
auth_mode: Some("Chatgpt"),
auth_header_attached: true,
auth_header_name: Some("authorization"),
has_followup_unauthorized_retry: false,
retry_after_unauthorized: true,
recovery_mode: Some("managed"),
recovery_phase: Some("refresh_token"),
},
RequestRouteTelemetry::for_endpoint("/responses"),
empty_auth_env_telemetry(),
/*emit_sentry_auth_failures*/ true,
);
telemetry.on_ws_request(
Default::default(),
Some(&ApiError::Transport(codex_api::TransportError::Http {
status: StatusCode::UNAUTHORIZED,
url: None,
headers: None,
body: None,
})),
/*connection_reused*/ false,
);
let reported = reported.lock().expect("report collector poisoned");
assert_eq!(reported.len(), 1);
assert_eq!(
reported[0].get("endpoint").map(String::as_str),
Some("/responses")
);
assert_eq!(
reported[0]
.get("auth_retry_after_unauthorized")
.map(String::as_str),
Some("true")
);
}
#[test]
#[serial(auth_failure_reporter)]
fn websocket_reused_connection_failure_emits_auth_failure_even_with_followup_retry_available() {
let (reported, _reporter_guard) = install_auth_failure_report_collector();
let telemetry = ApiTelemetry::new(
test_session_telemetry(),
AuthRequestTelemetryContext {
auth_mode: Some("Chatgpt"),
auth_header_attached: true,
auth_header_name: Some("authorization"),
has_followup_unauthorized_retry: true,
retry_after_unauthorized: false,
recovery_mode: Some("managed"),
recovery_phase: Some("refresh_token"),
},
RequestRouteTelemetry::for_endpoint("/responses"),
empty_auth_env_telemetry(),
/*emit_sentry_auth_failures*/ true,
);
telemetry.on_ws_request(
Default::default(),
Some(&ApiError::Transport(codex_api::TransportError::Http {
status: StatusCode::UNAUTHORIZED,
url: None,
headers: None,
body: None,
})),
/*connection_reused*/ true,
);
let reported = reported.lock().expect("report collector poisoned");
assert_eq!(reported.len(), 1);
assert_eq!(
reported[0].get("endpoint").map(String::as_str),
Some("/responses")
);
assert_eq!(
reported[0]
.get("auth_connection_reused")
.map(String::as_str),
Some("true")
);
}
#[test]
#[serial(auth_failure_reporter)]
fn failed_recovery_before_retry_emits_terminal_auth_failure() {
let (reported, _reporter_guard) = install_auth_failure_report_collector();
let auth_context = AuthRequestTelemetryContext {
auth_mode: Some("Chatgpt"),
auth_header_attached: true,
auth_header_name: Some("authorization"),
has_followup_unauthorized_retry: true,
retry_after_unauthorized: false,
recovery_mode: Some("managed"),
recovery_phase: Some("reload"),
};
let debug = ResponseDebugContext {
request_id: Some("req_failed_recovery".to_string()),
cf_ray: Some("ray_failed_recovery".to_string()),
auth_error: None,
auth_error_code: Some("token_expired".to_string()),
};
emit_terminal_auth_failure_after_failed_recovery(
auth_context,
RequestRouteTelemetry::for_endpoint("/responses"),
&empty_auth_env_telemetry(),
&debug,
/*emit_sentry_auth_failures*/ true,
);
let reported = reported.lock().expect("report collector poisoned");
assert_eq!(reported.len(), 1);
assert_eq!(
reported[0].get("endpoint").map(String::as_str),
Some("/responses")
);
assert_eq!(
reported[0].get("auth_recovery_mode").map(String::as_str),
Some("managed")
);
assert_eq!(
reported[0].get("auth_recovery_phase").map(String::as_str),
Some("reload")
);
assert_eq!(
reported[0]
.get("auth_retry_after_unauthorized")
.map(String::as_str),
Some("false")
);
assert_eq!(
reported[0].get("auth_request_id").map(String::as_str),
Some("req_failed_recovery")
);
}
#[test]
#[serial(auth_failure_reporter)]
fn refresh_token_failed_recovery_before_retry_does_not_emit_terminal_auth_failure() {
let (reported, _reporter_guard) = install_auth_failure_report_collector();
let auth_context = AuthRequestTelemetryContext {
auth_mode: Some("Chatgpt"),
auth_header_attached: true,
auth_header_name: Some("authorization"),
has_followup_unauthorized_retry: true,
retry_after_unauthorized: false,
recovery_mode: Some("managed"),
recovery_phase: Some("refresh_token"),
};
let debug = ResponseDebugContext {
request_id: Some("req_refresh_failed".to_string()),
cf_ray: Some("ray_refresh_failed".to_string()),
auth_error: None,
auth_error_code: Some("token_expired".to_string()),
};
emit_terminal_auth_failure_after_failed_recovery(
auth_context,
RequestRouteTelemetry::for_endpoint("/responses"),
&empty_auth_env_telemetry(),
&debug,
/*emit_sentry_auth_failures*/ true,
);
assert!(
reported
.lock()
.expect("report collector poisoned")
.is_empty()
);
}

View File

@@ -796,6 +796,13 @@ fn load_catalog_json(path: &AbsolutePathBuf) -> std::io::Result<ModelsResponse>
Ok(catalog)
}
pub fn feedback_enabled_from_config_toml(cfg: &ConfigToml) -> bool {
cfg.feedback
.as_ref()
.and_then(|feedback| feedback.enabled)
.unwrap_or(true)
}
fn load_model_catalog(
model_catalog_json: Option<AbsolutePathBuf>,
) -> std::io::Result<Option<ModelsResponse>> {
@@ -1983,6 +1990,7 @@ impl Config {
) -> std::io::Result<Self> {
validate_model_providers(&cfg.model_providers)
.map_err(|message| std::io::Error::new(std::io::ErrorKind::InvalidInput, message))?;
let feedback_enabled = feedback_enabled_from_config_toml(&cfg);
// Ensure that every field of ConfigRequirements is applied to the final
// Config.
let ConfigRequirements {
@@ -2685,11 +2693,7 @@ impl Config {
.as_ref()
.and_then(|a| a.enabled)
.or(cfg.analytics.as_ref().and_then(|a| a.enabled)),
feedback_enabled: cfg
.feedback
.as_ref()
.and_then(|feedback| feedback.enabled)
.unwrap_or(true),
feedback_enabled,
tool_suggest,
tui_notifications: cfg
.tui

View File

@@ -19,6 +19,7 @@ use serde::Serialize;
use std::collections::HashMap;
use std::fmt;
use std::time::Duration;
use url::Url;
const DEFAULT_STREAM_IDLE_TIMEOUT_MS: u64 = 300_000;
const DEFAULT_STREAM_MAX_RETRIES: u64 = 5;
@@ -316,6 +317,42 @@ impl ModelProviderInfo {
pub(crate) fn has_command_auth(&self) -> bool {
self.auth.is_some()
}
pub(crate) fn should_emit_sentry_auth_failures(&self) -> bool {
(self.requires_openai_auth
|| self.env_key.is_some()
|| self.auth.is_some()
|| self.experimental_bearer_token.is_some()
|| self.http_headers.as_ref().is_some_and(|headers| {
headers
.keys()
.any(|key| key.eq_ignore_ascii_case("authorization"))
})
|| self.env_http_headers.as_ref().is_some_and(|headers| {
headers
.keys()
.any(|key| key.eq_ignore_ascii_case("authorization"))
}))
&& self
.base_url
.as_deref()
.map(Self::is_openai_owned_base_url)
.unwrap_or(true)
}
fn is_openai_owned_base_url(base_url: &str) -> bool {
let Ok(parsed) = Url::parse(base_url) else {
return false;
};
let Some(host) = parsed.host_str() else {
return false;
};
host == "openai.com"
|| host.ends_with(".openai.com")
|| host == "chatgpt.com"
|| host.ends_with(".chatgpt.com")
}
}
pub const DEFAULT_LMSTUDIO_PORT: u16 = 1234;

View File

@@ -156,3 +156,132 @@ args = ["--format=text"]
})
);
}
#[test]
fn openai_provider_without_base_url_emits_sentry_auth_failures() {
let provider = ModelProviderInfo::create_openai_provider(/*base_url*/ None);
assert!(provider.should_emit_sentry_auth_failures());
}
#[test]
fn openai_provider_with_non_openai_base_url_skips_sentry_auth_failures() {
let provider =
ModelProviderInfo::create_openai_provider(Some("https://example.com/v1".to_string()));
assert!(!provider.should_emit_sentry_auth_failures());
}
#[test]
fn openai_provider_with_openai_subdomain_base_url_emits_sentry_auth_failures() {
let provider = ModelProviderInfo::create_openai_provider(Some(
"https://api-staging.openai.com/v1".to_string(),
));
assert!(provider.should_emit_sentry_auth_failures());
}
#[test]
fn openai_provider_with_chatgpt_subdomain_base_url_emits_sentry_auth_failures() {
let provider =
ModelProviderInfo::create_openai_provider(Some("https://ab.chatgpt.com/v1".to_string()));
assert!(provider.should_emit_sentry_auth_failures());
}
#[test]
fn custom_openai_provider_with_env_key_emits_sentry_auth_failures() {
let provider: ModelProviderInfo = toml::from_str(
r#"
name = "OpenAI Staging"
base_url = "https://api.openai.com/v1"
env_key = "OPENAI_API_KEY"
wire_api = "responses"
"#,
)
.expect("custom openai provider should deserialize");
assert!(provider.should_emit_sentry_auth_failures());
}
#[test]
fn custom_openai_provider_with_command_auth_emits_sentry_auth_failures() {
let provider: ModelProviderInfo = toml::from_str(
r#"
name = "OpenAI Staging"
base_url = "https://api.openai.com/v1"
wire_api = "responses"
[auth]
command = "print-token"
"#,
)
.expect("custom openai provider should deserialize");
assert!(provider.should_emit_sentry_auth_failures());
}
#[test]
fn custom_openai_provider_with_static_bearer_token_emits_sentry_auth_failures() {
let provider: ModelProviderInfo = toml::from_str(
r#"
name = "OpenAI Staging"
base_url = "https://api.openai.com/v1"
wire_api = "responses"
experimental_bearer_token = "token"
"#,
)
.expect("custom openai provider should deserialize");
assert!(provider.should_emit_sentry_auth_failures());
}
#[test]
fn custom_openai_provider_with_http_authorization_header_emits_sentry_auth_failures() {
let provider: ModelProviderInfo = toml::from_str(
r#"
name = "OpenAI Staging"
base_url = "https://api.openai.com/v1"
wire_api = "responses"
[http_headers]
Authorization = "Bearer token"
"#,
)
.expect("custom openai provider should deserialize");
assert!(provider.should_emit_sentry_auth_failures());
}
#[test]
fn custom_openai_provider_with_env_authorization_header_emits_sentry_auth_failures() {
let provider: ModelProviderInfo = toml::from_str(
r#"
name = "OpenAI Staging"
base_url = "https://api.openai.com/v1"
wire_api = "responses"
[env_http_headers]
authorization = "OPENAI_AUTH_HEADER"
"#,
)
.expect("custom openai provider should deserialize");
assert!(provider.should_emit_sentry_auth_failures());
}
#[test]
fn renamed_openai_provider_still_emits_sentry_auth_failures() {
let mut provider = ModelProviderInfo::create_openai_provider(/*base_url*/ None);
provider.name = "Staging".to_string();
assert!(provider.should_emit_sentry_auth_failures());
}
#[test]
fn oss_provider_skips_sentry_auth_failures() {
let provider =
create_oss_provider_with_base_url("https://api.openai.com/v1", WireApi::Responses);
assert!(!provider.should_emit_sentry_auth_failures());
}

View File

@@ -114,25 +114,23 @@ impl RequestTelemetry for ModelsRequestTelemetry {
auth.error_code = response_debug.auth_error_code.as_deref(),
auth.mode = self.auth_mode.as_deref(),
);
emit_feedback_request_tags_with_auth_env(
&FeedbackRequestTags {
endpoint: MODELS_ENDPOINT,
auth_header_attached: self.auth_header_attached,
auth_header_name: self.auth_header_name,
auth_mode: self.auth_mode.as_deref(),
auth_retry_after_unauthorized: None,
auth_recovery_mode: None,
auth_recovery_phase: None,
auth_connection_reused: None,
auth_request_id: response_debug.request_id.as_deref(),
auth_cf_ray: response_debug.cf_ray.as_deref(),
auth_error: response_debug.auth_error.as_deref(),
auth_error_code: response_debug.auth_error_code.as_deref(),
auth_recovery_followup_success: None,
auth_recovery_followup_status: None,
},
&self.auth_env,
);
let feedback_tags = FeedbackRequestTags {
endpoint: MODELS_ENDPOINT,
auth_header_attached: self.auth_header_attached,
auth_header_name: self.auth_header_name,
auth_mode: self.auth_mode.as_deref(),
auth_retry_after_unauthorized: None,
auth_recovery_mode: None,
auth_recovery_phase: None,
auth_connection_reused: None,
auth_request_id: response_debug.request_id.as_deref(),
auth_cf_ray: response_debug.cf_ray.as_deref(),
auth_error: response_debug.auth_error.as_deref(),
auth_error_code: response_debug.auth_error_code.as_deref(),
auth_recovery_followup_success: None,
auth_recovery_followup_status: None,
};
emit_feedback_request_tags_with_auth_env(&feedback_tags, &self.auth_env);
}
}

View File

@@ -14,6 +14,7 @@ use http::HeaderMap;
use http::StatusCode;
use pretty_assertions::assert_eq;
use serde_json::json;
use serial_test::serial;
use std::collections::BTreeMap;
use std::num::NonZeroU64;
use std::sync::Arc;
@@ -215,6 +216,7 @@ impl Visit for TagCollectorVisitor {
#[derive(Clone)]
struct TagCollectorLayer {
target: &'static str,
tags: Arc<Mutex<BTreeMap<String, String>>>,
}
@@ -223,7 +225,7 @@ where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
if event.metadata().target() != "feedback_tags" {
if event.metadata().target() != self.target {
return;
}
let mut visitor = TagCollectorVisitor::default();
@@ -232,6 +234,26 @@ where
}
}
type AuthFailureReportCollector = Arc<Mutex<Vec<BTreeMap<String, String>>>>;
fn install_auth_failure_report_collector() -> (
AuthFailureReportCollector,
crate::auth::AuthFailureReporterGuard,
) {
let reported = Arc::new(Mutex::new(Vec::new()));
let guard = crate::auth::set_auth_failure_reporter({
let reported = Arc::clone(&reported);
Arc::new(move |fields| {
reported
.lock()
.expect("report collector poisoned")
.push(fields);
true
})
});
(reported, guard)
}
#[tokio::test]
async fn get_model_info_tracks_fallback_usage() {
let codex_home = tempdir().expect("temp dir");
@@ -741,7 +763,10 @@ async fn refresh_available_models_skips_network_without_chatgpt_auth() {
fn models_request_telemetry_emits_auth_env_feedback_tags_on_failure() {
let tags = Arc::new(Mutex::new(BTreeMap::new()));
let _guard = tracing_subscriber::registry()
.with(TagCollectorLayer { tags: tags.clone() })
.with(TagCollectorLayer {
target: "feedback_tags",
tags: tags.clone(),
})
.set_default();
let telemetry = ModelsRequestTelemetry {
@@ -835,6 +860,53 @@ fn models_request_telemetry_emits_auth_env_feedback_tags_on_failure() {
);
}
#[test]
#[serial(auth_failure_reporter)]
fn models_request_telemetry_does_not_emit_sentry_auth_failure_event_on_401() {
let (reported, _reporter_guard) = install_auth_failure_report_collector();
let telemetry = ModelsRequestTelemetry {
auth_mode: Some(TelemetryAuthMode::Chatgpt.to_string()),
auth_header_attached: true,
auth_header_name: Some("authorization"),
auth_env: crate::auth_env_telemetry::AuthEnvTelemetry {
openai_api_key_env_present: false,
codex_api_key_env_present: false,
codex_api_key_env_enabled: false,
provider_env_key_name: Some("configured".to_string()),
provider_env_key_present: Some(false),
refresh_token_url_override_present: false,
},
};
let mut headers = HeaderMap::new();
headers.insert("x-request-id", "req-models-401".parse().unwrap());
headers.insert("cf-ray", "ray-models-401".parse().unwrap());
headers.insert(
"x-openai-authorization-error",
"missing_authorization_header".parse().unwrap(),
);
headers.insert(
"x-error-json",
base64::engine::general_purpose::STANDARD
.encode(r#"{"error":{"code":"token_expired"}}"#)
.parse()
.unwrap(),
);
telemetry.on_request(
/*attempt*/ 1,
Some(StatusCode::UNAUTHORIZED),
Some(&TransportError::Http {
status: StatusCode::UNAUTHORIZED,
url: Some("https://example.test/models".to_string()),
headers: Some(headers),
body: Some("plain text error".to_string()),
}),
Duration::from_millis(17),
);
assert!(reported.lock().unwrap().is_empty());
}
#[test]
fn build_available_models_picks_default_after_hiding_hidden_models() {
let codex_home = tempdir().expect("temp dir");

View File

@@ -1,3 +1,4 @@
use std::collections::BTreeMap;
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
@@ -11,7 +12,6 @@ use crate::parse_command::shlex_join;
const INITIAL_DELAY_MS: u64 = 200;
const BACKOFF_FACTOR: f64 = 2.0;
/// Emit structured feedback metadata as key/value pairs.
///
/// This logs a tracing event with `target: "feedback_tags"`. If
@@ -201,6 +201,105 @@ pub(crate) fn emit_feedback_auth_recovery_tags(
);
}
pub(crate) fn emit_sentry_auth_failure_event_with_auth_env(
tags: &FeedbackRequestTags<'_>,
auth_env: &AuthEnvTelemetry,
) {
let mut fields = BTreeMap::from([
("report_kind".to_string(), "auth_failure_auto".to_string()),
("endpoint".to_string(), tags.endpoint.to_string()),
(
"auth_header_attached".to_string(),
tags.auth_header_attached.to_string(),
),
(
"auth_retry_after_unauthorized".to_string(),
tags.auth_retry_after_unauthorized
.unwrap_or(false)
.to_string(),
),
(
"auth_env_openai_api_key_present".to_string(),
auth_env.openai_api_key_env_present.to_string(),
),
(
"auth_env_codex_api_key_present".to_string(),
auth_env.codex_api_key_env_present.to_string(),
),
(
"auth_env_codex_api_key_enabled".to_string(),
auth_env.codex_api_key_env_enabled.to_string(),
),
(
"auth_env_refresh_token_url_override_present".to_string(),
auth_env.refresh_token_url_override_present.to_string(),
),
(
"cli_version".to_string(),
env!("CARGO_PKG_VERSION").to_string(),
),
]);
if let Some(auth_header_name) = tags.auth_header_name {
fields.insert("auth_header_name".to_string(), auth_header_name.to_string());
}
if let Some(auth_mode) = tags.auth_mode {
fields.insert("auth_mode".to_string(), auth_mode.to_string());
}
if let Some(auth_recovery_mode) = tags.auth_recovery_mode {
fields.insert(
"auth_recovery_mode".to_string(),
auth_recovery_mode.to_string(),
);
}
if let Some(auth_recovery_phase) = tags.auth_recovery_phase {
fields.insert(
"auth_recovery_phase".to_string(),
auth_recovery_phase.to_string(),
);
}
if let Some(auth_connection_reused) = tags.auth_connection_reused {
fields.insert(
"auth_connection_reused".to_string(),
auth_connection_reused.to_string(),
);
}
if let Some(auth_request_id) = tags.auth_request_id {
fields.insert("auth_request_id".to_string(), auth_request_id.to_string());
}
if let Some(auth_cf_ray) = tags.auth_cf_ray {
fields.insert("auth_cf_ray".to_string(), auth_cf_ray.to_string());
}
if let Some(auth_error_code) = tags.auth_error_code {
fields.insert("auth_error_code".to_string(), auth_error_code.to_string());
}
if let Some(auth_recovery_followup_success) = tags.auth_recovery_followup_success {
fields.insert(
"auth_recovery_followup_success".to_string(),
auth_recovery_followup_success.to_string(),
);
}
if let Some(auth_recovery_followup_status) = tags.auth_recovery_followup_status {
fields.insert(
"auth_recovery_followup_status".to_string(),
auth_recovery_followup_status.to_string(),
);
}
if let Some(auth_env_provider_key_name) = auth_env.provider_env_key_name.as_deref() {
fields.insert(
"auth_env_provider_key_name".to_string(),
auth_env_provider_key_name.to_string(),
);
}
if let Some(auth_env_provider_key_present) = auth_env.provider_env_key_present {
fields.insert(
"auth_env_provider_key_present".to_string(),
auth_env_provider_key_present.to_string(),
);
}
let _ = crate::auth::report_auth_failure(fields);
}
pub fn backoff(attempt: u64) -> Duration {
let exp = BACKOFF_FACTOR.powi(attempt.saturating_sub(1) as i32);
let base = (INITIAL_DELAY_MS as f64 * exp) as u64;

View File

@@ -1,5 +1,6 @@
use super::*;
use crate::auth_env_telemetry::AuthEnvTelemetry;
use serial_test::serial;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::Mutex;
@@ -44,6 +45,7 @@ impl Visit for TagCollectorVisitor {
#[derive(Clone)]
struct TagCollectorLayer {
target: &'static str,
tags: Arc<Mutex<BTreeMap<String, String>>>,
event_count: Arc<Mutex<usize>>,
}
@@ -53,7 +55,7 @@ where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
if event.metadata().target() != "feedback_tags" {
if event.metadata().target() != self.target {
return;
}
let mut visitor = TagCollectorVisitor::default();
@@ -63,12 +65,33 @@ where
}
}
type AuthFailureReportCollector = Arc<Mutex<Vec<BTreeMap<String, String>>>>;
fn install_auth_failure_report_collector() -> (
AuthFailureReportCollector,
crate::auth::AuthFailureReporterGuard,
) {
let reported = Arc::new(Mutex::new(Vec::new()));
let guard = crate::auth::set_auth_failure_reporter({
let reported = Arc::clone(&reported);
Arc::new(move |fields| {
reported
.lock()
.expect("report collector poisoned")
.push(fields);
true
})
});
(reported, guard)
}
#[test]
fn emit_feedback_request_tags_records_sentry_feedback_fields() {
let tags = Arc::new(Mutex::new(BTreeMap::new()));
let event_count = Arc::new(Mutex::new(0));
let _guard = tracing_subscriber::registry()
.with(TagCollectorLayer {
target: "feedback_tags",
tags: tags.clone(),
event_count: event_count.clone(),
})
@@ -172,6 +195,7 @@ fn emit_feedback_auth_recovery_tags_preserves_401_specific_fields() {
let event_count = Arc::new(Mutex::new(0));
let _guard = tracing_subscriber::registry()
.with(TagCollectorLayer {
target: "feedback_tags",
tags: tags.clone(),
event_count: event_count.clone(),
})
@@ -213,6 +237,7 @@ fn emit_feedback_auth_recovery_tags_clears_stale_401_fields() {
let event_count = Arc::new(Mutex::new(0));
let _guard = tracing_subscriber::registry()
.with(TagCollectorLayer {
target: "feedback_tags",
tags: tags.clone(),
event_count: event_count.clone(),
})
@@ -260,6 +285,7 @@ fn emit_feedback_request_tags_preserves_latest_auth_fields_after_unauthorized()
let event_count = Arc::new(Mutex::new(0));
let _guard = tracing_subscriber::registry()
.with(TagCollectorLayer {
target: "feedback_tags",
tags: tags.clone(),
event_count: event_count.clone(),
})
@@ -307,12 +333,188 @@ fn emit_feedback_request_tags_preserves_latest_auth_fields_after_unauthorized()
assert_eq!(*event_count.lock().unwrap(), 1);
}
#[test]
#[serial(auth_failure_reporter)]
fn emit_sentry_auth_failure_event_keeps_post_refresh_retry_reports_when_override_present() {
let (reported, _reporter_guard) = install_auth_failure_report_collector();
let auth_env = AuthEnvTelemetry {
openai_api_key_env_present: true,
codex_api_key_env_present: false,
codex_api_key_env_enabled: true,
provider_env_key_name: Some("configured".to_string()),
provider_env_key_present: Some(true),
refresh_token_url_override_present: true,
};
emit_sentry_auth_failure_event_with_auth_env(
&FeedbackRequestTags {
endpoint: "/responses",
auth_header_attached: true,
auth_header_name: Some("authorization"),
auth_mode: Some("chatgpt"),
auth_retry_after_unauthorized: Some(false),
auth_recovery_mode: Some("managed"),
auth_recovery_phase: Some("refresh_token"),
auth_connection_reused: Some(false),
auth_request_id: Some("req-123"),
auth_cf_ray: Some("ray-123"),
auth_error: Some("missing_authorization_header"),
auth_error_code: Some("token_expired"),
auth_recovery_followup_success: None,
auth_recovery_followup_status: None,
},
&auth_env,
);
let reported = reported.lock().expect("report collector poisoned");
assert_eq!(reported.len(), 1);
assert_eq!(
reported[0].get("endpoint").map(String::as_str),
Some("/responses")
);
assert_eq!(
reported[0].get("auth_recovery_phase").map(String::as_str),
Some("refresh_token")
);
assert_eq!(
reported[0].get("auth_request_id").map(String::as_str),
Some("req-123")
);
}
#[test]
#[serial(auth_failure_reporter)]
fn emit_sentry_auth_failure_event_keeps_non_refresh_reports_when_override_present() {
let (reported, _reporter_guard) = install_auth_failure_report_collector();
let auth_env = AuthEnvTelemetry {
openai_api_key_env_present: true,
codex_api_key_env_present: false,
codex_api_key_env_enabled: true,
provider_env_key_name: Some("configured".to_string()),
provider_env_key_present: Some(true),
refresh_token_url_override_present: true,
};
emit_sentry_auth_failure_event_with_auth_env(
&FeedbackRequestTags {
endpoint: "/responses/compact",
auth_header_attached: true,
auth_header_name: Some("authorization"),
auth_mode: Some("openai_api_key"),
auth_retry_after_unauthorized: Some(false),
auth_recovery_mode: None,
auth_recovery_phase: None,
auth_connection_reused: None,
auth_request_id: Some("req-non-refresh"),
auth_cf_ray: Some("ray-non-refresh"),
auth_error: Some("missing_authorization_header"),
auth_error_code: Some("invalid_api_key"),
auth_recovery_followup_success: None,
auth_recovery_followup_status: None,
},
&auth_env,
);
let reported = reported.lock().expect("report collector poisoned");
assert_eq!(reported.len(), 1);
assert_eq!(
reported[0].get("endpoint").map(String::as_str),
Some("/responses/compact")
);
assert_eq!(
reported[0].get("auth_request_id").map(String::as_str),
Some("req-non-refresh")
);
}
#[test]
#[serial(auth_failure_reporter)]
fn emit_sentry_auth_failure_event_omits_missing_followup_metadata() {
let (reported, _reporter_guard) = install_auth_failure_report_collector();
emit_sentry_auth_failure_event_with_auth_env(
&FeedbackRequestTags {
endpoint: "/responses/compact",
auth_header_attached: true,
auth_header_name: Some("authorization"),
auth_mode: Some("chatgpt"),
auth_retry_after_unauthorized: Some(false),
auth_recovery_mode: None,
auth_recovery_phase: None,
auth_connection_reused: None,
auth_request_id: Some("req-no-followup"),
auth_cf_ray: Some("ray-no-followup"),
auth_error: Some("plain body"),
auth_error_code: Some("token_expired"),
auth_recovery_followup_success: None,
auth_recovery_followup_status: None,
},
&AuthEnvTelemetry {
openai_api_key_env_present: false,
codex_api_key_env_present: false,
codex_api_key_env_enabled: true,
provider_env_key_name: None,
provider_env_key_present: None,
refresh_token_url_override_present: false,
},
);
let reported = reported.lock().expect("report collector poisoned");
assert_eq!(reported.len(), 1);
assert_eq!(reported[0].get("auth_recovery_followup_success"), None);
assert_eq!(reported[0].get("auth_recovery_followup_status"), None);
}
#[test]
#[serial(auth_failure_reporter)]
fn emit_sentry_auth_failure_event_uses_direct_reporter_when_available() {
let (reported, _reporter_guard) = install_auth_failure_report_collector();
emit_sentry_auth_failure_event_with_auth_env(
&FeedbackRequestTags {
endpoint: "/responses",
auth_header_attached: true,
auth_header_name: Some("authorization"),
auth_mode: Some("chatgpt"),
auth_retry_after_unauthorized: Some(true),
auth_recovery_mode: Some("managed"),
auth_recovery_phase: Some("refresh_token"),
auth_connection_reused: Some(false),
auth_request_id: Some("req-direct"),
auth_cf_ray: Some("ray-direct"),
auth_error: Some("plain body"),
auth_error_code: Some("token_expired"),
auth_recovery_followup_success: Some(false),
auth_recovery_followup_status: Some(401),
},
&AuthEnvTelemetry {
openai_api_key_env_present: false,
codex_api_key_env_present: false,
codex_api_key_env_enabled: true,
provider_env_key_name: Some("OPENAI_API_KEY".to_string()),
provider_env_key_present: Some(false),
refresh_token_url_override_present: false,
},
);
let reported = reported.lock().expect("report collector poisoned");
assert_eq!(reported.len(), 1);
assert_eq!(
reported[0].get("auth_request_id").map(String::as_str),
Some("req-direct")
);
assert_eq!(reported[0].get("auth_error").map(String::as_str), None);
}
#[test]
fn emit_feedback_request_tags_preserves_auth_env_fields_for_legacy_emitters() {
let tags = Arc::new(Mutex::new(BTreeMap::new()));
let event_count = Arc::new(Mutex::new(0));
let _guard = tracing_subscriber::registry()
.with(TagCollectorLayer {
target: "feedback_tags",
tags: tags.clone(),
event_count: event_count.clone(),
})

View File

@@ -7,6 +7,7 @@ use std::io::{self};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::OnceLock;
use std::time::Duration;
use anyhow::Result;
@@ -28,9 +29,25 @@ pub mod feedback_diagnostics;
const DEFAULT_MAX_BYTES: usize = 4 * 1024 * 1024; // 4 MiB
const SENTRY_DSN: &str =
"https://ae32ed50620d7a7792c1ce5df38b3e3e@o33249.ingest.us.sentry.io/4510195390611458";
const SENTRY_DSN_OVERRIDE_ENV_VAR: &str = "CODEX_SENTRY_DSN_OVERRIDE";
const UPLOAD_TIMEOUT_SECS: u64 = 10;
const FEEDBACK_TAGS_TARGET: &str = "feedback_tags";
const AUTH_FAILURE_REPORT_KIND: &str = "auth_failure_auto";
const MAX_FEEDBACK_TAGS: usize = 64;
const AUTH_FAILURE_UPLOAD_QUEUE_CAPACITY: usize = 32;
const FALLBACK_AUTH_FAILURE_UPLOAD_QUEUE_CAPACITY: usize = 32;
#[derive(Clone)]
struct AuthFailureUploadTask {
tags: BTreeMap<String, String>,
dsn_override: Option<String>,
}
static AUTH_FAILURE_UPLOAD_QUEUE: OnceLock<std::sync::mpsc::SyncSender<AuthFailureUploadTask>> =
OnceLock::new();
static FALLBACK_AUTH_FAILURE_UPLOAD_QUEUE: OnceLock<
std::sync::mpsc::SyncSender<AuthFailureUploadTask>,
> = OnceLock::new();
#[derive(Clone)]
pub struct CodexFeedback {
@@ -212,6 +229,15 @@ pub struct FeedbackSnapshot {
pub thread_id: String,
}
struct FeedbackUploadParams<'a> {
classification: &'a str,
reason: Option<&'a str>,
include_logs: bool,
extra_attachment_paths: &'a [PathBuf],
session_source: Option<SessionSource>,
logs_override: Option<Vec<u8>>,
}
impl FeedbackSnapshot {
pub(crate) fn as_bytes(&self) -> &[u8] {
&self.bytes
@@ -252,36 +278,45 @@ impl FeedbackSnapshot {
session_source: Option<SessionSource>,
logs_override: Option<Vec<u8>>,
) -> Result<()> {
use std::collections::BTreeMap;
use std::str::FromStr;
use std::sync::Arc;
self.upload_feedback_with_dsn_override(
FeedbackUploadParams {
classification,
reason,
include_logs,
extra_attachment_paths,
session_source,
logs_override,
},
std::env::var(SENTRY_DSN_OVERRIDE_ENV_VAR).ok().as_deref(),
)
}
use sentry::Client;
use sentry::ClientOptions;
fn upload_feedback_with_dsn_override(
&self,
params: FeedbackUploadParams<'_>,
dsn_override: Option<&str>,
) -> Result<()> {
use sentry::protocol::Envelope;
use sentry::protocol::EnvelopeItem;
use sentry::protocol::Event;
use sentry::protocol::Level;
use sentry::transports::DefaultTransportFactory;
use sentry::types::Dsn;
use std::collections::BTreeMap;
// Build Sentry client
let client = Client::from_config(ClientOptions {
dsn: Some(Dsn::from_str(SENTRY_DSN).map_err(|e| anyhow!("invalid DSN: {e}"))?),
transport: Some(Arc::new(DefaultTransportFactory {})),
..Default::default()
});
let client = build_sentry_client_with_dsn_override(dsn_override)?;
let cli_version = env!("CARGO_PKG_VERSION");
let mut tags = BTreeMap::from([
(String::from("thread_id"), self.thread_id.to_string()),
(String::from("classification"), classification.to_string()),
(
String::from("classification"),
params.classification.to_string(),
),
(String::from("cli_version"), cli_version.to_string()),
]);
if let Some(source) = session_source.as_ref() {
if let Some(source) = params.session_source.as_ref() {
tags.insert(String::from("session_source"), source.to_string());
}
if let Some(r) = reason {
if let Some(r) = params.reason {
tags.insert(String::from("reason"), r.to_string());
}
@@ -301,7 +336,7 @@ impl FeedbackSnapshot {
}
}
let level = match classification {
let level = match params.classification {
"bug" | "bad_result" | "safety_check" => Level::Error,
_ => Level::Info,
};
@@ -309,7 +344,7 @@ impl FeedbackSnapshot {
let mut envelope = Envelope::new();
let title = format!(
"[{}]: Codex session {}",
display_classification(classification),
display_classification(params.classification),
self.thread_id
);
@@ -319,7 +354,7 @@ impl FeedbackSnapshot {
tags,
..Default::default()
};
if let Some(r) = reason {
if let Some(r) = params.reason {
use sentry::protocol::Exception;
use sentry::protocol::Values;
@@ -331,9 +366,11 @@ impl FeedbackSnapshot {
}
envelope.add_item(EnvelopeItem::Event(event));
for attachment in
self.feedback_attachments(include_logs, extra_attachment_paths, logs_override)
{
for attachment in self.feedback_attachments(
params.include_logs,
params.extra_attachment_paths,
params.logs_override,
) {
envelope.add_item(EnvelopeItem::Attachment(attachment));
}
@@ -398,6 +435,23 @@ impl FeedbackSnapshot {
}
}
fn build_sentry_client_with_dsn_override(dsn_override: Option<&str>) -> Result<sentry::Client> {
use std::str::FromStr;
use std::sync::Arc;
use sentry::Client;
use sentry::ClientOptions;
use sentry::transports::DefaultTransportFactory;
use sentry::types::Dsn;
let dsn = dsn_override.unwrap_or(SENTRY_DSN);
Ok(Client::from_config(ClientOptions {
dsn: Some(Dsn::from_str(dsn).map_err(|e| anyhow!("invalid DSN: {e}"))?),
transport: Some(Arc::new(DefaultTransportFactory {})),
..Default::default()
}))
}
fn display_classification(classification: &str) -> String {
match classification {
"bug" => "Bug".to_string(),
@@ -477,10 +531,184 @@ impl Visit for FeedbackTagsVisitor {
}
}
fn finalize_auth_failure_tags(mut tags: BTreeMap<String, String>) -> BTreeMap<String, String> {
tags.retain(|_, value| !value.is_empty());
tags.insert(
String::from("report_kind"),
AUTH_FAILURE_REPORT_KIND.to_string(),
);
tags
}
fn auth_failure_grouping_key(tags: &BTreeMap<String, String>) -> Vec<String> {
let endpoint = tags
.get("endpoint")
.cloned()
.unwrap_or_else(|| "unknown".to_string());
let error_code = tags
.get("auth_error_code")
.filter(|value| !value.is_empty())
.cloned()
.unwrap_or_else(|| "unknown".to_string());
let auth_header_attached = tags
.get("auth_header_attached")
.cloned()
.unwrap_or_else(|| "unknown".to_string());
vec![
"codex".to_string(),
AUTH_FAILURE_REPORT_KIND.to_string(),
endpoint,
error_code,
auth_header_attached,
]
}
fn build_auth_failure_event(tags: BTreeMap<String, String>) -> sentry::protocol::Event<'static> {
use std::borrow::Cow;
use sentry::protocol::Event;
use sentry::protocol::Level;
let endpoint = tags
.get("endpoint")
.cloned()
.unwrap_or_else(|| "unknown".to_string());
let fingerprint = auth_failure_grouping_key(&tags)
.into_iter()
.map(Cow::Owned)
.collect::<Vec<Cow<'static, str>>>();
Event {
level: Level::Error,
message: Some(format!("Codex client auth failure on {endpoint}")),
fingerprint: fingerprint.into(),
tags,
..Default::default()
}
}
pub fn upload_auth_failure_event_tags(tags: BTreeMap<String, String>) -> Result<()> {
upload_auth_failure_event_with_dsn_override(
finalize_auth_failure_tags(tags),
std::env::var(SENTRY_DSN_OVERRIDE_ENV_VAR).ok().as_deref(),
)
}
pub fn enqueue_auth_failure_event_tags(tags: BTreeMap<String, String>) -> bool {
let task = AuthFailureUploadTask {
tags: finalize_auth_failure_tags(tags),
dsn_override: std::env::var(SENTRY_DSN_OVERRIDE_ENV_VAR).ok(),
};
enqueue_auth_failure_upload(auth_failure_upload_queue_sender(), "primary", task.clone())
|| enqueue_auth_failure_upload(
fallback_auth_failure_upload_queue_sender(),
"fallback",
task,
)
}
#[cfg(test)]
fn enqueue_auth_failure_event_with_dsn_override(
tags: BTreeMap<String, String>,
dsn_override: Option<String>,
) -> bool {
enqueue_auth_failure_upload(
auth_failure_upload_queue_sender(),
"primary",
AuthFailureUploadTask {
tags: finalize_auth_failure_tags(tags),
dsn_override,
},
)
}
fn enqueue_auth_failure_upload(
sender: &std::sync::mpsc::SyncSender<AuthFailureUploadTask>,
lane: &'static str,
task: AuthFailureUploadTask,
) -> bool {
match sender.try_send(task) {
Ok(()) => true,
Err(err) => {
tracing::warn!(error = %err, lane, "failed to enqueue auth failure event");
false
}
}
}
fn auth_failure_upload_queue_sender() -> &'static std::sync::mpsc::SyncSender<AuthFailureUploadTask>
{
auth_failure_upload_queue_sender_for(
&AUTH_FAILURE_UPLOAD_QUEUE,
AUTH_FAILURE_UPLOAD_QUEUE_CAPACITY,
"primary",
)
}
fn fallback_auth_failure_upload_queue_sender()
-> &'static std::sync::mpsc::SyncSender<AuthFailureUploadTask> {
auth_failure_upload_queue_sender_for(
&FALLBACK_AUTH_FAILURE_UPLOAD_QUEUE,
FALLBACK_AUTH_FAILURE_UPLOAD_QUEUE_CAPACITY,
"fallback",
)
}
fn auth_failure_upload_queue_sender_for(
slot: &'static OnceLock<std::sync::mpsc::SyncSender<AuthFailureUploadTask>>,
capacity: usize,
lane: &'static str,
) -> &'static std::sync::mpsc::SyncSender<AuthFailureUploadTask> {
slot.get_or_init(|| {
let (tx, rx) = std::sync::mpsc::sync_channel::<AuthFailureUploadTask>(capacity);
std::thread::spawn(move || {
while let Ok(task) = rx.recv() {
if let Err(err) = upload_auth_failure_event_with_dsn_override(
task.tags,
task.dsn_override.as_deref(),
) {
tracing::warn!(error = %err, lane, "failed to upload auth failure event");
}
}
});
tx
})
}
fn upload_auth_failure_event_with_dsn_override(
tags: BTreeMap<String, String>,
dsn_override: Option<&str>,
) -> Result<()> {
use sentry::protocol::Envelope;
use sentry::protocol::EnvelopeItem;
let client = build_sentry_client_with_dsn_override(dsn_override)?;
let mut envelope = Envelope::new();
envelope.add_item(EnvelopeItem::Event(build_auth_failure_event(tags)));
client.send_envelope(envelope);
ensure_auth_failure_event_flushed(
client.flush(Some(Duration::from_secs(UPLOAD_TIMEOUT_SECS))),
)?;
Ok(())
}
fn ensure_auth_failure_event_flushed(flushed: bool) -> Result<()> {
if flushed {
Ok(())
} else {
Err(anyhow!("timed out flushing auth failure event"))
}
}
#[cfg(test)]
mod tests {
use std::ffi::OsStr;
use std::fs;
use std::io::Read;
use std::io::Write;
use std::net::TcpListener;
use std::sync::mpsc;
use std::thread;
use std::time::Duration as StdDuration;
use super::*;
use feedback_diagnostics::FeedbackDiagnostic;
@@ -515,6 +743,309 @@ mod tests {
pretty_assertions::assert_eq!(snap.tags.get("cached").map(String::as_str), Some("true"));
}
#[test]
fn finalize_auth_failure_tags_adds_report_kind_and_drops_empty_values() {
let tags = finalize_auth_failure_tags(BTreeMap::from([
(String::from("endpoint"), String::from("/responses")),
(String::from("auth_request_id"), String::new()),
]));
assert_eq!(
tags.get("report_kind").map(String::as_str),
Some(AUTH_FAILURE_REPORT_KIND)
);
assert_eq!(tags.get("endpoint").map(String::as_str), Some("/responses"));
assert!(!tags.contains_key("auth_request_id"));
}
#[test]
fn auth_failure_grouping_key_uses_endpoint_code_and_header_state() {
let grouping_key = auth_failure_grouping_key(&BTreeMap::from([
(String::from("endpoint"), String::from("/responses")),
(
String::from("auth_error_code"),
String::from("token_expired"),
),
(String::from("auth_header_attached"), String::from("true")),
]));
assert_eq!(
grouping_key,
vec![
"codex".to_string(),
AUTH_FAILURE_REPORT_KIND.to_string(),
"/responses".to_string(),
"token_expired".to_string(),
"true".to_string(),
]
);
}
#[test]
fn auth_failure_grouping_key_does_not_use_raw_error_text() {
let grouping_key = auth_failure_grouping_key(&BTreeMap::from([
(String::from("endpoint"), String::from("/responses")),
(
String::from("auth_error"),
String::from("request-specific plaintext body"),
),
(String::from("auth_header_attached"), String::from("true")),
]));
assert_eq!(
grouping_key,
vec![
"codex".to_string(),
AUTH_FAILURE_REPORT_KIND.to_string(),
"/responses".to_string(),
"unknown".to_string(),
"true".to_string(),
]
);
}
#[test]
fn build_auth_failure_event_sets_stable_message_and_tags() {
let event = build_auth_failure_event(BTreeMap::from([
(
String::from("report_kind"),
AUTH_FAILURE_REPORT_KIND.to_string(),
),
(String::from("endpoint"), String::from("/responses")),
(String::from("auth_header_attached"), String::from("true")),
]));
assert_eq!(
event.message.as_deref(),
Some("Codex client auth failure on /responses")
);
assert_eq!(
event.tags.get("report_kind").map(String::as_str),
Some(AUTH_FAILURE_REPORT_KIND)
);
assert_eq!(
event.tags.get("auth_header_attached").map(String::as_str),
Some("true")
);
}
#[test]
fn auth_failure_upload_posts_envelope_to_overridden_dsn() {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind local listener");
listener
.set_nonblocking(false)
.expect("listener should stay blocking");
let addr = listener.local_addr().expect("local addr");
let (tx, rx) = mpsc::channel();
let server = thread::spawn(move || {
let (mut stream, _) = listener.accept().expect("accept envelope request");
let mut buffer = Vec::new();
let mut headers_end = None;
while headers_end.is_none() {
let mut chunk = [0_u8; 4096];
let read = stream.read(&mut chunk).expect("read request headers");
buffer.extend_from_slice(&chunk[..read]);
headers_end = buffer.windows(4).position(|window| window == b"\r\n\r\n");
}
let headers_end = headers_end.expect("headers terminator should exist") + 4;
let headers = String::from_utf8_lossy(&buffer[..headers_end]);
let content_length = headers
.lines()
.find_map(|line| {
let (name, value) = line.split_once(':')?;
if name.eq_ignore_ascii_case("content-length") {
Some(value.trim().parse::<usize>().expect("content-length"))
} else {
None
}
})
.expect("content-length header");
while buffer.len() < headers_end + content_length {
let mut chunk = [0_u8; 4096];
let read = stream.read(&mut chunk).expect("read request body");
buffer.extend_from_slice(&chunk[..read]);
}
stream
.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
.expect("write response");
tx.send(buffer).expect("capture request");
});
let dsn = format!("http://public@127.0.0.1:{}/1", addr.port());
upload_auth_failure_event_with_dsn_override(
finalize_auth_failure_tags(BTreeMap::from([
(String::from("endpoint"), String::from("/oauth/token")),
(
String::from("auth_error_code"),
String::from("refresh_token_reused"),
),
(String::from("auth_header_attached"), String::from("true")),
])),
Some(&dsn),
)
.expect("upload auth failure event");
let request = rx
.recv_timeout(StdDuration::from_secs(5))
.expect("receive envelope request");
server.join().expect("server thread should exit");
let request_text = String::from_utf8_lossy(&request);
assert!(request_text.contains("POST /api/1/envelope/"));
assert!(request_text.contains("\"report_kind\":\"auth_failure_auto\""));
assert!(request_text.contains("\"endpoint\":\"/oauth/token\""));
assert!(request_text.contains("\"auth_error_code\":\"refresh_token_reused\""));
}
#[test]
fn enqueue_auth_failure_event_tags_posts_envelope_to_overridden_dsn() {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind local listener");
listener
.set_nonblocking(false)
.expect("listener should stay blocking");
let addr = listener.local_addr().expect("local addr");
let (tx, rx) = mpsc::channel();
let server = thread::spawn(move || {
let (mut stream, _) = listener.accept().expect("accept envelope request");
let mut buffer = Vec::new();
let mut headers_end = None;
while headers_end.is_none() {
let mut chunk = [0_u8; 4096];
let read = stream.read(&mut chunk).expect("read request headers");
buffer.extend_from_slice(&chunk[..read]);
headers_end = buffer.windows(4).position(|window| window == b"\r\n\r\n");
}
let headers_end = headers_end.expect("headers terminator should exist") + 4;
let headers = String::from_utf8_lossy(&buffer[..headers_end]);
let content_length = headers
.lines()
.find_map(|line| {
let (name, value) = line.split_once(':')?;
if name.eq_ignore_ascii_case("content-length") {
Some(value.trim().parse::<usize>().expect("content-length"))
} else {
None
}
})
.expect("content-length header");
while buffer.len() < headers_end + content_length {
let mut chunk = [0_u8; 4096];
let read = stream.read(&mut chunk).expect("read request body");
buffer.extend_from_slice(&chunk[..read]);
}
stream
.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
.expect("write response");
tx.send(buffer).expect("capture request");
});
enqueue_auth_failure_event_with_dsn_override(
BTreeMap::from([
(String::from("endpoint"), String::from("/oauth/token")),
(
String::from("auth_error_code"),
String::from("refresh_token_reused"),
),
(String::from("auth_header_attached"), String::from("false")),
]),
Some(format!("http://public@127.0.0.1:{}/1", addr.port())),
);
let request = rx
.recv_timeout(StdDuration::from_secs(5))
.expect("receive envelope request");
server.join().expect("server thread should exit");
let request_text = String::from_utf8_lossy(&request);
assert!(request_text.contains("POST /api/1/envelope/"));
assert!(request_text.contains("\"report_kind\":\"auth_failure_auto\""));
assert!(request_text.contains("\"endpoint\":\"/oauth/token\""));
assert!(request_text.contains("\"auth_header_attached\":\"false\""));
}
#[test]
fn upload_feedback_posts_envelope_to_overridden_dsn() {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind local listener");
listener
.set_nonblocking(false)
.expect("listener should stay blocking");
let addr = listener.local_addr().expect("local addr");
let (tx, rx) = mpsc::channel();
let server = thread::spawn(move || {
let (mut stream, _) = listener.accept().expect("accept envelope request");
let mut buffer = Vec::new();
let mut headers_end = None;
while headers_end.is_none() {
let mut chunk = [0_u8; 4096];
let read = stream.read(&mut chunk).expect("read request headers");
buffer.extend_from_slice(&chunk[..read]);
headers_end = buffer.windows(4).position(|window| window == b"\r\n\r\n");
}
let headers_end = headers_end.expect("headers terminator should exist") + 4;
let headers = String::from_utf8_lossy(&buffer[..headers_end]);
let content_length = headers
.lines()
.find_map(|line| {
let (name, value) = line.split_once(':')?;
if name.eq_ignore_ascii_case("content-length") {
Some(value.trim().parse::<usize>().expect("content-length"))
} else {
None
}
})
.expect("content-length header");
while buffer.len() < headers_end + content_length {
let mut chunk = [0_u8; 4096];
let read = stream.read(&mut chunk).expect("read request body");
buffer.extend_from_slice(&chunk[..read]);
}
stream
.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
.expect("write response");
tx.send(buffer).expect("capture request");
});
let snapshot = CodexFeedback::new().snapshot(/*session_id*/ None);
let dsn = format!("http://public@127.0.0.1:{}/1", addr.port());
snapshot
.upload_feedback_with_dsn_override(
FeedbackUploadParams {
classification: "idea",
reason: Some("manual"),
include_logs: false,
extra_attachment_paths: &[],
session_source: None,
logs_override: None,
},
Some(&dsn),
)
.expect("upload feedback");
let request = rx
.recv_timeout(StdDuration::from_secs(5))
.expect("receive envelope request");
server.join().expect("server thread should exit");
let request_text = String::from_utf8_lossy(&request);
assert!(request_text.contains("POST /api/1/envelope/"));
assert!(request_text.contains("\"classification\":\"idea\""));
assert!(request_text.contains("\"reason\":\"manual\""));
}
#[test]
fn ensure_auth_failure_event_flushed_rejects_timeouts() {
assert!(ensure_auth_failure_event_flushed(/*flushed*/ true).is_ok());
assert_eq!(
ensure_auth_failure_event_flushed(/*flushed*/ false)
.expect_err("flush timeout should fail")
.to_string(),
"timed out flushing auth failure event"
);
}
#[test]
fn feedback_attachments_gate_connectivity_diagnostics() {
let extra_filename = format!("codex-feedback-extra-{}.jsonl", ThreadId::new());

View File

@@ -12,7 +12,10 @@ use codex_protocol::config_types::ModelProviderAuthInfo;
use pretty_assertions::assert_eq;
use serde::Serialize;
use serde_json::json;
use serial_test::serial;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::Mutex;
use tempfile::TempDir;
use tempfile::tempdir;
@@ -789,3 +792,64 @@ fn missing_plan_type_maps_to_unknown() {
pretty_assertions::assert_eq!(auth.account_plan_type(), Some(AccountPlanType::Unknown));
}
#[test]
#[serial(auth_failure_reporter)]
fn nested_auth_failure_reporter_guard_restores_previous_reporter() {
let outer_reported = Arc::new(Mutex::new(Vec::new()));
let inner_reported = Arc::new(Mutex::new(Vec::new()));
let outer_guard = set_auth_failure_reporter({
let outer_reported = Arc::clone(&outer_reported);
Arc::new(move |fields| {
outer_reported
.lock()
.expect("outer collector poisoned")
.push(fields);
true
})
});
{
let _inner_guard = set_auth_failure_reporter({
let inner_reported = Arc::clone(&inner_reported);
Arc::new(move |fields| {
inner_reported
.lock()
.expect("inner collector poisoned")
.push(fields);
true
})
});
assert!(report_auth_failure(BTreeMap::from([(
"endpoint".to_string(),
"/responses".to_string(),
)])));
}
assert!(report_auth_failure(BTreeMap::from([(
"endpoint".to_string(),
"/models".to_string(),
)])));
drop(outer_guard);
assert_eq!(
inner_reported
.lock()
.expect("inner collector poisoned")
.clone(),
vec![BTreeMap::from([(
"endpoint".to_string(),
"/responses".to_string(),
)])]
);
assert_eq!(
outer_reported
.lock()
.expect("outer collector poisoned")
.clone(),
vec![BTreeMap::from([(
"endpoint".to_string(),
"/models".to_string(),
)])]
);
}

View File

@@ -1,10 +1,12 @@
use async_trait::async_trait;
use chrono::Utc;
use once_cell::sync::Lazy;
use reqwest::StatusCode;
use serde::Deserialize;
use serde::Serialize;
#[cfg(test)]
use serial_test::serial;
use std::cell::Cell;
use std::collections::BTreeMap;
use std::env;
use std::fmt::Debug;
use std::path::Path;
@@ -83,6 +85,15 @@ const REFRESH_TOKEN_UNKNOWN_MESSAGE: &str =
const REFRESH_TOKEN_ACCOUNT_MISMATCH_MESSAGE: &str = "Your access token could not be refreshed because you have since logged out or signed in to another account. Please sign in again.";
const REFRESH_TOKEN_URL: &str = "https://auth.openai.com/oauth/token";
pub const REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR: &str = "CODEX_REFRESH_TOKEN_URL_OVERRIDE";
type AuthFailureReporter = Arc<dyn Fn(BTreeMap<String, String>) -> bool + Send + Sync + 'static>;
static AUTH_FAILURE_REPORTER: Lazy<RwLock<Option<AuthFailureReporter>>> =
Lazy::new(|| RwLock::new(None));
#[cfg(test)]
static AUTH_FAILURE_REPORTER_TEST_LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
#[cfg(test)]
thread_local! {
static AUTH_FAILURE_REPORTER_TEST_DEPTH: Cell<u32> = const { Cell::new(0) };
}
#[derive(Debug, Error)]
pub enum RefreshTokenError {
@@ -92,6 +103,60 @@ pub enum RefreshTokenError {
Transient(#[from] std::io::Error),
}
pub struct AuthFailureReporterGuard {
previous: Option<AuthFailureReporter>,
#[cfg(test)]
_test_lock: Option<std::sync::MutexGuard<'static, ()>>,
}
impl Drop for AuthFailureReporterGuard {
fn drop(&mut self) {
if let Ok(mut reporter) = AUTH_FAILURE_REPORTER.write() {
*reporter = self.previous.take();
}
#[cfg(test)]
AUTH_FAILURE_REPORTER_TEST_DEPTH.with(|depth| {
depth.set(depth.get().saturating_sub(1));
});
}
}
pub fn set_auth_failure_reporter(reporter: AuthFailureReporter) -> AuthFailureReporterGuard {
#[cfg(test)]
let test_lock = AUTH_FAILURE_REPORTER_TEST_DEPTH.with(|depth| {
if depth.get() == 0 {
depth.set(1);
Some(
AUTH_FAILURE_REPORTER_TEST_LOCK
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner),
)
} else {
depth.set(depth.get() + 1);
None
}
});
let mut previous = None;
if let Ok(mut current) = AUTH_FAILURE_REPORTER.write() {
previous = current.replace(reporter);
}
AuthFailureReporterGuard {
previous,
#[cfg(test)]
_test_lock: test_lock,
}
}
pub fn report_auth_failure(fields: BTreeMap<String, String>) -> bool {
if let Ok(reporter) = AUTH_FAILURE_REPORTER.read()
&& let Some(reporter) = reporter.as_ref()
{
reporter(fields)
} else {
false
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExternalAuthTokens {
pub access_token: String,

View File

@@ -18,6 +18,7 @@ pub use server::run_login_server;
pub use auth::AuthConfig;
pub use auth::AuthCredentialsStoreMode;
pub use auth::AuthDotJson;
pub use auth::AuthFailureReporterGuard;
pub use auth::AuthManager;
pub use auth::CLIENT_ID;
pub use auth::CODEX_API_KEY_ENV_VAR;
@@ -37,6 +38,8 @@ pub use auth::load_auth_dot_json;
pub use auth::login_with_api_key;
pub use auth::logout;
pub use auth::read_openai_api_key_from_env;
pub use auth::report_auth_failure;
pub use auth::save_auth;
pub use auth::set_auth_failure_reporter;
pub use codex_app_server_protocol::AuthMode;
pub use token_data::TokenData;

View File

@@ -21,6 +21,7 @@ codex-arg0 = { workspace = true }
codex-core = { workspace = true }
codex-exec-server = { workspace = true }
codex-features = { workspace = true }
codex-feedback = { workspace = true }
codex-protocol = { workspace = true }
codex-shell-command = { workspace = true }
codex-utils-cli = { workspace = true }

View File

@@ -8,6 +8,7 @@ use std::sync::Arc;
use codex_arg0::Arg0DispatchPaths;
use codex_core::config::Config;
use codex_exec_server::EnvironmentManager;
use codex_feedback::enqueue_auth_failure_event_tags;
use codex_utils_cli::CliConfigOverrides;
use rmcp::model::ClientNotification;
@@ -71,6 +72,10 @@ pub async fn run_main(
.map_err(|e| {
std::io::Error::new(ErrorKind::InvalidData, format!("error loading config: {e}"))
})?;
let feedback_enabled = config.feedback_enabled;
let _auth_failure_reporter_guard = feedback_enabled.then(|| {
codex_core::auth::set_auth_failure_reporter(Arc::new(enqueue_auth_failure_event_tags))
});
let otel = codex_core::otel_init::build_provider(
&config,

View File

@@ -978,6 +978,7 @@ pub(crate) struct App {
/// transcript cells.
pub(crate) backtrack_render_pending: bool,
pub(crate) feedback: codex_feedback::CodexFeedback,
auth_failure_reporter_guard: Option<codex_core::auth::AuthFailureReporterGuard>,
feedback_audience: FeedbackAudience,
remote_app_server_url: Option<String>,
remote_app_server_auth_token: Option<String>,
@@ -1052,6 +1053,25 @@ fn active_turn_missing_steer_error(error: &TypedRequestError) -> bool {
}
impl App {
fn set_config(&mut self, config: Config) {
self.config = config;
self.chat_widget.sync_runtime_config(&self.config);
self.reconcile_auth_failure_reporting();
}
fn reconcile_auth_failure_reporting(&mut self) {
if self.config.feedback_enabled {
if self.auth_failure_reporter_guard.is_none() {
self.auth_failure_reporter_guard =
Some(codex_core::auth::set_auth_failure_reporter(Arc::new(
codex_feedback::enqueue_auth_failure_event_tags,
)));
}
} else {
self.auth_failure_reporter_guard = None;
}
}
pub fn chatwidget_init_for_forked_or_resumed_thread(
&self,
tui: &mut tui::Tui,
@@ -1096,8 +1116,7 @@ impl App {
.rebuild_config_for_cwd(self.chat_widget.config_ref().cwd.to_path_buf())
.await?;
self.apply_runtime_policy_overrides(&mut config);
self.config = config;
self.chat_widget.sync_plugin_mentions_config(&self.config);
self.set_config(config);
Ok(())
}
@@ -1349,7 +1368,7 @@ impl App {
return;
}
self.config = next_config;
self.set_config(next_config);
for (feature, effective_enabled) in feature_updates_to_apply {
self.chat_widget
.set_feature_enabled(feature, effective_enabled);
@@ -3222,7 +3241,7 @@ impl App {
tracing::warn!("failed to unsubscribe tracked thread {thread_id}: {err}");
}
}
self.config = config.clone();
self.set_config(config.clone());
match app_server.start_thread(&config).await {
Ok(started) => {
if let Err(err) = self
@@ -3710,6 +3729,7 @@ impl App {
backtrack: BacktrackState::default(),
backtrack_render_pending: false,
feedback: feedback.clone(),
auth_failure_reporter_guard: None,
feedback_audience,
remote_app_server_url,
remote_app_server_auth_token,
@@ -3727,6 +3747,7 @@ impl App {
pending_primary_events: VecDeque::new(),
pending_app_server_requests: PendingAppServerRequests::default(),
};
app.reconcile_auth_failure_reporting();
if let Some(started) = initial_started_thread {
app.enqueue_primary_thread_session(started.session, started.turns)
.await?;
@@ -4034,7 +4055,7 @@ impl App {
{
Ok(resumed) => {
self.shutdown_current_thread(app_server).await;
self.config = resume_config;
self.set_config(resume_config);
tui.set_notification_method(self.config.tui_notification_method);
self.file_search
.update_search_dir(self.config.cwd.to_path_buf());
@@ -4960,7 +4981,7 @@ impl App {
) {
return Ok(AppRunControl::Continue);
}
self.config = config;
self.set_config(config);
self.runtime_approval_policy_override =
Some(self.config.permissions.approval_policy.value());
self.chat_widget
@@ -4984,7 +5005,7 @@ impl App {
) {
return Ok(AppRunControl::Continue);
}
self.config = config;
self.set_config(config);
if let Err(err) = self.chat_widget.set_sandbox_policy(policy_for_chat) {
tracing::warn!(%err, "failed to set sandbox policy on chat config");
self.chat_widget
@@ -8926,6 +8947,7 @@ guardian_approval = true
backtrack: BacktrackState::default(),
backtrack_render_pending: false,
feedback: codex_feedback::CodexFeedback::new(),
auth_failure_reporter_guard: None,
feedback_audience: FeedbackAudience::External,
remote_app_server_url: None,
remote_app_server_auth_token: None,
@@ -8980,6 +9002,7 @@ guardian_approval = true
backtrack: BacktrackState::default(),
backtrack_render_pending: false,
feedback: codex_feedback::CodexFeedback::new(),
auth_failure_reporter_guard: None,
feedback_audience: FeedbackAudience::External,
remote_app_server_url: None,
remote_app_server_auth_token: None,
@@ -9919,6 +9942,62 @@ guardian_approval = true
Ok(())
}
#[tokio::test]
async fn refresh_in_memory_config_from_disk_reconciles_auth_failure_reporting() -> Result<()> {
let mut app = make_test_app().await;
let codex_home = tempdir()?;
app.config.codex_home = codex_home.path().to_path_buf();
app.config.feedback_enabled = false;
app.reconcile_auth_failure_reporting();
assert!(app.auth_failure_reporter_guard.is_none());
ConfigEditsBuilder::new(&app.config.codex_home)
.with_edits([ConfigEdit::SetPath {
segments: vec!["feedback".to_string(), "enabled".to_string()],
value: true.into(),
}])
.apply()
.await
.expect("persist feedback opt-in");
app.refresh_in_memory_config_from_disk().await?;
assert!(app.config.feedback_enabled);
assert!(app.auth_failure_reporter_guard.is_some());
assert!(app.chat_widget.config_ref().feedback_enabled);
ConfigEditsBuilder::new(&app.config.codex_home)
.with_edits([ConfigEdit::SetPath {
segments: vec!["feedback".to_string(), "enabled".to_string()],
value: false.into(),
}])
.apply()
.await
.expect("persist feedback opt-out");
app.refresh_in_memory_config_from_disk().await?;
assert!(!app.config.feedback_enabled);
assert!(app.auth_failure_reporter_guard.is_none());
assert!(!app.chat_widget.config_ref().feedback_enabled);
Ok(())
}
#[tokio::test]
async fn set_config_syncs_chat_widget_feedback_enabled() -> Result<()> {
let mut app = make_test_app().await;
let mut next_config = app.config.clone();
next_config.feedback_enabled = !app.chat_widget.config_ref().feedback_enabled;
app.set_config(next_config.clone());
assert_eq!(app.config.feedback_enabled, next_config.feedback_enabled);
assert_eq!(
app.chat_widget.config_ref().feedback_enabled,
next_config.feedback_enabled
);
Ok(())
}
#[tokio::test]
async fn rebuild_config_for_resume_or_fallback_uses_current_config_on_same_cwd_error()
-> Result<()> {

View File

@@ -10582,9 +10582,10 @@ impl ChatWidget {
self.bottom_pane.set_plugin_mentions(Some(plugins));
}
pub(crate) fn sync_plugin_mentions_config(&mut self, config: &Config) {
pub(crate) fn sync_runtime_config(&mut self, config: &Config) {
self.config.features = config.features.clone();
self.config.config_layer_stack = config.config_layer_stack.clone();
self.config.feedback_enabled = config.feedback_enabled;
}
pub(crate) fn open_review_popup(&mut self) {

View File

@@ -760,7 +760,6 @@ pub async fn run_main(
cloud_requirements.clone(),
)
.await;
#[allow(clippy::print_stderr)]
match check_execpolicy_for_warnings(&config.config_layer_stack).await {
Ok(None) => {}