mirror of
https://github.com/openai/codex.git
synced 2026-06-02 11:22:01 +00:00
Compare commits
15 Commits
rhan/empty
...
cooper/ski
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9f86a0cfa7 | ||
|
|
6112b49ab6 | ||
|
|
2beaf6ca44 | ||
|
|
4eedd05bba | ||
|
|
053f6633b1 | ||
|
|
96e9af9096 | ||
|
|
27acff776c | ||
|
|
8f13e709e6 | ||
|
|
41b65786aa | ||
|
|
d6c7bfd44f | ||
|
|
445c687cdf | ||
|
|
4e0397eea7 | ||
|
|
d5eb810671 | ||
|
|
b901ed1d55 | ||
|
|
d17c381d3f |
@@ -446,7 +446,7 @@ async fn external_auth_refreshes_on_unauthorized() -> Result<()> {
|
||||
responses::ev_completed("resp-turn"),
|
||||
]);
|
||||
let unauthorized = ResponseTemplate::new(401).set_body_json(json!({
|
||||
"error": { "message": "unauthorized" }
|
||||
"error": { "message": "unauthorized", "type": "token_invalidated" }
|
||||
}));
|
||||
let responses_mock = responses::mount_response_sequence(
|
||||
&mock_server,
|
||||
|
||||
@@ -45,6 +45,7 @@ use tokio::time::sleep;
|
||||
use tokio::time::timeout;
|
||||
|
||||
const CLOUD_REQUIREMENTS_TIMEOUT: Duration = Duration::from_secs(15);
|
||||
const CHATGPT_ACCESS_TOKEN_STARTUP_REFRESH_TIMEOUT: Duration = Duration::from_secs(15);
|
||||
const CLOUD_REQUIREMENTS_MAX_ATTEMPTS: usize = 5;
|
||||
const CLOUD_REQUIREMENTS_CACHE_FILENAME: &str = "cloud-requirements-cache.json";
|
||||
const CLOUD_REQUIREMENTS_CACHE_REFRESH_INTERVAL: Duration = Duration::from_secs(5 * 60);
|
||||
@@ -728,22 +729,75 @@ pub fn cloud_requirements_loader(
|
||||
})
|
||||
}
|
||||
|
||||
async fn cloud_requirements_auth_manager_for_storage(
|
||||
codex_home: &Path,
|
||||
enable_codex_api_key_env: bool,
|
||||
credentials_store_mode: AuthCredentialsStoreMode,
|
||||
chatgpt_base_url: &str,
|
||||
) -> Arc<AuthManager> {
|
||||
AuthManager::shared(
|
||||
codex_home.to_path_buf(),
|
||||
enable_codex_api_key_env,
|
||||
credentials_store_mode,
|
||||
Some(chatgpt_base_url.to_string()),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn cloud_requirements_loader_for_storage(
|
||||
codex_home: PathBuf,
|
||||
enable_codex_api_key_env: bool,
|
||||
credentials_store_mode: AuthCredentialsStoreMode,
|
||||
chatgpt_base_url: String,
|
||||
) -> CloudRequirementsLoader {
|
||||
let auth_manager = AuthManager::shared(
|
||||
codex_home.clone(),
|
||||
let auth_manager = cloud_requirements_auth_manager_for_storage(
|
||||
&codex_home,
|
||||
enable_codex_api_key_env,
|
||||
credentials_store_mode,
|
||||
Some(chatgpt_base_url.clone()),
|
||||
&chatgpt_base_url,
|
||||
)
|
||||
.await;
|
||||
cloud_requirements_loader(auth_manager, chatgpt_base_url, codex_home)
|
||||
}
|
||||
|
||||
pub async fn refresh_managed_chatgpt_token_for_storage_if_near_expiry(
|
||||
codex_home: PathBuf,
|
||||
enable_codex_api_key_env: bool,
|
||||
credentials_store_mode: AuthCredentialsStoreMode,
|
||||
chatgpt_base_url: String,
|
||||
) {
|
||||
let auth_manager = cloud_requirements_auth_manager_for_storage(
|
||||
&codex_home,
|
||||
enable_codex_api_key_env,
|
||||
credentials_store_mode,
|
||||
&chatgpt_base_url,
|
||||
)
|
||||
.await;
|
||||
let refresh_task = tokio::spawn(async move {
|
||||
auth_manager
|
||||
.refresh_managed_chatgpt_token_if_near_expiry()
|
||||
.await
|
||||
});
|
||||
match timeout(CHATGPT_ACCESS_TOKEN_STARTUP_REFRESH_TIMEOUT, refresh_task).await {
|
||||
Ok(Ok(Ok(()))) => {}
|
||||
Ok(Ok(Err(err))) => {
|
||||
tracing::warn!(
|
||||
"failed to proactively refresh ChatGPT access token during CLI startup: {err}"
|
||||
);
|
||||
}
|
||||
Ok(Err(err)) => {
|
||||
tracing::warn!(
|
||||
"startup ChatGPT access token refresh task failed before completion: {err}"
|
||||
);
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::warn!(
|
||||
"timed out waiting for proactive ChatGPT access token refresh during CLI startup; refresh will continue in the background"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_cloud_requirements(
|
||||
contents: &str,
|
||||
requirements_base_dir: &Path,
|
||||
|
||||
@@ -150,6 +150,8 @@ const RESPONSES_COMPACT_ENDPOINT: &str = "/responses/compact";
|
||||
// period between stream events.
|
||||
const COMPACT_REQUEST_TIMEOUT_IDLE_MULTIPLIER: u32 = 4;
|
||||
const MEMORIES_SUMMARIZE_ENDPOINT: &str = "/memories/trace_summarize";
|
||||
const TOKEN_INVALIDATED_ERROR_CODE: &str = "token_invalidated";
|
||||
const TOKEN_REVOKED_ERROR_CODE: &str = "token_revoked";
|
||||
#[cfg(test)]
|
||||
pub(crate) const WEBSOCKET_CONNECT_TIMEOUT: Duration =
|
||||
Duration::from_millis(DEFAULT_WEBSOCKET_CONNECT_TIMEOUT_MS);
|
||||
@@ -1281,6 +1283,7 @@ impl ModelClientSession {
|
||||
stream,
|
||||
session_telemetry.clone(),
|
||||
inference_trace_attempt,
|
||||
/*revoked_auth_recovery*/ None,
|
||||
);
|
||||
return Ok(stream);
|
||||
}
|
||||
@@ -1464,6 +1467,7 @@ impl ModelClientSession {
|
||||
stream_result,
|
||||
session_telemetry.clone(),
|
||||
inference_trace_attempt,
|
||||
auth_recovery,
|
||||
);
|
||||
self.websocket_session.last_response_rx = Some(last_request_rx);
|
||||
return Ok(WebsocketStreamOutcome::Stream(stream));
|
||||
@@ -1524,37 +1528,53 @@ impl ModelClientSession {
|
||||
}
|
||||
|
||||
let disabled_trace = InferenceTraceContext::disabled();
|
||||
match self
|
||||
.stream_responses_websocket(
|
||||
prompt,
|
||||
model_info,
|
||||
session_telemetry,
|
||||
effort,
|
||||
summary,
|
||||
service_tier,
|
||||
turn_metadata_header,
|
||||
/*warmup*/ true,
|
||||
current_span_w3c_trace_context(),
|
||||
&disabled_trace,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(WebsocketStreamOutcome::Stream(mut stream)) => {
|
||||
// Wait for the v2 warmup request to complete before sending the first turn request.
|
||||
while let Some(event) = stream.next().await {
|
||||
match event {
|
||||
Ok(ResponseEvent::Completed { .. }) => break,
|
||||
Err(err) => return Err(err),
|
||||
_ => {}
|
||||
let mut retried_after_auth_recovery = false;
|
||||
loop {
|
||||
match self
|
||||
.stream_responses_websocket(
|
||||
prompt,
|
||||
model_info,
|
||||
session_telemetry,
|
||||
effort,
|
||||
summary,
|
||||
service_tier.clone(),
|
||||
turn_metadata_header,
|
||||
/*warmup*/ true,
|
||||
current_span_w3c_trace_context(),
|
||||
&disabled_trace,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(WebsocketStreamOutcome::Stream(mut stream)) => {
|
||||
// Wait for the v2 warmup request to complete before sending the first turn request.
|
||||
let mut retry_after_auth_recovery = false;
|
||||
while let Some(event) = stream.next().await {
|
||||
match event {
|
||||
Ok(ResponseEvent::Completed { .. }) => return Ok(()),
|
||||
Err(err)
|
||||
if !retried_after_auth_recovery
|
||||
&& is_websocket_auth_recovery_retry(&err) =>
|
||||
{
|
||||
retried_after_auth_recovery = true;
|
||||
self.reset_websocket_session();
|
||||
retry_after_auth_recovery = true;
|
||||
break;
|
||||
}
|
||||
Err(err) => return Err(err),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
if retry_after_auth_recovery {
|
||||
continue;
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
Ok(())
|
||||
Ok(WebsocketStreamOutcome::FallbackToHttp) => {
|
||||
self.try_switch_fallback_transport(session_telemetry, model_info);
|
||||
return Ok(());
|
||||
}
|
||||
Err(err) => return Err(err),
|
||||
}
|
||||
Ok(WebsocketStreamOutcome::FallbackToHttp) => {
|
||||
self.try_switch_fallback_transport(session_telemetry, model_info);
|
||||
Ok(())
|
||||
}
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1734,11 +1754,22 @@ fn parent_thread_id_header_value(session_source: &SessionSource) -> Option<Strin
|
||||
|
||||
const RESPONSE_STREAM_CHANNEL_CAPACITY: usize = 1600;
|
||||
const STREAM_DROPPED_REASON: &str = "response stream dropped before provider terminal event";
|
||||
const WEBSOCKET_AUTH_RECOVERY_RETRY_REASON: &str =
|
||||
"responses websocket auth recovered after unauthorized response";
|
||||
|
||||
pub(crate) fn is_websocket_auth_recovery_retry(err: &CodexErr) -> bool {
|
||||
matches!(
|
||||
err,
|
||||
CodexErr::Stream(message, _)
|
||||
if message == WEBSOCKET_AUTH_RECOVERY_RETRY_REASON
|
||||
)
|
||||
}
|
||||
|
||||
fn map_response_stream(
|
||||
api_stream: codex_api::ResponseStream,
|
||||
session_telemetry: SessionTelemetry,
|
||||
inference_trace_attempt: InferenceTraceAttempt,
|
||||
revoked_auth_recovery: Option<UnauthorizedRecovery>,
|
||||
) -> (ResponseStream, oneshot::Receiver<LastResponse>) {
|
||||
let codex_api::ResponseStream {
|
||||
rx_event,
|
||||
@@ -1753,6 +1784,7 @@ fn map_response_stream(
|
||||
api_stream,
|
||||
session_telemetry,
|
||||
inference_trace_attempt,
|
||||
revoked_auth_recovery,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -1761,6 +1793,7 @@ fn map_response_events<S>(
|
||||
api_stream: S,
|
||||
session_telemetry: SessionTelemetry,
|
||||
inference_trace_attempt: InferenceTraceAttempt,
|
||||
mut revoked_auth_recovery: Option<UnauthorizedRecovery>,
|
||||
) -> (ResponseStream, oneshot::Receiver<LastResponse>)
|
||||
where
|
||||
S: futures::Stream<Item = std::result::Result<ResponseEvent, ApiError>>
|
||||
@@ -1871,7 +1904,13 @@ where
|
||||
if let Some(upstream_request_id) = upstream_request_id {
|
||||
feedback_tags!(last_model_request_id = upstream_request_id);
|
||||
}
|
||||
let mapped = map_api_error(err);
|
||||
let mapped = map_response_stream_error(
|
||||
err,
|
||||
&mut revoked_auth_recovery,
|
||||
&session_telemetry,
|
||||
items_added.is_empty(),
|
||||
)
|
||||
.await;
|
||||
inference_trace_attempt.record_failed(
|
||||
&mapped,
|
||||
upstream_request_id,
|
||||
@@ -1903,6 +1942,61 @@ where
|
||||
)
|
||||
}
|
||||
|
||||
async fn map_response_stream_error(
|
||||
err: ApiError,
|
||||
revoked_auth_recovery: &mut Option<UnauthorizedRecovery>,
|
||||
session_telemetry: &SessionTelemetry,
|
||||
retry_after_auth_recovery_allowed: bool,
|
||||
) -> CodexErr {
|
||||
match err {
|
||||
ApiError::Transport(
|
||||
transport @ TransportError::Http {
|
||||
status: StatusCode::UNAUTHORIZED,
|
||||
..
|
||||
},
|
||||
) if stream_error_has_revoked_access_token(&transport) => {
|
||||
let recovery_transport = clone_http_transport_error(&transport);
|
||||
match handle_unauthorized(recovery_transport, revoked_auth_recovery, session_telemetry)
|
||||
.await
|
||||
{
|
||||
Ok(_) if retry_after_auth_recovery_allowed => CodexErr::Stream(
|
||||
WEBSOCKET_AUTH_RECOVERY_RETRY_REASON.to_string(),
|
||||
/*requested_delay*/ None,
|
||||
),
|
||||
Ok(_) => map_api_error(ApiError::Transport(transport)),
|
||||
Err(err) => err,
|
||||
}
|
||||
}
|
||||
err => map_api_error(err),
|
||||
}
|
||||
}
|
||||
|
||||
fn stream_error_has_revoked_access_token(transport: &TransportError) -> bool {
|
||||
let debug = extract_response_debug_context(transport);
|
||||
access_token_revocation_recovery_reason(
|
||||
debug.auth_error_code.as_deref(),
|
||||
debug.auth_error_type.as_deref(),
|
||||
)
|
||||
.is_some()
|
||||
}
|
||||
|
||||
fn clone_http_transport_error(transport: &TransportError) -> TransportError {
|
||||
match transport {
|
||||
TransportError::Http {
|
||||
status,
|
||||
url,
|
||||
headers,
|
||||
body,
|
||||
} => TransportError::Http {
|
||||
status: *status,
|
||||
url: url.clone(),
|
||||
headers: headers.clone(),
|
||||
body: body.clone(),
|
||||
},
|
||||
_ => unreachable!("stream auth recovery only clones HTTP transport errors"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles a 401 response by optionally refreshing ChatGPT tokens once.
|
||||
///
|
||||
/// When refresh succeeds, the caller should retry the API call; otherwise
|
||||
@@ -1979,6 +2073,66 @@ async fn handle_unauthorized(
|
||||
session_telemetry: &SessionTelemetry,
|
||||
) -> Result<UnauthorizedRecoveryExecution> {
|
||||
let debug = extract_response_debug_context(&transport);
|
||||
let revocation_recovery_reason = access_token_revocation_recovery_reason(
|
||||
debug.auth_error_code.as_deref(),
|
||||
debug.auth_error_type.as_deref(),
|
||||
);
|
||||
if let Some(recovery_reason) = revocation_recovery_reason
|
||||
&& let Some(recovery) = auth_recovery.as_mut()
|
||||
&& recovery.handles_invalidated_access_token_auth()
|
||||
{
|
||||
let mode = recovery.mode_name();
|
||||
let phase = recovery.step_name();
|
||||
return match recovery.handle_invalidated_access_token_auth().await {
|
||||
Ok(step_result) => {
|
||||
session_telemetry.record_auth_recovery(
|
||||
mode,
|
||||
phase,
|
||||
"recovery_succeeded",
|
||||
debug.request_id.as_deref(),
|
||||
debug.cf_ray.as_deref(),
|
||||
debug.auth_error.as_deref(),
|
||||
debug.auth_error_code.as_deref(),
|
||||
Some(recovery_reason),
|
||||
step_result.auth_state_changed(),
|
||||
);
|
||||
emit_feedback_auth_recovery_tags(
|
||||
mode,
|
||||
phase,
|
||||
"recovery_succeeded",
|
||||
debug.request_id.as_deref(),
|
||||
debug.cf_ray.as_deref(),
|
||||
debug.auth_error.as_deref(),
|
||||
debug.auth_error_code.as_deref(),
|
||||
);
|
||||
Ok(UnauthorizedRecoveryExecution { mode, phase })
|
||||
}
|
||||
Err(failed) => {
|
||||
session_telemetry.record_auth_recovery(
|
||||
mode,
|
||||
phase,
|
||||
"recovery_failed_permanent",
|
||||
debug.request_id.as_deref(),
|
||||
debug.cf_ray.as_deref(),
|
||||
debug.auth_error.as_deref(),
|
||||
debug.auth_error_code.as_deref(),
|
||||
Some(recovery_reason),
|
||||
/*auth_state_changed*/ None,
|
||||
);
|
||||
emit_feedback_auth_recovery_tags(
|
||||
mode,
|
||||
phase,
|
||||
"recovery_failed_permanent",
|
||||
debug.request_id.as_deref(),
|
||||
debug.cf_ray.as_deref(),
|
||||
debug.auth_error.as_deref(),
|
||||
debug.auth_error_code.as_deref(),
|
||||
);
|
||||
Err(CodexErr::RefreshTokenFailed(failed))
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
if let Some(recovery) = auth_recovery
|
||||
&& recovery.has_next()
|
||||
{
|
||||
@@ -2063,7 +2217,11 @@ async fn handle_unauthorized(
|
||||
recovery.step_name(),
|
||||
Some(recovery.unavailable_reason()),
|
||||
),
|
||||
None => ("none", "none", Some("auth_manager_missing")),
|
||||
None => (
|
||||
"none",
|
||||
"none",
|
||||
revocation_recovery_reason.or(Some("auth_manager_missing")),
|
||||
),
|
||||
};
|
||||
session_telemetry.record_auth_recovery(
|
||||
mode,
|
||||
@@ -2089,6 +2247,21 @@ async fn handle_unauthorized(
|
||||
Err(map_api_error(ApiError::Transport(transport)))
|
||||
}
|
||||
|
||||
fn access_token_revocation_recovery_reason(
|
||||
auth_error_code: Option<&str>,
|
||||
auth_error_type: Option<&str>,
|
||||
) -> Option<&'static str> {
|
||||
match (auth_error_code, auth_error_type) {
|
||||
(Some(TOKEN_INVALIDATED_ERROR_CODE), _) | (_, Some(TOKEN_INVALIDATED_ERROR_CODE)) => {
|
||||
Some("access_token_invalidated")
|
||||
}
|
||||
(Some(TOKEN_REVOKED_ERROR_CODE), _) | (_, Some(TOKEN_REVOKED_ERROR_CODE)) => {
|
||||
Some("access_token_revoked")
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn api_error_http_status(error: &ApiError) -> Option<u16> {
|
||||
match error {
|
||||
ApiError::Transport(TransportError::Http { status, .. }) => Some(status.as_u16()),
|
||||
|
||||
@@ -2,19 +2,28 @@ use super::AuthRequestTelemetryContext;
|
||||
use super::ModelClient;
|
||||
use super::PendingUnauthorizedRetry;
|
||||
use super::UnauthorizedRecoveryExecution;
|
||||
use super::WEBSOCKET_AUTH_RECOVERY_RETRY_REASON;
|
||||
use super::X_CODEX_INSTALLATION_ID_HEADER;
|
||||
use super::X_CODEX_PARENT_THREAD_ID_HEADER;
|
||||
use super::X_CODEX_TURN_METADATA_HEADER;
|
||||
use super::X_CODEX_WINDOW_ID_HEADER;
|
||||
use super::X_OPENAI_SUBAGENT_HEADER;
|
||||
use super::handle_unauthorized;
|
||||
use crate::AttestationContext;
|
||||
use crate::AttestationProvider;
|
||||
use crate::GenerateAttestationFuture;
|
||||
use async_trait::async_trait;
|
||||
use base64::Engine;
|
||||
use codex_api::ApiError;
|
||||
use codex_api::ResponseEvent;
|
||||
use codex_api::TransportError;
|
||||
use codex_app_server_protocol::AuthMode;
|
||||
use codex_login::AuthManager;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_login::ExternalAuth;
|
||||
use codex_login::ExternalAuthRefreshContext;
|
||||
use codex_login::ExternalAuthRefreshReason;
|
||||
use codex_login::ExternalAuthTokens;
|
||||
use codex_model_provider::BearerAuthProvider;
|
||||
use codex_model_provider_info::CHATGPT_CODEX_BASE_URL;
|
||||
use codex_model_provider_info::ModelProviderInfo;
|
||||
@@ -23,6 +32,8 @@ use codex_model_provider_info::create_oss_provider_with_base_url;
|
||||
use codex_otel::SessionTelemetry;
|
||||
use codex_protocol::SessionId;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::error::CodexErr;
|
||||
use codex_protocol::error::RefreshTokenFailedReason;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::openai_models::ModelInfo;
|
||||
@@ -37,10 +48,14 @@ use codex_rollout_trace::RolloutTrace;
|
||||
use codex_rollout_trace::TraceWriter;
|
||||
use codex_rollout_trace::replay_bundle;
|
||||
use futures::StreamExt;
|
||||
use http::HeaderMap;
|
||||
use http::HeaderValue;
|
||||
use http::StatusCode;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::VecDeque;
|
||||
use std::path::Path;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
@@ -123,6 +138,86 @@ fn test_session_telemetry() -> SessionTelemetry {
|
||||
)
|
||||
}
|
||||
|
||||
fn fake_chatgpt_jwt(signature: &str) -> String {
|
||||
let encode_json = |value: serde_json::Value| {
|
||||
base64::engine::general_purpose::URL_SAFE_NO_PAD
|
||||
.encode(serde_json::to_vec(&value).expect("managed auth JWT segment should serialize"))
|
||||
};
|
||||
format!(
|
||||
"{}.{}.{}",
|
||||
encode_json(json!({"alg": "none", "typ": "JWT"})),
|
||||
encode_json(json!({
|
||||
"email": "user@example.com",
|
||||
"https://api.openai.com/auth": {
|
||||
"chatgpt_account_id": "account_id",
|
||||
"chatgpt_user_id": "user-12345",
|
||||
"user_id": "user-12345"
|
||||
}
|
||||
})),
|
||||
base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(signature.as_bytes()),
|
||||
)
|
||||
}
|
||||
|
||||
fn write_managed_chatgpt_auth(codex_home: &Path, access_token: &str) {
|
||||
let id_token = fake_chatgpt_jwt("managed-auth");
|
||||
let auth_json = json!({
|
||||
"tokens": {
|
||||
"id_token": id_token,
|
||||
"access_token": access_token,
|
||||
"refresh_token": "test-refresh-token",
|
||||
"account_id": "account_id"
|
||||
},
|
||||
"last_refresh": chrono::Utc::now(),
|
||||
});
|
||||
std::fs::write(
|
||||
codex_home.join("auth.json"),
|
||||
serde_json::to_vec_pretty(&auth_json).expect("managed auth should serialize"),
|
||||
)
|
||||
.expect("managed auth should persist");
|
||||
}
|
||||
|
||||
async fn managed_chatgpt_auth_manager(access_token: &str) -> (TempDir, Arc<AuthManager>) {
|
||||
let codex_home = TempDir::new().expect("managed auth tempdir");
|
||||
write_managed_chatgpt_auth(codex_home.path(), access_token);
|
||||
let manager = AuthManager::shared(
|
||||
codex_home.path().to_path_buf(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
codex_login::AuthCredentialsStoreMode::File,
|
||||
/*chatgpt_base_url*/ None,
|
||||
)
|
||||
.await;
|
||||
(codex_home, manager)
|
||||
}
|
||||
|
||||
struct RefreshingExternalChatgptAuth {
|
||||
refreshed_token: String,
|
||||
refresh_count: AtomicUsize,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ExternalAuth for RefreshingExternalChatgptAuth {
|
||||
fn auth_mode(&self) -> AuthMode {
|
||||
AuthMode::Chatgpt
|
||||
}
|
||||
|
||||
async fn refresh(
|
||||
&self,
|
||||
context: ExternalAuthRefreshContext,
|
||||
) -> std::io::Result<ExternalAuthTokens> {
|
||||
assert_eq!(
|
||||
context.reason,
|
||||
ExternalAuthRefreshReason::Unauthorized,
|
||||
"websocket revoked-token recovery should use unauthorized external refresh"
|
||||
);
|
||||
self.refresh_count.fetch_add(1, Ordering::SeqCst);
|
||||
Ok(ExternalAuthTokens::chatgpt(
|
||||
self.refreshed_token.clone(),
|
||||
"account_id",
|
||||
Some("pro".to_string()),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct TagCollectorVisitor {
|
||||
tags: BTreeMap<String, String>,
|
||||
@@ -346,6 +441,7 @@ async fn dropped_response_stream_traces_cancelled_partial_output() -> anyhow::Re
|
||||
api_stream,
|
||||
test_session_telemetry(),
|
||||
attempt,
|
||||
/*revoked_auth_recovery*/ None,
|
||||
);
|
||||
|
||||
let observed = stream
|
||||
@@ -395,6 +491,7 @@ async fn response_stream_records_last_model_feedback_ids() {
|
||||
api_stream,
|
||||
test_session_telemetry(),
|
||||
InferenceTraceAttempt::disabled(),
|
||||
/*revoked_auth_recovery*/ None,
|
||||
);
|
||||
|
||||
while stream.next().await.is_some() {}
|
||||
@@ -436,6 +533,7 @@ async fn dropped_backpressured_response_stream_traces_cancelled_partial_output()
|
||||
api_stream,
|
||||
test_session_telemetry(),
|
||||
attempt,
|
||||
/*revoked_auth_recovery*/ None,
|
||||
);
|
||||
|
||||
// Fill the mapper channel with non-terminal events, then yield one output
|
||||
@@ -581,3 +679,362 @@ async fn non_chatgpt_codex_endpoints_omit_attestation_generation() {
|
||||
);
|
||||
assert_eq!(attestation_calls.load(Ordering::Relaxed), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn token_invalidated_401_clears_auth_and_requires_relogin() {
|
||||
let (_codex_home, manager) = managed_chatgpt_auth_manager("revoked-access-token").await;
|
||||
let mut recovery = Some(manager.unauthorized_recovery());
|
||||
let x_error_json = base64::engine::general_purpose::STANDARD
|
||||
.encode(r#"{"error":{"code":"token_invalidated"}}"#);
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(
|
||||
"x-error-json",
|
||||
HeaderValue::from_str(&x_error_json).expect("valid x-error-json header"),
|
||||
);
|
||||
|
||||
let err = handle_unauthorized(
|
||||
TransportError::Http {
|
||||
status: StatusCode::UNAUTHORIZED,
|
||||
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
|
||||
headers: Some(headers),
|
||||
body: Some(r#"{"error":{"message":"revoked"}}"#.to_string()),
|
||||
},
|
||||
&mut recovery,
|
||||
&test_session_telemetry(),
|
||||
)
|
||||
.await
|
||||
.expect_err("revoked access tokens should not enter refresh recovery");
|
||||
|
||||
let CodexErr::RefreshTokenFailed(failed) = err else {
|
||||
panic!("expected invalidated access token to force relogin, got {err:?}");
|
||||
};
|
||||
assert_eq!(failed.reason, RefreshTokenFailedReason::Revoked);
|
||||
assert_eq!(
|
||||
failed.message,
|
||||
"Your ChatGPT session is no longer valid. Please sign in again."
|
||||
);
|
||||
assert!(manager.auth_cached().is_none());
|
||||
assert_eq!(
|
||||
recovery
|
||||
.as_ref()
|
||||
.expect("recovery state should remain available")
|
||||
.step_name(),
|
||||
"reload"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn token_invalidated_error_type_401_clears_auth_and_requires_relogin() {
|
||||
let (_codex_home, manager) = managed_chatgpt_auth_manager("revoked-access-token").await;
|
||||
let mut recovery = Some(manager.unauthorized_recovery());
|
||||
|
||||
let err = handle_unauthorized(
|
||||
TransportError::Http {
|
||||
status: StatusCode::UNAUTHORIZED,
|
||||
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
|
||||
headers: None,
|
||||
body: Some(r#"{"error":{"type":"token_invalidated"}}"#.to_string()),
|
||||
},
|
||||
&mut recovery,
|
||||
&test_session_telemetry(),
|
||||
)
|
||||
.await
|
||||
.expect_err("invalidated access tokens should not enter refresh recovery");
|
||||
|
||||
let CodexErr::RefreshTokenFailed(failed) = err else {
|
||||
panic!("expected invalidated access token to force relogin, got {err:?}");
|
||||
};
|
||||
assert_eq!(failed.reason, RefreshTokenFailedReason::Revoked);
|
||||
assert_eq!(
|
||||
failed.message,
|
||||
"Your ChatGPT session is no longer valid. Please sign in again."
|
||||
);
|
||||
assert!(manager.auth_cached().is_none());
|
||||
assert_eq!(
|
||||
recovery
|
||||
.as_ref()
|
||||
.expect("recovery state should remain available")
|
||||
.step_name(),
|
||||
"reload"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn token_revoked_error_code_401_clears_auth_and_requires_relogin() {
|
||||
let (_codex_home, manager) = managed_chatgpt_auth_manager("revoked-access-token").await;
|
||||
let mut recovery = Some(manager.unauthorized_recovery());
|
||||
|
||||
let err = handle_unauthorized(
|
||||
TransportError::Http {
|
||||
status: StatusCode::UNAUTHORIZED,
|
||||
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
|
||||
headers: None,
|
||||
body: Some(r#"{"error":{"code":"token_revoked"}}"#.to_string()),
|
||||
},
|
||||
&mut recovery,
|
||||
&test_session_telemetry(),
|
||||
)
|
||||
.await
|
||||
.expect_err("revoked access tokens should force relogin");
|
||||
|
||||
let CodexErr::RefreshTokenFailed(failed) = err else {
|
||||
panic!("expected revoked access token to force relogin, got {err:?}");
|
||||
};
|
||||
assert_eq!(failed.reason, RefreshTokenFailedReason::Revoked);
|
||||
assert_eq!(
|
||||
failed.message,
|
||||
"Your ChatGPT session is no longer valid. Please sign in again."
|
||||
);
|
||||
assert!(manager.auth_cached().is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn token_revoked_error_type_401_clears_auth_and_requires_relogin() {
|
||||
let (_codex_home, manager) = managed_chatgpt_auth_manager("revoked-access-token").await;
|
||||
let mut recovery = Some(manager.unauthorized_recovery());
|
||||
|
||||
let err = handle_unauthorized(
|
||||
TransportError::Http {
|
||||
status: StatusCode::UNAUTHORIZED,
|
||||
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
|
||||
headers: None,
|
||||
body: Some(r#"{"error":{"type":"token_revoked"}}"#.to_string()),
|
||||
},
|
||||
&mut recovery,
|
||||
&test_session_telemetry(),
|
||||
)
|
||||
.await
|
||||
.expect_err("revoked access tokens should force relogin");
|
||||
|
||||
let CodexErr::RefreshTokenFailed(failed) = err else {
|
||||
panic!("expected revoked access token to force relogin, got {err:?}");
|
||||
};
|
||||
assert_eq!(failed.reason, RefreshTokenFailedReason::Revoked);
|
||||
assert_eq!(
|
||||
failed.message,
|
||||
"Your ChatGPT session is no longer valid. Please sign in again."
|
||||
);
|
||||
assert!(manager.auth_cached().is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn websocket_stream_token_revoked_401_clears_auth_and_requires_relogin() {
|
||||
let (_codex_home, manager) = managed_chatgpt_auth_manager("revoked-access-token").await;
|
||||
let api_stream = futures::stream::iter([Err(ApiError::Transport(TransportError::Http {
|
||||
status: StatusCode::UNAUTHORIZED,
|
||||
url: None,
|
||||
headers: None,
|
||||
body: Some(r#"{"type":"error","status":401,"error":{"type":"token_revoked"}}"#.to_string()),
|
||||
}))]);
|
||||
|
||||
let (mut stream, _) = super::map_response_events(
|
||||
/*upstream_request_id*/ None,
|
||||
api_stream,
|
||||
test_session_telemetry(),
|
||||
InferenceTraceAttempt::disabled(),
|
||||
Some(manager.unauthorized_recovery()),
|
||||
);
|
||||
let err = stream
|
||||
.next()
|
||||
.await
|
||||
.expect("revoked websocket stream should emit an error")
|
||||
.expect_err("revoked websocket stream should require relogin");
|
||||
|
||||
let CodexErr::RefreshTokenFailed(failed) = err else {
|
||||
panic!("expected revoked websocket stream to force relogin, got {err:?}");
|
||||
};
|
||||
assert_eq!(failed.reason, RefreshTokenFailedReason::Revoked);
|
||||
assert_eq!(
|
||||
failed.message,
|
||||
"Your ChatGPT session is no longer valid. Please sign in again."
|
||||
);
|
||||
assert!(manager.auth_cached().is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn websocket_stream_token_revoked_401_retries_after_loading_replacement_auth() {
|
||||
let (codex_home, manager) = managed_chatgpt_auth_manager("revoked-access-token").await;
|
||||
write_managed_chatgpt_auth(codex_home.path(), "replacement-access-token");
|
||||
let api_stream = futures::stream::iter([Err(ApiError::Transport(TransportError::Http {
|
||||
status: StatusCode::UNAUTHORIZED,
|
||||
url: None,
|
||||
headers: None,
|
||||
body: Some(r#"{"type":"error","status":401,"error":{"type":"token_revoked"}}"#.to_string()),
|
||||
}))]);
|
||||
|
||||
let (mut stream, _) = super::map_response_events(
|
||||
/*upstream_request_id*/ None,
|
||||
api_stream,
|
||||
test_session_telemetry(),
|
||||
InferenceTraceAttempt::disabled(),
|
||||
Some(manager.unauthorized_recovery()),
|
||||
);
|
||||
let err = stream
|
||||
.next()
|
||||
.await
|
||||
.expect("recovered websocket stream should emit a retryable error")
|
||||
.expect_err("recovered websocket stream should ask the turn loop to retry");
|
||||
|
||||
let CodexErr::Stream(message, None) = err else {
|
||||
panic!(
|
||||
"expected recovered websocket revocation to surface a retryable stream error, got {err:?}"
|
||||
);
|
||||
};
|
||||
assert_eq!(message, WEBSOCKET_AUTH_RECOVERY_RETRY_REASON);
|
||||
assert_eq!(
|
||||
manager
|
||||
.auth_cached()
|
||||
.expect("replacement auth should remain cached")
|
||||
.get_token()
|
||||
.expect("replacement token should resolve"),
|
||||
"replacement-access-token"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn websocket_stream_token_revoked_401_after_output_does_not_retry_after_loading_replacement_auth()
|
||||
{
|
||||
let (codex_home, manager) = managed_chatgpt_auth_manager("revoked-access-token").await;
|
||||
write_managed_chatgpt_auth(codex_home.path(), "replacement-access-token");
|
||||
let api_stream = futures::stream::iter([
|
||||
Ok(ResponseEvent::OutputItemDone(output_message(
|
||||
"msg-1",
|
||||
"partial answer",
|
||||
))),
|
||||
Err(ApiError::Transport(TransportError::Http {
|
||||
status: StatusCode::UNAUTHORIZED,
|
||||
url: None,
|
||||
headers: None,
|
||||
body: Some(
|
||||
r#"{"type":"error","status":401,"error":{"type":"token_revoked"}}"#.to_string(),
|
||||
),
|
||||
})),
|
||||
]);
|
||||
|
||||
let (mut stream, _) = super::map_response_events(
|
||||
/*upstream_request_id*/ None,
|
||||
api_stream,
|
||||
test_session_telemetry(),
|
||||
InferenceTraceAttempt::disabled(),
|
||||
Some(manager.unauthorized_recovery()),
|
||||
);
|
||||
assert!(matches!(
|
||||
stream
|
||||
.next()
|
||||
.await
|
||||
.expect("partial output should be forwarded")
|
||||
.expect("partial output should stay successful"),
|
||||
ResponseEvent::OutputItemDone(_)
|
||||
));
|
||||
let err = stream
|
||||
.next()
|
||||
.await
|
||||
.expect("revoked websocket stream should emit an error")
|
||||
.expect_err("revoked websocket stream should stop after output escapes");
|
||||
|
||||
assert!(
|
||||
!super::is_websocket_auth_recovery_retry(&err),
|
||||
"streams with recorded output must not transparently retry after auth recovery"
|
||||
);
|
||||
assert_eq!(
|
||||
manager
|
||||
.auth_cached()
|
||||
.expect("replacement auth should remain cached")
|
||||
.get_token()
|
||||
.expect("replacement token should resolve"),
|
||||
"replacement-access-token"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn websocket_stream_token_revoked_401_refreshes_external_auth_before_retry() {
|
||||
let codex_home = TempDir::new().expect("external auth tempdir");
|
||||
let initial_access_token = fake_chatgpt_jwt("external-initial");
|
||||
let refreshed_access_token = fake_chatgpt_jwt("external-refreshed");
|
||||
codex_login::auth::login_with_chatgpt_auth_tokens(
|
||||
codex_home.path(),
|
||||
&initial_access_token,
|
||||
"account_id",
|
||||
Some("pro"),
|
||||
)
|
||||
.expect("external ChatGPT auth should seed");
|
||||
let manager = AuthManager::shared(
|
||||
codex_home.path().to_path_buf(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
codex_login::AuthCredentialsStoreMode::File,
|
||||
/*chatgpt_base_url*/ None,
|
||||
)
|
||||
.await;
|
||||
let external_auth = Arc::new(RefreshingExternalChatgptAuth {
|
||||
refreshed_token: refreshed_access_token.clone(),
|
||||
refresh_count: AtomicUsize::new(0),
|
||||
});
|
||||
manager.set_external_auth(external_auth.clone());
|
||||
|
||||
let api_stream = futures::stream::iter([Err(ApiError::Transport(TransportError::Http {
|
||||
status: StatusCode::UNAUTHORIZED,
|
||||
url: None,
|
||||
headers: None,
|
||||
body: Some(r#"{"type":"error","status":401,"error":{"type":"token_revoked"}}"#.to_string()),
|
||||
}))]);
|
||||
|
||||
let (mut stream, _) = super::map_response_events(
|
||||
/*upstream_request_id*/ None,
|
||||
api_stream,
|
||||
test_session_telemetry(),
|
||||
InferenceTraceAttempt::disabled(),
|
||||
Some(manager.unauthorized_recovery()),
|
||||
);
|
||||
let err = stream
|
||||
.next()
|
||||
.await
|
||||
.expect("refreshed websocket stream should emit a retryable error")
|
||||
.expect_err("refreshed websocket stream should ask the turn loop to retry");
|
||||
|
||||
let CodexErr::Stream(message, None) = err else {
|
||||
panic!(
|
||||
"expected external websocket revocation to surface a retryable stream error, got {err:?}"
|
||||
);
|
||||
};
|
||||
assert_eq!(message, WEBSOCKET_AUTH_RECOVERY_RETRY_REASON);
|
||||
assert_eq!(external_auth.refresh_count.load(Ordering::SeqCst), 1);
|
||||
assert_eq!(
|
||||
manager
|
||||
.auth_cached()
|
||||
.expect("refreshed external auth should remain cached")
|
||||
.get_token()
|
||||
.expect("refreshed token should resolve"),
|
||||
refreshed_access_token
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn token_invalidated_401_retries_when_persisted_auth_changed() {
|
||||
let (codex_home, manager) = managed_chatgpt_auth_manager("revoked-access-token").await;
|
||||
write_managed_chatgpt_auth(codex_home.path(), "replacement-access-token");
|
||||
let mut recovery = Some(manager.unauthorized_recovery());
|
||||
|
||||
let retry = handle_unauthorized(
|
||||
TransportError::Http {
|
||||
status: StatusCode::UNAUTHORIZED,
|
||||
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
|
||||
headers: None,
|
||||
body: Some(r#"{"error":{"type":"token_invalidated"}}"#.to_string()),
|
||||
},
|
||||
&mut recovery,
|
||||
&test_session_telemetry(),
|
||||
)
|
||||
.await
|
||||
.expect("new persisted auth should retry instead of logging out");
|
||||
|
||||
assert_eq!(retry.mode, "managed");
|
||||
assert_eq!(retry.phase, "reload");
|
||||
assert_eq!(
|
||||
manager
|
||||
.auth_cached()
|
||||
.expect("replacement auth should remain cached")
|
||||
.get_token()
|
||||
.expect("replacement token should resolve"),
|
||||
"replacement-access-token"
|
||||
);
|
||||
}
|
||||
|
||||
@@ -186,6 +186,7 @@ async fn run_compact_task_inner_impl(
|
||||
|
||||
let max_retries = turn_context.provider.info().stream_max_retries();
|
||||
let mut retries = 0;
|
||||
let mut websocket_auth_recovery_retries = 0;
|
||||
let mut client_session = sess.services.model_client.new_session();
|
||||
// Reuse one client session so turn-scoped state (sticky routing, websocket incremental
|
||||
// request tracking)
|
||||
@@ -235,6 +236,14 @@ async fn run_compact_task_inner_impl(
|
||||
sess.send_event(&turn_context, event).await;
|
||||
return Err(e);
|
||||
}
|
||||
Err(e)
|
||||
if crate::client::is_websocket_auth_recovery_retry(&e)
|
||||
&& websocket_auth_recovery_retries == 0 =>
|
||||
{
|
||||
websocket_auth_recovery_retries += 1;
|
||||
client_session.reset_websocket_session();
|
||||
continue;
|
||||
}
|
||||
Err(e) => {
|
||||
if retries < max_retries {
|
||||
retries += 1;
|
||||
|
||||
@@ -277,31 +277,44 @@ async fn run_remote_compaction_request_v2(
|
||||
prompt: &Prompt,
|
||||
turn_metadata_header: Option<&str>,
|
||||
) -> CodexResult<(ResponseItem, String)> {
|
||||
let stream = client_session
|
||||
.stream(
|
||||
prompt,
|
||||
&turn_context.model_info,
|
||||
&turn_context.session_telemetry,
|
||||
turn_context.reasoning_effort,
|
||||
turn_context.reasoning_summary,
|
||||
turn_context.config.service_tier.clone(),
|
||||
turn_metadata_header,
|
||||
&InferenceTraceContext::disabled(),
|
||||
)
|
||||
.or_else(|err| async {
|
||||
let total_usage_breakdown = sess.get_total_token_usage_breakdown().await;
|
||||
let compact_request_log_data =
|
||||
build_compact_request_log_data(&prompt.input, &prompt.base_instructions.text);
|
||||
log_remote_compact_failure(
|
||||
turn_context,
|
||||
&compact_request_log_data,
|
||||
total_usage_breakdown,
|
||||
&err,
|
||||
);
|
||||
let mut websocket_auth_recovery_retries = 0;
|
||||
loop {
|
||||
let stream = client_session
|
||||
.stream(
|
||||
prompt,
|
||||
&turn_context.model_info,
|
||||
&turn_context.session_telemetry,
|
||||
turn_context.reasoning_effort,
|
||||
turn_context.reasoning_summary,
|
||||
turn_context.config.service_tier.clone(),
|
||||
turn_metadata_header,
|
||||
&InferenceTraceContext::disabled(),
|
||||
)
|
||||
.or_else(|err| async {
|
||||
let total_usage_breakdown = sess.get_total_token_usage_breakdown().await;
|
||||
let compact_request_log_data =
|
||||
build_compact_request_log_data(&prompt.input, &prompt.base_instructions.text);
|
||||
log_remote_compact_failure(
|
||||
turn_context,
|
||||
&compact_request_log_data,
|
||||
total_usage_breakdown,
|
||||
&err,
|
||||
);
|
||||
Err(err)
|
||||
})
|
||||
.await?;
|
||||
|
||||
match collect_compaction_output(stream).await {
|
||||
Err(err)
|
||||
})
|
||||
.await?;
|
||||
collect_compaction_output(stream).await
|
||||
if crate::client::is_websocket_auth_recovery_retry(&err)
|
||||
&& websocket_auth_recovery_retries == 0 =>
|
||||
{
|
||||
websocket_auth_recovery_retries += 1;
|
||||
client_session.reset_websocket_session();
|
||||
}
|
||||
result => return result,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn collect_compaction_output(
|
||||
|
||||
@@ -943,6 +943,7 @@ async fn run_sampling_request(
|
||||
)
|
||||
.await;
|
||||
let mut retries = 0;
|
||||
let mut websocket_auth_recovery_retries = 0;
|
||||
let mut initial_input = Some(input);
|
||||
loop {
|
||||
let prompt_input = if let Some(input) = initial_input.take() {
|
||||
@@ -992,6 +993,16 @@ async fn run_sampling_request(
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
// Auth recovery already loaded replacement credentials. Give that one immediate retry
|
||||
// without spending the transport reconnect budget.
|
||||
if crate::client::is_websocket_auth_recovery_retry(&err)
|
||||
&& websocket_auth_recovery_retries == 0
|
||||
{
|
||||
websocket_auth_recovery_retries += 1;
|
||||
client_session.reset_websocket_session();
|
||||
continue;
|
||||
}
|
||||
|
||||
// Use the configured provider-specific stream retry budget.
|
||||
let max_retries = turn_context.provider.info().stream_max_retries();
|
||||
if retries >= max_retries
|
||||
|
||||
@@ -339,7 +339,7 @@ impl ThreadManager {
|
||||
state_db: Option<StateDbHandle>,
|
||||
) -> Self {
|
||||
set_thread_manager_test_mode_for_tests(/*enabled*/ true);
|
||||
let auth_manager = AuthManager::from_auth_for_testing(auth);
|
||||
let auth_manager = AuthManager::from_auth_for_testing_with_home(auth, codex_home.clone());
|
||||
let installation_id = uuid::Uuid::new_v4().to_string();
|
||||
let skills_codex_home = match AbsolutePathBuf::from_absolute_path_checked(&codex_home) {
|
||||
Ok(codex_home) => codex_home,
|
||||
|
||||
@@ -1362,6 +1362,11 @@ pub async fn start_websocket_server_with_headers(
|
||||
|
||||
if close_after_requests {
|
||||
let _ = ws_stream.close(None).await;
|
||||
} else if !connections.lock().unwrap().is_empty() {
|
||||
tokio::select! {
|
||||
_ = &mut shutdown_rx => return,
|
||||
_ = ws_stream.next() => {}
|
||||
}
|
||||
} else {
|
||||
let _ = shutdown_rx.await;
|
||||
return;
|
||||
|
||||
@@ -467,7 +467,10 @@ impl TestCodexBuilder {
|
||||
let installation_id = resolve_installation_id(&config.codex_home).await?;
|
||||
let thread_manager = ThreadManager::new(
|
||||
&config,
|
||||
codex_core::test_support::auth_manager_from_auth(auth.clone()),
|
||||
codex_core::test_support::auth_manager_from_auth_with_home(
|
||||
auth.clone(),
|
||||
config.codex_home.to_path_buf(),
|
||||
),
|
||||
SessionSource::Exec,
|
||||
Arc::clone(&environment_manager),
|
||||
empty_extension_registry(),
|
||||
|
||||
@@ -1066,6 +1066,184 @@ async fn chatgpt_auth_sends_correct_request() {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn revoked_chatgpt_auth_user_turn_clears_auth_and_requests_relogin() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/api/codex/responses"))
|
||||
.respond_with(ResponseTemplate::new(401).set_body_json(json!({
|
||||
"error": {
|
||||
"code": "token_revoked",
|
||||
"message": "revoked",
|
||||
}
|
||||
})))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let codex_home = Arc::new(TempDir::new()?);
|
||||
let _jwt = write_auth_json(
|
||||
codex_home.as_ref(),
|
||||
/*openai_api_key*/ None,
|
||||
"pro",
|
||||
"revoked-access-token",
|
||||
Some("account_id"),
|
||||
);
|
||||
let auth = CodexAuth::from_auth_storage(
|
||||
codex_home.path(),
|
||||
AuthCredentialsStoreMode::File,
|
||||
/*chatgpt_base_url*/ None,
|
||||
)
|
||||
.await?
|
||||
.expect("managed ChatGPT auth should load");
|
||||
|
||||
let mut model_provider = built_in_model_providers(/*openai_base_url*/ None)["openai"].clone();
|
||||
model_provider.base_url = Some(format!("{}/api/codex", server.uri()));
|
||||
model_provider.supports_websockets = false;
|
||||
let mut builder = test_codex()
|
||||
.with_home(codex_home.clone())
|
||||
.with_auth(auth)
|
||||
.with_config(move |config| {
|
||||
config.model_provider = model_provider;
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
let codex = test.codex.clone();
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
environments: None,
|
||||
items: vec![UserInput::Text {
|
||||
text: "hello".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
responsesapi_client_metadata: None,
|
||||
thread_settings: Default::default(),
|
||||
})
|
||||
.await?;
|
||||
|
||||
let error_event = wait_for_event(&codex, |ev| matches!(ev, EventMsg::Error(_))).await;
|
||||
assert!(
|
||||
matches!(
|
||||
error_event,
|
||||
EventMsg::Error(ref err)
|
||||
if err.message.contains(
|
||||
"Your ChatGPT session is no longer valid. Please sign in again."
|
||||
)
|
||||
),
|
||||
"expected invalidated-session relogin error; got {error_event:?}"
|
||||
);
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
assert!(
|
||||
!test.codex_home_path().join("auth.json").exists(),
|
||||
"revoked managed ChatGPT auth should be removed"
|
||||
);
|
||||
let response_attempts = server
|
||||
.received_requests()
|
||||
.await
|
||||
.expect("mock server should capture requests")
|
||||
.into_iter()
|
||||
.filter(|request| request.url.path() == "/api/codex/responses")
|
||||
.count();
|
||||
assert_eq!(
|
||||
response_attempts, 1,
|
||||
"revoked managed auth should fail without retrying /responses"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn revoked_chatgpt_auth_user_turn_retries_reloaded_auth() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/api/codex/responses"))
|
||||
.and(header("authorization", "Bearer revoked-access-token"))
|
||||
.respond_with(ResponseTemplate::new(401).set_body_json(json!({
|
||||
"error": {
|
||||
"code": "token_revoked",
|
||||
"message": "revoked",
|
||||
}
|
||||
})))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/api/codex/responses"))
|
||||
.and(header("authorization", "Bearer replacement-access-token"))
|
||||
.respond_with(
|
||||
ResponseTemplate::new(200)
|
||||
.insert_header("content-type", "text/event-stream")
|
||||
.set_body_raw(
|
||||
sse(vec![ev_response_created("resp1"), ev_completed("resp1")]),
|
||||
"text/event-stream",
|
||||
),
|
||||
)
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let codex_home = Arc::new(TempDir::new()?);
|
||||
let _jwt = write_auth_json(
|
||||
codex_home.as_ref(),
|
||||
/*openai_api_key*/ None,
|
||||
"pro",
|
||||
"revoked-access-token",
|
||||
Some("account_id"),
|
||||
);
|
||||
let auth = CodexAuth::from_auth_storage(
|
||||
codex_home.path(),
|
||||
AuthCredentialsStoreMode::File,
|
||||
/*chatgpt_base_url*/ None,
|
||||
)
|
||||
.await?
|
||||
.expect("managed ChatGPT auth should load");
|
||||
|
||||
let mut model_provider = built_in_model_providers(/*openai_base_url*/ None)["openai"].clone();
|
||||
model_provider.base_url = Some(format!("{}/api/codex", server.uri()));
|
||||
model_provider.supports_websockets = false;
|
||||
let mut builder = test_codex()
|
||||
.with_home(codex_home.clone())
|
||||
.with_auth(auth)
|
||||
.with_config(move |config| {
|
||||
config.model_provider = model_provider;
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
write_auth_json(
|
||||
codex_home.as_ref(),
|
||||
/*openai_api_key*/ None,
|
||||
"pro",
|
||||
"replacement-access-token",
|
||||
Some("account_id"),
|
||||
);
|
||||
|
||||
test.submit_turn("hello")
|
||||
.await
|
||||
.expect("reloaded managed auth should retry the HTTP turn");
|
||||
|
||||
let persisted_auth = CodexAuth::from_auth_storage(
|
||||
codex_home.path(),
|
||||
AuthCredentialsStoreMode::File,
|
||||
/*chatgpt_base_url*/ None,
|
||||
)
|
||||
.await?
|
||||
.expect("replacement managed auth should persist");
|
||||
assert_eq!(
|
||||
persisted_auth
|
||||
.get_token()
|
||||
.expect("replacement token should resolve"),
|
||||
"replacement-access-token"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn prefers_apikey_when_config_prefers_apikey_even_with_chatgpt_tokens() {
|
||||
skip_if_no_network!();
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
#![allow(clippy::expect_used, clippy::unwrap_used)]
|
||||
use codex_api::WS_REQUEST_HEADER_TRACEPARENT_CLIENT_METADATA_KEY;
|
||||
use codex_api::WS_REQUEST_HEADER_TRACESTATE_CLIENT_METADATA_KEY;
|
||||
use codex_config::types::AuthCredentialsStoreMode;
|
||||
use codex_core::ModelClient;
|
||||
use codex_core::ModelClientSession;
|
||||
use codex_core::Prompt;
|
||||
@@ -66,6 +67,42 @@ const TEST_INSTALLATION_ID: &str = "11111111-1111-4111-8111-111111111111";
|
||||
const X_CODEX_WS_STREAM_REQUEST_START_MS_CLIENT_METADATA_KEY: &str =
|
||||
"x-codex-ws-stream-request-start-ms";
|
||||
|
||||
fn write_managed_chatgpt_auth(codex_home: &TempDir, access_token: &str) {
|
||||
use base64::Engine as _;
|
||||
|
||||
let encode_json = |value: serde_json::Value| {
|
||||
base64::engine::general_purpose::URL_SAFE_NO_PAD
|
||||
.encode(serde_json::to_vec(&value).expect("managed auth JWT segment should serialize"))
|
||||
};
|
||||
let id_token = format!(
|
||||
"{}.{}.{}",
|
||||
encode_json(json!({"alg": "none", "typ": "JWT"})),
|
||||
encode_json(json!({
|
||||
"email": "user@example.com",
|
||||
"https://api.openai.com/auth": {
|
||||
"chatgpt_account_id": "account_id",
|
||||
"chatgpt_user_id": "user-12345",
|
||||
"user_id": "user-12345"
|
||||
}
|
||||
})),
|
||||
base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(b"sig"),
|
||||
);
|
||||
let auth_json = json!({
|
||||
"tokens": {
|
||||
"id_token": id_token,
|
||||
"access_token": access_token,
|
||||
"refresh_token": "test-refresh-token",
|
||||
"account_id": "account_id"
|
||||
},
|
||||
"last_refresh": chrono::Utc::now(),
|
||||
});
|
||||
std::fs::write(
|
||||
codex_home.path().join("auth.json"),
|
||||
serde_json::to_vec_pretty(&auth_json).expect("managed auth should serialize"),
|
||||
)
|
||||
.expect("managed auth should persist");
|
||||
}
|
||||
|
||||
fn assert_request_trace_matches(body: &serde_json::Value, expected_trace: &W3cTraceContext) {
|
||||
let client_metadata = body["client_metadata"]
|
||||
.as_object()
|
||||
@@ -1429,6 +1466,384 @@ async fn responses_websocket_invalid_request_error_with_status_is_forwarded() {
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_revoked_managed_auth_clears_auth_and_requests_relogin() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let revoked_token_error = json!({
|
||||
"type": "error",
|
||||
"status": 401,
|
||||
"error": {
|
||||
"type": "token_revoked",
|
||||
"message": "revoked"
|
||||
}
|
||||
});
|
||||
let server = start_websocket_server(vec![vec![
|
||||
vec![
|
||||
ev_response_created("resp-prewarm"),
|
||||
ev_completed("resp-prewarm"),
|
||||
],
|
||||
vec![revoked_token_error],
|
||||
]])
|
||||
.await;
|
||||
|
||||
let codex_home = Arc::new(TempDir::new().expect("managed auth tempdir"));
|
||||
write_managed_chatgpt_auth(codex_home.as_ref(), "revoked-access-token");
|
||||
let auth = CodexAuth::from_auth_storage(
|
||||
codex_home.path(),
|
||||
AuthCredentialsStoreMode::File,
|
||||
/*chatgpt_base_url*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("managed ChatGPT auth should load")
|
||||
.expect("managed ChatGPT auth should exist");
|
||||
|
||||
let mut builder = test_codex()
|
||||
.with_home(codex_home.clone())
|
||||
.with_auth(auth)
|
||||
.with_config(|config| {
|
||||
config.model_provider.request_max_retries = Some(0);
|
||||
config.model_provider.stream_max_retries = Some(0);
|
||||
});
|
||||
let test = builder
|
||||
.build_with_websocket_server(&server)
|
||||
.await
|
||||
.expect("build websocket codex");
|
||||
|
||||
let submission_id = test
|
||||
.codex
|
||||
.submit(Op::UserInput {
|
||||
environments: None,
|
||||
items: vec![UserInput::Text {
|
||||
text: "hello".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
responsesapi_client_metadata: None,
|
||||
thread_settings: Default::default(),
|
||||
})
|
||||
.await
|
||||
.expect("submission should emit relogin events");
|
||||
|
||||
let error_event = wait_for_event(&test.codex, |msg| matches!(msg, EventMsg::Error(_))).await;
|
||||
let EventMsg::Error(error_event) = error_event else {
|
||||
unreachable!();
|
||||
};
|
||||
assert!(
|
||||
error_event
|
||||
.message
|
||||
.contains("Your ChatGPT session is no longer valid. Please sign in again."),
|
||||
"unexpected error message for submission {submission_id}: {}",
|
||||
error_event.message
|
||||
);
|
||||
wait_for_event(&test.codex, |msg| matches!(msg, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
assert!(
|
||||
!test.codex_home_path().join("auth.json").exists(),
|
||||
"revoked managed ChatGPT auth should be removed"
|
||||
);
|
||||
assert_eq!(server.single_connection().len(), 2);
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_revoked_managed_auth_retries_reloaded_auth() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let revoked_token_error = json!({
|
||||
"type": "error",
|
||||
"status": 401,
|
||||
"error": {
|
||||
"type": "token_revoked",
|
||||
"message": "revoked"
|
||||
}
|
||||
});
|
||||
let server = start_websocket_server_with_headers(vec![
|
||||
WebSocketConnectionConfig {
|
||||
requests: vec![
|
||||
vec![
|
||||
ev_response_created("resp-prewarm"),
|
||||
ev_completed("resp-prewarm"),
|
||||
],
|
||||
vec![revoked_token_error],
|
||||
vec![
|
||||
ev_response_created("resp-stale-reuse"),
|
||||
ev_completed("resp-stale-reuse"),
|
||||
],
|
||||
],
|
||||
response_headers: Vec::new(),
|
||||
accept_delay: None,
|
||||
close_after_requests: false,
|
||||
},
|
||||
WebSocketConnectionConfig {
|
||||
requests: vec![vec![
|
||||
ev_response_created("resp-retry"),
|
||||
ev_completed("resp-retry"),
|
||||
]],
|
||||
response_headers: Vec::new(),
|
||||
accept_delay: None,
|
||||
close_after_requests: true,
|
||||
},
|
||||
])
|
||||
.await;
|
||||
|
||||
let codex_home = Arc::new(TempDir::new().expect("managed auth tempdir"));
|
||||
write_managed_chatgpt_auth(codex_home.as_ref(), "revoked-access-token");
|
||||
let auth = CodexAuth::from_auth_storage(
|
||||
codex_home.path(),
|
||||
AuthCredentialsStoreMode::File,
|
||||
/*chatgpt_base_url*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("managed ChatGPT auth should load")
|
||||
.expect("managed ChatGPT auth should exist");
|
||||
|
||||
let mut builder = test_codex()
|
||||
.with_home(codex_home.clone())
|
||||
.with_auth(auth)
|
||||
.with_config(|config| {
|
||||
config.model_provider.request_max_retries = Some(0);
|
||||
config.model_provider.stream_max_retries = Some(0);
|
||||
});
|
||||
let test = builder
|
||||
.build_with_websocket_server(&server)
|
||||
.await
|
||||
.expect("build websocket codex");
|
||||
|
||||
write_managed_chatgpt_auth(codex_home.as_ref(), "replacement-access-token");
|
||||
|
||||
test.submit_turn("hello")
|
||||
.await
|
||||
.expect("reloaded managed auth should retry the websocket turn");
|
||||
|
||||
let persisted_auth = CodexAuth::from_auth_storage(
|
||||
codex_home.path(),
|
||||
AuthCredentialsStoreMode::File,
|
||||
/*chatgpt_base_url*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("replacement managed auth should load")
|
||||
.expect("replacement managed auth should persist");
|
||||
assert_eq!(
|
||||
persisted_auth
|
||||
.get_token()
|
||||
.expect("replacement token should resolve"),
|
||||
"replacement-access-token"
|
||||
);
|
||||
|
||||
let connections = server.connections();
|
||||
assert_eq!(connections.len(), 2);
|
||||
assert_eq!(connections[0].len(), 2);
|
||||
assert_eq!(connections[1].len(), 1);
|
||||
|
||||
let handshakes = server.handshakes();
|
||||
assert_eq!(handshakes.len(), 2);
|
||||
assert_eq!(
|
||||
handshakes[0].header("authorization").as_deref(),
|
||||
Some("Bearer revoked-access-token")
|
||||
);
|
||||
assert_eq!(
|
||||
handshakes[1].header("authorization").as_deref(),
|
||||
Some("Bearer replacement-access-token")
|
||||
);
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_revoked_managed_auth_prewarm_retries_reloaded_auth() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let revoked_token_error = json!({
|
||||
"type": "error",
|
||||
"status": 401,
|
||||
"error": {
|
||||
"type": "token_revoked",
|
||||
"message": "revoked"
|
||||
}
|
||||
});
|
||||
let server = start_websocket_server(vec![
|
||||
vec![vec![revoked_token_error]],
|
||||
vec![
|
||||
vec![
|
||||
ev_response_created("warm-retry"),
|
||||
ev_completed("warm-retry"),
|
||||
],
|
||||
vec![ev_response_created("resp-1"), ev_completed("resp-1")],
|
||||
],
|
||||
])
|
||||
.await;
|
||||
|
||||
let codex_home = Arc::new(TempDir::new().expect("managed auth tempdir"));
|
||||
write_managed_chatgpt_auth(codex_home.as_ref(), "revoked-access-token");
|
||||
let auth = CodexAuth::from_auth_storage(
|
||||
codex_home.path(),
|
||||
AuthCredentialsStoreMode::File,
|
||||
/*chatgpt_base_url*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("managed ChatGPT auth should load")
|
||||
.expect("managed ChatGPT auth should exist");
|
||||
write_managed_chatgpt_auth(codex_home.as_ref(), "replacement-access-token");
|
||||
|
||||
let mut builder = test_codex()
|
||||
.with_home(codex_home.clone())
|
||||
.with_auth(auth)
|
||||
.with_config(|config| {
|
||||
config.model_provider.request_max_retries = Some(0);
|
||||
config.model_provider.stream_max_retries = Some(0);
|
||||
});
|
||||
let test = builder
|
||||
.build_with_websocket_server(&server)
|
||||
.await
|
||||
.expect("build websocket codex");
|
||||
|
||||
test.submit_turn("hello")
|
||||
.await
|
||||
.expect("reloaded managed auth should retry startup websocket prewarm");
|
||||
|
||||
let persisted_auth = CodexAuth::from_auth_storage(
|
||||
codex_home.path(),
|
||||
AuthCredentialsStoreMode::File,
|
||||
/*chatgpt_base_url*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("replacement managed auth should load")
|
||||
.expect("replacement managed auth should persist");
|
||||
assert_eq!(
|
||||
persisted_auth
|
||||
.get_token()
|
||||
.expect("replacement token should resolve"),
|
||||
"replacement-access-token"
|
||||
);
|
||||
|
||||
let connections = server.connections();
|
||||
assert_eq!(connections.len(), 2);
|
||||
assert_eq!(connections[0].len(), 1);
|
||||
assert_eq!(connections[1].len(), 2);
|
||||
|
||||
let handshakes = server.handshakes();
|
||||
assert_eq!(handshakes.len(), 2);
|
||||
assert_eq!(
|
||||
handshakes[0].header("authorization").as_deref(),
|
||||
Some("Bearer revoked-access-token")
|
||||
);
|
||||
assert_eq!(
|
||||
handshakes[1].header("authorization").as_deref(),
|
||||
Some("Bearer replacement-access-token")
|
||||
);
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_revoked_managed_auth_compact_retries_reloaded_auth() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let revoked_token_error = json!({
|
||||
"type": "error",
|
||||
"status": 401,
|
||||
"error": {
|
||||
"type": "token_revoked",
|
||||
"message": "revoked"
|
||||
}
|
||||
});
|
||||
let server = start_websocket_server(vec![
|
||||
vec![
|
||||
vec![
|
||||
ev_response_created("resp-prewarm"),
|
||||
ev_completed("resp-prewarm"),
|
||||
],
|
||||
vec![ev_response_created("resp-1"), ev_completed("resp-1")],
|
||||
vec![revoked_token_error],
|
||||
],
|
||||
vec![vec![
|
||||
ev_response_created("compact-retry"),
|
||||
ev_assistant_message("msg-compact", "COMPACT_SUMMARY"),
|
||||
ev_completed("compact-retry"),
|
||||
]],
|
||||
])
|
||||
.await;
|
||||
|
||||
let codex_home = Arc::new(TempDir::new().expect("managed auth tempdir"));
|
||||
write_managed_chatgpt_auth(codex_home.as_ref(), "revoked-access-token");
|
||||
let auth = CodexAuth::from_auth_storage(
|
||||
codex_home.path(),
|
||||
AuthCredentialsStoreMode::File,
|
||||
/*chatgpt_base_url*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("managed ChatGPT auth should load")
|
||||
.expect("managed ChatGPT auth should exist");
|
||||
|
||||
let mut builder = test_codex()
|
||||
.with_home(codex_home.clone())
|
||||
.with_auth(auth)
|
||||
.with_config(|config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::RemoteCompactionV2)
|
||||
.expect("test config should allow feature update");
|
||||
config.model_provider.request_max_retries = Some(0);
|
||||
config.model_provider.stream_max_retries = Some(0);
|
||||
});
|
||||
let test = builder
|
||||
.build_with_websocket_server(&server)
|
||||
.await
|
||||
.expect("build websocket codex");
|
||||
|
||||
test.submit_turn("hello")
|
||||
.await
|
||||
.expect("initial websocket turn should complete");
|
||||
write_managed_chatgpt_auth(codex_home.as_ref(), "replacement-access-token");
|
||||
|
||||
test.codex
|
||||
.submit(Op::Compact)
|
||||
.await
|
||||
.expect("compact submission should succeed");
|
||||
wait_for_event(&test.codex, |msg| matches!(msg, EventMsg::TurnComplete(_))).await;
|
||||
assert!(
|
||||
server
|
||||
.wait_for_handshakes(/*expected*/ 2, Duration::from_secs(10))
|
||||
.await,
|
||||
"compact auth recovery should reconnect with replacement auth"
|
||||
);
|
||||
|
||||
let persisted_auth = CodexAuth::from_auth_storage(
|
||||
codex_home.path(),
|
||||
AuthCredentialsStoreMode::File,
|
||||
/*chatgpt_base_url*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("replacement managed auth should load")
|
||||
.expect("replacement managed auth should persist");
|
||||
assert_eq!(
|
||||
persisted_auth
|
||||
.get_token()
|
||||
.expect("replacement token should resolve"),
|
||||
"replacement-access-token"
|
||||
);
|
||||
|
||||
let connections = server.connections();
|
||||
assert_eq!(connections.len(), 2);
|
||||
assert_eq!(connections[0].len(), 3);
|
||||
assert_eq!(connections[1].len(), 1);
|
||||
|
||||
let handshakes = server.handshakes();
|
||||
assert_eq!(handshakes.len(), 2);
|
||||
assert_eq!(
|
||||
handshakes[0].header("authorization").as_deref(),
|
||||
Some("Bearer revoked-access-token")
|
||||
);
|
||||
assert_eq!(
|
||||
handshakes[1].header("authorization").as_deref(),
|
||||
Some("Bearer replacement-access-token")
|
||||
);
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_connection_limit_error_reconnects_and_completes() {
|
||||
skip_if_no_network!();
|
||||
|
||||
@@ -52,6 +52,7 @@ use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::TurnStartedNotification;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_cloud_requirements::cloud_requirements_loader_for_storage;
|
||||
use codex_cloud_requirements::refresh_managed_chatgpt_token_for_storage_if_near_expiry;
|
||||
use codex_config::ConfigLoadError;
|
||||
use codex_config::ConfigLoadOptions;
|
||||
use codex_config::LoaderOverrides;
|
||||
@@ -156,7 +157,6 @@ use crate::event_processor::EventProcessor;
|
||||
|
||||
const DEFAULT_ANALYTICS_ENABLED: bool = true;
|
||||
const EXEC_DEFAULT_LOG_FILTER: &str = "error,opentelemetry_sdk=off,opentelemetry_otlp=off";
|
||||
|
||||
enum InitialOperation {
|
||||
UserTurn {
|
||||
items: Vec<UserInput>,
|
||||
@@ -470,6 +470,14 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
refresh_managed_chatgpt_token_for_storage_if_near_expiry(
|
||||
config.codex_home.to_path_buf(),
|
||||
/*enable_codex_api_key_env*/ true,
|
||||
config.cli_auth_credentials_store_mode,
|
||||
config.chatgpt_base_url.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let otel = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
|
||||
codex_core::otel_init::build_provider(
|
||||
&config,
|
||||
|
||||
@@ -308,6 +308,7 @@ async fn unauthorized_recovery_reports_mode_and_step_names() {
|
||||
let managed = UnauthorizedRecovery {
|
||||
manager: Arc::clone(&manager),
|
||||
step: UnauthorizedRecoveryStep::Reload,
|
||||
expected_auth: None,
|
||||
expected_account_id: None,
|
||||
mode: UnauthorizedRecoveryMode::Managed,
|
||||
};
|
||||
@@ -317,6 +318,7 @@ async fn unauthorized_recovery_reports_mode_and_step_names() {
|
||||
let external = UnauthorizedRecovery {
|
||||
manager,
|
||||
step: UnauthorizedRecoveryStep::ExternalRefresh,
|
||||
expected_auth: None,
|
||||
expected_account_id: None,
|
||||
mode: UnauthorizedRecoveryMode::External,
|
||||
};
|
||||
@@ -324,6 +326,502 @@ async fn unauthorized_recovery_reports_mode_and_step_names() {
|
||||
assert_eq!(external.step_name(), "external_refresh");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial(codex_auth_env)]
|
||||
async fn invalidated_access_token_logout_clears_cached_auth() {
|
||||
let codex_home = tempdir().unwrap();
|
||||
let _access_token_guard = remove_access_token_env_var();
|
||||
write_auth_file(
|
||||
AuthFileParams {
|
||||
openai_api_key: None,
|
||||
chatgpt_plan_type: Some("pro".to_string()),
|
||||
chatgpt_account_id: Some("org_mine".to_string()),
|
||||
},
|
||||
codex_home.path(),
|
||||
)
|
||||
.expect("failed to write auth file");
|
||||
let manager = AuthManager::shared(
|
||||
codex_home.path().to_path_buf(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
AuthCredentialsStoreMode::File,
|
||||
/*chatgpt_base_url*/ None,
|
||||
)
|
||||
.await;
|
||||
let mut recovery = manager.unauthorized_recovery();
|
||||
|
||||
assert!(recovery.handles_invalidated_access_token_auth());
|
||||
let failed = recovery
|
||||
.handle_invalidated_access_token_auth()
|
||||
.await
|
||||
.expect_err("unchanged revoked auth should force login");
|
||||
|
||||
assert_eq!(failed.reason, RefreshTokenFailedReason::Revoked);
|
||||
assert_eq!(
|
||||
failed.message,
|
||||
"Your ChatGPT session is no longer valid. Please sign in again."
|
||||
);
|
||||
assert!(manager.auth_cached().is_none());
|
||||
assert!(!codex_home.path().join("auth.json").exists());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial(codex_auth_env)]
|
||||
async fn invalidated_access_token_logout_clears_cached_auth_without_account_id() {
|
||||
let codex_home = tempdir().unwrap();
|
||||
let _access_token_guard = remove_access_token_env_var();
|
||||
write_auth_file(
|
||||
AuthFileParams {
|
||||
openai_api_key: None,
|
||||
chatgpt_plan_type: Some("pro".to_string()),
|
||||
chatgpt_account_id: None,
|
||||
},
|
||||
codex_home.path(),
|
||||
)
|
||||
.expect("failed to write auth file");
|
||||
let manager = AuthManager::shared(
|
||||
codex_home.path().to_path_buf(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
AuthCredentialsStoreMode::File,
|
||||
/*chatgpt_base_url*/ None,
|
||||
)
|
||||
.await;
|
||||
let mut recovery = manager.unauthorized_recovery();
|
||||
|
||||
let failed = recovery
|
||||
.handle_invalidated_access_token_auth()
|
||||
.await
|
||||
.expect_err("unchanged revoked auth without an account id should force login");
|
||||
|
||||
assert_eq!(failed.reason, RefreshTokenFailedReason::Revoked);
|
||||
assert_eq!(
|
||||
failed.message,
|
||||
"Your ChatGPT session is no longer valid. Please sign in again."
|
||||
);
|
||||
assert!(manager.auth_cached().is_none());
|
||||
assert!(!codex_home.path().join("auth.json").exists());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial(codex_auth_env)]
|
||||
async fn invalidated_access_token_clears_cached_auth_when_persisted_auth_was_removed() {
|
||||
let codex_home = tempdir().unwrap();
|
||||
let _access_token_guard = remove_access_token_env_var();
|
||||
write_auth_file(
|
||||
AuthFileParams {
|
||||
openai_api_key: None,
|
||||
chatgpt_plan_type: Some("pro".to_string()),
|
||||
chatgpt_account_id: Some("org_mine".to_string()),
|
||||
},
|
||||
codex_home.path(),
|
||||
)
|
||||
.expect("failed to write auth file");
|
||||
let manager = AuthManager::shared(
|
||||
codex_home.path().to_path_buf(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
AuthCredentialsStoreMode::File,
|
||||
/*chatgpt_base_url*/ None,
|
||||
)
|
||||
.await;
|
||||
let mut recovery = manager.unauthorized_recovery();
|
||||
|
||||
std::fs::remove_file(codex_home.path().join("auth.json"))
|
||||
.expect("auth file should be removable");
|
||||
|
||||
let failed = recovery
|
||||
.handle_invalidated_access_token_auth()
|
||||
.await
|
||||
.expect_err("removed persisted auth should force login");
|
||||
|
||||
assert_eq!(failed.reason, RefreshTokenFailedReason::Revoked);
|
||||
assert_eq!(
|
||||
failed.message,
|
||||
"Your ChatGPT session is no longer valid. Please sign in again."
|
||||
);
|
||||
assert!(manager.auth_cached().is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial(codex_auth_env)]
|
||||
async fn invalidated_access_token_preserves_cached_auth_when_storage_inspection_fails() {
|
||||
let codex_home = tempdir().unwrap();
|
||||
let _access_token_guard = remove_access_token_env_var();
|
||||
write_auth_file(
|
||||
AuthFileParams {
|
||||
openai_api_key: None,
|
||||
chatgpt_plan_type: Some("pro".to_string()),
|
||||
chatgpt_account_id: Some("org_mine".to_string()),
|
||||
},
|
||||
codex_home.path(),
|
||||
)
|
||||
.expect("failed to write auth file");
|
||||
let manager = AuthManager::shared(
|
||||
codex_home.path().to_path_buf(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
AuthCredentialsStoreMode::File,
|
||||
/*chatgpt_base_url*/ None,
|
||||
)
|
||||
.await;
|
||||
let mut recovery = manager.unauthorized_recovery();
|
||||
|
||||
std::fs::write(codex_home.path().join("auth.json"), "{not-json")
|
||||
.expect("auth file should be corruptible");
|
||||
|
||||
let failed = recovery
|
||||
.handle_invalidated_access_token_auth()
|
||||
.await
|
||||
.expect_err("storage inspection failures should stop invalidation cleanup");
|
||||
|
||||
assert_eq!(failed.reason, RefreshTokenFailedReason::Revoked);
|
||||
assert!(failed.message.starts_with(
|
||||
"Your ChatGPT session is no longer valid. Please sign in again. Codex could not inspect saved auth:"
|
||||
));
|
||||
assert!(manager.auth_cached().is_some());
|
||||
assert!(codex_home.path().join("auth.json").exists());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial(codex_auth_env)]
|
||||
async fn invalidated_access_token_preserves_reloaded_auth() {
|
||||
let codex_home = tempdir().unwrap();
|
||||
let _access_token_guard = remove_access_token_env_var();
|
||||
write_auth_file(
|
||||
AuthFileParams {
|
||||
openai_api_key: None,
|
||||
chatgpt_plan_type: Some("pro".to_string()),
|
||||
chatgpt_account_id: Some("org_mine".to_string()),
|
||||
},
|
||||
codex_home.path(),
|
||||
)
|
||||
.expect("failed to write auth file");
|
||||
let manager = AuthManager::shared(
|
||||
codex_home.path().to_path_buf(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
AuthCredentialsStoreMode::File,
|
||||
/*chatgpt_base_url*/ None,
|
||||
)
|
||||
.await;
|
||||
let mut recovery = manager.unauthorized_recovery();
|
||||
|
||||
let mut reauthenticated = load_auth_dot_json(codex_home.path(), AuthCredentialsStoreMode::File)
|
||||
.expect("auth should load")
|
||||
.expect("auth should exist");
|
||||
reauthenticated
|
||||
.tokens
|
||||
.as_mut()
|
||||
.expect("tokens should exist")
|
||||
.access_token = "replacement-access-token".to_string();
|
||||
save_auth(
|
||||
codex_home.path(),
|
||||
&reauthenticated,
|
||||
AuthCredentialsStoreMode::File,
|
||||
)
|
||||
.expect("replacement auth should persist");
|
||||
|
||||
let step_result = recovery
|
||||
.handle_invalidated_access_token_auth()
|
||||
.await
|
||||
.expect("new persisted auth should be retried");
|
||||
|
||||
assert_eq!(step_result.auth_state_changed(), Some(true));
|
||||
assert_eq!(
|
||||
manager
|
||||
.auth_cached()
|
||||
.expect("replacement auth should remain cached")
|
||||
.get_token()
|
||||
.expect("replacement token should resolve"),
|
||||
"replacement-access-token"
|
||||
);
|
||||
assert!(codex_home.path().join("auth.json").exists());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial(codex_auth_env)]
|
||||
async fn invalidated_access_token_logs_out_reloaded_auth_if_retry_is_also_revoked() {
|
||||
let codex_home = tempdir().unwrap();
|
||||
let _access_token_guard = remove_access_token_env_var();
|
||||
write_auth_file(
|
||||
AuthFileParams {
|
||||
openai_api_key: None,
|
||||
chatgpt_plan_type: Some("pro".to_string()),
|
||||
chatgpt_account_id: Some("org_mine".to_string()),
|
||||
},
|
||||
codex_home.path(),
|
||||
)
|
||||
.expect("failed to write auth file");
|
||||
let manager = AuthManager::shared(
|
||||
codex_home.path().to_path_buf(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
AuthCredentialsStoreMode::File,
|
||||
/*chatgpt_base_url*/ None,
|
||||
)
|
||||
.await;
|
||||
let mut recovery = manager.unauthorized_recovery();
|
||||
|
||||
let mut reauthenticated = load_auth_dot_json(codex_home.path(), AuthCredentialsStoreMode::File)
|
||||
.expect("auth should load")
|
||||
.expect("auth should exist");
|
||||
reauthenticated
|
||||
.tokens
|
||||
.as_mut()
|
||||
.expect("tokens should exist")
|
||||
.access_token = "replacement-access-token".to_string();
|
||||
save_auth(
|
||||
codex_home.path(),
|
||||
&reauthenticated,
|
||||
AuthCredentialsStoreMode::File,
|
||||
)
|
||||
.expect("replacement auth should persist");
|
||||
|
||||
recovery
|
||||
.handle_invalidated_access_token_auth()
|
||||
.await
|
||||
.expect("first revoked token should retry the replacement auth");
|
||||
|
||||
let failed = recovery
|
||||
.handle_invalidated_access_token_auth()
|
||||
.await
|
||||
.expect_err("repeated revocation of the replacement auth should force login");
|
||||
|
||||
assert_eq!(failed.reason, RefreshTokenFailedReason::Revoked);
|
||||
assert_eq!(
|
||||
failed.message,
|
||||
"Your ChatGPT session is no longer valid. Please sign in again."
|
||||
);
|
||||
assert!(manager.auth_cached().is_none());
|
||||
assert!(!codex_home.path().join("auth.json").exists());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial(codex_auth_env)]
|
||||
async fn invalidated_access_token_logs_out_auth_reloaded_by_normal_recovery() {
|
||||
let codex_home = tempdir().unwrap();
|
||||
let _access_token_guard = remove_access_token_env_var();
|
||||
write_auth_file(
|
||||
AuthFileParams {
|
||||
openai_api_key: None,
|
||||
chatgpt_plan_type: Some("pro".to_string()),
|
||||
chatgpt_account_id: Some("org_mine".to_string()),
|
||||
},
|
||||
codex_home.path(),
|
||||
)
|
||||
.expect("failed to write auth file");
|
||||
let manager = AuthManager::shared(
|
||||
codex_home.path().to_path_buf(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
AuthCredentialsStoreMode::File,
|
||||
/*chatgpt_base_url*/ None,
|
||||
)
|
||||
.await;
|
||||
let mut recovery = manager.unauthorized_recovery();
|
||||
|
||||
let mut reauthenticated = load_auth_dot_json(codex_home.path(), AuthCredentialsStoreMode::File)
|
||||
.expect("auth should load")
|
||||
.expect("auth should exist");
|
||||
reauthenticated
|
||||
.tokens
|
||||
.as_mut()
|
||||
.expect("tokens should exist")
|
||||
.access_token = "replacement-access-token".to_string();
|
||||
save_auth(
|
||||
codex_home.path(),
|
||||
&reauthenticated,
|
||||
AuthCredentialsStoreMode::File,
|
||||
)
|
||||
.expect("replacement auth should persist");
|
||||
|
||||
let reloaded = recovery
|
||||
.next()
|
||||
.await
|
||||
.expect("normal unauthorized recovery should reload replacement auth");
|
||||
assert_eq!(reloaded.auth_state_changed(), Some(true));
|
||||
|
||||
let failed = recovery
|
||||
.handle_invalidated_access_token_auth()
|
||||
.await
|
||||
.expect_err("revoking the just-reloaded auth should force login");
|
||||
|
||||
assert_eq!(failed.reason, RefreshTokenFailedReason::Revoked);
|
||||
assert_eq!(
|
||||
failed.message,
|
||||
"Your ChatGPT session is no longer valid. Please sign in again."
|
||||
);
|
||||
assert!(manager.auth_cached().is_none());
|
||||
assert!(!codex_home.path().join("auth.json").exists());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial(codex_auth_env)]
|
||||
async fn invalidated_access_token_preserves_auth_refreshed_after_request_started() {
|
||||
let codex_home = tempdir().unwrap();
|
||||
let _access_token_guard = remove_access_token_env_var();
|
||||
write_auth_file(
|
||||
AuthFileParams {
|
||||
openai_api_key: None,
|
||||
chatgpt_plan_type: Some("pro".to_string()),
|
||||
chatgpt_account_id: Some("org_mine".to_string()),
|
||||
},
|
||||
codex_home.path(),
|
||||
)
|
||||
.expect("failed to write auth file");
|
||||
let manager = AuthManager::shared(
|
||||
codex_home.path().to_path_buf(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
AuthCredentialsStoreMode::File,
|
||||
/*chatgpt_base_url*/ None,
|
||||
)
|
||||
.await;
|
||||
let mut recovery = manager.unauthorized_recovery();
|
||||
|
||||
let mut refreshed_auth = load_auth_dot_json(codex_home.path(), AuthCredentialsStoreMode::File)
|
||||
.expect("auth should load")
|
||||
.expect("auth should exist");
|
||||
refreshed_auth
|
||||
.tokens
|
||||
.as_mut()
|
||||
.expect("tokens should exist")
|
||||
.access_token = "replacement-access-token".to_string();
|
||||
save_auth(
|
||||
codex_home.path(),
|
||||
&refreshed_auth,
|
||||
AuthCredentialsStoreMode::File,
|
||||
)
|
||||
.expect("replacement auth should persist");
|
||||
manager.reload().await;
|
||||
assert_eq!(
|
||||
manager
|
||||
.auth_cached()
|
||||
.expect("replacement auth should load into cache")
|
||||
.get_token()
|
||||
.expect("replacement token should resolve"),
|
||||
"replacement-access-token"
|
||||
);
|
||||
|
||||
let step_result = recovery
|
||||
.handle_invalidated_access_token_auth()
|
||||
.await
|
||||
.expect("auth refreshed after request start should be retried");
|
||||
|
||||
assert_eq!(step_result.auth_state_changed(), Some(true));
|
||||
assert_eq!(
|
||||
manager
|
||||
.auth_cached()
|
||||
.expect("replacement auth should remain cached")
|
||||
.get_token()
|
||||
.expect("replacement token should resolve"),
|
||||
"replacement-access-token"
|
||||
);
|
||||
assert!(codex_home.path().join("auth.json").exists());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial(codex_auth_env)]
|
||||
async fn invalidated_access_token_preserves_reloaded_auth_from_a_different_account() {
|
||||
let codex_home = tempdir().unwrap();
|
||||
let _access_token_guard = remove_access_token_env_var();
|
||||
write_auth_file(
|
||||
AuthFileParams {
|
||||
openai_api_key: None,
|
||||
chatgpt_plan_type: Some("pro".to_string()),
|
||||
chatgpt_account_id: Some("org_mine".to_string()),
|
||||
},
|
||||
codex_home.path(),
|
||||
)
|
||||
.expect("failed to write auth file");
|
||||
let manager = AuthManager::shared(
|
||||
codex_home.path().to_path_buf(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
AuthCredentialsStoreMode::File,
|
||||
/*chatgpt_base_url*/ None,
|
||||
)
|
||||
.await;
|
||||
let mut recovery = manager.unauthorized_recovery();
|
||||
|
||||
write_auth_file(
|
||||
AuthFileParams {
|
||||
openai_api_key: None,
|
||||
chatgpt_plan_type: Some("pro".to_string()),
|
||||
chatgpt_account_id: Some("org_elsewhere".to_string()),
|
||||
},
|
||||
codex_home.path(),
|
||||
)
|
||||
.expect("replacement auth should persist");
|
||||
|
||||
let step_result = recovery
|
||||
.handle_invalidated_access_token_auth()
|
||||
.await
|
||||
.expect("new persisted auth should be retried across account changes");
|
||||
|
||||
assert_eq!(step_result.auth_state_changed(), Some(true));
|
||||
assert_eq!(
|
||||
manager
|
||||
.auth_cached()
|
||||
.expect("replacement auth should remain cached")
|
||||
.get_account_id()
|
||||
.as_deref(),
|
||||
Some("org_elsewhere")
|
||||
);
|
||||
let follow_up_step = recovery
|
||||
.next()
|
||||
.await
|
||||
.expect("follow-up refreshable 401 should use the replacement account");
|
||||
assert_eq!(follow_up_step.auth_state_changed(), Some(false));
|
||||
assert_eq!(recovery.step_name(), "refresh_token");
|
||||
assert!(codex_home.path().join("auth.json").exists());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial(codex_auth_env)]
|
||||
async fn invalidated_access_token_logout_preserves_auth_changed_before_delete() {
|
||||
let codex_home = tempdir().unwrap();
|
||||
let _access_token_guard = remove_access_token_env_var();
|
||||
write_auth_file(
|
||||
AuthFileParams {
|
||||
openai_api_key: None,
|
||||
chatgpt_plan_type: Some("pro".to_string()),
|
||||
chatgpt_account_id: Some("org_mine".to_string()),
|
||||
},
|
||||
codex_home.path(),
|
||||
)
|
||||
.expect("failed to write auth file");
|
||||
let manager = AuthManager::shared(
|
||||
codex_home.path().to_path_buf(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
AuthCredentialsStoreMode::File,
|
||||
/*chatgpt_base_url*/ None,
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut reauthenticated = load_auth_dot_json(codex_home.path(), AuthCredentialsStoreMode::File)
|
||||
.expect("auth should load")
|
||||
.expect("auth should exist");
|
||||
reauthenticated
|
||||
.tokens
|
||||
.as_mut()
|
||||
.expect("tokens should exist")
|
||||
.access_token = "replacement-access-token".to_string();
|
||||
save_auth(
|
||||
codex_home.path(),
|
||||
&reauthenticated,
|
||||
AuthCredentialsStoreMode::File,
|
||||
)
|
||||
.expect("replacement auth should persist");
|
||||
|
||||
let outcome = manager
|
||||
.logout_if_auth_snapshot_unchanged()
|
||||
.await
|
||||
.expect("replaced persisted auth should avoid logout");
|
||||
assert!(matches!(outcome, InvalidatedAuthLogoutOutcome::AuthChanged));
|
||||
assert_eq!(
|
||||
manager
|
||||
.auth_cached()
|
||||
.expect("replacement auth should remain cached")
|
||||
.get_token()
|
||||
.expect("replacement token should resolve"),
|
||||
"replacement-access-token"
|
||||
);
|
||||
assert!(codex_home.path().join("auth.json").exists());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial(codex_auth_env)]
|
||||
async fn refresh_failure_is_scoped_to_the_matching_auth_snapshot() {
|
||||
@@ -453,6 +951,7 @@ async fn unauthorized_recovery_uses_external_refresh_for_bearer_manager() {
|
||||
assert!(recovery.has_next());
|
||||
assert_eq!(recovery.mode_name(), "external");
|
||||
assert_eq!(recovery.step_name(), "external_refresh");
|
||||
assert!(!recovery.handles_invalidated_access_token_auth());
|
||||
|
||||
let result = recovery
|
||||
.next()
|
||||
@@ -610,7 +1109,8 @@ fn write_auth_file(params: AuthFileParams, codex_home: &Path) -> std::io::Result
|
||||
"tokens": {
|
||||
"id_token": fake_jwt,
|
||||
"access_token": "test-access-token",
|
||||
"refresh_token": "test-refresh-token"
|
||||
"refresh_token": "test-refresh-token",
|
||||
"account_id": params.chatgpt_account_id,
|
||||
},
|
||||
"last_refresh": Utc::now(),
|
||||
});
|
||||
|
||||
@@ -7,6 +7,10 @@ use serde::Serialize;
|
||||
use serial_test::serial;
|
||||
use std::env;
|
||||
use std::fmt::Debug;
|
||||
use std::fs::File;
|
||||
use std::fs::OpenOptions;
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::fs::OpenOptionsExt;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
@@ -84,6 +88,10 @@ struct ChatgptAuthState {
|
||||
}
|
||||
|
||||
const TOKEN_REFRESH_INTERVAL: i64 = 8;
|
||||
const CHATGPT_ACCESS_TOKEN_REFRESH_WINDOW_MINUTES: i64 = 5;
|
||||
const CHATGPT_ACCESS_TOKEN_STARTUP_REFRESH_LOCK_FILENAME: &str =
|
||||
"chatgpt-access-token-startup-refresh.lock";
|
||||
const CHATGPT_ACCESS_TOKEN_STARTUP_REFRESH_LOCK_POLL_INTERVAL_MS: u64 = 50;
|
||||
|
||||
const REFRESH_TOKEN_EXPIRED_MESSAGE: &str = "Your access token could not be refreshed because your refresh token has expired. Please log out and sign in again.";
|
||||
const REFRESH_TOKEN_REUSED_MESSAGE: &str = "Your access token could not be refreshed because your refresh token was already used. Please log out and sign in again.";
|
||||
@@ -91,6 +99,8 @@ const REFRESH_TOKEN_INVALIDATED_MESSAGE: &str = "Your access token could not be
|
||||
const REFRESH_TOKEN_UNKNOWN_MESSAGE: &str =
|
||||
"Your access token could not be refreshed. Please log out and sign in again.";
|
||||
const REFRESH_TOKEN_ACCOUNT_MISMATCH_MESSAGE: &str = "Your access token could not be refreshed because you have since logged out or signed in to another account. Please sign in again.";
|
||||
const ACCESS_TOKEN_INVALIDATED_MESSAGE: &str =
|
||||
"Your ChatGPT session is no longer valid. Please sign in again.";
|
||||
const DEFAULT_CHATGPT_BACKEND_BASE_URL: &str = "https://chatgpt.com/backend-api";
|
||||
const REFRESH_TOKEN_URL: &str = "https://auth.openai.com/oauth/token";
|
||||
pub(super) const REVOKE_TOKEN_URL: &str = "https://auth.openai.com/oauth/revoke";
|
||||
@@ -1049,6 +1059,11 @@ enum ReloadOutcome {
|
||||
Skipped,
|
||||
}
|
||||
|
||||
enum InvalidatedAuthLogoutOutcome {
|
||||
LoggedOut,
|
||||
AuthChanged,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
enum UnauthorizedRecoveryMode {
|
||||
Managed,
|
||||
@@ -1056,7 +1071,9 @@ enum UnauthorizedRecoveryMode {
|
||||
}
|
||||
|
||||
// UnauthorizedRecovery is a state machine that handles an attempt to refresh the authentication when requests
|
||||
// to API fail with 401 status code.
|
||||
// to API fail with a refreshable 401 status code. Managed ChatGPT access-token revocation is handled beside
|
||||
// this flow so persisted credentials can be guarded before they are cleared; externally owned auth stays here
|
||||
// so its owner can supply replacement credentials.
|
||||
// The client calls next() every time it encounters a 401 error, one time per retry.
|
||||
// For API key based authentication, we don't do anything and let the error bubble to the user.
|
||||
//
|
||||
@@ -1076,6 +1093,7 @@ enum UnauthorizedRecoveryMode {
|
||||
pub struct UnauthorizedRecovery {
|
||||
manager: Arc<AuthManager>,
|
||||
step: UnauthorizedRecoveryStep,
|
||||
expected_auth: Option<CodexAuth>,
|
||||
expected_account_id: Option<String>,
|
||||
mode: UnauthorizedRecoveryMode,
|
||||
}
|
||||
@@ -1111,6 +1129,7 @@ impl UnauthorizedRecovery {
|
||||
Self {
|
||||
manager,
|
||||
step,
|
||||
expected_auth: cached_auth,
|
||||
expected_account_id,
|
||||
mode,
|
||||
}
|
||||
@@ -1182,6 +1201,15 @@ impl UnauthorizedRecovery {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handles_invalidated_access_token_auth(&self) -> bool {
|
||||
self.mode == UnauthorizedRecoveryMode::Managed
|
||||
&& self
|
||||
.manager
|
||||
.auth_cached()
|
||||
.as_ref()
|
||||
.is_some_and(|auth| matches!(auth, CodexAuth::Chatgpt(_)))
|
||||
}
|
||||
|
||||
pub async fn next(&mut self) -> Result<UnauthorizedRecoveryStepResult, RefreshTokenError> {
|
||||
if !self.has_next() {
|
||||
return Err(RefreshTokenError::Permanent(RefreshTokenFailedError::new(
|
||||
@@ -1198,6 +1226,7 @@ impl UnauthorizedRecovery {
|
||||
.await
|
||||
{
|
||||
ReloadOutcome::ReloadedChanged => {
|
||||
self.update_expected_auth_from_cache();
|
||||
self.step = UnauthorizedRecoveryStep::RefreshToken;
|
||||
return Ok(UnauthorizedRecoveryStepResult {
|
||||
auth_state_changed: Some(true),
|
||||
@@ -1220,6 +1249,7 @@ impl UnauthorizedRecovery {
|
||||
}
|
||||
UnauthorizedRecoveryStep::RefreshToken => {
|
||||
self.manager.refresh_token_from_authority().await?;
|
||||
self.update_expected_auth_from_cache();
|
||||
self.step = UnauthorizedRecoveryStep::Done;
|
||||
return Ok(UnauthorizedRecoveryStepResult {
|
||||
auth_state_changed: Some(true),
|
||||
@@ -1229,6 +1259,7 @@ impl UnauthorizedRecovery {
|
||||
self.manager
|
||||
.refresh_external_auth(ExternalAuthRefreshReason::Unauthorized)
|
||||
.await?;
|
||||
self.update_expected_auth_from_cache();
|
||||
self.step = UnauthorizedRecoveryStep::Done;
|
||||
return Ok(UnauthorizedRecoveryStepResult {
|
||||
auth_state_changed: Some(true),
|
||||
@@ -1240,6 +1271,127 @@ impl UnauthorizedRecovery {
|
||||
auth_state_changed: None,
|
||||
})
|
||||
}
|
||||
|
||||
fn update_expected_auth_from_cache(&mut self) {
|
||||
let expected_auth = self.manager.auth_cached();
|
||||
self.expected_account_id = expected_auth.as_ref().and_then(CodexAuth::get_account_id);
|
||||
self.expected_auth = expected_auth;
|
||||
}
|
||||
|
||||
pub async fn handle_invalidated_access_token_auth(
|
||||
&mut self,
|
||||
) -> Result<UnauthorizedRecoveryStepResult, RefreshTokenFailedError> {
|
||||
let (result, next_expected_auth) = {
|
||||
let _refresh_guard = self.manager.refresh_lock.acquire().await.map_err(|_| {
|
||||
RefreshTokenFailedError::new(
|
||||
RefreshTokenFailedReason::Other,
|
||||
REFRESH_TOKEN_UNKNOWN_MESSAGE.to_string(),
|
||||
)
|
||||
})?;
|
||||
let cached_auth = self.manager.auth_cached();
|
||||
if !AuthManager::auths_equal_for_refresh(
|
||||
cached_auth.as_ref(),
|
||||
self.expected_auth.as_ref(),
|
||||
) {
|
||||
self.invalidated_auth_state_changed_result()
|
||||
} else {
|
||||
let reload_outcome = self
|
||||
.manager
|
||||
.reload_if_auth_snapshot_changed()
|
||||
.await
|
||||
.map_err(Self::invalidated_auth_storage_error)?;
|
||||
|
||||
match reload_outcome {
|
||||
ReloadOutcome::ReloadedChanged => self.invalidated_auth_state_changed_result(),
|
||||
ReloadOutcome::ReloadedNoChange => {
|
||||
match self.manager.logout_if_auth_snapshot_unchanged().await {
|
||||
Ok(InvalidatedAuthLogoutOutcome::AuthChanged) => {
|
||||
self.invalidated_auth_state_changed_result()
|
||||
}
|
||||
Ok(InvalidatedAuthLogoutOutcome::LoggedOut) => (
|
||||
Err(RefreshTokenFailedError::new(
|
||||
RefreshTokenFailedReason::Revoked,
|
||||
ACCESS_TOKEN_INVALIDATED_MESSAGE.to_string(),
|
||||
)),
|
||||
None,
|
||||
),
|
||||
Err(err) => (
|
||||
Err(RefreshTokenFailedError::new(
|
||||
RefreshTokenFailedReason::Revoked,
|
||||
format!(
|
||||
"{ACCESS_TOKEN_INVALIDATED_MESSAGE} Codex could not clear saved auth: {err}"
|
||||
),
|
||||
)),
|
||||
None,
|
||||
),
|
||||
}
|
||||
}
|
||||
ReloadOutcome::Skipped => {
|
||||
if self
|
||||
.manager
|
||||
.clear_cached_auth_if_storage_missing()
|
||||
.await
|
||||
.map_err(Self::invalidated_auth_storage_error)?
|
||||
{
|
||||
(
|
||||
Err(RefreshTokenFailedError::new(
|
||||
RefreshTokenFailedReason::Revoked,
|
||||
ACCESS_TOKEN_INVALIDATED_MESSAGE.to_string(),
|
||||
)),
|
||||
None,
|
||||
)
|
||||
} else {
|
||||
(
|
||||
Err(RefreshTokenFailedError::new(
|
||||
RefreshTokenFailedReason::Other,
|
||||
REFRESH_TOKEN_ACCOUNT_MISMATCH_MESSAGE.to_string(),
|
||||
)),
|
||||
None,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
if result.is_ok() {
|
||||
self.expected_account_id = next_expected_auth
|
||||
.as_ref()
|
||||
.and_then(CodexAuth::get_account_id);
|
||||
self.expected_auth = next_expected_auth;
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
fn invalidated_auth_state_changed_result(
|
||||
&self,
|
||||
) -> (
|
||||
Result<UnauthorizedRecoveryStepResult, RefreshTokenFailedError>,
|
||||
Option<CodexAuth>,
|
||||
) {
|
||||
let cached_auth = self.manager.auth_cached();
|
||||
if cached_auth.is_none() {
|
||||
return (
|
||||
Err(RefreshTokenFailedError::new(
|
||||
RefreshTokenFailedReason::Revoked,
|
||||
ACCESS_TOKEN_INVALIDATED_MESSAGE.to_string(),
|
||||
)),
|
||||
None,
|
||||
);
|
||||
}
|
||||
(
|
||||
Ok(UnauthorizedRecoveryStepResult {
|
||||
auth_state_changed: Some(true),
|
||||
}),
|
||||
cached_auth,
|
||||
)
|
||||
}
|
||||
|
||||
fn invalidated_auth_storage_error(err: std::io::Error) -> RefreshTokenFailedError {
|
||||
RefreshTokenFailedError::new(
|
||||
RefreshTokenFailedReason::Revoked,
|
||||
format!("{ACCESS_TOKEN_INVALIDATED_MESSAGE} Codex could not inspect saved auth: {err}"),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Central manager providing a single source of truth for auth.json derived
|
||||
@@ -1481,6 +1633,48 @@ impl AuthManager {
|
||||
}
|
||||
}
|
||||
|
||||
async fn reload_if_auth_snapshot_changed(&self) -> std::io::Result<ReloadOutcome> {
|
||||
let new_auth = self.try_load_auth_from_storage().await?;
|
||||
let cached_before_reload = self.auth_cached();
|
||||
let auth_changed =
|
||||
!Self::auths_equal_for_refresh(cached_before_reload.as_ref(), new_auth.as_ref());
|
||||
if !auth_changed {
|
||||
return Ok(ReloadOutcome::ReloadedNoChange);
|
||||
}
|
||||
|
||||
tracing::info!("Reloading auth because the persisted auth snapshot changed.");
|
||||
self.set_cached_auth(new_auth);
|
||||
Ok(ReloadOutcome::ReloadedChanged)
|
||||
}
|
||||
|
||||
async fn clear_cached_auth_if_storage_missing(&self) -> std::io::Result<bool> {
|
||||
if self.try_load_auth_from_storage().await?.is_some() {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
tracing::info!("Clearing cached auth because persisted auth is no longer available.");
|
||||
self.set_cached_auth(/*new_auth*/ None);
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
async fn logout_if_auth_snapshot_unchanged(
|
||||
&self,
|
||||
) -> std::io::Result<InvalidatedAuthLogoutOutcome> {
|
||||
let persisted_auth = self.try_load_auth_from_storage().await?;
|
||||
let cached_auth = self.auth_cached();
|
||||
if !Self::auths_equal_for_refresh(cached_auth.as_ref(), persisted_auth.as_ref()) {
|
||||
tracing::info!(
|
||||
"Skipping auth logout because the persisted auth snapshot changed before deletion."
|
||||
);
|
||||
self.set_cached_auth(persisted_auth);
|
||||
return Ok(InvalidatedAuthLogoutOutcome::AuthChanged);
|
||||
}
|
||||
|
||||
logout_all_stores(&self.codex_home, self.auth_credentials_store_mode)?;
|
||||
self.reload().await;
|
||||
Ok(InvalidatedAuthLogoutOutcome::LoggedOut)
|
||||
}
|
||||
|
||||
fn auths_equal_for_refresh(a: Option<&CodexAuth>, b: Option<&CodexAuth>) -> bool {
|
||||
match (a, b) {
|
||||
(None, None) => true,
|
||||
@@ -1529,7 +1723,7 @@ impl AuthManager {
|
||||
}
|
||||
}
|
||||
|
||||
async fn load_auth_from_storage(&self) -> Option<CodexAuth> {
|
||||
async fn try_load_auth_from_storage(&self) -> std::io::Result<Option<CodexAuth>> {
|
||||
load_auth(
|
||||
&self.codex_home,
|
||||
self.enable_codex_api_key_env,
|
||||
@@ -1537,8 +1731,10 @@ impl AuthManager {
|
||||
self.chatgpt_base_url.as_deref(),
|
||||
)
|
||||
.await
|
||||
.ok()
|
||||
.flatten()
|
||||
}
|
||||
|
||||
async fn load_auth_from_storage(&self) -> Option<CodexAuth> {
|
||||
self.try_load_auth_from_storage().await.ok().flatten()
|
||||
}
|
||||
|
||||
fn set_cached_auth(&self, new_auth: Option<CodexAuth>) -> bool {
|
||||
@@ -1686,6 +1882,10 @@ impl AuthManager {
|
||||
REFRESH_TOKEN_UNKNOWN_MESSAGE.to_string(),
|
||||
))
|
||||
})?;
|
||||
self.refresh_token_with_refresh_lock_held().await
|
||||
}
|
||||
|
||||
async fn refresh_token_with_refresh_lock_held(&self) -> Result<(), RefreshTokenError> {
|
||||
let auth_before_reload = self.auth_cached();
|
||||
if auth_before_reload
|
||||
.as_ref()
|
||||
@@ -1715,6 +1915,85 @@ impl AuthManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Refresh managed ChatGPT auth when its access token is nearly expired.
|
||||
///
|
||||
/// CLI startup uses the same five-minute refresh window as ChatGPT web so a
|
||||
/// new session does not begin with a token that is about to expire. Other auth
|
||||
/// modes either cannot be refreshed locally or have separate refresh ownership,
|
||||
/// so they intentionally no-op here.
|
||||
pub async fn refresh_managed_chatgpt_token_if_near_expiry(
|
||||
&self,
|
||||
) -> Result<(), RefreshTokenError> {
|
||||
let Some(CodexAuth::Chatgpt(chatgpt_auth)) = self.auth_cached() else {
|
||||
return Ok(());
|
||||
};
|
||||
let should_refresh = chatgpt_auth
|
||||
.current_token_data()
|
||||
.and_then(|tokens| parse_jwt_expiration(&tokens.access_token).ok().flatten())
|
||||
.is_some_and(|expires_at| {
|
||||
expires_at
|
||||
<= Utc::now()
|
||||
+ chrono::Duration::minutes(CHATGPT_ACCESS_TOKEN_REFRESH_WINDOW_MINUTES)
|
||||
});
|
||||
if !should_refresh {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let _refresh_lock = self.acquire_chatgpt_startup_refresh_lock().await?;
|
||||
let _refresh_guard = self.refresh_lock.acquire().await.map_err(|_| {
|
||||
RefreshTokenError::Permanent(RefreshTokenFailedError::new(
|
||||
RefreshTokenFailedReason::Other,
|
||||
REFRESH_TOKEN_UNKNOWN_MESSAGE.to_string(),
|
||||
))
|
||||
})?;
|
||||
|
||||
self.refresh_token_with_refresh_lock_held().await
|
||||
}
|
||||
|
||||
async fn acquire_chatgpt_startup_refresh_lock(&self) -> Result<File, RefreshTokenError> {
|
||||
let mut logged_wait = false;
|
||||
loop {
|
||||
if let Some(lock_file) = self.try_acquire_chatgpt_startup_refresh_lock()? {
|
||||
return Ok(lock_file);
|
||||
}
|
||||
if !logged_wait {
|
||||
tracing::info!(
|
||||
"Waiting to proactively refresh ChatGPT access token because another process is already refreshing it."
|
||||
);
|
||||
logged_wait = true;
|
||||
}
|
||||
tokio::time::sleep(std::time::Duration::from_millis(
|
||||
CHATGPT_ACCESS_TOKEN_STARTUP_REFRESH_LOCK_POLL_INTERVAL_MS,
|
||||
))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
fn try_acquire_chatgpt_startup_refresh_lock(&self) -> Result<Option<File>, RefreshTokenError> {
|
||||
let lock_path = self
|
||||
.codex_home
|
||||
.join(CHATGPT_ACCESS_TOKEN_STARTUP_REFRESH_LOCK_FILENAME);
|
||||
if let Some(parent) = lock_path.parent() {
|
||||
std::fs::create_dir_all(parent).map_err(RefreshTokenError::Transient)?;
|
||||
}
|
||||
|
||||
let mut options = OpenOptions::new();
|
||||
options.read(true).write(true).create(true).truncate(false);
|
||||
#[cfg(unix)]
|
||||
{
|
||||
options.mode(0o600);
|
||||
}
|
||||
let lock_file = options
|
||||
.open(lock_path)
|
||||
.map_err(RefreshTokenError::Transient)?;
|
||||
|
||||
match lock_file.try_lock() {
|
||||
Ok(()) => Ok(Some(lock_file)),
|
||||
Err(std::fs::TryLockError::WouldBlock) => Ok(None),
|
||||
Err(err) => Err(RefreshTokenError::Transient(err.into())),
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempt to refresh the current auth token from the authority that issued
|
||||
/// the token. On success, reloads the auth state from disk so other components
|
||||
/// observe refreshed token. If the token refresh fails, returns the error to
|
||||
|
||||
@@ -19,6 +19,7 @@ use pretty_assertions::assert_eq;
|
||||
use serde::Serialize;
|
||||
use serde_json::json;
|
||||
use std::ffi::OsString;
|
||||
use std::fs::File;
|
||||
use std::sync::Arc;
|
||||
use tempfile::TempDir;
|
||||
use wiremock::Mock;
|
||||
@@ -158,6 +159,242 @@ async fn refresh_token_refreshes_when_auth_is_unchanged() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[serial_test::serial(auth_refresh)]
|
||||
#[tokio::test]
|
||||
async fn refresh_managed_chatgpt_token_refreshes_when_auth_is_near_expiry() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/oauth/token"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
|
||||
"access_token": "new-access-token",
|
||||
"refresh_token": "new-refresh-token"
|
||||
})))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let ctx = RefreshTokenTestContext::new(&server).await?;
|
||||
let initial_last_refresh = Utc::now();
|
||||
let near_expiry_access_token = access_token_with_expiration(Utc::now() + Duration::minutes(4));
|
||||
let initial_tokens = build_tokens(&near_expiry_access_token, INITIAL_REFRESH_TOKEN);
|
||||
let initial_auth = AuthDotJson {
|
||||
auth_mode: Some(AuthMode::Chatgpt),
|
||||
openai_api_key: None,
|
||||
tokens: Some(initial_tokens.clone()),
|
||||
last_refresh: Some(initial_last_refresh),
|
||||
agent_identity: None,
|
||||
};
|
||||
ctx.write_auth(&initial_auth).await?;
|
||||
|
||||
ctx.auth_manager
|
||||
.refresh_managed_chatgpt_token_if_near_expiry()
|
||||
.await
|
||||
.context("managed ChatGPT refresh should succeed")?;
|
||||
|
||||
let refreshed_tokens = TokenData {
|
||||
access_token: "new-access-token".to_string(),
|
||||
refresh_token: "new-refresh-token".to_string(),
|
||||
..initial_tokens.clone()
|
||||
};
|
||||
let stored = ctx.load_auth()?;
|
||||
let tokens = stored.tokens.as_ref().context("tokens should exist")?;
|
||||
assert_eq!(tokens, &refreshed_tokens);
|
||||
let refreshed_at = stored
|
||||
.last_refresh
|
||||
.as_ref()
|
||||
.context("last_refresh should be recorded")?;
|
||||
assert!(
|
||||
*refreshed_at >= initial_last_refresh,
|
||||
"last_refresh should advance"
|
||||
);
|
||||
|
||||
server.verify().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[serial_test::serial(auth_refresh)]
|
||||
#[tokio::test]
|
||||
async fn refresh_managed_chatgpt_token_skips_auth_outside_refresh_window() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = MockServer::start().await;
|
||||
let ctx = RefreshTokenTestContext::new(&server).await?;
|
||||
let initial_last_refresh = Utc::now();
|
||||
let fresh_access_token = access_token_with_expiration(Utc::now() + Duration::minutes(6));
|
||||
let initial_tokens = build_tokens(&fresh_access_token, INITIAL_REFRESH_TOKEN);
|
||||
let initial_auth = AuthDotJson {
|
||||
auth_mode: Some(AuthMode::Chatgpt),
|
||||
openai_api_key: None,
|
||||
tokens: Some(initial_tokens.clone()),
|
||||
last_refresh: Some(initial_last_refresh),
|
||||
agent_identity: None,
|
||||
};
|
||||
ctx.write_auth(&initial_auth).await?;
|
||||
|
||||
ctx.auth_manager
|
||||
.refresh_managed_chatgpt_token_if_near_expiry()
|
||||
.await
|
||||
.context("managed ChatGPT refresh should no-op")?;
|
||||
|
||||
assert_eq!(ctx.load_auth()?, initial_auth);
|
||||
let requests = server.received_requests().await.unwrap_or_default();
|
||||
assert!(requests.is_empty(), "expected no refresh token requests");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[serial_test::serial(auth_refresh)]
|
||||
#[tokio::test]
|
||||
async fn refresh_managed_chatgpt_token_waits_while_startup_refresh_lock_is_held() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/oauth/token"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
|
||||
"access_token": "new-access-token",
|
||||
"refresh_token": "new-refresh-token"
|
||||
})))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let ctx = RefreshTokenTestContext::new(&server).await?;
|
||||
let initial_last_refresh = Utc::now();
|
||||
let expired_access_token = access_token_with_expiration(Utc::now() - Duration::minutes(1));
|
||||
let initial_tokens = build_tokens(&expired_access_token, INITIAL_REFRESH_TOKEN);
|
||||
let initial_auth = AuthDotJson {
|
||||
auth_mode: Some(AuthMode::Chatgpt),
|
||||
openai_api_key: None,
|
||||
tokens: Some(initial_tokens.clone()),
|
||||
last_refresh: Some(initial_last_refresh),
|
||||
agent_identity: None,
|
||||
};
|
||||
ctx.write_auth(&initial_auth).await?;
|
||||
|
||||
let lock_path = ctx
|
||||
.codex_home
|
||||
.path()
|
||||
.join("chatgpt-access-token-startup-refresh.lock");
|
||||
let lock_file = File::options()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true)
|
||||
.truncate(false)
|
||||
.open(lock_path)?;
|
||||
lock_file.try_lock()?;
|
||||
|
||||
let auth_manager = Arc::clone(&ctx.auth_manager);
|
||||
let refresh_task = tokio::spawn(async move {
|
||||
auth_manager
|
||||
.refresh_managed_chatgpt_token_if_near_expiry()
|
||||
.await
|
||||
});
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||
assert!(
|
||||
!refresh_task.is_finished(),
|
||||
"managed ChatGPT refresh should wait while another startup holds the lock"
|
||||
);
|
||||
assert_eq!(ctx.load_auth()?, initial_auth);
|
||||
let requests = server.received_requests().await.unwrap_or_default();
|
||||
assert!(
|
||||
requests.is_empty(),
|
||||
"expected no refresh token requests before the startup lock is released"
|
||||
);
|
||||
|
||||
drop(lock_file);
|
||||
refresh_task
|
||||
.await
|
||||
.context("startup refresh task should join")?
|
||||
.context("managed ChatGPT refresh should resume after the lock is released")?;
|
||||
|
||||
let stored = ctx.load_auth()?;
|
||||
let tokens = stored.tokens.as_ref().context("tokens should exist")?;
|
||||
assert_eq!(tokens.access_token, "new-access-token");
|
||||
assert_eq!(tokens.refresh_token, "new-refresh-token");
|
||||
server.verify().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[serial_test::serial(auth_refresh)]
|
||||
#[tokio::test]
|
||||
async fn refresh_token_does_not_wait_while_startup_refresh_lock_is_held() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/oauth/token"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
|
||||
"access_token": "new-access-token",
|
||||
"refresh_token": "new-refresh-token"
|
||||
})))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let ctx = RefreshTokenTestContext::new(&server).await?;
|
||||
let initial_last_refresh = Utc::now();
|
||||
let expired_access_token = access_token_with_expiration(Utc::now() - Duration::minutes(1));
|
||||
let initial_tokens = build_tokens(&expired_access_token, INITIAL_REFRESH_TOKEN);
|
||||
ctx.write_auth(&AuthDotJson {
|
||||
auth_mode: Some(AuthMode::Chatgpt),
|
||||
openai_api_key: None,
|
||||
tokens: Some(initial_tokens),
|
||||
last_refresh: Some(initial_last_refresh),
|
||||
agent_identity: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let lock_path = ctx
|
||||
.codex_home
|
||||
.path()
|
||||
.join("chatgpt-access-token-startup-refresh.lock");
|
||||
let lock_file = File::options()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true)
|
||||
.truncate(false)
|
||||
.open(lock_path)?;
|
||||
lock_file.try_lock()?;
|
||||
|
||||
let auth_manager = Arc::clone(&ctx.auth_manager);
|
||||
let startup_refresh_task = tokio::spawn(async move {
|
||||
auth_manager
|
||||
.refresh_managed_chatgpt_token_if_near_expiry()
|
||||
.await
|
||||
});
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||
assert!(
|
||||
!startup_refresh_task.is_finished(),
|
||||
"startup refresh should wait while another process holds the file lock"
|
||||
);
|
||||
|
||||
tokio::time::timeout(
|
||||
std::time::Duration::from_secs(1),
|
||||
ctx.auth_manager.refresh_token_from_authority(),
|
||||
)
|
||||
.await
|
||||
.context("normal refresh should not wait for the startup file lock")?
|
||||
.context("normal refresh should succeed")?;
|
||||
|
||||
startup_refresh_task.abort();
|
||||
let _ = startup_refresh_task.await;
|
||||
drop(lock_file);
|
||||
|
||||
let stored = ctx.load_auth()?;
|
||||
let tokens = stored.tokens.as_ref().context("tokens should exist")?;
|
||||
assert_eq!(tokens.access_token, "new-access-token");
|
||||
assert_eq!(tokens.refresh_token, "new-refresh-token");
|
||||
server.verify().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[serial_test::serial(auth_refresh)]
|
||||
#[tokio::test]
|
||||
async fn refresh_token_skips_refresh_when_auth_changed() -> Result<()> {
|
||||
|
||||
@@ -14,15 +14,13 @@ pub struct ResponseDebugContext {
|
||||
pub cf_ray: Option<String>,
|
||||
pub auth_error: Option<String>,
|
||||
pub auth_error_code: Option<String>,
|
||||
pub auth_error_type: Option<String>,
|
||||
}
|
||||
|
||||
pub fn extract_response_debug_context(transport: &TransportError) -> ResponseDebugContext {
|
||||
let mut context = ResponseDebugContext::default();
|
||||
|
||||
let TransportError::Http {
|
||||
headers, body: _, ..
|
||||
} = transport
|
||||
else {
|
||||
let TransportError::Http { headers, body, .. } = transport else {
|
||||
return context;
|
||||
};
|
||||
|
||||
@@ -38,21 +36,46 @@ pub fn extract_response_debug_context(transport: &TransportError) -> ResponseDeb
|
||||
extract_header(REQUEST_ID_HEADER).or_else(|| extract_header(OAI_REQUEST_ID_HEADER));
|
||||
context.cf_ray = extract_header(CF_RAY_HEADER);
|
||||
context.auth_error = extract_header(AUTH_ERROR_HEADER);
|
||||
context.auth_error_code = extract_header(X_ERROR_JSON_HEADER).and_then(|encoded| {
|
||||
let decoded = base64::engine::general_purpose::STANDARD
|
||||
.decode(encoded)
|
||||
.ok()?;
|
||||
let parsed = serde_json::from_slice::<serde_json::Value>(&decoded).ok()?;
|
||||
parsed
|
||||
.get("error")
|
||||
.and_then(|error| error.get("code"))
|
||||
.and_then(serde_json::Value::as_str)
|
||||
.map(str::to_string)
|
||||
});
|
||||
let header_error = extract_header(X_ERROR_JSON_HEADER)
|
||||
.and_then(|encoded| parse_x_error_json(encoded.as_str()));
|
||||
let body_error = body
|
||||
.as_deref()
|
||||
.and_then(|body| serde_json::from_str::<serde_json::Value>(body).ok());
|
||||
context.auth_error_code = header_error
|
||||
.as_ref()
|
||||
.and_then(|error| extract_error_field(error, "code"))
|
||||
.or_else(|| {
|
||||
body_error
|
||||
.as_ref()
|
||||
.and_then(|error| extract_error_field(error, "code"))
|
||||
});
|
||||
context.auth_error_type = header_error
|
||||
.as_ref()
|
||||
.and_then(|error| extract_error_field(error, "type"))
|
||||
.or_else(|| {
|
||||
body_error
|
||||
.as_ref()
|
||||
.and_then(|error| extract_error_field(error, "type"))
|
||||
});
|
||||
|
||||
context
|
||||
}
|
||||
|
||||
fn parse_x_error_json(encoded: &str) -> Option<serde_json::Value> {
|
||||
let decoded = base64::engine::general_purpose::STANDARD
|
||||
.decode(encoded)
|
||||
.ok()?;
|
||||
serde_json::from_slice::<serde_json::Value>(&decoded).ok()
|
||||
}
|
||||
|
||||
fn extract_error_field(error: &serde_json::Value, field: &str) -> Option<String> {
|
||||
error
|
||||
.get("error")
|
||||
.and_then(|error| error.get(field))
|
||||
.and_then(serde_json::Value::as_str)
|
||||
.map(str::to_string)
|
||||
}
|
||||
|
||||
pub fn extract_response_debug_context_from_api_error(error: &ApiError) -> ResponseDebugContext {
|
||||
match error {
|
||||
ApiError::Transport(transport) => extract_response_debug_context(transport),
|
||||
@@ -127,6 +150,43 @@ mod tests {
|
||||
cf_ray: Some("ray-auth".to_string()),
|
||||
auth_error: Some("missing_authorization_header".to_string()),
|
||||
auth_error_code: Some("token_expired".to_string()),
|
||||
auth_error_type: None,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn extract_response_debug_context_reads_identity_error_code_from_body() {
|
||||
let context = extract_response_debug_context(&TransportError::Http {
|
||||
status: StatusCode::UNAUTHORIZED,
|
||||
url: Some("https://chatgpt.com/backend-api/codex/models".to_string()),
|
||||
headers: None,
|
||||
body: Some(r#"{"error":{"code":"token_revoked"}}"#.to_string()),
|
||||
});
|
||||
|
||||
assert_eq!(
|
||||
context,
|
||||
ResponseDebugContext {
|
||||
auth_error_code: Some("token_revoked".to_string()),
|
||||
..ResponseDebugContext::default()
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn extract_response_debug_context_reads_identity_error_type_from_body() {
|
||||
let context = extract_response_debug_context(&TransportError::Http {
|
||||
status: StatusCode::UNAUTHORIZED,
|
||||
url: Some("https://chatgpt.com/backend-api/codex/models".to_string()),
|
||||
headers: None,
|
||||
body: Some(r#"{"error":{"type":"token_invalidated"}}"#.to_string()),
|
||||
});
|
||||
|
||||
assert_eq!(
|
||||
context,
|
||||
ResponseDebugContext {
|
||||
auth_error_type: Some("token_invalidated".to_string()),
|
||||
..ResponseDebugContext::default()
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@@ -38,6 +38,7 @@ use codex_app_server_protocol::ThreadListParams;
|
||||
use codex_app_server_protocol::ThreadSortKey as AppServerThreadSortKey;
|
||||
use codex_app_server_protocol::ThreadSourceKind;
|
||||
use codex_cloud_requirements::cloud_requirements_loader_for_storage;
|
||||
use codex_cloud_requirements::refresh_managed_chatgpt_token_for_storage_if_near_expiry;
|
||||
use codex_config::CloudRequirementsLoader;
|
||||
use codex_config::ConfigLoadError;
|
||||
use codex_config::LoaderOverrides;
|
||||
@@ -281,7 +282,6 @@ pub use public_widgets::composer_input::ComposerInput;
|
||||
#[cfg(unix)]
|
||||
const AUTO_CONNECT_DAEMON_CONNECT_TIMEOUT: std::time::Duration =
|
||||
std::time::Duration::from_millis(50);
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn start_embedded_app_server(
|
||||
arg0_paths: Arg0DispatchPaths,
|
||||
@@ -1168,6 +1168,14 @@ pub async fn run_main(
|
||||
eprintln!("{err}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
refresh_managed_chatgpt_token_for_storage_if_near_expiry(
|
||||
config.codex_home.to_path_buf(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
config.cli_auth_credentials_store_mode,
|
||||
config.chatgpt_base_url.clone(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
let log_dir = config.log_dir.clone();
|
||||
|
||||
Reference in New Issue
Block a user