mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
Add js_repl kernel crash diagnostics (#11666)
## Summary
This PR improves `js_repl` crash diagnostics so kernel failures are
debuggable without weakening timeout/reset guarantees.
## What Changed
- Added bounded kernel stderr capture and truncation logic (line + byte
caps).
- Added structured kernel snapshots (`pid`, exit status, stderr tail)
for failure paths.
- Enriched model-visible kernel-failure errors with a structured
diagnostics payload:
- `js_repl diagnostics: {...}`
- Included only for likely kernel-failure write/EOF cases.
- Improved logging around kernel write failures, unexpected exits, and
kill/wait paths.
- Added/updated unit tests for:
- UTF-8-safe truncation
- stderr tail bounds
- structured diagnostics shape/truncation
- conditional diagnostics emission
- timeout kill behavior
- forced kernel-failure diagnostics
## Why
Before this, failures like broken pipe / unexpected kernel exit often
surfaced as generic errors with little context. This change preserves
existing behavior but adds actionable diagnostics while keeping output
bounded.
## Scope
- Code changes are limited to:
-
`/Users/fjord/code/codex-jsrepl-seq/codex-rs/core/src/tools/js_repl/mod.rs`
## Validation
- `cargo clippy -p codex-core --all-targets -- -D warnings`
- Targeted `codex-core` js_repl unit tests (including new
diagnostics/timeout coverage)
- Tried starting a long running js_repl command (sleep for 10 minutes),
verified error output was as expected after killing the node process.
#### [git stack](https://github.com/magus/git-stack-cli)
- 👉 `1` https://github.com/openai/codex/pull/11666
- ⏳ `2` https://github.com/openai/codex/pull/10673
- ⏳ `3` https://github.com/openai/codex/pull/10670
This commit is contained in:
committed by
GitHub
parent
8468871e2b
commit
a02342c9e1
@@ -1,5 +1,8 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt;
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::process::ExitStatusExt;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
@@ -38,6 +41,13 @@ pub(crate) const JS_REPL_PRAGMA_PREFIX: &str = "// codex-js-repl:";
|
||||
const KERNEL_SOURCE: &str = include_str!("kernel.js");
|
||||
const MERIYAH_UMD: &str = include_str!("meriyah.umd.min.js");
|
||||
const JS_REPL_MIN_NODE_VERSION: &str = include_str!("../../../../node-version.txt");
|
||||
const JS_REPL_STDERR_TAIL_LINE_LIMIT: usize = 20;
|
||||
const JS_REPL_STDERR_TAIL_LINE_MAX_BYTES: usize = 512;
|
||||
const JS_REPL_STDERR_TAIL_MAX_BYTES: usize = 4_096;
|
||||
const JS_REPL_STDERR_TAIL_SEPARATOR: &str = " | ";
|
||||
const JS_REPL_EXEC_ID_LOG_LIMIT: usize = 8;
|
||||
const JS_REPL_MODEL_DIAG_STDERR_MAX_BYTES: usize = 1_024;
|
||||
const JS_REPL_MODEL_DIAG_ERROR_MAX_BYTES: usize = 256;
|
||||
|
||||
/// Per-task js_repl handle stored on the turn context.
|
||||
pub(crate) struct JsReplHandle {
|
||||
@@ -85,7 +95,8 @@ pub struct JsExecResult {
|
||||
}
|
||||
|
||||
struct KernelState {
|
||||
_child: Child,
|
||||
child: Arc<Mutex<Child>>,
|
||||
recent_stderr: Arc<Mutex<VecDeque<String>>>,
|
||||
stdin: Arc<Mutex<ChildStdin>>,
|
||||
pending_execs: Arc<Mutex<HashMap<String, tokio::sync::oneshot::Sender<ExecResultMessage>>>>,
|
||||
exec_contexts: Arc<Mutex<HashMap<String, ExecContext>>>,
|
||||
@@ -105,6 +116,151 @@ struct ExecToolCalls {
|
||||
notify: Arc<Notify>,
|
||||
}
|
||||
|
||||
enum KernelStreamEnd {
|
||||
Shutdown,
|
||||
StdoutEof,
|
||||
StdoutReadError(String),
|
||||
}
|
||||
|
||||
impl KernelStreamEnd {
|
||||
fn reason(&self) -> &'static str {
|
||||
match self {
|
||||
Self::Shutdown => "shutdown",
|
||||
Self::StdoutEof => "stdout_eof",
|
||||
Self::StdoutReadError(_) => "stdout_read_error",
|
||||
}
|
||||
}
|
||||
|
||||
fn error(&self) -> Option<&str> {
|
||||
match self {
|
||||
Self::StdoutReadError(err) => Some(err),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct KernelDebugSnapshot {
|
||||
pid: Option<u32>,
|
||||
status: String,
|
||||
stderr_tail: String,
|
||||
}
|
||||
|
||||
fn format_exit_status(status: std::process::ExitStatus) -> String {
|
||||
if let Some(code) = status.code() {
|
||||
return format!("code={code}");
|
||||
}
|
||||
#[cfg(unix)]
|
||||
if let Some(signal) = status.signal() {
|
||||
return format!("signal={signal}");
|
||||
}
|
||||
"unknown".to_string()
|
||||
}
|
||||
|
||||
fn format_stderr_tail(lines: &VecDeque<String>) -> String {
|
||||
if lines.is_empty() {
|
||||
return "<empty>".to_string();
|
||||
}
|
||||
lines
|
||||
.iter()
|
||||
.cloned()
|
||||
.collect::<Vec<_>>()
|
||||
.join(JS_REPL_STDERR_TAIL_SEPARATOR)
|
||||
}
|
||||
|
||||
fn truncate_utf8_prefix_by_bytes(input: &str, max_bytes: usize) -> String {
|
||||
if input.len() <= max_bytes {
|
||||
return input.to_string();
|
||||
}
|
||||
if max_bytes == 0 {
|
||||
return String::new();
|
||||
}
|
||||
let mut end = max_bytes;
|
||||
while end > 0 && !input.is_char_boundary(end) {
|
||||
end -= 1;
|
||||
}
|
||||
input[..end].to_string()
|
||||
}
|
||||
|
||||
fn stderr_tail_formatted_bytes(lines: &VecDeque<String>) -> usize {
|
||||
if lines.is_empty() {
|
||||
return 0;
|
||||
}
|
||||
let payload_bytes: usize = lines.iter().map(String::len).sum();
|
||||
let separator_bytes = JS_REPL_STDERR_TAIL_SEPARATOR.len() * (lines.len() - 1);
|
||||
payload_bytes + separator_bytes
|
||||
}
|
||||
|
||||
fn stderr_tail_bytes_with_candidate(lines: &VecDeque<String>, line: &str) -> usize {
|
||||
if lines.is_empty() {
|
||||
return line.len();
|
||||
}
|
||||
stderr_tail_formatted_bytes(lines) + JS_REPL_STDERR_TAIL_SEPARATOR.len() + line.len()
|
||||
}
|
||||
|
||||
fn push_stderr_tail_line(lines: &mut VecDeque<String>, line: &str) -> String {
|
||||
let max_line_bytes = JS_REPL_STDERR_TAIL_LINE_MAX_BYTES.min(JS_REPL_STDERR_TAIL_MAX_BYTES);
|
||||
let bounded_line = truncate_utf8_prefix_by_bytes(line, max_line_bytes);
|
||||
if bounded_line.is_empty() {
|
||||
return bounded_line;
|
||||
}
|
||||
|
||||
while !lines.is_empty()
|
||||
&& (lines.len() >= JS_REPL_STDERR_TAIL_LINE_LIMIT
|
||||
|| stderr_tail_bytes_with_candidate(lines, &bounded_line)
|
||||
> JS_REPL_STDERR_TAIL_MAX_BYTES)
|
||||
{
|
||||
lines.pop_front();
|
||||
}
|
||||
|
||||
lines.push_back(bounded_line.clone());
|
||||
bounded_line
|
||||
}
|
||||
|
||||
fn is_kernel_status_exited(status: &str) -> bool {
|
||||
status.starts_with("exited(")
|
||||
}
|
||||
|
||||
fn should_include_model_diagnostics_for_write_error(
|
||||
err_message: &str,
|
||||
snapshot: &KernelDebugSnapshot,
|
||||
) -> bool {
|
||||
is_kernel_status_exited(&snapshot.status)
|
||||
|| err_message.to_ascii_lowercase().contains("broken pipe")
|
||||
}
|
||||
|
||||
fn format_model_kernel_failure_details(
|
||||
reason: &str,
|
||||
stream_error: Option<&str>,
|
||||
snapshot: &KernelDebugSnapshot,
|
||||
) -> String {
|
||||
let payload = serde_json::json!({
|
||||
"reason": reason,
|
||||
"stream_error": stream_error
|
||||
.map(|err| truncate_utf8_prefix_by_bytes(err, JS_REPL_MODEL_DIAG_ERROR_MAX_BYTES)),
|
||||
"kernel_pid": snapshot.pid,
|
||||
"kernel_status": snapshot.status,
|
||||
"kernel_stderr_tail": truncate_utf8_prefix_by_bytes(
|
||||
&snapshot.stderr_tail,
|
||||
JS_REPL_MODEL_DIAG_STDERR_MAX_BYTES,
|
||||
),
|
||||
});
|
||||
let encoded = serde_json::to_string(&payload)
|
||||
.unwrap_or_else(|err| format!(r#"{{"reason":"serialization_error","error":"{err}"}}"#));
|
||||
format!("js_repl diagnostics: {encoded}")
|
||||
}
|
||||
|
||||
fn with_model_kernel_failure_message(
|
||||
base_message: &str,
|
||||
reason: &str,
|
||||
stream_error: Option<&str>,
|
||||
snapshot: &KernelDebugSnapshot,
|
||||
) -> String {
|
||||
format!(
|
||||
"{base_message}\n\n{}",
|
||||
format_model_kernel_failure_details(reason, stream_error, snapshot)
|
||||
)
|
||||
}
|
||||
|
||||
pub struct JsReplManager {
|
||||
node_path: Option<PathBuf>,
|
||||
codex_home: PathBuf,
|
||||
@@ -258,6 +414,7 @@ impl JsReplManager {
|
||||
};
|
||||
if let Some(state) = state {
|
||||
state.shutdown.cancel();
|
||||
Self::kill_kernel_child(&state.child, "reset").await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -272,7 +429,7 @@ impl JsReplManager {
|
||||
FunctionCallError::RespondToModel("js_repl execution unavailable".to_string())
|
||||
})?;
|
||||
|
||||
let (stdin, pending_execs, exec_contexts) = {
|
||||
let (stdin, pending_execs, exec_contexts, child, recent_stderr) = {
|
||||
let mut kernel = self.kernel.lock().await;
|
||||
if kernel.is_none() {
|
||||
let state = self
|
||||
@@ -294,6 +451,8 @@ impl JsReplManager {
|
||||
Arc::clone(&state.stdin),
|
||||
Arc::clone(&state.pending_execs),
|
||||
Arc::clone(&state.exec_contexts),
|
||||
Arc::clone(&state.child),
|
||||
Arc::clone(&state.recent_stderr),
|
||||
)
|
||||
};
|
||||
|
||||
@@ -324,7 +483,28 @@ impl JsReplManager {
|
||||
pending_execs.lock().await.remove(&req_id);
|
||||
exec_contexts.lock().await.remove(&req_id);
|
||||
self.clear_exec_tool_calls(&req_id).await;
|
||||
return Err(err);
|
||||
let snapshot = Self::kernel_debug_snapshot(&child, &recent_stderr).await;
|
||||
let err_message = err.to_string();
|
||||
warn!(
|
||||
exec_id = %req_id,
|
||||
error = %err_message,
|
||||
kernel_pid = ?snapshot.pid,
|
||||
kernel_status = %snapshot.status,
|
||||
kernel_stderr_tail = %snapshot.stderr_tail,
|
||||
"failed to submit js_repl exec request to kernel"
|
||||
);
|
||||
let message =
|
||||
if should_include_model_diagnostics_for_write_error(&err_message, &snapshot) {
|
||||
with_model_kernel_failure_message(
|
||||
&err_message,
|
||||
"write_failed",
|
||||
Some(&err_message),
|
||||
&snapshot,
|
||||
)
|
||||
} else {
|
||||
err_message
|
||||
};
|
||||
return Err(FunctionCallError::RespondToModel(message));
|
||||
}
|
||||
|
||||
let timeout_ms = args.timeout_ms.unwrap_or(30_000);
|
||||
@@ -336,9 +516,18 @@ impl JsReplManager {
|
||||
exec_contexts.lock().await.remove(&req_id);
|
||||
self.wait_for_exec_tool_calls(&req_id).await;
|
||||
self.clear_exec_tool_calls(&req_id).await;
|
||||
return Err(FunctionCallError::RespondToModel(
|
||||
"js_repl kernel closed unexpectedly".to_string(),
|
||||
));
|
||||
let snapshot = Self::kernel_debug_snapshot(&child, &recent_stderr).await;
|
||||
let message = if is_kernel_status_exited(&snapshot.status) {
|
||||
with_model_kernel_failure_message(
|
||||
"js_repl kernel closed unexpectedly",
|
||||
"response_channel_closed",
|
||||
None,
|
||||
&snapshot,
|
||||
)
|
||||
} else {
|
||||
"js_repl kernel closed unexpectedly".to_string()
|
||||
};
|
||||
return Err(FunctionCallError::RespondToModel(message));
|
||||
}
|
||||
Err(_) => {
|
||||
self.reset().await?;
|
||||
@@ -464,9 +653,15 @@ impl JsReplManager {
|
||||
let exec_contexts: Arc<Mutex<HashMap<String, ExecContext>>> =
|
||||
Arc::new(Mutex::new(HashMap::new()));
|
||||
let stdin_arc = Arc::new(Mutex::new(stdin));
|
||||
let child = Arc::new(Mutex::new(child));
|
||||
let recent_stderr = Arc::new(Mutex::new(VecDeque::with_capacity(
|
||||
JS_REPL_STDERR_TAIL_LINE_LIMIT,
|
||||
)));
|
||||
|
||||
tokio::spawn(Self::read_stdout(
|
||||
stdout,
|
||||
Arc::clone(&child),
|
||||
Arc::clone(&recent_stderr),
|
||||
Arc::clone(&pending_execs),
|
||||
Arc::clone(&exec_contexts),
|
||||
Arc::clone(&self.exec_tool_calls),
|
||||
@@ -474,13 +669,18 @@ impl JsReplManager {
|
||||
shutdown.clone(),
|
||||
));
|
||||
if let Some(stderr) = stderr {
|
||||
tokio::spawn(Self::read_stderr(stderr, shutdown.clone()));
|
||||
tokio::spawn(Self::read_stderr(
|
||||
stderr,
|
||||
Arc::clone(&recent_stderr),
|
||||
shutdown.clone(),
|
||||
));
|
||||
} else {
|
||||
warn!("js_repl kernel missing stderr");
|
||||
}
|
||||
|
||||
Ok(KernelState {
|
||||
_child: child,
|
||||
child,
|
||||
recent_stderr,
|
||||
stdin: stdin_arc,
|
||||
pending_execs,
|
||||
exec_contexts,
|
||||
@@ -514,8 +714,96 @@ impl JsReplManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn kernel_stderr_tail_snapshot(recent_stderr: &Arc<Mutex<VecDeque<String>>>) -> String {
|
||||
let tail = recent_stderr.lock().await;
|
||||
format_stderr_tail(&tail)
|
||||
}
|
||||
|
||||
async fn kernel_debug_snapshot(
|
||||
child: &Arc<Mutex<Child>>,
|
||||
recent_stderr: &Arc<Mutex<VecDeque<String>>>,
|
||||
) -> KernelDebugSnapshot {
|
||||
let (pid, status) = {
|
||||
let mut guard = child.lock().await;
|
||||
let pid = guard.id();
|
||||
let status = match guard.try_wait() {
|
||||
Ok(Some(status)) => format!("exited({})", format_exit_status(status)),
|
||||
Ok(None) => "running".to_string(),
|
||||
Err(err) => format!("unknown ({err})"),
|
||||
};
|
||||
(pid, status)
|
||||
};
|
||||
let stderr_tail = {
|
||||
let tail = recent_stderr.lock().await;
|
||||
format_stderr_tail(&tail)
|
||||
};
|
||||
KernelDebugSnapshot {
|
||||
pid,
|
||||
status,
|
||||
stderr_tail,
|
||||
}
|
||||
}
|
||||
|
||||
async fn kill_kernel_child(child: &Arc<Mutex<Child>>, reason: &'static str) {
|
||||
let mut guard = child.lock().await;
|
||||
let pid = guard.id();
|
||||
match guard.try_wait() {
|
||||
Ok(Some(_)) => return,
|
||||
Ok(None) => {}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
kernel_pid = ?pid,
|
||||
kill_reason = reason,
|
||||
error = %err,
|
||||
"failed to inspect js_repl kernel before kill"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(err) = guard.start_kill() {
|
||||
warn!(
|
||||
kernel_pid = ?pid,
|
||||
kill_reason = reason,
|
||||
error = %err,
|
||||
"failed to send kill signal to js_repl kernel"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
match tokio::time::timeout(Duration::from_secs(2), guard.wait()).await {
|
||||
Ok(Ok(_status)) => {}
|
||||
Ok(Err(err)) => {
|
||||
warn!(
|
||||
kernel_pid = ?pid,
|
||||
kill_reason = reason,
|
||||
error = %err,
|
||||
"failed while waiting for js_repl kernel exit"
|
||||
);
|
||||
}
|
||||
Err(_) => {
|
||||
warn!(
|
||||
kernel_pid = ?pid,
|
||||
kill_reason = reason,
|
||||
"timed out waiting for js_repl kernel to exit after kill"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn truncate_id_list(ids: &[String]) -> Vec<String> {
|
||||
if ids.len() <= JS_REPL_EXEC_ID_LOG_LIMIT {
|
||||
return ids.to_vec();
|
||||
}
|
||||
let mut output = ids[..JS_REPL_EXEC_ID_LOG_LIMIT].to_vec();
|
||||
output.push(format!("...+{}", ids.len() - JS_REPL_EXEC_ID_LOG_LIMIT));
|
||||
output
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn read_stdout(
|
||||
stdout: tokio::process::ChildStdout,
|
||||
child: Arc<Mutex<Child>>,
|
||||
recent_stderr: Arc<Mutex<VecDeque<String>>>,
|
||||
pending_execs: Arc<Mutex<HashMap<String, tokio::sync::oneshot::Sender<ExecResultMessage>>>>,
|
||||
exec_contexts: Arc<Mutex<HashMap<String, ExecContext>>>,
|
||||
exec_tool_calls: Arc<Mutex<HashMap<String, ExecToolCalls>>>,
|
||||
@@ -523,17 +811,13 @@ impl JsReplManager {
|
||||
shutdown: CancellationToken,
|
||||
) {
|
||||
let mut reader = BufReader::new(stdout).lines();
|
||||
|
||||
loop {
|
||||
let end_reason = loop {
|
||||
let line = tokio::select! {
|
||||
_ = shutdown.cancelled() => break,
|
||||
_ = shutdown.cancelled() => break KernelStreamEnd::Shutdown,
|
||||
res = reader.next_line() => match res {
|
||||
Ok(Some(line)) => line,
|
||||
Ok(None) => break,
|
||||
Err(err) => {
|
||||
warn!("js_repl kernel stream ended: {err}");
|
||||
break;
|
||||
}
|
||||
Ok(None) => break KernelStreamEnd::StdoutEof,
|
||||
Err(err) => break KernelStreamEnd::StdoutReadError(err.to_string()),
|
||||
},
|
||||
};
|
||||
|
||||
@@ -571,6 +855,8 @@ impl JsReplManager {
|
||||
}
|
||||
KernelToHost::RunTool(req) => {
|
||||
if !JsReplManager::begin_exec_tool_call(&exec_tool_calls, &req.exec_id).await {
|
||||
let exec_id = req.exec_id.clone();
|
||||
let tool_call_id = req.id.clone();
|
||||
let payload = HostToKernel::RunToolResult(RunToolResult {
|
||||
id: req.id,
|
||||
ok: false,
|
||||
@@ -578,15 +864,28 @@ impl JsReplManager {
|
||||
error: Some("js_repl exec context not found".to_string()),
|
||||
});
|
||||
if let Err(err) = JsReplManager::write_message(&stdin, &payload).await {
|
||||
warn!("failed to reply to kernel run_tool request: {err}");
|
||||
let snapshot =
|
||||
JsReplManager::kernel_debug_snapshot(&child, &recent_stderr).await;
|
||||
warn!(
|
||||
exec_id = %exec_id,
|
||||
tool_call_id = %tool_call_id,
|
||||
error = %err,
|
||||
kernel_pid = ?snapshot.pid,
|
||||
kernel_status = %snapshot.status,
|
||||
kernel_stderr_tail = %snapshot.stderr_tail,
|
||||
"failed to reply to kernel run_tool request"
|
||||
);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
let stdin_clone = Arc::clone(&stdin);
|
||||
let exec_contexts = Arc::clone(&exec_contexts);
|
||||
let exec_tool_calls = Arc::clone(&exec_tool_calls);
|
||||
let recent_stderr = Arc::clone(&recent_stderr);
|
||||
tokio::spawn(async move {
|
||||
let exec_id = req.exec_id.clone();
|
||||
let tool_call_id = req.id.clone();
|
||||
let tool_name = req.tool_name.clone();
|
||||
let context = { exec_contexts.lock().await.get(&exec_id).cloned() };
|
||||
let result = match context {
|
||||
Some(ctx) => JsReplManager::run_tool_request(ctx, req).await,
|
||||
@@ -601,12 +900,21 @@ impl JsReplManager {
|
||||
let payload = HostToKernel::RunToolResult(result);
|
||||
if let Err(err) = JsReplManager::write_message(&stdin_clone, &payload).await
|
||||
{
|
||||
warn!("failed to reply to kernel run_tool request: {err}");
|
||||
let stderr_tail =
|
||||
JsReplManager::kernel_stderr_tail_snapshot(&recent_stderr).await;
|
||||
warn!(
|
||||
exec_id = %exec_id,
|
||||
tool_call_id = %tool_call_id,
|
||||
tool_name = %tool_name,
|
||||
error = %err,
|
||||
kernel_stderr_tail = %stderr_tail,
|
||||
"failed to reply to kernel run_tool request"
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let exec_ids = {
|
||||
let mut contexts = exec_contexts.lock().await;
|
||||
@@ -618,12 +926,47 @@ impl JsReplManager {
|
||||
JsReplManager::wait_for_exec_tool_calls_map(&exec_tool_calls, &exec_id).await;
|
||||
JsReplManager::clear_exec_tool_calls_map(&exec_tool_calls, &exec_id).await;
|
||||
}
|
||||
let unexpected_snapshot = if matches!(end_reason, KernelStreamEnd::Shutdown) {
|
||||
None
|
||||
} else {
|
||||
Some(Self::kernel_debug_snapshot(&child, &recent_stderr).await)
|
||||
};
|
||||
let kernel_failure_message = unexpected_snapshot.as_ref().map(|snapshot| {
|
||||
with_model_kernel_failure_message(
|
||||
"js_repl kernel exited unexpectedly",
|
||||
end_reason.reason(),
|
||||
end_reason.error(),
|
||||
snapshot,
|
||||
)
|
||||
});
|
||||
let kernel_exit_message = kernel_failure_message
|
||||
.clone()
|
||||
.unwrap_or_else(|| "js_repl kernel exited unexpectedly".to_string());
|
||||
|
||||
let mut pending = pending_execs.lock().await;
|
||||
let pending_exec_ids = pending.keys().cloned().collect::<Vec<_>>();
|
||||
for (_id, tx) in pending.drain() {
|
||||
let _ = tx.send(ExecResultMessage::Err {
|
||||
message: "js_repl kernel exited unexpectedly".to_string(),
|
||||
message: kernel_exit_message.clone(),
|
||||
});
|
||||
}
|
||||
drop(pending);
|
||||
|
||||
if !matches!(end_reason, KernelStreamEnd::Shutdown) {
|
||||
let mut pending_exec_ids = pending_exec_ids;
|
||||
pending_exec_ids.sort_unstable();
|
||||
let snapshot = Self::kernel_debug_snapshot(&child, &recent_stderr).await;
|
||||
warn!(
|
||||
reason = %end_reason.reason(),
|
||||
stream_error = %end_reason.error().unwrap_or(""),
|
||||
kernel_pid = ?snapshot.pid,
|
||||
kernel_status = %snapshot.status,
|
||||
pending_exec_count = pending_exec_ids.len(),
|
||||
pending_exec_ids = ?Self::truncate_id_list(&pending_exec_ids),
|
||||
kernel_stderr_tail = %snapshot.stderr_tail,
|
||||
"js_repl kernel terminated unexpectedly"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_tool_request(exec: ExecContext, req: RunToolRequest) -> RunToolResult {
|
||||
@@ -713,7 +1056,11 @@ impl JsReplManager {
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_stderr(stderr: tokio::process::ChildStderr, shutdown: CancellationToken) {
|
||||
async fn read_stderr(
|
||||
stderr: tokio::process::ChildStderr,
|
||||
recent_stderr: Arc<Mutex<VecDeque<String>>>,
|
||||
shutdown: CancellationToken,
|
||||
) {
|
||||
let mut reader = BufReader::new(stderr).lines();
|
||||
|
||||
loop {
|
||||
@@ -730,7 +1077,14 @@ impl JsReplManager {
|
||||
};
|
||||
let trimmed = line.trim();
|
||||
if !trimmed.is_empty() {
|
||||
warn!("js_repl stderr: {trimmed}");
|
||||
let bounded_line = {
|
||||
let mut tail = recent_stderr.lock().await;
|
||||
push_stderr_tail_line(&mut tail, trimmed)
|
||||
};
|
||||
if bounded_line.is_empty() {
|
||||
continue;
|
||||
}
|
||||
warn!("js_repl stderr: {bounded_line}");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -935,6 +1289,116 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn truncate_utf8_prefix_by_bytes_preserves_character_boundaries() {
|
||||
let input = "aé🙂z";
|
||||
assert_eq!(truncate_utf8_prefix_by_bytes(input, 0), "");
|
||||
assert_eq!(truncate_utf8_prefix_by_bytes(input, 1), "a");
|
||||
assert_eq!(truncate_utf8_prefix_by_bytes(input, 2), "a");
|
||||
assert_eq!(truncate_utf8_prefix_by_bytes(input, 3), "aé");
|
||||
assert_eq!(truncate_utf8_prefix_by_bytes(input, 6), "aé");
|
||||
assert_eq!(truncate_utf8_prefix_by_bytes(input, 7), "aé🙂");
|
||||
assert_eq!(truncate_utf8_prefix_by_bytes(input, 8), "aé🙂z");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stderr_tail_applies_line_and_byte_limits() {
|
||||
let mut lines = VecDeque::new();
|
||||
let per_line_cap = JS_REPL_STDERR_TAIL_LINE_MAX_BYTES.min(JS_REPL_STDERR_TAIL_MAX_BYTES);
|
||||
let long = "x".repeat(per_line_cap + 128);
|
||||
let bounded = push_stderr_tail_line(&mut lines, &long);
|
||||
assert_eq!(bounded.len(), per_line_cap);
|
||||
|
||||
for i in 0..50 {
|
||||
let line = format!("line-{i}-{}", "y".repeat(200));
|
||||
push_stderr_tail_line(&mut lines, &line);
|
||||
}
|
||||
|
||||
assert!(lines.len() <= JS_REPL_STDERR_TAIL_LINE_LIMIT);
|
||||
assert!(lines.iter().all(|line| line.len() <= per_line_cap));
|
||||
assert!(stderr_tail_formatted_bytes(&lines) <= JS_REPL_STDERR_TAIL_MAX_BYTES);
|
||||
assert_eq!(
|
||||
format_stderr_tail(&lines).len(),
|
||||
stderr_tail_formatted_bytes(&lines)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn model_kernel_failure_details_are_structured_and_truncated() {
|
||||
let snapshot = KernelDebugSnapshot {
|
||||
pid: Some(42),
|
||||
status: "exited(code=1)".to_string(),
|
||||
stderr_tail: "s".repeat(JS_REPL_MODEL_DIAG_STDERR_MAX_BYTES + 400),
|
||||
};
|
||||
let stream_error = "e".repeat(JS_REPL_MODEL_DIAG_ERROR_MAX_BYTES + 200);
|
||||
let message = with_model_kernel_failure_message(
|
||||
"js_repl kernel exited unexpectedly",
|
||||
"stdout_eof",
|
||||
Some(&stream_error),
|
||||
&snapshot,
|
||||
);
|
||||
assert!(message.starts_with("js_repl kernel exited unexpectedly\n\njs_repl diagnostics: "));
|
||||
let (_prefix, encoded) = message
|
||||
.split_once("js_repl diagnostics: ")
|
||||
.expect("diagnostics suffix should be present");
|
||||
let parsed: serde_json::Value =
|
||||
serde_json::from_str(encoded).expect("diagnostics should be valid json");
|
||||
assert_eq!(
|
||||
parsed.get("reason").and_then(|v| v.as_str()),
|
||||
Some("stdout_eof")
|
||||
);
|
||||
assert_eq!(
|
||||
parsed.get("kernel_pid").and_then(serde_json::Value::as_u64),
|
||||
Some(42)
|
||||
);
|
||||
assert_eq!(
|
||||
parsed.get("kernel_status").and_then(|v| v.as_str()),
|
||||
Some("exited(code=1)")
|
||||
);
|
||||
assert!(
|
||||
parsed
|
||||
.get("kernel_stderr_tail")
|
||||
.and_then(|v| v.as_str())
|
||||
.expect("kernel_stderr_tail should be present")
|
||||
.len()
|
||||
<= JS_REPL_MODEL_DIAG_STDERR_MAX_BYTES
|
||||
);
|
||||
assert!(
|
||||
parsed
|
||||
.get("stream_error")
|
||||
.and_then(|v| v.as_str())
|
||||
.expect("stream_error should be present")
|
||||
.len()
|
||||
<= JS_REPL_MODEL_DIAG_ERROR_MAX_BYTES
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn write_error_diagnostics_only_attach_for_likely_kernel_failures() {
|
||||
let running = KernelDebugSnapshot {
|
||||
pid: Some(7),
|
||||
status: "running".to_string(),
|
||||
stderr_tail: "<empty>".to_string(),
|
||||
};
|
||||
let exited = KernelDebugSnapshot {
|
||||
pid: Some(7),
|
||||
status: "exited(code=1)".to_string(),
|
||||
stderr_tail: "<empty>".to_string(),
|
||||
};
|
||||
assert!(!should_include_model_diagnostics_for_write_error(
|
||||
"failed to flush kernel message: other io error",
|
||||
&running
|
||||
));
|
||||
assert!(should_include_model_diagnostics_for_write_error(
|
||||
"failed to write to kernel: Broken pipe (os error 32)",
|
||||
&running
|
||||
));
|
||||
assert!(should_include_model_diagnostics_for_write_error(
|
||||
"failed to write to kernel: some other io error",
|
||||
&exited
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn js_repl_internal_tool_guard_matches_expected_names() {
|
||||
assert!(is_js_repl_internal_tool("js_repl"));
|
||||
@@ -1037,6 +1501,117 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn js_repl_timeout_kills_kernel_process() -> anyhow::Result<()> {
|
||||
if !can_run_js_repl_runtime_tests().await {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let (session, turn) = make_session_and_context().await;
|
||||
let session = Arc::new(session);
|
||||
let turn = Arc::new(turn);
|
||||
let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default()));
|
||||
let manager = turn.js_repl.manager().await?;
|
||||
|
||||
manager
|
||||
.execute(
|
||||
Arc::clone(&session),
|
||||
Arc::clone(&turn),
|
||||
Arc::clone(&tracker),
|
||||
JsReplArgs {
|
||||
code: "console.log('warmup');".to_string(),
|
||||
timeout_ms: Some(10_000),
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
let child = {
|
||||
let guard = manager.kernel.lock().await;
|
||||
let state = guard.as_ref().expect("kernel should exist after warmup");
|
||||
Arc::clone(&state.child)
|
||||
};
|
||||
|
||||
let result = manager
|
||||
.execute(
|
||||
session,
|
||||
turn,
|
||||
tracker,
|
||||
JsReplArgs {
|
||||
code: "while (true) {}".to_string(),
|
||||
timeout_ms: Some(50),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect_err("expected timeout error");
|
||||
|
||||
assert_eq!(
|
||||
result.to_string(),
|
||||
"js_repl execution timed out; kernel reset, rerun your request"
|
||||
);
|
||||
|
||||
let exit_state = {
|
||||
let mut child = child.lock().await;
|
||||
child.try_wait()?
|
||||
};
|
||||
assert!(
|
||||
exit_state.is_some(),
|
||||
"timed out js_repl execution should kill previous kernel process"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn js_repl_kernel_failure_includes_model_diagnostics() -> anyhow::Result<()> {
|
||||
if !can_run_js_repl_runtime_tests().await {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let (session, turn) = make_session_and_context().await;
|
||||
let session = Arc::new(session);
|
||||
let turn = Arc::new(turn);
|
||||
let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default()));
|
||||
let manager = turn.js_repl.manager().await?;
|
||||
|
||||
manager
|
||||
.execute(
|
||||
Arc::clone(&session),
|
||||
Arc::clone(&turn),
|
||||
Arc::clone(&tracker),
|
||||
JsReplArgs {
|
||||
code: "console.log('warmup');".to_string(),
|
||||
timeout_ms: Some(10_000),
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
let child = {
|
||||
let guard = manager.kernel.lock().await;
|
||||
let state = guard.as_ref().expect("kernel should exist after warmup");
|
||||
Arc::clone(&state.child)
|
||||
};
|
||||
JsReplManager::kill_kernel_child(&child, "test_crash").await;
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
|
||||
let err = manager
|
||||
.execute(
|
||||
session,
|
||||
turn,
|
||||
tracker,
|
||||
JsReplArgs {
|
||||
code: "console.log('after-kill');".to_string(),
|
||||
timeout_ms: Some(10_000),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect_err("expected kernel failure after forced kill");
|
||||
|
||||
let message = err.to_string();
|
||||
assert!(message.contains("js_repl diagnostics:"));
|
||||
assert!(message.contains("\"reason\":\"write_failed\""));
|
||||
assert!(message.contains("\"kernel_status\":\"exited("));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn js_repl_can_call_tools() -> anyhow::Result<()> {
|
||||
if !can_run_js_repl_runtime_tests().await {
|
||||
|
||||
Reference in New Issue
Block a user