Compare commits

..

2 Commits

Author SHA1 Message Date
Ahmed Ibrahim
934ef25b97 codex: fix CI failure on PR #17676
Co-authored-by: Codex <noreply@openai.com>
2026-04-13 14:05:21 -07:00
Ahmed Ibrahim
8a122f64fd Show realtime text in TUI transcript
Request text-only output from realtime v2, render realtime transcript deltas in the TUI transcript, and clean common text response wrappers before display.

Co-authored-by: Codex <noreply@openai.com>
2026-04-13 12:57:57 -07:00
17 changed files with 423 additions and 660 deletions

View File

@@ -92,7 +92,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.121.0-alpha.5"
version = "0.0.0"
# Track the edition for all workspace crates in one place. Individual
# crates can still override this value, but keeping it here means new
# crates created with `cargo new -w ...` automatically inherit the 2024

View File

@@ -143,7 +143,7 @@ Example with notification opt-out:
- `thread/memoryMode/set` — experimental; set a threads persisted memory eligibility to `"enabled"` or `"disabled"` for either a loaded thread or a stored rollout; returns `{}` on success.
- `thread/status/changed` — notification emitted when a loaded threads status changes (`threadId` + new `status`).
- `thread/archive` — move a threads rollout file into the archived directory; returns `{}` on success and emits `thread/archived`.
- `thread/unsubscribe` — unsubscribe this connection from thread turn/item events. If this was the last subscriber, the server keeps the thread loaded and unloads it only after it has had no subscribers and no thread activity for 30 minutes, then emits `thread/closed`.
- `thread/unsubscribe` — unsubscribe this connection from thread turn/item events. If this was the last subscriber, the server shuts down and unloads the thread, then emits `thread/closed`.
- `thread/name/set` — set or update a threads user-facing name for either a loaded thread or a persisted rollout; returns `{}` on success and emits `thread/name/updated` to initialized, opted-in clients. Thread names are not required to be unique; name lookups resolve to the most recently updated thread.
- `thread/unarchive` — move an archived rollout file back into the sessions directory; returns the restored `thread` on success and emits `thread/unarchived`.
- `thread/compact/start` — trigger conversation history compaction for a thread; returns `{}` immediately while progress streams through standard turn/item notifications.
@@ -338,16 +338,11 @@ When `nextCursor` is `null`, youve reached the final page.
- `notSubscribed` when the connection was not subscribed to that thread.
- `notLoaded` when the thread is not loaded.
If this was the last subscriber, the server does not unload the thread immediately. It unloads the thread after the thread has had no subscribers and no thread activity for 30 minutes, then emits `thread/closed` and a `thread/status/changed` transition to `notLoaded`.
If this was the last subscriber, the server unloads the thread and emits `thread/closed` and a `thread/status/changed` transition to `notLoaded`.
```json
{ "method": "thread/unsubscribe", "id": 22, "params": { "threadId": "thr_123" } }
{ "id": 22, "result": { "status": "unsubscribed" } }
```
Later, after the idle unload timeout:
```json
{ "method": "thread/status/changed", "params": {
"threadId": "thr_123",
"status": { "type": "notLoaded" }

View File

@@ -329,7 +329,6 @@ use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use tokio::sync::Mutex;
use tokio::sync::broadcast;
@@ -372,7 +371,6 @@ struct ThreadListFilters {
const LOGIN_CHATGPT_TIMEOUT: Duration = Duration::from_secs(10 * 60);
const LOGIN_ISSUER_OVERRIDE_ENV_VAR: &str = "CODEX_APP_SERVER_LOGIN_ISSUER";
const APP_LIST_LOAD_TIMEOUT: Duration = Duration::from_secs(90);
const THREAD_UNLOADING_DELAY: Duration = Duration::from_secs(30 * 60);
enum ActiveLogin {
Browser {
@@ -462,7 +460,6 @@ struct ListenerTaskContext {
thread_manager: Arc<ThreadManager>,
thread_state_manager: ThreadStateManager,
outgoing: Arc<OutgoingMessageSender>,
pending_thread_unloads: Arc<Mutex<HashSet<ThreadId>>>,
analytics_events_client: AnalyticsEventsClient,
general_analytics_enabled: bool,
thread_watch_manager: ThreadWatchManager,
@@ -483,110 +480,6 @@ enum RefreshTokenRequestOutcome {
FailedPermanently,
}
struct UnloadingState {
delay: Duration,
has_subscribers_rx: watch::Receiver<bool>,
has_subscribers: (bool, Instant),
thread_status_rx: watch::Receiver<ThreadStatus>,
is_active: (bool, Instant),
}
impl UnloadingState {
async fn new(
listener_task_context: &ListenerTaskContext,
thread_id: ThreadId,
delay: Duration,
) -> Option<Self> {
let has_subscribers_rx = listener_task_context
.thread_state_manager
.subscribe_to_has_connections(thread_id)
.await?;
let thread_status_rx = listener_task_context
.thread_watch_manager
.subscribe(thread_id)
.await?;
let has_subscribers = (*has_subscribers_rx.borrow(), Instant::now());
let is_active = (
matches!(*thread_status_rx.borrow(), ThreadStatus::Active { .. }),
Instant::now(),
);
Some(Self {
delay,
has_subscribers_rx,
thread_status_rx,
has_subscribers,
is_active,
})
}
fn unloading_target(&self) -> Option<Instant> {
match (self.has_subscribers, self.is_active) {
((false, has_no_subscribers_since), (false, is_inactive_since)) => {
Some(std::cmp::max(has_no_subscribers_since, is_inactive_since) + self.delay)
}
_ => None,
}
}
fn sync_receiver_values(&mut self) {
let has_subscribers = *self.has_subscribers_rx.borrow();
if self.has_subscribers.0 != has_subscribers {
self.has_subscribers = (has_subscribers, Instant::now());
}
let is_active = matches!(*self.thread_status_rx.borrow(), ThreadStatus::Active { .. });
if self.is_active.0 != is_active {
self.is_active = (is_active, Instant::now());
}
}
fn should_unload_now(&mut self) -> bool {
self.sync_receiver_values();
self.unloading_target()
.is_some_and(|target| target <= Instant::now())
}
fn note_thread_activity_observed(&mut self) {
if !self.is_active.0 {
self.is_active = (false, Instant::now());
}
}
async fn wait_for_unloading_trigger(&mut self) -> bool {
loop {
self.sync_receiver_values();
let unloading_target = self.unloading_target();
if let Some(target) = unloading_target
&& target <= Instant::now()
{
return true;
}
let unloading_sleep = async {
if let Some(target) = unloading_target {
tokio::time::sleep_until(target.into()).await;
} else {
futures::future::pending::<()>().await;
}
};
tokio::select! {
_ = unloading_sleep => return true,
changed = self.has_subscribers_rx.changed() => {
if changed.is_err() {
return false;
}
self.sync_receiver_values();
},
changed = self.thread_status_rx.changed() => {
if changed.is_err() {
return false;
}
self.sync_receiver_values();
},
}
}
}
}
pub(crate) struct CodexMessageProcessorArgs {
pub(crate) auth_manager: Arc<AuthManager>,
pub(crate) thread_manager: Arc<ThreadManager>,
@@ -2256,7 +2149,6 @@ impl CodexMessageProcessor {
thread_manager: Arc::clone(&self.thread_manager),
thread_state_manager: self.thread_state_manager.clone(),
outgoing: Arc::clone(&self.outgoing),
pending_thread_unloads: Arc::clone(&self.pending_thread_unloads),
analytics_events_client: self.analytics_events_client.clone(),
general_analytics_enabled: self.config.features.enabled(Feature::GeneralAnalytics),
thread_watch_manager: self.thread_watch_manager.clone(),
@@ -3992,17 +3884,17 @@ impl CodexMessageProcessor {
self.command_exec_manager
.connection_closed(connection_id)
.await;
let thread_ids = self
let thread_ids_with_no_subscribers = self
.thread_state_manager
.remove_connection(connection_id)
.await;
for thread_id in thread_ids {
if self.thread_manager.get_thread(thread_id).await.is_err() {
// Reconcile stale app-server bookkeeping when the thread has already been
// removed from the core manager.
for thread_id in thread_ids_with_no_subscribers {
let Ok(thread) = self.thread_manager.get_thread(thread_id).await else {
self.finalize_thread_teardown(thread_id).await;
}
continue;
};
self.unload_thread_without_subscribers(thread_id, thread)
.await;
}
}
@@ -4368,18 +4260,13 @@ impl CodexMessageProcessor {
.thread_state_manager
.thread_state(existing_thread_id)
.await;
if let Err(error) = self
.ensure_listener_task_running(
existing_thread_id,
existing_thread.clone(),
thread_state.clone(),
ApiVersion::V2,
)
.await
{
self.outgoing.send_error(request_id, error).await;
return true;
}
self.ensure_listener_task_running(
existing_thread_id,
existing_thread.clone(),
thread_state.clone(),
ApiVersion::V2,
)
.await;
let config_snapshot = existing_thread.config_snapshot().await;
let mismatch_details = collect_resume_override_mismatches(params, &config_snapshot);
@@ -5766,23 +5653,31 @@ impl CodexMessageProcessor {
}
async fn unload_thread_without_subscribers(
thread_manager: Arc<ThreadManager>,
outgoing: Arc<OutgoingMessageSender>,
pending_thread_unloads: Arc<Mutex<HashSet<ThreadId>>>,
thread_state_manager: ThreadStateManager,
thread_watch_manager: ThreadWatchManager,
&self,
thread_id: ThreadId,
thread: Arc<CodexThread>,
) {
info!("thread {thread_id} has no subscribers and is idle; shutting down");
// This connection was the last subscriber. Only now do we unload the thread.
info!("thread {thread_id} has no subscribers; shutting down");
let should_start_unload_task = self.pending_thread_unloads.lock().await.insert(thread_id);
// Any pending app-server -> client requests for this thread can no longer be
// answered; cancel their callbacks before shutdown/unload.
outgoing
self.outgoing
.cancel_requests_for_thread(thread_id, /*error*/ None)
.await;
thread_state_manager.remove_thread_state(thread_id).await;
self.thread_state_manager
.remove_thread_state(thread_id)
.await;
if !should_start_unload_task {
return;
}
let outgoing = self.outgoing.clone();
let pending_thread_unloads = self.pending_thread_unloads.clone();
let thread_manager = self.thread_manager.clone();
let thread_watch_manager = self.thread_watch_manager.clone();
tokio::spawn(async move {
match Self::wait_for_thread_shutdown(&thread).await {
ThreadShutdownResult::Complete => {
@@ -5831,7 +5726,7 @@ impl CodexMessageProcessor {
}
};
if self.thread_manager.get_thread(thread_id).await.is_err() {
let Ok(thread) = self.thread_manager.get_thread(thread_id).await else {
// Reconcile stale app-server bookkeeping when the thread has already been
// removed from the core manager. This keeps loaded-status/subscription state
// consistent with the source of truth before reporting NotLoaded.
@@ -5851,14 +5746,30 @@ impl CodexMessageProcessor {
.thread_state_manager
.unsubscribe_connection_from_thread(thread_id, request_id.connection_id)
.await;
if !was_subscribed {
self.outgoing
.send_response(
request_id,
ThreadUnsubscribeResponse {
status: ThreadUnsubscribeStatus::NotSubscribed,
},
)
.await;
return;
}
if !self.thread_state_manager.has_subscribers(thread_id).await {
self.unload_thread_without_subscribers(thread_id, thread)
.await;
}
let status = if was_subscribed {
ThreadUnsubscribeStatus::Unsubscribed
} else {
ThreadUnsubscribeStatus::NotSubscribed
};
self.outgoing
.send_response(request_id, ThreadUnsubscribeResponse { status })
.send_response(
request_id,
ThreadUnsubscribeResponse {
status: ThreadUnsubscribeStatus::Unsubscribed,
},
)
.await;
}
@@ -7603,7 +7514,6 @@ impl CodexMessageProcessor {
thread_manager: Arc::clone(&self.thread_manager),
thread_state_manager: self.thread_state_manager.clone(),
outgoing: Arc::clone(&self.outgoing),
pending_thread_unloads: Arc::clone(&self.pending_thread_unloads),
analytics_events_client: self.analytics_events_client.clone(),
general_analytics_enabled: self.config.features.enabled(Feature::GeneralAnalytics),
thread_watch_manager: self.thread_watch_manager.clone(),
@@ -7639,45 +7549,21 @@ impl CodexMessageProcessor {
});
}
};
let thread_state = {
let pending_thread_unloads = listener_task_context.pending_thread_unloads.lock().await;
if pending_thread_unloads.contains(&conversation_id) {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"thread {conversation_id} is closing; retry after the thread is closed"
),
data: None,
});
}
let Some(thread_state) = listener_task_context
.thread_state_manager
.try_ensure_connection_subscribed(
conversation_id,
connection_id,
raw_events_enabled,
)
.await
else {
return Ok(EnsureConversationListenerResult::ConnectionClosed);
};
thread_state
let Some(thread_state) = listener_task_context
.thread_state_manager
.try_ensure_connection_subscribed(conversation_id, connection_id, raw_events_enabled)
.await
else {
return Ok(EnsureConversationListenerResult::ConnectionClosed);
};
if let Err(error) = Self::ensure_listener_task_running_task(
listener_task_context.clone(),
Self::ensure_listener_task_running_task(
listener_task_context,
conversation_id,
conversation,
thread_state,
api_version,
)
.await
{
let _ = listener_task_context
.thread_state_manager
.unsubscribe_connection_from_thread(conversation_id, connection_id)
.await;
return Err(error);
}
.await;
Ok(EnsureConversationListenerResult::Attached)
}
@@ -7711,13 +7597,12 @@ impl CodexMessageProcessor {
conversation: Arc<CodexThread>,
thread_state: Arc<Mutex<ThreadState>>,
api_version: ApiVersion,
) -> Result<(), JSONRPCErrorError> {
) {
Self::ensure_listener_task_running_task(
ListenerTaskContext {
thread_manager: Arc::clone(&self.thread_manager),
thread_state_manager: self.thread_state_manager.clone(),
outgoing: Arc::clone(&self.outgoing),
pending_thread_unloads: Arc::clone(&self.pending_thread_unloads),
analytics_events_client: self.analytics_events_client.clone(),
general_analytics_enabled: self.config.features.enabled(Feature::GeneralAnalytics),
thread_watch_manager: self.thread_watch_manager.clone(),
@@ -7729,7 +7614,7 @@ impl CodexMessageProcessor {
thread_state,
api_version,
)
.await
.await;
}
async fn ensure_listener_task_running_task(
@@ -7738,27 +7623,12 @@ impl CodexMessageProcessor {
conversation: Arc<CodexThread>,
thread_state: Arc<Mutex<ThreadState>>,
api_version: ApiVersion,
) -> Result<(), JSONRPCErrorError> {
) {
let (cancel_tx, mut cancel_rx) = oneshot::channel();
let Some(mut unloading_state) = UnloadingState::new(
&listener_task_context,
conversation_id,
THREAD_UNLOADING_DELAY,
)
.await
else {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"thread {conversation_id} is closing; retry after the thread is closed"
),
data: None,
});
};
let (mut listener_command_rx, listener_generation) = {
let mut thread_state = thread_state.lock().await;
if thread_state.listener_matches(&conversation) {
return Ok(());
return;
}
thread_state.set_listener(cancel_tx, &conversation)
};
@@ -7766,7 +7636,6 @@ impl CodexMessageProcessor {
outgoing,
thread_manager,
thread_state_manager,
pending_thread_unloads,
analytics_events_client: _,
general_analytics_enabled: _,
thread_watch_manager,
@@ -7777,28 +7646,10 @@ impl CodexMessageProcessor {
tokio::spawn(async move {
loop {
tokio::select! {
biased;
_ = &mut cancel_rx => {
// Listener was superseded or the thread is being torn down.
break;
}
listener_command = listener_command_rx.recv() => {
let Some(listener_command) = listener_command else {
break;
};
handle_thread_listener_command(
conversation_id,
&conversation,
codex_home.as_path(),
&thread_state_manager,
&thread_state,
&thread_watch_manager,
&outgoing_for_task,
&pending_thread_unloads,
listener_command,
)
.await;
}
event = conversation.next_event() => {
let event = match event {
Ok(event) => event,
@@ -7853,38 +7704,21 @@ impl CodexMessageProcessor {
)
.await;
}
unloading_watchers_open = unloading_state.wait_for_unloading_trigger() => {
if !unloading_watchers_open {
listener_command = listener_command_rx.recv() => {
let Some(listener_command) = listener_command else {
break;
}
if !unloading_state.should_unload_now() {
continue;
}
if matches!(conversation.agent_status().await, AgentStatus::Running) {
unloading_state.note_thread_activity_observed();
continue;
}
{
let mut pending_thread_unloads = pending_thread_unloads.lock().await;
if pending_thread_unloads.contains(&conversation_id) {
continue;
}
if !unloading_state.should_unload_now() {
continue;
}
pending_thread_unloads.insert(conversation_id);
}
Self::unload_thread_without_subscribers(
thread_manager.clone(),
outgoing_for_task.clone(),
pending_thread_unloads.clone(),
thread_state_manager.clone(),
thread_watch_manager.clone(),
};
handle_thread_listener_command(
conversation_id,
conversation.clone(),
&conversation,
codex_home.as_path(),
&thread_state_manager,
&thread_state,
&thread_watch_manager,
&outgoing_for_task,
listener_command,
)
.await;
break;
}
}
}
@@ -7894,7 +7728,6 @@ impl CodexMessageProcessor {
thread_state.clear_listener();
}
});
Ok(())
}
async fn git_diff_to_origin(&self, request_id: ConnectionRequestId, cwd: PathBuf) {
let diff = git_diff_to_remote(&cwd).await;
@@ -8385,7 +8218,6 @@ async fn handle_thread_listener_command(
thread_state: &Arc<Mutex<ThreadState>>,
thread_watch_manager: &ThreadWatchManager,
outgoing: &Arc<OutgoingMessageSender>,
pending_thread_unloads: &Arc<Mutex<HashSet<ThreadId>>>,
listener_command: ThreadListenerCommand,
) {
match listener_command {
@@ -8398,7 +8230,6 @@ async fn handle_thread_listener_command(
thread_state,
thread_watch_manager,
outgoing,
pending_thread_unloads,
*resume_request,
)
.await;
@@ -8428,7 +8259,6 @@ async fn handle_pending_thread_resume_request(
thread_state: &Arc<Mutex<ThreadState>>,
thread_watch_manager: &ThreadWatchManager,
outgoing: &Arc<OutgoingMessageSender>,
pending_thread_unloads: &Arc<Mutex<HashSet<ThreadId>>>,
pending: crate::thread_state::PendingThreadResumeRequest,
) {
let active_turn = {
@@ -8482,37 +8312,6 @@ async fn handle_pending_thread_resume_request(
has_live_in_progress_turn,
);
{
let pending_thread_unloads = pending_thread_unloads.lock().await;
if pending_thread_unloads.contains(&conversation_id) {
drop(pending_thread_unloads);
outgoing
.send_error(
request_id,
JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"thread {conversation_id} is closing; retry thread/resume after the thread is closed"
),
data: None,
},
)
.await;
return;
}
if !thread_state_manager
.try_add_connection_to_thread(conversation_id, connection_id)
.await
{
tracing::debug!(
thread_id = %conversation_id,
connection_id = ?connection_id,
"skipping running thread resume for closed connection"
);
return;
}
}
let ThreadConfigSnapshot {
model,
model_provider_id,
@@ -8541,6 +8340,9 @@ async fn handle_pending_thread_resume_request(
outgoing
.replay_requests_to_connection_for_thread(connection_id, conversation_id)
.await;
let _attached = thread_state_manager
.try_add_connection_to_thread(conversation_id, connection_id)
.await;
}
enum ThreadTurnSource<'a> {
@@ -10335,53 +10137,6 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn adding_connection_to_thread_updates_has_connections_watcher() -> Result<()> {
let manager = ThreadStateManager::new();
let thread_id = ThreadId::from_string("ad7f0408-99b8-4f6e-a46f-bd0eec433370")?;
let connection_a = ConnectionId(1);
let connection_b = ConnectionId(2);
manager.connection_initialized(connection_a).await;
manager.connection_initialized(connection_b).await;
manager
.try_ensure_connection_subscribed(
thread_id,
connection_a,
/*experimental_raw_events*/ false,
)
.await
.expect("connection_a should be live");
let mut has_connections = manager
.subscribe_to_has_connections(thread_id)
.await
.expect("thread should have a has-connections watcher");
assert!(*has_connections.borrow());
assert!(
manager
.unsubscribe_connection_from_thread(thread_id, connection_a)
.await
);
tokio::time::timeout(Duration::from_secs(1), has_connections.changed())
.await
.expect("timed out waiting for no-subscriber update")
.expect("has-connections watcher should remain open");
assert!(!*has_connections.borrow());
assert!(
manager
.try_add_connection_to_thread(thread_id, connection_b)
.await
);
tokio::time::timeout(Duration::from_secs(1), has_connections.changed())
.await
.expect("timed out waiting for subscriber update")
.expect("has-connections watcher should remain open");
assert!(*has_connections.borrow());
Ok(())
}
#[tokio::test]
async fn closed_connection_cannot_be_reintroduced_by_auto_subscribe() -> Result<()> {
let manager = ThreadStateManager::new();

View File

@@ -16,7 +16,6 @@ use std::sync::Weak;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::watch;
use tracing::error;
type PendingInterruptQueue = Vec<(
@@ -160,7 +159,6 @@ pub(crate) async fn resolve_server_request_on_thread_listener(
struct ThreadEntry {
state: Arc<Mutex<ThreadState>>,
connection_ids: HashSet<ConnectionId>,
has_connections_watcher: watch::Sender<bool>,
}
impl Default for ThreadEntry {
@@ -168,21 +166,10 @@ impl Default for ThreadEntry {
Self {
state: Arc::new(Mutex::new(ThreadState::default())),
connection_ids: HashSet::new(),
has_connections_watcher: watch::channel(false).0,
}
}
}
impl ThreadEntry {
fn update_has_connections(&self) {
let _ = self.has_connections_watcher.send_if_modified(|current| {
let prev = *current;
*current = !self.connection_ids.is_empty();
prev != *current
});
}
}
#[derive(Default)]
struct ThreadStateManagerInner {
live_connections: HashSet<ConnectionId>,
@@ -299,14 +286,12 @@ impl ThreadStateManager {
}
if let Some(thread_entry) = state.threads.get_mut(&thread_id) {
thread_entry.connection_ids.remove(&connection_id);
thread_entry.update_has_connections();
}
};
true
}
#[cfg(test)]
pub(crate) async fn has_subscribers(&self, thread_id: ThreadId) -> bool {
self.state
.lock()
@@ -334,7 +319,6 @@ impl ThreadStateManager {
.insert(thread_id);
let thread_entry = state.threads.entry(thread_id).or_default();
thread_entry.connection_ids.insert(connection_id);
thread_entry.update_has_connections();
thread_entry.state.clone()
};
{
@@ -360,9 +344,12 @@ impl ThreadStateManager {
.entry(connection_id)
.or_default()
.insert(thread_id);
let thread_entry = state.threads.entry(thread_id).or_default();
thread_entry.connection_ids.insert(connection_id);
thread_entry.update_has_connections();
state
.threads
.entry(thread_id)
.or_default()
.connection_ids
.insert(connection_id);
true
}
@@ -377,7 +364,6 @@ impl ThreadStateManager {
for thread_id in &thread_ids {
if let Some(thread_entry) = state.threads.get_mut(thread_id) {
thread_entry.connection_ids.remove(&connection_id);
thread_entry.update_has_connections();
}
}
thread_ids
@@ -391,15 +377,4 @@ impl ThreadStateManager {
.collect::<Vec<_>>()
}
}
pub(crate) async fn subscribe_to_has_connections(
&self,
thread_id: ThreadId,
) -> Option<watch::Receiver<bool>> {
let state = self.state.lock().await;
state
.threads
.get(&thread_id)
.map(|thread_entry| thread_entry.has_connections_watcher.subscribe())
}
}

View File

@@ -8,7 +8,6 @@ use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadActiveFlag;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ThreadStatusChangedNotification;
use codex_protocol::ThreadId;
use std::collections::HashMap;
#[cfg(test)]
use std::path::PathBuf;
@@ -245,13 +244,6 @@ impl ThreadWatchManager {
}
}
pub(crate) async fn subscribe(
&self,
thread_id: ThreadId,
) -> Option<watch::Receiver<ThreadStatus>> {
Some(self.state.lock().await.subscribe(thread_id.to_string()))
}
async fn note_active_guard_released(
&self,
thread_id: String,
@@ -303,7 +295,6 @@ pub(crate) fn resolve_thread_status(
#[derive(Default)]
struct ThreadWatchState {
runtime_by_thread_id: HashMap<String, RuntimeFacts>,
status_watcher_by_thread_id: HashMap<String, watch::Sender<ThreadStatus>>,
}
impl ThreadWatchState {
@@ -318,7 +309,6 @@ impl ThreadWatchState {
.entry(thread_id.clone())
.or_default();
runtime.is_loaded = true;
self.update_status_watcher_for_thread(&thread_id);
if emit_notification {
self.status_changed_notification(thread_id, previous_status)
} else {
@@ -329,7 +319,6 @@ impl ThreadWatchState {
fn remove_thread(&mut self, thread_id: &str) -> Option<ThreadStatusChangedNotification> {
let previous_status = self.status_for(thread_id);
self.runtime_by_thread_id.remove(thread_id);
self.update_status_watcher(thread_id, &ThreadStatus::NotLoaded);
if previous_status.is_some() && previous_status != Some(ThreadStatus::NotLoaded) {
Some(ThreadStatusChangedNotification {
thread_id: thread_id.to_string(),
@@ -355,7 +344,6 @@ impl ThreadWatchState {
.or_default();
runtime.is_loaded = true;
mutate(runtime);
self.update_status_watcher_for_thread(thread_id);
self.status_changed_notification(thread_id.to_string(), previous_status)
}
@@ -370,40 +358,6 @@ impl ThreadWatchState {
.unwrap_or(ThreadStatus::NotLoaded)
}
fn subscribe(&mut self, thread_id: String) -> watch::Receiver<ThreadStatus> {
let status = self.loaded_status_for_thread(&thread_id);
let sender = self
.status_watcher_by_thread_id
.entry(thread_id)
.or_insert_with(|| watch::channel(status.clone()).0);
sender.subscribe()
}
fn update_status_watcher_for_thread(&mut self, thread_id: &str) {
let status = self.loaded_status_for_thread(thread_id);
self.update_status_watcher(thread_id, &status);
}
fn update_status_watcher(&mut self, thread_id: &str, status: &ThreadStatus) {
let remove_watcher = if let Some(sender) = self.status_watcher_by_thread_id.get(thread_id) {
let status = status.clone();
let _ = sender.send_if_modified(|current| {
if *current == status {
false
} else {
*current = status;
true
}
});
sender.receiver_count() == 0
} else {
false
};
if remove_watcher {
self.status_watcher_by_thread_id.remove(thread_id);
}
}
fn status_changed_notification(
&self,
thread_id: String,
@@ -798,55 +752,6 @@ mod tests {
);
}
#[tokio::test]
async fn status_watchers_receive_only_their_thread_updates() {
let manager = ThreadWatchManager::new();
manager
.upsert_thread(test_thread(
INTERACTIVE_THREAD_ID,
codex_app_server_protocol::SessionSource::Cli,
))
.await;
manager
.upsert_thread(test_thread(
NON_INTERACTIVE_THREAD_ID,
codex_app_server_protocol::SessionSource::AppServer,
))
.await;
let interactive_thread_id = ThreadId::from_string(INTERACTIVE_THREAD_ID)
.expect("interactive thread id should parse");
let non_interactive_thread_id = ThreadId::from_string(NON_INTERACTIVE_THREAD_ID)
.expect("non-interactive thread id should parse");
let mut interactive_rx = manager
.subscribe(interactive_thread_id)
.await
.expect("interactive status watcher should subscribe");
let mut non_interactive_rx = manager
.subscribe(non_interactive_thread_id)
.await
.expect("non-interactive status watcher should subscribe");
manager.note_turn_started(INTERACTIVE_THREAD_ID).await;
timeout(Duration::from_secs(1), interactive_rx.changed())
.await
.expect("timed out waiting for interactive status update")
.expect("interactive status watcher should remain open");
assert_eq!(
*interactive_rx.borrow(),
ThreadStatus::Active {
active_flags: vec![],
},
);
assert!(
timeout(Duration::from_millis(100), non_interactive_rx.changed())
.await
.is_err(),
"unrelated thread watcher should not receive an update"
);
assert_eq!(*non_interactive_rx.borrow(), ThreadStatus::Idle);
}
async fn wait_for_status(
manager: &ThreadWatchManager,
thread_id: &str,

View File

@@ -338,8 +338,7 @@ async fn websocket_transport_allows_unauthenticated_non_loopback_startup_by_defa
}
#[tokio::test]
async fn websocket_disconnect_keeps_last_subscribed_thread_loaded_until_idle_timeout() -> Result<()>
{
async fn websocket_disconnect_unloads_last_subscribed_thread() -> Result<()> {
let server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri(), "never")?;
@@ -360,7 +359,7 @@ async fn websocket_disconnect_keeps_last_subscribed_thread_loaded_until_idle_tim
send_initialize_request(&mut ws2, /*id*/ 4, "ws_reconnect_client").await?;
read_response_for_id(&mut ws2, /*id*/ 4).await?;
wait_for_loaded_threads(&mut ws2, /*first_id*/ 5, &[thread_id.as_str()]).await?;
wait_for_loaded_threads(&mut ws2, /*first_id*/ 5, &[]).await?;
process
.kill()

View File

@@ -550,9 +550,10 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
startup_context_request.body_json()["type"].as_str(),
Some("session.update")
);
assert_eq!(
startup_context_request.body_json()["session"]["audio"]["output"]["voice"],
"cedar"
assert!(
startup_context_request.body_json()["session"]["audio"]
.get("output")
.is_none()
);
let startup_context_instructions =
startup_context_request.body_json()["session"]["instructions"]
@@ -973,7 +974,7 @@ async fn realtime_webrtc_start_emits_sdp_notification() -> Result<()> {
Some("multipart/form-data; boundary=codex-realtime-call-boundary")
);
let body = String::from_utf8(request.body).context("multipart body should be utf-8")?;
let session = r#"{"tool_choice":"auto","type":"realtime","model":"gpt-realtime-1.5","instructions":"backend prompt\n\nstartup context","output_modalities":["audio"],"audio":{"input":{"format":{"type":"audio/pcm","rate":24000},"noise_reduction":{"type":"near_field"},"turn_detection":{"type":"server_vad","interrupt_response":true,"create_response":true}},"output":{"format":{"type":"audio/pcm","rate":24000},"voice":"marin"}},"tools":[{"type":"function","name":"background_agent","description":"Send a user request to the background agent. Use this as the default action. If the background agent is idle, this starts a new task and returns the final result to the user. If the background agent is already working on a task, this sends the request as guidance to steer that previous task. If the user asks to do something next, later, after this, or once current work finishes, call this tool so the work is actually queued instead of merely promising to do it later.","parameters":{"type":"object","properties":{"prompt":{"type":"string","description":"The user request to delegate to the background agent."}},"required":["prompt"],"additionalProperties":false}}]}"#;
let session = r#"{"tool_choice":"auto","type":"realtime","model":"gpt-realtime-1.5","instructions":"backend prompt\n\nstartup context","output_modalities":["text"],"audio":{"input":{"format":{"type":"audio/pcm","rate":24000},"noise_reduction":{"type":"near_field"},"turn_detection":{"type":"server_vad","interrupt_response":true,"create_response":true}}},"tools":[{"type":"function","name":"background_agent","description":"Send a user request to the background agent. Use this as the default action. Do not rephrase the user's ask or rewrite it in your own words; pass along the user's own words. If the background agent is idle, this starts a new task and returns the final result to the user. If the background agent is already working on a task, this sends the request as guidance to steer that previous task. If the user asks to do something next, later, after this, or once current work finishes, call this tool so the work is actually queued instead of merely promising to do it later.","parameters":{"type":"object","properties":{"prompt":{"type":"string","description":"The user request to delegate to the background agent."}},"required":["prompt"],"additionalProperties":false}}]}"#;
assert_eq!(
body,
format!(
@@ -1996,6 +1997,7 @@ fn assert_v2_session_update(request: &Value) -> Result<()> {
.context("v2 session.update instructions")?
.contains("startup context")
);
assert_eq!(request["session"]["output_modalities"], json!(["text"]));
assert_eq!(
request["session"]["tools"][0]["name"].as_str(),
Some("background_agent")

View File

@@ -7,8 +7,10 @@ use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::create_shell_command_sse_response;
use app_test_support::to_response;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadLoadedListParams;
use codex_app_server_protocol::ThreadLoadedListResponse;
@@ -19,6 +21,7 @@ use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ThreadStatusChangedNotification;
use codex_app_server_protocol::ThreadUnsubscribeParams;
use codex_app_server_protocol::ThreadUnsubscribeResponse;
use codex_app_server_protocol::ThreadUnsubscribeStatus;
@@ -78,7 +81,7 @@ async fn wait_for_responses_request_count_to_stabilize(
}
#[tokio::test]
async fn thread_unsubscribe_keeps_thread_loaded_until_idle_timeout() -> Result<()> {
async fn thread_unsubscribe_unloads_thread_and_emits_thread_closed_notification() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
@@ -101,14 +104,20 @@ async fn thread_unsubscribe_keeps_thread_loaded_until_idle_timeout() -> Result<(
let unsubscribe = to_response::<ThreadUnsubscribeResponse>(unsubscribe_resp)?;
assert_eq!(unsubscribe.status, ThreadUnsubscribeStatus::Unsubscribed);
assert!(
timeout(
std::time::Duration::from_millis(250),
mcp.read_stream_until_notification_message("thread/closed"),
)
.await
.is_err()
);
let closed_notif: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/closed"),
)
.await??;
let parsed: ServerNotification = closed_notif.try_into()?;
let ServerNotification::ThreadClosed(payload) = parsed else {
anyhow::bail!("expected thread/closed notification");
};
assert_eq!(payload.thread_id, thread_id);
let status_changed = wait_for_thread_status_not_loaded(&mut mcp, &payload.thread_id).await?;
assert_eq!(status_changed.thread_id, payload.thread_id);
assert_eq!(status_changed.status, ThreadStatus::NotLoaded);
let list_id = mcp
.send_thread_loaded_list_request(ThreadLoadedListParams::default())
@@ -120,22 +129,22 @@ async fn thread_unsubscribe_keeps_thread_loaded_until_idle_timeout() -> Result<(
.await??;
let ThreadLoadedListResponse { data, next_cursor } =
to_response::<ThreadLoadedListResponse>(list_resp)?;
assert_eq!(data, vec![thread_id]);
assert_eq!(data, Vec::<String>::new());
assert_eq!(next_cursor, None);
Ok(())
}
#[tokio::test]
async fn thread_unsubscribe_during_turn_keeps_turn_running() -> Result<()> {
async fn thread_unsubscribe_during_turn_interrupts_turn_and_emits_thread_closed() -> Result<()> {
#[cfg(target_os = "windows")]
let shell_command = vec![
"powershell".to_string(),
"-Command".to_string(),
"Start-Sleep -Seconds 1".to_string(),
"Start-Sleep -Seconds 10".to_string(),
];
#[cfg(not(target_os = "windows"))]
let shell_command = vec!["sleep".to_string(), "1".to_string()];
let shell_command = vec!["sleep".to_string(), "10".to_string()];
let tmp = TempDir::new()?;
let codex_home = tmp.path().join("codex_home");
@@ -197,18 +206,20 @@ async fn thread_unsubscribe_during_turn_keeps_turn_running() -> Result<()> {
let unsubscribe = to_response::<ThreadUnsubscribeResponse>(unsubscribe_resp)?;
assert_eq!(unsubscribe.status, ThreadUnsubscribeStatus::Unsubscribed);
assert!(
timeout(
std::time::Duration::from_millis(250),
mcp.read_stream_until_notification_message("thread/closed"),
)
.await
.is_err()
);
let closed_notif: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/closed"),
)
.await??;
let parsed: ServerNotification = closed_notif.try_into()?;
let ServerNotification::ThreadClosed(payload) = parsed else {
anyhow::bail!("expected thread/closed notification");
};
assert_eq!(payload.thread_id, thread_id);
wait_for_responses_request_count_to_stabilize(
&server,
/*expected_count*/ 2,
/*expected_count*/ 1,
std::time::Duration::from_millis(200),
)
.await?;
@@ -217,7 +228,7 @@ async fn thread_unsubscribe_during_turn_keeps_turn_running() -> Result<()> {
}
#[tokio::test]
async fn thread_unsubscribe_preserves_cached_status_before_idle_unload() -> Result<()> {
async fn thread_unsubscribe_clears_cached_status_before_resume() -> Result<()> {
let server = responses::start_mock_server().await;
let _response_mock = responses::mount_sse_once(
&server,
@@ -280,14 +291,11 @@ async fn thread_unsubscribe_preserves_cached_status_before_idle_unload() -> Resu
.await??;
let unsubscribe = to_response::<ThreadUnsubscribeResponse>(unsubscribe_resp)?;
assert_eq!(unsubscribe.status, ThreadUnsubscribeStatus::Unsubscribed);
assert!(
timeout(
std::time::Duration::from_millis(250),
mcp.read_stream_until_notification_message("thread/closed"),
)
.await
.is_err()
);
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/closed"),
)
.await??;
let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
@@ -301,13 +309,13 @@ async fn thread_unsubscribe_preserves_cached_status_before_idle_unload() -> Resu
)
.await??;
let resume: ThreadResumeResponse = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(resume.thread.status, ThreadStatus::SystemError);
assert_eq!(resume.thread.status, ThreadStatus::Idle);
Ok(())
}
#[tokio::test]
async fn thread_unsubscribe_reports_not_subscribed_before_idle_unload() -> Result<()> {
async fn thread_unsubscribe_reports_not_loaded_after_thread_is_unloaded() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
@@ -333,6 +341,12 @@ async fn thread_unsubscribe_reports_not_subscribed_before_idle_unload() -> Resul
ThreadUnsubscribeStatus::Unsubscribed
);
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/closed"),
)
.await??;
let second_unsubscribe_id = mcp
.send_thread_unsubscribe_request(ThreadUnsubscribeParams { thread_id })
.await?;
@@ -344,7 +358,7 @@ async fn thread_unsubscribe_reports_not_subscribed_before_idle_unload() -> Resul
let second_unsubscribe = to_response::<ThreadUnsubscribeResponse>(second_unsubscribe_resp)?;
assert_eq!(
second_unsubscribe.status,
ThreadUnsubscribeStatus::NotSubscribed
ThreadUnsubscribeStatus::NotLoaded
);
Ok(())
@@ -363,6 +377,28 @@ async fn wait_for_command_execution_item_started(mcp: &mut McpProcess) -> Result
}
}
async fn wait_for_thread_status_not_loaded(
mcp: &mut McpProcess,
thread_id: &str,
) -> Result<ThreadStatusChangedNotification> {
loop {
let status_changed_notif: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/status/changed"),
)
.await??;
let status_changed_params = status_changed_notif
.params
.context("thread/status/changed params must be present")?;
let status_changed: ThreadStatusChangedNotification =
serde_json::from_value(status_changed_params)?;
if status_changed.thread_id == thread_id && status_changed.status == ThreadStatus::NotLoaded
{
return Ok(status_changed);
}
}
}
fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(

View File

@@ -1519,7 +1519,7 @@ mod tests {
first_json["session"]["type"],
Value::String("realtime".to_string())
);
assert_eq!(first_json["session"]["output_modalities"], json!(["audio"]));
assert_eq!(first_json["session"]["output_modalities"], json!(["text"]));
assert_eq!(
first_json["session"]["audio"]["input"]["format"],
json!({
@@ -1541,17 +1541,7 @@ mod tests {
"create_response": true,
})
);
assert_eq!(
first_json["session"]["audio"]["output"]["format"],
json!({
"type": "audio/pcm",
"rate": 24_000,
})
);
assert_eq!(
first_json["session"]["audio"]["output"]["voice"],
Value::String("cedar".to_string())
);
assert!(first_json["session"]["audio"].get("output").is_none());
assert_eq!(
first_json["session"]["tools"][0]["type"],
Value::String("function".to_string())

View File

@@ -14,8 +14,6 @@ use crate::endpoint::realtime_websocket::protocol::RealtimeVoice;
use crate::endpoint::realtime_websocket::protocol::SessionAudio;
use crate::endpoint::realtime_websocket::protocol::SessionAudioFormat;
use crate::endpoint::realtime_websocket::protocol::SessionAudioInput;
use crate::endpoint::realtime_websocket::protocol::SessionAudioOutput;
use crate::endpoint::realtime_websocket::protocol::SessionAudioOutputFormat;
use crate::endpoint::realtime_websocket::protocol::SessionFunctionTool;
use crate::endpoint::realtime_websocket::protocol::SessionNoiseReduction;
use crate::endpoint::realtime_websocket::protocol::SessionToolType;
@@ -25,10 +23,10 @@ use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession;
use crate::endpoint::realtime_websocket::protocol::TurnDetectionType;
use serde_json::json;
const REALTIME_V2_OUTPUT_MODALITY_AUDIO: &str = "audio";
const REALTIME_V2_OUTPUT_MODALITY_TEXT: &str = "text";
const REALTIME_V2_TOOL_CHOICE: &str = "auto";
const REALTIME_V2_BACKGROUND_AGENT_TOOL_NAME: &str = "background_agent";
const REALTIME_V2_BACKGROUND_AGENT_TOOL_DESCRIPTION: &str = "Send a user request to the background agent. Use this as the default action. If the background agent is idle, this starts a new task and returns the final result to the user. If the background agent is already working on a task, this sends the request as guidance to steer that previous task. If the user asks to do something next, later, after this, or once current work finishes, call this tool so the work is actually queued instead of merely promising to do it later.";
const REALTIME_V2_BACKGROUND_AGENT_TOOL_DESCRIPTION: &str = "Send a user request to the background agent. Use this as the default action. Do not rephrase the user's ask or rewrite it in your own words; pass along the user's own words. If the background agent is idle, this starts a new task and returns the final result to the user. If the background agent is already working on a task, this sends the request as guidance to steer that previous task. If the user asks to do something next, later, after this, or once current work finishes, call this tool so the work is actually queued instead of merely promising to do it later.";
pub(super) fn conversation_item_create_message(text: String) -> RealtimeOutboundMessage {
RealtimeOutboundMessage::ConversationItemCreate {
@@ -59,7 +57,7 @@ pub(super) fn conversation_handoff_append_message(
pub(super) fn session_update_session(
instructions: String,
session_mode: RealtimeSessionMode,
voice: RealtimeVoice,
_voice: RealtimeVoice,
) -> SessionUpdateSession {
match session_mode {
RealtimeSessionMode::Conversational => SessionUpdateSession {
@@ -67,7 +65,7 @@ pub(super) fn session_update_session(
r#type: SessionType::Realtime,
model: None,
instructions: Some(instructions),
output_modalities: Some(vec![REALTIME_V2_OUTPUT_MODALITY_AUDIO.to_string()]),
output_modalities: Some(vec![REALTIME_V2_OUTPUT_MODALITY_TEXT.to_string()]),
audio: SessionAudio {
input: SessionAudioInput {
format: SessionAudioFormat {
@@ -83,13 +81,7 @@ pub(super) fn session_update_session(
create_response: true,
}),
},
output: Some(SessionAudioOutput {
format: Some(SessionAudioOutputFormat {
r#type: AudioFormatType::AudioPcm,
rate: REALTIME_AUDIO_SAMPLE_RATE,
}),
voice,
}),
output: None,
},
tools: Some(vec![SessionFunctionTool {
r#type: SessionToolType::Function,

View File

@@ -415,7 +415,7 @@ async fn conversation_start_defaults_to_v2_and_gpt_realtime_1_5() -> Result<()>
json!({
"startedVersion": RealtimeConversationVersion::V2,
"handshakeUri": "/v1/realtime?model=gpt-realtime-1.5",
"voice": "marin",
"voice": null,
"instructions": "backend prompt",
})
);

View File

@@ -193,6 +193,10 @@ use codex_protocol::protocol::McpToolCallEndEvent;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::PatchApplyBeginEvent;
use codex_protocol::protocol::RateLimitSnapshot;
use codex_protocol::protocol::RealtimeConversationClosedEvent;
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
use codex_protocol::protocol::RealtimeConversationStartedEvent;
use codex_protocol::protocol::RealtimeEvent;
use codex_protocol::protocol::ReviewRequest;
use codex_protocol::protocol::ReviewTarget;
use codex_protocol::protocol::SkillMetadata as ProtocolSkillMetadata;
@@ -337,6 +341,7 @@ use crate::history_cell::HistoryCell;
use crate::history_cell::HookCell;
use crate::history_cell::McpToolCallCell;
use crate::history_cell::PlainHistoryCell;
use crate::history_cell::RealtimeTranscriptRole;
use crate::history_cell::WebSearchCell;
use crate::key_hint;
use crate::key_hint::KeyBinding;
@@ -6175,54 +6180,45 @@ impl ChatWidget {
}
ServerNotification::ThreadRealtimeStarted(notification) => {
if !from_replay {
self.on_realtime_conversation_started(
codex_protocol::protocol::RealtimeConversationStartedEvent {
session_id: notification.session_id,
version: notification.version,
},
);
self.on_realtime_conversation_started(RealtimeConversationStartedEvent {
session_id: notification.session_id,
version: notification.version,
});
}
}
ServerNotification::ThreadRealtimeItemAdded(notification) => {
if !from_replay {
self.on_realtime_conversation_realtime(
codex_protocol::protocol::RealtimeConversationRealtimeEvent {
payload: codex_protocol::protocol::RealtimeEvent::ConversationItemAdded(
notification.item,
),
},
);
self.on_realtime_conversation_realtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::ConversationItemAdded(notification.item),
});
}
}
ServerNotification::ThreadRealtimeTranscriptUpdated(notification) => {
if !from_replay
&& let Some(role) = RealtimeTranscriptRole::from_name(&notification.role)
{
self.on_realtime_transcript_delta(role, notification.text);
}
}
ServerNotification::ThreadRealtimeOutputAudioDelta(notification) => {
if !from_replay {
self.on_realtime_conversation_realtime(
codex_protocol::protocol::RealtimeConversationRealtimeEvent {
payload: codex_protocol::protocol::RealtimeEvent::AudioOut(
notification.audio.into(),
),
},
);
self.on_realtime_conversation_realtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::AudioOut(notification.audio.into()),
});
}
}
ServerNotification::ThreadRealtimeError(notification) => {
if !from_replay {
self.on_realtime_conversation_realtime(
codex_protocol::protocol::RealtimeConversationRealtimeEvent {
payload: codex_protocol::protocol::RealtimeEvent::Error(
notification.message,
),
},
);
self.on_realtime_conversation_realtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::Error(notification.message),
});
}
}
ServerNotification::ThreadRealtimeClosed(notification) => {
if !from_replay {
self.on_realtime_conversation_closed(
codex_protocol::protocol::RealtimeConversationClosedEvent {
reason: notification.reason,
},
);
self.on_realtime_conversation_closed(RealtimeConversationClosedEvent {
reason: notification.reason,
});
}
}
ServerNotification::ThreadRealtimeSdp(notification) => {
@@ -6245,7 +6241,6 @@ impl ChatWidget {
| ServerNotification::FsChanged(_)
| ServerNotification::FuzzyFileSearchSessionUpdated(_)
| ServerNotification::FuzzyFileSearchSessionCompleted(_)
| ServerNotification::ThreadRealtimeTranscriptUpdated(_)
| ServerNotification::WindowsWorldWritableWarning(_)
| ServerNotification::WindowsSandboxSetupCompleted(_)
| ServerNotification::AccountLoginCompleted(_) => {}

View File

@@ -1,4 +1,6 @@
use super::*;
use crate::history_cell::RealtimeTranscriptCell;
use crate::history_cell::RealtimeTranscriptRole;
use codex_config::config_toml::RealtimeTransport;
use codex_protocol::protocol::ConversationStartParams;
use codex_protocol::protocol::ConversationStartTransport;
@@ -315,8 +317,6 @@ impl ChatWidget {
RealtimeEvent::AudioOut(_)
| RealtimeEvent::InputAudioSpeechStarted(_)
| RealtimeEvent::ResponseCreated(_)
| RealtimeEvent::ResponseCancelled(_)
| RealtimeEvent::ResponseDone(_)
)
{
return;
@@ -326,12 +326,19 @@ impl ChatWidget {
self.realtime_conversation.session_id = Some(session_id);
}
RealtimeEvent::InputAudioSpeechStarted(_) => self.interrupt_realtime_audio_playback(),
RealtimeEvent::InputTranscriptDelta(_) => {}
RealtimeEvent::OutputTranscriptDelta(_) => {}
RealtimeEvent::InputTranscriptDelta(delta) => {
self.on_realtime_transcript_delta(RealtimeTranscriptRole::User, delta.delta);
}
RealtimeEvent::OutputTranscriptDelta(delta) => {
self.on_realtime_transcript_delta(RealtimeTranscriptRole::Assistant, delta.delta);
}
RealtimeEvent::AudioOut(frame) => self.enqueue_realtime_audio_out(&frame),
RealtimeEvent::ResponseCreated(_) => {}
RealtimeEvent::ResponseCancelled(_) => self.interrupt_realtime_audio_playback(),
RealtimeEvent::ResponseDone(_) => {}
RealtimeEvent::ResponseCancelled(_) => {
self.flush_active_realtime_transcript();
self.interrupt_realtime_audio_playback();
}
RealtimeEvent::ResponseDone(_) => self.flush_active_realtime_transcript(),
RealtimeEvent::ConversationItemAdded(_item) => {}
RealtimeEvent::ConversationItemDone { .. } => {}
RealtimeEvent::HandoffRequested(_) => {}
@@ -341,6 +348,46 @@ impl ChatWidget {
}
}
pub(super) fn on_realtime_transcript_delta(
&mut self,
role: RealtimeTranscriptRole,
delta: String,
) {
if delta.is_empty() {
return;
}
self.flush_unified_exec_wait_streak();
if let Some(cell) = self
.active_cell
.as_mut()
.and_then(|cell| cell.as_any_mut().downcast_mut::<RealtimeTranscriptCell>())
&& cell.role == role
{
cell.append(&delta);
self.bump_active_cell_revision();
self.request_redraw();
return;
}
self.flush_active_cell();
self.active_cell = Some(Box::new(RealtimeTranscriptCell::new(role, delta)));
self.bump_active_cell_revision();
self.request_redraw();
}
fn flush_active_realtime_transcript(&mut self) {
if self
.active_cell
.as_ref()
.is_some_and(|cell| cell.as_any().is::<RealtimeTranscriptCell>())
{
self.flush_active_cell();
self.request_redraw();
}
}
pub(super) fn on_realtime_conversation_closed(&mut self, ev: RealtimeConversationClosedEvent) {
if self.realtime_conversation_uses_webrtc()
&& self.realtime_conversation.is_live()
@@ -349,6 +396,7 @@ impl ChatWidget {
return;
}
self.flush_active_realtime_transcript();
let requested = self.realtime_conversation.requested_close;
let reason = ev.reason;
self.reset_realtime_conversation_state();

View File

@@ -172,6 +172,7 @@ pub(super) use codex_protocol::protocol::ReadOnlyAccess;
pub(super) use codex_protocol::protocol::RealtimeConversationClosedEvent;
pub(super) use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
pub(super) use codex_protocol::protocol::RealtimeEvent;
pub(super) use codex_protocol::protocol::RealtimeTranscriptDelta;
pub(super) use codex_protocol::protocol::ReviewRequest;
pub(super) use codex_protocol::protocol::ReviewTarget;
pub(super) use codex_protocol::protocol::SessionConfiguredEvent;

View File

@@ -22,6 +22,31 @@ async fn realtime_error_closes_without_followup_closed_info() {
insta::assert_snapshot!(rendered.join("\n\n"), @"■ Realtime voice error: boom");
}
#[tokio::test]
async fn realtime_output_text_delta_renders_transcript_cell() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
chat.realtime_conversation.phase = RealtimeConversationPhase::Active;
chat.on_realtime_conversation_realtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta {
delta: "hello ".to_string(),
}),
});
chat.on_realtime_conversation_realtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta {
delta: "there".to_string(),
}),
});
let lines = chat
.active_cell_transcript_lines(/*width*/ 80)
.expect("realtime transcript cell");
insta::assert_snapshot!(
lines_to_single_string(&lines).trim_end(),
@"assistant: hello there"
);
}
#[cfg(not(target_os = "linux"))]
#[tokio::test]
async fn deleted_realtime_meter_uses_shared_stop_path() {

View File

@@ -489,6 +489,98 @@ impl HistoryCell for AgentMessageCell {
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum RealtimeTranscriptRole {
User,
Assistant,
}
impl RealtimeTranscriptRole {
pub(crate) fn from_name(role: &str) -> Option<Self> {
match role {
"user" => Some(Self::User),
"assistant" => Some(Self::Assistant),
_ => None,
}
}
}
#[derive(Debug)]
pub(crate) struct RealtimeTranscriptCell {
pub(crate) role: RealtimeTranscriptRole,
text: String,
}
impl RealtimeTranscriptCell {
pub(crate) fn new(role: RealtimeTranscriptRole, text: String) -> Self {
Self { role, text }
}
pub(crate) fn append(&mut self, delta: &str) {
self.text.push_str(delta);
}
}
impl HistoryCell for RealtimeTranscriptCell {
fn display_lines(&self, width: u16) -> Vec<Line<'static>> {
let role_name = match self.role {
RealtimeTranscriptRole::User => "user",
RealtimeTranscriptRole::Assistant => "assistant",
};
let style = Style::default().dim().italic();
let text = match self.role {
RealtimeTranscriptRole::User => self.text.clone(),
RealtimeTranscriptRole::Assistant => clean_assistant_realtime_transcript(&self.text),
};
let line = Line::from(text).style(style);
adaptive_wrap_lines(
&[line],
RtOptions::new(width as usize)
.initial_indent(vec![format!("{role_name}: ").dim().italic()].into())
.subsequent_indent(" ".dim().italic().into()),
)
}
}
fn clean_assistant_realtime_transcript(text: &str) -> String {
let text = strip_realtime_code_fence(text);
serde_json::from_str::<serde_json::Value>(text)
.ok()
.and_then(|value| {
value
.as_object()
.and_then(|object| object.get("response"))
.and_then(serde_json::Value::as_str)
.map(str::to_string)
})
.unwrap_or_else(|| text.to_string())
}
fn strip_realtime_code_fence(text: &str) -> &str {
let text = text.trim();
let Some(mut rest) = text.strip_prefix("```") else {
return text;
};
let has_language = rest.chars().next().is_some_and(|ch| !ch.is_whitespace());
rest = rest.trim_start();
if has_language && let Some((first_line, remaining)) = rest.split_once('\n') {
let first_line = first_line.trim();
if !first_line.is_empty()
&& first_line
.chars()
.all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_')
{
rest = remaining;
}
}
rest.trim()
.strip_suffix("```")
.map(str::trim)
.unwrap_or_else(|| rest.trim())
}
#[derive(Debug)]
pub(crate) struct PlainHistoryCell {
lines: Vec<Line<'static>>,
@@ -2934,6 +3026,32 @@ mod tests {
render_lines(&cell.transcript_lines(u16::MAX))
}
#[test]
fn realtime_transcript_cell_strips_assistant_code_fence() {
let cell = RealtimeTranscriptCell::new(
RealtimeTranscriptRole::Assistant,
"``` \nHey!".to_string(),
);
assert_eq!(
render_lines(&cell.display_lines(/*width*/ 80)),
vec!["assistant: Hey!"]
);
}
#[test]
fn realtime_transcript_cell_unwraps_assistant_response_json() {
let cell = RealtimeTranscriptCell::new(
RealtimeTranscriptRole::Assistant,
r#"{"response":"I'm doing well."}"#.to_string(),
);
assert_eq!(
render_lines(&cell.display_lines(/*width*/ 80)),
vec!["assistant: I'm doing well."]
);
}
fn image_block(data: &str) -> serde_json::Value {
serde_json::to_value(Content::image(data.to_string(), "image/png"))
.expect("image content should serialize")

View File

@@ -303,7 +303,28 @@ fn configure_rule(rule: &INetFwRule3, spec: &BlockRuleSpec<'_>) -> Result<()> {
format!("SetProfiles failed: {err:?}"),
))
})?;
configure_rule_network_scope(rule, spec)?;
rule.SetProtocol(spec.protocol).map_err(|err| {
anyhow::Error::new(SetupFailure::new(
SetupErrorCode::HelperFirewallRuleCreateOrAddFailed,
format!("SetProtocol failed: {err:?}"),
))
})?;
let remote_addresses = spec.remote_addresses.unwrap_or("*");
rule.SetRemoteAddresses(&BSTR::from(remote_addresses))
.map_err(|err| {
anyhow::Error::new(SetupFailure::new(
SetupErrorCode::HelperFirewallRuleCreateOrAddFailed,
format!("SetRemoteAddresses failed: {err:?}"),
))
})?;
let remote_ports = spec.remote_ports.unwrap_or("*");
rule.SetRemotePorts(&BSTR::from(remote_ports))
.map_err(|err| {
anyhow::Error::new(SetupFailure::new(
SetupErrorCode::HelperFirewallRuleCreateOrAddFailed,
format!("SetRemotePorts failed: {err:?}"),
))
})?;
rule.SetLocalUserAuthorizedList(&BSTR::from(spec.local_user_spec))
.map_err(|err| {
anyhow::Error::new(SetupFailure::new(
@@ -333,36 +354,6 @@ fn configure_rule(rule: &INetFwRule3, spec: &BlockRuleSpec<'_>) -> Result<()> {
Ok(())
}
fn configure_rule_network_scope(rule: &INetFwRule3, spec: &BlockRuleSpec<'_>) -> Result<()> {
unsafe {
rule.SetProtocol(spec.protocol).map_err(|err| {
anyhow::Error::new(SetupFailure::new(
SetupErrorCode::HelperFirewallRuleCreateOrAddFailed,
format!("SetProtocol failed: {err:?}"),
))
})?;
let remote_addresses = spec.remote_addresses.unwrap_or("*");
rule.SetRemoteAddresses(&BSTR::from(remote_addresses))
.map_err(|err| {
anyhow::Error::new(SetupFailure::new(
SetupErrorCode::HelperFirewallRuleCreateOrAddFailed,
format!("SetRemoteAddresses failed: {err:?}"),
))
})?;
if let Some(remote_ports) = spec.remote_ports {
rule.SetRemotePorts(&BSTR::from(remote_ports))
.map_err(|err| {
anyhow::Error::new(SetupFailure::new(
SetupErrorCode::HelperFirewallRuleCreateOrAddFailed,
format!("SetRemotePorts failed: {err:?}"),
))
})?;
}
}
Ok(())
}
fn blocked_loopback_tcp_remote_ports(proxy_ports: &[u16]) -> Option<String> {
let mut allowed_ports = proxy_ports
.iter()
@@ -445,68 +436,4 @@ mod tests {
);
}
}
#[test]
fn production_firewall_rule_network_scopes_are_accepted_by_firewall_com() {
let hr = unsafe { CoInitializeEx(None, COINIT_APARTMENTTHREADED) };
assert!(hr.is_ok(), "CoInitializeEx failed: {hr:?}");
let local_user_spec = "O:LSD:(A;;CC;;;S-1-5-18)";
let offline_sid = "S-1-5-18";
let blocked_remote_ports =
blocked_loopback_tcp_remote_ports(&[8080]).expect("proxy-port complement should exist");
let specs = [
BlockRuleSpec {
internal_name: OFFLINE_BLOCK_LOOPBACK_UDP_RULE_NAME,
friendly_desc: OFFLINE_BLOCK_LOOPBACK_UDP_RULE_FRIENDLY,
protocol: NET_FW_IP_PROTOCOL_UDP.0,
local_user_spec,
offline_sid,
remote_addresses: Some(LOOPBACK_REMOTE_ADDRESSES),
remote_ports: None,
},
BlockRuleSpec {
internal_name: OFFLINE_BLOCK_LOOPBACK_TCP_RULE_NAME,
friendly_desc: OFFLINE_BLOCK_LOOPBACK_TCP_RULE_FRIENDLY,
protocol: NET_FW_IP_PROTOCOL_TCP.0,
local_user_spec,
offline_sid,
remote_addresses: Some(LOOPBACK_REMOTE_ADDRESSES),
remote_ports: Some(&blocked_remote_ports),
},
BlockRuleSpec {
internal_name: OFFLINE_BLOCK_RULE_NAME,
friendly_desc: OFFLINE_BLOCK_RULE_FRIENDLY,
protocol: NET_FW_IP_PROTOCOL_ANY.0,
local_user_spec,
offline_sid,
remote_addresses: Some(NON_LOOPBACK_REMOTE_ADDRESSES),
remote_ports: None,
},
];
let results = specs.each_ref().map(|spec| unsafe {
let rule: windows::core::Result<INetFwRule3> =
CoCreateInstance(&NetFwRule, None, CLSCTX_INPROC_SERVER);
match rule {
Ok(rule) => configure_rule_network_scope(&rule, spec),
Err(err) => Err(err.into()),
}
});
unsafe {
CoUninitialize();
}
for (spec, result) in specs.into_iter().zip(results) {
assert!(
result.is_ok(),
"firewall rejected network scope for rule={} protocol={} remote_addresses={:?} remote_ports={:?}: {result:?}",
spec.internal_name,
spec.protocol,
spec.remote_addresses,
spec.remote_ports
);
}
}
}