mirror of
https://github.com/openai/codex.git
synced 2026-02-01 22:47:52 +00:00
fix: try to fix freezes 2 (#9951)
Fixes a TUI freeze caused by awaiting `mpsc::Sender::send()` that blocks the tokio thread, stopping the consumption runtime and creating a deadlock. This could happen if the server was producing enough chunks to fill the `mpsc` fast enough. To solve this we try on insert using a `try_send()` (not requiring an `await`) and delegate to a tokio task if this does not work This is a temporary solution as it can contain races for delta elements and a stronger design should come here
This commit is contained in:
@@ -93,6 +93,7 @@ use tokio::sync::Mutex;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::mpsc::error::TryRecvError;
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
use tokio::sync::mpsc::unbounded_channel;
|
||||
use toml::Value as TomlValue;
|
||||
|
||||
@@ -714,8 +715,23 @@ impl App {
|
||||
guard.active
|
||||
};
|
||||
|
||||
if should_send && let Err(err) = sender.send(event).await {
|
||||
tracing::warn!("thread {thread_id} event channel closed: {err}");
|
||||
if should_send {
|
||||
// Never await a bounded channel send on the main TUI loop: if the receiver falls behind,
|
||||
// `send().await` can block and the UI stops drawing. If the channel is full, wait in a
|
||||
// spawned task instead.
|
||||
match sender.try_send(event) {
|
||||
Ok(()) => {}
|
||||
Err(TrySendError::Full(event)) => {
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = sender.send(event).await {
|
||||
tracing::warn!("thread {thread_id} event channel closed: {err}");
|
||||
}
|
||||
});
|
||||
}
|
||||
Err(TrySendError::Closed(_)) => {
|
||||
tracing::warn!("thread {thread_id} event channel closed");
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -2400,6 +2416,7 @@ mod tests {
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use tempfile::tempdir;
|
||||
use tokio::time;
|
||||
|
||||
#[test]
|
||||
fn normalize_harness_overrides_resolves_relative_add_dirs() -> Result<()> {
|
||||
@@ -2420,6 +2437,47 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn enqueue_thread_event_does_not_block_when_channel_full() -> Result<()> {
|
||||
let mut app = make_test_app().await;
|
||||
let thread_id = ThreadId::new();
|
||||
app.thread_event_channels
|
||||
.insert(thread_id, ThreadEventChannel::new(1));
|
||||
app.set_thread_active(thread_id, true).await;
|
||||
|
||||
let event = Event {
|
||||
id: String::new(),
|
||||
msg: EventMsg::ShutdownComplete,
|
||||
};
|
||||
|
||||
app.enqueue_thread_event(thread_id, event.clone()).await?;
|
||||
time::timeout(
|
||||
Duration::from_millis(50),
|
||||
app.enqueue_thread_event(thread_id, event),
|
||||
)
|
||||
.await
|
||||
.expect("enqueue_thread_event blocked on a full channel")?;
|
||||
|
||||
let mut rx = app
|
||||
.thread_event_channels
|
||||
.get_mut(&thread_id)
|
||||
.expect("missing thread channel")
|
||||
.receiver
|
||||
.take()
|
||||
.expect("missing receiver");
|
||||
|
||||
time::timeout(Duration::from_millis(50), rx.recv())
|
||||
.await
|
||||
.expect("timed out waiting for first event")
|
||||
.expect("channel closed unexpectedly");
|
||||
time::timeout(Duration::from_millis(50), rx.recv())
|
||||
.await
|
||||
.expect("timed out waiting for second event")
|
||||
.expect("channel closed unexpectedly");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn make_test_app() -> App {
|
||||
let (chat_widget, app_event_tx, _rx, _op_rx) = make_chatwidget_manual_with_sender().await;
|
||||
let config = chat_widget.config_ref().clone();
|
||||
|
||||
Reference in New Issue
Block a user