Compare commits

...

7 Commits

Author SHA1 Message Date
xli-oai
24a843dad1 Fetch announcement messages in poller 2026-05-06 02:28:25 -07:00
xli-oai
bc91df371c Add workspace announcement polling 2026-05-06 02:28:25 -07:00
xli-oai
1f1a7fa1df Update config schema for workspace messages 2026-05-05 23:12:41 -07:00
xli-oai
f8892a6362 Remove workspace message TUI prewarm 2026-05-05 23:12:41 -07:00
xli-oai
4ecb4497b2 Remove workspace message created_at field 2026-05-05 23:12:40 -07:00
xli-oai
f9cd5bd631 Gate workspace headline polling 2026-05-05 23:12:40 -07:00
xli-oai
926c68e4f4 Add workspace headline polling client 2026-05-05 23:12:40 -07:00
7 changed files with 190 additions and 0 deletions

View File

@@ -95,6 +95,7 @@ mod server_request_error;
mod thread_state;
mod thread_status;
mod transport;
mod workspace_messages;
pub use crate::error_code::INPUT_TOO_LARGE_ERROR_CODE;
pub use crate::error_code::INVALID_PARAMS_ERROR_CODE;
@@ -684,6 +685,17 @@ pub async fn run_main_with_transport_options(
)
.await?;
transport_accept_handles.push(remote_control_accept_handle);
let workspace_messages_poll_handle =
config
.features
.enabled(Feature::WorkspaceMessages)
.then(|| {
workspace_messages::spawn_announcement_poller(
auth_manager.clone(),
config.chatgpt_base_url.clone(),
transport_shutdown_token.clone(),
)
});
let outbound_handle = tokio::spawn(async move {
let mut outbound_connections = HashMap::<ConnectionId, OutboundConnectionState>::new();
@@ -1020,6 +1032,9 @@ pub async fn run_main_with_transport_options(
let _ = outbound_handle.await;
transport_shutdown_token.cancel();
if let Some(handle) = workspace_messages_poll_handle {
let _ = handle.await;
}
for handle in transport_accept_handles {
let _ = handle.await;
}

View File

@@ -0,0 +1,73 @@
use codex_backend_client::Client as BackendClient;
use codex_backend_client::CodexWorkspaceMessage;
use codex_login::AuthManager;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::MissedTickBehavior;
use tokio::time::interval;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use tracing::debug;
const ANNOUNCEMENT_POLL_INTERVAL: Duration = Duration::from_secs(15 * 60);
const ANNOUNCEMENT_FETCH_TIMEOUT: Duration = Duration::from_secs(5);
pub(crate) fn spawn_announcement_poller(
auth_manager: Arc<AuthManager>,
chatgpt_base_url: String,
shutdown_token: CancellationToken,
) -> JoinHandle<()> {
tokio::spawn(async move {
let mut interval = interval(ANNOUNCEMENT_POLL_INTERVAL);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = shutdown_token.cancelled() => break,
_ = interval.tick() => {
poll_announcements(&auth_manager, &chatgpt_base_url).await;
}
}
}
})
}
async fn poll_announcements(auth_manager: &AuthManager, chatgpt_base_url: &str) {
match timeout(
ANNOUNCEMENT_FETCH_TIMEOUT,
fetch_announcements(auth_manager, chatgpt_base_url),
)
.await
{
Ok(Ok(announcements)) => {
debug!(
announcement_count = announcements.len(),
"workspace announcement poll completed"
);
}
Ok(Err(err)) => {
debug!(?err, "workspace announcement poll failed");
}
Err(_) => {
debug!("workspace announcement poll timed out");
}
}
}
async fn fetch_announcements(
auth_manager: &AuthManager,
chatgpt_base_url: &str,
) -> anyhow::Result<Vec<CodexWorkspaceMessage>> {
let Some(auth) = auth_manager.auth().await else {
return Ok(Vec::new());
};
if !auth.uses_codex_backend() {
return Ok(Vec::new());
}
let client = BackendClient::from_auth(chatgpt_base_url.to_owned(), &auth)?;
let messages = client.list_workspace_messages().await?;
// Preserve backend ranking; the API returns workspace messages ordered by created_at.
Ok(messages.announcements().cloned().collect())
}

View File

@@ -1,4 +1,5 @@
use crate::types::CodeTaskDetailsResponse;
use crate::types::CodexWorkspaceMessagesResponse;
use crate::types::ConfigFileResponse;
use crate::types::PaginatedListTaskListItem;
use crate::types::RateLimitReachedKind as BackendRateLimitReachedKind;
@@ -408,6 +409,16 @@ impl Client {
.map_err(RequestError::from)
}
pub async fn list_workspace_messages(
&self,
) -> std::result::Result<CodexWorkspaceMessagesResponse, RequestError> {
let url = self.workspace_messages_url();
let req = self.http.get(&url).headers(self.headers());
let (body, ct) = self.exec_request_detailed(req, "GET", &url).await?;
self.decode_json::<CodexWorkspaceMessagesResponse>(&url, &ct, &body)
.map_err(RequestError::from)
}
/// Create a new task (user turn) by POSTing to the appropriate backend path
/// based on `path_style`. Returns the created task id.
pub async fn create_task(&self, request_body: serde_json::Value) -> Result<String> {
@@ -539,6 +550,13 @@ impl Client {
}
}
fn workspace_messages_url(&self) -> String {
match self.path_style {
PathStyle::CodexApi => format!("{}/api/codex/workspace-messages", self.base_url),
PathStyle::ChatGptApi => format!("{}/wham/workspace-messages", self.base_url),
}
}
fn map_rate_limit_window(
window: Option<Option<Box<crate::types::RateLimitWindowSnapshot>>>,
) -> Option<RateLimitWindow> {
@@ -862,4 +880,35 @@ mod tests {
serde_json::json!({ "credit_type": "usage_limit" })
);
}
#[test]
fn workspace_messages_uses_expected_paths() {
let codex_client = Client {
base_url: "https://example.test".to_string(),
http: reqwest::Client::new(),
auth_provider: codex_model_provider::unauthenticated_auth_provider(),
user_agent: None,
chatgpt_account_id: None,
chatgpt_account_is_fedramp: false,
path_style: PathStyle::CodexApi,
};
assert_eq!(
codex_client.workspace_messages_url(),
"https://example.test/api/codex/workspace-messages"
);
let chatgpt_client = Client {
base_url: "https://chatgpt.com/backend-api".to_string(),
http: reqwest::Client::new(),
auth_provider: codex_model_provider::unauthenticated_auth_provider(),
user_agent: None,
chatgpt_account_id: None,
chatgpt_account_is_fedramp: false,
path_style: PathStyle::ChatGptApi,
};
assert_eq!(
chatgpt_client.workspace_messages_url(),
"https://chatgpt.com/backend-api/wham/workspace-messages"
);
}
}

