From e6736e5d81e7b3b85154ac87a7f1bbcb0f437b87 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 27 Jan 2026 00:45:46 +0000 Subject: [PATCH 1/4] fix: try to fix freezes 2 --- codex-rs/tui/src/app.rs | 64 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 3 deletions(-) diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index bc006a2928..25680b08e6 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -93,11 +93,12 @@ use tokio::sync::Mutex; use tokio::sync::broadcast; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TryRecvError; +use tokio::sync::mpsc::error::TrySendError; use tokio::sync::mpsc::unbounded_channel; use toml::Value as TomlValue; const EXTERNAL_EDITOR_HINT: &str = "Save and close external editor to continue."; -const THREAD_EVENT_CHANNEL_CAPACITY: usize = 1024; +const THREAD_EVENT_CHANNEL_CAPACITY: usize = 4; #[derive(Debug, Clone)] pub struct AppExitInfo { @@ -714,8 +715,23 @@ impl App { guard.active }; - if should_send && let Err(err) = sender.send(event).await { - tracing::warn!("thread {thread_id} event channel closed: {err}"); + if should_send { + // Never await a bounded channel send on the main TUI loop: if the receiver falls behind, + // `send().await` can block and the UI stops drawing. If the channel is full, wait in a + // spawned task instead. + match sender.try_send(event) { + Ok(()) => {} + Err(TrySendError::Full(event)) => { + tokio::spawn(async move { + if let Err(err) = sender.send(event).await { + tracing::warn!("thread {thread_id} event channel closed: {err}"); + } + }); + } + Err(TrySendError::Closed(_)) => { + tracing::warn!("thread {thread_id} event channel closed"); + } + } } Ok(()) } @@ -2400,6 +2416,7 @@ mod tests { use std::sync::Arc; use std::sync::atomic::AtomicBool; use tempfile::tempdir; + use tokio::time; #[test] fn normalize_harness_overrides_resolves_relative_add_dirs() -> Result<()> { @@ -2420,6 +2437,47 @@ mod tests { Ok(()) } + #[tokio::test] + async fn enqueue_thread_event_does_not_block_when_channel_full() -> Result<()> { + let mut app = make_test_app().await; + let thread_id = ThreadId::new(); + app.thread_event_channels + .insert(thread_id, ThreadEventChannel::new(1)); + app.set_thread_active(thread_id, true).await; + + let event = Event { + id: String::new(), + msg: EventMsg::ShutdownComplete, + }; + + app.enqueue_thread_event(thread_id, event.clone()).await?; + time::timeout( + Duration::from_millis(50), + app.enqueue_thread_event(thread_id, event), + ) + .await + .expect("enqueue_thread_event blocked on a full channel")?; + + let mut rx = app + .thread_event_channels + .get_mut(&thread_id) + .expect("missing thread channel") + .receiver + .take() + .expect("missing receiver"); + + time::timeout(Duration::from_millis(50), rx.recv()) + .await + .expect("timed out waiting for first event") + .expect("channel closed unexpectedly"); + time::timeout(Duration::from_millis(50), rx.recv()) + .await + .expect("timed out waiting for second event") + .expect("channel closed unexpectedly"); + + Ok(()) + } + async fn make_test_app() -> App { let (chat_widget, app_event_tx, _rx, _op_rx) = make_chatwidget_manual_with_sender().await; let config = chat_widget.config_ref().clone(); From 46c23bea910279ed62c1178b37d0d02abb55f066 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 27 Jan 2026 00:46:04 +0000 Subject: [PATCH 2/4] nit --- codex-rs/tui/src/app.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index 25680b08e6..7a87d396f0 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -98,7 +98,7 @@ use tokio::sync::mpsc::unbounded_channel; use toml::Value as TomlValue; const EXTERNAL_EDITOR_HINT: &str = "Save and close external editor to continue."; -const THREAD_EVENT_CHANNEL_CAPACITY: usize = 4; +const THREAD_EVENT_CHANNEL_CAPACITY: usize = 1024; #[derive(Debug, Clone)] pub struct AppExitInfo { From 97b08393b5e5b202bafe79c550d4ede832de5136 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 27 Jan 2026 01:13:45 +0000 Subject: [PATCH 3/4] nit --- codex-rs/tui/src/app.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index 7a87d396f0..4b10bf2e39 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -98,7 +98,7 @@ use tokio::sync::mpsc::unbounded_channel; use toml::Value as TomlValue; const EXTERNAL_EDITOR_HINT: &str = "Save and close external editor to continue."; -const THREAD_EVENT_CHANNEL_CAPACITY: usize = 1024; +const THREAD_EVENT_CHANNEL_CAPACITY: usize = 4096; #[derive(Debug, Clone)] pub struct AppExitInfo { From ffdff036cf8e3f3c7a083c8213f43d30a025753a Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 27 Jan 2026 01:31:13 +0000 Subject: [PATCH 4/4] fix freeze 3 --- codex-rs/tui/src/app.rs | 75 +++++++++++++++++++++++------------------ 1 file changed, 42 insertions(+), 33 deletions(-) diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index 4b10bf2e39..221b74953f 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -717,25 +717,42 @@ impl App { if should_send { // Never await a bounded channel send on the main TUI loop: if the receiver falls behind, - // `send().await` can block and the UI stops drawing. If the channel is full, wait in a - // spawned task instead. - match sender.try_send(event) { - Ok(()) => {} - Err(TrySendError::Full(event)) => { - tokio::spawn(async move { - if let Err(err) = sender.send(event).await { - tracing::warn!("thread {thread_id} event channel closed: {err}"); + // `send().await` can block and the UI stops drawing. Instead, drop the oldest queued + // event (from the active thread) until we can enqueue the newest. + let mut event = event; + loop { + match sender.try_send(event) { + Ok(()) => break, + Err(TrySendError::Closed(_)) => { + tracing::warn!("thread {thread_id} event channel closed"); + break; + } + Err(TrySendError::Full(unsent)) => { + if !self.drop_oldest_active_thread_event(thread_id) { + tracing::debug!("dropping event for thread {thread_id}: channel full"); + break; } - }); - } - Err(TrySendError::Closed(_)) => { - tracing::warn!("thread {thread_id} event channel closed"); + event = unsent; + } } } } Ok(()) } + fn drop_oldest_active_thread_event(&mut self, thread_id: ThreadId) -> bool { + if self.active_thread_id != Some(thread_id) { + return false; + } + let Some(rx) = self.active_thread_rx.as_mut() else { + return false; + }; + match rx.try_recv() { + Ok(_) => true, + Err(TryRecvError::Empty | TryRecvError::Disconnected) => false, + } + } + async fn enqueue_primary_event(&mut self, event: Event) -> Result<()> { if let Some(thread_id) = self.primary_thread_id { return self.enqueue_thread_event(thread_id, event).await; @@ -2443,37 +2460,29 @@ mod tests { let thread_id = ThreadId::new(); app.thread_event_channels .insert(thread_id, ThreadEventChannel::new(1)); - app.set_thread_active(thread_id, true).await; + app.activate_thread_channel(thread_id).await; - let event = Event { - id: String::new(), + let older = Event { + id: "older".to_string(), msg: EventMsg::ShutdownComplete, }; + let newer = Event { + id: "newer".to_string(), + msg: EventMsg::SkillsUpdateAvailable, + }; - app.enqueue_thread_event(thread_id, event.clone()).await?; + app.enqueue_thread_event(thread_id, older).await?; time::timeout( Duration::from_millis(50), - app.enqueue_thread_event(thread_id, event), + app.enqueue_thread_event(thread_id, newer), ) .await .expect("enqueue_thread_event blocked on a full channel")?; - let mut rx = app - .thread_event_channels - .get_mut(&thread_id) - .expect("missing thread channel") - .receiver - .take() - .expect("missing receiver"); - - time::timeout(Duration::from_millis(50), rx.recv()) - .await - .expect("timed out waiting for first event") - .expect("channel closed unexpectedly"); - time::timeout(Duration::from_millis(50), rx.recv()) - .await - .expect("timed out waiting for second event") - .expect("channel closed unexpectedly"); + let rx = app.active_thread_rx.as_mut().expect("missing receiver"); + let received = rx.try_recv().expect("expected queued event"); + assert_eq!(received.id, "newer"); + assert!(rx.try_recv().is_err(), "expected only one queued event"); Ok(()) }