core: revert unified exec drain latch

This commit is contained in:
dank-openai
2026-05-02 12:28:13 -04:00
parent e8c08110a0
commit df7c3dbe78
2 changed files with 9 additions and 43 deletions

View File

@@ -43,7 +43,7 @@ pub(crate) fn start_streaming_output(
transcript: Arc<Mutex<HeadTailBuffer>>,
) {
let mut receiver = process.output_receiver();
let output_drained = process.output_drained_signal();
let output_drained = process.output_drained_notify();
let exit_token = process.cancellation_token();
let session_ref = Arc::clone(&context.session);
@@ -70,7 +70,7 @@ pub(crate) fn start_streaming_output(
sleep.as_mut().await;
}
}, if grace_sleep.is_some() => {
output_drained.mark_drained();
output_drained.notify_one();
break;
}
@@ -81,7 +81,7 @@ pub(crate) fn start_streaming_output(
continue;
},
Err(RecvError::Closed) => {
output_drained.mark_drained();
output_drained.notify_one();
break;
}
};
@@ -116,11 +116,11 @@ pub(crate) fn spawn_exit_watcher(
started_at: Instant,
) {
let exit_token = process.cancellation_token();
let output_drained = process.output_drained_signal();
let output_drained = process.output_drained_notify();
tokio::spawn(async move {
exit_token.cancelled().await;
output_drained.wait().await;
output_drained.notified().await;
let duration = Instant::now().saturating_duration_since(started_at);
if let Some(message) = process.failure_message() {

View File

@@ -63,40 +63,6 @@ pub(crate) struct OutputHandles {
pub(crate) cancellation_token: CancellationToken,
}
/// One-shot signal that output streaming has consumed the process tail.
///
/// The state bit makes the signal robust when the streaming task finishes before
/// the exit watcher starts waiting.
#[derive(Clone)]
pub(crate) struct OutputDrainedSignal {
drained: Arc<AtomicBool>,
notify: Arc<Notify>,
}
impl OutputDrainedSignal {
fn new() -> Self {
Self {
drained: Arc::new(AtomicBool::new(false)),
notify: Arc::new(Notify::new()),
}
}
pub(crate) fn mark_drained(&self) {
self.drained.store(true, Ordering::Release);
self.notify.notify_waiters();
}
pub(crate) async fn wait(&self) {
loop {
let notified = self.notify.notified();
if self.drained.load(Ordering::Acquire) {
break;
}
notified.await;
}
}
}
/// Transport-specific process handle used by unified exec.
enum ProcessHandle {
Local(Box<ExecCommandSession>),
@@ -113,7 +79,7 @@ pub(crate) struct UnifiedExecProcess {
output_closed: Arc<AtomicBool>,
output_closed_notify: Arc<Notify>,
cancellation_token: CancellationToken,
output_drained: OutputDrainedSignal,
output_drained: Arc<Notify>,
state_tx: watch::Sender<ProcessState>,
state_rx: watch::Receiver<ProcessState>,
output_task: Option<JoinHandle<()>>,
@@ -142,7 +108,7 @@ impl UnifiedExecProcess {
let output_closed = Arc::new(AtomicBool::new(false));
let output_closed_notify = Arc::new(Notify::new());
let cancellation_token = CancellationToken::new();
let output_drained = OutputDrainedSignal::new();
let output_drained = Arc::new(Notify::new());
let (output_tx, _) = broadcast::channel(64);
let (state_tx, state_rx) = watch::channel(ProcessState::default());
@@ -206,8 +172,8 @@ impl UnifiedExecProcess {
self.cancellation_token.clone()
}
pub(super) fn output_drained_signal(&self) -> OutputDrainedSignal {
self.output_drained.clone()
pub(super) fn output_drained_notify(&self) -> Arc<Notify> {
Arc::clone(&self.output_drained)
}
pub(super) fn has_exited(&self) -> bool {