Compare commits

...

8 Commits

Author SHA1 Message Date
pakrym-oai
a0072cf521 Refactor code mode worker dispatch 2026-03-11 23:14:52 -07:00
pakrym-oai
2ddde61431 codex: fix CI failure on PR #14295 2026-03-11 21:02:35 -07:00
pakrym-oai
5c06a41d7f Simplify code mode file wait helper 2026-03-11 20:56:21 -07:00
pakrym-oai
97268c2a00 Refine code mode session handling 2026-03-11 20:48:31 -07:00
pakrym-oai
5067c4c084 add blob size policy check and introduce tool search
- enforce blob size policy in CI with allowlist and Python checker
- update initialize capabilities docs and tests to use `thread/started` example
- extend response schemas and TS types with `namespace` on function calls and new `tool_search_call` / `tool_search_output` items
- wire tool search through Responses API, SSE parsing, history normalization, compaction, rollout policy, telemetry, and arc monitor
- add
2026-03-11 19:39:28 -07:00
pakrym-oai
b44eb3bb35 Simplify code-mode shared session state 2026-03-11 17:39:47 -07:00
pakrym-oai
e1cc893f95 Add long-running code-mode session waits 2026-03-11 17:00:51 -07:00
pakrym-oai
cf0785e99b codex: persist code mode runner sessions 2026-03-11 14:02:34 -07:00
9 changed files with 2201 additions and 512 deletions

View File

