Compare commits

...

3 Commits

Author SHA1 Message Date
Eric Traut
46ae34f428 Address code review feedback 2025-12-13 16:34:49 -08:00
Eric Traut
ffd0075e4a Fixed lint 2025-12-13 15:53:56 -08:00
Eric Traut
ae68a6ce2d Terminate process group on interruption to avoid dangling processes 2025-12-13 15:15:21 -08:00
3 changed files with 71 additions and 8 deletions

View File

@@ -556,23 +556,52 @@ async fn exec(
consume_truncated_output(child, expiration, stdout_stream).await
}
/// Guard that owns a child process and, while armed, kills the child's
/// process group when the guard is dropped.
///
/// The guard starts armed (see `GroupTerminator::new`) and can be switched
/// off with `GroupTerminator::disarm` once the caller has dealt with the
/// child itself.
struct GroupTerminator {
/// The child process being guarded.
/// NOTE(review): presumably `tokio::process::Child`, since callers await
/// `wait()` on it — confirm against the file's imports.
child: Child,
/// When `true`, `Drop` attempts to kill the child's process group and the
/// child itself.
kill_on_drop: bool,
}
impl GroupTerminator {
fn new(child: Child) -> Self {
Self {
child,
kill_on_drop: true,
}
}
fn disarm(&mut self) {
self.kill_on_drop = false;
}
}
impl Drop for GroupTerminator {
    /// If the guard is still armed, kills the child's process group and then
    /// requests a kill of the child itself.
    fn drop(&mut self) {
        if !self.kill_on_drop {
            return;
        }

        // Best effort only: a destructor has no way to report failure, so the
        // results of both calls are deliberately discarded.
        let _ = kill_child_process_group(&mut self.child);
        let _ = self.child.start_kill();
    }
}
/// Consumes the output of a child process, truncating it so it is suitable for
/// use as the output of a `shell` tool call. Also enforces specified timeout.
/// If the future is dropped early (e.g., on interrupt), the process group is killed.
async fn consume_truncated_output(
mut child: Child,
child: Child,
expiration: ExecExpiration,
stdout_stream: Option<StdoutStream>,
) -> Result<RawExecToolCallOutput> {
let mut terminator = GroupTerminator::new(child);
// Both stdout and stderr were configured with `Stdio::piped()`
// above, therefore `take()` should normally return `Some`. If it doesn't
// we treat it as an exceptional I/O error
let stdout_reader = child.stdout.take().ok_or_else(|| {
let stdout_reader = terminator.child.stdout.take().ok_or_else(|| {
CodexErr::Io(io::Error::other(
"stdout pipe was unexpectedly not available",
))
})?;
let stderr_reader = child.stderr.take().ok_or_else(|| {
let stderr_reader = terminator.child.stderr.take().ok_or_else(|| {
CodexErr::Io(io::Error::other(
"stderr pipe was unexpectedly not available",
))
@@ -594,18 +623,21 @@ async fn consume_truncated_output(
));
let (exit_status, timed_out) = tokio::select! {
status_result = child.wait() => {
status_result = terminator.child.wait() => {
let exit_status = status_result?;
terminator.disarm();
(exit_status, false)
}
_ = expiration.wait() => {
kill_child_process_group(&mut child)?;
child.start_kill()?;
kill_child_process_group(&mut terminator.child)?;
terminator.child.start_kill()?;
terminator.disarm();
(synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + TIMEOUT_CODE), true)
}
_ = tokio::signal::ctrl_c() => {
kill_child_process_group(&mut child)?;
child.start_kill()?;
kill_child_process_group(&mut terminator.child)?;
terminator.child.start_kill()?;
terminator.disarm();
(synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + SIGKILL_CODE), false)
}
};

View File

@@ -199,11 +199,19 @@ impl Session {
async fn handle_task_abort(self: &Arc<Self>, task: RunningTask, reason: TurnAbortReason) {
let sub_id = task.turn_context.sub_id.clone();
if task.cancellation_token.is_cancelled() {
self.services
.unified_exec_manager
.terminate_sessions_for_turn(&sub_id)
.await;
return;
}
trace!(task_kind = ?task.kind, sub_id, "aborting running task");
task.cancellation_token.cancel();
self.services
.unified_exec_manager
.terminate_sessions_for_turn(&sub_id)
.await;
let session_task = task.task;
select! {

View File

@@ -643,6 +643,29 @@ impl UnifiedExecSessionManager {
entry.session.terminate();
}
}
/// Removes every session whose `turn_ref.sub_id` equals `sub_id` from the
/// session store, releases the matching reserved ids, and terminates each
/// removed session.
pub(crate) async fn terminate_sessions_for_turn(&self, sub_id: &str) {
    // Detach the matching entries while holding the store lock; the lock is
    // released at the end of this inner scope, before anything is terminated.
    let detached: Vec<SessionEntry> = {
        let mut store = self.session_store.lock().await;

        // Split the extracted (id, entry) pairs into two parallel vectors.
        let (released_ids, detached): (Vec<_>, Vec<SessionEntry>) = store
            .sessions
            .extract_if(|_, entry| entry.turn_ref.sub_id == sub_id)
            .unzip();

        for process_id in released_ids {
            store.reserved_sessions_id.remove(&process_id);
        }

        detached
    };

    for session_entry in detached {
        session_entry.session.terminate();
    }
}
}
enum SessionStatus {