mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
fix freeze 3
This commit is contained in:
@@ -717,25 +717,42 @@ impl App {
|
||||
|
||||
if should_send {
|
||||
// Never await a bounded channel send on the main TUI loop: if the receiver falls behind,
|
||||
// `send().await` can block and the UI stops drawing. If the channel is full, wait in a
|
||||
// spawned task instead.
|
||||
match sender.try_send(event) {
|
||||
Ok(()) => {}
|
||||
Err(TrySendError::Full(event)) => {
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = sender.send(event).await {
|
||||
tracing::warn!("thread {thread_id} event channel closed: {err}");
|
||||
// `send().await` can block and the UI stops drawing. Instead, drop the oldest queued
|
||||
// event (from the active thread) until we can enqueue the newest.
|
||||
let mut event = event;
|
||||
loop {
|
||||
match sender.try_send(event) {
|
||||
Ok(()) => break,
|
||||
Err(TrySendError::Closed(_)) => {
|
||||
tracing::warn!("thread {thread_id} event channel closed");
|
||||
break;
|
||||
}
|
||||
Err(TrySendError::Full(unsent)) => {
|
||||
if !self.drop_oldest_active_thread_event(thread_id) {
|
||||
tracing::debug!("dropping event for thread {thread_id}: channel full");
|
||||
break;
|
||||
}
|
||||
});
|
||||
}
|
||||
Err(TrySendError::Closed(_)) => {
|
||||
tracing::warn!("thread {thread_id} event channel closed");
|
||||
event = unsent;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn drop_oldest_active_thread_event(&mut self, thread_id: ThreadId) -> bool {
|
||||
if self.active_thread_id != Some(thread_id) {
|
||||
return false;
|
||||
}
|
||||
let Some(rx) = self.active_thread_rx.as_mut() else {
|
||||
return false;
|
||||
};
|
||||
match rx.try_recv() {
|
||||
Ok(_) => true,
|
||||
Err(TryRecvError::Empty | TryRecvError::Disconnected) => false,
|
||||
}
|
||||
}
|
||||
|
||||
async fn enqueue_primary_event(&mut self, event: Event) -> Result<()> {
|
||||
if let Some(thread_id) = self.primary_thread_id {
|
||||
return self.enqueue_thread_event(thread_id, event).await;
|
||||
@@ -2443,37 +2460,29 @@ mod tests {
|
||||
let thread_id = ThreadId::new();
|
||||
app.thread_event_channels
|
||||
.insert(thread_id, ThreadEventChannel::new(1));
|
||||
app.set_thread_active(thread_id, true).await;
|
||||
app.activate_thread_channel(thread_id).await;
|
||||
|
||||
let event = Event {
|
||||
id: String::new(),
|
||||
let older = Event {
|
||||
id: "older".to_string(),
|
||||
msg: EventMsg::ShutdownComplete,
|
||||
};
|
||||
let newer = Event {
|
||||
id: "newer".to_string(),
|
||||
msg: EventMsg::SkillsUpdateAvailable,
|
||||
};
|
||||
|
||||
app.enqueue_thread_event(thread_id, event.clone()).await?;
|
||||
app.enqueue_thread_event(thread_id, older).await?;
|
||||
time::timeout(
|
||||
Duration::from_millis(50),
|
||||
app.enqueue_thread_event(thread_id, event),
|
||||
app.enqueue_thread_event(thread_id, newer),
|
||||
)
|
||||
.await
|
||||
.expect("enqueue_thread_event blocked on a full channel")?;
|
||||
|
||||
let mut rx = app
|
||||
.thread_event_channels
|
||||
.get_mut(&thread_id)
|
||||
.expect("missing thread channel")
|
||||
.receiver
|
||||
.take()
|
||||
.expect("missing receiver");
|
||||
|
||||
time::timeout(Duration::from_millis(50), rx.recv())
|
||||
.await
|
||||
.expect("timed out waiting for first event")
|
||||
.expect("channel closed unexpectedly");
|
||||
time::timeout(Duration::from_millis(50), rx.recv())
|
||||
.await
|
||||
.expect("timed out waiting for second event")
|
||||
.expect("channel closed unexpectedly");
|
||||
let rx = app.active_thread_rx.as_mut().expect("missing receiver");
|
||||
let received = rx.try_recv().expect("expected queued event");
|
||||
assert_eq!(received.id, "newer");
|
||||
assert!(rx.try_recv().is_err(), "expected only one queued event");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user