View File

@@ -6,6 +6,9 @@ pub use client::Client;
pub use client::RequestError;
pub use types::CodeTaskDetailsResponse;
pub use types::CodeTaskDetailsResponseExt;
pub use types::CodexWorkspaceMessage;
pub use types::CodexWorkspaceMessageType;
pub use types::CodexWorkspaceMessagesResponse;
pub use types::ConfigFileResponse;
pub use types::PaginatedListTaskListItem;
pub use types::TaskListItem;

View File

@@ -13,6 +13,42 @@ use serde::de::Deserializer;
use serde_json::Value;
use std::collections::HashMap;
#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
pub struct CodexWorkspaceMessagesResponse {
#[serde(default)]
pub messages: Vec<CodexWorkspaceMessage>,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
pub struct CodexWorkspaceMessage {
pub message_id: String,
pub message_type: CodexWorkspaceMessageType,
pub message_body: String,
}
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum CodexWorkspaceMessageType {
Headline,
Announcement,
#[serde(other)]
Unknown,
}
impl CodexWorkspaceMessagesResponse {
pub fn headlines(&self) -> impl Iterator<Item = &CodexWorkspaceMessage> {
self.messages
.iter()
.filter(|message| message.message_type == CodexWorkspaceMessageType::Headline)
}
pub fn announcements(&self) -> impl Iterator<Item = &CodexWorkspaceMessage> {
self.messages
.iter()
.filter(|message| message.message_type == CodexWorkspaceMessageType::Announcement)
}
}
/// Hand-rolled models for the Cloud Tasks task-details response.
/// The generated OpenAPI models are pretty bad. This is a half-step
/// towards hand-rolling them.

View File

@@ -598,6 +598,9 @@
"workspace_dependencies": {
"type": "boolean"
},
"workspace_messages": {
"type": "boolean"
},
"workspace_owner_usage_nudge": {
"type": "boolean"
}
@@ -4147,6 +4150,9 @@
"workspace_dependencies": {
"type": "boolean"
},
"workspace_messages": {
"type": "boolean"
},
"workspace_owner_usage_nudge": {
"type": "boolean"
}

View File

@@ -223,6 +223,8 @@ pub enum Feature {
PreventIdleSleep,
/// Enable workspace-specific owner nudge copy and prompts in the TUI.
WorkspaceOwnerUsageNudge,
/// Enable workspace headline and announcement message polling.
WorkspaceMessages,
/// Legacy rollout flag for Responses API WebSocket transport experiments.
ResponsesWebsockets,
/// Legacy rollout flag for Responses API WebSocket transport v2 experiments.
@@ -1111,6 +1113,12 @@ pub const FEATURES: &[FeatureSpec] = &[
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::WorkspaceMessages,
key: "workspace_messages",
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::ResponsesWebsockets,
key: "responses_websockets",