From 9f86a0cfa712074eb166ef827abb22dec599397e Mon Sep 17 00:00:00 2001 From: Cooper Gamble Date: Wed, 20 May 2026 23:27:08 +0000 Subject: [PATCH] [codex-core] guard revoked auth retry after output [ci changed_files] --- codex-rs/core/src/client.rs | 5 +- codex-rs/core/src/client_tests.rs | 55 ++++++++++++++++ codex-rs/core/tests/common/responses.rs | 5 ++ codex-rs/core/tests/suite/client.rs | 88 +++++++++++++++++++++++++ 4 files changed, 152 insertions(+), 1 deletion(-) diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 83002d5dab..4b3c63c127 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -1908,6 +1908,7 @@ where err, &mut revoked_auth_recovery, &session_telemetry, + items_added.is_empty(), ) .await; inference_trace_attempt.record_failed( @@ -1945,6 +1946,7 @@ async fn map_response_stream_error( err: ApiError, revoked_auth_recovery: &mut Option, session_telemetry: &SessionTelemetry, + retry_after_auth_recovery_allowed: bool, ) -> CodexErr { match err { ApiError::Transport( @@ -1957,10 +1959,11 @@ async fn map_response_stream_error( match handle_unauthorized(recovery_transport, revoked_auth_recovery, session_telemetry) .await { - Ok(_) => CodexErr::Stream( + Ok(_) if retry_after_auth_recovery_allowed => CodexErr::Stream( WEBSOCKET_AUTH_RECOVERY_RETRY_REASON.to_string(), /*requested_delay*/ None, ), + Ok(_) => map_api_error(ApiError::Transport(transport)), Err(err) => err, } } diff --git a/codex-rs/core/src/client_tests.rs b/codex-rs/core/src/client_tests.rs index 30815757c9..495e20e05c 100644 --- a/codex-rs/core/src/client_tests.rs +++ b/codex-rs/core/src/client_tests.rs @@ -891,6 +891,61 @@ async fn websocket_stream_token_revoked_401_retries_after_loading_replacement_au ); } +#[tokio::test] +async fn websocket_stream_token_revoked_401_after_output_does_not_retry_after_loading_replacement_auth() + { + let (codex_home, manager) = managed_chatgpt_auth_manager("revoked-access-token").await; + write_managed_chatgpt_auth(codex_home.path(), "replacement-access-token"); + let api_stream = futures::stream::iter([ + Ok(ResponseEvent::OutputItemDone(output_message( + "msg-1", + "partial answer", + ))), + Err(ApiError::Transport(TransportError::Http { + status: StatusCode::UNAUTHORIZED, + url: None, + headers: None, + body: Some( + r#"{"type":"error","status":401,"error":{"type":"token_revoked"}}"#.to_string(), + ), + })), + ]); + + let (mut stream, _) = super::map_response_events( + /*upstream_request_id*/ None, + api_stream, + test_session_telemetry(), + InferenceTraceAttempt::disabled(), + Some(manager.unauthorized_recovery()), + ); + assert!(matches!( + stream + .next() + .await + .expect("partial output should be forwarded") + .expect("partial output should stay successful"), + ResponseEvent::OutputItemDone(_) + )); + let err = stream + .next() + .await + .expect("revoked websocket stream should emit an error") + .expect_err("revoked websocket stream should stop after output escapes"); + + assert!( + !super::is_websocket_auth_recovery_retry(&err), + "streams with recorded output must not transparently retry after auth recovery" + ); + assert_eq!( + manager + .auth_cached() + .expect("replacement auth should remain cached") + .get_token() + .expect("replacement token should resolve"), + "replacement-access-token" + ); +} + #[tokio::test] async fn websocket_stream_token_revoked_401_refreshes_external_auth_before_retry() { let codex_home = TempDir::new().expect("external auth tempdir"); diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index c917436b6d..f24e729b46 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -1362,6 +1362,11 @@ pub async fn start_websocket_server_with_headers( if close_after_requests { let _ = ws_stream.close(None).await; + } else if !connections.lock().unwrap().is_empty() { + tokio::select! { + _ = &mut shutdown_rx => return, + _ = ws_stream.next() => {} + } } else { let _ = shutdown_rx.await; return; diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index 135ab780bd..db7188e502 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -1156,6 +1156,94 @@ async fn revoked_chatgpt_auth_user_turn_clears_auth_and_requests_relogin() -> an Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn revoked_chatgpt_auth_user_turn_retries_reloaded_auth() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/api/codex/responses")) + .and(header("authorization", "Bearer revoked-access-token")) + .respond_with(ResponseTemplate::new(401).set_body_json(json!({ + "error": { + "code": "token_revoked", + "message": "revoked", + } + }))) + .expect(1) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/api/codex/responses")) + .and(header("authorization", "Bearer replacement-access-token")) + .respond_with( + ResponseTemplate::new(200) + .insert_header("content-type", "text/event-stream") + .set_body_raw( + sse(vec![ev_response_created("resp1"), ev_completed("resp1")]), + "text/event-stream", + ), + ) + .expect(1) + .mount(&server) + .await; + + let codex_home = Arc::new(TempDir::new()?); + let _jwt = write_auth_json( + codex_home.as_ref(), + /*openai_api_key*/ None, + "pro", + "revoked-access-token", + Some("account_id"), + ); + let auth = CodexAuth::from_auth_storage( + codex_home.path(), + AuthCredentialsStoreMode::File, + /*chatgpt_base_url*/ None, + ) + .await? + .expect("managed ChatGPT auth should load"); + + let mut model_provider = built_in_model_providers(/*openai_base_url*/ None)["openai"].clone(); + model_provider.base_url = Some(format!("{}/api/codex", server.uri())); + model_provider.supports_websockets = false; + let mut builder = test_codex() + .with_home(codex_home.clone()) + .with_auth(auth) + .with_config(move |config| { + config.model_provider = model_provider; + }); + let test = builder.build(&server).await?; + + write_auth_json( + codex_home.as_ref(), + /*openai_api_key*/ None, + "pro", + "replacement-access-token", + Some("account_id"), + ); + + test.submit_turn("hello") + .await + .expect("reloaded managed auth should retry the HTTP turn"); + + let persisted_auth = CodexAuth::from_auth_storage( + codex_home.path(), + AuthCredentialsStoreMode::File, + /*chatgpt_base_url*/ None, + ) + .await? + .expect("replacement managed auth should persist"); + assert_eq!( + persisted_auth + .get_token() + .expect("replacement token should resolve"), + "replacement-access-token" + ); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn prefers_apikey_when_config_prefers_apikey_even_with_chatgpt_tokens() { skip_if_no_network!();