From bc91df371c613debcafb41d013f326d5cbe23aca Mon Sep 17 00:00:00 2001 From: xli-oai Date: Tue, 5 May 2026 12:26:44 -0700 Subject: [PATCH] Add workspace announcement polling --- codex-rs/app-server/src/lib.rs | 15 ++++ codex-rs/app-server/src/workspace_messages.rs | 71 +++++++++++++++++++ 2 files changed, 86 insertions(+) create mode 100644 codex-rs/app-server/src/workspace_messages.rs diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index cf6a9e890e..5ab2910177 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -95,6 +95,7 @@ mod server_request_error; mod thread_state; mod thread_status; mod transport; +mod workspace_messages; pub use crate::error_code::INPUT_TOO_LARGE_ERROR_CODE; pub use crate::error_code::INVALID_PARAMS_ERROR_CODE; @@ -684,6 +685,17 @@ pub async fn run_main_with_transport_options( ) .await?; transport_accept_handles.push(remote_control_accept_handle); + let workspace_messages_poll_handle = + config + .features + .enabled(Feature::WorkspaceMessages) + .then(|| { + workspace_messages::spawn_announcement_poller( + auth_manager.clone(), + config.chatgpt_base_url.clone(), + transport_shutdown_token.clone(), + ) + }); let outbound_handle = tokio::spawn(async move { let mut outbound_connections = HashMap::::new(); @@ -1020,6 +1032,9 @@ pub async fn run_main_with_transport_options( let _ = outbound_handle.await; transport_shutdown_token.cancel(); + if let Some(handle) = workspace_messages_poll_handle { + let _ = handle.await; + } for handle in transport_accept_handles { let _ = handle.await; } diff --git a/codex-rs/app-server/src/workspace_messages.rs b/codex-rs/app-server/src/workspace_messages.rs new file mode 100644 index 0000000000..aea3f27607 --- /dev/null +++ b/codex-rs/app-server/src/workspace_messages.rs @@ -0,0 +1,71 @@ +use codex_backend_client::Client as BackendClient; +use codex_login::AuthManager; +use std::sync::Arc; +use std::time::Duration; +use tokio::task::JoinHandle; +use tokio::time::MissedTickBehavior; +use tokio::time::interval; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; +use tracing::debug; + +const ANNOUNCEMENT_POLL_INTERVAL: Duration = Duration::from_secs(15 * 60); +const ANNOUNCEMENT_FETCH_TIMEOUT: Duration = Duration::from_secs(5); + +pub(crate) fn spawn_announcement_poller( + auth_manager: Arc, + chatgpt_base_url: String, + shutdown_token: CancellationToken, +) -> JoinHandle<()> { + tokio::spawn(async move { + let mut interval = interval(ANNOUNCEMENT_POLL_INTERVAL); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + loop { + tokio::select! { + _ = shutdown_token.cancelled() => break, + _ = interval.tick() => { + poll_announcements(&auth_manager, &chatgpt_base_url).await; + } + } + } + }) +} + +async fn poll_announcements(auth_manager: &AuthManager, chatgpt_base_url: &str) { + match timeout( + ANNOUNCEMENT_FETCH_TIMEOUT, + fetch_announcement_count(auth_manager, chatgpt_base_url), + ) + .await + { + Ok(Ok(count)) => { + debug!( + announcement_count = count, + "workspace announcement poll completed" + ); + } + Ok(Err(err)) => { + debug!(?err, "workspace announcement poll failed"); + } + Err(_) => { + debug!("workspace announcement poll timed out"); + } + } +} + +async fn fetch_announcement_count( + auth_manager: &AuthManager, + chatgpt_base_url: &str, +) -> anyhow::Result { + let Some(auth) = auth_manager.auth().await else { + return Ok(0); + }; + if !auth.uses_codex_backend() { + return Ok(0); + } + + let client = BackendClient::from_auth(chatgpt_base_url.to_owned(), &auth)?; + let messages = client.list_workspace_messages().await?; + Ok(messages.announcements().count()) +}