Compare commits

...

1 Commits

Author SHA1 Message Date
Curtis 'Fjord' Hawthorne
54a8a2e59e add js_repl polling and reuse unified_exec managed process lifecycle 2026-03-06 20:27:04 -08:00
25 changed files with 5521 additions and 655 deletions

View File

@@ -28,8 +28,10 @@ use codex_core::sandboxing::ExecRequest;
use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP;
use codex_utils_pty::ProcessHandle;
use codex_utils_pty::SpawnedProcess;
use codex_utils_pty::SpawnedProcessSplit;
use codex_utils_pty::TerminalSize;
use tokio::sync::Mutex;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::watch;
@@ -100,7 +102,7 @@ struct RunCommandParams {
outgoing: Arc<OutgoingMessageSender>,
request_id: ConnectionRequestId,
process_id: Option<String>,
spawned: SpawnedProcess,
spawned: SpawnedCommand,
control_rx: mpsc::Receiver<CommandControlRequest>,
stream_stdin: bool,
stream_stdout_stderr: bool,
@@ -108,10 +110,15 @@ struct RunCommandParams {
output_bytes_cap: Option<usize>,
}
/// A spawned child process, tagged by how its output is delivered.
enum SpawnedCommand {
/// Single combined output stream (see `SpawnedProcess`); used for the
/// stdin-streaming pipe path and, presumably, the PTY path — TODO confirm.
Merged(SpawnedProcess),
/// Separate stdout/stderr streams (see `SpawnedProcessSplit`); stdin is
/// closed at spawn time when stdin streaming was not requested.
Split(SpawnedProcessSplit),
}
struct SpawnProcessOutputParams {
connection_id: ConnectionId,
process_id: Option<String>,
output_rx: mpsc::Receiver<Vec<u8>>,
output_rx: CommandOutputReceiver,
stdio_timeout_rx: watch::Receiver<bool>,
outgoing: Arc<OutgoingMessageSender>,
stream: CommandExecOutputStream,
@@ -119,6 +126,11 @@ struct SpawnProcessOutputParams {
output_bytes_cap: Option<usize>,
}
/// Receiver side of one process output stream. Split processes hand out
/// per-stream mpsc receivers; merged processes hand out a broadcast
/// receiver, whose lagged (dropped) messages are skipped by the reader.
enum CommandOutputReceiver {
Mpsc(mpsc::Receiver<Vec<u8>>),
Broadcast(broadcast::Receiver<Vec<u8>>),
}
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum InternalProcessId {
Generated(i64),
@@ -270,11 +282,16 @@ impl CommandExecManager {
size.unwrap_or_default(),
)
.await
} else if stream_stdin {
codex_utils_pty::spawn_pipe_process(program, args, cwd.as_path(), &env, &arg0).await
.map(SpawnedCommand::Merged)
} else {
codex_utils_pty::spawn_pipe_process_no_stdin(program, args, cwd.as_path(), &env, &arg0)
codex_utils_pty::spawn_pipe_process_split(program, args, cwd.as_path(), &env, &arg0)
.await
.map(|spawned| {
if !stream_stdin {
spawned.session.close_stdin();
}
SpawnedCommand::Split(spawned)
})
};
let spawned = match spawned {
Ok(spawned) => spawned,
@@ -462,36 +479,58 @@ async fn run_command(params: RunCommandParams) {
}
};
tokio::pin!(expiration);
let SpawnedProcess {
session,
stdout_rx,
stderr_rx,
exit_rx,
} = spawned;
tokio::pin!(exit_rx);
let mut timed_out = false;
let (stdio_timeout_tx, stdio_timeout_rx) = watch::channel(false);
let stdout_handle = spawn_process_output(SpawnProcessOutputParams {
connection_id: request_id.connection_id,
process_id: process_id.clone(),
output_rx: stdout_rx,
stdio_timeout_rx: stdio_timeout_rx.clone(),
outgoing: Arc::clone(&outgoing),
stream: CommandExecOutputStream::Stdout,
stream_output: stream_stdout_stderr,
output_bytes_cap,
});
let stderr_handle = spawn_process_output(SpawnProcessOutputParams {
connection_id: request_id.connection_id,
process_id,
output_rx: stderr_rx,
stdio_timeout_rx,
outgoing: Arc::clone(&outgoing),
stream: CommandExecOutputStream::Stderr,
stream_output: stream_stdout_stderr,
output_bytes_cap,
});
let (session, exit_rx, stdout_handle, stderr_handle) = match spawned {
SpawnedCommand::Merged(SpawnedProcess {
session,
output_rx,
exit_rx,
}) => {
let stdout_handle = spawn_process_output(SpawnProcessOutputParams {
connection_id: request_id.connection_id,
process_id: process_id.clone(),
output_rx: CommandOutputReceiver::Broadcast(output_rx),
stdio_timeout_rx: stdio_timeout_rx.clone(),
outgoing: Arc::clone(&outgoing),
stream: CommandExecOutputStream::Stdout,
stream_output: stream_stdout_stderr,
output_bytes_cap,
});
let stderr_handle = tokio::spawn(async { String::new() });
(session, exit_rx, stdout_handle, stderr_handle)
}
SpawnedCommand::Split(SpawnedProcessSplit {
session,
stdout_rx,
stderr_rx,
exit_rx,
..
}) => {
let stdout_handle = spawn_process_output(SpawnProcessOutputParams {
connection_id: request_id.connection_id,
process_id: process_id.clone(),
output_rx: CommandOutputReceiver::Mpsc(stdout_rx),
stdio_timeout_rx: stdio_timeout_rx.clone(),
outgoing: Arc::clone(&outgoing),
stream: CommandExecOutputStream::Stdout,
stream_output: stream_stdout_stderr,
output_bytes_cap,
});
let stderr_handle = spawn_process_output(SpawnProcessOutputParams {
connection_id: request_id.connection_id,
process_id,
output_rx: CommandOutputReceiver::Mpsc(stderr_rx),
stdio_timeout_rx,
outgoing: Arc::clone(&outgoing),
stream: CommandExecOutputStream::Stderr,
stream_output: stream_stdout_stderr,
output_bytes_cap,
});
(session, exit_rx, stdout_handle, stderr_handle)
}
};
tokio::pin!(exit_rx);
let exit_code = loop {
tokio::select! {
@@ -564,7 +603,7 @@ fn spawn_process_output(params: SpawnProcessOutputParams) -> tokio::task::JoinHa
let SpawnProcessOutputParams {
connection_id,
process_id,
mut output_rx,
output_rx,
mut stdio_timeout_rx,
outgoing,
stream,
@@ -572,11 +611,23 @@ fn spawn_process_output(params: SpawnProcessOutputParams) -> tokio::task::JoinHa
output_bytes_cap,
} = params;
tokio::spawn(async move {
let mut output_rx = output_rx;
let mut buffer: Vec<u8> = Vec::new();
let mut observed_num_bytes = 0usize;
loop {
let chunk = tokio::select! {
chunk = output_rx.recv() => match chunk {
chunk = async {
match &mut output_rx {
CommandOutputReceiver::Mpsc(output_rx) => output_rx.recv().await,
CommandOutputReceiver::Broadcast(output_rx) => loop {
match output_rx.recv().await {
Ok(chunk) => break Some(chunk),
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => break None,
}
},
}
} => match chunk {
Some(chunk) => chunk,
None => break,
},

View File

@@ -374,6 +374,9 @@
"js_repl": {
"type": "boolean"
},
"js_repl_polling": {
"type": "boolean"
},
"js_repl_tools_only": {
"type": "boolean"
},
@@ -1843,6 +1846,9 @@
"js_repl": {
"type": "boolean"
},
"js_repl_polling": {
"type": "boolean"
},
"js_repl_tools_only": {
"type": "boolean"
},

View File

@@ -80,6 +80,8 @@ pub enum Feature {
// Experimental
/// Enable JavaScript REPL tools backed by a persistent Node kernel.
JsRepl,
/// Enable js_repl polling helpers and tool.
JsReplPolling,
/// Only expose js_repl tools directly to the model.
JsReplToolsOnly,
/// Use the single unified PTY-backed exec tool.
@@ -503,6 +505,12 @@ pub const FEATURES: &[FeatureSpec] = &[
},
default_enabled: false,
},
FeatureSpec {
id: Feature::JsReplPolling,
key: "js_repl_polling",
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::JsReplToolsOnly,
key: "js_repl_tools_only",

View File

@@ -65,6 +65,17 @@ fn render_js_repl_instructions(config: &Config) -> Option<String> {
section.push_str("- Top-level bindings persist across cells. If a cell throws, prior bindings remain available and bindings that finished initializing before the throw often remain usable in later cells. For code you plan to reuse across cells, prefer declaring or assigning it in direct top-level statements before operations that might throw. If you hit `SyntaxError: Identifier 'x' has already been declared`, first reuse the existing binding, reassign a previously declared `let`, or pick a new descriptive name. Use `{ ... }` only for a short temporary block when you specifically need local scratch names; do not wrap an entire cell in block scope if you want those names reusable later. Reset the kernel with `js_repl_reset` only when you need a clean state.\n");
section.push_str("- Top-level static import declarations (for example `import x from \"./file.js\"`) are currently unsupported in `js_repl`; use dynamic imports with `await import(\"pkg\")`, `await import(\"./file.js\")`, or `await import(\"/abs/path/file.mjs\")` instead. Imported local files must be ESM `.js`/`.mjs` files and run in the same REPL VM context. Bare package imports always resolve from REPL-global search roots (`CODEX_JS_REPL_NODE_MODULE_DIRS`, then cwd), not relative to the imported file location. Local files may statically import only other local relative/absolute/`file://` `.js`/`.mjs` files; package and builtin imports from local files must stay dynamic. `import.meta.resolve()` returns importable strings such as `file://...`, bare package names, and `node:...` specifiers. Local file modules reload between execs, while top-level bindings persist until `js_repl_reset`.\n");
if config.features.enabled(Feature::JsReplPolling) {
section.push_str("- Polling mode is session-based: call `js_repl` with first-line pragma `// codex-js-repl: poll=true` to get `exec_id` and `session_id`; provide `session_id=<id>` in later `js_repl` calls to reuse that session state. Omit `session_id` to create a new polling session, and note that unknown `session_id` values fail. Use space-separated pragma arguments. If a task says to use polling mode only, every `js_repl` submission for that task must include `// codex-js-repl: poll=true`; do not mix polling and non-polling submissions.\n");
section.push_str("- Reuse the same `session_id` across related polling cells so top-level state, imported modules, and handles persist. Omit `session_id` only when you intentionally want a fresh polling session.\n");
section.push_str("- Use `js_repl_poll` with `exec_id` until `status` is `completed` or `error`; if a poll returns `status: running`, keep polling the same `exec_id` even if the logs or `final_output` already look complete. Poll responses also include `session_id`, and completed polls can also include nested multimodal tool output after the JSON status item.\n");
section.push_str("- Keep polling submissions short and single-purpose. If a submission errors, prefer resending a smaller cell in the same session before reaching for `js_repl_reset`.\n");
section.push_str("- Use `js_repl_reset({\"session_id\":\"...\"})` to stop one polling session (including any running exec), or `js_repl_reset({})` to reset all js_repl kernels.\n");
section.push_str("- `js_repl_poll` must not be called before a successful `js_repl` polling submission returns an `exec_id`.\n");
section.push_str("- In polling mode, `timeout_ms` is not supported on `js_repl`; use `js_repl_poll` `yield_time_ms` to control poll wait duration. Omitted values, or values below `5000`, wait up to 5 seconds before returning if nothing new arrives.\n");
section.push_str("- If `js_repl` rejects your payload format, resend raw JS with the pragma; do not retry with JSON, quoted strings, or markdown fences.\n");
}
if config.features.enabled(Feature::JsReplToolsOnly) {
section.push_str("- Do not call tools directly; use `js_repl` + `codex.tool(...)` for all tool calls, including shell commands.\n");
section
@@ -390,6 +401,47 @@ mod tests {
}])
}
/// Builds the expected `js_repl` instruction section for tests.
///
/// Mirrors the text produced by `render_js_repl_instructions`: `polling`
/// toggles the bullets gated on the `JsReplPolling` feature, `tools_only`
/// toggles the bullets gated on the `JsReplToolsOnly` feature.
fn expected_js_repl_instructions(polling: bool, tools_only: bool) -> String {
let mut expected = String::from("## JavaScript REPL (Node)\n");
expected.push_str(
"- Use `js_repl` for Node-backed JavaScript with top-level await in a persistent kernel.\n",
);
expected.push_str("- `js_repl` is a freeform/custom tool. Direct `js_repl` calls must send raw JavaScript tool input (optionally with first-line `// codex-js-repl: timeout_ms=15000`). Do not wrap code in JSON (for example `{\"code\":\"...\"}`), quotes, or markdown code fences.\n");
expected.push_str(
"- Helpers: `codex.tmpDir`, `codex.tool(name, args?)`, and `codex.emitImage(imageLike)`.\n",
);
expected.push_str("- `codex.tool` executes a normal tool call and resolves to the raw tool output object. Use it for shell and non-shell tools alike. Nested tool outputs stay inside JavaScript unless you emit them explicitly.\n");
expected.push_str("- `codex.emitImage(...)` adds one image to the outer `js_repl` function output each time you call it, so you can call it multiple times to emit multiple images. It accepts a data URL, a single `input_image` item, an object like `{ bytes, mimeType }`, or a raw tool response object with exactly one image and no text. It rejects mixed text-and-image content.\n");
expected.push_str("- Example of sharing an in-memory Playwright screenshot: `await codex.emitImage({ bytes: await page.screenshot({ type: \"jpeg\", quality: 85 }), mimeType: \"image/jpeg\" })`.\n");
expected.push_str("- Example of sharing a local image tool result: `await codex.emitImage(codex.tool(\"view_image\", { path: \"/absolute/path\" }))`.\n");
expected.push_str("- When encoding an image to send with `codex.emitImage(...)` or `view_image`, prefer JPEG at about 85 quality when lossy compression is acceptable; use PNG when transparency or lossless detail matters. Smaller uploads are faster and less likely to hit size limits.\n");
expected.push_str("- Top-level bindings persist across cells. If a cell throws, prior bindings remain available and bindings that finished initializing before the throw often remain usable in later cells. For code you plan to reuse across cells, prefer declaring or assigning it in direct top-level statements before operations that might throw. If you hit `SyntaxError: Identifier 'x' has already been declared`, first reuse the existing binding, reassign a previously declared `let`, or pick a new descriptive name. Use `{ ... }` only for a short temporary block when you specifically need local scratch names; do not wrap an entire cell in block scope if you want those names reusable later. Reset the kernel with `js_repl_reset` only when you need a clean state.\n");
expected.push_str("- Top-level static import declarations (for example `import x from \"./file.js\"`) are currently unsupported in `js_repl`; use dynamic imports with `await import(\"pkg\")`, `await import(\"./file.js\")`, or `await import(\"/abs/path/file.mjs\")` instead. Imported local files must be ESM `.js`/`.mjs` files and run in the same REPL VM context. Bare package imports always resolve from REPL-global search roots (`CODEX_JS_REPL_NODE_MODULE_DIRS`, then cwd), not relative to the imported file location. Local files may statically import only other local relative/absolute/`file://` `.js`/`.mjs` files; package and builtin imports from local files must stay dynamic. `import.meta.resolve()` returns importable strings such as `file://...`, bare package names, and `node:...` specifiers. Local file modules reload between execs, while top-level bindings persist until `js_repl_reset`.\n");
// Polling bullets appear only when the JsReplPolling feature is enabled.
if polling {
expected.push_str("- Polling mode is session-based: call `js_repl` with first-line pragma `// codex-js-repl: poll=true` to get `exec_id` and `session_id`; provide `session_id=<id>` in later `js_repl` calls to reuse that session state. Omit `session_id` to create a new polling session, and note that unknown `session_id` values fail. Use space-separated pragma arguments. If a task says to use polling mode only, every `js_repl` submission for that task must include `// codex-js-repl: poll=true`; do not mix polling and non-polling submissions.\n");
expected.push_str("- Reuse the same `session_id` across related polling cells so top-level state, imported modules, and handles persist. Omit `session_id` only when you intentionally want a fresh polling session.\n");
expected.push_str("- Use `js_repl_poll` with `exec_id` until `status` is `completed` or `error`; if a poll returns `status: running`, keep polling the same `exec_id` even if the logs or `final_output` already look complete. Poll responses also include `session_id`, and completed polls can also include nested multimodal tool output after the JSON status item.\n");
expected.push_str("- Keep polling submissions short and single-purpose. If a submission errors, prefer resending a smaller cell in the same session before reaching for `js_repl_reset`.\n");
expected.push_str("- Use `js_repl_reset({\"session_id\":\"...\"})` to stop one polling session (including any running exec), or `js_repl_reset({})` to reset all js_repl kernels.\n");
expected.push_str("- `js_repl_poll` must not be called before a successful `js_repl` polling submission returns an `exec_id`.\n");
expected.push_str("- In polling mode, `timeout_ms` is not supported on `js_repl`; use `js_repl_poll` `yield_time_ms` to control poll wait duration. Omitted values, or values below `5000`, wait up to 5 seconds before returning if nothing new arrives.\n");
expected.push_str("- If `js_repl` rejects your payload format, resend raw JS with the pragma; do not retry with JSON, quoted strings, or markdown fences.\n");
}
// Tools-only bullets appear only when the JsReplToolsOnly feature is enabled.
if tools_only {
expected.push_str(
"- Do not call tools directly; use `js_repl` + `codex.tool(...)` for all tool calls, including shell commands.\n",
);
expected.push_str(
"- MCP tools (if any) can also be called by name via `codex.tool(...)`.\n",
);
}
// The final bullet intentionally has no trailing newline; it ends the section.
expected.push_str("- Avoid direct access to `process.stdout` / `process.stderr` / `process.stdin`; it can corrupt the JSON line protocol. Use `console.log`, `codex.tool(...)`, and `codex.emitImage(...)`.");
expected
}
/// AGENTS.md missing should yield `None`.
#[tokio::test]
async fn no_doc_file_returns_none() {
@@ -400,7 +452,6 @@ mod tests {
res.is_none(),
"Expected None when AGENTS.md is absent and no system instructions provided"
);
assert!(res.is_none(), "Expected None when AGENTS.md is absent");
}
/// Small file within the byte-limit is returned unmodified.
@@ -490,8 +541,7 @@ mod tests {
let res = get_user_instructions(&cfg, None, None)
.await
.expect("js_repl instructions expected");
let expected = "## JavaScript REPL (Node)\n- Use `js_repl` for Node-backed JavaScript with top-level await in a persistent kernel.\n- `js_repl` is a freeform/custom tool. Direct `js_repl` calls must send raw JavaScript tool input (optionally with first-line `// codex-js-repl: timeout_ms=15000`). Do not wrap code in JSON (for example `{\"code\":\"...\"}`), quotes, or markdown code fences.\n- Helpers: `codex.tmpDir`, `codex.tool(name, args?)`, and `codex.emitImage(imageLike)`.\n- `codex.tool` executes a normal tool call and resolves to the raw tool output object. Use it for shell and non-shell tools alike. Nested tool outputs stay inside JavaScript unless you emit them explicitly.\n- `codex.emitImage(...)` adds one image to the outer `js_repl` function output each time you call it, so you can call it multiple times to emit multiple images. It accepts a data URL, a single `input_image` item, an object like `{ bytes, mimeType }`, or a raw tool response object with exactly one image and no text. It rejects mixed text-and-image content.\n- Example of sharing an in-memory Playwright screenshot: `await codex.emitImage({ bytes: await page.screenshot({ type: \"jpeg\", quality: 85 }), mimeType: \"image/jpeg\" })`.\n- Example of sharing a local image tool result: `await codex.emitImage(codex.tool(\"view_image\", { path: \"/absolute/path\" }))`.\n- When encoding an image to send with `codex.emitImage(...)` or `view_image`, prefer JPEG at about 85 quality when lossy compression is acceptable; use PNG when transparency or lossless detail matters. Smaller uploads are faster and less likely to hit size limits.\n- Top-level bindings persist across cells. If a cell throws, prior bindings remain available and bindings that finished initializing before the throw often remain usable in later cells. For code you plan to reuse across cells, prefer declaring or assigning it in direct top-level statements before operations that might throw. 
If you hit `SyntaxError: Identifier 'x' has already been declared`, first reuse the existing binding, reassign a previously declared `let`, or pick a new descriptive name. Use `{ ... }` only for a short temporary block when you specifically need local scratch names; do not wrap an entire cell in block scope if you want those names reusable later. Reset the kernel with `js_repl_reset` only when you need a clean state.\n- Top-level static import declarations (for example `import x from \"./file.js\"`) are currently unsupported in `js_repl`; use dynamic imports with `await import(\"pkg\")`, `await import(\"./file.js\")`, or `await import(\"/abs/path/file.mjs\")` instead. Imported local files must be ESM `.js`/`.mjs` files and run in the same REPL VM context. Bare package imports always resolve from REPL-global search roots (`CODEX_JS_REPL_NODE_MODULE_DIRS`, then cwd), not relative to the imported file location. Local files may statically import only other local relative/absolute/`file://` `.js`/`.mjs` files; package and builtin imports from local files must stay dynamic. `import.meta.resolve()` returns importable strings such as `file://...`, bare package names, and `node:...` specifiers. Local file modules reload between execs, while top-level bindings persist until `js_repl_reset`.\n- Avoid direct access to `process.stdout` / `process.stderr` / `process.stdin`; it can corrupt the JSON line protocol. Use `console.log`, `codex.tool(...)`, and `codex.emitImage(...)`.";
assert_eq!(res, expected);
assert_eq!(res, expected_js_repl_instructions(false, false));
}
#[tokio::test]
@@ -509,8 +559,7 @@ mod tests {
let res = get_user_instructions(&cfg, None, None)
.await
.expect("js_repl instructions expected");
let expected = "## JavaScript REPL (Node)\n- Use `js_repl` for Node-backed JavaScript with top-level await in a persistent kernel.\n- `js_repl` is a freeform/custom tool. Direct `js_repl` calls must send raw JavaScript tool input (optionally with first-line `// codex-js-repl: timeout_ms=15000`). Do not wrap code in JSON (for example `{\"code\":\"...\"}`), quotes, or markdown code fences.\n- Helpers: `codex.tmpDir`, `codex.tool(name, args?)`, and `codex.emitImage(imageLike)`.\n- `codex.tool` executes a normal tool call and resolves to the raw tool output object. Use it for shell and non-shell tools alike. Nested tool outputs stay inside JavaScript unless you emit them explicitly.\n- `codex.emitImage(...)` adds one image to the outer `js_repl` function output each time you call it, so you can call it multiple times to emit multiple images. It accepts a data URL, a single `input_image` item, an object like `{ bytes, mimeType }`, or a raw tool response object with exactly one image and no text. It rejects mixed text-and-image content.\n- Example of sharing an in-memory Playwright screenshot: `await codex.emitImage({ bytes: await page.screenshot({ type: \"jpeg\", quality: 85 }), mimeType: \"image/jpeg\" })`.\n- Example of sharing a local image tool result: `await codex.emitImage(codex.tool(\"view_image\", { path: \"/absolute/path\" }))`.\n- When encoding an image to send with `codex.emitImage(...)` or `view_image`, prefer JPEG at about 85 quality when lossy compression is acceptable; use PNG when transparency or lossless detail matters. Smaller uploads are faster and less likely to hit size limits.\n- Top-level bindings persist across cells. If a cell throws, prior bindings remain available and bindings that finished initializing before the throw often remain usable in later cells. For code you plan to reuse across cells, prefer declaring or assigning it in direct top-level statements before operations that might throw. 
If you hit `SyntaxError: Identifier 'x' has already been declared`, first reuse the existing binding, reassign a previously declared `let`, or pick a new descriptive name. Use `{ ... }` only for a short temporary block when you specifically need local scratch names; do not wrap an entire cell in block scope if you want those names reusable later. Reset the kernel with `js_repl_reset` only when you need a clean state.\n- Top-level static import declarations (for example `import x from \"./file.js\"`) are currently unsupported in `js_repl`; use dynamic imports with `await import(\"pkg\")`, `await import(\"./file.js\")`, or `await import(\"/abs/path/file.mjs\")` instead. Imported local files must be ESM `.js`/`.mjs` files and run in the same REPL VM context. Bare package imports always resolve from REPL-global search roots (`CODEX_JS_REPL_NODE_MODULE_DIRS`, then cwd), not relative to the imported file location. Local files may statically import only other local relative/absolute/`file://` `.js`/`.mjs` files; package and builtin imports from local files must stay dynamic. `import.meta.resolve()` returns importable strings such as `file://...`, bare package names, and `node:...` specifiers. Local file modules reload between execs, while top-level bindings persist until `js_repl_reset`.\n- Do not call tools directly; use `js_repl` + `codex.tool(...)` for all tool calls, including shell commands.\n- MCP tools (if any) can also be called by name via `codex.tool(...)`.\n- Avoid direct access to `process.stdout` / `process.stderr` / `process.stdin`; it can corrupt the JSON line protocol. Use `console.log`, `codex.tool(...)`, and `codex.emitImage(...)`.";
assert_eq!(res, expected);
assert_eq!(res, expected_js_repl_instructions(false, true));
}
#[tokio::test]
@@ -528,8 +577,24 @@ mod tests {
let res = get_user_instructions(&cfg, None, None)
.await
.expect("js_repl instructions expected");
let expected = "## JavaScript REPL (Node)\n- Use `js_repl` for Node-backed JavaScript with top-level await in a persistent kernel.\n- `js_repl` is a freeform/custom tool. Direct `js_repl` calls must send raw JavaScript tool input (optionally with first-line `// codex-js-repl: timeout_ms=15000`). Do not wrap code in JSON (for example `{\"code\":\"...\"}`), quotes, or markdown code fences.\n- Helpers: `codex.tmpDir`, `codex.tool(name, args?)`, and `codex.emitImage(imageLike)`.\n- `codex.tool` executes a normal tool call and resolves to the raw tool output object. Use it for shell and non-shell tools alike. Nested tool outputs stay inside JavaScript unless you emit them explicitly.\n- `codex.emitImage(...)` adds one image to the outer `js_repl` function output each time you call it, so you can call it multiple times to emit multiple images. It accepts a data URL, a single `input_image` item, an object like `{ bytes, mimeType }`, or a raw tool response object with exactly one image and no text. It rejects mixed text-and-image content.\n- Example of sharing an in-memory Playwright screenshot: `await codex.emitImage({ bytes: await page.screenshot({ type: \"jpeg\", quality: 85 }), mimeType: \"image/jpeg\" })`.\n- Example of sharing a local image tool result: `await codex.emitImage(codex.tool(\"view_image\", { path: \"/absolute/path\" }))`.\n- When encoding an image to send with `codex.emitImage(...)` or `view_image`, prefer JPEG at about 85 quality when lossy compression is acceptable; use PNG when transparency or lossless detail matters. Smaller uploads are faster and less likely to hit size limits.\n- Top-level bindings persist across cells. If a cell throws, prior bindings remain available and bindings that finished initializing before the throw often remain usable in later cells. For code you plan to reuse across cells, prefer declaring or assigning it in direct top-level statements before operations that might throw. 
If you hit `SyntaxError: Identifier 'x' has already been declared`, first reuse the existing binding, reassign a previously declared `let`, or pick a new descriptive name. Use `{ ... }` only for a short temporary block when you specifically need local scratch names; do not wrap an entire cell in block scope if you want those names reusable later. Reset the kernel with `js_repl_reset` only when you need a clean state.\n- Top-level static import declarations (for example `import x from \"./file.js\"`) are currently unsupported in `js_repl`; use dynamic imports with `await import(\"pkg\")`, `await import(\"./file.js\")`, or `await import(\"/abs/path/file.mjs\")` instead. Imported local files must be ESM `.js`/`.mjs` files and run in the same REPL VM context. Bare package imports always resolve from REPL-global search roots (`CODEX_JS_REPL_NODE_MODULE_DIRS`, then cwd), not relative to the imported file location. Local files may statically import only other local relative/absolute/`file://` `.js`/`.mjs` files; package and builtin imports from local files must stay dynamic. `import.meta.resolve()` returns importable strings such as `file://...`, bare package names, and `node:...` specifiers. Local file modules reload between execs, while top-level bindings persist until `js_repl_reset`.\n- Avoid direct access to `process.stdout` / `process.stderr` / `process.stdin`; it can corrupt the JSON line protocol. Use `console.log`, `codex.tool(...)`, and `codex.emitImage(...)`.";
assert_eq!(res, expected);
assert_eq!(res, expected_js_repl_instructions(false, false));
}
/// With both `JsRepl` and `JsReplPolling` enabled, the rendered user
/// instructions must equal the polling variant of the expected text.
#[tokio::test]
async fn js_repl_polling_instructions_are_feature_gated() {
let tmpdir = tempfile::tempdir().expect("tempdir");
let mut config = make_config(&tmpdir, 4096, None).await;
// Enable the base feature plus the polling add-on.
for feature in [Feature::JsRepl, Feature::JsReplPolling] {
config
.features
.enable(feature)
.expect("feature flag should enable");
}
let instructions = get_user_instructions(&config, None, None)
.await
.expect("js_repl instructions expected");
assert_eq!(instructions, expected_js_repl_instructions(true, false));
}
/// When both system instructions *and* a project doc are present the two

View File

@@ -1,11 +1,9 @@
use async_trait::async_trait;
use serde::Deserialize;
use serde_json::Value as JsonValue;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use crate::exec::ExecToolCallOutput;
use crate::exec::StreamOutput;
use crate::features::Feature;
use crate::function_tool::FunctionCallError;
use crate::protocol::ExecCommandSource;
@@ -14,11 +12,14 @@ use crate::tools::context::ToolOutput;
use crate::tools::context::ToolPayload;
use crate::tools::events::ToolEmitter;
use crate::tools::events::ToolEventCtx;
use crate::tools::events::ToolEventFailure;
use crate::tools::events::ToolEventStage;
use crate::tools::handlers::parse_arguments;
use crate::tools::js_repl::JS_REPL_POLL_TIMEOUT_ARG_ERROR_MESSAGE;
use crate::tools::js_repl::JS_REPL_PRAGMA_PREFIX;
use crate::tools::js_repl::JsExecPollResult;
use crate::tools::js_repl::JsReplArgs;
use crate::tools::js_repl::JsReplExecuteError;
use crate::tools::js_repl::emit_js_repl_exec_end;
use crate::tools::registry::ToolHandler;
use crate::tools::registry::ToolKind;
use codex_protocol::models::FunctionCallOutputBody;
@@ -26,33 +27,21 @@ use codex_protocol::models::FunctionCallOutputContentItem;
pub struct JsReplHandler;
pub struct JsReplResetHandler;
pub struct JsReplPollHandler;
fn join_outputs(stdout: &str, stderr: &str) -> String {
if stdout.is_empty() {
stderr.to_string()
} else if stderr.is_empty() {
stdout.to_string()
} else {
format!("{stdout}\n{stderr}")
}
/// Arguments accepted by the `js_repl_poll` tool; unknown fields are
/// rejected by serde.
#[derive(Clone, Debug, Deserialize)]
#[serde(deny_unknown_fields)]
struct JsReplPollArgs {
// Identifier of the in-flight polling exec, as returned by a `js_repl`
// polling submission.
exec_id: String,
// How long the poll may wait for new output before returning; per the
// rendered instructions, omitted or sub-5000ms values wait up to 5s —
// the clamping itself happens elsewhere (TODO confirm in the manager).
#[serde(default)]
yield_time_ms: Option<u64>,
}
fn build_js_repl_exec_output(
output: &str,
error: Option<&str>,
duration: Duration,
) -> ExecToolCallOutput {
let stdout = output.to_string();
let stderr = error.unwrap_or("").to_string();
let aggregated_output = join_outputs(&stdout, &stderr);
ExecToolCallOutput {
exit_code: if error.is_some() { 1 } else { 0 },
stdout: StreamOutput::new(stdout),
stderr: StreamOutput::new(stderr),
aggregated_output: StreamOutput::new(aggregated_output),
duration,
timed_out: false,
}
/// Arguments accepted by the `js_repl_reset` tool; unknown fields are
/// rejected by serde.
#[derive(Clone, Debug, Deserialize)]
#[serde(deny_unknown_fields)]
struct JsReplResetArgs {
// When present, targets a single polling session; when omitted, the
// rendered instructions state that all js_repl kernels are reset —
// the actual reset logic lives in the handler/manager, not here.
#[serde(default)]
session_id: Option<String>,
}
async fn emit_js_repl_exec_begin(
@@ -70,29 +59,6 @@ async fn emit_js_repl_exec_begin(
emitter.emit(ctx, ToolEventStage::Begin).await;
}
/// Emits the end-of-exec tool event for a `js_repl` call.
///
/// Builds an `ExecToolCallOutput` from `output`/`error`/`duration` and
/// reports it as a shell-style event for the synthetic command `js_repl`:
/// `Failure` when `error` is `Some`, `Success` otherwise.
async fn emit_js_repl_exec_end(
session: &crate::codex::Session,
turn: &crate::codex::TurnContext,
call_id: &str,
output: &str,
error: Option<&str>,
duration: Duration,
) {
let exec_output = build_js_repl_exec_output(output, error, duration);
// Presented to the event stream as an agent-sourced shell command.
let emitter = ToolEmitter::shell(
vec!["js_repl".to_string()],
turn.cwd.clone(),
ExecCommandSource::Agent,
false,
);
let ctx = ToolEventCtx::new(session, turn, call_id, None);
// Success/failure is decided solely by the presence of `error`.
let stage = if error.is_some() {
ToolEventStage::Failure(ToolEventFailure::Output(exec_output))
} else {
ToolEventStage::Success(exec_output)
};
emitter.emit(ctx, stage).await;
}
#[async_trait]
impl ToolHandler for JsReplHandler {
fn kind(&self) -> ToolKind {
@@ -124,22 +90,93 @@ impl ToolHandler for JsReplHandler {
let args = match payload {
ToolPayload::Function { arguments } => parse_arguments(&arguments)?,
ToolPayload::Custom { input } => parse_freeform_args(&input)?,
ToolPayload::Custom { input } => {
parse_freeform_args(&input, session.features().enabled(Feature::JsReplPolling))?
}
_ => {
return Err(FunctionCallError::RespondToModel(
"js_repl expects custom or function payload".to_string(),
));
}
};
if args.poll {
if args
.session_id
.as_deref()
.is_some_and(|session_id| session_id.trim().is_empty())
{
return Err(FunctionCallError::RespondToModel(
"js_repl session_id must not be empty".to_string(),
));
}
if args.timeout_ms.is_some() {
return Err(FunctionCallError::RespondToModel(
JS_REPL_POLL_TIMEOUT_ARG_ERROR_MESSAGE.to_string(),
));
}
if !session.features().enabled(Feature::JsReplPolling) {
return Err(FunctionCallError::RespondToModel(
"js_repl polling is disabled by feature flag".to_string(),
));
}
} else if args.session_id.is_some() {
return Err(FunctionCallError::RespondToModel(
"js_repl session_id is only supported when poll=true".to_string(),
));
}
let manager = turn.js_repl.manager().await?;
let started_at = Instant::now();
emit_js_repl_exec_begin(session.as_ref(), turn.as_ref(), &call_id).await;
if args.poll {
let submission = Arc::clone(&manager)
.submit(
Arc::clone(&session),
Arc::clone(&turn),
tracker,
call_id.clone(),
args,
)
.await;
let submission = match submission {
Ok(submission) => submission,
Err(err) => {
let message = err.to_string();
emit_js_repl_exec_end(
session.as_ref(),
turn.as_ref(),
&call_id,
"",
Some(&message),
started_at.elapsed(),
false,
)
.await;
return Err(err);
}
};
let content = serde_json::to_string(&serde_json::json!({
"exec_id": submission.exec_id,
"session_id": submission.session_id,
"status": "running",
}))
.map_err(|err| {
FunctionCallError::Fatal(format!(
"failed to serialize js_repl submission result: {err}"
))
})?;
return Ok(ToolOutput::Function {
body: FunctionCallOutputBody::Text(content),
success: Some(true),
});
}
let result = manager
.execute(Arc::clone(&session), Arc::clone(&turn), tracker, args)
.await;
let result = match result {
Ok(result) => result,
Err(err) => {
let timed_out = matches!(err, JsReplExecuteError::TimedOut);
let message = err.to_string();
emit_js_repl_exec_end(
session.as_ref(),
@@ -148,9 +185,10 @@ impl ToolHandler for JsReplHandler {
"",
Some(&message),
started_at.elapsed(),
timed_out,
)
.await;
return Err(err);
return Err(err.into());
}
};
@@ -170,6 +208,7 @@ impl ToolHandler for JsReplHandler {
&content,
None,
started_at.elapsed(),
false,
)
.await;
@@ -191,21 +230,158 @@ impl ToolHandler for JsReplResetHandler {
}
async fn handle(&self, invocation: ToolInvocation) -> Result<ToolOutput, FunctionCallError> {
if !invocation.session.features().enabled(Feature::JsRepl) {
let ToolInvocation {
session,
turn,
payload,
..
} = invocation;
if !session.features().enabled(Feature::JsRepl) {
return Err(FunctionCallError::RespondToModel(
"js_repl is disabled by feature flag".to_string(),
));
}
let manager = invocation.turn.js_repl.manager().await?;
manager.reset().await?;
let ToolPayload::Function { arguments } = payload else {
return Err(FunctionCallError::RespondToModel(
"js_repl_reset expects function payload".to_string(),
));
};
let args: JsReplResetArgs = parse_arguments(&arguments)?;
let manager = turn.js_repl.manager().await?;
let content = if let Some(session_id) = args.session_id {
if session_id.trim().is_empty() {
return Err(FunctionCallError::RespondToModel(
"js_repl session_id must not be empty".to_string(),
));
}
manager.reset_session(&session_id).await?;
serde_json::to_string(&serde_json::json!({
"status": "reset",
"session_id": session_id,
}))
.map_err(|err| {
FunctionCallError::Fatal(format!("failed to serialize js_repl reset result: {err}"))
})?
} else {
manager.reset().await?;
serde_json::to_string(&serde_json::json!({
"status": "reset_all",
}))
.map_err(|err| {
FunctionCallError::Fatal(format!("failed to serialize js_repl reset result: {err}"))
})?
};
Ok(ToolOutput::Function {
body: FunctionCallOutputBody::Text("js_repl kernel reset".to_string()),
body: FunctionCallOutputBody::Text(content),
success: Some(true),
})
}
}
fn parse_freeform_args(input: &str) -> Result<JsReplArgs, FunctionCallError> {
#[async_trait]
impl ToolHandler for JsReplPollHandler {
fn kind(&self) -> ToolKind {
ToolKind::Function
}
async fn handle(&self, invocation: ToolInvocation) -> Result<ToolOutput, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
..
} = invocation;
if !session.features().enabled(Feature::JsRepl) {
return Err(FunctionCallError::RespondToModel(
"js_repl is disabled by feature flag".to_string(),
));
}
if !session.features().enabled(Feature::JsReplPolling) {
return Err(FunctionCallError::RespondToModel(
"js_repl polling is disabled by feature flag".to_string(),
));
}
let ToolPayload::Function { arguments } = payload else {
return Err(FunctionCallError::RespondToModel(
"js_repl_poll expects function payload".to_string(),
));
};
let args: JsReplPollArgs = parse_arguments(&arguments)?;
let manager = turn.js_repl.manager().await?;
let result = manager
.poll(
&args.exec_id,
args.yield_time_ms.map(|yield_time_ms| {
yield_time_ms.max(crate::unified_exec::MIN_EMPTY_YIELD_TIME_MS)
}),
)
.await?;
let output = format_poll_output(&result)?;
Ok(ToolOutput::Function {
body: output.body,
success: Some(true),
})
}
}
/// Formatted result of a js_repl poll, ready to hand back to the model.
struct JsReplPollOutput {
    // Either a plain JSON text body, or the JSON text followed by
    // multimodal content items (see format_poll_output).
    body: FunctionCallOutputBody,
}
/// Settings extracted from a `// codex-js-repl:` pragma directive.
///
/// Each field is `None` when the corresponding key was absent from the
/// pragma; duplicate keys are rejected during parsing.
#[derive(Default)]
struct ParsedJsReplPragma {
    timeout_ms: Option<u64>,
    poll: Option<bool>,
    session_id: Option<String>,
}
/// Renders a poll result into the tool output body returned to the model.
///
/// The JSON payload always carries `exec_id`, `session_id`, a derived
/// `status` string (`running` / `completed` / `error`), the newline-joined
/// `logs` (or `null` when none were captured), `final_output`, and `error`.
/// When the result includes multimodal content items, the JSON text becomes
/// the first item of a content-item body; otherwise a plain text body is
/// returned.
fn format_poll_output(result: &JsExecPollResult) -> Result<JsReplPollOutput, FunctionCallError> {
    // Map the (done, has-error) pair onto the externally visible status.
    let status = match (result.done, result.error.is_some()) {
        (false, _) => "running",
        (true, true) => "error",
        (true, false) => "completed",
    };
    // An empty log list serializes as JSON null rather than an empty string.
    let logs = (!result.logs.is_empty()).then(|| result.logs.join("\n"));
    let content = serde_json::to_string(&serde_json::json!({
        "exec_id": result.exec_id,
        "session_id": result.session_id,
        "status": status,
        "logs": logs,
        "final_output": result.final_output,
        "error": result.error,
    }))
    .map_err(|err| {
        FunctionCallError::Fatal(format!("failed to serialize js_repl poll result: {err}"))
    })?;
    if result.content_items.is_empty() {
        return Ok(JsReplPollOutput {
            body: FunctionCallOutputBody::Text(content),
        });
    }
    // Prepend the JSON summary so the model sees it before any image items.
    let mut items = vec![FunctionCallOutputContentItem::InputText { text: content }];
    items.extend(result.content_items.iter().cloned());
    Ok(JsReplPollOutput {
        body: FunctionCallOutputBody::ContentItems(items),
    })
}
fn parse_freeform_args(
input: &str,
polling_enabled: bool,
) -> Result<JsReplArgs, FunctionCallError> {
if input.trim().is_empty() {
return Err(FunctionCallError::RespondToModel(
"js_repl expects raw JavaScript tool input (non-empty). Provide JS source text, optionally with first-line `// codex-js-repl: ...`."
@@ -213,61 +389,119 @@ fn parse_freeform_args(input: &str) -> Result<JsReplArgs, FunctionCallError> {
));
}
let mut args = JsReplArgs {
code: input.to_string(),
timeout_ms: None,
};
let mut lines = input.splitn(2, '\n');
let first_line = lines.next().unwrap_or_default();
let rest = lines.next().unwrap_or_default();
let trimmed = first_line.trim_start();
let Some(pragma) = trimmed.strip_prefix(JS_REPL_PRAGMA_PREFIX) else {
reject_json_or_quoted_source(&args.code)?;
return Ok(args);
reject_json_or_quoted_source(input)?;
return Ok(JsReplArgs {
code: input.to_string(),
timeout_ms: None,
poll: false,
session_id: None,
});
};
let mut timeout_ms: Option<u64> = None;
let directive = pragma.trim();
if !directive.is_empty() {
for token in directive.split_whitespace() {
let (key, value) = token.split_once('=').ok_or_else(|| {
FunctionCallError::RespondToModel(format!(
"js_repl pragma expects space-separated key=value pairs (supported keys: timeout_ms); got `{token}`"
))
})?;
match key {
"timeout_ms" => {
if timeout_ms.is_some() {
return Err(FunctionCallError::RespondToModel(
"js_repl pragma specifies timeout_ms more than once".to_string(),
));
}
let parsed = value.parse::<u64>().map_err(|_| {
FunctionCallError::RespondToModel(format!(
"js_repl pragma timeout_ms must be an integer; got `{value}`"
))
})?;
timeout_ms = Some(parsed);
}
_ => {
return Err(FunctionCallError::RespondToModel(format!(
"js_repl pragma only supports timeout_ms; got `{key}`"
)));
}
}
}
}
if rest.trim().is_empty() {
return Err(FunctionCallError::RespondToModel(
"js_repl pragma must be followed by JavaScript source on subsequent lines".to_string(),
));
}
let pragma_args = parse_js_repl_pragma(pragma.trim(), polling_enabled)?;
reject_json_or_quoted_source(rest)?;
args.code = rest.to_string();
args.timeout_ms = timeout_ms;
let args = JsReplArgs {
code: rest.to_string(),
timeout_ms: pragma_args.timeout_ms,
poll: pragma_args.poll.unwrap_or(false),
session_id: pragma_args.session_id,
};
if args.session_id.is_some() && !args.poll {
return Err(FunctionCallError::RespondToModel(
"js_repl session_id is only supported when poll=true".to_string(),
));
}
if args.poll && args.timeout_ms.is_some() {
return Err(FunctionCallError::RespondToModel(
JS_REPL_POLL_TIMEOUT_ARG_ERROR_MESSAGE.to_string(),
));
}
Ok(args)
}
/// Parses the `// codex-js-repl:` pragma directive into its recognized
/// key=value settings.
///
/// Tokens may be separated by ASCII whitespace and/or commas, and each key
/// is accepted at most once. `polling_enabled` only narrows the key list
/// quoted in error messages (so the model is not told about keys it cannot
/// use); all keys are parsed the same way either way.
fn parse_js_repl_pragma(
    directive: &str,
    polling_enabled: bool,
) -> Result<ParsedJsReplPragma, FunctionCallError> {
    let supported_keys = match polling_enabled {
        true => "timeout_ms, poll, session_id",
        false => "timeout_ms",
    };
    let mut parsed = ParsedJsReplPragma::default();
    let tokens = directive
        .split(|c: char| c.is_ascii_whitespace() || c == ',')
        .filter(|token| !token.is_empty());
    for token in tokens {
        let Some((key, value)) = token.split_once('=') else {
            return Err(FunctionCallError::RespondToModel(format!(
                "js_repl pragma expects space-separated key=value pairs (supported keys: {supported_keys}); got `{token}`"
            )));
        };
        match key {
            "timeout_ms" => {
                if parsed.timeout_ms.is_some() {
                    return Err(FunctionCallError::RespondToModel(
                        "js_repl pragma specifies timeout_ms more than once".to_string(),
                    ));
                }
                let Ok(ms) = value.parse::<u64>() else {
                    return Err(FunctionCallError::RespondToModel(format!(
                        "js_repl pragma timeout_ms must be an integer; got `{value}`"
                    )));
                };
                parsed.timeout_ms = Some(ms);
            }
            "poll" => {
                if parsed.poll.is_some() {
                    return Err(FunctionCallError::RespondToModel(
                        "js_repl pragma specifies poll more than once".to_string(),
                    ));
                }
                // Case-insensitive true/false; anything else is rejected.
                let flag = match value.to_ascii_lowercase().as_str() {
                    "true" => true,
                    "false" => false,
                    _ => {
                        return Err(FunctionCallError::RespondToModel(format!(
                            "js_repl pragma poll must be true or false; got `{value}`"
                        )));
                    }
                };
                parsed.poll = Some(flag);
            }
            "session_id" => {
                if parsed.session_id.is_some() {
                    return Err(FunctionCallError::RespondToModel(
                        "js_repl pragma specifies session_id more than once".to_string(),
                    ));
                }
                if value.trim().is_empty() {
                    return Err(FunctionCallError::RespondToModel(
                        "js_repl session_id must not be empty".to_string(),
                    ));
                }
                parsed.session_id = Some(value.to_string());
            }
            other => {
                return Err(FunctionCallError::RespondToModel(format!(
                    "js_repl pragma only supports {supported_keys}; got `{other}`"
                )));
            }
        }
    }
    Ok(parsed)
}
@@ -295,30 +529,131 @@ fn reject_json_or_quoted_source(code: &str) -> Result<(), FunctionCallError> {
mod tests {
use std::time::Duration;
use super::format_poll_output;
use super::parse_freeform_args;
use crate::codex::make_session_and_context_with_rx;
use crate::protocol::EventMsg;
use crate::protocol::ExecCommandSource;
use crate::tools::js_repl::JS_REPL_POLL_TIMEOUT_ARG_ERROR_MESSAGE;
use crate::tools::js_repl::JS_REPL_TIMEOUT_ERROR_MESSAGE;
use crate::tools::js_repl::JsExecPollResult;
use codex_protocol::models::FunctionCallOutputBody;
use codex_protocol::models::FunctionCallOutputContentItem;
use pretty_assertions::assert_eq;
use serde_json::json;
#[test]
fn parse_freeform_args_without_pragma() {
let args = parse_freeform_args("console.log('ok');").expect("parse args");
let args = parse_freeform_args("console.log('ok');", true).expect("parse args");
assert_eq!(args.code, "console.log('ok');");
assert_eq!(args.timeout_ms, None);
assert!(!args.poll);
assert_eq!(args.session_id, None);
}
#[test]
fn parse_freeform_args_with_pragma() {
let input = "// codex-js-repl: timeout_ms=15000\nconsole.log('ok');";
let args = parse_freeform_args(input).expect("parse args");
let args = parse_freeform_args(input, true).expect("parse args");
assert_eq!(args.code, "console.log('ok');");
assert_eq!(args.timeout_ms, Some(15_000));
assert!(!args.poll);
assert_eq!(args.session_id, None);
}
// `poll=true` alone sets the poll flag and leaves timeout/session unset.
#[test]
fn parse_freeform_args_with_poll() {
let input = "// codex-js-repl: poll=true\nconsole.log('ok');";
let args = parse_freeform_args(input, true).expect("parse args");
assert_eq!(args.code, "console.log('ok');");
assert_eq!(args.timeout_ms, None);
assert!(args.poll);
assert_eq!(args.session_id, None);
}
// Combining poll=true with timeout_ms is rejected with the dedicated message.
#[test]
fn parse_freeform_args_rejects_timeout_ms_when_poll_true() {
let input = "// codex-js-repl: poll=true timeout_ms=15000\nconsole.log('ok');";
let err = parse_freeform_args(input, true).expect_err("expected error");
assert_eq!(err.to_string(), JS_REPL_POLL_TIMEOUT_ARG_ERROR_MESSAGE);
}
// session_id is accepted when poll=true accompanies it.
#[test]
fn parse_freeform_args_with_poll_and_session_id() {
let input = "// codex-js-repl: poll=true session_id=my-session\nconsole.log('ok');";
let args = parse_freeform_args(input, true).expect("parse args");
assert_eq!(args.code, "console.log('ok');");
assert_eq!(args.timeout_ms, None);
assert!(args.poll);
assert_eq!(args.session_id.as_deref(), Some("my-session"));
}
// Comma (with trailing space) is a valid token separator in the pragma.
#[test]
fn parse_freeform_args_with_comma_separated_poll_and_session_id() {
let input = "// codex-js-repl: poll=true, session_id=my-session\nconsole.log('ok');";
let args = parse_freeform_args(input, true).expect("parse args");
assert_eq!(args.code, "console.log('ok');");
assert_eq!(args.timeout_ms, None);
assert!(args.poll);
assert_eq!(args.session_id.as_deref(), Some("my-session"));
}
// Comma with no surrounding spaces also separates pragma tokens.
#[test]
fn parse_freeform_args_with_comma_separated_pragma_without_spaces() {
let input = "// codex-js-repl: timeout_ms=15000,poll=false\nconsole.log('ok');";
let args = parse_freeform_args(input, true).expect("parse args");
assert_eq!(args.code, "console.log('ok');");
assert_eq!(args.timeout_ms, Some(15_000));
assert!(!args.poll);
assert_eq!(args.session_id, None);
}
// session_id without poll=true is rejected after pragma parsing.
#[test]
fn parse_freeform_args_rejects_session_id_without_poll() {
let input = "// codex-js-repl: session_id=my-session\nconsole.log('ok');";
let err = parse_freeform_args(input, true).expect_err("expected error");
assert_eq!(
err.to_string(),
"js_repl session_id is only supported when poll=true"
);
}
#[test]
fn parse_freeform_args_rejects_unknown_key() {
let err = parse_freeform_args("// codex-js-repl: nope=1\nconsole.log('ok');")
let err = parse_freeform_args("// codex-js-repl: nope=1\nconsole.log('ok');", true)
.expect_err("expected error");
assert_eq!(
err.to_string(),
"js_repl pragma only supports timeout_ms, poll, session_id; got `nope`"
);
}
// Specifying poll twice in one pragma is a duplicate-key error.
#[test]
fn parse_freeform_args_rejects_duplicate_poll() {
let err = parse_freeform_args(
"// codex-js-repl: poll=true poll=false\nconsole.log('ok');",
true,
)
.expect_err("expected error");
assert_eq!(
err.to_string(),
"js_repl pragma specifies poll more than once"
);
}
// JSON-wrapped input (instead of raw JS) triggers the freeform-tool guidance.
#[test]
fn parse_freeform_args_rejects_json_wrapped_code() {
let err =
parse_freeform_args(r#"{"code":"await doThing()"}"#, true).expect_err("expected error");
assert_eq!(
err.to_string(),
"js_repl is a freeform tool and expects raw JavaScript source. Resend plain JS only (optional first line `// codex-js-repl: ...`); do not send JSON (`{\"code\":...}`), quoted code, or markdown fences."
);
}
#[test]
fn parse_freeform_args_hides_polling_keys_when_polling_disabled() {
let err = parse_freeform_args("// codex-js-repl: nope=1\nconsole.log('ok');", false)
.expect_err("expected error");
assert_eq!(
err.to_string(),
@@ -327,27 +662,121 @@ mod tests {
}
#[test]
fn parse_freeform_args_rejects_reset_key() {
let err = parse_freeform_args("// codex-js-repl: reset=true\nconsole.log('ok');")
.expect_err("expected error");
fn format_poll_output_serializes_logs_in_json_payload() {
let result = JsExecPollResult {
exec_id: "exec-1".to_string(),
session_id: "session-1".to_string(),
logs: vec!["line 1".to_string(), "line 2".to_string()],
final_output: None,
content_items: Vec::new(),
error: None,
done: false,
};
let output = format_poll_output(&result).expect("format poll output");
let FunctionCallOutputBody::Text(content) = output.body else {
panic!("expected text poll output");
};
let payload: serde_json::Value =
serde_json::from_str(&content).expect("valid json payload");
assert_eq!(
err.to_string(),
"js_repl pragma only supports timeout_ms; got `reset`"
payload,
json!({
"exec_id": "exec-1",
"session_id": "session-1",
"status": "running",
"logs": "line 1\nline 2",
"final_output": null,
"error": null,
})
);
}
#[test]
fn parse_freeform_args_rejects_json_wrapped_code() {
let err = parse_freeform_args(r#"{"code":"await doThing()"}"#).expect_err("expected error");
fn format_poll_output_preserves_empty_final_output() {
let result = JsExecPollResult {
exec_id: "exec-1".to_string(),
session_id: "session-1".to_string(),
logs: Vec::new(),
final_output: Some(String::new()),
content_items: Vec::new(),
error: None,
done: true,
};
let output = format_poll_output(&result).expect("format poll output");
let FunctionCallOutputBody::Text(content) = output.body else {
panic!("expected text poll output");
};
let payload: serde_json::Value =
serde_json::from_str(&content).expect("valid json payload");
assert_eq!(
err.to_string(),
"js_repl is a freeform tool and expects raw JavaScript source. Resend plain JS only (optional first line `// codex-js-repl: ...`); do not send JSON (`{\"code\":...}`), quoted code, or markdown fences."
payload,
json!({
"exec_id": "exec-1",
"session_id": "session-1",
"status": "completed",
"logs": null,
"final_output": "",
"error": null,
})
);
}
// With content items present, the JSON summary becomes the first content
// item and the original items follow unchanged.
#[test]
fn format_poll_output_serializes_multimodal_content_items() {
let result = JsExecPollResult {
exec_id: "exec-1".to_string(),
session_id: "session-1".to_string(),
logs: Vec::new(),
final_output: Some("stdout".to_string()),
content_items: vec![FunctionCallOutputContentItem::InputImage {
image_url: "data:image/png;base64,abc".to_string(),
detail: None,
}],
error: None,
done: true,
};
let output = format_poll_output(&result).expect("format poll output");
let FunctionCallOutputBody::ContentItems(items) = output.body else {
panic!("expected content item poll output");
};
assert_eq!(
items,
vec![
FunctionCallOutputContentItem::InputText {
text: json!({
"exec_id": "exec-1",
"session_id": "session-1",
"status": "completed",
"logs": null,
"final_output": "stdout",
"error": null,
})
.to_string(),
},
FunctionCallOutputContentItem::InputImage {
image_url: "data:image/png;base64,abc".to_string(),
detail: None,
},
]
);
}
// deny_unknown_fields on JsReplPollArgs rejects unexpected JSON keys.
#[test]
fn js_repl_poll_args_reject_unknown_fields() {
let err = serde_json::from_str::<super::JsReplPollArgs>(
r#"{"exec_id":"exec-1","unknown":"value"}"#,
)
.expect_err("expected unknown-field deserialization error");
assert!(
err.to_string().contains("unknown field `unknown`"),
"unexpected deserialization error: {err}"
);
}
#[tokio::test]
async fn emit_js_repl_exec_end_sends_event() {
let (session, turn, rx) = make_session_and_context_with_rx().await;
super::emit_js_repl_exec_begin(session.as_ref(), turn.as_ref(), "call-1").await;
super::emit_js_repl_exec_end(
session.as_ref(),
turn.as_ref(),
@@ -355,6 +784,7 @@ mod tests {
"hello",
None,
Duration::from_millis(12),
false,
)
.await;
@@ -381,6 +811,42 @@ mod tests {
assert_eq!(event.exit_code, 0);
assert_eq!(event.duration, Duration::from_millis(12));
assert!(event.formatted_output.contains("hello"));
assert!(!event.formatted_output.contains("command timed out after"));
assert!(!event.parsed_cmd.is_empty());
}
// When emit_js_repl_exec_end is called with timed_out=true, the resulting
// ExecCommandEnd event's formatted output includes the timeout notice.
#[tokio::test]
async fn emit_js_repl_exec_end_sends_timed_out_event() {
let (session, turn, rx) = make_session_and_context_with_rx().await;
super::emit_js_repl_exec_begin(session.as_ref(), turn.as_ref(), "call-timeout").await;
super::emit_js_repl_exec_end(
session.as_ref(),
turn.as_ref(),
"call-timeout",
"",
Some(JS_REPL_TIMEOUT_ERROR_MESSAGE),
Duration::from_millis(50),
true,
)
.await;
// Drain events until the ExecCommandEnd arrives; guard with a 5s timeout
// so a missing event fails the test instead of hanging it.
let event = tokio::time::timeout(Duration::from_secs(5), async {
loop {
let event = rx.recv().await.expect("event");
if let EventMsg::ExecCommandEnd(end) = event.msg {
break end;
}
}
})
.await
.expect("timed out waiting for exec end");
assert_eq!(event.call_id, "call-timeout");
assert!(
event
.formatted_output
.contains("command timed out after 50 milliseconds")
);
assert!(!event.parsed_cmd.is_empty());
}
}

View File

@@ -34,6 +34,7 @@ use codex_protocol::protocol::AskForApproval;
pub use dynamic::DynamicToolHandler;
pub use grep_files::GrepFilesHandler;
pub use js_repl::JsReplHandler;
pub use js_repl::JsReplPollHandler;
pub use js_repl::JsReplResetHandler;
pub use list_dir::ListDirHandler;
pub use mcp::McpHandler;

View File

@@ -1153,29 +1153,40 @@ function formatLog(args) {
.join(" ");
}
function withCapturedConsole(ctx, fn) {
const logs = [];
function withCapturedConsole(ctx, onLog, captureLogs, fn) {
const logs = captureLogs ? [] : null;
const original = ctx.console ?? console;
function record(line) {
if (logs) {
logs.push(line);
}
if (onLog) onLog(line);
}
const captured = {
...original,
log: (...args) => {
logs.push(formatLog(args));
const line = formatLog(args);
record(line);
},
info: (...args) => {
logs.push(formatLog(args));
const line = formatLog(args);
record(line);
},
warn: (...args) => {
logs.push(formatLog(args));
const line = formatLog(args);
record(line);
},
error: (...args) => {
logs.push(formatLog(args));
const line = formatLog(args);
record(line);
},
debug: (...args) => {
logs.push(formatLog(args));
const line = formatLog(args);
record(line);
},
};
ctx.console = captured;
return fn(logs).finally(() => {
return fn(logs ?? []).finally(() => {
ctx.console = original;
});
}
@@ -1532,6 +1543,7 @@ async function handleExec(message) {
try {
const code = typeof message.code === "string" ? message.code : "";
const streamLogs = Boolean(message.stream_logs);
const builtSource = await buildModuleSource(code);
const source = builtSource.source;
currentBindings = builtSource.currentBindings;
@@ -1542,61 +1554,63 @@ async function handleExec(message) {
context.codex = { tmpDir, tool, emitImage };
context.tmpDir = tmpDir;
await withCapturedConsole(context, async (logs) => {
const cellIdentifier = path.join(
process.cwd(),
`.codex_js_repl_cell_${cellCounter++}.mjs`,
);
module = new SourceTextModule(source, {
context,
identifier: cellIdentifier,
initializeImportMeta(meta, mod) {
setImportMeta(meta, mod, true);
meta.__codexInternalMarkCommittedBindings = markCommittedBindings;
meta.__codexInternalMarkPreludeCompleted = markPreludeCompleted;
},
importModuleDynamically(specifier, referrer) {
return importResolved(resolveSpecifier(specifier, referrer?.identifier));
},
});
await withCapturedConsole(
context,
streamLogs ? (line) => send({ type: "exec_log", id: message.id, text: line }) : null,
!streamLogs,
async (logs) => {
const cellIdentifier = path.join(
process.cwd(),
`.codex_js_repl_cell_${cellCounter++}.mjs`,
);
module = new SourceTextModule(source, {
context,
identifier: cellIdentifier,
initializeImportMeta(meta, mod) {
setImportMeta(meta, mod, true);
meta.__codexInternalMarkCommittedBindings = markCommittedBindings;
meta.__codexInternalMarkPreludeCompleted = markPreludeCompleted;
},
importModuleDynamically(specifier, referrer) {
return importResolved(resolveSpecifier(specifier, referrer?.identifier));
},
});
await module.link(async (specifier) => {
if (specifier === "@prev" && previousModule) {
const exportNames = previousBindings.map((b) => b.name);
// Build a synthetic module snapshot of the prior cell's exports.
// This is the bridge that carries values from cell N to cell N+1.
const synthetic = new SyntheticModule(
exportNames,
function initSynthetic() {
for (const binding of previousBindings) {
this.setExport(
binding.name,
previousModule.namespace[binding.name],
);
}
},
{ context },
await module.link(async (specifier) => {
if (specifier === "@prev" && previousModule) {
const exportNames = previousBindings.map((b) => b.name);
// Build a synthetic module snapshot of the prior cell's exports.
// This is the bridge that carries values from cell N to cell N+1.
const synthetic = new SyntheticModule(
exportNames,
function initSynthetic() {
for (const binding of previousBindings) {
this.setExport(binding.name, previousModule.namespace[binding.name]);
}
},
{ context },
);
return synthetic;
}
throw new Error(
`Top-level static import "${specifier}" is not supported in js_repl. Use await import("${specifier}") instead.`,
);
return synthetic;
}
throw new Error(
`Top-level static import "${specifier}" is not supported in js_repl. Use await import("${specifier}") instead.`,
);
});
moduleLinked = true;
});
moduleLinked = true;
await module.evaluate();
if (pendingBackgroundTasks.size > 0) {
const backgroundResults = await Promise.all([...pendingBackgroundTasks]);
const firstUnhandledBackgroundError = backgroundResults.find(
(result) => !result.ok && !result.observation.observed,
);
if (firstUnhandledBackgroundError) {
throw firstUnhandledBackgroundError.error;
await module.evaluate();
if (pendingBackgroundTasks.size > 0) {
const backgroundResults = await Promise.all([...pendingBackgroundTasks]);
const firstUnhandledBackgroundError = backgroundResults.find(
(result) => !result.ok && !result.observation.observed,
);
if (firstUnhandledBackgroundError) {
throw firstUnhandledBackgroundError.error;
}
}
}
output = logs.join("\n");
});
output = logs.join("\n");
},
);
previousModule = module;
previousBindings = nextBindings;

File diff suppressed because it is too large Load Diff

View File

@@ -9,7 +9,6 @@ caching).
use crate::error::CodexErr;
use crate::error::SandboxErr;
use crate::exec::ExecToolCallOutput;
use crate::features::Feature;
use crate::network_policy_decision::network_approval_context_from_payload;
use crate::sandboxing::SandboxManager;
use crate::tools::network_approval::DeferredNetworkApproval;
@@ -25,6 +24,7 @@ use crate::tools::sandboxing::ToolCtx;
use crate::tools::sandboxing::ToolError;
use crate::tools::sandboxing::ToolRuntime;
use crate::tools::sandboxing::default_exec_approval_requirement;
use crate::tools::sandboxing::has_managed_network_requirements;
use codex_otel::ToolDecisionSource;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::NetworkPolicyRuleAction;
@@ -160,38 +160,13 @@ impl ToolOrchestrator {
}
// 2) First attempt under the selected sandbox.
let has_managed_network_requirements = turn_ctx
.config
.config_layer_stack
.requirements_toml()
.network
.is_some();
let initial_sandbox = match tool.sandbox_mode_for_first_attempt(req) {
SandboxOverride::BypassSandboxFirstAttempt => crate::exec::SandboxType::None,
SandboxOverride::NoOverride => self.sandbox.select_initial(
&turn_ctx.file_system_sandbox_policy,
turn_ctx.network_sandbox_policy,
tool.sandbox_preference(),
turn_ctx.windows_sandbox_level,
has_managed_network_requirements,
),
};
// Platform-specific flag gating is handled by SandboxManager::select_initial
// via crate::safety::get_platform_sandbox(..).
let use_linux_sandbox_bwrap = turn_ctx.features.enabled(Feature::UseLinuxSandboxBwrap);
let initial_attempt = SandboxAttempt {
sandbox: initial_sandbox,
policy: &turn_ctx.sandbox_policy,
file_system_policy: &turn_ctx.file_system_sandbox_policy,
network_policy: turn_ctx.network_sandbox_policy,
enforce_managed_network: has_managed_network_requirements,
manager: &self.sandbox,
sandbox_cwd: &turn_ctx.cwd,
codex_linux_sandbox_exe: turn_ctx.codex_linux_sandbox_exe.as_ref(),
use_linux_sandbox_bwrap,
windows_sandbox_level: turn_ctx.windows_sandbox_level,
};
let has_managed_network_requirements = has_managed_network_requirements(turn_ctx);
let initial_attempt = SandboxAttempt::initial_for_turn(
&self.sandbox,
turn_ctx,
tool.sandbox_preference(),
tool.sandbox_mode_for_first_attempt(req),
);
let (first_result, first_deferred_network_approval) = Self::run_attempt(
tool,
@@ -296,18 +271,12 @@ impl ToolOrchestrator {
}
}
let escalated_attempt = SandboxAttempt {
sandbox: crate::exec::SandboxType::None,
policy: &turn_ctx.sandbox_policy,
file_system_policy: &turn_ctx.file_system_sandbox_policy,
network_policy: turn_ctx.network_sandbox_policy,
enforce_managed_network: has_managed_network_requirements,
manager: &self.sandbox,
sandbox_cwd: &turn_ctx.cwd,
codex_linux_sandbox_exe: None,
use_linux_sandbox_bwrap,
windows_sandbox_level: turn_ctx.windows_sandbox_level,
};
let escalated_attempt = SandboxAttempt::initial_for_turn(
&self.sandbox,
turn_ctx,
tool.sandbox_preference(),
SandboxOverride::BypassSandboxFirstAttempt,
);
// Second attempt.
let (retry_result, retry_deferred_network_approval) = Self::run_attempt(

View File

@@ -156,6 +156,8 @@ impl ToolRouter {
if source == ToolCallSource::Direct
&& turn.tools_config.js_repl_tools_only
&& !matches!(tool_name.as_str(), "js_repl" | "js_repl_reset")
&& !(turn.tools_config.js_repl_poll_enabled
&& matches!(tool_name.as_str(), "js_repl_poll"))
{
let err = FunctionCallError::RespondToModel(
"direct tool calls are disabled; use js_repl and codex.tool(...) instead"
@@ -331,4 +333,64 @@ mod tests {
Ok(())
}
// With js_repl_tools_only and js_repl_poll_enabled both set, a direct
// js_repl_poll call must pass the router's direct-call policy gate; it may
// still fail later in the handler/registry, which this test tolerates.
#[tokio::test]
async fn js_repl_tools_only_allows_direct_poll_calls_when_polling_enabled() -> anyhow::Result<()>
{
let (session, mut turn) = make_session_and_context().await;
turn.tools_config.js_repl_tools_only = true;
turn.tools_config.js_repl_poll_enabled = true;
let session = Arc::new(session);
let turn = Arc::new(turn);
let mcp_tools = session
.services
.mcp_connection_manager
.read()
.await
.list_all_tools()
.await;
let app_tools = Some(mcp_tools.clone());
let router = ToolRouter::from_config(
&turn.tools_config,
Some(
mcp_tools
.into_iter()
.map(|(name, tool)| (name, tool.tool))
.collect(),
),
app_tools,
turn.dynamic_tools.as_slice(),
);
let call = ToolCall {
tool_name: "js_repl_poll".to_string(),
call_id: "call-poll".to_string(),
payload: ToolPayload::Function {
arguments: r#"{"exec_id":"exec-1"}"#.to_string(),
},
};
let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));
let response = router
.dispatch_tool_call(session, turn, tracker, call, ToolCallSource::Direct)
.await?;
match response {
ResponseInputItem::FunctionCallOutput { output, .. } => {
let content = output.text_content().unwrap_or_default();
// The policy-gate message must NOT appear; a downstream failure
// from the handler or registry is acceptable here.
assert!(
!content.contains("direct tool calls are disabled"),
"polling helper should bypass direct-call policy gate"
);
assert!(
content.contains("js_repl is disabled by feature flag")
|| content.contains("unsupported call: js_repl_poll"),
"expected js_repl handler/registry failure, got: {content}"
);
}
other => panic!("expected function call output, got {other:?}"),
}
Ok(())
}
}

View File

@@ -7,6 +7,7 @@
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::error::CodexErr;
use crate::features::Feature;
use crate::protocol::SandboxPolicy;
use crate::sandboxing::CommandSpec;
use crate::sandboxing::SandboxManager;
@@ -320,7 +321,7 @@ pub(crate) trait ToolRuntime<Req, Out>: Approvable<Req> + Sandboxable {
pub(crate) struct SandboxAttempt<'a> {
pub sandbox: crate::exec::SandboxType,
pub policy: &'a crate::protocol::SandboxPolicy,
pub file_system_policy: &'a FileSystemSandboxPolicy,
pub file_system_policy: FileSystemSandboxPolicy,
pub network_policy: NetworkSandboxPolicy,
pub enforce_managed_network: bool,
pub(crate) manager: &'a SandboxManager,
@@ -330,8 +331,52 @@ pub(crate) struct SandboxAttempt<'a> {
pub windows_sandbox_level: codex_protocol::config_types::WindowsSandboxLevel,
}
/// Reports whether the turn's layered config declares managed network
/// requirements (i.e. a `network` entry in the requirements TOML).
pub(crate) fn has_managed_network_requirements(turn_ctx: &TurnContext) -> bool {
    let requirements = turn_ctx.config.config_layer_stack.requirements_toml();
    requirements.network.is_some()
}
impl<'a> SandboxAttempt<'a> {
pub fn env_for(
/// Builds the first `SandboxAttempt` for a turn: derives the file-system and
/// network policies from the turn's current sandbox policy and picks the
/// initial sandbox type (honoring an explicit bypass override).
pub(crate) fn initial_for_turn(
    manager: &'a SandboxManager,
    turn_ctx: &'a TurnContext,
    preference: SandboxablePreference,
    sandbox_override: SandboxOverride,
) -> Self {
    // Derive the per-attempt policies from the turn's active sandbox policy.
    let policy = turn_ctx.sandbox_policy.get();
    let file_system_policy = FileSystemSandboxPolicy::from(policy);
    let network_policy = NetworkSandboxPolicy::from(policy);
    let enforce_managed_network = has_managed_network_requirements(turn_ctx);

    // A bypass override skips sandboxing entirely on the first attempt;
    // otherwise the manager selects the initial sandbox type.
    let sandbox = match sandbox_override {
        SandboxOverride::BypassSandboxFirstAttempt => crate::exec::SandboxType::None,
        SandboxOverride::NoOverride => manager.select_initial(
            &file_system_policy,
            network_policy,
            preference,
            turn_ctx.windows_sandbox_level,
            enforce_managed_network,
        ),
    };

    Self {
        sandbox,
        policy,
        file_system_policy,
        network_policy,
        enforce_managed_network,
        manager,
        sandbox_cwd: &turn_ctx.cwd,
        codex_linux_sandbox_exe: turn_ctx.codex_linux_sandbox_exe.as_ref(),
        use_linux_sandbox_bwrap: turn_ctx.features.enabled(Feature::UseLinuxSandboxBwrap),
        windows_sandbox_level: turn_ctx.windows_sandbox_level,
    }
}
pub(crate) fn env_for(
&self,
spec: CommandSpec,
network: Option<&NetworkProxy>,
@@ -340,7 +385,7 @@ impl<'a> SandboxAttempt<'a> {
.transform(crate::sandboxing::SandboxTransformRequest {
spec,
policy: self.policy,
file_system_policy: self.file_system_policy,
file_system_policy: &self.file_system_policy,
network_policy: self.network_policy,
sandbox: self.sandbox,
enforce_managed_network: self.enforce_managed_network,

View File

@@ -66,6 +66,7 @@ pub(crate) struct ToolsConfig {
pub search_tool: bool,
pub request_permission_enabled: bool,
pub js_repl_enabled: bool,
pub js_repl_poll_enabled: bool,
pub js_repl_tools_only: bool,
pub collab_tools: bool,
pub artifact_tools: bool,
@@ -93,6 +94,7 @@ impl ToolsConfig {
} = params;
let include_apply_patch_tool = features.enabled(Feature::ApplyPatchFreeform);
let include_js_repl = features.enabled(Feature::JsRepl);
let include_js_repl_polling = include_js_repl && features.enabled(Feature::JsReplPolling);
let include_js_repl_tools_only =
include_js_repl && features.enabled(Feature::JsReplToolsOnly);
let include_collab_tools = features.enabled(Feature::Collab);
@@ -167,6 +169,7 @@ impl ToolsConfig {
search_tool: include_search_tool,
request_permission_enabled,
js_repl_enabled: include_js_repl,
js_repl_poll_enabled: include_js_repl_polling,
js_repl_tools_only: include_js_repl_tools_only,
collab_tools: include_collab_tools,
artifact_tools: include_artifact_tools,
@@ -1429,7 +1432,7 @@ fn create_list_dir_tool() -> ToolSpec {
})
}
fn create_js_repl_tool() -> ToolSpec {
fn create_js_repl_tool(polling_enabled: bool) -> ToolSpec {
// Keep JS input freeform, but block the most common malformed payload shapes
// (JSON wrappers, quoted strings, and markdown fences) before they reach the
// runtime `reject_json_or_quoted_source` validation. The API's regex engine
@@ -1448,11 +1451,17 @@ NEWLINE: /\r?\n/
PLAIN_JS_SOURCE: /(?:\s*)(?:[^\s{\"`]|`[^`]|``[^`])[\s\S]*/
JS_SOURCE: /(?:\s*)(?:[^\s{\"`]|`[^`]|``[^`])[\s\S]*/
"#;
let mut description = "Runs JavaScript in a persistent Node kernel with top-level await. This is a freeform tool: send raw JavaScript source text, optionally with a first-line pragma like `// codex-js-repl: timeout_ms=15000`; do not send JSON/quotes/markdown fences."
.to_string();
if polling_enabled {
description.push_str(
" Add `poll=true` in the first-line pragma to return `exec_id`/`session_id` for polling. Reuse `session_id=<id>` across related polling cells so state and handles persist; omit it only when you intentionally want a fresh polling session, and unknown ids return an error. If a task requires polling mode only, keep every `js_repl` submission in polling mode. Use `js_repl_poll` to fetch incremental results, and keep polling the same `exec_id` until `status` becomes `completed` or `error`, even if earlier logs or `final_output` already look complete. Use space-separated pragma arguments. `timeout_ms` is only supported when `poll=false`; with polling, use `js_repl_poll` `yield_time_ms`.",
);
}
ToolSpec::Freeform(FreeformTool {
name: "js_repl".to_string(),
description: "Runs JavaScript in a persistent Node kernel with top-level await. This is a freeform tool: send raw JavaScript source text, optionally with a first-line pragma like `// codex-js-repl: timeout_ms=15000`; do not send JSON/quotes/markdown fences."
.to_string(),
description,
format: FreeformToolFormat {
r#type: "grammar".to_string(),
syntax: "lark".to_string(),
@@ -1488,15 +1497,54 @@ JS_SOURCE: /(?:\s*)(?:[^\s{\"`]|`[^`]|``[^`])[\s\S]*/
})
}
fn create_js_repl_reset_tool() -> ToolSpec {
fn create_js_repl_poll_tool() -> ToolSpec {
let properties = BTreeMap::from([
(
"exec_id".to_string(),
JsonSchema::String {
description: Some("Identifier returned by js_repl when poll=true.".to_string()),
},
),
(
"yield_time_ms".to_string(),
JsonSchema::Number {
description: Some(
"How long to wait (in milliseconds) for logs or completion before yielding. When omitted, or when set below 5000, waits up to 5 seconds before returning if nothing new arrives."
.to_string(),
),
},
),
]);
ToolSpec::Function(ResponsesApiTool {
name: "js_repl_reset".to_string(),
name: "js_repl_poll".to_string(),
description:
"Restarts the js_repl kernel for this run and clears persisted top-level bindings."
"Poll a running js_repl exec for incremental logs or terminal completion. Keep polling the same exec_id until status becomes completed or error. The JSON status item includes per-poll `logs` and terminal `final_output` (which may be empty when the exec only emitted streamed logs). Completed polls may also include nested multimodal tool output after the JSON status item."
.to_string(),
strict: false,
parameters: JsonSchema::Object {
properties: BTreeMap::new(),
properties,
required: Some(vec!["exec_id".to_string()]),
additional_properties: Some(false.into()),
},
})
}
fn create_js_repl_reset_tool() -> ToolSpec {
let properties = BTreeMap::from([(
"session_id".to_string(),
JsonSchema::String {
description: Some(
"Optional polling session identifier. When omitted, resets all js_repl kernels."
.to_string(),
),
},
)]);
ToolSpec::Function(ResponsesApiTool {
name: "js_repl_reset".to_string(),
description: "Reset js_repl state. With `session_id`, resets that polling session; without it, resets all js_repl kernels.".to_string(),
strict: false,
parameters: JsonSchema::Object {
properties,
required: None,
additional_properties: Some(false.into()),
},
@@ -1812,6 +1860,7 @@ pub(crate) fn build_specs(
use crate::tools::handlers::DynamicToolHandler;
use crate::tools::handlers::GrepFilesHandler;
use crate::tools::handlers::JsReplHandler;
use crate::tools::handlers::JsReplPollHandler;
use crate::tools::handlers::JsReplResetHandler;
use crate::tools::handlers::ListDirHandler;
use crate::tools::handlers::McpHandler;
@@ -1844,6 +1893,7 @@ pub(crate) fn build_specs(
});
let search_tool_handler = Arc::new(SearchToolBm25Handler);
let js_repl_handler = Arc::new(JsReplHandler);
let js_repl_poll_handler = Arc::new(JsReplPollHandler);
let js_repl_reset_handler = Arc::new(JsReplResetHandler);
let artifacts_handler = Arc::new(ArtifactsHandler);
let request_permission_enabled = config.request_permission_enabled;
@@ -1899,7 +1949,17 @@ pub(crate) fn build_specs(
builder.register_handler("update_plan", plan_handler);
if config.js_repl_enabled {
builder.push_spec(create_js_repl_tool());
builder.push_spec_with_parallel_support(
create_js_repl_tool(config.js_repl_poll_enabled),
false,
);
if config.js_repl_poll_enabled {
// `js_repl_poll` drains buffered logs on each call, so it
// must remain non-parallel to avoid split-drain races. This mirrors
// `unified_exec` polling semantics (`write_stdin`).
builder.push_spec_with_parallel_support(create_js_repl_poll_tool(), false);
builder.register_handler("js_repl_poll", js_repl_poll_handler);
}
builder.push_spec(create_js_repl_reset_tool());
builder.register_handler("js_repl", js_repl_handler);
builder.register_handler("js_repl_reset", js_repl_reset_handler);
@@ -2463,6 +2523,10 @@ mod tests {
!tools.iter().any(|tool| tool.spec.name() == "js_repl"),
"js_repl should be disabled when the feature is off"
);
assert!(
!tools.iter().any(|tool| tool.spec.name() == "js_repl_poll"),
"js_repl_poll should be disabled when the feature is off"
);
assert!(
!tools.iter().any(|tool| tool.spec.name() == "js_repl_reset"),
"js_repl_reset should be disabled when the feature is off"
@@ -2485,6 +2549,23 @@ mod tests {
});
let (tools, _) = build_specs(&tools_config, None, None, &[]).build();
assert_contains_tool_names(&tools, &["js_repl", "js_repl_reset"]);
assert!(
!tools.iter().any(|tool| tool.spec.name() == "js_repl_poll"),
"js_repl_poll should be disabled when polling is off"
);
features.enable(Feature::JsReplPolling);
let tools_config = ToolsConfig::new(&ToolsConfigParams {
model_info: &model_info,
features: &features,
web_search_mode: Some(WebSearchMode::Cached),
session_source: SessionSource::Cli,
});
let (tools, _) = build_specs(&tools_config, None, None, &[]).build();
assert_contains_tool_names(&tools, &["js_repl", "js_repl_poll", "js_repl_reset"]);
assert!(
!find_tool(&tools, "js_repl_poll").supports_parallel_tool_calls,
"js_repl_poll should be non-parallel to avoid destructive log-drain races"
);
}
#[test]
@@ -2547,7 +2628,7 @@ mod tests {
#[test]
fn js_repl_freeform_grammar_blocks_common_non_js_prefixes() {
let ToolSpec::Freeform(FreeformTool { format, .. }) = create_js_repl_tool() else {
let ToolSpec::Freeform(FreeformTool { format, .. }) = create_js_repl_tool(false) else {
panic!("js_repl should use a freeform tool spec");
};

View File

@@ -49,6 +49,7 @@ pub(crate) fn set_deterministic_process_ids_for_tests(enabled: bool) {
}
pub(crate) use errors::UnifiedExecError;
pub(crate) use process::ManagedSplitProcess;
pub(crate) use process::NoopSpawnLifecycle;
#[cfg(unix)]
pub(crate) use process::SpawnLifecycle;

View File

@@ -7,6 +7,7 @@ use tokio::sync::Mutex;
use tokio::sync::Notify;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::task::JoinHandle;
use tokio::time::Duration;
@@ -60,10 +61,17 @@ pub(crate) struct UnifiedExecProcess {
_spawn_lifecycle: SpawnLifecycleHandle,
}
/// A unified-exec managed process whose stdio is kept as separate streams
/// instead of a single merged output channel.
pub(crate) struct ManagedSplitProcess {
    // Underlying managed process (exit tracking, termination, output buffer).
    pub(crate) process: UnifiedExecProcess,
    // Sender for writing bytes to the child's stdin (the session's writer).
    pub(crate) stdin: mpsc::Sender<Vec<u8>>,
    // Per-stream receivers for raw stdout and stderr chunks.
    pub(crate) stdout_rx: mpsc::Receiver<Vec<u8>>,
    pub(crate) stderr_rx: mpsc::Receiver<Vec<u8>>,
}
impl UnifiedExecProcess {
pub(super) fn new(
pub(crate) fn new(
process_handle: ExecCommandSession,
initial_output_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
initial_output_rx: broadcast::Receiver<Vec<u8>>,
sandbox_type: SandboxType,
spawn_lifecycle: SpawnLifecycleHandle,
) -> Self {
@@ -113,7 +121,7 @@ impl UnifiedExecProcess {
}
}
pub(super) fn writer_sender(&self) -> mpsc::Sender<Vec<u8>> {
pub(crate) fn writer_sender(&self) -> mpsc::Sender<Vec<u8>> {
self.process_handle.writer_sender()
}
@@ -131,7 +139,7 @@ impl UnifiedExecProcess {
self.output_rx.resubscribe()
}
pub(super) fn cancellation_token(&self) -> CancellationToken {
pub(crate) fn cancellation_token(&self) -> CancellationToken {
self.cancellation_token.clone()
}
@@ -139,15 +147,15 @@ impl UnifiedExecProcess {
Arc::clone(&self.output_drained)
}
pub(super) fn has_exited(&self) -> bool {
pub(crate) fn has_exited(&self) -> bool {
self.process_handle.has_exited()
}
pub(super) fn exit_code(&self) -> Option<i32> {
pub(crate) fn exit_code(&self) -> Option<i32> {
self.process_handle.exit_code()
}
pub(super) fn terminate(&self) {
pub(crate) fn terminate(&self) {
self.output_closed.store(true, Ordering::Release);
self.output_closed_notify.notify_waiters();
self.process_handle.terminate();
@@ -155,6 +163,14 @@ impl UnifiedExecProcess {
self.output_task.abort();
}
pub(crate) fn request_terminate(&self) {
self.process_handle.request_terminate();
}
pub(crate) fn pid(&self) -> Option<u32> {
self.process_handle.pid()
}
async fn snapshot_output(&self) -> Vec<Vec<u8>> {
let guard = self.output_buffer.lock().await;
guard.snapshot_chunks()
@@ -218,11 +234,26 @@ impl UnifiedExecProcess {
) -> Result<Self, UnifiedExecError> {
let SpawnedPty {
session: process_handle,
stdout_rx,
stderr_rx,
mut exit_rx,
output_rx,
exit_rx,
} = spawned;
let output_rx = codex_utils_pty::combine_output_receivers(stdout_rx, stderr_rx);
Self::from_process_parts(
process_handle,
output_rx,
exit_rx,
sandbox_type,
spawn_lifecycle,
)
.await
}
pub(crate) async fn from_process_parts(
process_handle: ExecCommandSession,
output_rx: broadcast::Receiver<Vec<u8>>,
mut exit_rx: oneshot::Receiver<i32>,
sandbox_type: SandboxType,
spawn_lifecycle: SpawnLifecycleHandle,
) -> Result<Self, UnifiedExecError> {
let managed = Self::new(process_handle, output_rx, sandbox_type, spawn_lifecycle);
let exit_ready = matches!(exit_rx.try_recv(), Ok(_) | Err(TryRecvError::Closed));

View File

@@ -33,6 +33,7 @@ use crate::unified_exec::MAX_UNIFIED_EXEC_PROCESSES;
use crate::unified_exec::MAX_YIELD_TIME_MS;
use crate::unified_exec::MIN_EMPTY_YIELD_TIME_MS;
use crate::unified_exec::MIN_YIELD_TIME_MS;
use crate::unified_exec::ManagedSplitProcess;
use crate::unified_exec::ProcessEntry;
use crate::unified_exec::ProcessStore;
use crate::unified_exec::UnifiedExecContext;
@@ -47,6 +48,7 @@ use crate::unified_exec::async_watcher::start_streaming_output;
use crate::unified_exec::clamp_yield_time;
use crate::unified_exec::generate_chunk_id;
use crate::unified_exec::head_tail_buffer::HeadTailBuffer;
use crate::unified_exec::process::NoopSpawnLifecycle;
use crate::unified_exec::process::OutputBuffer;
use crate::unified_exec::process::OutputHandles;
use crate::unified_exec::process::SpawnLifecycleHandle;
@@ -103,6 +105,17 @@ struct PreparedProcessHandles {
tty: bool,
}
// How an exec-env process should be spawned and how its stdio is wired.
#[derive(Clone, Copy)]
enum ExecEnvSpawnMode {
    // Single merged output stream; `tty` selects a PTY over plain pipes.
    Merged { tty: bool },
    // Plain pipes with stdout and stderr delivered as separate streams.
    SplitPipe,
}
// Result of spawning under an `ExecEnvSpawnMode`: a merged-output process
// (PTY or combined pipes) or a split-stream pipe process.
enum SpawnedExecEnvProcess {
    Merged(codex_utils_pty::SpawnedPty),
    Split(codex_utils_pty::SpawnedProcessSplit),
}
impl UnifiedExecProcessManager {
pub(crate) async fn allocate_process_id(&self) -> String {
loop {
@@ -531,13 +544,63 @@ impl UnifiedExecProcessManager {
tty: bool,
mut spawn_lifecycle: SpawnLifecycleHandle,
) -> Result<UnifiedExecProcess, UnifiedExecError> {
match Self::spawn_exec_env_process(env, ExecEnvSpawnMode::Merged { tty }).await? {
SpawnedExecEnvProcess::Merged(spawned) => {
spawn_lifecycle.after_spawn();
UnifiedExecProcess::from_spawned(spawned, env.sandbox, spawn_lifecycle).await
}
SpawnedExecEnvProcess::Split(_) => {
unreachable!("merged spawn mode returned split process")
}
}
}
/// Spawns `env`'s command with split stdout/stderr pipes and wraps it in the
/// unified-exec managed-process lifecycle, returning the managed process
/// together with its stdin sender and per-stream output receivers.
pub(crate) async fn open_split_pipe_session_with_exec_env(
    &self,
    env: &ExecRequest,
) -> Result<ManagedSplitProcess, UnifiedExecError> {
    match Self::spawn_exec_env_process(env, ExecEnvSpawnMode::SplitPipe).await? {
        SpawnedExecEnvProcess::Merged(_) => {
            // The spawner honors the requested mode, so a merged result here
            // is a programming error rather than a runtime condition.
            unreachable!("split pipe spawn mode returned merged process")
        }
        SpawnedExecEnvProcess::Split(spawned) => {
            let codex_utils_pty::SpawnedProcessSplit {
                session: process_handle,
                output_rx,
                stdout_rx,
                stderr_rx,
                exit_rx,
            } = spawned;
            // Capture the stdin writer before the session handle is moved
            // into the managed process below.
            let stdin = process_handle.writer_sender();
            let process = UnifiedExecProcess::from_process_parts(
                process_handle,
                output_rx,
                exit_rx,
                env.sandbox,
                // NOTE(review): a no-op lifecycle is passed here — presumably
                // no post-spawn sandbox bookkeeping applies for split pipes;
                // confirm against the merged-spawn path.
                Box::new(NoopSpawnLifecycle),
            )
            .await?;
            Ok(ManagedSplitProcess {
                process,
                stdin,
                stdout_rx,
                stderr_rx,
            })
        }
    }
}
async fn spawn_exec_env_process(
env: &ExecRequest,
mode: ExecEnvSpawnMode,
) -> Result<SpawnedExecEnvProcess, UnifiedExecError> {
let (program, args) = env
.command
.split_first()
.ok_or(UnifiedExecError::MissingCommandLine)?;
let spawn_result = if tty {
codex_utils_pty::pty::spawn_process(
let spawn_result = match mode {
ExecEnvSpawnMode::Merged { tty: true } => codex_utils_pty::pty::spawn_process(
program,
args,
env.cwd.as_path(),
@@ -546,8 +609,19 @@ impl UnifiedExecProcessManager {
codex_utils_pty::TerminalSize::default(),
)
.await
} else {
codex_utils_pty::pipe::spawn_process_no_stdin(
.map(SpawnedExecEnvProcess::Merged),
ExecEnvSpawnMode::Merged { tty: false } => {
codex_utils_pty::pipe::spawn_process_no_stdin(
program,
args,
env.cwd.as_path(),
&env.env,
&env.arg0,
)
.await
.map(SpawnedExecEnvProcess::Merged)
}
ExecEnvSpawnMode::SplitPipe => codex_utils_pty::pipe::spawn_process_split(
program,
args,
env.cwd.as_path(),
@@ -555,11 +629,10 @@ impl UnifiedExecProcessManager {
&env.arg0,
)
.await
.map(SpawnedExecEnvProcess::Split),
};
let spawned =
spawn_result.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
spawn_lifecycle.after_spawn();
UnifiedExecProcess::from_spawned(spawned, env.sandbox, spawn_lifecycle).await
spawn_result.map_err(|err| UnifiedExecError::create_process(err.to_string()))
}
pub(super) async fn open_session_with_sandbox(

View File

@@ -38,9 +38,18 @@ use image::Rgba;
use image::load_from_memory;
use pretty_assertions::assert_eq;
use serde_json::Value;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use tokio::time::Duration;
use wiremock::BodyPrintLimit;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::Respond;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path_regex;
fn image_messages(body: &Value) -> Vec<&Value> {
body.get("input")
@@ -69,6 +78,27 @@ fn find_image_message(body: &Value) -> Option<&Value> {
image_messages(body).into_iter().next()
}
fn request_body_json(request: &wiremock::Request) -> Value {
let content_encoding = request
.headers
.get("content-encoding")
.and_then(|value| value.to_str().ok());
let body = if content_encoding
.is_some_and(|value| value.split(',').any(|entry| entry.trim() == "zstd"))
{
match zstd::stream::decode_all(std::io::Cursor::new(&request.body)) {
Ok(body) => body,
Err(err) => panic!("decode zstd request body: {err}"),
}
} else {
request.body.clone()
};
match serde_json::from_slice(&body) {
Ok(body) => body,
Err(err) => panic!("request body json: {err}"),
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn user_turn_with_local_image_attaches_image() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
@@ -617,6 +647,582 @@ await codex.emitImage(out);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn js_repl_view_image_tool_attaches_multiple_local_images() -> anyhow::Result<()> {
    // Verifies that two nested `view_image` calls made from one js_repl cell
    // each surface as an `input_image` content item on the custom tool call
    // output, and that no separate image messages are injected into the
    // follow-up request.
    skip_if_no_network!(Ok(()));

    let server = start_mock_server().await;
    let mut builder = test_codex().with_config(|config| {
        config
            .features
            .enable(Feature::JsRepl)
            .expect("test config should allow feature update");
    });
    let TestCodex {
        codex,
        cwd,
        session_configured,
        ..
    } = builder.build(&server).await?;

    let call_id = "js-repl-view-image-two";
    // JS cell: writes the same 1x1 PNG to two paths, views both through the
    // nested `view_image` tool, and explicitly emits both images.
    let js_input = r#"
const fs = await import("node:fs/promises");
const path = await import("node:path");
const png = Buffer.from(
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR4nGP4z8DwHwAFAAH/iZk9HQAAAABJRU5ErkJggg==",
"base64"
);
const imagePathA = path.join(codex.tmpDir, "js-repl-view-image-a.png");
const imagePathB = path.join(codex.tmpDir, "js-repl-view-image-b.png");
await fs.writeFile(imagePathA, png);
await fs.writeFile(imagePathB, png);
const outA = await codex.tool("view_image", { path: imagePathA });
const outB = await codex.tool("view_image", { path: imagePathB });
await codex.emitImage(outA);
await codex.emitImage(outB);
console.log("attached-two-images");
"#;
    // First scripted response triggers the js_repl call; the second ends the
    // turn and is the request whose payload we inspect below.
    let first_response = sse(vec![
        ev_response_created("resp-1"),
        ev_custom_tool_call(call_id, "js_repl", js_input),
        ev_completed("resp-1"),
    ]);
    responses::mount_sse_once(&server, first_response).await;
    let second_response = sse(vec![
        ev_assistant_message("msg-1", "done"),
        ev_completed("resp-2"),
    ]);
    let mock = responses::mount_sse_once(&server, second_response).await;

    let session_model = session_configured.model.clone();
    codex
        .submit(Op::UserTurn {
            items: vec![UserInput::Text {
                text: "use js_repl to write two images and attach them".into(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
            cwd: cwd.path().to_path_buf(),
            approval_policy: AskForApproval::Never,
            sandbox_policy: SandboxPolicy::DangerFullAccess,
            model: session_model,
            effort: None,
            summary: Some(ReasoningSummary::Auto),
            service_tier: None,
            collaboration_mode: None,
            personality: None,
        })
        .await?;
    wait_for_event_with_timeout(
        &codex,
        |event| matches!(event, EventMsg::TurnComplete(_)),
        Duration::from_secs(10),
    )
    .await;

    // The custom tool output must report success and carry the console marker
    // as an input_text item.
    let req = mock.single_request();
    let custom_output = req.custom_tool_call_output(call_id);
    assert_ne!(
        custom_output.get("success").and_then(Value::as_bool),
        Some(false),
        "js_repl call failed unexpectedly: {custom_output}"
    );
    let output_items = custom_output
        .get("output")
        .and_then(Value::as_array)
        .expect("custom_tool_call_output should be a content item array");
    let js_repl_output = output_items
        .iter()
        .find_map(|item| {
            (item.get("type").and_then(Value::as_str) == Some("input_text"))
                .then(|| item.get("text").and_then(Value::as_str))
                .flatten()
        })
        .expect("custom tool output text present");
    assert!(
        js_repl_output.contains("attached-two-images"),
        "expected js_repl output marker, got {js_repl_output}"
    );
    // Exactly one input_image item per emitted image, each a PNG data URL.
    let image_urls = output_items
        .iter()
        .filter_map(|item| {
            (item.get("type").and_then(Value::as_str) == Some("input_image"))
                .then(|| item.get("image_url").and_then(Value::as_str))
                .flatten()
        })
        .collect::<Vec<_>>();
    assert_eq!(
        image_urls.len(),
        2,
        "js_repl should include one input_image content item per nested view_image call"
    );
    for image_url in image_urls {
        assert!(
            image_url.starts_with("data:image/png;base64,"),
            "expected png data URL, got {image_url}"
        );
    }
    // The images must travel inside the tool output only — never as pending
    // input image messages on the next request.
    let body = req.body_json();
    assert_eq!(
        image_messages(&body).len(),
        0,
        "js_repl should not inject pending input image messages"
    );
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn js_repl_poll_view_image_tool_attaches_local_image() -> anyhow::Result<()> {
    // Drives the full polling flow: submit a `poll=true` js_repl cell, poll it
    // via `js_repl_poll`, and verify the poll output carries a JSON status
    // item first, followed by the explicitly emitted image item.
    skip_if_no_network!(Ok(()));
    const POLL_WAIT_MS: u64 = 30_000;
    const TURN_TIMEOUT_SECS: u64 = 30;

    let server = start_mock_server().await;
    let mut builder = test_codex().with_config(|config| {
        config
            .features
            .enable(Feature::JsRepl)
            .expect("test config should allow feature update");
        config
            .features
            .enable(Feature::JsReplPolling)
            .expect("test config should allow feature update");
    });
    let TestCodex {
        codex,
        cwd,
        session_configured,
        ..
    } = builder.build(&server).await?;

    let submit_call_id = "js-repl-poll-submit";
    let poll_call_id = "js-repl-poll-image";
    // Polling-mode JS cell: writes a 1x1 PNG, views it, and emits the result.
    let js_input = r#"// codex-js-repl: poll=true
const fs = await import("node:fs/promises");
const path = await import("node:path");
const imagePath = path.join(codex.tmpDir, "js-repl-poll-view-image.png");
const png = Buffer.from(
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR4nGP4z8DwHwAFAAH/iZk9HQAAAABJRU5ErkJggg==",
"base64"
);
await fs.writeFile(imagePath, png);
const out = await codex.tool("view_image", { path: imagePath });
await codex.emitImage(out);
"#
    .to_string();

    // Scripted model with three turns: request 0 issues the js_repl submit,
    // request 1 extracts `exec_id` from the submit output and issues the poll,
    // request 2 confirms the poll output arrived and finishes the turn.
    #[derive(Clone)]
    struct PollSequenceResponder {
        requests: Arc<Mutex<Vec<wiremock::Request>>>,
        call_count: Arc<AtomicUsize>,
        submit_call_id: &'static str,
        poll_call_id: &'static str,
        js_input: String,
    }

    impl Respond for PollSequenceResponder {
        fn respond(&self, request: &wiremock::Request) -> ResponseTemplate {
            // Log every request so the test body can inspect them afterwards.
            self.requests
                .lock()
                .expect("request log lock")
                .push(request.clone());
            let request_json = request_body_json(request);
            let body = match self.call_count.fetch_add(1, Ordering::SeqCst) {
                0 => sse(vec![
                    ev_response_created("resp-1"),
                    ev_custom_tool_call(self.submit_call_id, "js_repl", &self.js_input),
                    ev_completed("resp-1"),
                ]),
                1 => {
                    // Pull exec_id out of the polling submit acknowledgment so
                    // the scripted poll targets the real execution.
                    let submit_output = request_json["input"]
                        .as_array()
                        .expect("request input array")
                        .iter()
                        .find(|item| {
                            item.get("type").and_then(Value::as_str)
                                == Some("custom_tool_call_output")
                                && item.get("call_id").and_then(Value::as_str)
                                    == Some(self.submit_call_id)
                        })
                        .expect("js_repl polling submit output");
                    let submit_payload = submit_output["output"]
                        .as_str()
                        .expect("js_repl polling submit output string");
                    let submit_json: Value =
                        serde_json::from_str(submit_payload).expect("submit payload json");
                    let exec_id = submit_json["exec_id"]
                        .as_str()
                        .expect("exec_id present in submit payload");
                    let poll_args = serde_json::json!({
                        "exec_id": exec_id,
                        "yield_time_ms": POLL_WAIT_MS,
                    })
                    .to_string();
                    sse(vec![
                        ev_response_created("resp-2"),
                        ev_function_call(self.poll_call_id, "js_repl_poll", &poll_args),
                        ev_completed("resp-2"),
                    ])
                }
                2 => {
                    let poll_output = request_json["input"]
                        .as_array()
                        .expect("request input array")
                        .iter()
                        .find(|item| {
                            item.get("type").and_then(Value::as_str) == Some("function_call_output")
                                && item.get("call_id").and_then(Value::as_str)
                                    == Some(self.poll_call_id)
                        })
                        .expect("js_repl_poll function_call_output present");
                    assert!(
                        poll_output.get("output").is_some(),
                        "js_repl_poll output should be present"
                    );
                    sse(vec![
                        ev_assistant_message("msg-1", "done"),
                        ev_completed("resp-3"),
                    ])
                }
                call_num => panic!("unexpected extra responses request {call_num}"),
            };
            ResponseTemplate::new(200)
                .insert_header("content-type", "text/event-stream")
                .set_body_string(body)
        }
    }

    let requests = Arc::new(Mutex::new(Vec::new()));
    Mock::given(method("POST"))
        .and(path_regex(".*/responses$"))
        .respond_with(PollSequenceResponder {
            requests: Arc::clone(&requests),
            call_count: Arc::new(AtomicUsize::new(0)),
            submit_call_id,
            poll_call_id,
            js_input,
        })
        .up_to_n_times(3)
        .expect(3)
        .mount(&server)
        .await;

    let session_model = session_configured.model.clone();
    codex
        .submit(Op::UserTurn {
            items: vec![UserInput::Text {
                text: "use js_repl polling to write an image and attach it".into(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
            cwd: cwd.path().to_path_buf(),
            approval_policy: AskForApproval::Never,
            sandbox_policy: SandboxPolicy::DangerFullAccess,
            model: session_model,
            effort: None,
            service_tier: None,
            summary: None,
            collaboration_mode: None,
            personality: None,
        })
        .await?;
    wait_for_event_with_timeout(
        &codex,
        |event| matches!(event, EventMsg::TurnComplete(_)),
        Duration::from_secs(TURN_TIMEOUT_SECS),
    )
    .await;

    let requests = requests.lock().expect("request log lock").clone();
    assert_eq!(
        requests.len(),
        3,
        "expected submit, poll, and completion requests"
    );
    // The final request carries the js_repl_poll output; its first item must
    // be the JSON status record, followed by the emitted image item.
    let poll_request_json = request_body_json(
        requests
            .get(2)
            .expect("final request with js_repl_poll output should be present"),
    );
    let poll_output = poll_request_json["input"]
        .as_array()
        .expect("poll request input array")
        .iter()
        .find(|item| {
            item.get("type").and_then(Value::as_str) == Some("function_call_output")
                && item.get("call_id").and_then(Value::as_str) == Some(poll_call_id)
        })
        .expect("js_repl_poll function_call_output present");
    let output_items = poll_output["output"]
        .as_array()
        .expect("js_repl_poll output should be a content item array");
    let status_item = output_items
        .first()
        .expect("first poll output item should be status text");
    assert_eq!(
        status_item.get("type").and_then(Value::as_str),
        Some("input_text")
    );
    let status_json: Value = serde_json::from_str(
        status_item["text"]
            .as_str()
            .expect("status item text should be present"),
    )
    .expect("status item should be valid json");
    assert_eq!(status_json["status"].as_str(), Some("completed"));
    assert_eq!(status_json["error"], Value::Null);
    assert_eq!(status_json["logs"], Value::Null);
    let image_items = output_items
        .iter()
        .filter_map(|item| {
            (item.get("type").and_then(Value::as_str) == Some("input_image"))
                .then(|| item.get("image_url").and_then(Value::as_str))
                .flatten()
        })
        .collect::<Vec<_>>();
    assert_eq!(
        image_items.len(),
        1,
        "expected one image item in poll output"
    );
    assert!(
        image_items[0].starts_with("data:image/png;base64,"),
        "expected png data URL, got {}",
        image_items[0]
    );
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn js_repl_poll_view_image_requires_explicit_emit() -> anyhow::Result<()> {
    // Same polling flow as the emit test, but the JS cell never calls
    // `codex.emitImage`: the nested view_image still fires its event, yet the
    // poll output must remain a plain JSON string with no image items.
    skip_if_no_network!(Ok(()));
    const POLL_WAIT_MS: u64 = 30_000;
    const TURN_TIMEOUT_SECS: u64 = 30;

    let server = start_mock_server().await;
    let mut builder = test_codex().with_config(|config| {
        config
            .features
            .enable(Feature::JsRepl)
            .expect("test config should allow feature update");
        config
            .features
            .enable(Feature::JsReplPolling)
            .expect("test config should allow feature update");
    });
    let TestCodex {
        codex,
        cwd,
        session_configured,
        ..
    } = builder.build(&server).await?;

    let submit_call_id = "js-repl-poll-submit-no-emit";
    let poll_call_id = "js-repl-poll-no-image";
    // Polling-mode JS cell: views the image but deliberately skips emitImage.
    let js_input = r#"// codex-js-repl: poll=true
const fs = await import("node:fs/promises");
const path = await import("node:path");
const imagePath = path.join(codex.tmpDir, "js-repl-poll-view-image-no-emit.png");
const png = Buffer.from(
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR4nGP4z8DwHwAFAAH/iZk9HQAAAABJRU5ErkJggg==",
"base64"
);
await fs.writeFile(imagePath, png);
const out = await codex.tool("view_image", { path: imagePath });
console.log(out.type);
"#
    .to_string();

    // Scripted model: request 0 submits the cell, request 1 polls using the
    // exec_id from the submit output, request 2 ends the turn.
    #[derive(Clone)]
    struct PollSequenceResponder {
        requests: Arc<Mutex<Vec<wiremock::Request>>>,
        call_count: Arc<AtomicUsize>,
        submit_call_id: &'static str,
        poll_call_id: &'static str,
        js_input: String,
    }

    impl Respond for PollSequenceResponder {
        fn respond(&self, request: &wiremock::Request) -> ResponseTemplate {
            // Log every request so the test body can inspect them afterwards.
            self.requests
                .lock()
                .expect("request log lock")
                .push(request.clone());
            let request_json = request_body_json(request);
            let body = match self.call_count.fetch_add(1, Ordering::SeqCst) {
                0 => sse(vec![
                    ev_response_created("resp-1"),
                    ev_custom_tool_call(self.submit_call_id, "js_repl", &self.js_input),
                    ev_completed("resp-1"),
                ]),
                1 => {
                    // Extract exec_id from the submit acknowledgment so the
                    // scripted poll targets the real execution.
                    let submit_output = request_json["input"]
                        .as_array()
                        .expect("request input array")
                        .iter()
                        .find(|item| {
                            item.get("type").and_then(Value::as_str)
                                == Some("custom_tool_call_output")
                                && item.get("call_id").and_then(Value::as_str)
                                    == Some(self.submit_call_id)
                        })
                        .expect("js_repl polling submit output");
                    let submit_payload = submit_output["output"]
                        .as_str()
                        .expect("js_repl polling submit output string");
                    let submit_json: Value =
                        serde_json::from_str(submit_payload).expect("submit payload json");
                    let exec_id = submit_json["exec_id"]
                        .as_str()
                        .expect("exec_id present in submit payload");
                    let poll_args = serde_json::json!({
                        "exec_id": exec_id,
                        "yield_time_ms": POLL_WAIT_MS,
                    })
                    .to_string();
                    sse(vec![
                        ev_response_created("resp-2"),
                        ev_function_call(self.poll_call_id, "js_repl_poll", &poll_args),
                        ev_completed("resp-2"),
                    ])
                }
                2 => {
                    let poll_output = request_json["input"]
                        .as_array()
                        .expect("request input array")
                        .iter()
                        .find(|item| {
                            item.get("type").and_then(Value::as_str) == Some("function_call_output")
                                && item.get("call_id").and_then(Value::as_str)
                                    == Some(self.poll_call_id)
                        })
                        .expect("js_repl_poll function_call_output present");
                    assert!(
                        poll_output.get("output").is_some(),
                        "js_repl_poll output should be present"
                    );
                    sse(vec![
                        ev_assistant_message("msg-1", "done"),
                        ev_completed("resp-3"),
                    ])
                }
                call_num => panic!("unexpected extra responses request {call_num}"),
            };
            ResponseTemplate::new(200)
                .insert_header("content-type", "text/event-stream")
                .set_body_string(body)
        }
    }

    let requests = Arc::new(Mutex::new(Vec::new()));
    Mock::given(method("POST"))
        .and(path_regex(".*/responses$"))
        .respond_with(PollSequenceResponder {
            requests: Arc::clone(&requests),
            call_count: Arc::new(AtomicUsize::new(0)),
            submit_call_id,
            poll_call_id,
            js_input,
        })
        .up_to_n_times(3)
        .expect(3)
        .mount(&server)
        .await;

    let session_model = session_configured.model.clone();
    codex
        .submit(Op::UserTurn {
            items: vec![UserInput::Text {
                text: "use js_repl polling to write an image without emitting it".into(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
            cwd: cwd.path().to_path_buf(),
            approval_policy: AskForApproval::Never,
            sandbox_policy: SandboxPolicy::DangerFullAccess,
            model: session_model,
            effort: None,
            service_tier: None,
            summary: None,
            collaboration_mode: None,
            personality: None,
        })
        .await?;
    // Capture the ViewImageToolCall event emitted by the nested view_image
    // call while waiting for the turn to complete.
    let mut tool_event = None;
    wait_for_event_with_timeout(
        &codex,
        |event| match event {
            EventMsg::ViewImageToolCall(_) => {
                tool_event = Some(event.clone());
                false
            }
            EventMsg::TurnComplete(_) => true,
            _ => false,
        },
        Duration::from_secs(TURN_TIMEOUT_SECS),
    )
    .await;
    let tool_event = match tool_event {
        Some(EventMsg::ViewImageToolCall(event)) => event,
        other => panic!("expected ViewImageToolCall event, got {other:?}"),
    };
    assert!(
        tool_event
            .path
            .ends_with("js-repl-poll-view-image-no-emit.png"),
        "unexpected image path: {}",
        tool_event.path.display()
    );

    let requests = requests.lock().expect("request log lock").clone();
    assert_eq!(
        requests.len(),
        3,
        "expected submit, poll, and completion requests"
    );
    let poll_request_json = request_body_json(
        requests
            .get(2)
            .expect("final request with js_repl_poll output should be present"),
    );
    let poll_output = poll_request_json["input"]
        .as_array()
        .expect("poll request input array")
        .iter()
        .find(|item| {
            item.get("type").and_then(Value::as_str) == Some("function_call_output")
                && item.get("call_id").and_then(Value::as_str) == Some(poll_call_id)
        })
        .expect("js_repl_poll function_call_output present");
    // Without an explicit emit, the poll output stays a plain JSON string
    // (not a content-item array) whose status is completed with no error.
    let status_json: Value = serde_json::from_str(
        poll_output["output"]
            .as_str()
            .expect("js_repl_poll output should stay text without explicit emit"),
    )
    .expect("status item should be valid json");
    assert_eq!(status_json["status"].as_str(), Some("completed"));
    assert_eq!(status_json["error"], Value::Null);
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn js_repl_view_image_requires_explicit_emit() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));

View File

@@ -131,11 +131,10 @@ trust_level = "trusted"
let mut output = Vec::new();
let codex_utils_pty::SpawnedProcess {
session,
stdout_rx,
stderr_rx,
output_rx,
exit_rx,
} = spawned;
let mut output_rx = codex_utils_pty::combine_output_receivers(stdout_rx, stderr_rx);
let mut output_rx = output_rx;
let mut exit_rx = exit_rx;
let writer_tx = session.writer_sender();
let interrupt_writer = writer_tx.clone();

View File

@@ -77,11 +77,10 @@ async fn run_codex_cli(
let mut output = Vec::new();
let codex_utils_pty::SpawnedProcess {
session,
stdout_rx,
stderr_rx,
output_rx,
exit_rx,
} = spawned;
let mut output_rx = codex_utils_pty::combine_output_receivers(stdout_rx, stderr_rx);
let mut output_rx = output_rx;
let mut exit_rx = exit_rx;
let writer_tx = session.writer_sender();
let exit_code_result = timeout(Duration::from_secs(10), async {

View File

@@ -7,7 +7,7 @@ Lightweight helpers for spawning interactive processes either under a PTY (pseud
- `spawn_pty_process(program, args, cwd, env, arg0, size)` → `SpawnedProcess`
- `spawn_pipe_process(program, args, cwd, env, arg0)` → `SpawnedProcess`
- `spawn_pipe_process_no_stdin(program, args, cwd, env, arg0)` → `SpawnedProcess`
- `combine_output_receivers(stdout_rx, stderr_rx)` → `broadcast::Receiver<Vec<u8>>`
- `spawn_pipe_process_split(program, args, cwd, env, arg0)` → `SpawnedProcessSplit`
- `conpty_supported()` → `bool` (Windows only; always true elsewhere)
- `TerminalSize { rows, cols }` selects PTY dimensions in character cells.
- `ProcessHandle` exposes:
@@ -15,14 +15,14 @@ Lightweight helpers for spawning interactive processes either under a PTY (pseud
- `resize(TerminalSize)`
- `close_stdin()`
- `has_exited()`, `exit_code()`, `terminate()`
- `SpawnedProcess` bundles `session`, `stdout_rx`, `stderr_rx`, and `exit_rx` (oneshot exit code).
- `SpawnedProcess` bundles `session`, `output_rx`, and `exit_rx` (oneshot exit code).
- `SpawnedProcessSplit` bundles `session`, merged `output_rx`, split `stdout_rx` / `stderr_rx`, and `exit_rx`.
## Usage examples
```rust
use std::collections::HashMap;
use std::path::Path;
use codex_utils_pty::combine_output_receivers;
use codex_utils_pty::spawn_pty_process;
use codex_utils_pty::TerminalSize;
@@ -41,7 +41,7 @@ let writer = spawned.session.writer_sender();
writer.send(b"exit\n".to_vec()).await?;
// Collect output until the process exits.
let mut output_rx = combine_output_receivers(spawned.stdout_rx, spawned.stderr_rx);
let mut output_rx = spawned.output_rx;
let mut collected = Vec::new();
while let Ok(chunk) = output_rx.try_recv() {
collected.extend_from_slice(&chunk);

View File

@@ -13,18 +13,20 @@ pub const DEFAULT_OUTPUT_BYTES_CAP: usize = 1024 * 1024;
pub use pipe::spawn_process as spawn_pipe_process;
/// Spawn a non-interactive process using regular pipes, but close stdin immediately.
pub use pipe::spawn_process_no_stdin as spawn_pipe_process_no_stdin;
/// Combine stdout/stderr receivers into a single broadcast receiver.
pub use process::combine_output_receivers;
/// Handle for interacting with a spawned process (PTY or pipe).
pub use process::ProcessHandle;
/// Bundle of process handles plus split output and exit receivers returned by spawn helpers.
/// Bundle of process handles plus merged output and exit receivers returned by spawn helpers.
pub use process::SpawnedProcess;
/// Bundle of process handles plus split stdout/stderr receivers returned by pipe spawn helpers.
pub use process::SpawnedProcessSplit;
/// Terminal size in character cells used for PTY spawn and resize operations.
pub use process::TerminalSize;
/// Backwards-compatible alias for ProcessHandle.
pub type ExecCommandSession = ProcessHandle;
/// Backwards-compatible alias for SpawnedProcess.
pub type SpawnedPty = SpawnedProcess;
/// Spawn a non-interactive process using regular pipes and preserve split stdout/stderr streams.
pub use pipe::spawn_process_split as spawn_pipe_process_split;
/// Report whether ConPTY is available on this platform (Windows only).
pub use pty::conpty_supported;
/// Spawn a process attached to a PTY for interactive use.

View File

@@ -13,6 +13,7 @@ use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::process::Command;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
@@ -20,6 +21,7 @@ use tokio::task::JoinHandle;
use crate::process::ChildTerminator;
use crate::process::ProcessHandle;
use crate::process::SpawnedProcess;
use crate::process::SpawnedProcessSplit;
#[cfg(target_os = "linux")]
use libc;
@@ -72,8 +74,11 @@ fn kill_process(pid: u32) -> io::Result<()> {
}
}
async fn read_output_stream<R>(mut reader: R, output_tx: mpsc::Sender<Vec<u8>>)
where
async fn read_output_stream<R>(
mut reader: R,
merged_output_tx: broadcast::Sender<Vec<u8>>,
stream_output_tx: mpsc::Sender<Vec<u8>>,
) where
R: AsyncRead + Unpin,
{
let mut buf = vec![0u8; 8_192];
@@ -81,7 +86,9 @@ where
match reader.read(&mut buf).await {
Ok(0) => break,
Ok(n) => {
let _ = output_tx.send(buf[..n].to_vec()).await;
let chunk = buf[..n].to_vec();
let _ = merged_output_tx.send(chunk.clone());
let _ = stream_output_tx.send(chunk).await;
}
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(_) => break,
@@ -102,7 +109,7 @@ async fn spawn_process_with_stdin_mode(
env: &HashMap<String, String>,
arg0: &Option<String>,
stdin_mode: PipeStdinMode,
) -> Result<SpawnedProcess> {
) -> Result<SpawnedProcessSplit> {
if program.is_empty() {
anyhow::bail!("missing program for pipe spawn");
}
@@ -156,6 +163,8 @@ async fn spawn_process_with_stdin_mode(
let stderr = child.stderr.take();
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
let (output_tx, _) = broadcast::channel::<Vec<u8>>(256);
let output_rx = output_tx.subscribe();
let (stdout_tx, stdout_rx) = mpsc::channel::<Vec<u8>>(128);
let (stderr_tx, stderr_rx) = mpsc::channel::<Vec<u8>>(128);
let writer_handle = if let Some(stdin) = stdin {
@@ -173,15 +182,17 @@ async fn spawn_process_with_stdin_mode(
};
let stdout_handle = stdout.map(|stdout| {
let output_tx = output_tx.clone();
let stdout_tx = stdout_tx.clone();
tokio::spawn(async move {
read_output_stream(BufReader::new(stdout), stdout_tx).await;
read_output_stream(BufReader::new(stdout), output_tx, stdout_tx).await;
})
});
let stderr_handle = stderr.map(|stderr| {
let output_tx = output_tx.clone();
let stderr_tx = stderr_tx.clone();
tokio::spawn(async move {
read_output_stream(BufReader::new(stderr), stderr_tx).await;
read_output_stream(BufReader::new(stderr), output_tx, stderr_tx).await;
})
});
let mut reader_abort_handles = Vec::new();
@@ -219,6 +230,7 @@ async fn spawn_process_with_stdin_mode(
let handle = ProcessHandle::new(
writer_tx,
Some(pid),
Box::new(PipeChildTerminator {
#[cfg(windows)]
pid,
@@ -234,15 +246,16 @@ async fn spawn_process_with_stdin_mode(
None,
);
Ok(SpawnedProcess {
Ok(SpawnedProcessSplit {
session: handle,
output_rx,
stdout_rx,
stderr_rx,
exit_rx,
})
}
/// Spawn a process using regular pipes (no PTY), returning handles for stdin, split output, and exit.
/// Spawn a process using regular pipes (no PTY), returning handles for stdin, merged output, and exit.
pub async fn spawn_process(
program: &str,
args: &[String],
@@ -250,7 +263,13 @@ pub async fn spawn_process(
env: &HashMap<String, String>,
arg0: &Option<String>,
) -> Result<SpawnedProcess> {
spawn_process_with_stdin_mode(program, args, cwd, env, arg0, PipeStdinMode::Piped).await
let spawned =
spawn_process_with_stdin_mode(program, args, cwd, env, arg0, PipeStdinMode::Piped).await?;
Ok(SpawnedProcess {
session: spawned.session,
output_rx: spawned.output_rx,
exit_rx: spawned.exit_rx,
})
}
/// Spawn a process using regular pipes, but close stdin immediately.
@@ -261,5 +280,22 @@ pub async fn spawn_process_no_stdin(
env: &HashMap<String, String>,
arg0: &Option<String>,
) -> Result<SpawnedProcess> {
spawn_process_with_stdin_mode(program, args, cwd, env, arg0, PipeStdinMode::Null).await
let spawned =
spawn_process_with_stdin_mode(program, args, cwd, env, arg0, PipeStdinMode::Null).await?;
Ok(SpawnedProcess {
session: spawned.session,
output_rx: spawned.output_rx,
exit_rx: spawned.exit_rx,
})
}
/// Spawn a process using regular pipes (no PTY), preserving split stdout/stderr streams.
///
/// Unlike `spawn_process`, the returned `SpawnedProcessSplit` keeps per-stream
/// `stdout_rx` / `stderr_rx` receivers in addition to the merged `output_rx`.
/// Stdin is piped (`PipeStdinMode::Piped`), so callers may write to the child.
///
/// # Errors
/// Propagates any spawn failure from `spawn_process_with_stdin_mode`
/// (e.g. empty program name or OS-level spawn errors).
pub async fn spawn_process_split(
    program: &str,
    args: &[String],
    cwd: &Path,
    env: &HashMap<String, String>,
    arg0: &Option<String>,
) -> Result<SpawnedProcessSplit> {
    spawn_process_with_stdin_mode(program, args, cwd, env, arg0, PipeStdinMode::Piped).await
}

View File

@@ -55,6 +55,7 @@ impl fmt::Debug for PtyHandles {
/// Handle for driving an interactive process (PTY or pipe).
pub struct ProcessHandle {
writer_tx: StdMutex<Option<mpsc::Sender<Vec<u8>>>>,
pid: Option<u32>,
killer: StdMutex<Option<Box<dyn ChildTerminator>>>,
reader_handle: StdMutex<Option<JoinHandle<()>>>,
reader_abort_handles: StdMutex<Vec<AbortHandle>>,
@@ -77,6 +78,7 @@ impl ProcessHandle {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
writer_tx: mpsc::Sender<Vec<u8>>,
pid: Option<u32>,
killer: Box<dyn ChildTerminator>,
reader_handle: JoinHandle<()>,
reader_abort_handles: Vec<AbortHandle>,
@@ -88,6 +90,7 @@ impl ProcessHandle {
) -> Self {
Self {
writer_tx: StdMutex::new(Some(writer_tx)),
pid,
killer: StdMutex::new(Some(killer)),
reader_handle: StdMutex::new(Some(reader_handle)),
reader_abort_handles: StdMutex::new(reader_abort_handles),
@@ -112,6 +115,11 @@ impl ProcessHandle {
writer_tx
}
/// Returns the child process ID when available.
///
/// This is the PID captured at spawn time and stored in `ProcessHandle::new`;
/// `None` when the spawner did not supply one.
pub fn pid(&self) -> Option<u32> {
    self.pid
}
/// True if the child process has exited.
pub fn has_exited(&self) -> bool {
self.exit_status.load(std::sync::atomic::Ordering::SeqCst)
@@ -171,9 +179,7 @@ impl ProcessHandle {
}
}
if let Ok(mut h) = self.wait_handle.lock() {
if let Some(handle) = h.take() {
handle.abort();
}
let _ = h.take();
}
}
}
@@ -184,45 +190,19 @@ impl Drop for ProcessHandle {
}
}
/// Combine split stdout/stderr receivers into a single broadcast receiver.
///
/// Spawns a background task that forwards chunks from both mpsc receivers
/// into one broadcast channel as they arrive. Chunks are interleaved in
/// arrival order; no ordering between the two streams is guaranteed.
/// The task exits once both input channels have closed.
///
/// NOTE(review): `broadcast::channel(256)` drops the oldest chunks for a
/// lagging subscriber once more than 256 chunks are buffered — slow
/// consumers can observe `RecvError::Lagged`.
pub fn combine_output_receivers(
    mut stdout_rx: mpsc::Receiver<Vec<u8>>,
    mut stderr_rx: mpsc::Receiver<Vec<u8>>,
) -> broadcast::Receiver<Vec<u8>> {
    let (combined_tx, combined_rx) = broadcast::channel(256);
    tokio::spawn(async move {
        // Track each side separately so a closed stream is removed from the
        // select set instead of busy-looping on `None`.
        let mut stdout_open = true;
        let mut stderr_open = true;
        loop {
            tokio::select! {
                stdout = stdout_rx.recv(), if stdout_open => match stdout {
                    Some(chunk) => {
                        // Send errors (no subscribers) are deliberately ignored.
                        let _ = combined_tx.send(chunk);
                    }
                    None => {
                        stdout_open = false;
                    }
                },
                stderr = stderr_rx.recv(), if stderr_open => match stderr {
                    Some(chunk) => {
                        let _ = combined_tx.send(chunk);
                    }
                    None => {
                        stderr_open = false;
                    }
                },
                // Both branches disabled: every input channel is closed.
                else => break,
            }
        }
    });
    combined_rx
}
/// Return value from PTY or pipe spawn helpers.
#[derive(Debug)]
pub struct SpawnedProcess {
    /// Handle for driving the child (stdin writes, exit queries, termination).
    pub session: ProcessHandle,
    /// Merged stdout/stderr output delivered as raw byte chunks.
    pub output_rx: broadcast::Receiver<Vec<u8>>,
    /// Resolves once with the child's exit code.
    pub exit_rx: oneshot::Receiver<i32>,
}
/// Return value from split-output spawn helpers.
#[derive(Debug)]
pub struct SpawnedProcessSplit {
pub session: ProcessHandle,
pub output_rx: broadcast::Receiver<Vec<u8>>,
pub stdout_rx: mpsc::Receiver<Vec<u8>>,
pub stderr_rx: mpsc::Receiver<Vec<u8>>,
pub exit_rx: oneshot::Receiver<i32>,

View File

@@ -10,6 +10,7 @@ use anyhow::Result;
#[cfg(not(windows))]
use portable_pty::native_pty_system;
use portable_pty::CommandBuilder;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
@@ -71,7 +72,7 @@ fn platform_native_pty_system() -> Box<dyn portable_pty::PtySystem + Send> {
}
}
/// Spawn a process attached to a PTY, returning handles for stdin, split output, and exit.
/// Spawn a process attached to a PTY, returning handles for stdin, merged output, and exit.
pub async fn spawn_process(
program: &str,
args: &[String],
@@ -98,6 +99,7 @@ pub async fn spawn_process(
}
let mut child = pair.slave.spawn_command(command_builder)?;
let pid = child.process_id();
#[cfg(unix)]
// portable-pty establishes the spawned PTY child as a new session leader on
// Unix, so PID == PGID and we can reuse the pipe backend's process-group
@@ -106,8 +108,8 @@ pub async fn spawn_process(
let killer = child.clone_killer();
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
let (stdout_tx, stdout_rx) = mpsc::channel::<Vec<u8>>(128);
let (_stderr_tx, stderr_rx) = mpsc::channel::<Vec<u8>>(1);
let (output_tx, _) = broadcast::channel::<Vec<u8>>(256);
let output_rx = output_tx.subscribe();
let mut reader = pair.master.try_clone_reader()?;
let reader_handle: JoinHandle<()> = tokio::task::spawn_blocking(move || {
let mut buf = [0u8; 8_192];
@@ -115,7 +117,7 @@ pub async fn spawn_process(
match reader.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
let _ = stdout_tx.blocking_send(buf[..n].to_vec());
let _ = output_tx.send(buf[..n].to_vec());
}
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
@@ -169,6 +171,7 @@ pub async fn spawn_process(
let handle = ProcessHandle::new(
writer_tx,
pid,
Box::new(PtyChildTerminator {
killer,
#[cfg(unix)]
@@ -185,8 +188,7 @@ pub async fn spawn_process(
Ok(SpawnedProcess {
session: handle,
stdout_rx,
stderr_rx,
output_rx,
exit_rx,
})
}

View File

@@ -3,9 +3,8 @@ use std::path::Path;
use pretty_assertions::assert_eq;
use crate::combine_output_receivers;
use crate::spawn_pipe_process;
use crate::spawn_pipe_process_no_stdin;
use crate::spawn_pipe_process_split;
use crate::spawn_pty_process;
use crate::SpawnedProcess;
use crate::TerminalSize;
@@ -59,14 +58,6 @@ fn split_stdout_stderr_command() -> String {
"printf 'split-out\\n'; printf 'split-err\\n' >&2".to_string()
}
async fn collect_split_output(mut output_rx: tokio::sync::mpsc::Receiver<Vec<u8>>) -> Vec<u8> {
let mut collected = Vec::new();
while let Some(chunk) = output_rx.recv().await {
collected.extend_from_slice(&chunk);
}
collected
}
fn combine_spawned_output(
spawned: SpawnedProcess,
) -> (
@@ -76,15 +67,18 @@ fn combine_spawned_output(
) {
let SpawnedProcess {
session,
stdout_rx,
stderr_rx,
output_rx,
exit_rx,
} = spawned;
(
session,
combine_output_receivers(stdout_rx, stderr_rx),
exit_rx,
)
(session, output_rx, exit_rx)
}
/// Drain a per-stream output channel to completion, returning all bytes in order.
async fn collect_split_output(mut output_rx: tokio::sync::mpsc::Receiver<Vec<u8>>) -> Vec<u8> {
    let mut bytes = Vec::new();
    loop {
        match output_rx.recv().await {
            Some(chunk) => bytes.extend_from_slice(&chunk),
            None => break,
        }
    }
    bytes
}
async fn collect_output_until_exit(
@@ -319,6 +313,106 @@ async fn pipe_process_round_trips_stdin() -> anyhow::Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pipe_process_preserves_initial_output_for_quick_exit() -> anyhow::Result<()> {
    // Output written before the reader tasks attach must still reach the
    // merged stream; repeat the spawn to flush out the startup race.
    let env: HashMap<String, String> = std::env::vars().collect();
    let command = if cfg!(windows) {
        "echo stdout-line & echo stderr-line 1>&2"
    } else {
        "printf 'stdout-line\\n'; printf 'stderr-line\\n' >&2"
    };
    let (program, args) = shell_command(command);
    for _attempt in 0..32 {
        let spawned = spawn_pipe_process(&program, &args, Path::new("."), &env, &None).await?;
        let (bytes, code) =
            collect_output_until_exit(spawned.output_rx, spawned.exit_rx, 2_000).await;
        let text = String::from_utf8_lossy(&bytes);
        assert_eq!(code, 0, "expected quick pipe process to exit cleanly");
        assert!(
            text.contains("stdout-line"),
            "expected stdout in merged output: {text:?}"
        );
        assert!(
            text.contains("stderr-line"),
            "expected stderr in merged output: {text:?}"
        );
    }
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pipe_process_split_preserves_stdout_and_stderr() -> anyhow::Result<()> {
    // The split spawn helper must deliver stdout and stderr on their own
    // channels while still mirroring both into the merged stream.
    let python = match find_python() {
        Some(python) => python,
        None => {
            eprintln!("python not found; skipping pipe_process_split_preserves_stdout_and_stderr");
            return Ok(());
        }
    };
    let args: Vec<String> = [
        "-u",
        "-c",
        "import sys; print('stdout-line'); sys.stderr.write('stderr-line\\n'); sys.stderr.flush()",
    ]
    .iter()
    .map(|arg| arg.to_string())
    .collect();
    let env: HashMap<String, String> = std::env::vars().collect();
    let spawned = spawn_pipe_process_split(&python, &args, Path::new("."), &env, &None).await?;
    let crate::process::SpawnedProcessSplit {
        output_rx,
        stdout_rx,
        stderr_rx,
        exit_rx,
        ..
    } = spawned;
    let stdout_join = tokio::spawn(async move { collect_split_output(stdout_rx).await });
    let stderr_join = tokio::spawn(async move { collect_split_output(stderr_rx).await });
    let merged_join =
        tokio::spawn(async move { collect_output_until_exit(output_rx, exit_rx, 5_000).await });
    let stdout = stdout_join.await?;
    let stderr = stderr_join.await?;
    let (merged, code) = merged_join.await?;
    assert_eq!(code, 0, "expected python -c to exit cleanly");
    assert_eq!(String::from_utf8_lossy(&stdout), "stdout-line\n");
    assert_eq!(String::from_utf8_lossy(&stderr), "stderr-line\n");
    let merged = String::from_utf8_lossy(&merged);
    assert!(merged.contains("stdout-line"));
    assert!(merged.contains("stderr-line"));
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pipe_process_split_preserves_initial_merged_output_for_quick_exit() -> anyhow::Result<()> {
    // Same startup race as the merged-only helper: early output from a
    // short-lived child has to appear on the split helper's merged stream.
    let env: HashMap<String, String> = std::env::vars().collect();
    let command = if cfg!(windows) {
        "echo stdout-line & echo stderr-line 1>&2"
    } else {
        "printf 'stdout-line\\n'; printf 'stderr-line\\n' >&2"
    };
    let (program, args) = shell_command(command);
    for _attempt in 0..32 {
        let spawned =
            spawn_pipe_process_split(&program, &args, Path::new("."), &env, &None).await?;
        let (bytes, code) =
            collect_output_until_exit(spawned.output_rx, spawned.exit_rx, 2_000).await;
        let text = String::from_utf8_lossy(&bytes);
        assert_eq!(code, 0, "expected quick split pipe process to exit cleanly");
        assert!(
            text.contains("stdout-line"),
            "expected stdout in merged output: {text:?}"
        );
        assert!(
            text.contains("stderr-line"),
            "expected stderr in merged output: {text:?}"
        );
    }
    Ok(())
}
#[cfg(unix)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pipe_process_detaches_from_parent_session() -> anyhow::Result<()> {
@@ -443,12 +537,13 @@ async fn pipe_process_can_expose_split_stdout_and_stderr() -> anyhow::Result<()>
shell_command(&split_stdout_stderr_command())
};
let spawned =
spawn_pipe_process_no_stdin(&program, &args, Path::new("."), &env_map, &None).await?;
let SpawnedProcess {
spawn_pipe_process_split(&program, &args, Path::new("."), &env_map, &None).await?;
let crate::process::SpawnedProcessSplit {
session: _session,
stdout_rx,
stderr_rx,
exit_rx,
..
} = spawned;
let stdout_task = tokio::spawn(async move { collect_split_output(stdout_rx).await });

View File

@@ -19,7 +19,17 @@ js_repl = true
js_repl_tools_only = true
```
When enabled, direct model tool calls are restricted to `js_repl` and `js_repl_reset`; other tools remain available via `await codex.tool(...)` inside js_repl.
When enabled, direct model tool calls are restricted to `js_repl` and `js_repl_reset` (and `js_repl_poll` if polling is enabled). Other tools remain available via `await codex.tool(...)` inside `js_repl`.
`js_repl_polling` can be enabled to allow async/polled execution:
```toml
[features]
js_repl = true
js_repl_polling = true
```
When enabled, `js_repl` accepts `poll=true` in the first-line pragma and returns both `exec_id` and `session_id`. Reuse polling state by passing `session_id=<id>` in later `js_repl` pragmas. Omit `session_id` to create a new polling session; unknown `session_id` values return an error. Use `js_repl_poll` with `exec_id` until `status` becomes `completed` or `error`.
## Node runtime
@@ -59,6 +69,9 @@ imported local file. They are not resolved relative to the imported file's locat
- `js_repl` is a freeform tool: send raw JavaScript source text.
- Optional first-line pragma:
- `// codex-js-repl: timeout_ms=15000`
- `// codex-js-repl: poll=true`
- `// codex-js-repl: poll=true session_id=my-session`
- Use space-separated pragma arguments.
- Top-level bindings persist across calls.
- If a cell throws, prior bindings remain available, lexical bindings whose initialization completed before the throw stay available in later calls, and hoisted `var` / `function` bindings persist only when execution clearly reached their declaration or a supported write site.
- Supported hoisted-`var` failed-cell cases are direct top-level identifier writes and updates before the declaration (for example `x = 1`, `x += 1`, `x++`, `x &&= 1`) and non-empty top-level `for...in` / `for...of` loops.
@@ -70,6 +83,17 @@ imported local file. They are not resolved relative to the imported file's locat
- Local file modules reload between execs, so a later `await import("./file.js")` picks up edits and fixed failures. Top-level bindings you already created still persist until `js_repl_reset`.
- Use `js_repl_reset` to clear the kernel state.
### Polling flow
1. Submit with `js_repl` and `poll=true` pragma.
2. Read `exec_id` and `session_id` from the JSON response.
3. Call `js_repl_poll` with `{"exec_id":"...","yield_time_ms":5000}`.
4. Repeat until `status` is `completed` or `error`. If a poll returns `status: running`, keep polling the same `exec_id` even if the logs or `final_output` already look complete. Completed polls can also include nested multimodal tool output after the JSON status item.
5. Optional: reuse session state by submitting another polled `js_repl` call with `session_id=<id>` (must already exist). Omit `session_id` to create a new polling session.
6. Reset one session with `js_repl_reset({"session_id":"..."})`, or reset all kernels with `js_repl_reset({})`.
`timeout_ms` is only supported for non-polling `js_repl` executions. With `poll=true`, use `js_repl_poll.yield_time_ms` to control how long each poll waits before returning. If omitted, or set below `5000`, `js_repl_poll` waits up to 5 seconds before returning if nothing new arrives.
## Helper APIs inside the kernel
`js_repl` exposes these globals: