From 58be470d159b40cbfd0dce92de18416b4472e16f Mon Sep 17 00:00:00 2001 From: Anton Panasenko Date: Thu, 21 May 2026 14:38:30 -0700 Subject: [PATCH] fix(remote-control): retry after auth recovery (#23775) ## Why When remote control hits an auth failure such as a revoked or reused refresh token, the websocket loop falls into reconnect backoff. If the user fixes auth while that loop is sleeping, remote control can stay offline until the old retry timer expires because nothing wakes the loop or resets its exhausted auth recovery state. ## What Changed Added an auth-change watch on `AuthManager` for refresh-relevant cached auth updates. The remote-control websocket loop now subscribes to that signal, resets `UnauthorizedRecovery` and reconnect backoff when auth changes, and retries immediately instead of waiting for the previous delay. Updated the remote-control transport test to verify that reloading auth with the now-available account id wakes enrollment before the prior retry delay. ## Verification `cargo test -p codex-app-server-transport remote_control_waits_for_account_id_before_enrolling` --- .../src/transport/remote_control/tests.rs | 7 +- .../src/transport/remote_control/websocket.rs | 146 ++++++++++++++++-- codex-rs/app-server/src/lib.rs | 3 +- codex-rs/login/src/auth/manager.rs | 18 +++ 4 files changed, 153 insertions(+), 21 deletions(-) diff --git a/codex-rs/app-server-transport/src/transport/remote_control/tests.rs b/codex-rs/app-server-transport/src/transport/remote_control/tests.rs index fb1512fedb..6279072713 100644 --- a/codex-rs/app-server-transport/src/transport/remote_control/tests.rs +++ b/codex-rs/app-server-transport/src/transport/remote_control/tests.rs @@ -1339,7 +1339,7 @@ async fn remote_control_waits_for_account_id_before_enrolling() { installation_id: TEST_INSTALLATION_ID.to_string(), }, Some(state_db.clone()), - auth_manager, + auth_manager.clone(), transport_event_tx, shutdown_token.clone(), /*app_server_client_name_rx*/ None, @@ -1358,8 +1358,11 @@ async fn remote_control_waits_for_account_id_before_enrolling() { AuthCredentialsStoreMode::File, ) .expect("auth with account id should save"); + auth_manager.reload().await; - let enroll_request = accept_http_request(&listener).await; + let enroll_request = timeout(Duration::from_millis(100), accept_http_request(&listener)) + .await + .expect("auth change should wake remote control before the retry delay"); assert_eq!( enroll_request.request_line, "POST /backend-api/wham/remote/control/server/enroll HTTP/1.1" diff --git a/codex-rs/app-server-transport/src/transport/remote_control/websocket.rs b/codex-rs/app-server-transport/src/transport/remote_control/websocket.rs index f117aec3b4..74ef9c2301 100644 --- a/codex-rs/app-server-transport/src/transport/remote_control/websocket.rs +++ b/codex-rs/app-server-transport/src/transport/remote_control/websocket.rs @@ -226,6 +226,7 @@ pub(crate) struct RemoteControlWebsocket { reconnect_attempt: u64, enrollment: Option, auth_recovery: UnauthorizedRecovery, + auth_change_rx: watch::Receiver, client_tracker: Arc>, state: Arc>, server_event_rx: Arc>>, @@ -240,6 +241,12 @@ pub(crate) struct RemoteControlWebsocketConfig { pub(crate) server_name: String, } +pub(super) struct RemoteControlAuthContext<'a> { + auth_manager: &'a Arc, + auth_recovery: &'a mut UnauthorizedRecovery, + auth_change_rx: &'a mut watch::Receiver, +} + enum ConnectOutcome { Connected(Box>>), Disabled, @@ -321,6 +328,7 @@ impl RemoteControlWebsocket { ); let (outbound_buffer, used_rx) = BoundedOutboundBuffer::new(); let auth_recovery = auth_manager.unauthorized_recovery(); + let auth_change_rx = auth_manager.auth_change_receiver(); Self { remote_control_url: config.remote_control_url, @@ -334,6 +342,7 @@ impl RemoteControlWebsocket { reconnect_attempt: 0, enrollment: None, auth_recovery, + auth_change_rx, client_tracker: Arc::new(Mutex::new(client_tracker)), state: Arc::new(Mutex::new(WebsocketState { outbound_buffer, @@ -457,6 +466,11 @@ impl RemoteControlWebsocket { subscribe_cursor: subscribe_cursor.as_deref(), app_server_client_name, }; + let auth_context = RemoteControlAuthContext { + auth_manager: &self.auth_manager, + auth_recovery: &mut self.auth_recovery, + auth_change_rx: &mut self.auth_change_rx, + }; let connect_result = tokio::select! { _ = shutdown_token.cancelled() => return ConnectOutcome::Shutdown, changed = self.enabled_rx.wait_for(|enabled| !*enabled) => { @@ -468,8 +482,7 @@ impl RemoteControlWebsocket { connect_result = connect_remote_control_websocket( &remote_control_target, self.state_db.as_deref(), - &self.auth_manager, - &mut self.auth_recovery, + auth_context, &mut self.enrollment, connect_options, &self.status_publisher, @@ -517,6 +530,14 @@ impl RemoteControlWebsocket { } return ConnectOutcome::Disabled; } + changed = self.auth_change_rx.changed() => { + if changed.is_err() { + return ConnectOutcome::Shutdown; + } + self.auth_recovery = self.auth_manager.unauthorized_recovery(); + self.reconnect_attempt = 0; + info!("retrying app-server remote control websocket after auth changed"); + } _ = tokio::time::sleep(reconnect_delay) => {} } } @@ -1018,8 +1039,7 @@ pub(crate) async fn load_remote_control_auth( pub(super) async fn connect_remote_control_websocket( remote_control_target: &RemoteControlTarget, state_db: Option<&StateRuntime>, - auth_manager: &Arc, - auth_recovery: &mut UnauthorizedRecovery, + auth_context: RemoteControlAuthContext<'_>, enrollment: &mut Option, connect_options: RemoteControlConnectOptions<'_>, status_publisher: &RemoteControlStatusPublisher, @@ -1028,6 +1048,11 @@ pub(super) async fn connect_remote_control_websocket( tungstenite::http::Response<()>, )> { ensure_rustls_crypto_provider(); + let RemoteControlAuthContext { + auth_manager, + auth_recovery, + auth_change_rx, + } = auth_context; let Some(state_db) = state_db else { *enrollment = None; @@ -1098,7 +1123,7 @@ pub(super) async fn connect_remote_control_websocket( Ok(new_enrollment) => new_enrollment, Err(err) if err.kind() == ErrorKind::PermissionDenied - && recover_remote_control_auth(auth_recovery).await => + && recover_remote_control_auth(auth_recovery, auth_change_rx).await => { return Err(io::Error::other(format!( "{err}; retrying after auth recovery" @@ -1172,7 +1197,7 @@ pub(super) async fn connect_remote_control_websocket( tungstenite::Error::Http(response) if matches!(response.status().as_u16(), 401 | 403) => { - if recover_remote_control_auth(auth_recovery).await { + if recover_remote_control_auth(auth_recovery, auth_change_rx).await { return Err(io::Error::other(format!( "remote control websocket auth failed with HTTP {}; retrying after auth recovery", response.status() @@ -1191,15 +1216,25 @@ pub(super) async fn connect_remote_control_websocket( } } -async fn recover_remote_control_auth(auth_recovery: &mut UnauthorizedRecovery) -> bool { +async fn recover_remote_control_auth( + auth_recovery: &mut UnauthorizedRecovery, + auth_change_rx: &mut watch::Receiver, +) -> bool { if !auth_recovery.has_next() { return false; } let mode = auth_recovery.mode_name(); let step = auth_recovery.step_name(); + let auth_change_revision_before_recovery = *auth_change_rx.borrow(); match auth_recovery.next().await { Ok(step_result) => { + if step_result.auth_state_changed() == Some(true) { + mark_recovery_auth_change_seen( + auth_change_rx, + auth_change_revision_before_recovery, + ); + } info!( "remote control websocket auth recovery succeeded: mode={mode}, step={step}, auth_state_changed={:?}", step_result.auth_state_changed() @@ -1213,6 +1248,20 @@ async fn recover_remote_control_auth(auth_recovery: &mut UnauthorizedRecovery) - } } +fn mark_recovery_auth_change_seen( + auth_change_rx: &mut watch::Receiver, + auth_change_revision_before_recovery: u64, +) { + let auth_change_revision_after_recovery = *auth_change_rx.borrow(); + if auth_change_revision_after_recovery == auth_change_revision_before_recovery.wrapping_add(1) { + // Recovery updated the same watch that wakes the outer reconnect + // loop. Mark only that single revision seen; if more revisions + // arrived while recovery was in flight, leave them pending so the + // reconnect loop still reacts to the later external auth change. + auth_change_rx.borrow_and_update(); + } +} + fn format_remote_control_websocket_connect_error( websocket_url: &str, err: &tungstenite::Error, @@ -1290,6 +1339,37 @@ mod tests { (RemoteControlStatusPublisher::new(status_tx), status_rx) } + #[test] + fn mark_recovery_auth_change_seen_marks_only_recovery_revision_seen() { + let (auth_change_tx, mut auth_change_rx) = watch::channel(0u64); + let auth_change_revision_before_recovery = *auth_change_rx.borrow(); + auth_change_tx.send_modify(|revision| *revision += 1); + + mark_recovery_auth_change_seen(&mut auth_change_rx, auth_change_revision_before_recovery); + + assert!( + !auth_change_rx + .has_changed() + .expect("auth change watch should remain open") + ); + } + + #[test] + fn mark_recovery_auth_change_seen_preserves_racing_auth_change() { + let (auth_change_tx, mut auth_change_rx) = watch::channel(0u64); + let auth_change_revision_before_recovery = *auth_change_rx.borrow(); + auth_change_tx.send_modify(|revision| *revision += 1); + auth_change_tx.send_modify(|revision| *revision += 1); + + mark_recovery_auth_change_seen(&mut auth_change_rx, auth_change_revision_before_recovery); + + assert!( + auth_change_rx + .has_changed() + .expect("auth change watch should remain open") + ); + } + async fn remote_control_state_runtime(codex_home: &TempDir) -> Arc { StateRuntime::init(codex_home.path().to_path_buf(), "test-provider".to_string()) .await @@ -1375,6 +1455,7 @@ mod tests { let state_db = remote_control_state_runtime(&codex_home).await; let auth_manager = remote_control_auth_manager(); let mut auth_recovery = auth_manager.unauthorized_recovery(); + let mut auth_change_rx = auth_manager.auth_change_receiver(); let mut enrollment = Some(RemoteControlEnrollment { account_id: "account_id".to_string(), environment_id: "env_test".to_string(), @@ -1386,8 +1467,11 @@ mod tests { let err = match connect_remote_control_websocket( &remote_control_target, Some(state_db.as_ref()), - &auth_manager, - &mut auth_recovery, + RemoteControlAuthContext { + auth_manager: &auth_manager, + auth_recovery: &mut auth_recovery, + auth_change_rx: &mut auth_change_rx, + }, &mut enrollment, RemoteControlConnectOptions { installation_id: TEST_INSTALLATION_ID, @@ -1440,6 +1524,7 @@ mod tests { ) .await; let mut auth_recovery = auth_manager.unauthorized_recovery(); + let mut auth_change_rx = auth_manager.auth_change_receiver(); let mut enrollment = Some(RemoteControlEnrollment { account_id: "account_id".to_string(), environment_id: "env_test".to_string(), @@ -1466,8 +1551,11 @@ mod tests { let err = connect_remote_control_websocket( &remote_control_target, Some(state_db.as_ref()), - &auth_manager, - &mut auth_recovery, + RemoteControlAuthContext { + auth_manager: &auth_manager, + auth_recovery: &mut auth_recovery, + auth_change_rx: &mut auth_change_rx, + }, &mut enrollment, RemoteControlConnectOptions { installation_id: TEST_INSTALLATION_ID, @@ -1503,6 +1591,12 @@ mod tests { .expect("token should be readable"), "fresh-token" ); + assert!( + !auth_change_rx + .has_changed() + .expect("auth change watch should remain open"), + "recovery's own auth reload should not wake the reconnect loop" + ); } #[tokio::test] @@ -1538,6 +1632,7 @@ mod tests { ) .await; let mut auth_recovery = auth_manager.unauthorized_recovery(); + let mut auth_change_rx = auth_manager.auth_change_receiver(); let mut enrollment = None; let (status_publisher, status_rx) = remote_control_status_channel(); save_auth( @@ -1550,8 +1645,11 @@ mod tests { let err = connect_remote_control_websocket( &remote_control_target, Some(state_db.as_ref()), - &auth_manager, - &mut auth_recovery, + RemoteControlAuthContext { + auth_manager: &auth_manager, + auth_recovery: &mut auth_recovery, + auth_change_rx: &mut auth_change_rx, + }, &mut enrollment, RemoteControlConnectOptions { installation_id: TEST_INSTALLATION_ID, @@ -1585,6 +1683,12 @@ mod tests { .expect("token should be readable"), "fresh-token" ); + assert!( + !auth_change_rx + .has_changed() + .expect("auth change watch should remain open"), + "recovery's own auth reload should not wake the reconnect loop" + ); } #[tokio::test] @@ -1593,6 +1697,7 @@ mod tests { .expect("target should parse"); let auth_manager = remote_control_auth_manager(); let mut auth_recovery = auth_manager.unauthorized_recovery(); + let mut auth_change_rx = auth_manager.auth_change_receiver(); let mut enrollment = Some(RemoteControlEnrollment { account_id: "account_id".to_string(), environment_id: "env_test".to_string(), @@ -1604,8 +1709,11 @@ mod tests { let err = connect_remote_control_websocket( &remote_control_target, /*state_db*/ None, - &auth_manager, - &mut auth_recovery, + RemoteControlAuthContext { + auth_manager: &auth_manager, + auth_recovery: &mut auth_recovery, + auth_change_rx: &mut auth_change_rx, + }, &mut enrollment, RemoteControlConnectOptions { installation_id: TEST_INSTALLATION_ID, @@ -1637,6 +1745,7 @@ mod tests { ) .await; let mut auth_recovery = auth_manager.unauthorized_recovery(); + let mut auth_change_rx = auth_manager.auth_change_receiver(); let mut enrollment = Some(RemoteControlEnrollment { account_id: "account_id".to_string(), environment_id: "env_test".to_string(), @@ -1653,8 +1762,11 @@ mod tests { let err = connect_remote_control_websocket( &remote_control_target, Some(state_db.as_ref()), - &auth_manager, - &mut auth_recovery, + RemoteControlAuthContext { + auth_manager: &auth_manager, + auth_recovery: &mut auth_recovery, + auth_change_rx: &mut auth_change_rx, + }, &mut enrollment, RemoteControlConnectOptions { installation_id: TEST_INSTALLATION_ID, diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index ef4106aa14..80305d2d9f 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -786,8 +786,7 @@ pub async fn run_main_with_transport_options( }); let processor_handle = tokio::spawn({ - let auth_manager = - AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await; + let auth_manager = Arc::clone(&auth_manager); let analytics_events_client = analytics_events_client_from_config(Arc::clone(&auth_manager), &config); let outgoing_message_sender = Arc::new(OutgoingMessageSender::new( diff --git a/codex-rs/login/src/auth/manager.rs b/codex-rs/login/src/auth/manager.rs index a2e4e8e0d8..d94c48165d 100644 --- a/codex-rs/login/src/auth/manager.rs +++ b/codex-rs/login/src/auth/manager.rs @@ -15,6 +15,7 @@ use std::sync::RwLock; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use tokio::sync::Semaphore; +use tokio::sync::watch; use codex_agent_identity::decode_agent_identity_jwt; use codex_agent_identity::fetch_agent_identity_jwks; @@ -1252,6 +1253,7 @@ impl UnauthorizedRecovery { pub struct AuthManager { codex_home: PathBuf, inner: RwLock, + auth_change_tx: watch::Sender, enable_codex_api_key_env: bool, auth_credentials_store_mode: AuthCredentialsStoreMode, forced_chatgpt_workspace_id: RwLock>>, @@ -1320,12 +1322,14 @@ impl AuthManager { .await .ok() .flatten(); + let (auth_change_tx, _auth_change_rx) = watch::channel(0); Self { codex_home, inner: RwLock::new(CachedAuth { auth: managed_auth, permanent_refresh_failure: None, }), + auth_change_tx, enable_codex_api_key_env, auth_credentials_store_mode, forced_chatgpt_workspace_id: RwLock::new(None), @@ -1341,10 +1345,12 @@ impl AuthManager { auth: Some(auth), permanent_refresh_failure: None, }; + let (auth_change_tx, _auth_change_rx) = watch::channel(0); Arc::new(Self { codex_home: PathBuf::from("non-existent"), inner: RwLock::new(cached), + auth_change_tx, enable_codex_api_key_env: false, auth_credentials_store_mode: AuthCredentialsStoreMode::File, forced_chatgpt_workspace_id: RwLock::new(None), @@ -1360,9 +1366,11 @@ impl AuthManager { auth: Some(auth), permanent_refresh_failure: None, }; + let (auth_change_tx, _auth_change_rx) = watch::channel(0); Arc::new(Self { codex_home, inner: RwLock::new(cached), + auth_change_tx, enable_codex_api_key_env: false, auth_credentials_store_mode: AuthCredentialsStoreMode::File, forced_chatgpt_workspace_id: RwLock::new(None), @@ -1373,12 +1381,14 @@ impl AuthManager { } pub fn external_bearer_only(config: ModelProviderAuthInfo) -> Arc { + let (auth_change_tx, _auth_change_rx) = watch::channel(0); Arc::new(Self { codex_home: PathBuf::from("non-existent"), inner: RwLock::new(CachedAuth { auth: None, permanent_refresh_failure: None, }), + auth_change_tx, enable_codex_api_key_env: false, auth_credentials_store_mode: AuthCredentialsStoreMode::File, forced_chatgpt_workspace_id: RwLock::new(None), @@ -1395,6 +1405,11 @@ impl AuthManager { self.inner.read().ok().and_then(|c| c.auth.clone()) } + /// Subscribes to cached auth changes that can affect request recovery. + pub fn auth_change_receiver(&self) -> watch::Receiver { + self.auth_change_tx.subscribe() + } + pub fn refresh_failure_for_auth(&self, auth: &CodexAuth) -> Option { self.inner.read().ok().and_then(|cached| { cached @@ -1537,6 +1552,9 @@ impl AuthManager { } tracing::info!("Reloaded auth, changed: {changed}"); guard.auth = new_auth; + if auth_changed_for_refresh { + self.auth_change_tx.send_modify(|revision| *revision += 1); + } changed } else { false