From 367cfce9b6bd9911d6636e755cbaa1d61d033a11 Mon Sep 17 00:00:00 2001 From: Cooper Gamble Date: Fri, 22 May 2026 20:53:00 +0000 Subject: [PATCH] [codex-login] serialize managed ChatGPT token refreshes [ci changed_files] --- codex-rs/login/src/auth/manager.rs | 48 +++++++++------ codex-rs/login/tests/suite/auth_refresh.rs | 70 ++++++++++++---------- 2 files changed, 71 insertions(+), 47 deletions(-) diff --git a/codex-rs/login/src/auth/manager.rs b/codex-rs/login/src/auth/manager.rs index e73e0e83b0..bdd5951c97 100644 --- a/codex-rs/login/src/auth/manager.rs +++ b/codex-rs/login/src/auth/manager.rs @@ -89,9 +89,8 @@ struct ChatgptAuthState { const TOKEN_REFRESH_INTERVAL: i64 = 8; const CHATGPT_ACCESS_TOKEN_REFRESH_WINDOW_MINUTES: i64 = 5; -const CHATGPT_ACCESS_TOKEN_PROACTIVE_REFRESH_LOCK_FILENAME: &str = - "chatgpt-access-token-proactive-refresh.lock"; -const CHATGPT_ACCESS_TOKEN_PROACTIVE_REFRESH_LOCK_POLL_INTERVAL_MS: u64 = 50; +const CHATGPT_TOKEN_REFRESH_LOCK_FILENAME: &str = "chatgpt-token-refresh.lock"; +const CHATGPT_TOKEN_REFRESH_LOCK_POLL_INTERVAL_MS: u64 = 50; const REFRESH_TOKEN_EXPIRED_MESSAGE: &str = "Your access token could not be refreshed because your refresh token has expired. Please log out and sign in again."; const REFRESH_TOKEN_REUSED_MESSAGE: &str = "Your access token could not be refreshed because your refresh token was already used. Please log out and sign in again."; @@ -1688,7 +1687,22 @@ impl AuthManager { /// can assume that some other instance already refreshed it. If the persisted /// token is the same as the cached, then ask the token authority to refresh. pub async fn refresh_token(&self) -> Result<(), RefreshTokenError> { + let auth_before_wait = self.auth_cached(); + let is_managed_chatgpt = matches!(auth_before_wait.as_ref(), Some(CodexAuth::Chatgpt(_))); + let _token_refresh_lock = if is_managed_chatgpt { + Some(self.acquire_chatgpt_token_refresh_lock().await?) + } else { + None + }; let _refresh_guard = self.acquire_refresh_guard().await?; + if is_managed_chatgpt + && !Self::auths_equal_for_refresh( + auth_before_wait.as_ref(), + self.auth_cached().as_ref(), + ) + { + return Ok(()); + } self.refresh_token_after_guarded_reload().await } @@ -1734,7 +1748,7 @@ impl AuthManager { } async fn proactively_refresh_token(&self) -> Result<(), RefreshTokenError> { - let _refresh_lock = self.acquire_chatgpt_proactive_refresh_lock().await?; + let _refresh_lock = self.acquire_chatgpt_token_refresh_lock().await?; let _refresh_guard = self.acquire_refresh_guard().await?; if !self .auth_cached() @@ -1746,31 +1760,27 @@ impl AuthManager { self.refresh_token_after_guarded_reload().await } - async fn acquire_chatgpt_proactive_refresh_lock(&self) -> Result { + async fn acquire_chatgpt_token_refresh_lock(&self) -> Result { let mut logged_wait = false; loop { - if let Some(lock_file) = self.try_acquire_chatgpt_proactive_refresh_lock()? { + if let Some(lock_file) = self.try_acquire_chatgpt_token_refresh_lock()? { return Ok(lock_file); } if !logged_wait { tracing::info!( - "Waiting to proactively refresh ChatGPT access token because another process is already refreshing it." + "Waiting to refresh managed ChatGPT tokens because another process is already refreshing them." ); logged_wait = true; } tokio::time::sleep(std::time::Duration::from_millis( - CHATGPT_ACCESS_TOKEN_PROACTIVE_REFRESH_LOCK_POLL_INTERVAL_MS, + CHATGPT_TOKEN_REFRESH_LOCK_POLL_INTERVAL_MS, )) .await; } } - fn try_acquire_chatgpt_proactive_refresh_lock( - &self, - ) -> Result, RefreshTokenError> { - let lock_path = self - .codex_home - .join(CHATGPT_ACCESS_TOKEN_PROACTIVE_REFRESH_LOCK_FILENAME); + fn try_acquire_chatgpt_token_refresh_lock(&self) -> Result, RefreshTokenError> { + let lock_path = self.codex_home.join(CHATGPT_TOKEN_REFRESH_LOCK_FILENAME); if let Some(parent) = lock_path.parent() { std::fs::create_dir_all(parent).map_err(RefreshTokenError::Transient)?; } @@ -1793,10 +1803,14 @@ impl AuthManager { } /// Attempt to refresh the current auth token from the authority that issued - /// the token. On success, reloads the auth state from disk so other components - /// observe refreshed token. If the token refresh fails, returns the error to - /// the caller. + /// the token. Managed ChatGPT auth reuses tokens refreshed by another process + /// while waiting to serialize refresh-token rotation. On success, reloads the + /// auth state from disk so other components observe refreshed token. If the + /// token refresh fails, returns the error to the caller. pub async fn refresh_token_from_authority(&self) -> Result<(), RefreshTokenError> { + if matches!(self.auth_cached(), Some(CodexAuth::Chatgpt(_))) { + return self.refresh_token().await; + } let _refresh_guard = self.acquire_refresh_guard().await?; self.refresh_token_from_authority_impl().await } diff --git a/codex-rs/login/tests/suite/auth_refresh.rs b/codex-rs/login/tests/suite/auth_refresh.rs index 9f75e13c99..7db7cd0e2d 100644 --- a/codex-rs/login/tests/suite/auth_refresh.rs +++ b/codex-rs/login/tests/suite/auth_refresh.rs @@ -257,7 +257,7 @@ async fn auth_skips_access_token_outside_refresh_window() -> Result<()> { #[serial_test::serial(auth_refresh)] #[tokio::test] -async fn auth_waits_while_proactive_refresh_lock_is_held() -> Result<()> { +async fn auth_waits_while_chatgpt_token_refresh_lock_is_held() -> Result<()> { skip_if_no_network!(Ok(())); let server = MockServer::start().await; @@ -284,7 +284,7 @@ async fn auth_waits_while_proactive_refresh_lock_is_held() -> Result<()> { }; ctx.write_auth(&initial_auth).await?; - let lock_file = ctx.hold_proactive_refresh_lock()?; + let lock_file = ctx.hold_chatgpt_token_refresh_lock()?; let auth_manager = Arc::clone(&ctx.auth_manager); let refresh_task = tokio::spawn(async move { auth_manager.auth().await }); @@ -349,7 +349,7 @@ async fn concurrent_auth_requests_share_one_proactive_refresh() -> Result<()> { }) .await?; - let lock_file = ctx.hold_proactive_refresh_lock()?; + let lock_file = ctx.hold_chatgpt_token_refresh_lock()?; let first_manager = Arc::clone(&ctx.auth_manager); let first = tokio::spawn(async move { first_manager.auth().await }); let second_manager = Arc::clone(&ctx.auth_manager); @@ -386,24 +386,23 @@ async fn concurrent_auth_requests_share_one_proactive_refresh() -> Result<()> { #[serial_test::serial(auth_refresh)] #[tokio::test] -async fn refresh_token_does_not_wait_while_proactive_refresh_lock_is_held() -> Result<()> { +async fn refresh_token_reloads_managed_auth_after_waiting_for_token_refresh_lock() -> Result<()> { skip_if_no_network!(Ok(())); let server = MockServer::start().await; Mock::given(method("POST")) .and(path("/oauth/token")) .respond_with(ResponseTemplate::new(200).set_body_json(json!({ - "access_token": "new-access-token", - "refresh_token": "new-refresh-token" + "access_token": "unexpected-access-token", + "refresh_token": "unexpected-refresh-token" }))) - .expect(1) + .expect(0) .mount(&server) .await; let ctx = RefreshTokenTestContext::new(&server).await?; let initial_last_refresh = Utc::now(); - let expired_access_token = access_token_with_expiration(Utc::now() - Duration::minutes(1)); - let initial_tokens = build_tokens(&expired_access_token, INITIAL_REFRESH_TOKEN); + let initial_tokens = build_tokens(INITIAL_ACCESS_TOKEN, INITIAL_REFRESH_TOKEN); ctx.write_auth(&AuthDotJson { auth_mode: Some(AuthMode::Chatgpt), openai_api_key: None, @@ -413,33 +412,47 @@ async fn refresh_token_does_not_wait_while_proactive_refresh_lock_is_held() -> R }) .await?; - let lock_file = ctx.hold_proactive_refresh_lock()?; + let lock_file = ctx.hold_chatgpt_token_refresh_lock()?; let auth_manager = Arc::clone(&ctx.auth_manager); - let proactive_refresh_task = tokio::spawn(async move { auth_manager.auth().await }); + let refresh_task = + tokio::spawn(async move { auth_manager.refresh_token_from_authority().await }); tokio::time::sleep(std::time::Duration::from_millis(100)).await; assert!( - !proactive_refresh_task.is_finished(), - "proactive refresh should wait while another process holds the file lock" + !refresh_task.is_finished(), + "managed refresh should wait while another process holds the token refresh lock" ); - tokio::time::timeout( - std::time::Duration::from_secs(1), - ctx.auth_manager.refresh_token_from_authority(), - ) - .await - .context("normal refresh should not wait for the proactive file lock")? - .context("normal refresh should succeed")?; + let disk_tokens = build_tokens("disk-access-token", "disk-refresh-token"); + let disk_auth = AuthDotJson { + auth_mode: Some(AuthMode::Chatgpt), + openai_api_key: None, + tokens: Some(disk_tokens.clone()), + last_refresh: Some(initial_last_refresh), + agent_identity: None, + }; + save_auth( + ctx.codex_home.path(), + &disk_auth, + AuthCredentialsStoreMode::File, + )?; - proactive_refresh_task.abort(); - let _ = proactive_refresh_task.await; drop(lock_file); + refresh_task + .await + .context("managed refresh task should join")? + .context("managed refresh should use newly persisted auth")?; let stored = ctx.load_auth()?; - let tokens = stored.tokens.as_ref().context("tokens should exist")?; - assert_eq!(tokens.access_token, "new-access-token"); - assert_eq!(tokens.refresh_token, "new-refresh-token"); + assert_eq!(stored, disk_auth); + let cached = ctx + .auth_manager + .auth_cached() + .context("auth should be cached")? + .get_token_data() + .context("token data should reload")?; + assert_eq!(cached, disk_tokens); server.verify().await; Ok(()) @@ -1298,11 +1311,8 @@ impl RefreshTokenTestContext { Ok(()) } - fn hold_proactive_refresh_lock(&self) -> Result { - let lock_path = self - .codex_home - .path() - .join("chatgpt-access-token-proactive-refresh.lock"); + fn hold_chatgpt_token_refresh_lock(&self) -> Result { + let lock_path = self.codex_home.path().join("chatgpt-token-refresh.lock"); let lock_file = File::options() .read(true) .write(true)