From 2a3f524006c2db6dc2db4c099096110b2956d9dc Mon Sep 17 00:00:00 2001 From: Eric Traut Date: Mon, 18 May 2026 20:04:52 -0700 Subject: [PATCH 1/4] Serialize thread settings notifications --- .../src/request_processors/turn_processor.rs | 40 ++++++++++++------- codex-rs/app-server/src/thread_state.rs | 5 ++- 2 files changed, 28 insertions(+), 17 deletions(-) diff --git a/codex-rs/app-server/src/request_processors/turn_processor.rs b/codex-rs/app-server/src/request_processors/turn_processor.rs index 18590d6ff2..1728f39dab 100644 --- a/codex-rs/app-server/src/request_processors/turn_processor.rs +++ b/codex-rs/app-server/src/request_processors/turn_processor.rs @@ -679,12 +679,16 @@ impl TurnRequestProcessor { thread_settings, }; let turn_id = Uuid::now_v7().to_string(); - let thread_settings_applied = if has_thread_settings_overrides { - let thread_state = self.thread_state_manager.thread_state(thread_id).await; - Some({ + let pending_thread_settings = if has_thread_settings_overrides { + let (thread_settings_applied, notification_lock) = { + let thread_state = self.thread_state_manager.thread_state(thread_id).await; let mut thread_state = thread_state.lock().await; thread_state.track_pending_thread_settings(turn_id.clone()) - }) + }; + Some(( + thread_settings_applied, + notification_lock.lock_owned().await, + )) } else { None }; @@ -706,11 +710,14 @@ impl TurnRequestProcessor { return Err(error); } - if let Some(thread_settings_applied) = thread_settings_applied { + if let Some((thread_settings_applied, thread_settings_notification_guard)) = + pending_thread_settings + { let processor = self.clone(); let api_thread_id = params.thread_id.clone(); let tracked_turn_id = turn_id.clone(); tokio::spawn(async move { + let _thread_settings_notification_guard = thread_settings_notification_guard; match tokio::time::timeout(THREAD_SETTINGS_ACK_TIMEOUT, thread_settings_applied) .await { @@ -815,11 +822,12 @@ impl TurnRequestProcessor { invalid_request(format!("invalid thread settings override: {err}")) })?; let update_id = Uuid::now_v7().to_string(); - let thread_settings_applied = { + let (thread_settings_applied, notification_lock) = { let thread_state = self.thread_state_manager.thread_state(thread_id).await; let mut thread_state = thread_state.lock().await; thread_state.track_pending_thread_settings(update_id.clone()) }; + let thread_settings_notification_guard = notification_lock.lock_owned().await; if let Err(err) = thread .submit_with_id(Submission { id: update_id.clone(), @@ -837,7 +845,7 @@ impl TurnRequestProcessor { "failed to update thread settings: {err}" ))); } - match thread_settings_applied.await { + let after_thread_settings = match thread_settings_applied.await { Ok(Ok(payload)) => thread_settings_from_applied_event(&payload), Ok(Err(err)) => return Err(invalid_request(err)), Err(_) => { @@ -845,17 +853,19 @@ impl TurnRequestProcessor { "thread settings override waiter was cancelled".to_string(), )); } - } + }; + self.maybe_emit_thread_settings_updated( + thread_id, + ¶ms.thread_id, + &before_thread_settings, + after_thread_settings.clone(), + ) + .await; + drop(thread_settings_notification_guard); + after_thread_settings } else { before_thread_settings.clone() }; - self.maybe_emit_thread_settings_updated( - thread_id, - ¶ms.thread_id, - &before_thread_settings, - after_thread_settings.clone(), - ) - .await; Ok(ThreadSettingsUpdateResponse { thread_settings: after_thread_settings, diff --git a/codex-rs/app-server/src/thread_state.rs b/codex-rs/app-server/src/thread_state.rs index 4bd075d9bd..f4fb7a0bb4 100644 --- a/codex-rs/app-server/src/thread_state.rs +++ b/codex-rs/app-server/src/thread_state.rs @@ -81,6 +81,7 @@ pub(crate) struct ThreadState { listener_command_tx: Option>, current_turn_history: ThreadHistoryBuilder, pending_thread_settings_waiters: HashMap>>, + thread_settings_notification_lock: Arc>, listener_thread: Option>, watch_registration: WatchRegistration, } @@ -138,13 +139,13 @@ impl ThreadState { pub(crate) fn track_pending_thread_settings( &mut self, submission_id: String, - ) -> oneshot::Receiver { + ) -> (oneshot::Receiver, Arc>) { let (tx, rx) = oneshot::channel(); self.pending_thread_settings_waiters .entry(submission_id) .or_default() .push(tx); - rx + (rx, Arc::clone(&self.thread_settings_notification_lock)) } pub(crate) fn track_current_pending_thread_settings( From c8a56c1ce0027ad3f2da24f706284975b2919917 Mon Sep 17 00:00:00 2001 From: Eric Traut Date: Mon, 18 May 2026 20:21:17 -0700 Subject: [PATCH 2/4] Wait for accepted thread settings notifications --- .../src/request_processors/turn_processor.rs | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/codex-rs/app-server/src/request_processors/turn_processor.rs b/codex-rs/app-server/src/request_processors/turn_processor.rs index 1728f39dab..44f01beb11 100644 --- a/codex-rs/app-server/src/request_processors/turn_processor.rs +++ b/codex-rs/app-server/src/request_processors/turn_processor.rs @@ -718,10 +718,8 @@ impl TurnRequestProcessor { let tracked_turn_id = turn_id.clone(); tokio::spawn(async move { let _thread_settings_notification_guard = thread_settings_notification_guard; - match tokio::time::timeout(THREAD_SETTINGS_ACK_TIMEOUT, thread_settings_applied) - .await - { - Ok(Ok(Ok(payload))) => { + match thread_settings_applied.await { + Ok(Ok(payload)) => { let after_thread_settings = thread_settings_from_applied_event(&payload); processor .maybe_emit_thread_settings_updated( @@ -732,19 +730,14 @@ impl TurnRequestProcessor { ) .await; } - Ok(Ok(Err(err))) => { + Ok(Err(err)) => { tracing::warn!( "failed to apply thread settings overrides for turn {tracked_turn_id}: {err}" ); } - Ok(Err(_)) => { - tracing::warn!( - "thread settings override acknowledgement was cancelled for turn {tracked_turn_id}" - ); - } Err(_) => { tracing::warn!( - "timed out waiting for thread settings overrides to apply for turn {tracked_turn_id}" + "thread settings override acknowledgement was cancelled for turn {tracked_turn_id}" ); } } From e7d5ddc46e538b344a49c5033ec914925d2d7fcb Mon Sep 17 00:00:00 2001 From: Eric Traut Date: Mon, 18 May 2026 20:23:29 -0700 Subject: [PATCH 3/4] Defer running resume subscription to listener --- .../src/request_processors/thread_processor.rs | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index 730f194c08..dd2dd6a57e 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -2690,17 +2690,8 @@ impl ThreadRequestProcessor { let thread_state = self .thread_state_manager - .try_ensure_connection_subscribed( - existing_thread_id, - request_id.connection_id, - /*experimental_raw_events*/ false, - ) - .await - .ok_or_else(|| { - internal_error(format!( - "failed to subscribe connection for running thread {existing_thread_id}" - )) - })?; + .thread_state(existing_thread_id) + .await; self.ensure_listener_task_running( existing_thread_id, existing_thread.clone(), From b00aa1ead17b47718c33e29bd1c71126cc1d30f0 Mon Sep 17 00:00:00 2001 From: Eric Traut Date: Mon, 18 May 2026 20:27:40 -0700 Subject: [PATCH 4/4] Route thread settings notifications by thread --- codex-rs/tui/src/app/app_server_event_targets.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/codex-rs/tui/src/app/app_server_event_targets.rs b/codex-rs/tui/src/app/app_server_event_targets.rs index d53318018d..b99354b0ea 100644 --- a/codex-rs/tui/src/app/app_server_event_targets.rs +++ b/codex-rs/tui/src/app/app_server_event_targets.rs @@ -47,7 +47,9 @@ pub(super) fn server_notification_thread_target( ServerNotification::ThreadStatusChanged(notification) => { Some(notification.thread_id.as_str()) } - ServerNotification::ThreadSettingsUpdated(_) => None, + ServerNotification::ThreadSettingsUpdated(notification) => { + Some(notification.thread_id.as_str()) + } ServerNotification::ThreadArchived(notification) => Some(notification.thread_id.as_str()), ServerNotification::ThreadUnarchived(notification) => Some(notification.thread_id.as_str()), ServerNotification::ThreadClosed(notification) => Some(notification.thread_id.as_str()),