Compare commits

...

15 Commits

Author SHA1 Message Date
Cooper Gamble
9f86a0cfa7 [codex-core] guard revoked auth retry after output [ci changed_files] 2026-05-21 23:26:51 +00:00
Cooper Gamble
6112b49ab6 [codex-core] reset websocket after auth recovery [ci changed_files] 2026-05-21 23:26:51 +00:00
Cooper Gamble
2beaf6ca44 [codex-core] retry remote v2 compact auth recovery [ci changed_files] 2026-05-21 23:26:51 +00:00
Cooper Gamble
4eedd05bba [codex-core] close revoked auth review gaps [ci changed_files] 2026-05-21 23:26:51 +00:00
Cooper Gamble
053f6633b1 [codex-core] add revoked auth integration coverage [ci changed_files] 2026-05-21 23:26:51 +00:00
Cooper Gamble
96e9af9096 [codex-login] clear invalidated auth without account id [ci changed_files] 2026-05-21 23:26:50 +00:00
Cooper Gamble
27acff776c [codex-core] guard invalidated token logout [ci changed_files] 2026-05-21 23:26:50 +00:00
Cooper Gamble
8f13e709e6 [codex-core] clear invalidated ChatGPT auth [ci changed_files] 2026-05-21 23:26:50 +00:00
Cooper Gamble
41b65786aa [codex-core] skip refresh for revoked access tokens [ci changed_files] 2026-05-21 23:26:50 +00:00
Cooper Gamble
d6c7bfd44f [codex-cli] gate startup token refresh [ci changed_files] 2026-05-21 23:00:30 +00:00
Cooper Gamble
445c687cdf [codex-cli] harden startup ChatGPT refresh [ci changed_files] 2026-05-21 23:00:30 +00:00
Cooper Gamble
4e0397eea7 [codex-cli] serialize startup ChatGPT token refresh [ci changed_files] 2026-05-21 23:00:30 +00:00
Cooper Gamble
d5eb810671 [codex-cli] bound startup ChatGPT token refresh [ci changed_files] 2026-05-21 23:00:30 +00:00
Cooper Gamble
b901ed1d55 [codex-cli] match web access token refresh window [ci changed_files] 2026-05-21 23:00:30 +00:00
Cooper Gamble
d17c381d3f [codex-cli] eagerly refresh ChatGPT access token on startup [ci changed_files] 2026-05-21 23:00:30 +00:00
18 changed files with 2492 additions and 82 deletions

View File

