[js_repl] Hard-stop active js_repl execs on explicit user interrupts (#13329)

## Summary
- hard-stop `js_repl` only for `TurnAbortReason::Interrupted`,
preserving the persistent REPL across replaced turns
- track the current top-level exec by turn and only reset when the
interrupted turn owns submitted work or a freshly started kernel for the
current exec attempt
- close both interrupt races: the write-window race by marking the exec
as submitted before async pipe writes begin, and the startup-window race
by tracking fresh-kernel ownership until submission
- add regression coverage for interrupted in-flight execs and the
pending-kernel-start window

## Why
Stopping a turn previously surfaced `aborted by user after Xs` even
though the underlying `js_repl` kernel could continue executing. Earlier
fixes also risked resetting the session-scoped REPL too broadly or
missing already-dispatched work. This change keeps cleanup scoped to
explicit stop semantics and makes the interrupt path line up with both
submitted execs and newly started kernels.

## Testing
- `just fmt`
- `cargo test -p codex-core`
- `just fix -p codex-core`

`cargo test -p codex-core` passes the updated `js_repl` coverage,
including the new startup-window regression test; the remaining
integration failures in this environment are unrelated to `js_repl`.

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
aaronl-openai
2026-03-12 17:51:56 -07:00
committed by GitHub
parent 793bf32585
commit d9a403a8c0
3 changed files with 431 additions and 9 deletions

View File

@@ -227,9 +227,6 @@ impl Session {
// in-flight approval wait can surface as a model-visible rejection before TurnAborted.
active_turn.clear_pending().await;
}
if reason == TurnAbortReason::Interrupted {
self.close_unified_exec_processes().await;
}
}
pub async fn on_task_finished(
@@ -396,6 +393,16 @@ impl Session {
.await;
}
/// Cleanup that runs only for explicit user interrupts
/// (`TurnAbortReason::Interrupted`): closes unified-exec processes and asks an
/// already-initialized `js_repl` kernel to stop work owned by the interrupted
/// turn, identified by `turn_context.sub_id`.
pub(crate) async fn cleanup_after_interrupt(&self, turn_context: &Arc<TurnContext>) {
    self.close_unified_exec_processes().await;
    // `manager_if_initialized` avoids lazily starting a kernel just to
    // interrupt it — only an existing REPL needs stopping.
    if let Some(manager) = turn_context.js_repl.manager_if_initialized()
        && let Err(err) = manager.interrupt_turn_exec(&turn_context.sub_id).await
    {
        // Best-effort: a failed interrupt is logged, not propagated.
        warn!("failed to interrupt js_repl kernel: {err}");
    }
}
async fn handle_task_abort(self: &Arc<Self>, task: RunningTask, reason: TurnAbortReason) {
let sub_id = task.turn_context.sub_id.clone();
if task.cancellation_token.is_cancelled() {
@@ -425,6 +432,8 @@ impl Session {
.await;
if reason == TurnAbortReason::Interrupted {
self.cleanup_after_interrupt(&task.turn_context).await;
let marker = ResponseItem::Message {
id: None,
role: "user".to_string(),

View File

@@ -93,6 +93,10 @@ impl JsReplHandle {
.await
.cloned()
}
/// Returns the manager only if the lazy cell has already been initialized;
/// never triggers kernel/manager startup (`get`, not `get_or_init`).
pub(crate) fn manager_if_initialized(&self) -> Option<Arc<JsReplManager>> {
    self.cell.get().cloned()
}
}
#[derive(Clone, Debug, Deserialize)]
@@ -115,6 +119,7 @@ struct KernelState {
stdin: Arc<Mutex<ChildStdin>>,
pending_execs: Arc<Mutex<HashMap<String, tokio::sync::oneshot::Sender<ExecResultMessage>>>>,
exec_contexts: Arc<Mutex<HashMap<String, ExecContext>>>,
top_level_exec_state: TopLevelExecState,
shutdown: CancellationToken,
}
@@ -125,6 +130,54 @@ struct ExecContext {
tracker: SharedTurnDiffTracker,
}
/// Tracks which top-level `js_repl` exec (if any) currently owns the kernel,
/// so an explicit user interrupt can decide whether the kernel must be reset.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
enum TopLevelExecState {
    /// No exec is currently registered against the kernel.
    #[default]
    Idle,
    /// The kernel was started fresh for this turn; `exec_id` is filled in
    /// once the turn registers its exec.
    FreshKernel {
        turn_id: String,
        exec_id: Option<String>,
    },
    /// An exec was registered on a reused kernel but has not been submitted
    /// to the kernel yet.
    ReusedKernelPending {
        turn_id: String,
        exec_id: String,
    },
    /// The exec payload has been (or is being) written to the kernel.
    Submitted {
        turn_id: String,
        exec_id: String,
    },
}

impl TopLevelExecState {
    /// The exec id currently associated with the kernel, if any.
    fn registered_exec_id(&self) -> Option<&str> {
        match self {
            Self::Idle | Self::FreshKernel { exec_id: None, .. } => None,
            Self::FreshKernel {
                exec_id: Some(id), ..
            }
            | Self::ReusedKernelPending { exec_id: id, .. }
            | Self::Submitted { exec_id: id, .. } => Some(id.as_str()),
        }
    }

    /// Whether interrupting `turn_id` must reset the kernel: only when the
    /// interrupted turn owns submitted work or a freshly started kernel.
    /// A pending exec on a reused kernel has not reached the kernel yet, so
    /// no reset is required.
    fn should_reset_for_interrupt(&self, turn_id: &str) -> bool {
        match self {
            Self::FreshKernel { turn_id: owner, .. }
            | Self::Submitted { turn_id: owner, .. } => owner == turn_id,
            Self::Idle | Self::ReusedKernelPending { .. } => false,
        }
    }
}
#[derive(Default)]
struct ExecToolCalls {
in_flight: usize,
@@ -451,6 +504,94 @@ impl JsReplManager {
}
}
/// Associate `exec_id` with `turn_id` as the kernel's current top-level exec.
///
/// If this turn already owns a freshly started kernel, the exec id is filled
/// into the existing `FreshKernel` entry; otherwise the state becomes
/// `ReusedKernelPending` (registered, but not yet submitted to the kernel).
async fn register_top_level_exec(&self, exec_id: String, turn_id: String) {
    let mut kernel = self.kernel.lock().await;
    let Some(state) = kernel.as_mut() else {
        // No kernel: nothing to track.
        return;
    };
    state.top_level_exec_state = match &state.top_level_exec_state {
        // Same turn that started the fresh kernel: keep fresh-kernel
        // ownership and record which exec it belongs to.
        TopLevelExecState::FreshKernel {
            turn_id: active_turn_id,
            ..
        } if active_turn_id == &turn_id => TopLevelExecState::FreshKernel {
            turn_id,
            exec_id: Some(exec_id),
        },
        // Any other prior state is superseded by this registration.
        TopLevelExecState::Idle
        | TopLevelExecState::ReusedKernelPending { .. }
        | TopLevelExecState::Submitted { .. }
        | TopLevelExecState::FreshKernel { .. } => {
            TopLevelExecState::ReusedKernelPending { turn_id, exec_id }
        }
    };
}
/// Transition the tracked exec to `Submitted`, but only while `exec_id` is
/// still the registered exec (from either the fresh-kernel or reused-kernel
/// registration path).
///
/// Callers invoke this *before* awaiting the stdin write so that an interrupt
/// racing with the write still observes the exec as submitted.
async fn mark_top_level_exec_submitted(&self, exec_id: &str) {
    let mut kernel = self.kernel.lock().await;
    let Some(state) = kernel.as_mut() else {
        return;
    };
    let next_state = match &state.top_level_exec_state {
        // Both ownership flavors advance to Submitted when the ids match.
        TopLevelExecState::FreshKernel {
            turn_id,
            exec_id: Some(active_exec_id),
        }
        | TopLevelExecState::ReusedKernelPending {
            turn_id,
            exec_id: active_exec_id,
        } if active_exec_id == exec_id => Some(TopLevelExecState::Submitted {
            turn_id: turn_id.clone(),
            exec_id: active_exec_id.clone(),
        }),
        // Stale or unrelated state: leave it untouched.
        TopLevelExecState::Idle
        | TopLevelExecState::FreshKernel { .. }
        | TopLevelExecState::ReusedKernelPending { .. }
        | TopLevelExecState::Submitted { .. } => None,
    };
    if let Some(next_state) = next_state {
        state.top_level_exec_state = next_state;
    }
}
/// Convenience wrapper over `clear_top_level_exec_if_matches_map` using this
/// manager's own kernel handle.
async fn clear_top_level_exec_if_matches(&self, exec_id: &str) {
    Self::clear_top_level_exec_if_matches_map(&self.kernel, exec_id).await;
}
/// Reset the tracked top-level exec to `Idle` when it matches `exec_id`.
///
/// Takes the kernel map directly (rather than `&self`) so spawned tasks that
/// only hold the `Arc` can call it.
async fn clear_top_level_exec_if_matches_map(
    kernel: &Arc<Mutex<Option<KernelState>>>,
    exec_id: &str,
) {
    let mut guard = kernel.lock().await;
    let Some(state) = guard.as_mut() else {
        return;
    };
    if state.top_level_exec_state.registered_exec_id() == Some(exec_id) {
        state.top_level_exec_state = TopLevelExecState::Idle;
    }
}
/// Reset the tracked top-level exec to `Idle` when it matches any id in
/// `exec_ids`. Static companion to `clear_top_level_exec_if_matches_map` for
/// batch clearing (e.g. pending execs collected on stream end).
async fn clear_top_level_exec_if_matches_any_map(
    kernel: &Arc<Mutex<Option<KernelState>>>,
    exec_ids: &[String],
) {
    let mut guard = kernel.lock().await;
    let Some(state) = guard.as_mut() else {
        return;
    };
    let matches_any = state
        .top_level_exec_state
        .registered_exec_id()
        .is_some_and(|registered| exec_ids.iter().any(|pending| pending == registered));
    if matches_any {
        state.top_level_exec_state = TopLevelExecState::Idle;
    }
}
/// Whether interrupting `turn_id` must tear down the kernel, per
/// `TopLevelExecState::should_reset_for_interrupt`. `false` when no kernel
/// exists.
async fn turn_interrupt_requires_reset(&self, turn_id: &str) -> bool {
    let guard = self.kernel.lock().await;
    match guard.as_ref() {
        Some(state) => state
            .top_level_exec_state
            .should_reset_for_interrupt(turn_id),
        None => false,
    }
}
fn log_tool_call_response(
req: &RunToolRequest,
ok: bool,
@@ -663,6 +804,18 @@ impl JsReplManager {
Ok(())
}
/// Hard-stop any top-level exec owned by `turn_id`.
///
/// Returns `Ok(true)` when the kernel was reset (the turn owned submitted
/// work or a freshly started kernel) and `Ok(false)` when no reset was
/// needed, preserving the persistent REPL.
///
/// # Errors
/// Returns a model-visible error when the exec semaphore is closed.
pub async fn interrupt_turn_exec(&self, turn_id: &str) -> Result<bool, FunctionCallError> {
    // Serialize with normal execs so a kernel is never reset mid-dispatch.
    let _permit = self.exec_lock.clone().acquire_owned().await.map_err(|_| {
        FunctionCallError::RespondToModel("js_repl execution unavailable".to_string())
    })?;
    if !self.turn_interrupt_requires_reset(turn_id).await {
        return Ok(false);
    }
    self.reset_kernel().await;
    // Drop bookkeeping for tool calls dispatched by the now-reset kernel.
    Self::clear_all_exec_tool_calls_map(&self.exec_tool_calls).await;
    Ok(true)
}
async fn reset_kernel(&self) {
let state = {
let mut guard = self.kernel.lock().await;
@@ -689,7 +842,7 @@ impl JsReplManager {
let mut kernel = self.kernel.lock().await;
if kernel.is_none() {
let dependency_env = session.dependency_env().await;
let state = self
let mut state = self
.start_kernel(
Arc::clone(&turn),
&dependency_env,
@@ -697,6 +850,10 @@ impl JsReplManager {
)
.await
.map_err(FunctionCallError::RespondToModel)?;
state.top_level_exec_state = TopLevelExecState::FreshKernel {
turn_id: turn.sub_id.clone(),
exec_id: None,
};
*kernel = Some(state);
}
@@ -732,6 +889,8 @@ impl JsReplManager {
);
(req_id, rx)
};
self.register_top_level_exec(req_id.clone(), turn.sub_id.clone())
.await;
self.register_exec_tool_calls(&req_id).await;
let payload = HostToKernel::Exec {
@@ -740,8 +899,25 @@ impl JsReplManager {
timeout_ms: args.timeout_ms,
};
if let Err(err) = Self::write_message(&stdin, &payload).await {
pending_execs.lock().await.remove(&req_id);
let write_result = {
// Treat the exec as submitted before the async pipe writes begin: once we start
// awaiting `write_all`, the kernel may already observe runnable JS even if the turn is
// aborted before control returns here.
self.mark_top_level_exec_submitted(&req_id).await;
let write_result = Self::write_message(&stdin, &payload).await;
match write_result {
Ok(()) => Ok(()),
Err(err) => {
self.clear_top_level_exec_if_matches(&req_id).await;
Err(err)
}
}
};
if let Err(err) = write_result {
if pending_execs.lock().await.remove(&req_id).is_some() {
self.clear_top_level_exec_if_matches(&req_id).await;
}
exec_contexts.lock().await.remove(&req_id);
self.clear_exec_tool_calls(&req_id).await;
let snapshot = Self::kernel_debug_snapshot(&child, &recent_stderr).await;
@@ -773,7 +949,11 @@ impl JsReplManager {
Ok(Ok(msg)) => msg,
Ok(Err(_)) => {
let mut pending = pending_execs.lock().await;
pending.remove(&req_id);
let removed = pending.remove(&req_id).is_some();
drop(pending);
if removed {
self.clear_top_level_exec_if_matches(&req_id).await;
}
exec_contexts.lock().await.remove(&req_id);
self.wait_for_exec_tool_calls(&req_id).await;
self.clear_exec_tool_calls(&req_id).await;
@@ -794,6 +974,7 @@ impl JsReplManager {
self.reset_kernel().await;
self.wait_for_exec_tool_calls(&req_id).await;
self.exec_tool_calls.lock().await.clear();
self.clear_top_level_exec_if_matches(&req_id).await;
return Err(FunctionCallError::RespondToModel(
"js_repl execution timed out; kernel reset, rerun your request".to_string(),
));
@@ -961,6 +1142,7 @@ impl JsReplManager {
stdin: stdin_arc,
pending_execs,
exec_contexts,
top_level_exec_state: TopLevelExecState::Idle,
shutdown,
})
}
@@ -1123,8 +1305,12 @@ impl JsReplManager {
.map(|state| state.content_items.clone())
.unwrap_or_default()
};
let mut pending = pending_execs.lock().await;
if let Some(tx) = pending.remove(&id) {
let tx = {
let mut pending = pending_execs.lock().await;
pending.remove(&id)
};
if let Some(tx) = tx {
Self::clear_top_level_exec_if_matches_map(&manager_kernel, &id).await;
let payload = if ok {
ExecResultMessage::Ok {
content_items: build_exec_result_content_items(
@@ -1316,6 +1502,9 @@ impl JsReplManager {
});
}
drop(pending);
if !pending_exec_ids.is_empty() {
Self::clear_top_level_exec_if_matches_any_map(&manager_kernel, &pending_exec_ids).await;
}
if !matches!(end_reason, KernelStreamEnd::Shutdown) {
let mut pending_exec_ids = pending_exec_ids;

View File

@@ -612,6 +612,230 @@ async fn js_repl_timeout_kills_kernel_process() -> anyhow::Result<()> {
Ok(())
}
// Regression: interrupting the turn that owns a *submitted* exec must reset
// the kernel and clear exec-tool-call bookkeeping.
#[tokio::test]
async fn interrupt_turn_exec_clears_matching_submitted_exec() -> anyhow::Result<()> {
    // Skip when the JS runtime is unavailable in this environment.
    if !can_run_js_repl_runtime_tests().await {
        return Ok(());
    }
    let manager = JsReplManager::new(None, Vec::new())
        .await
        .expect("manager should initialize");
    let (_session, turn) = make_session_and_context().await;
    let turn = Arc::new(turn);
    let dependency_env = HashMap::new();
    let mut state = manager
        .start_kernel(Arc::clone(&turn), &dependency_env, None)
        .await
        .map_err(anyhow::Error::msg)?;
    let child = Arc::clone(&state.child);
    // Simulate an exec from this turn that has already reached the kernel.
    state.top_level_exec_state = TopLevelExecState::Submitted {
        turn_id: turn.sub_id.clone(),
        exec_id: "exec-1".to_string(),
    };
    *manager.kernel.lock().await = Some(state);
    manager.register_exec_tool_calls("exec-1").await;
    // Interrupting the owning turn resets the kernel and clears bookkeeping.
    assert!(manager.interrupt_turn_exec(&turn.sub_id).await?);
    assert!(manager.kernel.lock().await.is_none());
    assert!(manager.exec_tool_calls.lock().await.is_empty());
    // Poll until the kernel process actually exits (bounded by a timeout).
    tokio::time::timeout(Duration::from_secs(3), async {
        loop {
            let exited = {
                let mut child = child.lock().await;
                child.try_wait()?.is_some()
            };
            if exited {
                return Ok::<(), anyhow::Error>(());
            }
            tokio::time::sleep(Duration::from_millis(25)).await;
        }
    })
    .await
    .expect("kernel should exit after interrupt cleanup")?;
    Ok(())
}
// Regression for the startup-window race: a kernel freshly started for this
// turn (no exec registered yet) must still be reset on interrupt.
#[tokio::test]
async fn interrupt_turn_exec_resets_matching_pending_kernel_start() -> anyhow::Result<()> {
    // Skip when the JS runtime is unavailable in this environment.
    if !can_run_js_repl_runtime_tests().await {
        return Ok(());
    }
    let manager = JsReplManager::new(None, Vec::new())
        .await
        .expect("manager should initialize");
    let (_session, turn) = make_session_and_context().await;
    let turn = Arc::new(turn);
    let dependency_env = HashMap::new();
    let mut state = manager
        .start_kernel(Arc::clone(&turn), &dependency_env, None)
        .await
        .map_err(anyhow::Error::msg)?;
    // Freshly started kernel owned by this turn, exec not yet registered.
    state.top_level_exec_state = TopLevelExecState::FreshKernel {
        turn_id: turn.sub_id.clone(),
        exec_id: None,
    };
    let child = Arc::clone(&state.child);
    *manager.kernel.lock().await = Some(state);
    // The owning turn's interrupt resets even before any exec is submitted.
    assert!(manager.interrupt_turn_exec(&turn.sub_id).await?);
    assert!(manager.kernel.lock().await.is_none());
    // Poll until the kernel process actually exits (bounded by a timeout).
    tokio::time::timeout(Duration::from_secs(3), async {
        loop {
            let exited = {
                let mut child = child.lock().await;
                child.try_wait()?.is_some()
            };
            if exited {
                return Ok::<(), anyhow::Error>(());
            }
            tokio::time::sleep(Duration::from_millis(25)).await;
        }
    })
    .await
    .expect("kernel should exit after interrupt cleanup")?;
    Ok(())
}
// An exec registered on a *reused* kernel but not yet submitted has not
// reached the kernel, so an interrupt must leave the kernel alone.
#[tokio::test]
async fn interrupt_turn_exec_does_not_reset_reused_kernel_before_submit() -> anyhow::Result<()> {
    // Skip when the JS runtime is unavailable in this environment.
    if !can_run_js_repl_runtime_tests().await {
        return Ok(());
    }
    let manager = JsReplManager::new(None, Vec::new())
        .await
        .expect("manager should initialize");
    let (_session, turn) = make_session_and_context().await;
    let turn = Arc::new(turn);
    let dependency_env = HashMap::new();
    let mut state = manager
        .start_kernel(Arc::clone(&turn), &dependency_env, None)
        .await
        .map_err(anyhow::Error::msg)?;
    // Registered-but-not-submitted exec on a reused kernel.
    state.top_level_exec_state = TopLevelExecState::ReusedKernelPending {
        turn_id: turn.sub_id.clone(),
        exec_id: "exec-1".to_string(),
    };
    *manager.kernel.lock().await = Some(state);
    // No reset: the kernel (and the persistent REPL state) survives.
    assert!(!manager.interrupt_turn_exec(&turn.sub_id).await?);
    assert!(manager.kernel.lock().await.is_some());
    // Tear down explicitly so the test leaves no kernel process behind.
    manager.reset().await.map_err(anyhow::Error::msg)
}
// End-to-end regression: an interrupt during an in-flight exec must stop the
// kernel before its second side effect lands, and a later exec must recover
// on a fresh kernel.
#[tokio::test]
async fn interrupt_active_exec_stops_aborted_kernel_before_later_exec() -> anyhow::Result<()> {
    // Skip when the JS runtime is unavailable in this environment.
    if !can_run_js_repl_runtime_tests().await {
        return Ok(());
    }
    let dir = tempdir()?;
    let (session, mut turn) = make_session_and_context().await;
    turn.cwd = dir.path().to_path_buf();
    set_danger_full_access(&mut turn);
    let session = Arc::new(session);
    let turn = Arc::new(turn);
    let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default()));
    let manager = turn.js_repl.manager().await?;
    let first_file = dir.path().join("1.txt");
    let second_file = dir.path().join("2.txt");
    // JSON-encode the paths so they are valid JS string literals.
    let first_file_js = serde_json::to_string(&first_file.to_string_lossy().to_string())?;
    let second_file_js = serde_json::to_string(&second_file.to_string_lossy().to_string())?;
    // JS that writes 1.txt immediately, then sleeps 1s before writing 2.txt —
    // the interrupt is expected to land in that gap.
    let code = format!(
        r#"
const {{ promises: fs }} = await import("fs");
const paths = [{first_file_js}, {second_file_js}];
for (let i = 0; i < paths.length; i++) {{
await fs.writeFile(paths[i], `${{i + 1}}`);
if (i + 1 < paths.length) {{
await new Promise((resolve) => setTimeout(resolve, 1000));
}}
}}
"#
    );
    // Drive the exec from a spawned task so it can be aborted mid-run.
    let handle = tokio::spawn({
        let manager = Arc::clone(&manager);
        let session = Arc::clone(&session);
        let turn = Arc::clone(&turn);
        let tracker = Arc::clone(&tracker);
        async move {
            manager
                .execute(
                    session,
                    turn,
                    tracker,
                    JsReplArgs {
                        code,
                        timeout_ms: Some(15_000),
                    },
                )
                .await
        }
    });
    // Wait until the first write proves the exec is actually running.
    tokio::time::timeout(Duration::from_secs(3), async {
        while !first_file.exists() {
            tokio::time::sleep(Duration::from_millis(25)).await;
        }
    })
    .await
    .expect("first file should be written before interrupt");
    // Grab the child handle before the kernel is torn down.
    let child = {
        let guard = manager.kernel.lock().await;
        let state = guard
            .as_ref()
            .expect("kernel should exist while exec is running");
        Arc::clone(&state.child)
    };
    // Abort the driving task, then interrupt: the kernel must be torn down.
    handle.abort();
    assert!(manager.interrupt_turn_exec(&turn.sub_id).await?);
    tokio::time::timeout(Duration::from_secs(3), async {
        loop {
            let exited = {
                let mut child = child.lock().await;
                child.try_wait()?.is_some()
            };
            if exited {
                return Ok::<(), anyhow::Error>(());
            }
            tokio::time::sleep(Duration::from_millis(25)).await;
        }
    })
    .await
    .expect("kernel should exit after interrupt")?;
    // Wait past the in-script 1s sleep: the second write must never happen.
    tokio::time::sleep(Duration::from_millis(1500)).await;
    assert!(first_file.exists());
    assert!(!second_file.exists());
    // A follow-up exec must transparently recover on a fresh kernel.
    let result = manager
        .execute(
            session,
            turn,
            tracker,
            JsReplArgs {
                code: "console.log('after interrupt');".to_string(),
                timeout_ms: Some(10_000),
            },
        )
        .await?;
    assert!(result.output.contains("after interrupt"));
    Ok(())
}
#[tokio::test]
async fn js_repl_forced_kernel_exit_recovers_on_next_exec() -> anyhow::Result<()> {
if !can_run_js_repl_runtime_tests().await {