@@ -1648,7 +1648,9 @@ impl Session {
config.features.enabled(Feature::RuntimeMetrics),
Self::build_model_client_beta_features_header(config.as_ref()),
),
code_mode_store: Default::default(),
code_mode_service: crate::tools::code_mode::CodeModeService::new(
config.js_repl_node_path.clone(),
),
};
let js_repl = Arc::new(JsReplHandle::with_node_path(
config.js_repl_node_path.clone(),
@@ -5462,6 +5464,11 @@ pub(crate) async fn run_turn(
// Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains
// many turns, from the perspective of the user, it is a single turn.
let turn_diff_tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));
let _code_mode_worker = sess
.services
.code_mode_service
.start_turn_worker(&sess, &turn_context, &turn_diff_tracker)
.await;
let mut server_model_warning_emitted_for_turn = false;
// `ModelClientSession` is turn-scoped and caches WebSocket + sticky routing state, so we reuse

View File

@@ -2162,7 +2162,9 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
config.features.enabled(Feature::RuntimeMetrics),
Session::build_model_client_beta_features_header(config.as_ref()),
),
code_mode_store: Default::default(),
code_mode_service: crate::tools::code_mode::CodeModeService::new(
config.js_repl_node_path.clone(),
),
};
let js_repl = Arc::new(JsReplHandle::with_node_path(
config.js_repl_node_path.clone(),
@@ -2723,7 +2725,9 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
config.features.enabled(Feature::RuntimeMetrics),
Session::build_model_client_beta_features_header(config.as_ref()),
),
code_mode_store: Default::default(),
code_mode_service: crate::tools::code_mode::CodeModeService::new(
config.js_repl_node_path.clone(),
),
};
let js_repl = Arc::new(JsReplHandle::with_node_path(
config.js_repl_node_path.clone(),

View File

@@ -15,6 +15,7 @@ use crate::models_manager::manager::ModelsManager;
use crate::plugins::PluginsManager;
use crate::skills::SkillsManager;
use crate::state_db::StateDbHandle;
use crate::tools::code_mode::CodeModeService;
use crate::tools::network_approval::NetworkApprovalService;
use crate::tools::runtimes::ExecveSessionApproval;
use crate::tools::sandboxing::ApprovalStore;
@@ -22,35 +23,12 @@ use crate::unified_exec::UnifiedExecProcessManager;
use codex_hooks::Hooks;
use codex_otel::SessionTelemetry;
use codex_utils_absolute_path::AbsolutePathBuf;
use serde_json::Value as JsonValue;
use std::path::PathBuf;
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
pub(crate) struct CodeModeStoreService {
stored_values: Mutex<HashMap<String, JsonValue>>,
}
impl Default for CodeModeStoreService {
fn default() -> Self {
Self {
stored_values: Mutex::new(HashMap::new()),
}
}
}
impl CodeModeStoreService {
pub(crate) async fn stored_values(&self) -> HashMap<String, JsonValue> {
self.stored_values.lock().await.clone()
}
pub(crate) async fn replace_stored_values(&self, values: HashMap<String, JsonValue>) {
*self.stored_values.lock().await = values;
}
}
pub(crate) struct SessionServices {
pub(crate) mcp_connection_manager: Arc<RwLock<McpConnectionManager>>,
pub(crate) mcp_startup_cancellation_token: Mutex<CancellationToken>,
@@ -82,5 +60,5 @@ pub(crate) struct SessionServices {
pub(crate) state_db: Option<StateDbHandle>,
/// Session-scoped model client shared across turns.
pub(crate) model_client: ModelClient,
pub(crate) code_mode_store: CodeModeStoreService,
pub(crate) code_mode_service: CodeModeService,
}

View File

@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
@@ -6,7 +7,6 @@ use crate::client_common::tools::ToolSpec;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::config::Config;
use crate::exec_env::create_env;
use crate::features::Feature;
use crate::function_tool::FunctionCallError;
use crate::tools::ToolRouter;
@@ -30,10 +30,17 @@ use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tracing::warn;
const CODE_MODE_RUNNER_SOURCE: &str = include_str!("code_mode_runner.cjs");
const CODE_MODE_BRIDGE_SOURCE: &str = include_str!("code_mode_bridge.js");
pub(crate) const PUBLIC_TOOL_NAME: &str = "exec";
pub(crate) const WAIT_TOOL_NAME: &str = "exec_wait";
pub(crate) const DEFAULT_WAIT_YIELD_TIME_MS: u64 = 10_000;
#[derive(Clone)]
struct ExecContext {
@@ -42,6 +49,199 @@ struct ExecContext {
tracker: SharedTurnDiffTracker,
}
/// Handle to the long-lived Node runner process plus the plumbing used to
/// exchange newline-delimited JSON messages with it.
pub(crate) struct CodeModeProcess {
    // The spawned Node child process.
    child: tokio::process::Child,
    // Writer half of the pipe; shared so both the per-turn worker and
    // request senders can write responses/requests.
    stdin: Arc<Mutex<tokio::process::ChildStdin>>,
    // Background task draining the child's stdout and dispatching messages.
    stdout_task: JoinHandle<()>,
    // request_id -> oneshot sender for the caller awaiting that response.
    response_waiters: Arc<Mutex<HashMap<String, oneshot::Sender<NodeToHostMessage>>>>,
    // Receiver for nested tool calls emitted by the runner.
    tool_call_rx: Arc<Mutex<mpsc::UnboundedReceiver<CodeModeToolCall>>>,
}
/// Turn-scoped worker that services nested tool calls coming from the
/// runner; dropping it signals the spawned task to stop.
pub(crate) struct CodeModeWorker {
    // Consumed on drop to tell the worker task to exit its loop.
    shutdown_tx: Option<oneshot::Sender<()>>,
    // Handle to the spawned dispatch loop; not awaited or aborted here —
    // the loop ends when the shutdown signal fires or the channel closes.
    task: JoinHandle<()>,
}
/// A nested tool invocation requested by the Node runner.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct CodeModeToolCall {
    // Correlates the call with the originating exec request.
    request_id: String,
    // Per-call identifier, echoed back in the host's response.
    id: String,
    // Name of the nested tool to invoke.
    name: String,
    // Optional JSON arguments; absent means the tool takes no input.
    #[serde(default)]
    input: Option<JsonValue>,
}
impl Drop for CodeModeWorker {
    /// Signals the worker task to shut down. The send result is ignored
    /// because the task may already have exited on its own.
    fn drop(&mut self) {
        if let Some(shutdown_tx) = self.shutdown_tx.take() {
            let _ = shutdown_tx.send(());
        }
    }
}
impl CodeModeProcess {
    /// Spawns a turn-scoped task that forwards nested tool calls from the
    /// runner to `call_nested_tool` and writes each result back on stdin.
    /// The loop exits when the shutdown signal fires (worker dropped) or
    /// the tool-call channel closes.
    fn worker(&self, exec: ExecContext) -> CodeModeWorker {
        let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
        let stdin = Arc::clone(&self.stdin);
        let tool_call_rx = Arc::clone(&self.tool_call_rx);
        let task = tokio::spawn(async move {
            loop {
                // Race the shutdown signal against the next tool call; the
                // receiver lock is held only while awaiting a message.
                let tool_call = tokio::select! {
                    _ = &mut shutdown_rx => break,
                    tool_call = async {
                        let mut tool_call_rx = tool_call_rx.lock().await;
                        tool_call_rx.recv().await
                    } => tool_call,
                };
                let Some(tool_call) = tool_call else {
                    break;
                };
                let response = HostToNodeMessage::Response {
                    request_id: tool_call.request_id,
                    id: tool_call.id,
                    code_mode_result: call_nested_tool(
                        exec.clone(),
                        tool_call.name,
                        tool_call.input,
                    )
                    .await,
                };
                if let Err(err) = write_message(&stdin, &response).await {
                    warn!("failed to write {PUBLIC_TOOL_NAME} tool response: {err}");
                    break;
                }
            }
        });
        CodeModeWorker {
            shutdown_tx: Some(shutdown_tx),
            task,
        }
    }

    /// Sends `message` to the runner and waits for the response carrying
    /// `request_id`, which the stdout task routes via `response_waiters`.
    async fn send(
        &mut self,
        request_id: &str,
        message: &HostToNodeMessage,
    ) -> Result<NodeToHostMessage, std::io::Error> {
        // If the stdout reader has finished, no response can ever arrive.
        if self.stdout_task.is_finished() {
            return Err(std::io::Error::other(format!(
                "{PUBLIC_TOOL_NAME} runner is not available"
            )));
        }
        let (tx, rx) = oneshot::channel();
        // Register the waiter before writing so the response cannot race
        // ahead of the registration.
        self.response_waiters
            .lock()
            .await
            .insert(request_id.to_string(), tx);
        if let Err(err) = write_message(&self.stdin, message).await {
            // Clean up the orphaned waiter on write failure.
            self.response_waiters.lock().await.remove(request_id);
            return Err(err);
        }
        match rx.await {
            Ok(message) => Ok(message),
            // Sender dropped: the stdout loop ended and cleared the waiters.
            Err(_) => Err(std::io::Error::other(format!(
                "{PUBLIC_TOOL_NAME} runner is not available"
            ))),
        }
    }

    /// Non-blocking check; returns `Ok(true)` when the child has exited.
    fn has_exited(&mut self) -> Result<bool, std::io::Error> {
        self.child
            .try_wait()
            .map(|status| status.is_some())
            .map_err(std::io::Error::other)
    }
}
/// Session-wide service owning the shared code-mode runner process, the
/// values persisted via `store`/`load`, and session-id allocation.
pub(crate) struct CodeModeService {
    // Optional explicit Node binary path from configuration.
    js_repl_node_path: Option<PathBuf>,
    // Values persisted across exec calls within this session.
    stored_values: Mutex<HashMap<String, JsonValue>>,
    // Lazily spawned runner; `None` until first use or after it exits.
    process: Arc<Mutex<Option<CodeModeProcess>>>,
    // Next id handed out by `allocate_session_id`, starting at 1.
    next_session_id: Mutex<i32>,
}
impl CodeModeService {
pub(crate) fn new(js_repl_node_path: Option<PathBuf>) -> Self {
Self {
js_repl_node_path,
stored_values: Mutex::new(HashMap::new()),
process: Arc::new(Mutex::new(None)),
next_session_id: Mutex::new(1),
}
}
pub(crate) async fn stored_values(&self) -> HashMap<String, JsonValue> {
self.stored_values.lock().await.clone()
}
pub(crate) async fn replace_stored_values(&self, values: HashMap<String, JsonValue>) {
*self.stored_values.lock().await = values;
}
async fn ensure_started(
&self,
) -> Result<tokio::sync::OwnedMutexGuard<Option<CodeModeProcess>>, std::io::Error> {
let mut process_slot = self.process.lock().await;
let needs_spawn = match process_slot.as_mut() {
Some(process) => !matches!(process.has_exited(), Ok(false)),
None => true,
};
if needs_spawn {
let node_path = resolve_compatible_node(self.js_repl_node_path.as_deref())
.await
.map_err(std::io::Error::other)?;
*process_slot = Some(spawn_code_mode_process(&node_path).await?);
}
drop(process_slot);
Ok(self.process.clone().lock_owned().await)
}
pub(crate) async fn start_turn_worker(
&self,
session: &Arc<Session>,
turn: &Arc<TurnContext>,
tracker: &SharedTurnDiffTracker,
) -> Option<CodeModeWorker> {
if !turn.features.enabled(Feature::CodeMode) {
return None;
}
let exec = ExecContext {
session: Arc::clone(session),
turn: Arc::clone(turn),
tracker: Arc::clone(tracker),
};
let mut process_slot = match self.ensure_started().await {
Ok(process_slot) => process_slot,
Err(err) => {
warn!("failed to start {PUBLIC_TOOL_NAME} worker for turn: {err}");
return None;
}
};
let Some(process) = process_slot.as_mut() else {
warn!(
"failed to start {PUBLIC_TOOL_NAME} worker for turn: {PUBLIC_TOOL_NAME} runner failed to start"
);
return None;
};
Some(process.worker(exec))
}
pub(crate) async fn allocate_session_id(&self) -> i32 {
let mut next_session_id = self.next_session_id.lock().await;
let session_id = *next_session_id;
*next_session_id = next_session_id.saturating_add(1);
session_id
}
pub(crate) async fn allocate_request_id(&self) -> String {
uuid::Uuid::new_v4().to_string()
}
}
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
enum CodeModeToolKind {
@@ -63,12 +263,24 @@ struct EnabledTool {
#[derive(Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum HostToNodeMessage {
Init {
Start {
request_id: String,
session_id: i32,
enabled_tools: Vec<EnabledTool>,
stored_values: HashMap<String, JsonValue>,
source: String,
},
Poll {
request_id: String,
session_id: i32,
yield_time_ms: u64,
},
Terminate {
request_id: String,
session_id: i32,
},
Response {
request_id: String,
id: String,
code_mode_result: JsonValue,
},
@@ -78,12 +290,19 @@ enum HostToNodeMessage {
#[serde(tag = "type", rename_all = "snake_case")]
enum NodeToHostMessage {
ToolCall {
id: String,
name: String,
#[serde(default)]
input: Option<JsonValue>,
#[serde(flatten)]
tool_call: CodeModeToolCall,
},
Yielded {
request_id: String,
content_items: Vec<JsonValue>,
},
Terminated {
request_id: String,
content_items: Vec<JsonValue>,
},
Result {
request_id: String,
content_items: Vec<JsonValue>,
stored_values: HashMap<String, JsonValue>,
#[serde(default)]
@@ -93,6 +312,18 @@ enum NodeToHostMessage {
},
}
/// Outcome of a start/poll/terminate exchange with the runner.
enum CodeModeSessionProgress {
    /// The script finished (completed, failed, or was terminated).
    Finished(FunctionToolOutput),
    /// The script is still running and yielded intermediate output.
    Yielded { output: FunctionToolOutput },
}
/// Status rendered into the header text prepended to the returned content.
enum CodeModeExecutionStatus {
    Completed,
    Failed,
    /// Still running; carries the session id the model passes to the wait tool.
    Running(i32),
    Terminated,
}
pub(crate) fn instructions(config: &Config) -> Option<String> {
if !config.features.enabled(Feature::CodeMode) {
return None;
@@ -113,7 +344,10 @@ pub(crate) fn instructions(config: &Config) -> Option<String> {
));
section.push_str("- Import nested tools from `tools.js`, for example `import { exec_command } from \"tools.js\"` or `import { ALL_TOOLS } from \"tools.js\"` to inspect the available `{ module, name, description }` entries. Namespaced tools are also available from `tools/<namespace...>.js`; MCP tools use `tools/mcp/<server>.js`, for example `import { append_notebook_logs_chart } from \"tools/mcp/ologs.js\"`. Nested tool calls resolve to their code-mode result values.\n");
section.push_str(&format!(
"- Import `{{ output_text, output_image, set_max_output_tokens_per_exec_call, store, load }}` from `@openai/code_mode` (or `\"openai/code_mode\"`). `output_text(value)` surfaces text back to the model and stringifies non-string objects with `JSON.stringify(...)` when possible. `output_image(imageUrl)` appends an `input_image` content item for `http(s)` or `data:` URLs. `store(key, value)` persists JSON-serializable values across `{PUBLIC_TOOL_NAME}` calls in the current session, and `load(key)` returns a cloned stored value or `undefined`. `set_max_output_tokens_per_exec_call(value)` sets the token budget used to truncate the final Rust-side result of the current `{PUBLIC_TOOL_NAME}` execution; the default is `10000`. This guards the overall `{PUBLIC_TOOL_NAME}` output, not individual nested tool invocations. The returned content starts with a separate `Script completed` or `Script failed` text item that includes wall time. When truncation happens, the final text may include `Total output lines:` and the usual `…N tokens truncated…` marker.\n",
"- Import `{{ output_text, output_image, set_max_output_tokens_per_exec_call, set_yield_time, store, load }}` from `@openai/code_mode` (or `\"openai/code_mode\"`). `output_text(value)` surfaces text back to the model and stringifies non-string objects with `JSON.stringify(...)` when possible. `output_image(imageUrl)` appends an `input_image` content item for `http(s)` or `data:` URLs. `store(key, value)` persists JSON-serializable values across `{PUBLIC_TOOL_NAME}` calls in the current session, and `load(key)` returns a cloned stored value or `undefined`. `set_max_output_tokens_per_exec_call(value)` sets the token budget used to truncate direct `{PUBLIC_TOOL_NAME}` returns; `{WAIT_TOOL_NAME}` uses its own `max_tokens` argument instead and defaults to `10000`. `set_yield_time(value)` asks `{PUBLIC_TOOL_NAME}` to return early if the script is still running after that many milliseconds so `{WAIT_TOOL_NAME}` can resume it later. The returned content starts with a separate `Script completed`, `Script failed`, or `Script running with session ID …` text item that includes wall time. When truncation happens, the final text may include `Total output lines:` and the usual `…N tokens truncated…` marker.\n",
));
section.push_str(&format!(
"- If `{PUBLIC_TOOL_NAME}` returns `Script running with session ID …`, call `{WAIT_TOOL_NAME}` with that `session_id` to keep waiting for more output, completion, or termination.\n",
));
section.push_str(
"- Function tools require JSON object arguments. Freeform tools require raw strings.\n",
@@ -136,186 +370,330 @@ pub(crate) async fn execute(
tracker,
};
let enabled_tools = build_enabled_tools(&exec).await;
let stored_values = exec.session.services.code_mode_store.stored_values().await;
let service = &exec.session.services.code_mode_service;
let stored_values = service.stored_values().await;
let source = build_source(&code, &enabled_tools).map_err(FunctionCallError::RespondToModel)?;
execute_node(exec, source, enabled_tools, stored_values)
let session_id = service.allocate_session_id().await;
let request_id = service.allocate_request_id().await;
let process_slot = service
.ensure_started()
.await
.map_err(FunctionCallError::RespondToModel)
.map_err(|err| FunctionCallError::RespondToModel(err.to_string()))?;
let started_at = std::time::Instant::now();
let message = HostToNodeMessage::Start {
request_id: request_id.clone(),
session_id,
enabled_tools,
stored_values,
source,
};
let result = {
let mut process_slot = process_slot;
let Some(process) = process_slot.as_mut() else {
return Err(FunctionCallError::RespondToModel(format!(
"{PUBLIC_TOOL_NAME} runner failed to start"
)));
};
let message = process
.send(&request_id, &message)
.await
.map_err(|err| err.to_string());
let message = match message {
Ok(message) => message,
Err(error) => return Err(FunctionCallError::RespondToModel(error)),
};
handle_node_message(&exec, session_id, message, None, started_at).await
};
match result {
Ok(CodeModeSessionProgress::Finished(output))
| Ok(CodeModeSessionProgress::Yielded { output }) => Ok(output),
Err(error) => Err(FunctionCallError::RespondToModel(error)),
}
}
async fn execute_node(
exec: ExecContext,
source: String,
enabled_tools: Vec<EnabledTool>,
stored_values: HashMap<String, JsonValue>,
) -> Result<FunctionToolOutput, String> {
let node_path = resolve_compatible_node(exec.turn.config.js_repl_node_path.as_deref()).await?;
/// Implements the `exec_wait` tool: polls (or terminates) a previously
/// yielded exec session identified by `session_id`.
///
/// `yield_time_ms` bounds how long the runner waits for more output before
/// yielding again; `max_output_tokens` caps the output returned by this
/// call; `terminate` stops the session instead of polling it.
pub(crate) async fn wait(
    session: Arc<Session>,
    turn: Arc<TurnContext>,
    tracker: SharedTurnDiffTracker,
    session_id: i32,
    yield_time_ms: u64,
    max_output_tokens: Option<usize>,
    terminate: bool,
) -> Result<FunctionToolOutput, FunctionCallError> {
    let exec = ExecContext {
        session,
        turn,
        tracker,
    };
    let request_id = exec
        .session
        .services
        .code_mode_service
        .allocate_request_id()
        .await;
    let started_at = std::time::Instant::now();
    // Choose the runner request: terminate the session or poll it for output.
    let message = if terminate {
        HostToNodeMessage::Terminate {
            request_id: request_id.clone(),
            session_id,
        }
    } else {
        HostToNodeMessage::Poll {
            request_id: request_id.clone(),
            session_id,
            yield_time_ms,
        }
    };
    let process_slot = exec
        .session
        .services
        .code_mode_service
        .ensure_started()
        .await
        .map_err(|err| FunctionCallError::RespondToModel(err.to_string()))?;
    let result = {
        let mut process_slot = process_slot;
        let Some(process) = process_slot.as_mut() else {
            return Err(FunctionCallError::RespondToModel(format!(
                "{PUBLIC_TOOL_NAME} runner failed to start"
            )));
        };
        // Defensive re-check after ensure_started: an exited runner cannot
        // service the poll. NOTE(review): a silently respawned runner would
        // pass this check yet not know this session_id — confirm the runner
        // reports unknown sessions cleanly.
        if !matches!(process.has_exited(), Ok(false)) {
            return Err(FunctionCallError::RespondToModel(format!(
                "{PUBLIC_TOOL_NAME} runner failed to start"
            )));
        }
        let message = process
            .send(&request_id, &message)
            .await
            .map_err(|err| err.to_string());
        let message = match message {
            Ok(message) => message,
            Err(error) => return Err(FunctionCallError::RespondToModel(error)),
        };
        // `Some(max_output_tokens)` selects the wait tool's own token budget.
        handle_node_message(
            &exec,
            session_id,
            message,
            Some(max_output_tokens),
            started_at,
        )
        .await
    };
    match result {
        Ok(CodeModeSessionProgress::Finished(output))
        | Ok(CodeModeSessionProgress::Yielded { output }) => Ok(output),
        Err(error) => Err(FunctionCallError::RespondToModel(error)),
    }
}
let env = create_env(&exec.turn.shell_environment_policy, None);
let mut cmd = tokio::process::Command::new(&node_path);
/// Translates a runner response into tool output for the model.
///
/// `poll_max_output_tokens` is `None` for direct exec calls (the script's
/// own `max_output_tokens_per_exec_call` budget applies) and `Some(limit)`
/// for wait-tool calls, where the wait call's `max_tokens` argument applies
/// even when it is `Some(None)`.
async fn handle_node_message(
    exec: &ExecContext,
    session_id: i32,
    message: NodeToHostMessage,
    poll_max_output_tokens: Option<Option<usize>>,
    started_at: std::time::Instant,
) -> Result<CodeModeSessionProgress, String> {
    match message {
        // Tool calls are serviced by the turn worker; one arriving here
        // (as a reply to send) is a protocol violation.
        NodeToHostMessage::ToolCall { .. } => Err(format!(
            "{PUBLIC_TOOL_NAME} received an unexpected tool call response"
        )),
        // Script still running: return new output under a "running" header.
        NodeToHostMessage::Yielded { content_items, .. } => {
            let mut delta_items = output_content_items_from_json_values(content_items)?;
            delta_items = truncate_code_mode_result(delta_items, poll_max_output_tokens.flatten());
            prepend_script_status(
                &mut delta_items,
                CodeModeExecutionStatus::Running(session_id),
                started_at.elapsed(),
            );
            Ok(CodeModeSessionProgress::Yielded {
                output: FunctionToolOutput::from_content(delta_items, Some(true)),
            })
        }
        // Session stopped on request.
        NodeToHostMessage::Terminated { content_items, .. } => {
            let mut delta_items = output_content_items_from_json_values(content_items)?;
            delta_items = truncate_code_mode_result(delta_items, poll_max_output_tokens.flatten());
            prepend_script_status(
                &mut delta_items,
                CodeModeExecutionStatus::Terminated,
                started_at.elapsed(),
            );
            Ok(CodeModeSessionProgress::Finished(
                FunctionToolOutput::from_content(delta_items, Some(true)),
            ))
        }
        // Script ran to completion, successfully or with an error.
        NodeToHostMessage::Result {
            content_items,
            stored_values,
            error_text,
            max_output_tokens_per_exec_call,
            ..
        } => {
            // Persist the script's `store(...)` values for later exec calls.
            exec.session
                .services
                .code_mode_service
                .replace_stored_values(stored_values)
                .await;
            let mut delta_items = output_content_items_from_json_values(content_items)?;
            let success = error_text.is_none();
            if let Some(error_text) = error_text {
                delta_items.push(FunctionCallOutputContentItem::InputText {
                    text: format!("Script error:\n{error_text}"),
                });
            }
            // Wait calls use their own budget; direct calls use the script's.
            let mut delta_items = truncate_code_mode_result(
                delta_items,
                poll_max_output_tokens.unwrap_or(max_output_tokens_per_exec_call),
            );
            prepend_script_status(
                &mut delta_items,
                if success {
                    CodeModeExecutionStatus::Completed
                } else {
                    CodeModeExecutionStatus::Failed
                },
                started_at.elapsed(),
            );
            Ok(CodeModeSessionProgress::Finished(
                FunctionToolOutput::from_content(delta_items, Some(success)),
            ))
        }
    }
}
async fn spawn_code_mode_process(
node_path: &std::path::Path,
) -> Result<CodeModeProcess, std::io::Error> {
let mut cmd = tokio::process::Command::new(node_path);
cmd.arg("--experimental-vm-modules");
cmd.arg("--eval");
cmd.arg(CODE_MODE_RUNNER_SOURCE);
cmd.current_dir(&exec.turn.cwd);
cmd.env_clear();
cmd.envs(env);
cmd.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.kill_on_drop(true);
let mut child = cmd
.spawn()
.map_err(|err| format!("failed to start {PUBLIC_TOOL_NAME} Node runtime: {err}"))?;
let stdout = child
.stdout
.take()
.ok_or_else(|| format!("{PUBLIC_TOOL_NAME} runner missing stdout"))?;
let stderr = child
.stderr
.take()
.ok_or_else(|| format!("{PUBLIC_TOOL_NAME} runner missing stderr"))?;
let mut stdin = child
let mut child = cmd.spawn().map_err(std::io::Error::other)?;
let stdout = child.stdout.take().ok_or_else(|| {
std::io::Error::other(format!("{PUBLIC_TOOL_NAME} runner missing stdout"))
})?;
let stderr = child.stderr.take().ok_or_else(|| {
std::io::Error::other(format!("{PUBLIC_TOOL_NAME} runner missing stderr"))
})?;
let stdin = child
.stdin
.take()
.ok_or_else(|| format!("{PUBLIC_TOOL_NAME} runner missing stdin"))?;
.ok_or_else(|| std::io::Error::other(format!("{PUBLIC_TOOL_NAME} runner missing stdin")))?;
let stdin = Arc::new(Mutex::new(stdin));
let response_waiters = Arc::new(Mutex::new(HashMap::<
String,
oneshot::Sender<NodeToHostMessage>,
>::new()));
let (tool_call_tx, tool_call_rx) = mpsc::unbounded_channel();
let stderr_task = tokio::spawn(async move {
tokio::spawn(async move {
let mut reader = BufReader::new(stderr);
let mut buf = Vec::new();
let _ = reader.read_to_end(&mut buf).await;
String::from_utf8_lossy(&buf).trim().to_string()
match reader.read_to_end(&mut buf).await {
Ok(_) => {
let stderr = String::from_utf8_lossy(&buf).trim().to_string();
if !stderr.is_empty() {
warn!("{PUBLIC_TOOL_NAME} runner stderr: {stderr}");
}
}
Err(err) => {
warn!("failed to read {PUBLIC_TOOL_NAME} stderr: {err}");
}
}
});
let stdout_task = tokio::spawn({
let response_waiters = Arc::clone(&response_waiters);
let tool_call_tx = tool_call_tx.clone();
async move {
let mut stdout_lines = BufReader::new(stdout).lines();
loop {
let line = match stdout_lines.next_line().await {
Ok(line) => line,
Err(err) => {
warn!("failed to read {PUBLIC_TOOL_NAME} stdout: {err}");
break;
}
};
let Some(line) = line else {
break;
};
if line.trim().is_empty() {
continue;
}
let message: NodeToHostMessage = match serde_json::from_str(&line) {
Ok(message) => message,
Err(err) => {
warn!("failed to parse {PUBLIC_TOOL_NAME} stdout message: {err}");
break;
}
};
match message {
NodeToHostMessage::ToolCall { tool_call } => {
let _ = tool_call_tx.send(tool_call);
}
message => {
let request_id = message_request_id(&message).to_string();
if let Some(waiter) = response_waiters.lock().await.remove(&request_id) {
let _ = waiter.send(message);
}
}
}
}
response_waiters.lock().await.clear();
}
});
write_message(
&mut stdin,
&HostToNodeMessage::Init {
enabled_tools: enabled_tools.clone(),
stored_values,
source,
},
)
.await?;
let mut stdout_lines = BufReader::new(stdout).lines();
let mut pending_result = None;
while let Some(line) = stdout_lines
.next_line()
.await
.map_err(|err| format!("failed to read {PUBLIC_TOOL_NAME} runner stdout: {err}"))?
{
if line.trim().is_empty() {
continue;
}
let message: NodeToHostMessage = serde_json::from_str(&line).map_err(|err| {
format!("invalid {PUBLIC_TOOL_NAME} runner message: {err}; line={line}")
})?;
match message {
NodeToHostMessage::ToolCall { id, name, input } => {
let response = HostToNodeMessage::Response {
id,
code_mode_result: call_nested_tool(exec.clone(), name, input).await,
};
write_message(&mut stdin, &response).await?;
}
NodeToHostMessage::Result {
content_items,
stored_values,
error_text,
max_output_tokens_per_exec_call,
} => {
exec.session
.services
.code_mode_store
.replace_stored_values(stored_values)
.await;
pending_result = Some((
output_content_items_from_json_values(content_items)?,
error_text,
max_output_tokens_per_exec_call,
));
break;
}
}
}
drop(stdin);
let status = child
.wait()
.await
.map_err(|err| format!("failed to wait for {PUBLIC_TOOL_NAME} runner: {err}"))?;
let stderr = stderr_task
.await
.map_err(|err| format!("failed to collect {PUBLIC_TOOL_NAME} stderr: {err}"))?;
let wall_time = started_at.elapsed();
let success = status.success();
let Some((mut content_items, error_text, max_output_tokens_per_exec_call)) = pending_result
else {
let message = if stderr.is_empty() {
format!("{PUBLIC_TOOL_NAME} runner exited without returning a result (status {status})")
} else {
stderr
};
return Err(message);
};
if !success {
let error_text = error_text.unwrap_or_else(|| {
if stderr.is_empty() {
format!("Process exited with status {status}")
} else {
stderr
}
});
content_items.push(FunctionCallOutputContentItem::InputText {
text: format!("Script error:\n{error_text}"),
});
}
let mut content_items =
truncate_code_mode_result(content_items, max_output_tokens_per_exec_call);
prepend_script_status(&mut content_items, success, wall_time);
Ok(FunctionToolOutput::from_content(
content_items,
Some(success),
))
Ok(CodeModeProcess {
child,
stdin,
stdout_task,
response_waiters,
tool_call_rx: Arc::new(Mutex::new(tool_call_rx)),
})
}
async fn write_message(
stdin: &mut tokio::process::ChildStdin,
stdin: &Arc<Mutex<tokio::process::ChildStdin>>,
message: &HostToNodeMessage,
) -> Result<(), String> {
let line = serde_json::to_string(message)
.map_err(|err| format!("failed to serialize {PUBLIC_TOOL_NAME} message: {err}"))?;
stdin
.write_all(line.as_bytes())
.await
.map_err(|err| format!("failed to write {PUBLIC_TOOL_NAME} message: {err}"))?;
stdin
.write_all(b"\n")
.await
.map_err(|err| format!("failed to write {PUBLIC_TOOL_NAME} message newline: {err}"))?;
stdin
.flush()
.await
.map_err(|err| format!("failed to flush {PUBLIC_TOOL_NAME} message: {err}"))
) -> Result<(), std::io::Error> {
let line = serde_json::to_string(message).map_err(std::io::Error::other)?;
let mut stdin = stdin.lock().await;
stdin.write_all(line.as_bytes()).await?;
stdin.write_all(b"\n").await?;
stdin.flush().await?;
Ok(())
}
/// Extracts the `request_id` carried by any runner-to-host message.
fn message_request_id(message: &NodeToHostMessage) -> &str {
    match message {
        // The three response-shaped variants carry the id directly.
        NodeToHostMessage::Yielded { request_id, .. }
        | NodeToHostMessage::Terminated { request_id, .. }
        | NodeToHostMessage::Result { request_id, .. } => request_id.as_str(),
        // Tool calls nest the id inside the call payload.
        NodeToHostMessage::ToolCall { tool_call } => tool_call.request_id.as_str(),
    }
}
fn prepend_script_status(
content_items: &mut Vec<FunctionCallOutputContentItem>,
success: bool,
status: CodeModeExecutionStatus,
wall_time: Duration,
) {
let wall_time_seconds = ((wall_time.as_secs_f32()) * 10.0).round() / 10.0;
let header = format!(
"{}\nWall time {wall_time_seconds:.1} seconds\nOutput:\n",
if success {
"Script completed"
} else {
"Script failed"
match status {
CodeModeExecutionStatus::Completed => "Script completed".to_string(),
CodeModeExecutionStatus::Failed => "Script failed".to_string(),
CodeModeExecutionStatus::Running(session_id) => {
format!("Script running with session ID {session_id}")
}
CodeModeExecutionStatus::Terminated => "Script terminated".to_string(),
}
);
content_items.insert(0, FunctionCallOutputContentItem::InputText { text: header });
@@ -365,7 +743,7 @@ async fn build_enabled_tools(exec: &ExecContext) -> Vec<EnabledTool> {
fn enabled_tool_from_spec(spec: ToolSpec) -> Option<EnabledTool> {
let tool_name = spec.name().to_string();
if tool_name == PUBLIC_TOOL_NAME {
if tool_name == PUBLIC_TOOL_NAME || tool_name == WAIT_TOOL_NAME {
return None;
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,16 +1,35 @@
use async_trait::async_trait;
use serde::Deserialize;
use crate::features::Feature;
use crate::function_tool::FunctionCallError;
use crate::tools::code_mode;
use crate::tools::code_mode::DEFAULT_WAIT_YIELD_TIME_MS;
use crate::tools::code_mode::PUBLIC_TOOL_NAME;
use crate::tools::code_mode::WAIT_TOOL_NAME;
use crate::tools::context::FunctionToolOutput;
use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolPayload;
use crate::tools::handlers::parse_arguments;
use crate::tools::registry::ToolHandler;
use crate::tools::registry::ToolKind;
pub struct CodeModeHandler;
pub struct CodeModeWaitHandler;
/// JSON arguments accepted by the `exec_wait` function tool.
#[derive(Debug, Deserialize)]
struct ExecWaitArgs {
    // Identifier of the yielded exec session to wait on (required).
    session_id: i32,
    // How long to wait before yielding again; defaults to
    // DEFAULT_WAIT_YIELD_TIME_MS when omitted.
    #[serde(default = "default_wait_yield_time_ms")]
    yield_time_ms: u64,
    // Optional token budget for this wait call's returned output.
    #[serde(default)]
    max_tokens: Option<usize>,
    // When true, terminate the session instead of polling it.
    #[serde(default)]
    terminate: bool,
}
/// serde default for `ExecWaitArgs::yield_time_ms`.
fn default_wait_yield_time_ms() -> u64 {
    DEFAULT_WAIT_YIELD_TIME_MS
}
#[async_trait]
impl ToolHandler for CodeModeHandler {
@@ -29,25 +48,57 @@ impl ToolHandler for CodeModeHandler {
session,
turn,
tracker,
tool_name,
payload,
..
} = invocation;
if !session.features().enabled(Feature::CodeMode) {
return Err(FunctionCallError::RespondToModel(format!(
"{PUBLIC_TOOL_NAME} is disabled by feature flag"
)));
}
let code = match payload {
ToolPayload::Custom { input } => input,
_ => {
return Err(FunctionCallError::RespondToModel(format!(
"{PUBLIC_TOOL_NAME} expects raw JavaScript source text"
)));
match payload {
ToolPayload::Custom { input } if tool_name == PUBLIC_TOOL_NAME => {
code_mode::execute(session, turn, tracker, input).await
}
};
code_mode::execute(session, turn, tracker, code).await
_ => Err(FunctionCallError::RespondToModel(format!(
"{PUBLIC_TOOL_NAME} expects raw JavaScript source text"
))),
}
}
}
#[async_trait]
impl ToolHandler for CodeModeWaitHandler {
    type Output = FunctionToolOutput;

    fn kind(&self) -> ToolKind {
        ToolKind::Function
    }

    /// Parses the wait-tool arguments and delegates to `code_mode::wait`.
    async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
        let ToolInvocation {
            session,
            turn,
            tracker,
            tool_name,
            payload,
            ..
        } = invocation;
        // Guard: accept only a JSON function payload addressed to the wait tool.
        let arguments = match payload {
            ToolPayload::Function { arguments } if tool_name == WAIT_TOOL_NAME => arguments,
            _ => {
                return Err(FunctionCallError::RespondToModel(format!(
                    "{WAIT_TOOL_NAME} expects JSON arguments"
                )));
            }
        };
        let parsed: ExecWaitArgs = parse_arguments(&arguments)?;
        code_mode::wait(
            session,
            turn,
            tracker,
            parsed.session_id,
            parsed.yield_time_ms,
            parsed.max_tokens,
            parsed.terminate,
        )
        .await
    }
}

View File

@@ -34,6 +34,7 @@ use crate::sandboxing::normalize_additional_permissions;
pub use apply_patch::ApplyPatchHandler;
pub use artifacts::ArtifactsHandler;
pub use code_mode::CodeModeHandler;
pub use code_mode::CodeModeWaitHandler;
use codex_protocol::models::PermissionProfile;
use codex_protocol::protocol::AskForApproval;
pub use dynamic::DynamicToolHandler;

View File

@@ -8,7 +8,9 @@ use crate::features::Features;
use crate::mcp_connection_manager::ToolInfo;
use crate::models_manager::collaboration_mode_presets::CollaborationModesConfig;
use crate::original_image_detail::can_request_original_image_detail;
use crate::tools::code_mode::DEFAULT_WAIT_YIELD_TIME_MS;
use crate::tools::code_mode::PUBLIC_TOOL_NAME;
use crate::tools::code_mode::WAIT_TOOL_NAME;
use crate::tools::code_mode_description::augment_tool_spec_for_code_mode;
use crate::tools::handlers::PLAN_TOOL;
use crate::tools::handlers::TOOL_SEARCH_DEFAULT_LIMIT;
@@ -579,6 +581,55 @@ fn create_write_stdin_tool() -> ToolSpec {
})
}
/// Builds the `ToolSpec` for the exec-wait function tool, which lets the
/// model poll a yielded `exec` session for more output or terminate it.
///
/// Only `session_id` is required; `yield_time_ms`, `max_tokens`, and
/// `terminate` are optional knobs, and extra properties are rejected.
fn create_exec_wait_tool() -> ToolSpec {
    let mut schema_props = BTreeMap::new();
    schema_props.insert(
        "session_id".to_string(),
        JsonSchema::Number {
            description: Some("Identifier of the running exec session.".to_string()),
        },
    );
    schema_props.insert(
        "yield_time_ms".to_string(),
        JsonSchema::Number {
            description: Some(
                "How long to wait (in milliseconds) for more output before yielding again."
                    .to_string(),
            ),
        },
    );
    schema_props.insert(
        "max_tokens".to_string(),
        JsonSchema::Number {
            description: Some(
                "Maximum number of output tokens to return for this wait call.".to_string(),
            ),
        },
    );
    schema_props.insert(
        "terminate".to_string(),
        JsonSchema::Boolean {
            description: Some("Whether to terminate the running exec session.".to_string()),
        },
    );

    ToolSpec::Function(ResponsesApiTool {
        name: WAIT_TOOL_NAME.to_string(),
        description: format!(
            "Waits on a yielded `{PUBLIC_TOOL_NAME}` session and returns new output or completion."
        ),
        strict: false,
        parameters: JsonSchema::Object {
            properties: schema_props,
            required: Some(vec!["session_id".to_string()]),
            additional_properties: Some(false.into()),
        },
        output_schema: None,
        defer_loading: None,
    })
}
fn create_shell_tool(request_permission_enabled: bool) -> ToolSpec {
let mut properties = BTreeMap::from([
(
@@ -1692,7 +1743,7 @@ source: /[\s\S]+/
enabled_tool_names.join(", ")
};
let description = format!(
"Runs JavaScript in a Node-backed `node:vm` context. This is a freeform tool: send raw JavaScript source text (no JSON/quotes/markdown fences). Direct tool calls remain available while `{PUBLIC_TOOL_NAME}` is enabled. Inside JavaScript, import nested tools from `tools.js`, for example `import {{ exec_command }} from \"tools.js\"` or `import {{ ALL_TOOLS }} from \"tools.js\"` to inspect the available `{{ module, name, description }}` entries. Namespaced tools are also available from `tools/<namespace...>.js`; MCP tools use `tools/mcp/<server>.js`, for example `import {{ append_notebook_logs_chart }} from \"tools/mcp/ologs.js\"`. Nested tool calls resolve to their code-mode result values. Import `{{ output_text, output_image, set_max_output_tokens_per_exec_call, store, load }}` from `\"@openai/code_mode\"` (or `\"openai/code_mode\"`); `output_text(value)` surfaces text back to the model and stringifies non-string objects when possible, `output_image(imageUrl)` appends an `input_image` content item for `http(s)` or `data:` URLs, `store(key, value)` persists JSON-serializable values across `{PUBLIC_TOOL_NAME}` calls in the current session, `load(key)` returns a cloned stored value or `undefined`, and `set_max_output_tokens_per_exec_call(value)` sets the token budget used to truncate the final Rust-side result of the current `{PUBLIC_TOOL_NAME}` execution. The default is `10000`. This guards the overall `{PUBLIC_TOOL_NAME}` output, not individual nested tool invocations. The returned content starts with a separate `Script completed` or `Script failed` text item that includes wall time. When truncation happens, the final text may include `Total output lines:` and the usual `…N tokens truncated…` marker. Function tools require JSON object arguments. Freeform tools require raw strings. `add_content(value)` remains available for compatibility with a content item, content-item array, or string. 
Structured nested-tool results should be converted to text first, for example with `JSON.stringify(...)`. Only content passed to `output_text(...)`, `output_image(...)`, or `add_content(value)` is surfaced back to the model. Enabled nested tools: {enabled_list}."
"Runs JavaScript in a Node-backed `node:vm` context. This is a freeform tool: send raw JavaScript source text (no JSON/quotes/markdown fences). Direct tool calls remain available while `{PUBLIC_TOOL_NAME}` is enabled. Inside JavaScript, import nested tools from `tools.js`, for example `import {{ exec_command }} from \"tools.js\"` or `import {{ ALL_TOOLS }} from \"tools.js\"` to inspect the available `{{ module, name, description }}` entries. Namespaced tools are also available from `tools/<namespace...>.js`; MCP tools use `tools/mcp/<server>.js`, for example `import {{ append_notebook_logs_chart }} from \"tools/mcp/ologs.js\"`. Nested tool calls resolve to their code-mode result values. Import `{{ output_text, output_image, set_max_output_tokens_per_exec_call, set_yield_time, store, load }}` from `\"@openai/code_mode\"` (or `\"openai/code_mode\"`); `output_text(value)` surfaces text back to the model and stringifies non-string objects when possible, `output_image(imageUrl)` appends an `input_image` content item for `http(s)` or `data:` URLs, `store(key, value)` persists JSON-serializable values across `{PUBLIC_TOOL_NAME}` calls in the current session, `load(key)` returns a cloned stored value or `undefined`, `set_max_output_tokens_per_exec_call(value)` sets the token budget used to truncate direct `{PUBLIC_TOOL_NAME}` returns, and `{WAIT_TOOL_NAME}` uses its own `max_tokens` argument with a default of `10000`. `set_yield_time(value)` asks `{PUBLIC_TOOL_NAME}` to return early if the script is still running after that many milliseconds so `{WAIT_TOOL_NAME}` can resume it later. The default wait timeout for `{WAIT_TOOL_NAME}` is {DEFAULT_WAIT_YIELD_TIME_MS}. The returned content starts with a separate `Script completed`, `Script failed`, or `Script running with session ID …` text item that includes wall time. When truncation happens, the final text may include `Total output lines:` and the usual `…N tokens truncated…` marker. 
Function tools require JSON object arguments. Freeform tools require raw strings. `add_content(value)` remains available for compatibility with a content item, content-item array, or string. Structured nested-tool results should be converted to text first, for example with `JSON.stringify(...)`. Only content passed to `output_text(...)`, `output_image(...)`, or `add_content(value)` is surfaced back to the model. Enabled nested tools: {enabled_list}."
);
ToolSpec::Freeform(FreeformTool {
@@ -1707,7 +1758,9 @@ source: /[\s\S]+/
}
/// Returns true when `spec` should be exposed as a nested tool inside code
/// mode: any function or freeform tool other than the code-mode entry points
/// themselves (`PUBLIC_TOOL_NAME` and its companion `WAIT_TOOL_NAME`).
fn is_code_mode_nested_tool(spec: &ToolSpec) -> bool {
    // Defect fixed: a stale copy of the pre-change one-line body
    // (`spec.name() != PUBLIC_TOOL_NAME && matches!(…)`) was left interleaved
    // with the new body, leaving two expressions in the function and making
    // it invalid Rust. Only the updated predicate is kept.
    spec.name() != PUBLIC_TOOL_NAME
        && spec.name() != WAIT_TOOL_NAME
        && matches!(spec, ToolSpec::Function(_) | ToolSpec::Freeform(_))
}
fn create_list_mcp_resources_tool() -> ToolSpec {
@@ -2092,6 +2145,7 @@ pub(crate) fn build_specs(
use crate::tools::handlers::ApplyPatchHandler;
use crate::tools::handlers::ArtifactsHandler;
use crate::tools::handlers::CodeModeHandler;
use crate::tools::handlers::CodeModeWaitHandler;
use crate::tools::handlers::DynamicToolHandler;
use crate::tools::handlers::GrepFilesHandler;
use crate::tools::handlers::JsReplHandler;
@@ -2128,6 +2182,7 @@ pub(crate) fn build_specs(
default_mode_request_user_input: config.default_mode_request_user_input,
});
let code_mode_handler = Arc::new(CodeModeHandler);
let code_mode_wait_handler = Arc::new(CodeModeWaitHandler);
let js_repl_handler = Arc::new(JsReplHandler);
let js_repl_reset_handler = Arc::new(JsReplResetHandler);
let artifacts_handler = Arc::new(ArtifactsHandler);
@@ -2157,6 +2212,13 @@ pub(crate) fn build_specs(
config.code_mode_enabled,
);
builder.register_handler(PUBLIC_TOOL_NAME, code_mode_handler);
push_tool_spec(
&mut builder,
create_exec_wait_tool(),
false,
config.code_mode_enabled,
);
builder.register_handler(WAIT_TOOL_NAME, code_mode_wait_handler);
}
match &config.shell_type {

View File

@@ -21,6 +21,7 @@ use pretty_assertions::assert_eq;
use serde_json::Value;
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::time::Duration;
use wiremock::MockServer;
@@ -32,6 +33,16 @@ fn custom_tool_output_items(req: &ResponsesRequest, call_id: &str) -> Vec<Value>
.clone()
}
/// Normalizes a function tool call's recorded output into content items.
///
/// An array output is returned as a clone; a bare string is wrapped in a
/// single `input_text` content item. Any other shape fails the test.
fn function_tool_output_items(req: &ResponsesRequest, call_id: &str) -> Vec<Value> {
    let recorded = req.function_call_output(call_id);
    match recorded.get("output") {
        Some(Value::Array(items)) => items.clone(),
        Some(Value::String(text)) => {
            let wrapped = serde_json::json!({ "type": "input_text", "text": text });
            vec![wrapped]
        }
        _ => panic!("function tool output should be serialized as text or content items"),
    }
}
fn text_item(items: &[Value], index: usize) -> &str {
items[index]
.get("text")
@@ -39,6 +50,23 @@ fn text_item(items: &[Value], index: usize) -> &str {
.expect("content item should be input_text")
}
/// Pulls the numeric session ID out of a `Script running with session ID …`
/// header. Panics when the prefix is missing or the ID does not parse.
fn extract_running_session_id(text: &str) -> i32 {
    let after_prefix = text
        .strip_prefix("Script running with session ID ")
        .expect("running header should contain a session ID");
    // The ID occupies the remainder of the first line.
    let id_field = after_prefix.split('\n').next().unwrap_or(after_prefix);
    id_field.parse().expect("session ID should parse as i32")
}
/// Builds JavaScript source that busy-waits (via the nested `exec_command`
/// tool) until `path` exists on disk, then falls through.
fn wait_for_file_source(path: &Path) -> Result<String> {
    // Shell-quote the path so it survives embedding in the probe command.
    let shell_path = shlex::try_join([path.to_string_lossy().as_ref()])?;
    let probe = format!("if [ -f {shell_path} ]; then printf ready; fi");
    // `{probe:?}` renders the command as a quoted/escaped JS string literal.
    let script = format!(
        r#"while ((await exec_command({{ cmd: {probe:?} }})).output !== "ready") {{
}}"#
    );
    Ok(script)
}
fn custom_tool_output_body_and_success(
req: &ResponsesRequest,
call_id: &str,
@@ -289,6 +317,799 @@ Error:\ boom\n
Ok(())
}
// End-to-end yield/resume flow: an `exec` script calls `set_yield_time`,
// returns early with a running-session header plus "phase 1" output, and two
// subsequent `exec_wait` calls resume it to collect "phase 2" and finally the
// completed "phase 3" output.
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_can_yield_and_resume_with_exec_wait() -> Result<()> {
    skip_if_no_network!(Ok(()));
    let server = responses::start_mock_server().await;
    let mut builder = test_codex().with_config(move |config| {
        let _ = config.features.enable(Feature::CodeMode);
    });
    let test = builder.build(&server).await?;
    // Gate files control when each blocked phase of the script may proceed.
    let phase_2_gate = test.workspace_path("code-mode-phase-2.ready");
    let phase_3_gate = test.workspace_path("code-mode-phase-3.ready");
    let phase_2_wait = wait_for_file_source(&phase_2_gate)?;
    let phase_3_wait = wait_for_file_source(&phase_3_gate)?;
    let code = format!(
        r#"
import {{ output_text, set_yield_time }} from "@openai/code_mode";
import {{ exec_command }} from "tools.js";
output_text("phase 1");
set_yield_time(10);
{phase_2_wait}
output_text("phase 2");
{phase_3_wait}
output_text("phase 3");
"#
    );
    responses::mount_sse_once(
        &server,
        sse(vec![
            ev_response_created("resp-1"),
            ev_custom_tool_call("call-1", "exec", &code),
            ev_completed("resp-1"),
        ]),
    )
    .await;
    let first_completion = responses::mount_sse_once(
        &server,
        sse(vec![
            ev_assistant_message("msg-1", "waiting"),
            ev_completed("resp-2"),
        ]),
    )
    .await;
    test.submit_turn("start the long exec").await?;
    let first_request = first_completion.single_request();
    let first_items = custom_tool_output_items(&first_request, "call-1");
    assert_eq!(first_items.len(), 2);
    // The script should have yielded: the header reports a running session.
    assert_regex_match(
        concat!(
            r"(?s)\A",
            r"Script running with session ID \d+\nWall time \d+\.\d seconds\nOutput:\n\z"
        ),
        text_item(&first_items, 0),
    );
    assert_eq!(text_item(&first_items, 1), "phase 1");
    let session_id = extract_running_session_id(text_item(&first_items, 0));
    responses::mount_sse_once(
        &server,
        sse(vec![
            ev_response_created("resp-3"),
            responses::ev_function_call(
                "call-2",
                "exec_wait",
                &serde_json::to_string(&serde_json::json!({
                    "session_id": session_id,
                    "yield_time_ms": 1_000,
                }))?,
            ),
            ev_completed("resp-3"),
        ]),
    )
    .await;
    let second_completion = responses::mount_sse_once(
        &server,
        sse(vec![
            ev_assistant_message("msg-2", "still waiting"),
            ev_completed("resp-4"),
        ]),
    )
    .await;
    // Unblock phase 2, then resume the session via exec_wait.
    fs::write(&phase_2_gate, "ready")?;
    test.submit_turn("wait again").await?;
    let second_request = second_completion.single_request();
    let second_items = function_tool_output_items(&second_request, "call-2");
    assert_eq!(second_items.len(), 2);
    assert_regex_match(
        concat!(
            r"(?s)\A",
            r"Script running with session ID \d+\nWall time \d+\.\d seconds\nOutput:\n\z"
        ),
        text_item(&second_items, 0),
    );
    // The resumed wait must report the same session it attached to.
    assert_eq!(
        extract_running_session_id(text_item(&second_items, 0)),
        session_id
    );
    assert_eq!(text_item(&second_items, 1), "phase 2");
    responses::mount_sse_once(
        &server,
        sse(vec![
            ev_response_created("resp-5"),
            responses::ev_function_call(
                "call-3",
                "exec_wait",
                &serde_json::to_string(&serde_json::json!({
                    "session_id": session_id,
                    "yield_time_ms": 1_000,
                }))?,
            ),
            ev_completed("resp-5"),
        ]),
    )
    .await;
    let third_completion = responses::mount_sse_once(
        &server,
        sse(vec![
            ev_assistant_message("msg-3", "done"),
            ev_completed("resp-6"),
        ]),
    )
    .await;
    // Unblock the final phase; this wait should observe script completion.
    fs::write(&phase_3_gate, "ready")?;
    test.submit_turn("wait for completion").await?;
    let third_request = third_completion.single_request();
    let third_items = function_tool_output_items(&third_request, "call-3");
    assert_eq!(third_items.len(), 2);
    assert_regex_match(
        concat!(
            r"(?s)\A",
            r"Script completed\nWall time \d+\.\d seconds\nOutput:\n\z"
        ),
        text_item(&third_items, 0),
    );
    assert_eq!(text_item(&third_items, 1), "phase 3");
    Ok(())
}
// Two `exec` scripts yield concurrently; each is assigned a distinct session
// ID and can be resumed to completion independently with `exec_wait`.
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_can_run_multiple_yielded_sessions() -> Result<()> {
    skip_if_no_network!(Ok(()));
    let server = responses::start_mock_server().await;
    let mut builder = test_codex().with_config(move |config| {
        let _ = config.features.enable(Feature::CodeMode);
    });
    let test = builder.build(&server).await?;
    // One gate file per session keeps each script blocked until released.
    let session_a_gate = test.workspace_path("code-mode-session-a.ready");
    let session_b_gate = test.workspace_path("code-mode-session-b.ready");
    let session_a_wait = wait_for_file_source(&session_a_gate)?;
    let session_b_wait = wait_for_file_source(&session_b_gate)?;
    let session_a_code = format!(
        r#"
import {{ output_text, set_yield_time }} from "@openai/code_mode";
import {{ exec_command }} from "tools.js";
output_text("session a start");
set_yield_time(10);
{session_a_wait}
output_text("session a done");
"#
    );
    let session_b_code = format!(
        r#"
import {{ output_text, set_yield_time }} from "@openai/code_mode";
import {{ exec_command }} from "tools.js";
output_text("session b start");
set_yield_time(10);
{session_b_wait}
output_text("session b done");
"#
    );
    responses::mount_sse_once(
        &server,
        sse(vec![
            ev_response_created("resp-1"),
            ev_custom_tool_call("call-1", "exec", &session_a_code),
            ev_completed("resp-1"),
        ]),
    )
    .await;
    let first_completion = responses::mount_sse_once(
        &server,
        sse(vec![
            ev_assistant_message("msg-1", "session a waiting"),
            ev_completed("resp-2"),
        ]),
    )
    .await;
    test.submit_turn("start session a").await?;
    let first_request = first_completion.single_request();
    let first_items = custom_tool_output_items(&first_request, "call-1");
    assert_eq!(first_items.len(), 2);
    let session_a_id = extract_running_session_id(text_item(&first_items, 0));
    assert_eq!(text_item(&first_items, 1), "session a start");
    responses::mount_sse_once(
        &server,
        sse(vec![
            ev_response_created("resp-3"),
            ev_custom_tool_call("call-2", "exec", &session_b_code),
            ev_completed("resp-3"),
        ]),
    )
    .await;
    let second_completion = responses::mount_sse_once(
        &server,
        sse(vec![
            ev_assistant_message("msg-2", "session b waiting"),
            ev_completed("resp-4"),
        ]),
    )
    .await;
    test.submit_turn("start session b").await?;
    let second_request = second_completion.single_request();
    let second_items = custom_tool_output_items(&second_request, "call-2");
    assert_eq!(second_items.len(), 2);
    let session_b_id = extract_running_session_id(text_item(&second_items, 0));
    assert_eq!(text_item(&second_items, 1), "session b start");
    // Concurrent yielded sessions must not share an ID.
    assert_ne!(session_a_id, session_b_id);
    fs::write(&session_a_gate, "ready")?;
    responses::mount_sse_once(
        &server,
        sse(vec![
            ev_response_created("resp-5"),
            responses::ev_function_call(
                "call-3",
                "exec_wait",
                &serde_json::to_string(&serde_json::json!({
                    "session_id": session_a_id,
                    "yield_time_ms": 1_000,
                }))?,
            ),
            ev_completed("resp-5"),
        ]),
    )
    .await;
    let third_completion = responses::mount_sse_once(
        &server,
        sse(vec![
            ev_assistant_message("msg-3", "session a done"),
            ev_completed("resp-6"),
        ]),
    )
    .await;
    test.submit_turn("wait session a").await?;
    let third_request = third_completion.single_request();
    let third_items = function_tool_output_items(&third_request, "call-3");
    assert_eq!(third_items.len(), 2);
    assert_regex_match(
        concat!(
            r"(?s)\A",
            r"Script completed\nWall time \d+\.\d seconds\nOutput:\n\z"
        ),
        text_item(&third_items, 0),
    );
    assert_eq!(text_item(&third_items, 1), "session a done");
    fs::write(&session_b_gate, "ready")?;
    responses::mount_sse_once(
        &server,
        sse(vec![
            ev_response_created("resp-7"),
            responses::ev_function_call(
                "call-4",
                "exec_wait",
                &serde_json::to_string(&serde_json::json!({
                    "session_id": session_b_id,
                    "yield_time_ms": 1_000,
                }))?,
            ),
            ev_completed("resp-7"),
        ]),
    )
    .await;
    let fourth_completion = responses::mount_sse_once(
        &server,
        sse(vec![
            ev_assistant_message("msg-4", "session b done"),
            ev_completed("resp-8"),
        ]),
    )
    .await;
    test.submit_turn("wait session b").await?;
    let fourth_request = fourth_completion.single_request();
    let fourth_items = function_tool_output_items(&fourth_request, "call-4");
    assert_eq!(fourth_items.len(), 2);
    assert_regex_match(
        concat!(
            r"(?s)\A",
            r"Script completed\nWall time \d+\.\d seconds\nOutput:\n\z"
        ),
        text_item(&fourth_items, 0),
    );
    assert_eq!(text_item(&fourth_items, 1), "session b done");
    Ok(())
}
// `exec_wait` with `terminate: true` kills a still-blocked yielded session
// (reported as `Script terminated`), and a fresh `exec` call afterwards runs
// to completion normally.
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_exec_wait_can_terminate_and_continue() -> Result<()> {
    skip_if_no_network!(Ok(()));
    let server = responses::start_mock_server().await;
    let mut builder = test_codex().with_config(move |config| {
        let _ = config.features.enable(Feature::CodeMode);
    });
    let test = builder.build(&server).await?;
    // This gate is intentionally never written, so the script stays blocked.
    let termination_gate = test.workspace_path("code-mode-terminate.ready");
    let termination_wait = wait_for_file_source(&termination_gate)?;
    let code = format!(
        r#"
import {{ output_text, set_yield_time }} from "@openai/code_mode";
import {{ exec_command }} from "tools.js";
output_text("phase 1");
set_yield_time(10);
{termination_wait}
output_text("phase 2");
"#
    );
    responses::mount_sse_once(
        &server,
        sse(vec![
            ev_response_created("resp-1"),
            ev_custom_tool_call("call-1", "exec", &code),
            ev_completed("resp-1"),
        ]),
    )
    .await;
    let first_completion = responses::mount_sse_once(
        &server,
        sse(vec![
            ev_assistant_message("msg-1", "waiting"),
            ev_completed("resp-2"),
        ]),
    )
    .await;
    test.submit_turn("start the long exec").await?;
    let first_request = first_completion.single_request();
    let first_items = custom_tool_output_items(&first_request, "call-1");
    assert_eq!(first_items.len(), 2);
    let session_id = extract_running_session_id(text_item(&first_items, 0));
    assert_eq!(text_item(&first_items, 1), "phase 1");
    responses::mount_sse_once(
        &server,
        sse(vec![
            ev_response_created("resp-3"),
            responses::ev_function_call(
                "call-2",
                "exec_wait",
                &serde_json::to_string(&serde_json::json!({
                    "session_id": session_id,
                    "terminate": true,
                }))?,
            ),
            ev_completed("resp-3"),
        ]),
    )
    .await;
    let second_completion = responses::mount_sse_once(
        &server,
        sse(vec![
            ev_assistant_message("msg-2", "terminated"),
            ev_completed("resp-4"),
        ]),
    )
    .await;
    test.submit_turn("terminate it").await?;
    let second_request = second_completion.single_request();
    let second_items = function_tool_output_items(&second_request, "call-2");
    // Termination reports only the header item — "phase 2" never ran.
    assert_eq!(second_items.len(), 1);
    assert_regex_match(
        concat!(
            r"(?s)\A",
            r"Script terminated\nWall time \d+\.\d seconds\nOutput:\n\z"
        ),
        text_item(&second_items, 0),
    );
    responses::mount_sse_once(
        &server,
        sse(vec![
            ev_response_created("resp-5"),
            ev_custom_tool_call(
                "call-3",
                "exec",
                r#"
import { output_text } from "@openai/code_mode";
output_text("after terminate");
"#,
            ),
            ev_completed("resp-5"),
        ]),
    )
    .await;
    let third_completion = responses::mount_sse_once(
        &server,
        sse(vec![
            ev_assistant_message("msg-3", "done"),
            ev_completed("resp-6"),
        ]),
    )
    .await;
    // Code mode must still accept new scripts after a termination.
    test.submit_turn("run another exec").await?;
    let third_request = third_completion.single_request();
    let third_items = custom_tool_output_items(&third_request, "call-3");
    assert_eq!(third_items.len(), 2);
    assert_regex_match(
        concat!(
            r"(?s)\A",
            r"Script completed\nWall time \d+\.\d seconds\nOutput:\n\z"
        ),
        text_item(&third_items, 0),
    );
    assert_eq!(text_item(&third_items, 1), "after terminate");
    Ok(())
}
// Waiting on a session ID that was never created yields a failed-script
// output (with an explanatory error item) and a non-success tool result.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_exec_wait_returns_error_for_unknown_session() -> Result<()> {
    skip_if_no_network!(Ok(()));
    let server = responses::start_mock_server().await;
    let mut builder = test_codex().with_config(move |config| {
        let _ = config.features.enable(Feature::CodeMode);
    });
    let test = builder.build(&server).await?;
    responses::mount_sse_once(
        &server,
        sse(vec![
            ev_response_created("resp-1"),
            responses::ev_function_call(
                "call-1",
                "exec_wait",
                &serde_json::to_string(&serde_json::json!({
                    "session_id": 999_999,
                    "yield_time_ms": 1_000,
                }))?,
            ),
            ev_completed("resp-1"),
        ]),
    )
    .await;
    let completion = responses::mount_sse_once(
        &server,
        sse(vec![
            ev_assistant_message("msg-1", "done"),
            ev_completed("resp-2"),
        ]),
    )
    .await;
    test.submit_turn("wait on an unknown exec session").await?;
    let request = completion.single_request();
    let (_, success) = request
        .function_call_output_content_and_success("call-1")
        .expect("function tool output should be present");
    // The tool call must not be reported as successful.
    assert_ne!(success, Some(true));
    let items = function_tool_output_items(&request, "call-1");
    assert_eq!(items.len(), 2);
    assert_regex_match(
        concat!(
            r"(?s)\A",
            r"Script failed\nWall time \d+\.\d seconds\nOutput:\n\z"
        ),
        text_item(&items, 0),
    );
    assert_eq!(
        text_item(&items, 1),
        "Script error:\nexec session 999999 not found"
    );
    Ok(())
}
// A yielded session that runs to completion in the background (while the
// model is waiting on a different, still-blocked session) can still be
// targeted with `terminate`; the call reports `Script terminated` for the
// already-finished session rather than erroring.
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_exec_wait_terminate_returns_completed_session_if_it_finished_in_background()
-> Result<()> {
    skip_if_no_network!(Ok(()));
    let server = responses::start_mock_server().await;
    let mut builder = test_codex().with_config(move |config| {
        let _ = config.features.enable(Feature::CodeMode);
    });
    let test = builder.build(&server).await?;
    let session_a_gate = test.workspace_path("code-mode-session-a-finished.ready");
    let session_b_gate = test.workspace_path("code-mode-session-b-blocked.ready");
    let session_a_wait = wait_for_file_source(&session_a_gate)?;
    let session_b_wait = wait_for_file_source(&session_b_gate)?;
    let session_a_code = format!(
        r#"
import {{ output_text, set_yield_time }} from "@openai/code_mode";
import {{ exec_command }} from "tools.js";
output_text("session a start");
set_yield_time(10);
{session_a_wait}
output_text("session a done");
"#
    );
    let session_b_code = format!(
        r#"
import {{ output_text, set_yield_time }} from "@openai/code_mode";
import {{ exec_command }} from "tools.js";
output_text("session b start");
set_yield_time(10);
{session_b_wait}
output_text("session b done");
"#
    );
    responses::mount_sse_once(
        &server,
        sse(vec![
            ev_response_created("resp-1"),
            ev_custom_tool_call("call-1", "exec", &session_a_code),
            ev_completed("resp-1"),
        ]),
    )
    .await;
    let first_completion = responses::mount_sse_once(
        &server,
        sse(vec![
            ev_assistant_message("msg-1", "session a waiting"),
            ev_completed("resp-2"),
        ]),
    )
    .await;
    test.submit_turn("start session a").await?;
    let first_request = first_completion.single_request();
    let first_items = custom_tool_output_items(&first_request, "call-1");
    assert_eq!(first_items.len(), 2);
    let session_a_id = extract_running_session_id(text_item(&first_items, 0));
    assert_eq!(text_item(&first_items, 1), "session a start");
    responses::mount_sse_once(
        &server,
        sse(vec![
            ev_response_created("resp-3"),
            ev_custom_tool_call("call-2", "exec", &session_b_code),
            ev_completed("resp-3"),
        ]),
    )
    .await;
    let second_completion = responses::mount_sse_once(
        &server,
        sse(vec![
            ev_assistant_message("msg-2", "session b waiting"),
            ev_completed("resp-4"),
        ]),
    )
    .await;
    test.submit_turn("start session b").await?;
    let second_request = second_completion.single_request();
    let second_items = custom_tool_output_items(&second_request, "call-2");
    assert_eq!(second_items.len(), 2);
    let session_b_id = extract_running_session_id(text_item(&second_items, 0));
    assert_eq!(text_item(&second_items, 1), "session b start");
    // Release session A so it can finish in the background; session B stays
    // blocked because its gate file is only for session B and never written.
    fs::write(&session_a_gate, "ready")?;
    responses::mount_sse_once(
        &server,
        sse(vec![
            ev_response_created("resp-5"),
            responses::ev_function_call(
                "call-3",
                "exec_wait",
                &serde_json::to_string(&serde_json::json!({
                    "session_id": session_b_id,
                    "yield_time_ms": 1_000,
                }))?,
            ),
            ev_completed("resp-5"),
        ]),
    )
    .await;
    let third_completion = responses::mount_sse_once(
        &server,
        sse(vec![
            ev_assistant_message("msg-3", "session b still waiting"),
            ev_completed("resp-6"),
        ]),
    )
    .await;
    test.submit_turn("wait session b").await?;
    let third_request = third_completion.single_request();
    let third_items = function_tool_output_items(&third_request, "call-3");
    // Session B produced no new output, so only the running header comes back.
    assert_eq!(third_items.len(), 1);
    assert_regex_match(
        concat!(
            r"(?s)\A",
            r"Script running with session ID \d+\nWall time \d+\.\d seconds\nOutput:\n\z"
        ),
        text_item(&third_items, 0),
    );
    assert_eq!(
        extract_running_session_id(text_item(&third_items, 0)),
        session_b_id
    );
    responses::mount_sse_once(
        &server,
        sse(vec![
            ev_response_created("resp-7"),
            responses::ev_function_call(
                "call-4",
                "exec_wait",
                &serde_json::to_string(&serde_json::json!({
                    "session_id": session_a_id,
                    "terminate": true,
                }))?,
            ),
            ev_completed("resp-7"),
        ]),
    )
    .await;
    let fourth_completion = responses::mount_sse_once(
        &server,
        sse(vec![
            ev_assistant_message("msg-4", "session a already done"),
            ev_completed("resp-8"),
        ]),
    )
    .await;
    // Terminating the already-finished session A must still succeed.
    test.submit_turn("terminate session a").await?;
    let fourth_request = fourth_completion.single_request();
    let fourth_items = function_tool_output_items(&fourth_request, "call-4");
    assert_eq!(fourth_items.len(), 1);
    assert_regex_match(
        concat!(
            r"(?s)\A",
            r"Script terminated\nWall time \d+\.\d seconds\nOutput:\n\z"
        ),
        text_item(&fourth_items, 0),
    );
    Ok(())
}
// `exec_wait` applies its own `max_tokens` budget to the waited output, even
// though the script raised the per-exec-call budget with
// `set_max_output_tokens_per_exec_call(100)` before yielding.
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_exec_wait_uses_its_own_max_tokens_budget() -> Result<()> {
    skip_if_no_network!(Ok(()));
    let server = responses::start_mock_server().await;
    let mut builder = test_codex().with_config(move |config| {
        let _ = config.features.enable(Feature::CodeMode);
    });
    let test = builder.build(&server).await?;
    let completion_gate = test.workspace_path("code-mode-max-tokens.ready");
    let completion_wait = wait_for_file_source(&completion_gate)?;
    let code = format!(
        r#"
import {{ output_text, set_max_output_tokens_per_exec_call, set_yield_time }} from "@openai/code_mode";
import {{ exec_command }} from "tools.js";
output_text("phase 1");
set_max_output_tokens_per_exec_call(100);
set_yield_time(10);
{completion_wait}
output_text("token one token two token three token four token five token six token seven");
"#
    );
    responses::mount_sse_once(
        &server,
        sse(vec![
            ev_response_created("resp-1"),
            ev_custom_tool_call("call-1", "exec", &code),
            ev_completed("resp-1"),
        ]),
    )
    .await;
    let first_completion = responses::mount_sse_once(
        &server,
        sse(vec![
            ev_assistant_message("msg-1", "waiting"),
            ev_completed("resp-2"),
        ]),
    )
    .await;
    test.submit_turn("start the long exec").await?;
    let first_request = first_completion.single_request();
    let first_items = custom_tool_output_items(&first_request, "call-1");
    assert_eq!(first_items.len(), 2);
    assert_eq!(text_item(&first_items, 1), "phase 1");
    let session_id = extract_running_session_id(text_item(&first_items, 0));
    // Release the script, then wait with a deliberately tiny token budget.
    fs::write(&completion_gate, "ready")?;
    responses::mount_sse_once(
        &server,
        sse(vec![
            ev_response_created("resp-3"),
            responses::ev_function_call(
                "call-2",
                "exec_wait",
                &serde_json::to_string(&serde_json::json!({
                    "session_id": session_id,
                    "yield_time_ms": 1_000,
                    "max_tokens": 6,
                }))?,
            ),
            ev_completed("resp-3"),
        ]),
    )
    .await;
    let second_completion = responses::mount_sse_once(
        &server,
        sse(vec![
            ev_assistant_message("msg-2", "done"),
            ev_completed("resp-4"),
        ]),
    )
    .await;
    test.submit_turn("wait for completion").await?;
    let second_request = second_completion.single_request();
    let second_items = function_tool_output_items(&second_request, "call-2");
    assert_eq!(second_items.len(), 2);
    assert_regex_match(
        concat!(
            r"(?s)\A",
            r"Script completed\nWall time \d+\.\d seconds\nOutput:\n\z"
        ),
        text_item(&second_items, 0),
    );
    // `max_tokens: 6` forces truncation of the long final output line.
    let expected_pattern = r#"(?sx)
\A
Total\ output\ lines:\ 1\n
\n
.*…\d+\ tokens\ truncated….*
\z
"#;
    assert_regex_match(expected_pattern, text_item(&second_items, 1));
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_can_output_serialized_text_via_openai_code_mode_module() -> Result<()> {
skip_if_no_network!(Ok(()));