Compare commits

...

6 Commits

Author SHA1 Message Date
Eric Traut
d681ed2f2b Fixed lint 2026-01-05 18:04:53 -07:00
Eric Traut
ddcb82c9b5 Code review feedback 2026-01-05 17:58:09 -07:00
Eric Traut
35567b3a0b Updated AGENTS.md to reflect code simplification rules 2026-01-05 16:02:30 -07:00
Eric Traut
73a1642fd4 Code review feedback 2026-01-05 15:57:16 -07:00
Eric Traut
ef60a312c5 Merge branch 'main' into etraut/concurrent_refresh 2026-01-05 15:56:54 -07:00
Eric Traut
896e9ed212 Fix "could not refresh token" error resulting from concurrent CLI instances
Idle Codex CLI instances can get stuck after another concurrently-running instance refreshes and rotates the shared ChatGPT refresh token: the idle process wakes up, gets a 401, and its in-memory refresh token is no longer valid, so refresh fails permanently.

This change makes 401 recovery resilient to concurrent token rotation by first syncing ChatGPT tokens from the configured credential store (file/keyring/auto) and retrying the request, then performing a network refresh only if needed (using the refresh token loaded from storage). It also prevents accidental cross-account/workspace switching by only adopting/refreshing when chatgpt_account_id matches the request’s auth snapshot, and adds bounded retries on transient auth.json parse errors to handle concurrent truncate+write. Added unit tests for the storage-sync outcomes.
2025-12-31 13:07:37 -07:00
4 changed files with 454 additions and 32 deletions

View File

@@ -52,6 +52,12 @@ See `codex-rs/tui/styles.md`.
- If you need to indent wrapped lines, use the initial_indent / subsequent_indent options from RtOptions if you can, rather than writing custom logic.
- If you have a list of lines and you need to prefix them all with some prefix (optionally different on the first vs subsequent lines), use the `prefix_lines` helper from line_utils.
## Code
### Coding conventions
Keep functions relatively simple. Do not produce functions with dozens of return statements. Create helper functions if needed to keep code flow easy to understand.
## Tests
### Snapshot tests

View File