@@ -446,7 +446,7 @@ async fn external_auth_refreshes_on_unauthorized() -> Result<()> {
responses::ev_completed("resp-turn"),
]);
let unauthorized = ResponseTemplate::new(401).set_body_json(json!({
"error": { "message": "unauthorized" }
"error": { "message": "unauthorized", "type": "token_invalidated" }
}));
let responses_mock = responses::mount_response_sequence(
&mock_server,

View File

@@ -45,6 +45,7 @@ use tokio::time::sleep;
use tokio::time::timeout;
const CLOUD_REQUIREMENTS_TIMEOUT: Duration = Duration::from_secs(15);
const CHATGPT_ACCESS_TOKEN_STARTUP_REFRESH_TIMEOUT: Duration = Duration::from_secs(15);
const CLOUD_REQUIREMENTS_MAX_ATTEMPTS: usize = 5;
const CLOUD_REQUIREMENTS_CACHE_FILENAME: &str = "cloud-requirements-cache.json";
const CLOUD_REQUIREMENTS_CACHE_REFRESH_INTERVAL: Duration = Duration::from_secs(5 * 60);
@@ -728,22 +729,75 @@ pub fn cloud_requirements_loader(
})
}
async fn cloud_requirements_auth_manager_for_storage(
codex_home: &Path,
enable_codex_api_key_env: bool,
credentials_store_mode: AuthCredentialsStoreMode,
chatgpt_base_url: &str,
) -> Arc<AuthManager> {
AuthManager::shared(
codex_home.to_path_buf(),
enable_codex_api_key_env,
credentials_store_mode,
Some(chatgpt_base_url.to_string()),
)
.await
}
pub async fn cloud_requirements_loader_for_storage(
codex_home: PathBuf,
enable_codex_api_key_env: bool,
credentials_store_mode: AuthCredentialsStoreMode,
chatgpt_base_url: String,
) -> CloudRequirementsLoader {
let auth_manager = AuthManager::shared(
codex_home.clone(),
let auth_manager = cloud_requirements_auth_manager_for_storage(
&codex_home,
enable_codex_api_key_env,
credentials_store_mode,
Some(chatgpt_base_url.clone()),
&chatgpt_base_url,
)
.await;
cloud_requirements_loader(auth_manager, chatgpt_base_url, codex_home)
}
pub async fn refresh_managed_chatgpt_token_for_storage_if_near_expiry(
codex_home: PathBuf,
enable_codex_api_key_env: bool,
credentials_store_mode: AuthCredentialsStoreMode,
chatgpt_base_url: String,
) {
let auth_manager = cloud_requirements_auth_manager_for_storage(
&codex_home,
enable_codex_api_key_env,
credentials_store_mode,
&chatgpt_base_url,
)
.await;
let refresh_task = tokio::spawn(async move {
auth_manager
.refresh_managed_chatgpt_token_if_near_expiry()
.await
});
match timeout(CHATGPT_ACCESS_TOKEN_STARTUP_REFRESH_TIMEOUT, refresh_task).await {
Ok(Ok(Ok(()))) => {}
Ok(Ok(Err(err))) => {
tracing::warn!(
"failed to proactively refresh ChatGPT access token during CLI startup: {err}"
);
}
Ok(Err(err)) => {
tracing::warn!(
"startup ChatGPT access token refresh task failed before completion: {err}"
);
}
Err(_) => {
tracing::warn!(
"timed out waiting for proactive ChatGPT access token refresh during CLI startup; refresh will continue in the background"
);
}
}
}
fn parse_cloud_requirements(
contents: &str,
requirements_base_dir: &Path,

View File

@@ -150,6 +150,8 @@ const RESPONSES_COMPACT_ENDPOINT: &str = "/responses/compact";
// period between stream events.
const COMPACT_REQUEST_TIMEOUT_IDLE_MULTIPLIER: u32 = 4;
const MEMORIES_SUMMARIZE_ENDPOINT: &str = "/memories/trace_summarize";
const TOKEN_INVALIDATED_ERROR_CODE: &str = "token_invalidated";
const TOKEN_REVOKED_ERROR_CODE: &str = "token_revoked";
#[cfg(test)]
pub(crate) const WEBSOCKET_CONNECT_TIMEOUT: Duration =
Duration::from_millis(DEFAULT_WEBSOCKET_CONNECT_TIMEOUT_MS);
@@ -1281,6 +1283,7 @@ impl ModelClientSession {
stream,
session_telemetry.clone(),
inference_trace_attempt,
/*revoked_auth_recovery*/ None,
);
return Ok(stream);
}
@@ -1464,6 +1467,7 @@ impl ModelClientSession {
stream_result,
session_telemetry.clone(),
inference_trace_attempt,
auth_recovery,
);
self.websocket_session.last_response_rx = Some(last_request_rx);
return Ok(WebsocketStreamOutcome::Stream(stream));
@@ -1524,37 +1528,53 @@ impl ModelClientSession {
}
let disabled_trace = InferenceTraceContext::disabled();
match self
.stream_responses_websocket(
prompt,
model_info,
session_telemetry,
effort,
summary,
service_tier,
turn_metadata_header,
/*warmup*/ true,
current_span_w3c_trace_context(),
&disabled_trace,
)
.await
{
Ok(WebsocketStreamOutcome::Stream(mut stream)) => {
// Wait for the v2 warmup request to complete before sending the first turn request.
while let Some(event) = stream.next().await {
match event {
Ok(ResponseEvent::Completed { .. }) => break,
Err(err) => return Err(err),
_ => {}
let mut retried_after_auth_recovery = false;
loop {
match self
.stream_responses_websocket(
prompt,
model_info,
session_telemetry,
effort,
summary,
service_tier.clone(),
turn_metadata_header,
/*warmup*/ true,
current_span_w3c_trace_context(),
&disabled_trace,
)
.await
{
Ok(WebsocketStreamOutcome::Stream(mut stream)) => {
// Wait for the v2 warmup request to complete before sending the first turn request.
let mut retry_after_auth_recovery = false;
while let Some(event) = stream.next().await {
match event {
Ok(ResponseEvent::Completed { .. }) => return Ok(()),
Err(err)
if !retried_after_auth_recovery
&& is_websocket_auth_recovery_retry(&err) =>
{
retried_after_auth_recovery = true;
self.reset_websocket_session();
retry_after_auth_recovery = true;
break;
}
Err(err) => return Err(err),
_ => {}
}
}
if retry_after_auth_recovery {
continue;
}
return Ok(());
}
Ok(())
Ok(WebsocketStreamOutcome::FallbackToHttp) => {
self.try_switch_fallback_transport(session_telemetry, model_info);
return Ok(());
}
Err(err) => return Err(err),
}
Ok(WebsocketStreamOutcome::FallbackToHttp) => {
self.try_switch_fallback_transport(session_telemetry, model_info);
Ok(())
}
Err(err) => Err(err),
}
}
@@ -1734,11 +1754,22 @@ fn parent_thread_id_header_value(session_source: &SessionSource) -> Option<Strin
const RESPONSE_STREAM_CHANNEL_CAPACITY: usize = 1600;
const STREAM_DROPPED_REASON: &str = "response stream dropped before provider terminal event";
const WEBSOCKET_AUTH_RECOVERY_RETRY_REASON: &str =
"responses websocket auth recovered after unauthorized response";
pub(crate) fn is_websocket_auth_recovery_retry(err: &CodexErr) -> bool {
matches!(
err,
CodexErr::Stream(message, _)
if message == WEBSOCKET_AUTH_RECOVERY_RETRY_REASON
)
}
fn map_response_stream(
api_stream: codex_api::ResponseStream,
session_telemetry: SessionTelemetry,
inference_trace_attempt: InferenceTraceAttempt,
revoked_auth_recovery: Option<UnauthorizedRecovery>,
) -> (ResponseStream, oneshot::Receiver<LastResponse>) {
let codex_api::ResponseStream {
rx_event,
@@ -1753,6 +1784,7 @@ fn map_response_stream(
api_stream,
session_telemetry,
inference_trace_attempt,
revoked_auth_recovery,
)
}
@@ -1761,6 +1793,7 @@ fn map_response_events<S>(
api_stream: S,
session_telemetry: SessionTelemetry,
inference_trace_attempt: InferenceTraceAttempt,
mut revoked_auth_recovery: Option<UnauthorizedRecovery>,
) -> (ResponseStream, oneshot::Receiver<LastResponse>)
where
S: futures::Stream<Item = std::result::Result<ResponseEvent, ApiError>>
@@ -1871,7 +1904,13 @@ where
if let Some(upstream_request_id) = upstream_request_id {
feedback_tags!(last_model_request_id = upstream_request_id);
}
let mapped = map_api_error(err);
let mapped = map_response_stream_error(
err,
&mut revoked_auth_recovery,
&session_telemetry,
items_added.is_empty(),
)
.await;
inference_trace_attempt.record_failed(
&mapped,
upstream_request_id,
@@ -1903,6 +1942,61 @@ where
)
}
async fn map_response_stream_error(
err: ApiError,
revoked_auth_recovery: &mut Option<UnauthorizedRecovery>,
session_telemetry: &SessionTelemetry,
retry_after_auth_recovery_allowed: bool,
) -> CodexErr {
match err {
ApiError::Transport(
transport @ TransportError::Http {
status: StatusCode::UNAUTHORIZED,
..
},
) if stream_error_has_revoked_access_token(&transport) => {
let recovery_transport = clone_http_transport_error(&transport);
match handle_unauthorized(recovery_transport, revoked_auth_recovery, session_telemetry)
.await
{
Ok(_) if retry_after_auth_recovery_allowed => CodexErr::Stream(
WEBSOCKET_AUTH_RECOVERY_RETRY_REASON.to_string(),
/*requested_delay*/ None,
),
Ok(_) => map_api_error(ApiError::Transport(transport)),
Err(err) => err,
}
}
err => map_api_error(err),
}
}
fn stream_error_has_revoked_access_token(transport: &TransportError) -> bool {
let debug = extract_response_debug_context(transport);
access_token_revocation_recovery_reason(
debug.auth_error_code.as_deref(),
debug.auth_error_type.as_deref(),
)
.is_some()
}
fn clone_http_transport_error(transport: &TransportError) -> TransportError {
match transport {
TransportError::Http {
status,
url,
headers,
body,
} => TransportError::Http {
status: *status,
url: url.clone(),
headers: headers.clone(),
body: body.clone(),
},
_ => unreachable!("stream auth recovery only clones HTTP transport errors"),
}
}
/// Handles a 401 response by optionally refreshing ChatGPT tokens once.
///
/// When refresh succeeds, the caller should retry the API call; otherwise
@@ -1979,6 +2073,66 @@ async fn handle_unauthorized(
session_telemetry: &SessionTelemetry,
) -> Result<UnauthorizedRecoveryExecution> {
let debug = extract_response_debug_context(&transport);
let revocation_recovery_reason = access_token_revocation_recovery_reason(
debug.auth_error_code.as_deref(),
debug.auth_error_type.as_deref(),
);
if let Some(recovery_reason) = revocation_recovery_reason
&& let Some(recovery) = auth_recovery.as_mut()
&& recovery.handles_invalidated_access_token_auth()
{
let mode = recovery.mode_name();
let phase = recovery.step_name();
return match recovery.handle_invalidated_access_token_auth().await {
Ok(step_result) => {
session_telemetry.record_auth_recovery(
mode,
phase,
"recovery_succeeded",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
Some(recovery_reason),
step_result.auth_state_changed(),
);
emit_feedback_auth_recovery_tags(
mode,
phase,
"recovery_succeeded",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
);
Ok(UnauthorizedRecoveryExecution { mode, phase })
}
Err(failed) => {
session_telemetry.record_auth_recovery(
mode,
phase,
"recovery_failed_permanent",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
Some(recovery_reason),
/*auth_state_changed*/ None,
);
emit_feedback_auth_recovery_tags(
mode,
phase,
"recovery_failed_permanent",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
);
Err(CodexErr::RefreshTokenFailed(failed))
}
};
}
if let Some(recovery) = auth_recovery
&& recovery.has_next()
{
@@ -2063,7 +2217,11 @@ async fn handle_unauthorized(
recovery.step_name(),
Some(recovery.unavailable_reason()),
),
None => ("none", "none", Some("auth_manager_missing")),
None => (
"none",
"none",
revocation_recovery_reason.or(Some("auth_manager_missing")),
),
};
session_telemetry.record_auth_recovery(
mode,
@@ -2089,6 +2247,21 @@ async fn handle_unauthorized(
Err(map_api_error(ApiError::Transport(transport)))
}
fn access_token_revocation_recovery_reason(
auth_error_code: Option<&str>,
auth_error_type: Option<&str>,
) -> Option<&'static str> {
match (auth_error_code, auth_error_type) {
(Some(TOKEN_INVALIDATED_ERROR_CODE), _) | (_, Some(TOKEN_INVALIDATED_ERROR_CODE)) => {
Some("access_token_invalidated")
}
(Some(TOKEN_REVOKED_ERROR_CODE), _) | (_, Some(TOKEN_REVOKED_ERROR_CODE)) => {
Some("access_token_revoked")
}
_ => None,
}
}
fn api_error_http_status(error: &ApiError) -> Option<u16> {
match error {
ApiError::Transport(TransportError::Http { status, .. }) => Some(status.as_u16()),

View File

@@ -2,19 +2,28 @@ use super::AuthRequestTelemetryContext;
use super::ModelClient;
use super::PendingUnauthorizedRetry;
use super::UnauthorizedRecoveryExecution;
use super::WEBSOCKET_AUTH_RECOVERY_RETRY_REASON;
use super::X_CODEX_INSTALLATION_ID_HEADER;
use super::X_CODEX_PARENT_THREAD_ID_HEADER;
use super::X_CODEX_TURN_METADATA_HEADER;
use super::X_CODEX_WINDOW_ID_HEADER;
use super::X_OPENAI_SUBAGENT_HEADER;
use super::handle_unauthorized;
use crate::AttestationContext;
use crate::AttestationProvider;
use crate::GenerateAttestationFuture;
use async_trait::async_trait;
use base64::Engine;
use codex_api::ApiError;
use codex_api::ResponseEvent;
use codex_api::TransportError;
use codex_app_server_protocol::AuthMode;
use codex_login::AuthManager;
use codex_login::CodexAuth;
use codex_login::ExternalAuth;
use codex_login::ExternalAuthRefreshContext;
use codex_login::ExternalAuthRefreshReason;
use codex_login::ExternalAuthTokens;
use codex_model_provider::BearerAuthProvider;
use codex_model_provider_info::CHATGPT_CODEX_BASE_URL;
use codex_model_provider_info::ModelProviderInfo;
@@ -23,6 +32,8 @@ use codex_model_provider_info::create_oss_provider_with_base_url;
use codex_otel::SessionTelemetry;
use codex_protocol::SessionId;
use codex_protocol::ThreadId;
use codex_protocol::error::CodexErr;
use codex_protocol::error::RefreshTokenFailedReason;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ModelInfo;
@@ -37,10 +48,14 @@ use codex_rollout_trace::RolloutTrace;
use codex_rollout_trace::TraceWriter;
use codex_rollout_trace::replay_bundle;
use futures::StreamExt;
use http::HeaderMap;
use http::HeaderValue;
use http::StatusCode;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;
@@ -123,6 +138,86 @@ fn test_session_telemetry() -> SessionTelemetry {
)
}
fn fake_chatgpt_jwt(signature: &str) -> String {
let encode_json = |value: serde_json::Value| {
base64::engine::general_purpose::URL_SAFE_NO_PAD
.encode(serde_json::to_vec(&value).expect("managed auth JWT segment should serialize"))
};
format!(
"{}.{}.{}",
encode_json(json!({"alg": "none", "typ": "JWT"})),
encode_json(json!({
"email": "user@example.com",
"https://api.openai.com/auth": {
"chatgpt_account_id": "account_id",
"chatgpt_user_id": "user-12345",
"user_id": "user-12345"
}
})),
base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(signature.as_bytes()),
)
}
fn write_managed_chatgpt_auth(codex_home: &Path, access_token: &str) {
let id_token = fake_chatgpt_jwt("managed-auth");
let auth_json = json!({
"tokens": {
"id_token": id_token,
"access_token": access_token,
"refresh_token": "test-refresh-token",
"account_id": "account_id"
},
"last_refresh": chrono::Utc::now(),
});
std::fs::write(
codex_home.join("auth.json"),
serde_json::to_vec_pretty(&auth_json).expect("managed auth should serialize"),
)
.expect("managed auth should persist");
}
async fn managed_chatgpt_auth_manager(access_token: &str) -> (TempDir, Arc<AuthManager>) {
let codex_home = TempDir::new().expect("managed auth tempdir");
write_managed_chatgpt_auth(codex_home.path(), access_token);
let manager = AuthManager::shared(
codex_home.path().to_path_buf(),
/*enable_codex_api_key_env*/ false,
codex_login::AuthCredentialsStoreMode::File,
/*chatgpt_base_url*/ None,
)
.await;
(codex_home, manager)
}
struct RefreshingExternalChatgptAuth {
refreshed_token: String,
refresh_count: AtomicUsize,
}
#[async_trait]
impl ExternalAuth for RefreshingExternalChatgptAuth {
fn auth_mode(&self) -> AuthMode {
AuthMode::Chatgpt
}
async fn refresh(
&self,
context: ExternalAuthRefreshContext,
) -> std::io::Result<ExternalAuthTokens> {
assert_eq!(
context.reason,
ExternalAuthRefreshReason::Unauthorized,
"websocket revoked-token recovery should use unauthorized external refresh"
);
self.refresh_count.fetch_add(1, Ordering::SeqCst);
Ok(ExternalAuthTokens::chatgpt(
self.refreshed_token.clone(),
"account_id",
Some("pro".to_string()),
))
}
}
#[derive(Default)]
struct TagCollectorVisitor {
tags: BTreeMap<String, String>,
@@ -346,6 +441,7 @@ async fn dropped_response_stream_traces_cancelled_partial_output() -> anyhow::Re
api_stream,
test_session_telemetry(),
attempt,
/*revoked_auth_recovery*/ None,
);
let observed = stream
@@ -395,6 +491,7 @@ async fn response_stream_records_last_model_feedback_ids() {
api_stream,
test_session_telemetry(),
InferenceTraceAttempt::disabled(),
/*revoked_auth_recovery*/ None,
);
while stream.next().await.is_some() {}
@@ -436,6 +533,7 @@ async fn dropped_backpressured_response_stream_traces_cancelled_partial_output()
api_stream,
test_session_telemetry(),
attempt,
/*revoked_auth_recovery*/ None,
);
// Fill the mapper channel with non-terminal events, then yield one output
@@ -581,3 +679,362 @@ async fn non_chatgpt_codex_endpoints_omit_attestation_generation() {
);
assert_eq!(attestation_calls.load(Ordering::Relaxed), 0);
}
#[tokio::test]
async fn token_invalidated_401_clears_auth_and_requires_relogin() {
let (_codex_home, manager) = managed_chatgpt_auth_manager("revoked-access-token").await;
let mut recovery = Some(manager.unauthorized_recovery());
let x_error_json = base64::engine::general_purpose::STANDARD
.encode(r#"{"error":{"code":"token_invalidated"}}"#);
let mut headers = HeaderMap::new();
headers.insert(
"x-error-json",
HeaderValue::from_str(&x_error_json).expect("valid x-error-json header"),
);
let err = handle_unauthorized(
TransportError::Http {
status: StatusCode::UNAUTHORIZED,
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
headers: Some(headers),
body: Some(r#"{"error":{"message":"revoked"}}"#.to_string()),
},
&mut recovery,
&test_session_telemetry(),
)
.await
.expect_err("revoked access tokens should not enter refresh recovery");
let CodexErr::RefreshTokenFailed(failed) = err else {
panic!("expected invalidated access token to force relogin, got {err:?}");
};
assert_eq!(failed.reason, RefreshTokenFailedReason::Revoked);
assert_eq!(
failed.message,
"Your ChatGPT session is no longer valid. Please sign in again."
);
assert!(manager.auth_cached().is_none());
assert_eq!(
recovery
.as_ref()
.expect("recovery state should remain available")
.step_name(),
"reload"
);
}
#[tokio::test]
async fn token_invalidated_error_type_401_clears_auth_and_requires_relogin() {
let (_codex_home, manager) = managed_chatgpt_auth_manager("revoked-access-token").await;
let mut recovery = Some(manager.unauthorized_recovery());
let err = handle_unauthorized(
TransportError::Http {
status: StatusCode::UNAUTHORIZED,
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
headers: None,
body: Some(r#"{"error":{"type":"token_invalidated"}}"#.to_string()),
},
&mut recovery,
&test_session_telemetry(),
)
.await
.expect_err("invalidated access tokens should not enter refresh recovery");
let CodexErr::RefreshTokenFailed(failed) = err else {
panic!("expected invalidated access token to force relogin, got {err:?}");
};
assert_eq!(failed.reason, RefreshTokenFailedReason::Revoked);
assert_eq!(
failed.message,
"Your ChatGPT session is no longer valid. Please sign in again."
);
assert!(manager.auth_cached().is_none());
assert_eq!(
recovery
.as_ref()
.expect("recovery state should remain available")
.step_name(),
"reload"
);
}
#[tokio::test]
async fn token_revoked_error_code_401_clears_auth_and_requires_relogin() {
let (_codex_home, manager) = managed_chatgpt_auth_manager("revoked-access-token").await;
let mut recovery = Some(manager.unauthorized_recovery());
let err = handle_unauthorized(
TransportError::Http {
status: StatusCode::UNAUTHORIZED,
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
headers: None,
body: Some(r#"{"error":{"code":"token_revoked"}}"#.to_string()),
},
&mut recovery,
&test_session_telemetry(),
)
.await
.expect_err("revoked access tokens should force relogin");
let CodexErr::RefreshTokenFailed(failed) = err else {
panic!("expected revoked access token to force relogin, got {err:?}");
};
assert_eq!(failed.reason, RefreshTokenFailedReason::Revoked);
assert_eq!(
failed.message,
"Your ChatGPT session is no longer valid. Please sign in again."
);
assert!(manager.auth_cached().is_none());
}
#[tokio::test]
async fn token_revoked_error_type_401_clears_auth_and_requires_relogin() {
let (_codex_home, manager) = managed_chatgpt_auth_manager("revoked-access-token").await;
let mut recovery = Some(manager.unauthorized_recovery());
let err = handle_unauthorized(
TransportError::Http {
status: StatusCode::UNAUTHORIZED,
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
headers: None,
body: Some(r#"{"error":{"type":"token_revoked"}}"#.to_string()),
},
&mut recovery,
&test_session_telemetry(),
)
.await
.expect_err("revoked access tokens should force relogin");
let CodexErr::RefreshTokenFailed(failed) = err else {
panic!("expected revoked access token to force relogin, got {err:?}");
};
assert_eq!(failed.reason, RefreshTokenFailedReason::Revoked);
assert_eq!(
failed.message,
"Your ChatGPT session is no longer valid. Please sign in again."
);
assert!(manager.auth_cached().is_none());
}
#[tokio::test]
async fn websocket_stream_token_revoked_401_clears_auth_and_requires_relogin() {
let (_codex_home, manager) = managed_chatgpt_auth_manager("revoked-access-token").await;
let api_stream = futures::stream::iter([Err(ApiError::Transport(TransportError::Http {
status: StatusCode::UNAUTHORIZED,
url: None,
headers: None,
body: Some(r#"{"type":"error","status":401,"error":{"type":"token_revoked"}}"#.to_string()),
}))]);
let (mut stream, _) = super::map_response_events(
/*upstream_request_id*/ None,
api_stream,
test_session_telemetry(),
InferenceTraceAttempt::disabled(),
Some(manager.unauthorized_recovery()),
);
let err = stream
.next()
.await
.expect("revoked websocket stream should emit an error")
.expect_err("revoked websocket stream should require relogin");
let CodexErr::RefreshTokenFailed(failed) = err else {
panic!("expected revoked websocket stream to force relogin, got {err:?}");
};
assert_eq!(failed.reason, RefreshTokenFailedReason::Revoked);
assert_eq!(
failed.message,
"Your ChatGPT session is no longer valid. Please sign in again."
);
assert!(manager.auth_cached().is_none());
}
#[tokio::test]
async fn websocket_stream_token_revoked_401_retries_after_loading_replacement_auth() {
let (codex_home, manager) = managed_chatgpt_auth_manager("revoked-access-token").await;
write_managed_chatgpt_auth(codex_home.path(), "replacement-access-token");
let api_stream = futures::stream::iter([Err(ApiError::Transport(TransportError::Http {
status: StatusCode::UNAUTHORIZED,
url: None,
headers: None,
body: Some(r#"{"type":"error","status":401,"error":{"type":"token_revoked"}}"#.to_string()),
}))]);
let (mut stream, _) = super::map_response_events(
/*upstream_request_id*/ None,
api_stream,
test_session_telemetry(),
InferenceTraceAttempt::disabled(),
Some(manager.unauthorized_recovery()),
);
let err = stream
.next()
.await
.expect("recovered websocket stream should emit a retryable error")
.expect_err("recovered websocket stream should ask the turn loop to retry");
let CodexErr::Stream(message, None) = err else {
panic!(
"expected recovered websocket revocation to surface a retryable stream error, got {err:?}"
);
};
assert_eq!(message, WEBSOCKET_AUTH_RECOVERY_RETRY_REASON);
assert_eq!(
manager
.auth_cached()
.expect("replacement auth should remain cached")
.get_token()
.expect("replacement token should resolve"),
"replacement-access-token"
);
}
#[tokio::test]
async fn websocket_stream_token_revoked_401_after_output_does_not_retry_after_loading_replacement_auth()
{
let (codex_home, manager) = managed_chatgpt_auth_manager("revoked-access-token").await;
write_managed_chatgpt_auth(codex_home.path(), "replacement-access-token");
let api_stream = futures::stream::iter([
Ok(ResponseEvent::OutputItemDone(output_message(
"msg-1",
"partial answer",
))),
Err(ApiError::Transport(TransportError::Http {
status: StatusCode::UNAUTHORIZED,
url: None,
headers: None,
body: Some(
r#"{"type":"error","status":401,"error":{"type":"token_revoked"}}"#.to_string(),
),
})),
]);
let (mut stream, _) = super::map_response_events(
/*upstream_request_id*/ None,
api_stream,
test_session_telemetry(),
InferenceTraceAttempt::disabled(),
Some(manager.unauthorized_recovery()),
);
assert!(matches!(
stream
.next()
.await
.expect("partial output should be forwarded")
.expect("partial output should stay successful"),
ResponseEvent::OutputItemDone(_)
));
let err = stream
.next()
.await
.expect("revoked websocket stream should emit an error")
.expect_err("revoked websocket stream should stop after output escapes");
assert!(
!super::is_websocket_auth_recovery_retry(&err),
"streams with recorded output must not transparently retry after auth recovery"
);
assert_eq!(
manager
.auth_cached()
.expect("replacement auth should remain cached")
.get_token()
.expect("replacement token should resolve"),
"replacement-access-token"
);
}
#[tokio::test]
async fn websocket_stream_token_revoked_401_refreshes_external_auth_before_retry() {
let codex_home = TempDir::new().expect("external auth tempdir");
let initial_access_token = fake_chatgpt_jwt("external-initial");
let refreshed_access_token = fake_chatgpt_jwt("external-refreshed");
codex_login::auth::login_with_chatgpt_auth_tokens(
codex_home.path(),
&initial_access_token,
"account_id",
Some("pro"),
)
.expect("external ChatGPT auth should seed");
let manager = AuthManager::shared(
codex_home.path().to_path_buf(),
/*enable_codex_api_key_env*/ false,
codex_login::AuthCredentialsStoreMode::File,
/*chatgpt_base_url*/ None,
)
.await;
let external_auth = Arc::new(RefreshingExternalChatgptAuth {
refreshed_token: refreshed_access_token.clone(),
refresh_count: AtomicUsize::new(0),
});
manager.set_external_auth(external_auth.clone());
let api_stream = futures::stream::iter([Err(ApiError::Transport(TransportError::Http {
status: StatusCode::UNAUTHORIZED,
url: None,
headers: None,
body: Some(r#"{"type":"error","status":401,"error":{"type":"token_revoked"}}"#.to_string()),
}))]);
let (mut stream, _) = super::map_response_events(
/*upstream_request_id*/ None,
api_stream,
test_session_telemetry(),
InferenceTraceAttempt::disabled(),
Some(manager.unauthorized_recovery()),
);
let err = stream
.next()
.await
.expect("refreshed websocket stream should emit a retryable error")
.expect_err("refreshed websocket stream should ask the turn loop to retry");
let CodexErr::Stream(message, None) = err else {
panic!(
"expected external websocket revocation to surface a retryable stream error, got {err:?}"
);
};
assert_eq!(message, WEBSOCKET_AUTH_RECOVERY_RETRY_REASON);
assert_eq!(external_auth.refresh_count.load(Ordering::SeqCst), 1);
assert_eq!(
manager
.auth_cached()
.expect("refreshed external auth should remain cached")
.get_token()
.expect("refreshed token should resolve"),
refreshed_access_token
);
}
#[tokio::test]
async fn token_invalidated_401_retries_when_persisted_auth_changed() {
let (codex_home, manager) = managed_chatgpt_auth_manager("revoked-access-token").await;
write_managed_chatgpt_auth(codex_home.path(), "replacement-access-token");
let mut recovery = Some(manager.unauthorized_recovery());
let retry = handle_unauthorized(
TransportError::Http {
status: StatusCode::UNAUTHORIZED,
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
headers: None,
body: Some(r#"{"error":{"type":"token_invalidated"}}"#.to_string()),
},
&mut recovery,
&test_session_telemetry(),
)
.await
.expect("new persisted auth should retry instead of logging out");
assert_eq!(retry.mode, "managed");
assert_eq!(retry.phase, "reload");
assert_eq!(
manager
.auth_cached()
.expect("replacement auth should remain cached")
.get_token()
.expect("replacement token should resolve"),
"replacement-access-token"
);
}

View File

@@ -186,6 +186,7 @@ async fn run_compact_task_inner_impl(
let max_retries = turn_context.provider.info().stream_max_retries();
let mut retries = 0;
let mut websocket_auth_recovery_retries = 0;
let mut client_session = sess.services.model_client.new_session();
// Reuse one client session so turn-scoped state (sticky routing, websocket incremental
// request tracking)
@@ -235,6 +236,14 @@ async fn run_compact_task_inner_impl(
sess.send_event(&turn_context, event).await;
return Err(e);
}
Err(e)
if crate::client::is_websocket_auth_recovery_retry(&e)
&& websocket_auth_recovery_retries == 0 =>
{
websocket_auth_recovery_retries += 1;
client_session.reset_websocket_session();
continue;
}
Err(e) => {
if retries < max_retries {
retries += 1;

View File

@@ -277,31 +277,44 @@ async fn run_remote_compaction_request_v2(
prompt: &Prompt,
turn_metadata_header: Option<&str>,
) -> CodexResult<(ResponseItem, String)> {
let stream = client_session
.stream(
prompt,
&turn_context.model_info,
&turn_context.session_telemetry,
turn_context.reasoning_effort,
turn_context.reasoning_summary,
turn_context.config.service_tier.clone(),
turn_metadata_header,
&InferenceTraceContext::disabled(),
)
.or_else(|err| async {
let total_usage_breakdown = sess.get_total_token_usage_breakdown().await;
let compact_request_log_data =
build_compact_request_log_data(&prompt.input, &prompt.base_instructions.text);
log_remote_compact_failure(
turn_context,
&compact_request_log_data,
total_usage_breakdown,
&err,
);
let mut websocket_auth_recovery_retries = 0;
loop {
let stream = client_session
.stream(
prompt,
&turn_context.model_info,
&turn_context.session_telemetry,
turn_context.reasoning_effort,
turn_context.reasoning_summary,
turn_context.config.service_tier.clone(),
turn_metadata_header,
&InferenceTraceContext::disabled(),
)
.or_else(|err| async {
let total_usage_breakdown = sess.get_total_token_usage_breakdown().await;
let compact_request_log_data =
build_compact_request_log_data(&prompt.input, &prompt.base_instructions.text);
log_remote_compact_failure(
turn_context,
&compact_request_log_data,
total_usage_breakdown,
&err,
);
Err(err)
})
.await?;
match collect_compaction_output(stream).await {
Err(err)
})
.await?;
collect_compaction_output(stream).await
if crate::client::is_websocket_auth_recovery_retry(&err)
&& websocket_auth_recovery_retries == 0 =>
{
websocket_auth_recovery_retries += 1;
client_session.reset_websocket_session();
}
result => return result,
}
}
}
async fn collect_compaction_output(

View File

@@ -943,6 +943,7 @@ async fn run_sampling_request(
)
.await;
let mut retries = 0;
let mut websocket_auth_recovery_retries = 0;
let mut initial_input = Some(input);
loop {
let prompt_input = if let Some(input) = initial_input.take() {
@@ -992,6 +993,16 @@ async fn run_sampling_request(
return Err(err);
}
// Auth recovery already loaded replacement credentials. Give that one immediate retry
// without spending the transport reconnect budget.
if crate::client::is_websocket_auth_recovery_retry(&err)
&& websocket_auth_recovery_retries == 0
{
websocket_auth_recovery_retries += 1;
client_session.reset_websocket_session();
continue;
}
// Use the configured provider-specific stream retry budget.
let max_retries = turn_context.provider.info().stream_max_retries();
if retries >= max_retries

View File

@@ -339,7 +339,7 @@ impl ThreadManager {
state_db: Option<StateDbHandle>,
) -> Self {
set_thread_manager_test_mode_for_tests(/*enabled*/ true);
let auth_manager = AuthManager::from_auth_for_testing(auth);
let auth_manager = AuthManager::from_auth_for_testing_with_home(auth, codex_home.clone());
let installation_id = uuid::Uuid::new_v4().to_string();
let skills_codex_home = match AbsolutePathBuf::from_absolute_path_checked(&codex_home) {
Ok(codex_home) => codex_home,

View File

@@ -1362,6 +1362,11 @@ pub async fn start_websocket_server_with_headers(
if close_after_requests {
let _ = ws_stream.close(None).await;
} else if !connections.lock().unwrap().is_empty() {
tokio::select! {
_ = &mut shutdown_rx => return,
_ = ws_stream.next() => {}
}
} else {
let _ = shutdown_rx.await;
return;

View File

@@ -467,7 +467,10 @@ impl TestCodexBuilder {
let installation_id = resolve_installation_id(&config.codex_home).await?;
let thread_manager = ThreadManager::new(
&config,
codex_core::test_support::auth_manager_from_auth(auth.clone()),
codex_core::test_support::auth_manager_from_auth_with_home(
auth.clone(),
config.codex_home.to_path_buf(),
),
SessionSource::Exec,
Arc::clone(&environment_manager),
empty_extension_registry(),

View File

@@ -1066,6 +1066,184 @@ async fn chatgpt_auth_sends_correct_request() {
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn revoked_chatgpt_auth_user_turn_clears_auth_and_requests_relogin() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/api/codex/responses"))
.respond_with(ResponseTemplate::new(401).set_body_json(json!({
"error": {
"code": "token_revoked",
"message": "revoked",
}
})))
.expect(1)
.mount(&server)
.await;
let codex_home = Arc::new(TempDir::new()?);
let _jwt = write_auth_json(
codex_home.as_ref(),
/*openai_api_key*/ None,
"pro",
"revoked-access-token",
Some("account_id"),
);
let auth = CodexAuth::from_auth_storage(
codex_home.path(),
AuthCredentialsStoreMode::File,
/*chatgpt_base_url*/ None,
)
.await?
.expect("managed ChatGPT auth should load");
let mut model_provider = built_in_model_providers(/*openai_base_url*/ None)["openai"].clone();
model_provider.base_url = Some(format!("{}/api/codex", server.uri()));
model_provider.supports_websockets = false;
let mut builder = test_codex()
.with_home(codex_home.clone())
.with_auth(auth)
.with_config(move |config| {
config.model_provider = model_provider;
});
let test = builder.build(&server).await?;
let codex = test.codex.clone();
codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: "hello".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
thread_settings: Default::default(),
})
.await?;
let error_event = wait_for_event(&codex, |ev| matches!(ev, EventMsg::Error(_))).await;
assert!(
matches!(
error_event,
EventMsg::Error(ref err)
if err.message.contains(
"Your ChatGPT session is no longer valid. Please sign in again."
)
),
"expected invalidated-session relogin error; got {error_event:?}"
);
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert!(
!test.codex_home_path().join("auth.json").exists(),
"revoked managed ChatGPT auth should be removed"
);
let response_attempts = server
.received_requests()
.await
.expect("mock server should capture requests")
.into_iter()
.filter(|request| request.url.path() == "/api/codex/responses")
.count();
assert_eq!(
response_attempts, 1,
"revoked managed auth should fail without retrying /responses"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn revoked_chatgpt_auth_user_turn_retries_reloaded_auth() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/api/codex/responses"))
.and(header("authorization", "Bearer revoked-access-token"))
.respond_with(ResponseTemplate::new(401).set_body_json(json!({
"error": {
"code": "token_revoked",
"message": "revoked",
}
})))
.expect(1)
.mount(&server)
.await;
Mock::given(method("POST"))
.and(path("/api/codex/responses"))
.and(header("authorization", "Bearer replacement-access-token"))
.respond_with(
ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(
sse(vec![ev_response_created("resp1"), ev_completed("resp1")]),
"text/event-stream",
),
)
.expect(1)
.mount(&server)
.await;
let codex_home = Arc::new(TempDir::new()?);
let _jwt = write_auth_json(
codex_home.as_ref(),
/*openai_api_key*/ None,
"pro",
"revoked-access-token",
Some("account_id"),
);
let auth = CodexAuth::from_auth_storage(
codex_home.path(),
AuthCredentialsStoreMode::File,
/*chatgpt_base_url*/ None,
)
.await?
.expect("managed ChatGPT auth should load");
let mut model_provider = built_in_model_providers(/*openai_base_url*/ None)["openai"].clone();
model_provider.base_url = Some(format!("{}/api/codex", server.uri()));
model_provider.supports_websockets = false;
let mut builder = test_codex()
.with_home(codex_home.clone())
.with_auth(auth)
.with_config(move |config| {
config.model_provider = model_provider;
});
let test = builder.build(&server).await?;
write_auth_json(
codex_home.as_ref(),
/*openai_api_key*/ None,
"pro",
"replacement-access-token",
Some("account_id"),
);
test.submit_turn("hello")
.await
.expect("reloaded managed auth should retry the HTTP turn");
let persisted_auth = CodexAuth::from_auth_storage(
codex_home.path(),
AuthCredentialsStoreMode::File,
/*chatgpt_base_url*/ None,
)
.await?
.expect("replacement managed auth should persist");
assert_eq!(
persisted_auth
.get_token()
.expect("replacement token should resolve"),
"replacement-access-token"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn prefers_apikey_when_config_prefers_apikey_even_with_chatgpt_tokens() {
skip_if_no_network!();

View File

@@ -1,6 +1,7 @@
#![allow(clippy::expect_used, clippy::unwrap_used)]
use codex_api::WS_REQUEST_HEADER_TRACEPARENT_CLIENT_METADATA_KEY;
use codex_api::WS_REQUEST_HEADER_TRACESTATE_CLIENT_METADATA_KEY;
use codex_config::types::AuthCredentialsStoreMode;
use codex_core::ModelClient;
use codex_core::ModelClientSession;
use codex_core::Prompt;
@@ -66,6 +67,42 @@ const TEST_INSTALLATION_ID: &str = "11111111-1111-4111-8111-111111111111";
const X_CODEX_WS_STREAM_REQUEST_START_MS_CLIENT_METADATA_KEY: &str =
"x-codex-ws-stream-request-start-ms";
fn write_managed_chatgpt_auth(codex_home: &TempDir, access_token: &str) {
use base64::Engine as _;
let encode_json = |value: serde_json::Value| {
base64::engine::general_purpose::URL_SAFE_NO_PAD
.encode(serde_json::to_vec(&value).expect("managed auth JWT segment should serialize"))
};
let id_token = format!(
"{}.{}.{}",
encode_json(json!({"alg": "none", "typ": "JWT"})),
encode_json(json!({
"email": "user@example.com",
"https://api.openai.com/auth": {
"chatgpt_account_id": "account_id",
"chatgpt_user_id": "user-12345",
"user_id": "user-12345"
}
})),
base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(b"sig"),
);
let auth_json = json!({
"tokens": {
"id_token": id_token,
"access_token": access_token,
"refresh_token": "test-refresh-token",
"account_id": "account_id"
},
"last_refresh": chrono::Utc::now(),
});
std::fs::write(
codex_home.path().join("auth.json"),
serde_json::to_vec_pretty(&auth_json).expect("managed auth should serialize"),
)
.expect("managed auth should persist");
}
fn assert_request_trace_matches(body: &serde_json::Value, expected_trace: &W3cTraceContext) {
let client_metadata = body["client_metadata"]
.as_object()
@@ -1429,6 +1466,384 @@ async fn responses_websocket_invalid_request_error_with_status_is_forwarded() {
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_revoked_managed_auth_clears_auth_and_requests_relogin() {
skip_if_no_network!();
let revoked_token_error = json!({
"type": "error",
"status": 401,
"error": {
"type": "token_revoked",
"message": "revoked"
}
});
let server = start_websocket_server(vec![vec![
vec![
ev_response_created("resp-prewarm"),
ev_completed("resp-prewarm"),
],
vec![revoked_token_error],
]])
.await;
let codex_home = Arc::new(TempDir::new().expect("managed auth tempdir"));
write_managed_chatgpt_auth(codex_home.as_ref(), "revoked-access-token");
let auth = CodexAuth::from_auth_storage(
codex_home.path(),
AuthCredentialsStoreMode::File,
/*chatgpt_base_url*/ None,
)
.await
.expect("managed ChatGPT auth should load")
.expect("managed ChatGPT auth should exist");
let mut builder = test_codex()
.with_home(codex_home.clone())
.with_auth(auth)
.with_config(|config| {
config.model_provider.request_max_retries = Some(0);
config.model_provider.stream_max_retries = Some(0);
});
let test = builder
.build_with_websocket_server(&server)
.await
.expect("build websocket codex");
let submission_id = test
.codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: "hello".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
thread_settings: Default::default(),
})
.await
.expect("submission should emit relogin events");
let error_event = wait_for_event(&test.codex, |msg| matches!(msg, EventMsg::Error(_))).await;
let EventMsg::Error(error_event) = error_event else {
unreachable!();
};
assert!(
error_event
.message
.contains("Your ChatGPT session is no longer valid. Please sign in again."),
"unexpected error message for submission {submission_id}: {}",
error_event.message
);
wait_for_event(&test.codex, |msg| matches!(msg, EventMsg::TurnComplete(_))).await;
assert!(
!test.codex_home_path().join("auth.json").exists(),
"revoked managed ChatGPT auth should be removed"
);
assert_eq!(server.single_connection().len(), 2);
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_revoked_managed_auth_retries_reloaded_auth() {
skip_if_no_network!();
let revoked_token_error = json!({
"type": "error",
"status": 401,
"error": {
"type": "token_revoked",
"message": "revoked"
}
});
let server = start_websocket_server_with_headers(vec![
WebSocketConnectionConfig {
requests: vec![
vec![
ev_response_created("resp-prewarm"),
ev_completed("resp-prewarm"),
],
vec![revoked_token_error],
vec![
ev_response_created("resp-stale-reuse"),
ev_completed("resp-stale-reuse"),
],
],
response_headers: Vec::new(),
accept_delay: None,
close_after_requests: false,
},
WebSocketConnectionConfig {
requests: vec![vec![
ev_response_created("resp-retry"),
ev_completed("resp-retry"),
]],
response_headers: Vec::new(),
accept_delay: None,
close_after_requests: true,
},
])
.await;
let codex_home = Arc::new(TempDir::new().expect("managed auth tempdir"));
write_managed_chatgpt_auth(codex_home.as_ref(), "revoked-access-token");
let auth = CodexAuth::from_auth_storage(
codex_home.path(),
AuthCredentialsStoreMode::File,
/*chatgpt_base_url*/ None,
)
.await
.expect("managed ChatGPT auth should load")
.expect("managed ChatGPT auth should exist");
let mut builder = test_codex()
.with_home(codex_home.clone())
.with_auth(auth)
.with_config(|config| {
config.model_provider.request_max_retries = Some(0);
config.model_provider.stream_max_retries = Some(0);
});
let test = builder
.build_with_websocket_server(&server)
.await
.expect("build websocket codex");
write_managed_chatgpt_auth(codex_home.as_ref(), "replacement-access-token");
test.submit_turn("hello")
.await
.expect("reloaded managed auth should retry the websocket turn");
let persisted_auth = CodexAuth::from_auth_storage(
codex_home.path(),
AuthCredentialsStoreMode::File,
/*chatgpt_base_url*/ None,
)
.await
.expect("replacement managed auth should load")
.expect("replacement managed auth should persist");
assert_eq!(
persisted_auth
.get_token()
.expect("replacement token should resolve"),
"replacement-access-token"
);
let connections = server.connections();
assert_eq!(connections.len(), 2);
assert_eq!(connections[0].len(), 2);
assert_eq!(connections[1].len(), 1);
let handshakes = server.handshakes();
assert_eq!(handshakes.len(), 2);
assert_eq!(
handshakes[0].header("authorization").as_deref(),
Some("Bearer revoked-access-token")
);
assert_eq!(
handshakes[1].header("authorization").as_deref(),
Some("Bearer replacement-access-token")
);
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_revoked_managed_auth_prewarm_retries_reloaded_auth() {
skip_if_no_network!();
let revoked_token_error = json!({
"type": "error",
"status": 401,
"error": {
"type": "token_revoked",
"message": "revoked"
}
});
let server = start_websocket_server(vec![
vec![vec![revoked_token_error]],
vec![
vec![
ev_response_created("warm-retry"),
ev_completed("warm-retry"),
],
vec![ev_response_created("resp-1"), ev_completed("resp-1")],
],
])
.await;
let codex_home = Arc::new(TempDir::new().expect("managed auth tempdir"));
write_managed_chatgpt_auth(codex_home.as_ref(), "revoked-access-token");
let auth = CodexAuth::from_auth_storage(
codex_home.path(),
AuthCredentialsStoreMode::File,
/*chatgpt_base_url*/ None,
)
.await
.expect("managed ChatGPT auth should load")
.expect("managed ChatGPT auth should exist");
write_managed_chatgpt_auth(codex_home.as_ref(), "replacement-access-token");
let mut builder = test_codex()
.with_home(codex_home.clone())
.with_auth(auth)
.with_config(|config| {
config.model_provider.request_max_retries = Some(0);
config.model_provider.stream_max_retries = Some(0);
});
let test = builder
.build_with_websocket_server(&server)
.await
.expect("build websocket codex");
test.submit_turn("hello")
.await
.expect("reloaded managed auth should retry startup websocket prewarm");
let persisted_auth = CodexAuth::from_auth_storage(
codex_home.path(),
AuthCredentialsStoreMode::File,
/*chatgpt_base_url*/ None,
)
.await
.expect("replacement managed auth should load")
.expect("replacement managed auth should persist");
assert_eq!(
persisted_auth
.get_token()
.expect("replacement token should resolve"),
"replacement-access-token"
);
let connections = server.connections();
assert_eq!(connections.len(), 2);
assert_eq!(connections[0].len(), 1);
assert_eq!(connections[1].len(), 2);
let handshakes = server.handshakes();
assert_eq!(handshakes.len(), 2);
assert_eq!(
handshakes[0].header("authorization").as_deref(),
Some("Bearer revoked-access-token")
);
assert_eq!(
handshakes[1].header("authorization").as_deref(),
Some("Bearer replacement-access-token")
);
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_revoked_managed_auth_compact_retries_reloaded_auth() {
skip_if_no_network!();
let revoked_token_error = json!({
"type": "error",
"status": 401,
"error": {
"type": "token_revoked",
"message": "revoked"
}
});
let server = start_websocket_server(vec![
vec![
vec![
ev_response_created("resp-prewarm"),
ev_completed("resp-prewarm"),
],
vec![ev_response_created("resp-1"), ev_completed("resp-1")],
vec![revoked_token_error],
],
vec![vec![
ev_response_created("compact-retry"),
ev_assistant_message("msg-compact", "COMPACT_SUMMARY"),
ev_completed("compact-retry"),
]],
])
.await;
let codex_home = Arc::new(TempDir::new().expect("managed auth tempdir"));
write_managed_chatgpt_auth(codex_home.as_ref(), "revoked-access-token");
let auth = CodexAuth::from_auth_storage(
codex_home.path(),
AuthCredentialsStoreMode::File,
/*chatgpt_base_url*/ None,
)
.await
.expect("managed ChatGPT auth should load")
.expect("managed ChatGPT auth should exist");
let mut builder = test_codex()
.with_home(codex_home.clone())
.with_auth(auth)
.with_config(|config| {
config
.features
.enable(Feature::RemoteCompactionV2)
.expect("test config should allow feature update");
config.model_provider.request_max_retries = Some(0);
config.model_provider.stream_max_retries = Some(0);
});
let test = builder
.build_with_websocket_server(&server)
.await
.expect("build websocket codex");
test.submit_turn("hello")
.await
.expect("initial websocket turn should complete");
write_managed_chatgpt_auth(codex_home.as_ref(), "replacement-access-token");
test.codex
.submit(Op::Compact)
.await
.expect("compact submission should succeed");
wait_for_event(&test.codex, |msg| matches!(msg, EventMsg::TurnComplete(_))).await;
assert!(
server
.wait_for_handshakes(/*expected*/ 2, Duration::from_secs(10))
.await,
"compact auth recovery should reconnect with replacement auth"
);
let persisted_auth = CodexAuth::from_auth_storage(
codex_home.path(),
AuthCredentialsStoreMode::File,
/*chatgpt_base_url*/ None,
)
.await
.expect("replacement managed auth should load")
.expect("replacement managed auth should persist");
assert_eq!(
persisted_auth
.get_token()
.expect("replacement token should resolve"),
"replacement-access-token"
);
let connections = server.connections();
assert_eq!(connections.len(), 2);
assert_eq!(connections[0].len(), 3);
assert_eq!(connections[1].len(), 1);
let handshakes = server.handshakes();
assert_eq!(handshakes.len(), 2);
assert_eq!(
handshakes[0].header("authorization").as_deref(),
Some("Bearer revoked-access-token")
);
assert_eq!(
handshakes[1].header("authorization").as_deref(),
Some("Bearer replacement-access-token")
);
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_connection_limit_error_reconnects_and_completes() {
skip_if_no_network!();

View File

@@ -52,6 +52,7 @@ use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStartedNotification;
use codex_arg0::Arg0DispatchPaths;
use codex_cloud_requirements::cloud_requirements_loader_for_storage;
use codex_cloud_requirements::refresh_managed_chatgpt_token_for_storage_if_near_expiry;
use codex_config::ConfigLoadError;
use codex_config::ConfigLoadOptions;
use codex_config::LoaderOverrides;
@@ -156,7 +157,6 @@ use crate::event_processor::EventProcessor;
const DEFAULT_ANALYTICS_ENABLED: bool = true;
const EXEC_DEFAULT_LOG_FILTER: &str = "error,opentelemetry_sdk=off,opentelemetry_otlp=off";
enum InitialOperation {
UserTurn {
items: Vec<UserInput>,
@@ -470,6 +470,14 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
std::process::exit(1);
}
refresh_managed_chatgpt_token_for_storage_if_near_expiry(
config.codex_home.to_path_buf(),
/*enable_codex_api_key_env*/ true,
config.cli_auth_credentials_store_mode,
config.chatgpt_base_url.clone(),
)
.await;
let otel = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
codex_core::otel_init::build_provider(
&config,

View File

@@ -308,6 +308,7 @@ async fn unauthorized_recovery_reports_mode_and_step_names() {
let managed = UnauthorizedRecovery {
manager: Arc::clone(&manager),
step: UnauthorizedRecoveryStep::Reload,
expected_auth: None,
expected_account_id: None,
mode: UnauthorizedRecoveryMode::Managed,
};
@@ -317,6 +318,7 @@ async fn unauthorized_recovery_reports_mode_and_step_names() {
let external = UnauthorizedRecovery {
manager,
step: UnauthorizedRecoveryStep::ExternalRefresh,
expected_auth: None,
expected_account_id: None,
mode: UnauthorizedRecoveryMode::External,
};
@@ -324,6 +326,502 @@ async fn unauthorized_recovery_reports_mode_and_step_names() {
assert_eq!(external.step_name(), "external_refresh");
}
#[tokio::test]
#[serial(codex_auth_env)]
async fn invalidated_access_token_logout_clears_cached_auth() {
let codex_home = tempdir().unwrap();
let _access_token_guard = remove_access_token_env_var();
write_auth_file(
AuthFileParams {
openai_api_key: None,
chatgpt_plan_type: Some("pro".to_string()),
chatgpt_account_id: Some("org_mine".to_string()),
},
codex_home.path(),
)
.expect("failed to write auth file");
let manager = AuthManager::shared(
codex_home.path().to_path_buf(),
/*enable_codex_api_key_env*/ false,
AuthCredentialsStoreMode::File,
/*chatgpt_base_url*/ None,
)
.await;
let mut recovery = manager.unauthorized_recovery();
assert!(recovery.handles_invalidated_access_token_auth());
let failed = recovery
.handle_invalidated_access_token_auth()
.await
.expect_err("unchanged revoked auth should force login");
assert_eq!(failed.reason, RefreshTokenFailedReason::Revoked);
assert_eq!(
failed.message,
"Your ChatGPT session is no longer valid. Please sign in again."
);
assert!(manager.auth_cached().is_none());
assert!(!codex_home.path().join("auth.json").exists());
}
#[tokio::test]
#[serial(codex_auth_env)]
async fn invalidated_access_token_logout_clears_cached_auth_without_account_id() {
let codex_home = tempdir().unwrap();
let _access_token_guard = remove_access_token_env_var();
write_auth_file(
AuthFileParams {
openai_api_key: None,
chatgpt_plan_type: Some("pro".to_string()),
chatgpt_account_id: None,
},
codex_home.path(),
)
.expect("failed to write auth file");
let manager = AuthManager::shared(
codex_home.path().to_path_buf(),
/*enable_codex_api_key_env*/ false,
AuthCredentialsStoreMode::File,
/*chatgpt_base_url*/ None,
)
.await;
let mut recovery = manager.unauthorized_recovery();
let failed = recovery
.handle_invalidated_access_token_auth()
.await
.expect_err("unchanged revoked auth without an account id should force login");
assert_eq!(failed.reason, RefreshTokenFailedReason::Revoked);
assert_eq!(
failed.message,
"Your ChatGPT session is no longer valid. Please sign in again."
);
assert!(manager.auth_cached().is_none());
assert!(!codex_home.path().join("auth.json").exists());
}
#[tokio::test]
#[serial(codex_auth_env)]
async fn invalidated_access_token_clears_cached_auth_when_persisted_auth_was_removed() {
let codex_home = tempdir().unwrap();
let _access_token_guard = remove_access_token_env_var();
write_auth_file(
AuthFileParams {
openai_api_key: None,
chatgpt_plan_type: Some("pro".to_string()),
chatgpt_account_id: Some("org_mine".to_string()),
},
codex_home.path(),
)
.expect("failed to write auth file");
let manager = AuthManager::shared(
codex_home.path().to_path_buf(),
/*enable_codex_api_key_env*/ false,
AuthCredentialsStoreMode::File,
/*chatgpt_base_url*/ None,
)
.await;
let mut recovery = manager.unauthorized_recovery();
std::fs::remove_file(codex_home.path().join("auth.json"))
.expect("auth file should be removable");
let failed = recovery
.handle_invalidated_access_token_auth()
.await
.expect_err("removed persisted auth should force login");
assert_eq!(failed.reason, RefreshTokenFailedReason::Revoked);
assert_eq!(
failed.message,
"Your ChatGPT session is no longer valid. Please sign in again."
);
assert!(manager.auth_cached().is_none());
}
#[tokio::test]
#[serial(codex_auth_env)]
async fn invalidated_access_token_preserves_cached_auth_when_storage_inspection_fails() {
let codex_home = tempdir().unwrap();
let _access_token_guard = remove_access_token_env_var();
write_auth_file(
AuthFileParams {
openai_api_key: None,
chatgpt_plan_type: Some("pro".to_string()),
chatgpt_account_id: Some("org_mine".to_string()),
},
codex_home.path(),
)
.expect("failed to write auth file");
let manager = AuthManager::shared(
codex_home.path().to_path_buf(),
/*enable_codex_api_key_env*/ false,
AuthCredentialsStoreMode::File,
/*chatgpt_base_url*/ None,
)
.await;
let mut recovery = manager.unauthorized_recovery();
std::fs::write(codex_home.path().join("auth.json"), "{not-json")
.expect("auth file should be corruptible");
let failed = recovery
.handle_invalidated_access_token_auth()
.await
.expect_err("storage inspection failures should stop invalidation cleanup");
assert_eq!(failed.reason, RefreshTokenFailedReason::Revoked);
assert!(failed.message.starts_with(
"Your ChatGPT session is no longer valid. Please sign in again. Codex could not inspect saved auth:"
));
assert!(manager.auth_cached().is_some());
assert!(codex_home.path().join("auth.json").exists());
}
#[tokio::test]
#[serial(codex_auth_env)]
async fn invalidated_access_token_preserves_reloaded_auth() {
let codex_home = tempdir().unwrap();
let _access_token_guard = remove_access_token_env_var();
write_auth_file(
AuthFileParams {
openai_api_key: None,
chatgpt_plan_type: Some("pro".to_string()),
chatgpt_account_id: Some("org_mine".to_string()),
},
codex_home.path(),
)
.expect("failed to write auth file");
let manager = AuthManager::shared(
codex_home.path().to_path_buf(),
/*enable_codex_api_key_env*/ false,
AuthCredentialsStoreMode::File,
/*chatgpt_base_url*/ None,
)
.await;
let mut recovery = manager.unauthorized_recovery();
let mut reauthenticated = load_auth_dot_json(codex_home.path(), AuthCredentialsStoreMode::File)
.expect("auth should load")
.expect("auth should exist");
reauthenticated
.tokens
.as_mut()
.expect("tokens should exist")
.access_token = "replacement-access-token".to_string();
save_auth(
codex_home.path(),
&reauthenticated,
AuthCredentialsStoreMode::File,
)
.expect("replacement auth should persist");
let step_result = recovery
.handle_invalidated_access_token_auth()
.await
.expect("new persisted auth should be retried");
assert_eq!(step_result.auth_state_changed(), Some(true));
assert_eq!(
manager
.auth_cached()
.expect("replacement auth should remain cached")
.get_token()
.expect("replacement token should resolve"),
"replacement-access-token"
);
assert!(codex_home.path().join("auth.json").exists());
}
#[tokio::test]
#[serial(codex_auth_env)]
async fn invalidated_access_token_logs_out_reloaded_auth_if_retry_is_also_revoked() {
let codex_home = tempdir().unwrap();
let _access_token_guard = remove_access_token_env_var();
write_auth_file(
AuthFileParams {
openai_api_key: None,
chatgpt_plan_type: Some("pro".to_string()),
chatgpt_account_id: Some("org_mine".to_string()),
},
codex_home.path(),
)
.expect("failed to write auth file");
let manager = AuthManager::shared(
codex_home.path().to_path_buf(),
/*enable_codex_api_key_env*/ false,
AuthCredentialsStoreMode::File,
/*chatgpt_base_url*/ None,
)
.await;
let mut recovery = manager.unauthorized_recovery();
let mut reauthenticated = load_auth_dot_json(codex_home.path(), AuthCredentialsStoreMode::File)
.expect("auth should load")
.expect("auth should exist");
reauthenticated
.tokens
.as_mut()
.expect("tokens should exist")
.access_token = "replacement-access-token".to_string();
save_auth(
codex_home.path(),
&reauthenticated,
AuthCredentialsStoreMode::File,
)
.expect("replacement auth should persist");
recovery
.handle_invalidated_access_token_auth()
.await
.expect("first revoked token should retry the replacement auth");
let failed = recovery
.handle_invalidated_access_token_auth()
.await
.expect_err("repeated revocation of the replacement auth should force login");
assert_eq!(failed.reason, RefreshTokenFailedReason::Revoked);
assert_eq!(
failed.message,
"Your ChatGPT session is no longer valid. Please sign in again."
);
assert!(manager.auth_cached().is_none());
assert!(!codex_home.path().join("auth.json").exists());
}
#[tokio::test]
#[serial(codex_auth_env)]
async fn invalidated_access_token_logs_out_auth_reloaded_by_normal_recovery() {
let codex_home = tempdir().unwrap();
let _access_token_guard = remove_access_token_env_var();
write_auth_file(
AuthFileParams {
openai_api_key: None,
chatgpt_plan_type: Some("pro".to_string()),
chatgpt_account_id: Some("org_mine".to_string()),
},
codex_home.path(),
)
.expect("failed to write auth file");
let manager = AuthManager::shared(
codex_home.path().to_path_buf(),
/*enable_codex_api_key_env*/ false,
AuthCredentialsStoreMode::File,
/*chatgpt_base_url*/ None,
)
.await;
let mut recovery = manager.unauthorized_recovery();
let mut reauthenticated = load_auth_dot_json(codex_home.path(), AuthCredentialsStoreMode::File)
.expect("auth should load")
.expect("auth should exist");
reauthenticated
.tokens
.as_mut()
.expect("tokens should exist")
.access_token = "replacement-access-token".to_string();
save_auth(
codex_home.path(),
&reauthenticated,
AuthCredentialsStoreMode::File,
)
.expect("replacement auth should persist");
let reloaded = recovery
.next()
.await
.expect("normal unauthorized recovery should reload replacement auth");
assert_eq!(reloaded.auth_state_changed(), Some(true));
let failed = recovery
.handle_invalidated_access_token_auth()
.await
.expect_err("revoking the just-reloaded auth should force login");
assert_eq!(failed.reason, RefreshTokenFailedReason::Revoked);
assert_eq!(
failed.message,
"Your ChatGPT session is no longer valid. Please sign in again."
);
assert!(manager.auth_cached().is_none());
assert!(!codex_home.path().join("auth.json").exists());
}
#[tokio::test]
#[serial(codex_auth_env)]
async fn invalidated_access_token_preserves_auth_refreshed_after_request_started() {
let codex_home = tempdir().unwrap();
let _access_token_guard = remove_access_token_env_var();
write_auth_file(
AuthFileParams {
openai_api_key: None,
chatgpt_plan_type: Some("pro".to_string()),
chatgpt_account_id: Some("org_mine".to_string()),
},
codex_home.path(),
)
.expect("failed to write auth file");
let manager = AuthManager::shared(
codex_home.path().to_path_buf(),
/*enable_codex_api_key_env*/ false,
AuthCredentialsStoreMode::File,
/*chatgpt_base_url*/ None,
)
.await;
let mut recovery = manager.unauthorized_recovery();
let mut refreshed_auth = load_auth_dot_json(codex_home.path(), AuthCredentialsStoreMode::File)
.expect("auth should load")
.expect("auth should exist");
refreshed_auth
.tokens
.as_mut()
.expect("tokens should exist")
.access_token = "replacement-access-token".to_string();
save_auth(
codex_home.path(),
&refreshed_auth,
AuthCredentialsStoreMode::File,
)
.expect("replacement auth should persist");
manager.reload().await;
assert_eq!(
manager
.auth_cached()
.expect("replacement auth should load into cache")
.get_token()
.expect("replacement token should resolve"),
"replacement-access-token"
);
let step_result = recovery
.handle_invalidated_access_token_auth()
.await
.expect("auth refreshed after request start should be retried");
assert_eq!(step_result.auth_state_changed(), Some(true));
assert_eq!(
manager
.auth_cached()
.expect("replacement auth should remain cached")
.get_token()
.expect("replacement token should resolve"),
"replacement-access-token"
);
assert!(codex_home.path().join("auth.json").exists());
}
#[tokio::test]
#[serial(codex_auth_env)]
async fn invalidated_access_token_preserves_reloaded_auth_from_a_different_account() {
let codex_home = tempdir().unwrap();
let _access_token_guard = remove_access_token_env_var();
write_auth_file(
AuthFileParams {
openai_api_key: None,
chatgpt_plan_type: Some("pro".to_string()),
chatgpt_account_id: Some("org_mine".to_string()),
},
codex_home.path(),
)
.expect("failed to write auth file");
let manager = AuthManager::shared(
codex_home.path().to_path_buf(),
/*enable_codex_api_key_env*/ false,
AuthCredentialsStoreMode::File,
/*chatgpt_base_url*/ None,
)
.await;
let mut recovery = manager.unauthorized_recovery();
write_auth_file(
AuthFileParams {
openai_api_key: None,
chatgpt_plan_type: Some("pro".to_string()),
chatgpt_account_id: Some("org_elsewhere".to_string()),
},
codex_home.path(),
)
.expect("replacement auth should persist");
let step_result = recovery
.handle_invalidated_access_token_auth()
.await
.expect("new persisted auth should be retried across account changes");
assert_eq!(step_result.auth_state_changed(), Some(true));
assert_eq!(
manager
.auth_cached()
.expect("replacement auth should remain cached")
.get_account_id()
.as_deref(),
Some("org_elsewhere")
);
let follow_up_step = recovery
.next()
.await
.expect("follow-up refreshable 401 should use the replacement account");
assert_eq!(follow_up_step.auth_state_changed(), Some(false));
assert_eq!(recovery.step_name(), "refresh_token");
assert!(codex_home.path().join("auth.json").exists());
}
#[tokio::test]
#[serial(codex_auth_env)]
async fn invalidated_access_token_logout_preserves_auth_changed_before_delete() {
let codex_home = tempdir().unwrap();
let _access_token_guard = remove_access_token_env_var();
write_auth_file(
AuthFileParams {
openai_api_key: None,
chatgpt_plan_type: Some("pro".to_string()),
chatgpt_account_id: Some("org_mine".to_string()),
},
codex_home.path(),
)
.expect("failed to write auth file");
let manager = AuthManager::shared(
codex_home.path().to_path_buf(),
/*enable_codex_api_key_env*/ false,
AuthCredentialsStoreMode::File,
/*chatgpt_base_url*/ None,
)
.await;
let mut reauthenticated = load_auth_dot_json(codex_home.path(), AuthCredentialsStoreMode::File)
.expect("auth should load")
.expect("auth should exist");
reauthenticated
.tokens
.as_mut()
.expect("tokens should exist")
.access_token = "replacement-access-token".to_string();
save_auth(
codex_home.path(),
&reauthenticated,
AuthCredentialsStoreMode::File,
)
.expect("replacement auth should persist");
let outcome = manager
.logout_if_auth_snapshot_unchanged()
.await
.expect("replaced persisted auth should avoid logout");
assert!(matches!(outcome, InvalidatedAuthLogoutOutcome::AuthChanged));
assert_eq!(
manager
.auth_cached()
.expect("replacement auth should remain cached")
.get_token()
.expect("replacement token should resolve"),
"replacement-access-token"
);
assert!(codex_home.path().join("auth.json").exists());
}
#[tokio::test]
#[serial(codex_auth_env)]
async fn refresh_failure_is_scoped_to_the_matching_auth_snapshot() {
@@ -453,6 +951,7 @@ async fn unauthorized_recovery_uses_external_refresh_for_bearer_manager() {
assert!(recovery.has_next());
assert_eq!(recovery.mode_name(), "external");
assert_eq!(recovery.step_name(), "external_refresh");
assert!(!recovery.handles_invalidated_access_token_auth());
let result = recovery
.next()
@@ -610,7 +1109,8 @@ fn write_auth_file(params: AuthFileParams, codex_home: &Path) -> std::io::Result
"tokens": {
"id_token": fake_jwt,
"access_token": "test-access-token",
"refresh_token": "test-refresh-token"
"refresh_token": "test-refresh-token",
"account_id": params.chatgpt_account_id,
},
"last_refresh": Utc::now(),
});

View File

@@ -7,6 +7,10 @@ use serde::Serialize;
use serial_test::serial;
use std::env;
use std::fmt::Debug;
use std::fs::File;
use std::fs::OpenOptions;
#[cfg(unix)]
use std::os::unix::fs::OpenOptionsExt;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
@@ -84,6 +88,10 @@ struct ChatgptAuthState {
}
const TOKEN_REFRESH_INTERVAL: i64 = 8;
const CHATGPT_ACCESS_TOKEN_REFRESH_WINDOW_MINUTES: i64 = 5;
const CHATGPT_ACCESS_TOKEN_STARTUP_REFRESH_LOCK_FILENAME: &str =
"chatgpt-access-token-startup-refresh.lock";
const CHATGPT_ACCESS_TOKEN_STARTUP_REFRESH_LOCK_POLL_INTERVAL_MS: u64 = 50;
const REFRESH_TOKEN_EXPIRED_MESSAGE: &str = "Your access token could not be refreshed because your refresh token has expired. Please log out and sign in again.";
const REFRESH_TOKEN_REUSED_MESSAGE: &str = "Your access token could not be refreshed because your refresh token was already used. Please log out and sign in again.";
@@ -91,6 +99,8 @@ const REFRESH_TOKEN_INVALIDATED_MESSAGE: &str = "Your access token could not be
const REFRESH_TOKEN_UNKNOWN_MESSAGE: &str =
"Your access token could not be refreshed. Please log out and sign in again.";
const REFRESH_TOKEN_ACCOUNT_MISMATCH_MESSAGE: &str = "Your access token could not be refreshed because you have since logged out or signed in to another account. Please sign in again.";
const ACCESS_TOKEN_INVALIDATED_MESSAGE: &str =
"Your ChatGPT session is no longer valid. Please sign in again.";
const DEFAULT_CHATGPT_BACKEND_BASE_URL: &str = "https://chatgpt.com/backend-api";
const REFRESH_TOKEN_URL: &str = "https://auth.openai.com/oauth/token";
pub(super) const REVOKE_TOKEN_URL: &str = "https://auth.openai.com/oauth/revoke";
@@ -1049,6 +1059,11 @@ enum ReloadOutcome {
Skipped,
}
enum InvalidatedAuthLogoutOutcome {
LoggedOut,
AuthChanged,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum UnauthorizedRecoveryMode {
Managed,
@@ -1056,7 +1071,9 @@ enum UnauthorizedRecoveryMode {
}
// UnauthorizedRecovery is a state machine that handles an attempt to refresh the authentication when requests
// to API fail with 401 status code.
// to API fail with a refreshable 401 status code. Managed ChatGPT access-token revocation is handled beside
// this flow so persisted credentials can be guarded before they are cleared; externally owned auth stays here
// so its owner can supply replacement credentials.
// The client calls next() every time it encounters a 401 error, one time per retry.
// For API key based authentication, we don't do anything and let the error bubble to the user.
//
@@ -1076,6 +1093,7 @@ enum UnauthorizedRecoveryMode {
pub struct UnauthorizedRecovery {
manager: Arc<AuthManager>,
step: UnauthorizedRecoveryStep,
expected_auth: Option<CodexAuth>,
expected_account_id: Option<String>,
mode: UnauthorizedRecoveryMode,
}
@@ -1111,6 +1129,7 @@ impl UnauthorizedRecovery {
Self {
manager,
step,
expected_auth: cached_auth,
expected_account_id,
mode,
}
@@ -1182,6 +1201,15 @@ impl UnauthorizedRecovery {
}
}
pub fn handles_invalidated_access_token_auth(&self) -> bool {
self.mode == UnauthorizedRecoveryMode::Managed
&& self
.manager
.auth_cached()
.as_ref()
.is_some_and(|auth| matches!(auth, CodexAuth::Chatgpt(_)))
}
pub async fn next(&mut self) -> Result<UnauthorizedRecoveryStepResult, RefreshTokenError> {
if !self.has_next() {
return Err(RefreshTokenError::Permanent(RefreshTokenFailedError::new(
@@ -1198,6 +1226,7 @@ impl UnauthorizedRecovery {
.await
{
ReloadOutcome::ReloadedChanged => {
self.update_expected_auth_from_cache();
self.step = UnauthorizedRecoveryStep::RefreshToken;
return Ok(UnauthorizedRecoveryStepResult {
auth_state_changed: Some(true),
@@ -1220,6 +1249,7 @@ impl UnauthorizedRecovery {
}
UnauthorizedRecoveryStep::RefreshToken => {
self.manager.refresh_token_from_authority().await?;
self.update_expected_auth_from_cache();
self.step = UnauthorizedRecoveryStep::Done;
return Ok(UnauthorizedRecoveryStepResult {
auth_state_changed: Some(true),
@@ -1229,6 +1259,7 @@ impl UnauthorizedRecovery {
self.manager
.refresh_external_auth(ExternalAuthRefreshReason::Unauthorized)
.await?;
self.update_expected_auth_from_cache();
self.step = UnauthorizedRecoveryStep::Done;
return Ok(UnauthorizedRecoveryStepResult {
auth_state_changed: Some(true),
@@ -1240,6 +1271,127 @@ impl UnauthorizedRecovery {
auth_state_changed: None,
})
}
fn update_expected_auth_from_cache(&mut self) {
let expected_auth = self.manager.auth_cached();
self.expected_account_id = expected_auth.as_ref().and_then(CodexAuth::get_account_id);
self.expected_auth = expected_auth;
}
pub async fn handle_invalidated_access_token_auth(
&mut self,
) -> Result<UnauthorizedRecoveryStepResult, RefreshTokenFailedError> {
let (result, next_expected_auth) = {
let _refresh_guard = self.manager.refresh_lock.acquire().await.map_err(|_| {
RefreshTokenFailedError::new(
RefreshTokenFailedReason::Other,
REFRESH_TOKEN_UNKNOWN_MESSAGE.to_string(),
)
})?;
let cached_auth = self.manager.auth_cached();
if !AuthManager::auths_equal_for_refresh(
cached_auth.as_ref(),
self.expected_auth.as_ref(),
) {
self.invalidated_auth_state_changed_result()
} else {
let reload_outcome = self
.manager
.reload_if_auth_snapshot_changed()
.await
.map_err(Self::invalidated_auth_storage_error)?;
match reload_outcome {
ReloadOutcome::ReloadedChanged => self.invalidated_auth_state_changed_result(),
ReloadOutcome::ReloadedNoChange => {
match self.manager.logout_if_auth_snapshot_unchanged().await {
Ok(InvalidatedAuthLogoutOutcome::AuthChanged) => {
self.invalidated_auth_state_changed_result()
}
Ok(InvalidatedAuthLogoutOutcome::LoggedOut) => (
Err(RefreshTokenFailedError::new(
RefreshTokenFailedReason::Revoked,
ACCESS_TOKEN_INVALIDATED_MESSAGE.to_string(),
)),
None,
),
Err(err) => (
Err(RefreshTokenFailedError::new(
RefreshTokenFailedReason::Revoked,
format!(
"{ACCESS_TOKEN_INVALIDATED_MESSAGE} Codex could not clear saved auth: {err}"
),
)),
None,
),
}
}
ReloadOutcome::Skipped => {
if self
.manager
.clear_cached_auth_if_storage_missing()
.await
.map_err(Self::invalidated_auth_storage_error)?
{
(
Err(RefreshTokenFailedError::new(
RefreshTokenFailedReason::Revoked,
ACCESS_TOKEN_INVALIDATED_MESSAGE.to_string(),
)),
None,
)
} else {
(
Err(RefreshTokenFailedError::new(
RefreshTokenFailedReason::Other,
REFRESH_TOKEN_ACCOUNT_MISMATCH_MESSAGE.to_string(),
)),
None,
)
}
}
}
}
};
if result.is_ok() {
self.expected_account_id = next_expected_auth
.as_ref()
.and_then(CodexAuth::get_account_id);
self.expected_auth = next_expected_auth;
}
result
}
fn invalidated_auth_state_changed_result(
&self,
) -> (
Result<UnauthorizedRecoveryStepResult, RefreshTokenFailedError>,
Option<CodexAuth>,
) {
let cached_auth = self.manager.auth_cached();
if cached_auth.is_none() {
return (
Err(RefreshTokenFailedError::new(
RefreshTokenFailedReason::Revoked,
ACCESS_TOKEN_INVALIDATED_MESSAGE.to_string(),
)),
None,
);
}
(
Ok(UnauthorizedRecoveryStepResult {
auth_state_changed: Some(true),
}),
cached_auth,
)
}
fn invalidated_auth_storage_error(err: std::io::Error) -> RefreshTokenFailedError {
RefreshTokenFailedError::new(
RefreshTokenFailedReason::Revoked,
format!("{ACCESS_TOKEN_INVALIDATED_MESSAGE} Codex could not inspect saved auth: {err}"),
)
}
}
/// Central manager providing a single source of truth for auth.json derived
@@ -1481,6 +1633,48 @@ impl AuthManager {
}
}
async fn reload_if_auth_snapshot_changed(&self) -> std::io::Result<ReloadOutcome> {
let new_auth = self.try_load_auth_from_storage().await?;
let cached_before_reload = self.auth_cached();
let auth_changed =
!Self::auths_equal_for_refresh(cached_before_reload.as_ref(), new_auth.as_ref());
if !auth_changed {
return Ok(ReloadOutcome::ReloadedNoChange);
}
tracing::info!("Reloading auth because the persisted auth snapshot changed.");
self.set_cached_auth(new_auth);
Ok(ReloadOutcome::ReloadedChanged)
}
async fn clear_cached_auth_if_storage_missing(&self) -> std::io::Result<bool> {
if self.try_load_auth_from_storage().await?.is_some() {
return Ok(false);
}
tracing::info!("Clearing cached auth because persisted auth is no longer available.");
self.set_cached_auth(/*new_auth*/ None);
Ok(true)
}
async fn logout_if_auth_snapshot_unchanged(
&self,
) -> std::io::Result<InvalidatedAuthLogoutOutcome> {
let persisted_auth = self.try_load_auth_from_storage().await?;
let cached_auth = self.auth_cached();
if !Self::auths_equal_for_refresh(cached_auth.as_ref(), persisted_auth.as_ref()) {
tracing::info!(
"Skipping auth logout because the persisted auth snapshot changed before deletion."
);
self.set_cached_auth(persisted_auth);
return Ok(InvalidatedAuthLogoutOutcome::AuthChanged);
}
logout_all_stores(&self.codex_home, self.auth_credentials_store_mode)?;
self.reload().await;
Ok(InvalidatedAuthLogoutOutcome::LoggedOut)
}
fn auths_equal_for_refresh(a: Option<&CodexAuth>, b: Option<&CodexAuth>) -> bool {
match (a, b) {
(None, None) => true,
@@ -1529,7 +1723,7 @@ impl AuthManager {
}
}
async fn load_auth_from_storage(&self) -> Option<CodexAuth> {
async fn try_load_auth_from_storage(&self) -> std::io::Result<Option<CodexAuth>> {
load_auth(
&self.codex_home,
self.enable_codex_api_key_env,
@@ -1537,8 +1731,10 @@ impl AuthManager {
self.chatgpt_base_url.as_deref(),
)
.await
.ok()
.flatten()
}
async fn load_auth_from_storage(&self) -> Option<CodexAuth> {
self.try_load_auth_from_storage().await.ok().flatten()
}
fn set_cached_auth(&self, new_auth: Option<CodexAuth>) -> bool {
@@ -1686,6 +1882,10 @@ impl AuthManager {
REFRESH_TOKEN_UNKNOWN_MESSAGE.to_string(),
))
})?;
self.refresh_token_with_refresh_lock_held().await
}
async fn refresh_token_with_refresh_lock_held(&self) -> Result<(), RefreshTokenError> {
let auth_before_reload = self.auth_cached();
if auth_before_reload
.as_ref()
@@ -1715,6 +1915,85 @@ impl AuthManager {
}
}
/// Refresh managed ChatGPT auth when its access token is nearly expired.
///
/// CLI startup uses the same five-minute refresh window as ChatGPT web so a
/// new session does not begin with a token that is about to expire. Other auth
/// modes either cannot be refreshed locally or have separate refresh ownership,
/// so they intentionally no-op here.
pub async fn refresh_managed_chatgpt_token_if_near_expiry(
&self,
) -> Result<(), RefreshTokenError> {
let Some(CodexAuth::Chatgpt(chatgpt_auth)) = self.auth_cached() else {
return Ok(());
};
let should_refresh = chatgpt_auth
.current_token_data()
.and_then(|tokens| parse_jwt_expiration(&tokens.access_token).ok().flatten())
.is_some_and(|expires_at| {
expires_at
<= Utc::now()
+ chrono::Duration::minutes(CHATGPT_ACCESS_TOKEN_REFRESH_WINDOW_MINUTES)
});
if !should_refresh {
return Ok(());
}
let _refresh_lock = self.acquire_chatgpt_startup_refresh_lock().await?;
let _refresh_guard = self.refresh_lock.acquire().await.map_err(|_| {
RefreshTokenError::Permanent(RefreshTokenFailedError::new(
RefreshTokenFailedReason::Other,
REFRESH_TOKEN_UNKNOWN_MESSAGE.to_string(),
))
})?;
self.refresh_token_with_refresh_lock_held().await
}
async fn acquire_chatgpt_startup_refresh_lock(&self) -> Result<File, RefreshTokenError> {
let mut logged_wait = false;
loop {
if let Some(lock_file) = self.try_acquire_chatgpt_startup_refresh_lock()? {
return Ok(lock_file);
}
if !logged_wait {
tracing::info!(
"Waiting to proactively refresh ChatGPT access token because another process is already refreshing it."
);
logged_wait = true;
}
tokio::time::sleep(std::time::Duration::from_millis(
CHATGPT_ACCESS_TOKEN_STARTUP_REFRESH_LOCK_POLL_INTERVAL_MS,
))
.await;
}
}
fn try_acquire_chatgpt_startup_refresh_lock(&self) -> Result<Option<File>, RefreshTokenError> {
let lock_path = self
.codex_home
.join(CHATGPT_ACCESS_TOKEN_STARTUP_REFRESH_LOCK_FILENAME);
if let Some(parent) = lock_path.parent() {
std::fs::create_dir_all(parent).map_err(RefreshTokenError::Transient)?;
}
let mut options = OpenOptions::new();
options.read(true).write(true).create(true).truncate(false);
#[cfg(unix)]
{
options.mode(0o600);
}
let lock_file = options
.open(lock_path)
.map_err(RefreshTokenError::Transient)?;
match lock_file.try_lock() {
Ok(()) => Ok(Some(lock_file)),
Err(std::fs::TryLockError::WouldBlock) => Ok(None),
Err(err) => Err(RefreshTokenError::Transient(err.into())),
}
}
/// Attempt to refresh the current auth token from the authority that issued
/// the token. On success, reloads the auth state from disk so other components
/// observe refreshed token. If the token refresh fails, returns the error to

View File

@@ -19,6 +19,7 @@ use pretty_assertions::assert_eq;
use serde::Serialize;
use serde_json::json;
use std::ffi::OsString;
use std::fs::File;
use std::sync::Arc;
use tempfile::TempDir;
use wiremock::Mock;
@@ -158,6 +159,242 @@ async fn refresh_token_refreshes_when_auth_is_unchanged() -> Result<()> {
Ok(())
}
#[serial_test::serial(auth_refresh)]
#[tokio::test]
async fn refresh_managed_chatgpt_token_refreshes_when_auth_is_near_expiry() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/oauth/token"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"access_token": "new-access-token",
"refresh_token": "new-refresh-token"
})))
.expect(1)
.mount(&server)
.await;
let ctx = RefreshTokenTestContext::new(&server).await?;
let initial_last_refresh = Utc::now();
let near_expiry_access_token = access_token_with_expiration(Utc::now() + Duration::minutes(4));
let initial_tokens = build_tokens(&near_expiry_access_token, INITIAL_REFRESH_TOKEN);
let initial_auth = AuthDotJson {
auth_mode: Some(AuthMode::Chatgpt),
openai_api_key: None,
tokens: Some(initial_tokens.clone()),
last_refresh: Some(initial_last_refresh),
agent_identity: None,
};
ctx.write_auth(&initial_auth).await?;
ctx.auth_manager
.refresh_managed_chatgpt_token_if_near_expiry()
.await
.context("managed ChatGPT refresh should succeed")?;
let refreshed_tokens = TokenData {
access_token: "new-access-token".to_string(),
refresh_token: "new-refresh-token".to_string(),
..initial_tokens.clone()
};
let stored = ctx.load_auth()?;
let tokens = stored.tokens.as_ref().context("tokens should exist")?;
assert_eq!(tokens, &refreshed_tokens);
let refreshed_at = stored
.last_refresh
.as_ref()
.context("last_refresh should be recorded")?;
assert!(
*refreshed_at >= initial_last_refresh,
"last_refresh should advance"
);
server.verify().await;
Ok(())
}
#[serial_test::serial(auth_refresh)]
#[tokio::test]
async fn refresh_managed_chatgpt_token_skips_auth_outside_refresh_window() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = MockServer::start().await;
let ctx = RefreshTokenTestContext::new(&server).await?;
let initial_last_refresh = Utc::now();
let fresh_access_token = access_token_with_expiration(Utc::now() + Duration::minutes(6));
let initial_tokens = build_tokens(&fresh_access_token, INITIAL_REFRESH_TOKEN);
let initial_auth = AuthDotJson {
auth_mode: Some(AuthMode::Chatgpt),
openai_api_key: None,
tokens: Some(initial_tokens.clone()),
last_refresh: Some(initial_last_refresh),
agent_identity: None,
};
ctx.write_auth(&initial_auth).await?;
ctx.auth_manager
.refresh_managed_chatgpt_token_if_near_expiry()
.await
.context("managed ChatGPT refresh should no-op")?;
assert_eq!(ctx.load_auth()?, initial_auth);
let requests = server.received_requests().await.unwrap_or_default();
assert!(requests.is_empty(), "expected no refresh token requests");
Ok(())
}
#[serial_test::serial(auth_refresh)]
#[tokio::test]
async fn refresh_managed_chatgpt_token_waits_while_startup_refresh_lock_is_held() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/oauth/token"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"access_token": "new-access-token",
"refresh_token": "new-refresh-token"
})))
.expect(1)
.mount(&server)
.await;
let ctx = RefreshTokenTestContext::new(&server).await?;
let initial_last_refresh = Utc::now();
let expired_access_token = access_token_with_expiration(Utc::now() - Duration::minutes(1));
let initial_tokens = build_tokens(&expired_access_token, INITIAL_REFRESH_TOKEN);
let initial_auth = AuthDotJson {
auth_mode: Some(AuthMode::Chatgpt),
openai_api_key: None,
tokens: Some(initial_tokens.clone()),
last_refresh: Some(initial_last_refresh),
agent_identity: None,
};
ctx.write_auth(&initial_auth).await?;
let lock_path = ctx
.codex_home
.path()
.join("chatgpt-access-token-startup-refresh.lock");
let lock_file = File::options()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(lock_path)?;
lock_file.try_lock()?;
let auth_manager = Arc::clone(&ctx.auth_manager);
let refresh_task = tokio::spawn(async move {
auth_manager
.refresh_managed_chatgpt_token_if_near_expiry()
.await
});
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
assert!(
!refresh_task.is_finished(),
"managed ChatGPT refresh should wait while another startup holds the lock"
);
assert_eq!(ctx.load_auth()?, initial_auth);
let requests = server.received_requests().await.unwrap_or_default();
assert!(
requests.is_empty(),
"expected no refresh token requests before the startup lock is released"
);
drop(lock_file);
refresh_task
.await
.context("startup refresh task should join")?
.context("managed ChatGPT refresh should resume after the lock is released")?;
let stored = ctx.load_auth()?;
let tokens = stored.tokens.as_ref().context("tokens should exist")?;
assert_eq!(tokens.access_token, "new-access-token");
assert_eq!(tokens.refresh_token, "new-refresh-token");
server.verify().await;
Ok(())
}
#[serial_test::serial(auth_refresh)]
#[tokio::test]
async fn refresh_token_does_not_wait_while_startup_refresh_lock_is_held() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/oauth/token"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"access_token": "new-access-token",
"refresh_token": "new-refresh-token"
})))
.expect(1)
.mount(&server)
.await;
let ctx = RefreshTokenTestContext::new(&server).await?;
let initial_last_refresh = Utc::now();
let expired_access_token = access_token_with_expiration(Utc::now() - Duration::minutes(1));
let initial_tokens = build_tokens(&expired_access_token, INITIAL_REFRESH_TOKEN);
ctx.write_auth(&AuthDotJson {
auth_mode: Some(AuthMode::Chatgpt),
openai_api_key: None,
tokens: Some(initial_tokens),
last_refresh: Some(initial_last_refresh),
agent_identity: None,
})
.await?;
let lock_path = ctx
.codex_home
.path()
.join("chatgpt-access-token-startup-refresh.lock");
let lock_file = File::options()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(lock_path)?;
lock_file.try_lock()?;
let auth_manager = Arc::clone(&ctx.auth_manager);
let startup_refresh_task = tokio::spawn(async move {
auth_manager
.refresh_managed_chatgpt_token_if_near_expiry()
.await
});
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
assert!(
!startup_refresh_task.is_finished(),
"startup refresh should wait while another process holds the file lock"
);
tokio::time::timeout(
std::time::Duration::from_secs(1),
ctx.auth_manager.refresh_token_from_authority(),
)
.await
.context("normal refresh should not wait for the startup file lock")?
.context("normal refresh should succeed")?;
startup_refresh_task.abort();
let _ = startup_refresh_task.await;
drop(lock_file);
let stored = ctx.load_auth()?;
let tokens = stored.tokens.as_ref().context("tokens should exist")?;
assert_eq!(tokens.access_token, "new-access-token");
assert_eq!(tokens.refresh_token, "new-refresh-token");
server.verify().await;
Ok(())
}
#[serial_test::serial(auth_refresh)]
#[tokio::test]
async fn refresh_token_skips_refresh_when_auth_changed() -> Result<()> {

View File

@@ -14,15 +14,13 @@ pub struct ResponseDebugContext {
pub cf_ray: Option<String>,
pub auth_error: Option<String>,
pub auth_error_code: Option<String>,
pub auth_error_type: Option<String>,
}
pub fn extract_response_debug_context(transport: &TransportError) -> ResponseDebugContext {
let mut context = ResponseDebugContext::default();
let TransportError::Http {
headers, body: _, ..
} = transport
else {
let TransportError::Http { headers, body, .. } = transport else {
return context;
};
@@ -38,21 +36,46 @@ pub fn extract_response_debug_context(transport: &TransportError) -> ResponseDeb
extract_header(REQUEST_ID_HEADER).or_else(|| extract_header(OAI_REQUEST_ID_HEADER));
context.cf_ray = extract_header(CF_RAY_HEADER);
context.auth_error = extract_header(AUTH_ERROR_HEADER);
context.auth_error_code = extract_header(X_ERROR_JSON_HEADER).and_then(|encoded| {
let decoded = base64::engine::general_purpose::STANDARD
.decode(encoded)
.ok()?;
let parsed = serde_json::from_slice::<serde_json::Value>(&decoded).ok()?;
parsed
.get("error")
.and_then(|error| error.get("code"))
.and_then(serde_json::Value::as_str)
.map(str::to_string)
});
let header_error = extract_header(X_ERROR_JSON_HEADER)
.and_then(|encoded| parse_x_error_json(encoded.as_str()));
let body_error = body
.as_deref()
.and_then(|body| serde_json::from_str::<serde_json::Value>(body).ok());
context.auth_error_code = header_error
.as_ref()
.and_then(|error| extract_error_field(error, "code"))
.or_else(|| {
body_error
.as_ref()
.and_then(|error| extract_error_field(error, "code"))
});
context.auth_error_type = header_error
.as_ref()
.and_then(|error| extract_error_field(error, "type"))
.or_else(|| {
body_error
.as_ref()
.and_then(|error| extract_error_field(error, "type"))
});
context
}
fn parse_x_error_json(encoded: &str) -> Option<serde_json::Value> {
let decoded = base64::engine::general_purpose::STANDARD
.decode(encoded)
.ok()?;
serde_json::from_slice::<serde_json::Value>(&decoded).ok()
}
fn extract_error_field(error: &serde_json::Value, field: &str) -> Option<String> {
error
.get("error")
.and_then(|error| error.get(field))
.and_then(serde_json::Value::as_str)
.map(str::to_string)
}
pub fn extract_response_debug_context_from_api_error(error: &ApiError) -> ResponseDebugContext {
match error {
ApiError::Transport(transport) => extract_response_debug_context(transport),
@@ -127,6 +150,43 @@ mod tests {
cf_ray: Some("ray-auth".to_string()),
auth_error: Some("missing_authorization_header".to_string()),
auth_error_code: Some("token_expired".to_string()),
auth_error_type: None,
}
);
}
#[test]
fn extract_response_debug_context_reads_identity_error_code_from_body() {
let context = extract_response_debug_context(&TransportError::Http {
status: StatusCode::UNAUTHORIZED,
url: Some("https://chatgpt.com/backend-api/codex/models".to_string()),
headers: None,
body: Some(r#"{"error":{"code":"token_revoked"}}"#.to_string()),
});
assert_eq!(
context,
ResponseDebugContext {
auth_error_code: Some("token_revoked".to_string()),
..ResponseDebugContext::default()
}
);
}
#[test]
fn extract_response_debug_context_reads_identity_error_type_from_body() {
let context = extract_response_debug_context(&TransportError::Http {
status: StatusCode::UNAUTHORIZED,
url: Some("https://chatgpt.com/backend-api/codex/models".to_string()),
headers: None,
body: Some(r#"{"error":{"type":"token_invalidated"}}"#.to_string()),
});
assert_eq!(
context,
ResponseDebugContext {
auth_error_type: Some("token_invalidated".to_string()),
..ResponseDebugContext::default()
}
);
}

View File

@@ -38,6 +38,7 @@ use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadSortKey as AppServerThreadSortKey;
use codex_app_server_protocol::ThreadSourceKind;
use codex_cloud_requirements::cloud_requirements_loader_for_storage;
use codex_cloud_requirements::refresh_managed_chatgpt_token_for_storage_if_near_expiry;
use codex_config::CloudRequirementsLoader;
use codex_config::ConfigLoadError;
use codex_config::LoaderOverrides;
@@ -281,7 +282,6 @@ pub use public_widgets::composer_input::ComposerInput;
#[cfg(unix)]
const AUTO_CONNECT_DAEMON_CONNECT_TIMEOUT: std::time::Duration =
std::time::Duration::from_millis(50);
#[allow(clippy::too_many_arguments)]
async fn start_embedded_app_server(
arg0_paths: Arg0DispatchPaths,
@@ -1168,6 +1168,14 @@ pub async fn run_main(
eprintln!("{err}");
std::process::exit(1);
}
refresh_managed_chatgpt_token_for_storage_if_near_expiry(
config.codex_home.to_path_buf(),
/*enable_codex_api_key_env*/ false,
config.cli_auth_credentials_store_mode,
config.chatgpt_base_url.clone(),
)
.await;
}
let log_dir = config.log_dir.clone();