diff --git a/codex-rs/cloud-requirements/src/lib.rs b/codex-rs/cloud-requirements/src/lib.rs index 27649ad4c1..8fcb1e25e4 100644 --- a/codex-rs/cloud-requirements/src/lib.rs +++ b/codex-rs/cloud-requirements/src/lib.rs @@ -45,6 +45,7 @@ use tokio::time::sleep; use tokio::time::timeout; const CLOUD_REQUIREMENTS_TIMEOUT: Duration = Duration::from_secs(15); +const CHATGPT_ACCESS_TOKEN_STARTUP_REFRESH_TIMEOUT: Duration = Duration::from_secs(15); const CLOUD_REQUIREMENTS_MAX_ATTEMPTS: usize = 5; const CLOUD_REQUIREMENTS_CACHE_FILENAME: &str = "cloud-requirements-cache.json"; const CLOUD_REQUIREMENTS_CACHE_REFRESH_INTERVAL: Duration = Duration::from_secs(5 * 60); @@ -728,22 +729,71 @@ pub fn cloud_requirements_loader( }) } +async fn cloud_requirements_auth_manager_for_storage( + codex_home: &Path, + enable_codex_api_key_env: bool, + credentials_store_mode: AuthCredentialsStoreMode, + chatgpt_base_url: &str, +) -> Arc { + AuthManager::shared( + codex_home.to_path_buf(), + enable_codex_api_key_env, + credentials_store_mode, + Some(chatgpt_base_url.to_string()), + ) + .await +} + pub async fn cloud_requirements_loader_for_storage( codex_home: PathBuf, enable_codex_api_key_env: bool, credentials_store_mode: AuthCredentialsStoreMode, chatgpt_base_url: String, ) -> CloudRequirementsLoader { - let auth_manager = AuthManager::shared( - codex_home.clone(), + let auth_manager = cloud_requirements_auth_manager_for_storage( + &codex_home, enable_codex_api_key_env, credentials_store_mode, - Some(chatgpt_base_url.clone()), + &chatgpt_base_url, ) .await; cloud_requirements_loader(auth_manager, chatgpt_base_url, codex_home) } +pub async fn cloud_requirements_loader_for_storage_with_startup_refresh( + codex_home: PathBuf, + enable_codex_api_key_env: bool, + credentials_store_mode: AuthCredentialsStoreMode, + chatgpt_base_url: String, +) -> CloudRequirementsLoader { + let auth_manager = cloud_requirements_auth_manager_for_storage( + &codex_home, + enable_codex_api_key_env, + credentials_store_mode, + &chatgpt_base_url, + ) + .await; + match timeout( + CHATGPT_ACCESS_TOKEN_STARTUP_REFRESH_TIMEOUT, + auth_manager.refresh_managed_chatgpt_token_if_near_expiry(), + ) + .await + { + Ok(Ok(())) => {} + Ok(Err(err)) => { + tracing::warn!( + "failed to proactively refresh ChatGPT access token during CLI startup: {err}" + ); + } + Err(_) => { + tracing::warn!( + "timed out proactively refreshing ChatGPT access token during CLI startup" + ); + } + } + cloud_requirements_loader(auth_manager, chatgpt_base_url, codex_home) +} + fn parse_cloud_requirements( contents: &str, requirements_base_dir: &Path, diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index 74b88bdeca..2aa197500e 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -51,7 +51,7 @@ use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnStartResponse; use codex_app_server_protocol::TurnStartedNotification; use codex_arg0::Arg0DispatchPaths; -use codex_cloud_requirements::cloud_requirements_loader; +use codex_cloud_requirements::cloud_requirements_loader_for_storage_with_startup_refresh; use codex_config::ConfigLoadError; use codex_config::ConfigLoadOptions; use codex_config::LoaderOverrides; @@ -71,7 +71,6 @@ use codex_core::path_utils; use codex_feedback::CodexFeedback; use codex_git_utils::get_git_repo_root; use codex_login::AuthConfig; -use codex_login::AuthManager; use codex_login::default_client::set_default_client_residency_requirement; use codex_login::default_client::set_default_originator; use codex_login::enforce_login_restrictions; @@ -157,9 +156,6 @@ use crate::event_processor::EventProcessor; const DEFAULT_ANALYTICS_ENABLED: bool = true; const EXEC_DEFAULT_LOG_FILTER: &str = "error,opentelemetry_sdk=off,opentelemetry_otlp=off"; -const CHATGPT_ACCESS_TOKEN_STARTUP_REFRESH_TIMEOUT: std::time::Duration = - std::time::Duration::from_secs(15); - enum InitialOperation { UserTurn { items: Vec, @@ -366,32 +362,13 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result .clone() .unwrap_or_else(|| "https://chatgpt.com/backend-api/".to_string()); // TODO(gt): Make cloud requirements failures blocking once we can fail-closed. - let cloud_auth_manager = AuthManager::shared( + let cloud_requirements = cloud_requirements_loader_for_storage_with_startup_refresh( codex_home.to_path_buf(), /*enable_codex_api_key_env*/ false, config_toml.cli_auth_credentials_store.unwrap_or_default(), - Some(chatgpt_base_url.clone()), + chatgpt_base_url, ) .await; - match tokio::time::timeout( - CHATGPT_ACCESS_TOKEN_STARTUP_REFRESH_TIMEOUT, - cloud_auth_manager.refresh_managed_chatgpt_token_if_near_expiry(), - ) - .await - { - Ok(Ok(())) => {} - Ok(Err(err)) => { - warn!("failed to proactively refresh ChatGPT access token during CLI startup: {err}"); - } - Err(_) => { - warn!("timed out proactively refreshing ChatGPT access token during CLI startup"); - } - } - let cloud_requirements = cloud_requirements_loader( - cloud_auth_manager, - chatgpt_base_url, - codex_home.to_path_buf(), - ); let run_cli_overrides = cli_kv_overrides.clone(); let run_loader_overrides = loader_overrides.clone(); let run_cloud_requirements = cloud_requirements.clone(); diff --git a/codex-rs/login/src/auth/manager.rs b/codex-rs/login/src/auth/manager.rs index dd3ea25dd0..0c8c3df2fb 100644 --- a/codex-rs/login/src/auth/manager.rs +++ b/codex-rs/login/src/auth/manager.rs @@ -7,6 +7,10 @@ use serde::Serialize; use serial_test::serial; use std::env; use std::fmt::Debug; +use std::fs::File; +use std::fs::OpenOptions; +#[cfg(unix)] +use std::os::unix::fs::OpenOptionsExt; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; @@ -85,6 +89,9 @@ struct ChatgptAuthState { const TOKEN_REFRESH_INTERVAL: i64 = 8; const CHATGPT_ACCESS_TOKEN_REFRESH_WINDOW_MINUTES: i64 = 5; +const CHATGPT_ACCESS_TOKEN_STARTUP_REFRESH_LOCK_FILENAME: &str = + "chatgpt-access-token-startup-refresh.lock"; +const CHATGPT_ACCESS_TOKEN_STARTUP_REFRESH_LOCK_POLL_INTERVAL_MS: u64 = 50; const REFRESH_TOKEN_EXPIRED_MESSAGE: &str = "Your access token could not be refreshed because your refresh token has expired. Please log out and sign in again."; const REFRESH_TOKEN_REUSED_MESSAGE: &str = "Your access token could not be refreshed because your refresh token was already used. Please log out and sign in again."; @@ -1740,9 +1747,55 @@ impl AuthManager { return Ok(()); } + let _refresh_lock = self.acquire_chatgpt_startup_refresh_lock().await?; + self.refresh_token().await } + async fn acquire_chatgpt_startup_refresh_lock(&self) -> Result { + let mut logged_wait = false; + loop { + if let Some(lock_file) = self.try_acquire_chatgpt_startup_refresh_lock()? { + return Ok(lock_file); + } + if !logged_wait { + tracing::info!( + "Waiting to proactively refresh ChatGPT access token because another process is already refreshing it." + ); + logged_wait = true; + } + tokio::time::sleep(std::time::Duration::from_millis( + CHATGPT_ACCESS_TOKEN_STARTUP_REFRESH_LOCK_POLL_INTERVAL_MS, + )) + .await; + } + } + + fn try_acquire_chatgpt_startup_refresh_lock(&self) -> Result, RefreshTokenError> { + let lock_path = self + .codex_home + .join(CHATGPT_ACCESS_TOKEN_STARTUP_REFRESH_LOCK_FILENAME); + if let Some(parent) = lock_path.parent() { + std::fs::create_dir_all(parent).map_err(RefreshTokenError::Transient)?; + } + + let mut options = OpenOptions::new(); + options.read(true).write(true).create(true).truncate(false); + #[cfg(unix)] + { + options.mode(0o600); + } + let lock_file = options + .open(lock_path) + .map_err(RefreshTokenError::Transient)?; + + match lock_file.try_lock() { + Ok(()) => Ok(Some(lock_file)), + Err(std::fs::TryLockError::WouldBlock) => Ok(None), + Err(err) => Err(RefreshTokenError::Transient(err.into())), + } + } + /// Attempt to refresh the current auth token from the authority that issued /// the token. On success, reloads the auth state from disk so other components /// observe refreshed token. If the token refresh fails, returns the error to diff --git a/codex-rs/login/tests/suite/auth_refresh.rs b/codex-rs/login/tests/suite/auth_refresh.rs index 228d8698ec..759204dff6 100644 --- a/codex-rs/login/tests/suite/auth_refresh.rs +++ b/codex-rs/login/tests/suite/auth_refresh.rs @@ -19,6 +19,7 @@ use pretty_assertions::assert_eq; use serde::Serialize; use serde_json::json; use std::ffi::OsString; +use std::fs::File; use std::sync::Arc; use tempfile::TempDir; use wiremock::Mock; @@ -244,6 +245,81 @@ async fn refresh_managed_chatgpt_token_skips_auth_outside_refresh_window() -> Re Ok(()) } +#[serial_test::serial(auth_refresh)] +#[tokio::test] +async fn refresh_managed_chatgpt_token_waits_while_startup_refresh_lock_is_held() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/oauth/token")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "access_token": "new-access-token", + "refresh_token": "new-refresh-token" + }))) + .expect(1) + .mount(&server) + .await; + + let ctx = RefreshTokenTestContext::new(&server).await?; + let initial_last_refresh = Utc::now(); + let expired_access_token = access_token_with_expiration(Utc::now() - Duration::minutes(1)); + let initial_tokens = build_tokens(&expired_access_token, INITIAL_REFRESH_TOKEN); + let initial_auth = AuthDotJson { + auth_mode: Some(AuthMode::Chatgpt), + openai_api_key: None, + tokens: Some(initial_tokens.clone()), + last_refresh: Some(initial_last_refresh), + agent_identity: None, + }; + ctx.write_auth(&initial_auth).await?; + + let lock_path = ctx + .codex_home + .path() + .join("chatgpt-access-token-startup-refresh.lock"); + let lock_file = File::options() + .read(true) + .write(true) + .create(true) + .truncate(false) + .open(lock_path)?; + lock_file.try_lock()?; + + let auth_manager = Arc::clone(&ctx.auth_manager); + let refresh_task = tokio::spawn(async move { + auth_manager + .refresh_managed_chatgpt_token_if_near_expiry() + .await + }); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + assert!( + !refresh_task.is_finished(), + "managed ChatGPT refresh should wait while another startup holds the lock" + ); + assert_eq!(ctx.load_auth()?, initial_auth); + let requests = server.received_requests().await.unwrap_or_default(); + assert!( + requests.is_empty(), + "expected no refresh token requests before the startup lock is released" + ); + + drop(lock_file); + refresh_task + .await + .context("startup refresh task should join")? + .context("managed ChatGPT refresh should resume after the lock is released")?; + + let stored = ctx.load_auth()?; + let tokens = stored.tokens.as_ref().context("tokens should exist")?; + assert_eq!(tokens.access_token, "new-access-token"); + assert_eq!(tokens.refresh_token, "new-refresh-token"); + server.verify().await; + + Ok(()) +} + #[serial_test::serial(auth_refresh)] #[tokio::test] async fn refresh_token_skips_refresh_when_auth_changed() -> Result<()> { diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index 7e1503a98c..22eb9b48ea 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -37,8 +37,8 @@ use codex_app_server_protocol::ThreadListCwdFilter; use codex_app_server_protocol::ThreadListParams; use codex_app_server_protocol::ThreadSortKey as AppServerThreadSortKey; use codex_app_server_protocol::ThreadSourceKind; -use codex_cloud_requirements::cloud_requirements_loader; use codex_cloud_requirements::cloud_requirements_loader_for_storage; +use codex_cloud_requirements::cloud_requirements_loader_for_storage_with_startup_refresh; use codex_config::CloudRequirementsLoader; use codex_config::ConfigLoadError; use codex_config::LoaderOverrides; @@ -46,7 +46,6 @@ use codex_config::format_config_error_with_source; use codex_exec_server::EnvironmentManager; use codex_exec_server::ExecServerRuntimePaths; use codex_login::AuthConfig; -use codex_login::AuthManager; use codex_login::default_client::originator; use codex_login::default_client::set_default_client_residency_requirement; use codex_login::enforce_login_restrictions; @@ -283,9 +282,6 @@ pub use public_widgets::composer_input::ComposerInput; #[cfg(unix)] const AUTO_CONNECT_DAEMON_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(50); -const CHATGPT_ACCESS_TOKEN_STARTUP_REFRESH_TIMEOUT: std::time::Duration = - std::time::Duration::from_secs(15); - #[allow(clippy::too_many_arguments)] async fn start_embedded_app_server( arg0_paths: Arg0DispatchPaths, @@ -989,32 +985,13 @@ pub async fn run_main( .chatgpt_base_url .clone() .unwrap_or_else(|| "https://chatgpt.com/backend-api/".to_string()); - let cloud_auth_manager = AuthManager::shared( + let cloud_requirements = cloud_requirements_loader_for_storage_with_startup_refresh( codex_home.to_path_buf(), /*enable_codex_api_key_env*/ false, config_toml.cli_auth_credentials_store.unwrap_or_default(), - Some(chatgpt_base_url.clone()), + chatgpt_base_url, ) .await; - match tokio::time::timeout( - CHATGPT_ACCESS_TOKEN_STARTUP_REFRESH_TIMEOUT, - cloud_auth_manager.refresh_managed_chatgpt_token_if_near_expiry(), - ) - .await - { - Ok(Ok(())) => {} - Ok(Err(err)) => { - warn!("failed to proactively refresh ChatGPT access token during CLI startup: {err}"); - } - Err(_) => { - warn!("timed out proactively refreshing ChatGPT access token during CLI startup"); - } - } - let cloud_requirements = cloud_requirements_loader( - cloud_auth_manager, - chatgpt_base_url, - codex_home.to_path_buf(), - ); let model_provider_override = if cli.oss { let resolved = resolve_oss_provider(