@@ -100,6 +100,14 @@ impl From<RefreshTokenError> for std::io::Error {
}
impl CodexAuth {
/// Workspace/account identifier associated with this auth snapshot, derived from the cached
/// ID token claims. This is used to prevent accidentally adopting credentials for a different
/// identity when multiple Codex instances share a credential store.
pub fn chatgpt_account_id(&self) -> Option<String> {
self.get_current_token_data()
.and_then(|t| t.id_token.chatgpt_account_id)
}
pub async fn refresh_token(&self) -> Result<String, RefreshTokenError> {
tracing::info!("Refreshing token");
@@ -719,6 +727,165 @@ mod tests {
assert_eq!(auth, None);
}
#[tokio::test]
async fn sync_from_storage_applies_updated_tokens_for_matching_identity() {
let codex_home = tempdir().unwrap();
let jwt = write_auth_file(
AuthFileParams {
openai_api_key: None,
chatgpt_plan_type: "pro".to_string(),
chatgpt_account_id: Some("acct_123".to_string()),
},
codex_home.path(),
)
.expect("seed auth");
let manager = AuthManager::new(
codex_home.path().to_path_buf(),
false,
AuthCredentialsStoreMode::File,
);
let expected = manager.auth().expect("auth should be loaded");
let updated = AuthDotJson {
openai_api_key: None,
tokens: Some(TokenData {
id_token: parse_id_token(&jwt).expect("jwt should parse"),
access_token: "new-access".to_string(),
refresh_token: "new-refresh".to_string(),
account_id: None,
}),
last_refresh: Some(Utc::now()),
};
save_auth(codex_home.path(), &updated, AuthCredentialsStoreMode::File)
.expect("write updated auth");
let sync = manager
.sync_from_storage_for_request(&expected)
.await
.expect("sync should succeed");
assert_eq!(sync, SyncFromStorageResult::Applied { changed: true });
let current = manager.auth().expect("auth should exist");
let guard = current.auth_dot_json.lock().unwrap();
let tokens = guard
.as_ref()
.and_then(|a| a.tokens.as_ref())
.expect("tokens should exist");
assert_eq!(tokens.access_token, "new-access");
assert_eq!(tokens.refresh_token, "new-refresh");
}
#[tokio::test]
async fn sync_from_storage_returns_mismatch_for_different_identity() {
let codex_home = tempdir().unwrap();
let _jwt_expected = write_auth_file(
AuthFileParams {
openai_api_key: None,
chatgpt_plan_type: "pro".to_string(),
chatgpt_account_id: Some("acct_123".to_string()),
},
codex_home.path(),
)
.expect("seed auth");
let manager = AuthManager::new(
codex_home.path().to_path_buf(),
false,
AuthCredentialsStoreMode::File,
);
let expected = manager.auth().expect("auth should be loaded");
let jwt_other = write_auth_file(
AuthFileParams {
openai_api_key: None,
chatgpt_plan_type: "pro".to_string(),
chatgpt_account_id: Some("acct_other".to_string()),
},
codex_home.path(),
)
.expect("seed other auth");
let mismatched = AuthDotJson {
openai_api_key: None,
tokens: Some(TokenData {
id_token: parse_id_token(&jwt_other).expect("jwt should parse"),
access_token: "other-access".to_string(),
refresh_token: "other-refresh".to_string(),
account_id: None,
}),
last_refresh: Some(Utc::now()),
};
save_auth(
codex_home.path(),
&mismatched,
AuthCredentialsStoreMode::File,
)
.expect("write mismatched auth");
let sync = manager
.sync_from_storage_for_request(&expected)
.await
.expect("sync should succeed");
assert_eq!(sync, SyncFromStorageResult::IdentityMismatch);
}
#[tokio::test]
async fn sync_from_storage_skips_when_identity_missing() {
let codex_home = tempdir().unwrap();
write_auth_file(
AuthFileParams {
openai_api_key: None,
chatgpt_plan_type: "pro".to_string(),
chatgpt_account_id: None,
},
codex_home.path(),
)
.expect("seed auth");
let manager = AuthManager::new(
codex_home.path().to_path_buf(),
false,
AuthCredentialsStoreMode::File,
);
let expected = manager.auth().expect("auth should be loaded");
let sync = manager
.sync_from_storage_for_request(&expected)
.await
.expect("sync should succeed");
assert_eq!(sync, SyncFromStorageResult::SkippedMissingIdentity);
}
#[tokio::test]
async fn sync_from_storage_reports_logged_out_when_storage_empty() {
let codex_home = tempdir().unwrap();
write_auth_file(
AuthFileParams {
openai_api_key: None,
chatgpt_plan_type: "pro".to_string(),
chatgpt_account_id: Some("acct_123".to_string()),
},
codex_home.path(),
)
.expect("seed auth");
let manager = AuthManager::new(
codex_home.path().to_path_buf(),
false,
AuthCredentialsStoreMode::File,
);
let expected = manager.auth().expect("auth should be loaded");
std::fs::remove_file(codex_home.path().join("auth.json")).expect("remove auth");
let sync = manager
.sync_from_storage_for_request(&expected)
.await
.expect("sync should succeed");
assert_eq!(sync, SyncFromStorageResult::LoggedOut);
}
#[tokio::test]
#[serial(codex_api_key)]
async fn pro_account_with_no_api_key_uses_chatgpt_auth() {
@@ -1052,14 +1219,15 @@ mod tests {
}
}
/// Central manager providing a single source of truth for auth.json derived
/// Central manager providing a single source of truth for auth storage derived
/// authentication data. It loads once (or on preference change) and then
/// hands out cloned `CodexAuth` values so the rest of the program has a
/// consistent snapshot.
///
/// External modifications to `auth.json` will NOT be observed until
/// `reload()` is called explicitly. This matches the design goal of avoiding
/// different parts of the program seeing inconsistent auth data midrun.
/// External modifications to credentials in storage will generally NOT be
/// observed until `reload()` is called explicitly. One exception is the
/// token-refresh recovery path, which may consult storage in order to adopt
/// tokens rotated by another concurrently-running Codex instance.
#[derive(Debug)]
pub struct AuthManager {
codex_home: PathBuf,
@@ -1068,6 +1236,35 @@ pub struct AuthManager {
auth_credentials_store_mode: AuthCredentialsStoreMode,
}
/// Outcome of attempting to sync the currently cached ChatGPT credentials from storage.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SyncFromStorageResult {
/// The request auth did not include a workspace identifier, so we cannot safely compare.
SkippedMissingIdentity,
/// No credentials were found in storage (user logged out).
LoggedOut,
/// Storage contains credentials for a different identity (workspace/account).
IdentityMismatch,
/// Storage credentials match identity and were applied to the in-memory snapshot.
Applied { changed: bool },
}
/// Per-request state used to coordinate a limited 401 recovery flow.
///
/// This is intentionally managed by `AuthManager` so callers don't need to
/// understand the recovery stages.
#[derive(Default, Debug)]
pub(crate) struct UnauthorizedRecovery {
synced_from_storage: bool,
refreshed_token: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum UnauthorizedRecoveryDecision {
Retry,
GiveUp,
}
impl AuthManager {
/// Create a new manager loading the initial auth using the provided
/// preferred auth method. Errors loading auth are swallowed; `auth()` will
@@ -1174,7 +1371,7 @@ impl AuthManager {
}
/// Attempt to refresh the current auth token (if any). On success, reload
/// the auth state from disk so other components observe refreshed token.
/// the auth state from storage so other components observe refreshed token.
/// If the token refresh fails in a permanent (nontransient) way, logs out
/// to clear invalid auth state.
pub async fn refresh_token(&self) -> Result<Option<String>, RefreshTokenError> {
@@ -1182,17 +1379,8 @@ impl AuthManager {
Some(a) => a,
None => return Ok(None),
};
match auth.refresh_token().await {
Ok(token) => {
// Reload to pick up persisted changes.
self.reload();
Ok(Some(token))
}
Err(e) => {
tracing::error!("Failed to refresh token: {}", e);
Err(e)
}
}
self.refresh_token_for_request(&auth).await
}
/// Log out by deleting the ondisk auth.json (if present). Returns Ok(true)
@@ -1209,4 +1397,233 @@ impl AuthManager {
pub fn get_auth_mode(&self) -> Option<AuthMode> {
self.auth().map(|a| a.mode)
}
pub(crate) async fn sync_from_storage_for_request(
&self,
expected: &CodexAuth,
) -> std::io::Result<SyncFromStorageResult> {
let Some(expected_account_id) = expected.chatgpt_account_id() else {
return Ok(SyncFromStorageResult::SkippedMissingIdentity);
};
let storage =
create_auth_storage(self.codex_home.clone(), self.auth_credentials_store_mode);
let Some(stored) = load_auth_dot_json_with_retries(&storage).await? else {
// Ensure the cached auth reflects the logged-out state.
self.reload();
return Ok(SyncFromStorageResult::LoggedOut);
};
// We only support ChatGPT in this sync path. If storage now points to an API key or is
// otherwise missing tokens, treat it as an identity mismatch for this request.
let Some(tokens) = stored.tokens.clone() else {
self.reload();
return Ok(SyncFromStorageResult::IdentityMismatch);
};
let Some(stored_account_id) = tokens.id_token.chatgpt_account_id.as_deref() else {
// Cannot prove identity match.
return Ok(SyncFromStorageResult::SkippedMissingIdentity);
};
if stored_account_id != expected_account_id {
// Keep cached auth in sync for subsequent requests, but do not retry the in-flight
// request under a different identity.
self.reload();
return Ok(SyncFromStorageResult::IdentityMismatch);
}
let changed = if let Some(current) = self.auth() {
if let Ok(mut guard) = current.auth_dot_json.lock() {
let current_auth = guard.clone();
*guard = Some(stored.clone());
current_auth != Some(stored)
} else {
false
}
} else {
false
};
Ok(SyncFromStorageResult::Applied { changed })
}
pub(crate) async fn refresh_token_for_request(
&self,
expected: &CodexAuth,
) -> Result<Option<String>, RefreshTokenError> {
if expected.mode != AuthMode::ChatGPT {
return Ok(None);
}
let Some(auth) = self.auth() else {
return Ok(None);
};
let expected_account_id = expected.chatgpt_account_id();
let Some(expected_account_id) = expected_account_id.as_deref() else {
// Cannot safely consult storage without a stable identity; fall back to current
// in-memory refresh behavior.
let token = auth.refresh_token().await?;
self.reload();
return Ok(Some(token));
};
let storage =
create_auth_storage(self.codex_home.clone(), self.auth_credentials_store_mode);
let Some(mut attempted_refresh_token) =
load_stored_refresh_token_if_identity_matches(&storage, expected_account_id).await?
else {
return Ok(None);
};
let mut retried = false;
loop {
let refresh_response =
try_refresh_token(attempted_refresh_token.clone(), &auth.client).await;
match refresh_response {
Ok(refresh_response) => {
let updated = update_tokens(
&storage,
refresh_response.id_token,
refresh_response.access_token,
refresh_response.refresh_token,
)
.await
.map_err(RefreshTokenError::from)?;
if let Ok(mut auth_lock) = auth.auth_dot_json.lock() {
*auth_lock = Some(updated.clone());
}
self.reload();
let access = updated
.tokens
.as_ref()
.map(|t| t.access_token.clone())
.ok_or_else(|| {
RefreshTokenError::other_with_message(
"Token data is not available after refresh.",
)
})?;
return Ok(Some(access));
}
Err(RefreshTokenError::Permanent(failed))
if !retried
&& matches!(
failed.reason,
RefreshTokenFailedReason::Expired | RefreshTokenFailedReason::Exhausted
) =>
{
// Another instance may have refreshed and rotated the refresh token while we
// were attempting our refresh. Reload and retry once if the stored refresh
// token differs and identity still matches.
let Some(stored_refresh_token) = load_stored_refresh_token_if_identity_matches(
&storage,
expected_account_id,
)
.await?
else {
return Ok(None);
};
if stored_refresh_token != attempted_refresh_token {
attempted_refresh_token = stored_refresh_token;
retried = true;
continue;
}
return Err(RefreshTokenError::Permanent(failed));
}
Err(err) => return Err(err),
}
}
}
pub(crate) async fn recover_from_unauthorized_for_request(
&self,
expected: &CodexAuth,
recovery: &mut UnauthorizedRecovery,
) -> Result<UnauthorizedRecoveryDecision, RefreshTokenError> {
if recovery.refreshed_token {
return Ok(UnauthorizedRecoveryDecision::GiveUp);
}
if expected.mode != AuthMode::ChatGPT {
return Ok(UnauthorizedRecoveryDecision::GiveUp);
}
if !recovery.synced_from_storage {
let sync = self
.sync_from_storage_for_request(expected)
.await
.map_err(RefreshTokenError::Transient)?;
recovery.synced_from_storage = true;
match sync {
SyncFromStorageResult::Applied { changed } => {
tracing::debug!(changed, "synced ChatGPT credentials from storage after 401");
Ok(UnauthorizedRecoveryDecision::Retry)
}
SyncFromStorageResult::SkippedMissingIdentity => {
Ok(UnauthorizedRecoveryDecision::Retry)
}
SyncFromStorageResult::LoggedOut | SyncFromStorageResult::IdentityMismatch => {
Ok(UnauthorizedRecoveryDecision::GiveUp)
}
}
} else {
match self.refresh_token_for_request(expected).await? {
Some(_) => {
recovery.refreshed_token = true;
Ok(UnauthorizedRecoveryDecision::Retry)
}
None => Ok(UnauthorizedRecoveryDecision::GiveUp),
}
}
}
}
async fn load_stored_refresh_token_if_identity_matches(
storage: &Arc<dyn AuthStorageBackend>,
expected_account_id: &str,
) -> Result<Option<String>, RefreshTokenError> {
let Some(stored) = load_auth_dot_json_with_retries(storage)
.await
.map_err(RefreshTokenError::Transient)?
else {
return Ok(None);
};
let Some(tokens) = stored.tokens else {
return Ok(None);
};
if tokens.id_token.chatgpt_account_id.as_deref() != Some(expected_account_id) {
return Ok(None);
}
Ok(Some(tokens.refresh_token))
}
async fn load_auth_dot_json_with_retries(
storage: &Arc<dyn AuthStorageBackend>,
) -> std::io::Result<Option<AuthDotJson>> {
// This primarily mitigates concurrent file writes where another process truncates and rewrites
// auth.json, which can cause transient JSON parse errors for readers.
const MAX_ATTEMPTS: usize = 3;
const BASE_DELAY_MS: u64 = 25;
for attempt in 0..MAX_ATTEMPTS {
match storage.load() {
Ok(value) => return Ok(value),
Err(err) if err.kind() == ErrorKind::InvalidData && attempt + 1 < MAX_ATTEMPTS => {
let delay = Duration::from_millis(BASE_DELAY_MS * (attempt as u64 + 1));
tokio::time::sleep(delay).await;
continue;
}
Err(err) => return Err(err),
}
}
Ok(None)
}

View File

@@ -7,6 +7,7 @@ use sha2::Sha256;
use std::fmt::Debug;
use std::fs::File;
use std::fs::OpenOptions;
use std::io::ErrorKind;
use std::io::Read;
use std::io::Write;
#[cfg(unix)]
@@ -81,7 +82,8 @@ impl FileAuthStorage {
let mut file = File::open(auth_file)?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
let auth_dot_json: AuthDotJson = serde_json::from_str(&contents)?;
let auth_dot_json: AuthDotJson = serde_json::from_str(&contents)
.map_err(|err| std::io::Error::new(ErrorKind::InvalidData, err))?;
Ok(auth_dot_json)
}

View File

@@ -17,7 +17,6 @@ use codex_api::TransportError;
use codex_api::common::Reasoning;
use codex_api::create_text_param_for_request;
use codex_api::error::ApiError;
use codex_app_server_protocol::AuthMode;
use codex_otel::otel_manager::OtelManager;
use codex_protocol::ConversationId;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
@@ -153,7 +152,7 @@ impl ModelClient {
let conversation_id = self.conversation_id.to_string();
let session_source = self.session_source.clone();
let mut refreshed = false;
let mut recovery = crate::auth::UnauthorizedRecovery::default();
loop {
let auth = auth_manager.as_ref().and_then(|m| m.auth());
let api_provider = self
@@ -179,7 +178,7 @@ impl ModelClient {
Err(ApiError::Transport(TransportError::Http { status, .. }))
if status == StatusCode::UNAUTHORIZED =>
{
handle_unauthorized(status, &mut refreshed, &auth_manager, &auth).await?;
handle_unauthorized(status, &mut recovery, &auth_manager, &auth).await?;
continue;
}
Err(err) => return Err(map_api_error(err)),
@@ -242,7 +241,7 @@ impl ModelClient {
let conversation_id = self.conversation_id.to_string();
let session_source = self.session_source.clone();
let mut refreshed = false;
let mut recovery = crate::auth::UnauthorizedRecovery::default();
loop {
let auth = auth_manager.as_ref().and_then(|m| m.auth());
let api_provider = self
@@ -276,7 +275,7 @@ impl ModelClient {
Err(ApiError::Transport(TransportError::Http { status, .. }))
if status == StatusCode::UNAUTHORIZED =>
{
handle_unauthorized(status, &mut refreshed, &auth_manager, &auth).await?;
handle_unauthorized(status, &mut recovery, &auth_manager, &auth).await?;
continue;
}
Err(err) => return Err(map_api_error(err)),
@@ -485,22 +484,20 @@ where
/// the mapped `CodexErr` is returned to the caller.
async fn handle_unauthorized(
status: StatusCode,
refreshed: &mut bool,
recovery: &mut crate::auth::UnauthorizedRecovery,
auth_manager: &Option<Arc<AuthManager>>,
auth: &Option<crate::auth::CodexAuth>,
) -> Result<()> {
if *refreshed {
return Err(map_unauthorized_status(status));
}
if let Some(manager) = auth_manager.as_ref()
&& let Some(auth) = auth.as_ref()
&& auth.mode == AuthMode::ChatGPT
{
match manager.refresh_token().await {
Ok(_) => {
*refreshed = true;
Ok(())
match manager
.recover_from_unauthorized_for_request(auth, recovery)
.await
{
Ok(crate::auth::UnauthorizedRecoveryDecision::Retry) => Ok(()),
Ok(crate::auth::UnauthorizedRecoveryDecision::GiveUp) => {
Err(map_unauthorized_status(status))
}
Err(RefreshTokenError::Permanent(failed)) => Err(CodexErr::RefreshTokenFailed(failed)),
Err(RefreshTokenError::Transient(other)) => Err(CodexErr::Io(other)),