mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
fix: js_repl reset hang by clearing exec tool calls without waiting (#11932)
Remove the waiting loop in `reset` so it no longer blocks on potentially hanging exec tool calls + add `clear_all_exec_tool_calls_map` to drain the map and notify waiters so `reset` completes immediately
This commit is contained in:
@@ -114,6 +114,7 @@ struct ExecContext {
|
||||
struct ExecToolCalls {
|
||||
in_flight: usize,
|
||||
notify: Arc<Notify>,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
enum KernelStreamEnd {
|
||||
@@ -300,6 +301,7 @@ impl JsReplManager {
|
||||
|
||||
async fn clear_exec_tool_calls(&self, exec_id: &str) {
|
||||
if let Some(state) = self.exec_tool_calls.lock().await.remove(exec_id) {
|
||||
state.cancel.cancel();
|
||||
state.notify.notify_waiters();
|
||||
}
|
||||
}
|
||||
@@ -320,32 +322,14 @@ impl JsReplManager {
|
||||
}
|
||||
}
|
||||
|
||||
async fn wait_for_all_exec_tool_calls(&self) {
|
||||
loop {
|
||||
let notified = {
|
||||
let calls = self.exec_tool_calls.lock().await;
|
||||
calls
|
||||
.values()
|
||||
.find(|state| state.in_flight > 0)
|
||||
.map(|state| Arc::clone(&state.notify).notified_owned())
|
||||
};
|
||||
match notified {
|
||||
Some(notified) => notified.await,
|
||||
None => return,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn begin_exec_tool_call(
|
||||
exec_tool_calls: &Arc<Mutex<HashMap<String, ExecToolCalls>>>,
|
||||
exec_id: &str,
|
||||
) -> bool {
|
||||
) -> Option<CancellationToken> {
|
||||
let mut calls = exec_tool_calls.lock().await;
|
||||
let Some(state) = calls.get_mut(exec_id) else {
|
||||
return false;
|
||||
};
|
||||
let state = calls.get_mut(exec_id)?;
|
||||
state.in_flight += 1;
|
||||
true
|
||||
Some(state.cancel.clone())
|
||||
}
|
||||
|
||||
async fn finish_exec_tool_call(
|
||||
@@ -396,14 +380,27 @@ impl JsReplManager {
|
||||
exec_id: &str,
|
||||
) {
|
||||
if let Some(state) = exec_tool_calls.lock().await.remove(exec_id) {
|
||||
state.cancel.cancel();
|
||||
state.notify.notify_waiters();
|
||||
}
|
||||
}
|
||||
|
||||
async fn clear_all_exec_tool_calls_map(
|
||||
exec_tool_calls: &Arc<Mutex<HashMap<String, ExecToolCalls>>>,
|
||||
) {
|
||||
let states = {
|
||||
let mut calls = exec_tool_calls.lock().await;
|
||||
calls.drain().map(|(_, state)| state).collect::<Vec<_>>()
|
||||
};
|
||||
for state in states {
|
||||
state.cancel.cancel();
|
||||
state.notify.notify_waiters();
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn reset(&self) -> Result<(), FunctionCallError> {
|
||||
self.reset_kernel().await;
|
||||
self.wait_for_all_exec_tool_calls().await;
|
||||
self.exec_tool_calls.lock().await.clear();
|
||||
Self::clear_all_exec_tool_calls_map(&self.exec_tool_calls).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -854,7 +851,9 @@ impl JsReplManager {
|
||||
JsReplManager::clear_exec_tool_calls_map(&exec_tool_calls, &id).await;
|
||||
}
|
||||
KernelToHost::RunTool(req) => {
|
||||
if !JsReplManager::begin_exec_tool_call(&exec_tool_calls, &req.exec_id).await {
|
||||
let Some(reset_cancel) =
|
||||
JsReplManager::begin_exec_tool_call(&exec_tool_calls, &req.exec_id).await
|
||||
else {
|
||||
let exec_id = req.exec_id.clone();
|
||||
let tool_call_id = req.id.clone();
|
||||
let payload = HostToKernel::RunToolResult(RunToolResult {
|
||||
@@ -877,10 +876,10 @@ impl JsReplManager {
|
||||
);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let stdin_clone = Arc::clone(&stdin);
|
||||
let exec_contexts = Arc::clone(&exec_contexts);
|
||||
let exec_tool_calls = Arc::clone(&exec_tool_calls);
|
||||
let exec_tool_calls_for_task = Arc::clone(&exec_tool_calls);
|
||||
let recent_stderr = Arc::clone(&recent_stderr);
|
||||
tokio::spawn(async move {
|
||||
let exec_id = req.exec_id.clone();
|
||||
@@ -888,15 +887,26 @@ impl JsReplManager {
|
||||
let tool_name = req.tool_name.clone();
|
||||
let context = { exec_contexts.lock().await.get(&exec_id).cloned() };
|
||||
let result = match context {
|
||||
Some(ctx) => JsReplManager::run_tool_request(ctx, req).await,
|
||||
Some(ctx) => {
|
||||
tokio::select! {
|
||||
_ = reset_cancel.cancelled() => RunToolResult {
|
||||
id: tool_call_id.clone(),
|
||||
ok: false,
|
||||
response: None,
|
||||
error: Some("js_repl execution reset".to_string()),
|
||||
},
|
||||
result = JsReplManager::run_tool_request(ctx, req) => result,
|
||||
}
|
||||
}
|
||||
None => RunToolResult {
|
||||
id: req.id.clone(),
|
||||
id: tool_call_id.clone(),
|
||||
ok: false,
|
||||
response: None,
|
||||
error: Some("js_repl exec context not found".to_string()),
|
||||
},
|
||||
};
|
||||
JsReplManager::finish_exec_tool_call(&exec_tool_calls, &exec_id).await;
|
||||
JsReplManager::finish_exec_tool_call(&exec_tool_calls_for_task, &exec_id)
|
||||
.await;
|
||||
let payload = HostToKernel::RunToolResult(result);
|
||||
if let Err(err) = JsReplManager::write_message(&stdin_clone, &payload).await
|
||||
{
|
||||
@@ -1417,7 +1427,11 @@ mod tests {
|
||||
.lock()
|
||||
.await
|
||||
.insert(exec_id.clone(), ExecToolCalls::default());
|
||||
assert!(JsReplManager::begin_exec_tool_call(&exec_tool_calls, &exec_id).await);
|
||||
assert!(
|
||||
JsReplManager::begin_exec_tool_call(&exec_tool_calls, &exec_id)
|
||||
.await
|
||||
.is_some()
|
||||
);
|
||||
|
||||
let wait_map = Arc::clone(&exec_tool_calls);
|
||||
let wait_exec_id = exec_id.clone();
|
||||
@@ -1441,6 +1455,70 @@ mod tests {
|
||||
JsReplManager::clear_exec_tool_calls_map(&exec_tool_calls, &exec_id).await;
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn reset_clears_inflight_exec_tool_calls_without_waiting() {
|
||||
let manager = JsReplManager::new(None, std::env::temp_dir())
|
||||
.await
|
||||
.expect("manager should initialize");
|
||||
let exec_id = Uuid::new_v4().to_string();
|
||||
manager.register_exec_tool_calls(&exec_id).await;
|
||||
assert!(
|
||||
JsReplManager::begin_exec_tool_call(&manager.exec_tool_calls, &exec_id)
|
||||
.await
|
||||
.is_some()
|
||||
);
|
||||
|
||||
let wait_manager = Arc::clone(&manager);
|
||||
let wait_exec_id = exec_id.clone();
|
||||
let waiter = tokio::spawn(async move {
|
||||
wait_manager.wait_for_exec_tool_calls(&wait_exec_id).await;
|
||||
});
|
||||
tokio::task::yield_now().await;
|
||||
|
||||
tokio::time::timeout(Duration::from_secs(1), manager.reset())
|
||||
.await
|
||||
.expect("reset should not hang")
|
||||
.expect("reset should succeed");
|
||||
|
||||
tokio::time::timeout(Duration::from_secs(1), waiter)
|
||||
.await
|
||||
.expect("waiter should be released")
|
||||
.expect("wait task should not panic");
|
||||
|
||||
assert!(manager.exec_tool_calls.lock().await.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn reset_aborts_inflight_exec_tool_tasks() {
|
||||
let manager = JsReplManager::new(None, std::env::temp_dir())
|
||||
.await
|
||||
.expect("manager should initialize");
|
||||
let exec_id = Uuid::new_v4().to_string();
|
||||
manager.register_exec_tool_calls(&exec_id).await;
|
||||
let reset_cancel = JsReplManager::begin_exec_tool_call(&manager.exec_tool_calls, &exec_id)
|
||||
.await
|
||||
.expect("exec should be registered");
|
||||
|
||||
let task = tokio::spawn(async move {
|
||||
tokio::select! {
|
||||
_ = reset_cancel.cancelled() => "cancelled",
|
||||
_ = tokio::time::sleep(Duration::from_secs(60)) => "timed_out",
|
||||
}
|
||||
});
|
||||
|
||||
tokio::time::timeout(Duration::from_secs(1), manager.reset())
|
||||
.await
|
||||
.expect("reset should not hang")
|
||||
.expect("reset should succeed");
|
||||
|
||||
let outcome = tokio::time::timeout(Duration::from_secs(1), task)
|
||||
.await
|
||||
.expect("cancelled task should resolve promptly")
|
||||
.expect("task should not panic");
|
||||
assert_eq!(outcome, "cancelled");
|
||||
}
|
||||
|
||||
async fn can_run_js_repl_runtime_tests() -> bool {
|
||||
if std::env::var_os("CODEX_SANDBOX").is_some() {
|
||||
return false;
|
||||
|
||||
Reference in New Issue
Block a user