mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
Fix js repl timeout reset hang
This commit is contained in:
@@ -114,6 +114,7 @@ struct ExecContext {
|
||||
struct ExecToolCalls {
|
||||
in_flight: usize,
|
||||
notify: Arc<Notify>,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
enum KernelStreamEnd {
|
||||
@@ -300,6 +301,7 @@ impl JsReplManager {
|
||||
|
||||
async fn clear_exec_tool_calls(&self, exec_id: &str) {
|
||||
if let Some(state) = self.exec_tool_calls.lock().await.remove(exec_id) {
|
||||
state.cancel.cancel();
|
||||
state.notify.notify_waiters();
|
||||
}
|
||||
}
|
||||
@@ -323,13 +325,13 @@ impl JsReplManager {
|
||||
async fn begin_exec_tool_call(
|
||||
exec_tool_calls: &Arc<Mutex<HashMap<String, ExecToolCalls>>>,
|
||||
exec_id: &str,
|
||||
) -> bool {
|
||||
) -> Option<CancellationToken> {
|
||||
let mut calls = exec_tool_calls.lock().await;
|
||||
let Some(state) = calls.get_mut(exec_id) else {
|
||||
return false;
|
||||
return None;
|
||||
};
|
||||
state.in_flight += 1;
|
||||
true
|
||||
Some(state.cancel.clone())
|
||||
}
|
||||
|
||||
async fn finish_exec_tool_call(
|
||||
@@ -380,6 +382,7 @@ impl JsReplManager {
|
||||
exec_id: &str,
|
||||
) {
|
||||
if let Some(state) = exec_tool_calls.lock().await.remove(exec_id) {
|
||||
state.cancel.cancel();
|
||||
state.notify.notify_waiters();
|
||||
}
|
||||
}
|
||||
@@ -387,15 +390,13 @@ impl JsReplManager {
|
||||
async fn clear_all_exec_tool_calls_map(
|
||||
exec_tool_calls: &Arc<Mutex<HashMap<String, ExecToolCalls>>>,
|
||||
) {
|
||||
let notifiers = {
|
||||
let states = {
|
||||
let mut calls = exec_tool_calls.lock().await;
|
||||
calls
|
||||
.drain()
|
||||
.map(|(_, state)| state.notify)
|
||||
.collect::<Vec<_>>()
|
||||
calls.drain().map(|(_, state)| state).collect::<Vec<_>>()
|
||||
};
|
||||
for notify in notifiers {
|
||||
notify.notify_waiters();
|
||||
for state in states {
|
||||
state.cancel.cancel();
|
||||
state.notify.notify_waiters();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -852,7 +853,9 @@ impl JsReplManager {
|
||||
JsReplManager::clear_exec_tool_calls_map(&exec_tool_calls, &id).await;
|
||||
}
|
||||
KernelToHost::RunTool(req) => {
|
||||
if !JsReplManager::begin_exec_tool_call(&exec_tool_calls, &req.exec_id).await {
|
||||
let Some(reset_cancel) =
|
||||
JsReplManager::begin_exec_tool_call(&exec_tool_calls, &req.exec_id).await
|
||||
else {
|
||||
let exec_id = req.exec_id.clone();
|
||||
let tool_call_id = req.id.clone();
|
||||
let payload = HostToKernel::RunToolResult(RunToolResult {
|
||||
@@ -875,10 +878,10 @@ impl JsReplManager {
|
||||
);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let stdin_clone = Arc::clone(&stdin);
|
||||
let exec_contexts = Arc::clone(&exec_contexts);
|
||||
let exec_tool_calls = Arc::clone(&exec_tool_calls);
|
||||
let exec_tool_calls_for_task = Arc::clone(&exec_tool_calls);
|
||||
let recent_stderr = Arc::clone(&recent_stderr);
|
||||
tokio::spawn(async move {
|
||||
let exec_id = req.exec_id.clone();
|
||||
@@ -886,15 +889,26 @@ impl JsReplManager {
|
||||
let tool_name = req.tool_name.clone();
|
||||
let context = { exec_contexts.lock().await.get(&exec_id).cloned() };
|
||||
let result = match context {
|
||||
Some(ctx) => JsReplManager::run_tool_request(ctx, req).await,
|
||||
Some(ctx) => {
|
||||
tokio::select! {
|
||||
_ = reset_cancel.cancelled() => RunToolResult {
|
||||
id: tool_call_id.clone(),
|
||||
ok: false,
|
||||
response: None,
|
||||
error: Some("js_repl execution reset".to_string()),
|
||||
},
|
||||
result = JsReplManager::run_tool_request(ctx, req) => result,
|
||||
}
|
||||
}
|
||||
None => RunToolResult {
|
||||
id: req.id.clone(),
|
||||
id: tool_call_id.clone(),
|
||||
ok: false,
|
||||
response: None,
|
||||
error: Some("js_repl exec context not found".to_string()),
|
||||
},
|
||||
};
|
||||
JsReplManager::finish_exec_tool_call(&exec_tool_calls, &exec_id).await;
|
||||
JsReplManager::finish_exec_tool_call(&exec_tool_calls_for_task, &exec_id)
|
||||
.await;
|
||||
let payload = HostToKernel::RunToolResult(result);
|
||||
if let Err(err) = JsReplManager::write_message(&stdin_clone, &payload).await
|
||||
{
|
||||
@@ -1415,7 +1429,11 @@ mod tests {
|
||||
.lock()
|
||||
.await
|
||||
.insert(exec_id.clone(), ExecToolCalls::default());
|
||||
assert!(JsReplManager::begin_exec_tool_call(&exec_tool_calls, &exec_id).await);
|
||||
assert!(
|
||||
JsReplManager::begin_exec_tool_call(&exec_tool_calls, &exec_id)
|
||||
.await
|
||||
.is_some()
|
||||
);
|
||||
|
||||
let wait_map = Arc::clone(&exec_tool_calls);
|
||||
let wait_exec_id = exec_id.clone();
|
||||
@@ -1447,7 +1465,11 @@ mod tests {
|
||||
.expect("manager should initialize");
|
||||
let exec_id = Uuid::new_v4().to_string();
|
||||
manager.register_exec_tool_calls(&exec_id).await;
|
||||
assert!(JsReplManager::begin_exec_tool_call(&manager.exec_tool_calls, &exec_id).await);
|
||||
assert!(
|
||||
JsReplManager::begin_exec_tool_call(&manager.exec_tool_calls, &exec_id)
|
||||
.await
|
||||
.is_some()
|
||||
);
|
||||
|
||||
let wait_manager = Arc::clone(&manager);
|
||||
let wait_exec_id = exec_id.clone();
|
||||
@@ -1469,6 +1491,36 @@ mod tests {
|
||||
assert!(manager.exec_tool_calls.lock().await.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn reset_aborts_inflight_exec_tool_tasks() {
|
||||
let manager = JsReplManager::new(None, std::env::temp_dir())
|
||||
.await
|
||||
.expect("manager should initialize");
|
||||
let exec_id = Uuid::new_v4().to_string();
|
||||
manager.register_exec_tool_calls(&exec_id).await;
|
||||
let reset_cancel = JsReplManager::begin_exec_tool_call(&manager.exec_tool_calls, &exec_id)
|
||||
.await
|
||||
.expect("exec should be registered");
|
||||
|
||||
let task = tokio::spawn(async move {
|
||||
tokio::select! {
|
||||
_ = reset_cancel.cancelled() => "cancelled",
|
||||
_ = tokio::time::sleep(Duration::from_secs(60)) => "timed_out",
|
||||
}
|
||||
});
|
||||
|
||||
tokio::time::timeout(Duration::from_secs(1), manager.reset())
|
||||
.await
|
||||
.expect("reset should not hang")
|
||||
.expect("reset should succeed");
|
||||
|
||||
let outcome = tokio::time::timeout(Duration::from_secs(1), task)
|
||||
.await
|
||||
.expect("cancelled task should resolve promptly")
|
||||
.expect("task should not panic");
|
||||
assert_eq!(outcome, "cancelled");
|
||||
}
|
||||
|
||||
async fn can_run_js_repl_runtime_tests() -> bool {
|
||||
if std::env::var_os("CODEX_SANDBOX").is_some() {
|
||||
return false;
|
||||
|
||||
Reference in New Issue
Block a user