js_repl cleans up fresh kernels on interrupt

Track when a turn has started a fresh js_repl kernel for the current exec attempt so interrupt cleanup can reset it before any exec is formally submitted.

This closes the remaining startup-window race where an explicit stop could land after the kernel was created but before current exec state was visible, leaving a live Node kernel behind.

Add regression coverage for interrupting that pending-kernel-start state.

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Aaron Levine
2026-03-03 11:27:40 -08:00
parent dec8436c11
commit 5da4ca6a4f

View File

@@ -117,6 +117,7 @@ struct KernelState {
stdin: Arc<Mutex<ChildStdin>>,
pending_execs: Arc<Mutex<HashMap<String, tokio::sync::oneshot::Sender<ExecResultMessage>>>>,
exec_contexts: Arc<Mutex<HashMap<String, ExecContext>>>,
pending_owner_turn: Option<String>,
shutdown: CancellationToken,
}
@@ -494,6 +495,23 @@ impl JsReplManager {
self.current_exec.lock().await.take();
}
async fn clear_pending_kernel_owner(&self, turn_id: &str) {
let mut kernel = self.kernel.lock().await;
if let Some(state) = kernel.as_mut()
&& state.pending_owner_turn.as_deref() == Some(turn_id)
{
state.pending_owner_turn = None;
}
}
async fn turn_owns_pending_kernel(&self, turn_id: &str) -> bool {
self.kernel
.lock()
.await
.as_ref()
.is_some_and(|state| state.pending_owner_turn.as_deref() == Some(turn_id))
}
async fn turn_has_submitted_exec(&self, turn_id: &str) -> bool {
self.current_exec
.lock()
@@ -715,7 +733,9 @@ impl JsReplManager {
let _permit = self.exec_lock.clone().acquire_owned().await.map_err(|_| {
FunctionCallError::RespondToModel("js_repl execution unavailable".to_string())
})?;
if !self.turn_has_submitted_exec(turn_id).await {
if !self.turn_has_submitted_exec(turn_id).await
&& !self.turn_owns_pending_kernel(turn_id).await
{
return Ok(false);
}
self.reset_kernel().await;
@@ -752,10 +772,11 @@ impl JsReplManager {
let state = self
.start_kernel(Arc::clone(&turn), Some(session.conversation_id))
.await;
let state = match state {
let mut state = match state {
Ok(state) => state,
Err(err) => return Err(FunctionCallError::RespondToModel(err)),
};
state.pending_owner_turn = Some(turn.sub_id.clone());
*kernel = Some(state);
}
@@ -801,6 +822,7 @@ impl JsReplManager {
timeout_ms: args.timeout_ms,
};
self.clear_pending_kernel_owner(&turn.sub_id).await;
let write_result = {
let mut current_exec = self.current_exec.lock().await;
// Treat the exec as submitted before the async pipe writes begin: once we start
@@ -1050,6 +1072,7 @@ impl JsReplManager {
stdin: stdin_arc,
pending_execs,
exec_contexts,
pending_owner_turn: None,
shutdown,
})
}
@@ -2094,6 +2117,46 @@ mod tests {
assert!(manager.exec_tool_calls.lock().await.is_empty());
}
#[tokio::test]
async fn interrupt_turn_exec_resets_matching_pending_kernel_start() -> anyhow::Result<()> {
if !can_run_js_repl_runtime_tests().await {
return Ok(());
}
let manager = JsReplManager::new(None, Vec::new())
.await
.expect("manager should initialize");
let (_session, turn) = make_session_and_context().await;
let turn = Arc::new(turn);
let mut state = manager
.start_kernel(Arc::clone(&turn), None)
.await
.map_err(anyhow::Error::msg)?;
state.pending_owner_turn = Some(turn.sub_id.clone());
let child = Arc::clone(&state.child);
*manager.kernel.lock().await = Some(state);
assert!(manager.interrupt_turn_exec(&turn.sub_id).await?);
assert!(manager.kernel.lock().await.is_none());
tokio::time::timeout(Duration::from_secs(3), async {
loop {
let exited = {
let mut child = child.lock().await;
child.try_wait()?.is_some()
};
if exited {
return Ok::<(), anyhow::Error>(());
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
})
.await
.expect("kernel should exit after interrupt cleanup")?;
Ok(())
}
#[test]
fn summarize_tool_call_error_marks_error_payload() {
let actual = JsReplManager::summarize_tool_call_error("tool failed");