diff --git a/codex-rs/app-server-transport/src/transport/remote_control/tests.rs b/codex-rs/app-server-transport/src/transport/remote_control/tests.rs index fb1512fedb..6279072713 100644 --- a/codex-rs/app-server-transport/src/transport/remote_control/tests.rs +++ b/codex-rs/app-server-transport/src/transport/remote_control/tests.rs @@ -1339,7 +1339,7 @@ async fn remote_control_waits_for_account_id_before_enrolling() { installation_id: TEST_INSTALLATION_ID.to_string(), }, Some(state_db.clone()), - auth_manager, + auth_manager.clone(), transport_event_tx, shutdown_token.clone(), /*app_server_client_name_rx*/ None, @@ -1358,8 +1358,11 @@ async fn remote_control_waits_for_account_id_before_enrolling() { AuthCredentialsStoreMode::File, ) .expect("auth with account id should save"); + auth_manager.reload().await; - let enroll_request = accept_http_request(&listener).await; + let enroll_request = timeout(Duration::from_millis(100), accept_http_request(&listener)) + .await + .expect("auth change should wake remote control before the retry delay"); assert_eq!( enroll_request.request_line, "POST /backend-api/wham/remote/control/server/enroll HTTP/1.1" diff --git a/codex-rs/app-server-transport/src/transport/remote_control/websocket.rs b/codex-rs/app-server-transport/src/transport/remote_control/websocket.rs index f117aec3b4..74ef9c2301 100644 --- a/codex-rs/app-server-transport/src/transport/remote_control/websocket.rs +++ b/codex-rs/app-server-transport/src/transport/remote_control/websocket.rs @@ -226,6 +226,7 @@ pub(crate) struct RemoteControlWebsocket { reconnect_attempt: u64, enrollment: Option, auth_recovery: UnauthorizedRecovery, + auth_change_rx: watch::Receiver, client_tracker: Arc>, state: Arc>, server_event_rx: Arc>>, @@ -240,6 +241,12 @@ pub(crate) struct RemoteControlWebsocketConfig { pub(crate) server_name: String, } +pub(super) struct RemoteControlAuthContext<'a> { + auth_manager: &'a Arc, + auth_recovery: &'a mut UnauthorizedRecovery, + auth_change_rx: &'a mut watch::Receiver, +} + enum ConnectOutcome { Connected(Box>>), Disabled, @@ -321,6 +328,7 @@ impl RemoteControlWebsocket { ); let (outbound_buffer, used_rx) = BoundedOutboundBuffer::new(); let auth_recovery = auth_manager.unauthorized_recovery(); + let auth_change_rx = auth_manager.auth_change_receiver(); Self { remote_control_url: config.remote_control_url, @@ -334,6 +342,7 @@ impl RemoteControlWebsocket { reconnect_attempt: 0, enrollment: None, auth_recovery, + auth_change_rx, client_tracker: Arc::new(Mutex::new(client_tracker)), state: Arc::new(Mutex::new(WebsocketState { outbound_buffer, @@ -457,6 +466,11 @@ impl RemoteControlWebsocket { subscribe_cursor: subscribe_cursor.as_deref(), app_server_client_name, }; + let auth_context = RemoteControlAuthContext { + auth_manager: &self.auth_manager, + auth_recovery: &mut self.auth_recovery, + auth_change_rx: &mut self.auth_change_rx, + }; let connect_result = tokio::select! { _ = shutdown_token.cancelled() => return ConnectOutcome::Shutdown, changed = self.enabled_rx.wait_for(|enabled| !*enabled) => { @@ -468,8 +482,7 @@ impl RemoteControlWebsocket { connect_result = connect_remote_control_websocket( &remote_control_target, self.state_db.as_deref(), - &self.auth_manager, - &mut self.auth_recovery, + auth_context, &mut self.enrollment, connect_options, &self.status_publisher, @@ -517,6 +530,14 @@ impl RemoteControlWebsocket { } return ConnectOutcome::Disabled; } + changed = self.auth_change_rx.changed() => { + if changed.is_err() { + return ConnectOutcome::Shutdown; + } + self.auth_recovery = self.auth_manager.unauthorized_recovery(); + self.reconnect_attempt = 0; + info!("retrying app-server remote control websocket after auth changed"); + } _ = tokio::time::sleep(reconnect_delay) => {} } } @@ -1018,8 +1039,7 @@ pub(crate) async fn load_remote_control_auth( pub(super) async fn connect_remote_control_websocket( remote_control_target: &RemoteControlTarget, state_db: Option<&StateRuntime>, - auth_manager: &Arc, - auth_recovery: &mut UnauthorizedRecovery, + auth_context: RemoteControlAuthContext<'_>, enrollment: &mut Option, connect_options: RemoteControlConnectOptions<'_>, status_publisher: &RemoteControlStatusPublisher, @@ -1028,6 +1048,11 @@ pub(super) async fn connect_remote_control_websocket( tungstenite::http::Response<()>, )> { ensure_rustls_crypto_provider(); + let RemoteControlAuthContext { + auth_manager, + auth_recovery, + auth_change_rx, + } = auth_context; let Some(state_db) = state_db else { *enrollment = None; @@ -1098,7 +1123,7 @@ pub(super) async fn connect_remote_control_websocket( Ok(new_enrollment) => new_enrollment, Err(err) if err.kind() == ErrorKind::PermissionDenied - && recover_remote_control_auth(auth_recovery).await => + && recover_remote_control_auth(auth_recovery, auth_change_rx).await => { return Err(io::Error::other(format!( "{err}; retrying after auth recovery" @@ -1172,7 +1197,7 @@ pub(super) async fn connect_remote_control_websocket( tungstenite::Error::Http(response) if matches!(response.status().as_u16(), 401 | 403) => { - if recover_remote_control_auth(auth_recovery).await { + if recover_remote_control_auth(auth_recovery, auth_change_rx).await { return Err(io::Error::other(format!( "remote control websocket auth failed with HTTP {}; retrying after auth recovery", response.status() @@ -1191,15 +1216,25 @@ pub(super) async fn connect_remote_control_websocket( } } -async fn recover_remote_control_auth(auth_recovery: &mut UnauthorizedRecovery) -> bool { +async fn recover_remote_control_auth( + auth_recovery: &mut UnauthorizedRecovery, + auth_change_rx: &mut watch::Receiver, +) -> bool { if !auth_recovery.has_next() { return false; } let mode = auth_recovery.mode_name(); let step = auth_recovery.step_name(); + let auth_change_revision_before_recovery = *auth_change_rx.borrow(); match auth_recovery.next().await { Ok(step_result) => { + if step_result.auth_state_changed() == Some(true) { + mark_recovery_auth_change_seen( + auth_change_rx, + auth_change_revision_before_recovery, + ); + } info!( "remote control websocket auth recovery succeeded: mode={mode}, step={step}, auth_state_changed={:?}", step_result.auth_state_changed() @@ -1213,6 +1248,20 @@ async fn recover_remote_control_auth(auth_recovery: &mut UnauthorizedRecovery) - } } +fn mark_recovery_auth_change_seen( + auth_change_rx: &mut watch::Receiver, + auth_change_revision_before_recovery: u64, +) { + let auth_change_revision_after_recovery = *auth_change_rx.borrow(); + if auth_change_revision_after_recovery == auth_change_revision_before_recovery.wrapping_add(1) { + // Recovery updated the same watch that wakes the outer reconnect + // loop. Mark only that single revision seen; if more revisions + // arrived while recovery was in flight, leave them pending so the + // reconnect loop still reacts to the later external auth change. + auth_change_rx.borrow_and_update(); + } +} + fn format_remote_control_websocket_connect_error( websocket_url: &str, err: &tungstenite::Error, @@ -1290,6 +1339,37 @@ mod tests { (RemoteControlStatusPublisher::new(status_tx), status_rx) } + #[test] + fn mark_recovery_auth_change_seen_marks_only_recovery_revision_seen() { + let (auth_change_tx, mut auth_change_rx) = watch::channel(0u64); + let auth_change_revision_before_recovery = *auth_change_rx.borrow(); + auth_change_tx.send_modify(|revision| *revision += 1); + + mark_recovery_auth_change_seen(&mut auth_change_rx, auth_change_revision_before_recovery); + + assert!( + !auth_change_rx + .has_changed() + .expect("auth change watch should remain open") + ); + } + + #[test] + fn mark_recovery_auth_change_seen_preserves_racing_auth_change() { + let (auth_change_tx, mut auth_change_rx) = watch::channel(0u64); + let auth_change_revision_before_recovery = *auth_change_rx.borrow(); + auth_change_tx.send_modify(|revision| *revision += 1); + auth_change_tx.send_modify(|revision| *revision += 1); + + mark_recovery_auth_change_seen(&mut auth_change_rx, auth_change_revision_before_recovery); + + assert!( + auth_change_rx + .has_changed() + .expect("auth change watch should remain open") + ); + } + async fn remote_control_state_runtime(codex_home: &TempDir) -> Arc { StateRuntime::init(codex_home.path().to_path_buf(), "test-provider".to_string()) .await @@ -1375,6 +1455,7 @@ mod tests { let state_db = remote_control_state_runtime(&codex_home).await; let auth_manager = remote_control_auth_manager(); let mut auth_recovery = auth_manager.unauthorized_recovery(); + let mut auth_change_rx = auth_manager.auth_change_receiver(); let mut enrollment = Some(RemoteControlEnrollment { account_id: "account_id".to_string(), environment_id: "env_test".to_string(), @@ -1386,8 +1467,11 @@ mod tests { let err = match connect_remote_control_websocket( &remote_control_target, Some(state_db.as_ref()), - &auth_manager, - &mut auth_recovery, + RemoteControlAuthContext { + auth_manager: &auth_manager, + auth_recovery: &mut auth_recovery, + auth_change_rx: &mut auth_change_rx, + }, &mut enrollment, RemoteControlConnectOptions { installation_id: TEST_INSTALLATION_ID, @@ -1440,6 +1524,7 @@ mod tests { ) .await; let mut auth_recovery = auth_manager.unauthorized_recovery(); + let mut auth_change_rx = auth_manager.auth_change_receiver(); let mut enrollment = Some(RemoteControlEnrollment { account_id: "account_id".to_string(), environment_id: "env_test".to_string(), @@ -1466,8 +1551,11 @@ mod tests { let err = connect_remote_control_websocket( &remote_control_target, Some(state_db.as_ref()), - &auth_manager, - &mut auth_recovery, + RemoteControlAuthContext { + auth_manager: &auth_manager, + auth_recovery: &mut auth_recovery, + auth_change_rx: &mut auth_change_rx, + }, &mut enrollment, RemoteControlConnectOptions { installation_id: TEST_INSTALLATION_ID, @@ -1503,6 +1591,12 @@ mod tests { .expect("token should be readable"), "fresh-token" ); + assert!( + !auth_change_rx + .has_changed() + .expect("auth change watch should remain open"), + "recovery's own auth reload should not wake the reconnect loop" + ); } #[tokio::test] @@ -1538,6 +1632,7 @@ mod tests { ) .await; let mut auth_recovery = auth_manager.unauthorized_recovery(); + let mut auth_change_rx = auth_manager.auth_change_receiver(); let mut enrollment = None; let (status_publisher, status_rx) = remote_control_status_channel(); save_auth( @@ -1550,8 +1645,11 @@ mod tests { let err = connect_remote_control_websocket( &remote_control_target, Some(state_db.as_ref()), - &auth_manager, - &mut auth_recovery, + RemoteControlAuthContext { + auth_manager: &auth_manager, + auth_recovery: &mut auth_recovery, + auth_change_rx: &mut auth_change_rx, + }, &mut enrollment, RemoteControlConnectOptions { installation_id: TEST_INSTALLATION_ID, @@ -1585,6 +1683,12 @@ mod tests { .expect("token should be readable"), "fresh-token" ); + assert!( + !auth_change_rx + .has_changed() + .expect("auth change watch should remain open"), + "recovery's own auth reload should not wake the reconnect loop" + ); } #[tokio::test] @@ -1593,6 +1697,7 @@ mod tests { .expect("target should parse"); let auth_manager = remote_control_auth_manager(); let mut auth_recovery = auth_manager.unauthorized_recovery(); + let mut auth_change_rx = auth_manager.auth_change_receiver(); let mut enrollment = Some(RemoteControlEnrollment { account_id: "account_id".to_string(), environment_id: "env_test".to_string(), @@ -1604,8 +1709,11 @@ mod tests { let err = connect_remote_control_websocket( &remote_control_target, /*state_db*/ None, - &auth_manager, - &mut auth_recovery, + RemoteControlAuthContext { + auth_manager: &auth_manager, + auth_recovery: &mut auth_recovery, + auth_change_rx: &mut auth_change_rx, + }, &mut enrollment, RemoteControlConnectOptions { installation_id: TEST_INSTALLATION_ID, @@ -1637,6 +1745,7 @@ mod tests { ) .await; let mut auth_recovery = auth_manager.unauthorized_recovery(); + let mut auth_change_rx = auth_manager.auth_change_receiver(); let mut enrollment = Some(RemoteControlEnrollment { account_id: "account_id".to_string(), environment_id: "env_test".to_string(), @@ -1653,8 +1762,11 @@ mod tests { let err = connect_remote_control_websocket( &remote_control_target, Some(state_db.as_ref()), - &auth_manager, - &mut auth_recovery, + RemoteControlAuthContext { + auth_manager: &auth_manager, + auth_recovery: &mut auth_recovery, + auth_change_rx: &mut auth_change_rx, + }, &mut enrollment, RemoteControlConnectOptions { installation_id: TEST_INSTALLATION_ID, diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index ef4106aa14..80305d2d9f 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -786,8 +786,7 @@ pub async fn run_main_with_transport_options( }); let processor_handle = tokio::spawn({ - let auth_manager = - AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await; + let auth_manager = Arc::clone(&auth_manager); let analytics_events_client = analytics_events_client_from_config(Arc::clone(&auth_manager), &config); let outgoing_message_sender = Arc::new(OutgoingMessageSender::new( diff --git a/codex-rs/login/src/auth/manager.rs b/codex-rs/login/src/auth/manager.rs index a2e4e8e0d8..d94c48165d 100644 --- a/codex-rs/login/src/auth/manager.rs +++ b/codex-rs/login/src/auth/manager.rs @@ -15,6 +15,7 @@ use std::sync::RwLock; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use tokio::sync::Semaphore; +use tokio::sync::watch; use codex_agent_identity::decode_agent_identity_jwt; use codex_agent_identity::fetch_agent_identity_jwks; @@ -1252,6 +1253,7 @@ impl UnauthorizedRecovery { pub struct AuthManager { codex_home: PathBuf, inner: RwLock, + auth_change_tx: watch::Sender, enable_codex_api_key_env: bool, auth_credentials_store_mode: AuthCredentialsStoreMode, forced_chatgpt_workspace_id: RwLock>>, @@ -1320,12 +1322,14 @@ impl AuthManager { .await .ok() .flatten(); + let (auth_change_tx, _auth_change_rx) = watch::channel(0); Self { codex_home, inner: RwLock::new(CachedAuth { auth: managed_auth, permanent_refresh_failure: None, }), + auth_change_tx, enable_codex_api_key_env, auth_credentials_store_mode, forced_chatgpt_workspace_id: RwLock::new(None), @@ -1341,10 +1345,12 @@ impl AuthManager { auth: Some(auth), permanent_refresh_failure: None, }; + let (auth_change_tx, _auth_change_rx) = watch::channel(0); Arc::new(Self { codex_home: PathBuf::from("non-existent"), inner: RwLock::new(cached), + auth_change_tx, enable_codex_api_key_env: false, auth_credentials_store_mode: AuthCredentialsStoreMode::File, forced_chatgpt_workspace_id: RwLock::new(None), @@ -1360,9 +1366,11 @@ impl AuthManager { auth: Some(auth), permanent_refresh_failure: None, }; + let (auth_change_tx, _auth_change_rx) = watch::channel(0); Arc::new(Self { codex_home, inner: RwLock::new(cached), + auth_change_tx, enable_codex_api_key_env: false, auth_credentials_store_mode: AuthCredentialsStoreMode::File, forced_chatgpt_workspace_id: RwLock::new(None), @@ -1373,12 +1381,14 @@ impl AuthManager { } pub fn external_bearer_only(config: ModelProviderAuthInfo) -> Arc { + let (auth_change_tx, _auth_change_rx) = watch::channel(0); Arc::new(Self { codex_home: PathBuf::from("non-existent"), inner: RwLock::new(CachedAuth { auth: None, permanent_refresh_failure: None, }), + auth_change_tx, enable_codex_api_key_env: false, auth_credentials_store_mode: AuthCredentialsStoreMode::File, forced_chatgpt_workspace_id: RwLock::new(None), @@ -1395,6 +1405,11 @@ impl AuthManager { self.inner.read().ok().and_then(|c| c.auth.clone()) } + /// Subscribes to cached auth changes that can affect request recovery. + pub fn auth_change_receiver(&self) -> watch::Receiver { + self.auth_change_tx.subscribe() + } + pub fn refresh_failure_for_auth(&self, auth: &CodexAuth) -> Option { self.inner.read().ok().and_then(|cached| { cached @@ -1537,6 +1552,9 @@ impl AuthManager { } tracing::info!("Reloaded auth, changed: {changed}"); guard.auth = new_auth; + if auth_changed_for_refresh { + self.auth_change_tx.send_modify(|revision| *revision += 1); + } changed } else { false