Compare commits

...

3 Commits

Author SHA1 Message Date
jif-oai
038baeab9f Fix let else lint in js_repl 2026-02-16 18:08:55 +00:00
jif-oai
0920eda22e Fix js repl timeout reset hang 2026-02-16 17:58:16 +00:00
jif-oai
6a3459c054 Fix JS repl timeout hang 2026-02-16 17:35:34 +00:00

View File

@@ -114,6 +114,7 @@ struct ExecContext {
struct ExecToolCalls {
in_flight: usize,
notify: Arc<Notify>,
cancel: CancellationToken,
}
enum KernelStreamEnd {
@@ -300,6 +301,7 @@ impl JsReplManager {
async fn clear_exec_tool_calls(&self, exec_id: &str) {
if let Some(state) = self.exec_tool_calls.lock().await.remove(exec_id) {
state.cancel.cancel();
state.notify.notify_waiters();
}
}
@@ -320,32 +322,14 @@ impl JsReplManager {
}
}
async fn wait_for_all_exec_tool_calls(&self) {
loop {
let notified = {
let calls = self.exec_tool_calls.lock().await;
calls
.values()
.find(|state| state.in_flight > 0)
.map(|state| Arc::clone(&state.notify).notified_owned())
};
match notified {
Some(notified) => notified.await,
None => return,
}
}
}
async fn begin_exec_tool_call(
exec_tool_calls: &Arc<Mutex<HashMap<String, ExecToolCalls>>>,
exec_id: &str,
) -> bool {
) -> Option<CancellationToken> {
let mut calls = exec_tool_calls.lock().await;
let Some(state) = calls.get_mut(exec_id) else {
return false;
};
let state = calls.get_mut(exec_id)?;
state.in_flight += 1;
true
Some(state.cancel.clone())
}
async fn finish_exec_tool_call(
@@ -396,14 +380,27 @@ impl JsReplManager {
exec_id: &str,
) {
if let Some(state) = exec_tool_calls.lock().await.remove(exec_id) {
state.cancel.cancel();
state.notify.notify_waiters();
}
}
async fn clear_all_exec_tool_calls_map(
exec_tool_calls: &Arc<Mutex<HashMap<String, ExecToolCalls>>>,
) {
let states = {
let mut calls = exec_tool_calls.lock().await;
calls.drain().map(|(_, state)| state).collect::<Vec<_>>()
};
for state in states {
state.cancel.cancel();
state.notify.notify_waiters();
}
}
pub async fn reset(&self) -> Result<(), FunctionCallError> {
self.reset_kernel().await;
self.wait_for_all_exec_tool_calls().await;
self.exec_tool_calls.lock().await.clear();
Self::clear_all_exec_tool_calls_map(&self.exec_tool_calls).await;
Ok(())
}
@@ -854,7 +851,9 @@ impl JsReplManager {
JsReplManager::clear_exec_tool_calls_map(&exec_tool_calls, &id).await;
}
KernelToHost::RunTool(req) => {
if !JsReplManager::begin_exec_tool_call(&exec_tool_calls, &req.exec_id).await {
let Some(reset_cancel) =
JsReplManager::begin_exec_tool_call(&exec_tool_calls, &req.exec_id).await
else {
let exec_id = req.exec_id.clone();
let tool_call_id = req.id.clone();
let payload = HostToKernel::RunToolResult(RunToolResult {
@@ -877,10 +876,10 @@ impl JsReplManager {
);
}
continue;
}
};
let stdin_clone = Arc::clone(&stdin);
let exec_contexts = Arc::clone(&exec_contexts);
let exec_tool_calls = Arc::clone(&exec_tool_calls);
let exec_tool_calls_for_task = Arc::clone(&exec_tool_calls);
let recent_stderr = Arc::clone(&recent_stderr);
tokio::spawn(async move {
let exec_id = req.exec_id.clone();
@@ -888,15 +887,26 @@ impl JsReplManager {
let tool_name = req.tool_name.clone();
let context = { exec_contexts.lock().await.get(&exec_id).cloned() };
let result = match context {
Some(ctx) => JsReplManager::run_tool_request(ctx, req).await,
Some(ctx) => {
tokio::select! {
_ = reset_cancel.cancelled() => RunToolResult {
id: tool_call_id.clone(),
ok: false,
response: None,
error: Some("js_repl execution reset".to_string()),
},
result = JsReplManager::run_tool_request(ctx, req) => result,
}
}
None => RunToolResult {
id: req.id.clone(),
id: tool_call_id.clone(),
ok: false,
response: None,
error: Some("js_repl exec context not found".to_string()),
},
};
JsReplManager::finish_exec_tool_call(&exec_tool_calls, &exec_id).await;
JsReplManager::finish_exec_tool_call(&exec_tool_calls_for_task, &exec_id)
.await;
let payload = HostToKernel::RunToolResult(result);
if let Err(err) = JsReplManager::write_message(&stdin_clone, &payload).await
{
@@ -1417,7 +1427,11 @@ mod tests {
.lock()
.await
.insert(exec_id.clone(), ExecToolCalls::default());
assert!(JsReplManager::begin_exec_tool_call(&exec_tool_calls, &exec_id).await);
assert!(
JsReplManager::begin_exec_tool_call(&exec_tool_calls, &exec_id)
.await
.is_some()
);
let wait_map = Arc::clone(&exec_tool_calls);
let wait_exec_id = exec_id.clone();
@@ -1441,6 +1455,70 @@ mod tests {
JsReplManager::clear_exec_tool_calls_map(&exec_tool_calls, &exec_id).await;
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn reset_clears_inflight_exec_tool_calls_without_waiting() {
let manager = JsReplManager::new(None, std::env::temp_dir())
.await
.expect("manager should initialize");
let exec_id = Uuid::new_v4().to_string();
manager.register_exec_tool_calls(&exec_id).await;
assert!(
JsReplManager::begin_exec_tool_call(&manager.exec_tool_calls, &exec_id)
.await
.is_some()
);
let wait_manager = Arc::clone(&manager);
let wait_exec_id = exec_id.clone();
let waiter = tokio::spawn(async move {
wait_manager.wait_for_exec_tool_calls(&wait_exec_id).await;
});
tokio::task::yield_now().await;
tokio::time::timeout(Duration::from_secs(1), manager.reset())
.await
.expect("reset should not hang")
.expect("reset should succeed");
tokio::time::timeout(Duration::from_secs(1), waiter)
.await
.expect("waiter should be released")
.expect("wait task should not panic");
assert!(manager.exec_tool_calls.lock().await.is_empty());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn reset_aborts_inflight_exec_tool_tasks() {
let manager = JsReplManager::new(None, std::env::temp_dir())
.await
.expect("manager should initialize");
let exec_id = Uuid::new_v4().to_string();
manager.register_exec_tool_calls(&exec_id).await;
let reset_cancel = JsReplManager::begin_exec_tool_call(&manager.exec_tool_calls, &exec_id)
.await
.expect("exec should be registered");
let task = tokio::spawn(async move {
tokio::select! {
_ = reset_cancel.cancelled() => "cancelled",
_ = tokio::time::sleep(Duration::from_secs(60)) => "timed_out",
}
});
tokio::time::timeout(Duration::from_secs(1), manager.reset())
.await
.expect("reset should not hang")
.expect("reset should succeed");
let outcome = tokio::time::timeout(Duration::from_secs(1), task)
.await
.expect("cancelled task should resolve promptly")
.expect("task should not panic");
assert_eq!(outcome, "cancelled");
}
async fn can_run_js_repl_runtime_tests() -> bool {
if std::env::var_os("CODEX_SANDBOX").is_some() {
return false;