diff --git a/codex-rs/core/src/unified_exec/async_watcher.rs b/codex-rs/core/src/unified_exec/async_watcher.rs index cea65426a8..8d8efa796c 100644 --- a/codex-rs/core/src/unified_exec/async_watcher.rs +++ b/codex-rs/core/src/unified_exec/async_watcher.rs @@ -43,7 +43,7 @@ pub(crate) fn start_streaming_output( transcript: Arc>, ) { let mut receiver = process.output_receiver(); - let output_drained = process.output_drained_signal(); + let output_drained = process.output_drained_notify(); let exit_token = process.cancellation_token(); let session_ref = Arc::clone(&context.session); @@ -70,7 +70,7 @@ pub(crate) fn start_streaming_output( sleep.as_mut().await; } }, if grace_sleep.is_some() => { - output_drained.mark_drained(); + output_drained.notify_one(); break; } @@ -81,7 +81,7 @@ pub(crate) fn start_streaming_output( continue; }, Err(RecvError::Closed) => { - output_drained.mark_drained(); + output_drained.notify_one(); break; } }; @@ -116,11 +116,11 @@ pub(crate) fn spawn_exit_watcher( started_at: Instant, ) { let exit_token = process.cancellation_token(); - let output_drained = process.output_drained_signal(); + let output_drained = process.output_drained_notify(); tokio::spawn(async move { exit_token.cancelled().await; - output_drained.wait().await; + output_drained.notified().await; let duration = Instant::now().saturating_duration_since(started_at); if let Some(message) = process.failure_message() { diff --git a/codex-rs/core/src/unified_exec/process.rs b/codex-rs/core/src/unified_exec/process.rs index cc10993923..9671429d01 100644 --- a/codex-rs/core/src/unified_exec/process.rs +++ b/codex-rs/core/src/unified_exec/process.rs @@ -63,40 +63,6 @@ pub(crate) struct OutputHandles { pub(crate) cancellation_token: CancellationToken, } -/// One-shot signal that output streaming has consumed the process tail. -/// -/// The state bit makes the signal robust when the streaming task finishes before -/// the exit watcher starts waiting. -#[derive(Clone)] -pub(crate) struct OutputDrainedSignal { - drained: Arc, - notify: Arc, -} - -impl OutputDrainedSignal { - fn new() -> Self { - Self { - drained: Arc::new(AtomicBool::new(false)), - notify: Arc::new(Notify::new()), - } - } - - pub(crate) fn mark_drained(&self) { - self.drained.store(true, Ordering::Release); - self.notify.notify_waiters(); - } - - pub(crate) async fn wait(&self) { - loop { - let notified = self.notify.notified(); - if self.drained.load(Ordering::Acquire) { - break; - } - notified.await; - } - } -} - /// Transport-specific process handle used by unified exec. enum ProcessHandle { Local(Box), @@ -113,7 +79,7 @@ pub(crate) struct UnifiedExecProcess { output_closed: Arc, output_closed_notify: Arc, cancellation_token: CancellationToken, - output_drained: OutputDrainedSignal, + output_drained: Arc, state_tx: watch::Sender, state_rx: watch::Receiver, output_task: Option>, @@ -142,7 +108,7 @@ impl UnifiedExecProcess { let output_closed = Arc::new(AtomicBool::new(false)); let output_closed_notify = Arc::new(Notify::new()); let cancellation_token = CancellationToken::new(); - let output_drained = OutputDrainedSignal::new(); + let output_drained = Arc::new(Notify::new()); let (output_tx, _) = broadcast::channel(64); let (state_tx, state_rx) = watch::channel(ProcessState::default()); @@ -206,8 +172,8 @@ impl UnifiedExecProcess { self.cancellation_token.clone() } - pub(super) fn output_drained_signal(&self) -> OutputDrainedSignal { - self.output_drained.clone() + pub(super) fn output_drained_notify(&self) -> Arc { + Arc::clone(&self.output_drained) } pub(super) fn has_exited(&self) -> bool {