mirror of
https://github.com/openai/codex.git
synced 2026-05-24 13:04:29 +00:00
[codex-login] serialize managed ChatGPT token refreshes [ci changed_files]
This commit is contained in:
@@ -89,9 +89,8 @@ struct ChatgptAuthState {
|
||||
|
||||
const TOKEN_REFRESH_INTERVAL: i64 = 8;
|
||||
const CHATGPT_ACCESS_TOKEN_REFRESH_WINDOW_MINUTES: i64 = 5;
|
||||
const CHATGPT_ACCESS_TOKEN_PROACTIVE_REFRESH_LOCK_FILENAME: &str =
|
||||
"chatgpt-access-token-proactive-refresh.lock";
|
||||
const CHATGPT_ACCESS_TOKEN_PROACTIVE_REFRESH_LOCK_POLL_INTERVAL_MS: u64 = 50;
|
||||
const CHATGPT_TOKEN_REFRESH_LOCK_FILENAME: &str = "chatgpt-token-refresh.lock";
|
||||
const CHATGPT_TOKEN_REFRESH_LOCK_POLL_INTERVAL_MS: u64 = 50;
|
||||
|
||||
const REFRESH_TOKEN_EXPIRED_MESSAGE: &str = "Your access token could not be refreshed because your refresh token has expired. Please log out and sign in again.";
|
||||
const REFRESH_TOKEN_REUSED_MESSAGE: &str = "Your access token could not be refreshed because your refresh token was already used. Please log out and sign in again.";
|
||||
@@ -1688,7 +1687,22 @@ impl AuthManager {
|
||||
/// can assume that some other instance already refreshed it. If the persisted
|
||||
/// token is the same as the cached, then ask the token authority to refresh.
|
||||
pub async fn refresh_token(&self) -> Result<(), RefreshTokenError> {
|
||||
let auth_before_wait = self.auth_cached();
|
||||
let is_managed_chatgpt = matches!(auth_before_wait.as_ref(), Some(CodexAuth::Chatgpt(_)));
|
||||
let _token_refresh_lock = if is_managed_chatgpt {
|
||||
Some(self.acquire_chatgpt_token_refresh_lock().await?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let _refresh_guard = self.acquire_refresh_guard().await?;
|
||||
if is_managed_chatgpt
|
||||
&& !Self::auths_equal_for_refresh(
|
||||
auth_before_wait.as_ref(),
|
||||
self.auth_cached().as_ref(),
|
||||
)
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
self.refresh_token_after_guarded_reload().await
|
||||
}
|
||||
|
||||
@@ -1734,7 +1748,7 @@ impl AuthManager {
|
||||
}
|
||||
|
||||
async fn proactively_refresh_token(&self) -> Result<(), RefreshTokenError> {
|
||||
let _refresh_lock = self.acquire_chatgpt_proactive_refresh_lock().await?;
|
||||
let _refresh_lock = self.acquire_chatgpt_token_refresh_lock().await?;
|
||||
let _refresh_guard = self.acquire_refresh_guard().await?;
|
||||
if !self
|
||||
.auth_cached()
|
||||
@@ -1746,31 +1760,27 @@ impl AuthManager {
|
||||
self.refresh_token_after_guarded_reload().await
|
||||
}
|
||||
|
||||
async fn acquire_chatgpt_proactive_refresh_lock(&self) -> Result<File, RefreshTokenError> {
|
||||
async fn acquire_chatgpt_token_refresh_lock(&self) -> Result<File, RefreshTokenError> {
|
||||
let mut logged_wait = false;
|
||||
loop {
|
||||
if let Some(lock_file) = self.try_acquire_chatgpt_proactive_refresh_lock()? {
|
||||
if let Some(lock_file) = self.try_acquire_chatgpt_token_refresh_lock()? {
|
||||
return Ok(lock_file);
|
||||
}
|
||||
if !logged_wait {
|
||||
tracing::info!(
|
||||
"Waiting to proactively refresh ChatGPT access token because another process is already refreshing it."
|
||||
"Waiting to refresh managed ChatGPT tokens because another process is already refreshing them."
|
||||
);
|
||||
logged_wait = true;
|
||||
}
|
||||
tokio::time::sleep(std::time::Duration::from_millis(
|
||||
CHATGPT_ACCESS_TOKEN_PROACTIVE_REFRESH_LOCK_POLL_INTERVAL_MS,
|
||||
CHATGPT_TOKEN_REFRESH_LOCK_POLL_INTERVAL_MS,
|
||||
))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
fn try_acquire_chatgpt_proactive_refresh_lock(
|
||||
&self,
|
||||
) -> Result<Option<File>, RefreshTokenError> {
|
||||
let lock_path = self
|
||||
.codex_home
|
||||
.join(CHATGPT_ACCESS_TOKEN_PROACTIVE_REFRESH_LOCK_FILENAME);
|
||||
fn try_acquire_chatgpt_token_refresh_lock(&self) -> Result<Option<File>, RefreshTokenError> {
|
||||
let lock_path = self.codex_home.join(CHATGPT_TOKEN_REFRESH_LOCK_FILENAME);
|
||||
if let Some(parent) = lock_path.parent() {
|
||||
std::fs::create_dir_all(parent).map_err(RefreshTokenError::Transient)?;
|
||||
}
|
||||
@@ -1793,10 +1803,14 @@ impl AuthManager {
|
||||
}
|
||||
|
||||
/// Attempt to refresh the current auth token from the authority that issued
|
||||
/// the token. On success, reloads the auth state from disk so other components
|
||||
/// observe refreshed token. If the token refresh fails, returns the error to
|
||||
/// the caller.
|
||||
/// the token. Managed ChatGPT auth reuses tokens refreshed by another process
|
||||
/// while waiting to serialize refresh-token rotation. On success, reloads the
|
||||
/// auth state from disk so other components observe refreshed token. If the
|
||||
/// token refresh fails, returns the error to the caller.
|
||||
pub async fn refresh_token_from_authority(&self) -> Result<(), RefreshTokenError> {
|
||||
if matches!(self.auth_cached(), Some(CodexAuth::Chatgpt(_))) {
|
||||
return self.refresh_token().await;
|
||||
}
|
||||
let _refresh_guard = self.acquire_refresh_guard().await?;
|
||||
self.refresh_token_from_authority_impl().await
|
||||
}
|
||||
|
||||
@@ -257,7 +257,7 @@ async fn auth_skips_access_token_outside_refresh_window() -> Result<()> {
|
||||
|
||||
#[serial_test::serial(auth_refresh)]
|
||||
#[tokio::test]
|
||||
async fn auth_waits_while_proactive_refresh_lock_is_held() -> Result<()> {
|
||||
async fn auth_waits_while_chatgpt_token_refresh_lock_is_held() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = MockServer::start().await;
|
||||
@@ -284,7 +284,7 @@ async fn auth_waits_while_proactive_refresh_lock_is_held() -> Result<()> {
|
||||
};
|
||||
ctx.write_auth(&initial_auth).await?;
|
||||
|
||||
let lock_file = ctx.hold_proactive_refresh_lock()?;
|
||||
let lock_file = ctx.hold_chatgpt_token_refresh_lock()?;
|
||||
|
||||
let auth_manager = Arc::clone(&ctx.auth_manager);
|
||||
let refresh_task = tokio::spawn(async move { auth_manager.auth().await });
|
||||
@@ -349,7 +349,7 @@ async fn concurrent_auth_requests_share_one_proactive_refresh() -> Result<()> {
|
||||
})
|
||||
.await?;
|
||||
|
||||
let lock_file = ctx.hold_proactive_refresh_lock()?;
|
||||
let lock_file = ctx.hold_chatgpt_token_refresh_lock()?;
|
||||
let first_manager = Arc::clone(&ctx.auth_manager);
|
||||
let first = tokio::spawn(async move { first_manager.auth().await });
|
||||
let second_manager = Arc::clone(&ctx.auth_manager);
|
||||
@@ -386,24 +386,23 @@ async fn concurrent_auth_requests_share_one_proactive_refresh() -> Result<()> {
|
||||
|
||||
#[serial_test::serial(auth_refresh)]
|
||||
#[tokio::test]
|
||||
async fn refresh_token_does_not_wait_while_proactive_refresh_lock_is_held() -> Result<()> {
|
||||
async fn refresh_token_reloads_managed_auth_after_waiting_for_token_refresh_lock() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/oauth/token"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
|
||||
"access_token": "new-access-token",
|
||||
"refresh_token": "new-refresh-token"
|
||||
"access_token": "unexpected-access-token",
|
||||
"refresh_token": "unexpected-refresh-token"
|
||||
})))
|
||||
.expect(1)
|
||||
.expect(0)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let ctx = RefreshTokenTestContext::new(&server).await?;
|
||||
let initial_last_refresh = Utc::now();
|
||||
let expired_access_token = access_token_with_expiration(Utc::now() - Duration::minutes(1));
|
||||
let initial_tokens = build_tokens(&expired_access_token, INITIAL_REFRESH_TOKEN);
|
||||
let initial_tokens = build_tokens(INITIAL_ACCESS_TOKEN, INITIAL_REFRESH_TOKEN);
|
||||
ctx.write_auth(&AuthDotJson {
|
||||
auth_mode: Some(AuthMode::Chatgpt),
|
||||
openai_api_key: None,
|
||||
@@ -413,33 +412,47 @@ async fn refresh_token_does_not_wait_while_proactive_refresh_lock_is_held() -> R
|
||||
})
|
||||
.await?;
|
||||
|
||||
let lock_file = ctx.hold_proactive_refresh_lock()?;
|
||||
let lock_file = ctx.hold_chatgpt_token_refresh_lock()?;
|
||||
|
||||
let auth_manager = Arc::clone(&ctx.auth_manager);
|
||||
let proactive_refresh_task = tokio::spawn(async move { auth_manager.auth().await });
|
||||
let refresh_task =
|
||||
tokio::spawn(async move { auth_manager.refresh_token_from_authority().await });
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||
assert!(
|
||||
!proactive_refresh_task.is_finished(),
|
||||
"proactive refresh should wait while another process holds the file lock"
|
||||
!refresh_task.is_finished(),
|
||||
"managed refresh should wait while another process holds the token refresh lock"
|
||||
);
|
||||
|
||||
tokio::time::timeout(
|
||||
std::time::Duration::from_secs(1),
|
||||
ctx.auth_manager.refresh_token_from_authority(),
|
||||
)
|
||||
.await
|
||||
.context("normal refresh should not wait for the proactive file lock")?
|
||||
.context("normal refresh should succeed")?;
|
||||
let disk_tokens = build_tokens("disk-access-token", "disk-refresh-token");
|
||||
let disk_auth = AuthDotJson {
|
||||
auth_mode: Some(AuthMode::Chatgpt),
|
||||
openai_api_key: None,
|
||||
tokens: Some(disk_tokens.clone()),
|
||||
last_refresh: Some(initial_last_refresh),
|
||||
agent_identity: None,
|
||||
};
|
||||
save_auth(
|
||||
ctx.codex_home.path(),
|
||||
&disk_auth,
|
||||
AuthCredentialsStoreMode::File,
|
||||
)?;
|
||||
|
||||
proactive_refresh_task.abort();
|
||||
let _ = proactive_refresh_task.await;
|
||||
drop(lock_file);
|
||||
refresh_task
|
||||
.await
|
||||
.context("managed refresh task should join")?
|
||||
.context("managed refresh should use newly persisted auth")?;
|
||||
|
||||
let stored = ctx.load_auth()?;
|
||||
let tokens = stored.tokens.as_ref().context("tokens should exist")?;
|
||||
assert_eq!(tokens.access_token, "new-access-token");
|
||||
assert_eq!(tokens.refresh_token, "new-refresh-token");
|
||||
assert_eq!(stored, disk_auth);
|
||||
let cached = ctx
|
||||
.auth_manager
|
||||
.auth_cached()
|
||||
.context("auth should be cached")?
|
||||
.get_token_data()
|
||||
.context("token data should reload")?;
|
||||
assert_eq!(cached, disk_tokens);
|
||||
server.verify().await;
|
||||
|
||||
Ok(())
|
||||
@@ -1298,11 +1311,8 @@ impl RefreshTokenTestContext {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn hold_proactive_refresh_lock(&self) -> Result<File> {
|
||||
let lock_path = self
|
||||
.codex_home
|
||||
.path()
|
||||
.join("chatgpt-access-token-proactive-refresh.lock");
|
||||
fn hold_chatgpt_token_refresh_lock(&self) -> Result<File> {
|
||||
let lock_path = self.codex_home.path().join("chatgpt-token-refresh.lock");
|
||||
let lock_file = File::options()
|
||||
.read(true)
|
||||
.write(true)
|
||||
|
||||
Reference in New Issue
Block a user