Compare commits

...

4 Commits

Author SHA1 Message Date
celia-oai
41696c897b changes 2026-03-23 15:46:09 -07:00
Eric Traut
91850c2c04 Document permanent refresh failure 2026-03-23 13:03:04 -07:00
Eric Traut
1306c64d98 Document poisoned_managed_auth field 2026-03-23 13:03:03 -07:00
Eric Traut
2468bee659 Mitigate token refresh storms 2026-03-23 13:02:38 -07:00
4 changed files with 382 additions and 18 deletions

View File

@@ -409,6 +409,13 @@ enum EnsureConversationListenerResult {
ConnectionClosed,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum RefreshTokenRequestOutcome {
NotAttemptedOrSucceeded,
FailedTransiently,
FailedPermanently,
}
pub(crate) struct CodexMessageProcessorArgs {
pub(crate) auth_manager: Arc<AuthManager>,
pub(crate) thread_manager: Arc<ThreadManager>,
@@ -1338,20 +1345,26 @@ impl CodexMessageProcessor {
}
}
async fn refresh_token_if_requested(&self, do_refresh: bool) {
async fn refresh_token_if_requested(&self, do_refresh: bool) -> RefreshTokenRequestOutcome {
if self.auth_manager.is_external_auth_active() {
return;
return RefreshTokenRequestOutcome::NotAttemptedOrSucceeded;
}
if do_refresh && let Err(err) = self.auth_manager.refresh_token().await {
tracing::warn!("failed to refresh token while getting account: {err}");
let failed_reason = err.failed_reason();
if failed_reason.is_none() {
tracing::warn!("failed to refresh token while getting account: {err}");
return RefreshTokenRequestOutcome::FailedTransiently;
}
return RefreshTokenRequestOutcome::FailedPermanently;
}
RefreshTokenRequestOutcome::NotAttemptedOrSucceeded
}
async fn get_auth_status(&self, request_id: ConnectionRequestId, params: GetAuthStatusParams) {
let include_token = params.include_token.unwrap_or(false);
let do_refresh = params.refresh_token.unwrap_or(false);
self.refresh_token_if_requested(do_refresh).await;
let refresh_outcome = self.refresh_token_if_requested(do_refresh).await;
// Determine whether auth is required based on the active model provider.
// If a custom provider is configured with `requires_openai_auth == false`,
@@ -1365,20 +1378,31 @@ impl CodexMessageProcessor {
requires_openai_auth: Some(false),
}
} else {
let refresh_failure = self.auth_manager.refresh_failure();
let permanent_refresh_failure = refresh_failure.is_some()
|| matches!(
refresh_outcome,
RefreshTokenRequestOutcome::FailedPermanently
);
match self.auth_manager.auth().await {
Some(auth) => {
let auth_mode = auth.api_auth_mode();
let (reported_auth_method, token_opt) = match auth.get_token() {
Ok(token) if !token.is_empty() => {
let tok = if include_token { Some(token) } else { None };
(Some(auth_mode), tok)
}
Ok(_) => (None, None),
Err(err) => {
tracing::warn!("failed to get token for auth status: {err}");
(None, None)
}
};
let (reported_auth_method, token_opt) =
if include_token && permanent_refresh_failure {
(Some(auth_mode), None)
} else {
match auth.get_token() {
Ok(token) if !token.is_empty() => {
let tok = if include_token { Some(token) } else { None };
(Some(auth_mode), tok)
}
Ok(_) => (None, None),
Err(err) => {
tracing::warn!("failed to get token for auth status: {err}");
(None, None)
}
}
};
GetAuthStatusResponse {
auth_method: reported_auth_method,
auth_token: token_opt,

View File

@@ -1,6 +1,8 @@
use anyhow::Result;
use app_test_support::ChatGptAuthFixture;
use app_test_support::McpProcess;
use app_test_support::to_response;
use app_test_support::write_chatgpt_auth;
use codex_app_server_protocol::AuthMode;
use codex_app_server_protocol::GetAuthStatusParams;
use codex_app_server_protocol::GetAuthStatusResponse;
@@ -8,10 +10,17 @@ use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::LoginAccountResponse;
use codex_app_server_protocol::RequestId;
use codex_core::auth::AuthCredentialsStoreMode;
use codex_core::auth::REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR;
use pretty_assertions::assert_eq;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
@@ -207,6 +216,121 @@ async fn get_auth_status_with_api_key_no_include_token() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_auth_status_with_api_key_refresh_requested() -> Result<()> {
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
login_with_api_key_via_request(&mut mcp, "sk-test-key").await?;
let request_id = mcp
.send_get_auth_status_request(GetAuthStatusParams {
include_token: Some(true),
refresh_token: Some(true),
})
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let status: GetAuthStatusResponse = to_response(resp)?;
assert_eq!(
status,
GetAuthStatusResponse {
auth_method: Some(AuthMode::ApiKey),
auth_token: Some("sk-test-key".to_string()),
requires_openai_auth: Some(true),
}
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_auth_status_omits_token_after_permanent_refresh_failure() -> Result<()> {
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path())?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("stale-access-token")
.refresh_token("stale-refresh-token")
.account_id("acct_123")
.email("user@example.com")
.plan_type("pro"),
AuthCredentialsStoreMode::File,
)?;
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/oauth/token"))
.respond_with(ResponseTemplate::new(401).set_body_json(serde_json::json!({
"error": {
"code": "refresh_token_reused"
}
})))
.expect(1)
.mount(&server)
.await;
let refresh_url = format!("{}/oauth/token", server.uri());
let mut mcp = McpProcess::new_with_env(
codex_home.path(),
&[
("OPENAI_API_KEY", None),
(
REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR,
Some(refresh_url.as_str()),
),
],
)
.await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_get_auth_status_request(GetAuthStatusParams {
include_token: Some(true),
refresh_token: Some(true),
})
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let status: GetAuthStatusResponse = to_response(resp)?;
assert_eq!(
status,
GetAuthStatusResponse {
auth_method: Some(AuthMode::Chatgpt),
auth_token: None,
requires_openai_auth: Some(true),
}
);
let second_request_id = mcp
.send_get_auth_status_request(GetAuthStatusParams {
include_token: Some(true),
refresh_token: Some(true),
})
.await?;
let second_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(second_request_id)),
)
.await??;
let second_status: GetAuthStatusResponse = to_response(second_resp)?;
assert_eq!(second_status, status);
server.verify().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn login_api_key_rejected_when_forced_chatgpt() -> Result<()> {
let codex_home = TempDir::new()?;

View File

@@ -543,6 +543,153 @@ async fn refresh_token_returns_permanent_error_for_expired_refresh_token() -> Re
Ok(())
}
#[serial_test::serial(auth_refresh)]
#[tokio::test]
async fn refresh_token_does_not_retry_after_permanent_failure() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/oauth/token"))
.respond_with(ResponseTemplate::new(401).set_body_json(json!({
"error": {
"code": "refresh_token_reused"
}
})))
.expect(1)
.mount(&server)
.await;
let ctx = RefreshTokenTestContext::new(&server)?;
let initial_last_refresh = Utc::now() - Duration::days(1);
let initial_tokens = build_tokens(INITIAL_ACCESS_TOKEN, INITIAL_REFRESH_TOKEN);
let initial_auth = AuthDotJson {
auth_mode: Some(AuthMode::Chatgpt),
openai_api_key: None,
tokens: Some(initial_tokens.clone()),
last_refresh: Some(initial_last_refresh),
};
ctx.write_auth(&initial_auth)?;
let first_err = ctx
.auth_manager
.refresh_token()
.await
.err()
.context("first refresh should fail")?;
assert_eq!(
first_err.failed_reason(),
Some(RefreshTokenFailedReason::Exhausted)
);
let second_err = ctx
.auth_manager
.refresh_token()
.await
.err()
.context("second refresh should fail without retrying")?;
assert_eq!(
second_err.failed_reason(),
Some(RefreshTokenFailedReason::Exhausted)
);
let stored = ctx.load_auth()?;
assert_eq!(stored, initial_auth);
let cached_auth = ctx
.auth_manager
.auth()
.await
.context("auth should remain cached")?;
let cached = cached_auth
.get_token_data()
.context("token data should remain cached")?;
assert_eq!(cached, initial_tokens);
server.verify().await;
Ok(())
}
#[serial_test::serial(auth_refresh)]
#[tokio::test]
async fn refresh_token_reloads_changed_auth_after_permanent_failure() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/oauth/token"))
.respond_with(ResponseTemplate::new(401).set_body_json(json!({
"error": {
"code": "refresh_token_reused"
}
})))
.expect(1)
.mount(&server)
.await;
let ctx = RefreshTokenTestContext::new(&server)?;
let initial_last_refresh = Utc::now() - Duration::days(1);
let initial_tokens = build_tokens(INITIAL_ACCESS_TOKEN, INITIAL_REFRESH_TOKEN);
let initial_auth = AuthDotJson {
auth_mode: Some(AuthMode::Chatgpt),
openai_api_key: None,
tokens: Some(initial_tokens.clone()),
last_refresh: Some(initial_last_refresh),
};
ctx.write_auth(&initial_auth)?;
let first_err = ctx
.auth_manager
.refresh_token()
.await
.err()
.context("first refresh should fail")?;
assert_eq!(
first_err.failed_reason(),
Some(RefreshTokenFailedReason::Exhausted)
);
let fresh_refresh = Utc::now() - Duration::hours(1);
let disk_tokens = build_tokens("disk-access-token", "disk-refresh-token");
let disk_auth = AuthDotJson {
auth_mode: Some(AuthMode::Chatgpt),
openai_api_key: None,
tokens: Some(disk_tokens.clone()),
last_refresh: Some(fresh_refresh),
};
save_auth(
ctx.codex_home.path(),
&disk_auth,
AuthCredentialsStoreMode::File,
)?;
ctx.auth_manager
.refresh_token()
.await
.context("refresh should reload changed auth without retrying")?;
let stored = ctx.load_auth()?;
assert_eq!(stored, disk_auth);
let cached_auth = ctx
.auth_manager
.auth_cached()
.context("auth should be cached")?;
let cached = cached_auth
.get_token_data()
.context("token data should reload from disk")?;
assert_eq!(cached, disk_tokens);
let requests = server.received_requests().await.unwrap_or_default();
assert_eq!(
requests.len(),
1,
"expected only the initial refresh request"
);
server.verify().await;
Ok(())
}
#[serial_test::serial(auth_refresh)]
#[tokio::test]
async fn refresh_token_returns_transient_error_on_server_failure() -> Result<()> {

View File

@@ -796,6 +796,9 @@ struct CachedAuth {
auth: Option<CodexAuth>,
/// Callback used to refresh external auth by asking the parent app for new tokens.
external_refresher: Option<Arc<dyn ExternalAuthRefresher>>,
/// Permanent refresh failure cached for the current auth snapshot so
/// later refresh attempts for the same credentials fail fast without network.
permanent_refresh_failure: Option<RefreshTokenFailedError>,
}
impl Debug for CachedAuth {
@@ -809,6 +812,13 @@ impl Debug for CachedAuth {
"external_refresher",
&self.external_refresher.as_ref().map(|_| "present"),
)
.field(
"permanent_refresh_failure",
&self
.permanent_refresh_failure
.as_ref()
.map(|failure| failure.reason),
)
.finish()
}
}
@@ -1046,6 +1056,7 @@ impl AuthManager {
inner: RwLock::new(CachedAuth {
auth: managed_auth,
external_refresher: None,
permanent_refresh_failure: None,
}),
enable_codex_api_key_env,
auth_credentials_store_mode,
@@ -1058,6 +1069,7 @@ impl AuthManager {
let cached = CachedAuth {
auth: Some(auth),
external_refresher: None,
permanent_refresh_failure: None,
};
Arc::new(Self {
@@ -1074,6 +1086,7 @@ impl AuthManager {
let cached = CachedAuth {
auth: Some(auth),
external_refresher: None,
permanent_refresh_failure: None,
};
Arc::new(Self {
codex_home,
@@ -1089,6 +1102,11 @@ impl AuthManager {
self.inner.read().ok().and_then(|c| c.auth.clone())
}
pub fn refresh_failure(&self) -> Option<RefreshTokenFailedError> {
let auth = self.auth_cached()?;
self.permanent_refresh_failure_for_auth(&auth)
}
/// Current cached auth (clone). May be `None` if not logged in or load failed.
/// For stale managed ChatGPT auth, first performs a guarded reload and then
/// refreshes only if the on-disk auth is unchanged.
@@ -1166,6 +1184,38 @@ impl AuthManager {
}
}
/// Returns the cached permanent refresh failure only when `auth` still
/// matches the current cached auth snapshot.
fn permanent_refresh_failure_for_auth(
&self,
auth: &CodexAuth,
) -> Option<RefreshTokenFailedError> {
self.inner
.read()
.ok()
.and_then(|cached| cached.permanent_refresh_failure.clone())
.filter(|_| {
let current_auth = self.auth_cached();
Self::auths_equal_for_refresh(Some(auth), current_auth.as_ref())
})
}
/// Records a permanent refresh failure only if the failed refresh was
/// attempted against the auth snapshot that is still cached.
fn record_permanent_refresh_failure_if_unchanged(
&self,
attempted_auth: &CodexAuth,
error: &RefreshTokenFailedError,
) {
if let Ok(mut guard) = self.inner.write() {
let current_auth_matches =
Self::auths_equal_for_refresh(Some(attempted_auth), guard.auth.as_ref());
if current_auth_matches {
guard.permanent_refresh_failure = Some(error.clone());
}
}
}
fn load_auth_from_storage(&self) -> Option<CodexAuth> {
load_auth(
&self.codex_home,
@@ -1180,6 +1230,11 @@ impl AuthManager {
if let Ok(mut guard) = self.inner.write() {
let previous = guard.auth.as_ref();
let changed = !AuthManager::auths_equal(previous, new_auth.as_ref());
let auth_changed_for_refresh =
!Self::auths_equal_for_refresh(previous, new_auth.as_ref());
if auth_changed_for_refresh {
guard.permanent_refresh_failure = None;
}
tracing::info!("Reloaded auth, changed: {changed}");
guard.auth = new_auth;
changed
@@ -1255,6 +1310,12 @@ impl AuthManager {
/// token is the same as the cached, then ask the token authority to refresh.
pub async fn refresh_token(&self) -> Result<(), RefreshTokenError> {
let auth_before_reload = self.auth_cached();
if auth_before_reload
.as_ref()
.is_some_and(CodexAuth::is_api_key_auth)
{
return Ok(());
}
let expected_account_id = auth_before_reload
.as_ref()
.and_then(CodexAuth::get_account_id);
@@ -1285,7 +1346,12 @@ impl AuthManager {
Some(auth) => auth,
None => return Ok(()),
};
match auth {
if let Some(error) = self.permanent_refresh_failure_for_auth(&auth) {
return Err(RefreshTokenError::Permanent(error));
}
let attempted_auth = auth.clone();
let result = match auth {
CodexAuth::ChatgptAuthTokens(_) => {
self.refresh_external_auth(ExternalAuthRefreshReason::Unauthorized)
.await
@@ -1297,11 +1363,14 @@ impl AuthManager {
))
})?;
self.refresh_and_persist_chatgpt_token(&chatgpt_auth, token_data.refresh_token)
.await?;
Ok(())
.await
}
CodexAuth::ApiKey(_) => Ok(()),
};
if let Err(RefreshTokenError::Permanent(error)) = &result {
self.record_permanent_refresh_failure_if_unchanged(&attempted_auth, error);
}
result
}
/// Log out by deleting the ondisk auth.json (if present). Returns Ok(true)