mirror of
https://github.com/openai/codex.git
synced 2026-03-12 09:33:32 +00:00
Compare commits
8 Commits
dev/shaqay
...
pakrym/cod
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a0072cf521 | ||
|
|
2ddde61431 | ||
|
|
5c06a41d7f | ||
|
|
97268c2a00 | ||
|
|
5067c4c084 | ||
|
|
b44eb3bb35 | ||
|
|
e1cc893f95 | ||
|
|
cf0785e99b |
@@ -1648,7 +1648,9 @@ impl Session {
|
||||
config.features.enabled(Feature::RuntimeMetrics),
|
||||
Self::build_model_client_beta_features_header(config.as_ref()),
|
||||
),
|
||||
code_mode_store: Default::default(),
|
||||
code_mode_service: crate::tools::code_mode::CodeModeService::new(
|
||||
config.js_repl_node_path.clone(),
|
||||
),
|
||||
};
|
||||
let js_repl = Arc::new(JsReplHandle::with_node_path(
|
||||
config.js_repl_node_path.clone(),
|
||||
@@ -5462,6 +5464,11 @@ pub(crate) async fn run_turn(
|
||||
// Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains
|
||||
// many turns, from the perspective of the user, it is a single turn.
|
||||
let turn_diff_tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));
|
||||
let _code_mode_worker = sess
|
||||
.services
|
||||
.code_mode_service
|
||||
.start_turn_worker(&sess, &turn_context, &turn_diff_tracker)
|
||||
.await;
|
||||
let mut server_model_warning_emitted_for_turn = false;
|
||||
|
||||
// `ModelClientSession` is turn-scoped and caches WebSocket + sticky routing state, so we reuse
|
||||
|
||||
@@ -2162,7 +2162,9 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
|
||||
config.features.enabled(Feature::RuntimeMetrics),
|
||||
Session::build_model_client_beta_features_header(config.as_ref()),
|
||||
),
|
||||
code_mode_store: Default::default(),
|
||||
code_mode_service: crate::tools::code_mode::CodeModeService::new(
|
||||
config.js_repl_node_path.clone(),
|
||||
),
|
||||
};
|
||||
let js_repl = Arc::new(JsReplHandle::with_node_path(
|
||||
config.js_repl_node_path.clone(),
|
||||
@@ -2723,7 +2725,9 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
|
||||
config.features.enabled(Feature::RuntimeMetrics),
|
||||
Session::build_model_client_beta_features_header(config.as_ref()),
|
||||
),
|
||||
code_mode_store: Default::default(),
|
||||
code_mode_service: crate::tools::code_mode::CodeModeService::new(
|
||||
config.js_repl_node_path.clone(),
|
||||
),
|
||||
};
|
||||
let js_repl = Arc::new(JsReplHandle::with_node_path(
|
||||
config.js_repl_node_path.clone(),
|
||||
|
||||
@@ -15,6 +15,7 @@ use crate::models_manager::manager::ModelsManager;
|
||||
use crate::plugins::PluginsManager;
|
||||
use crate::skills::SkillsManager;
|
||||
use crate::state_db::StateDbHandle;
|
||||
use crate::tools::code_mode::CodeModeService;
|
||||
use crate::tools::network_approval::NetworkApprovalService;
|
||||
use crate::tools::runtimes::ExecveSessionApproval;
|
||||
use crate::tools::sandboxing::ApprovalStore;
|
||||
@@ -22,35 +23,12 @@ use crate::unified_exec::UnifiedExecProcessManager;
|
||||
use codex_hooks::Hooks;
|
||||
use codex_otel::SessionTelemetry;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use serde_json::Value as JsonValue;
|
||||
use std::path::PathBuf;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::watch;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
pub(crate) struct CodeModeStoreService {
|
||||
stored_values: Mutex<HashMap<String, JsonValue>>,
|
||||
}
|
||||
|
||||
impl Default for CodeModeStoreService {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
stored_values: Mutex::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CodeModeStoreService {
|
||||
pub(crate) async fn stored_values(&self) -> HashMap<String, JsonValue> {
|
||||
self.stored_values.lock().await.clone()
|
||||
}
|
||||
|
||||
pub(crate) async fn replace_stored_values(&self, values: HashMap<String, JsonValue>) {
|
||||
*self.stored_values.lock().await = values;
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct SessionServices {
|
||||
pub(crate) mcp_connection_manager: Arc<RwLock<McpConnectionManager>>,
|
||||
pub(crate) mcp_startup_cancellation_token: Mutex<CancellationToken>,
|
||||
@@ -82,5 +60,5 @@ pub(crate) struct SessionServices {
|
||||
pub(crate) state_db: Option<StateDbHandle>,
|
||||
/// Session-scoped model client shared across turns.
|
||||
pub(crate) model_client: ModelClient,
|
||||
pub(crate) code_mode_store: CodeModeStoreService,
|
||||
pub(crate) code_mode_service: CodeModeService,
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -6,7 +7,6 @@ use crate::client_common::tools::ToolSpec;
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::config::Config;
|
||||
use crate::exec_env::create_env;
|
||||
use crate::features::Feature;
|
||||
use crate::function_tool::FunctionCallError;
|
||||
use crate::tools::ToolRouter;
|
||||
@@ -30,10 +30,17 @@ use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::warn;
|
||||
|
||||
const CODE_MODE_RUNNER_SOURCE: &str = include_str!("code_mode_runner.cjs");
|
||||
const CODE_MODE_BRIDGE_SOURCE: &str = include_str!("code_mode_bridge.js");
|
||||
pub(crate) const PUBLIC_TOOL_NAME: &str = "exec";
|
||||
pub(crate) const WAIT_TOOL_NAME: &str = "exec_wait";
|
||||
pub(crate) const DEFAULT_WAIT_YIELD_TIME_MS: u64 = 10_000;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ExecContext {
|
||||
@@ -42,6 +49,199 @@ struct ExecContext {
|
||||
tracker: SharedTurnDiffTracker,
|
||||
}
|
||||
|
||||
pub(crate) struct CodeModeProcess {
|
||||
child: tokio::process::Child,
|
||||
stdin: Arc<Mutex<tokio::process::ChildStdin>>,
|
||||
stdout_task: JoinHandle<()>,
|
||||
response_waiters: Arc<Mutex<HashMap<String, oneshot::Sender<NodeToHostMessage>>>>,
|
||||
tool_call_rx: Arc<Mutex<mpsc::UnboundedReceiver<CodeModeToolCall>>>,
|
||||
}
|
||||
|
||||
pub(crate) struct CodeModeWorker {
|
||||
shutdown_tx: Option<oneshot::Sender<()>>,
|
||||
task: JoinHandle<()>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
struct CodeModeToolCall {
|
||||
request_id: String,
|
||||
id: String,
|
||||
name: String,
|
||||
#[serde(default)]
|
||||
input: Option<JsonValue>,
|
||||
}
|
||||
|
||||
impl Drop for CodeModeWorker {
|
||||
fn drop(&mut self) {
|
||||
if let Some(shutdown_tx) = self.shutdown_tx.take() {
|
||||
let _ = shutdown_tx.send(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CodeModeProcess {
|
||||
fn worker(&self, exec: ExecContext) -> CodeModeWorker {
|
||||
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
|
||||
let stdin = Arc::clone(&self.stdin);
|
||||
let tool_call_rx = Arc::clone(&self.tool_call_rx);
|
||||
let task = tokio::spawn(async move {
|
||||
loop {
|
||||
let tool_call = tokio::select! {
|
||||
_ = &mut shutdown_rx => break,
|
||||
tool_call = async {
|
||||
let mut tool_call_rx = tool_call_rx.lock().await;
|
||||
tool_call_rx.recv().await
|
||||
} => tool_call,
|
||||
};
|
||||
let Some(tool_call) = tool_call else {
|
||||
break;
|
||||
};
|
||||
let response = HostToNodeMessage::Response {
|
||||
request_id: tool_call.request_id,
|
||||
id: tool_call.id,
|
||||
code_mode_result: call_nested_tool(
|
||||
exec.clone(),
|
||||
tool_call.name,
|
||||
tool_call.input,
|
||||
)
|
||||
.await,
|
||||
};
|
||||
if let Err(err) = write_message(&stdin, &response).await {
|
||||
warn!("failed to write {PUBLIC_TOOL_NAME} tool response: {err}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
CodeModeWorker {
|
||||
shutdown_tx: Some(shutdown_tx),
|
||||
task,
|
||||
}
|
||||
}
|
||||
|
||||
async fn send(
|
||||
&mut self,
|
||||
request_id: &str,
|
||||
message: &HostToNodeMessage,
|
||||
) -> Result<NodeToHostMessage, std::io::Error> {
|
||||
if self.stdout_task.is_finished() {
|
||||
return Err(std::io::Error::other(format!(
|
||||
"{PUBLIC_TOOL_NAME} runner is not available"
|
||||
)));
|
||||
}
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.response_waiters
|
||||
.lock()
|
||||
.await
|
||||
.insert(request_id.to_string(), tx);
|
||||
if let Err(err) = write_message(&self.stdin, message).await {
|
||||
self.response_waiters.lock().await.remove(request_id);
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
match rx.await {
|
||||
Ok(message) => Ok(message),
|
||||
Err(_) => Err(std::io::Error::other(format!(
|
||||
"{PUBLIC_TOOL_NAME} runner is not available"
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
fn has_exited(&mut self) -> Result<bool, std::io::Error> {
|
||||
self.child
|
||||
.try_wait()
|
||||
.map(|status| status.is_some())
|
||||
.map_err(std::io::Error::other)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct CodeModeService {
|
||||
js_repl_node_path: Option<PathBuf>,
|
||||
stored_values: Mutex<HashMap<String, JsonValue>>,
|
||||
process: Arc<Mutex<Option<CodeModeProcess>>>,
|
||||
next_session_id: Mutex<i32>,
|
||||
}
|
||||
|
||||
impl CodeModeService {
|
||||
pub(crate) fn new(js_repl_node_path: Option<PathBuf>) -> Self {
|
||||
Self {
|
||||
js_repl_node_path,
|
||||
stored_values: Mutex::new(HashMap::new()),
|
||||
process: Arc::new(Mutex::new(None)),
|
||||
next_session_id: Mutex::new(1),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn stored_values(&self) -> HashMap<String, JsonValue> {
|
||||
self.stored_values.lock().await.clone()
|
||||
}
|
||||
|
||||
pub(crate) async fn replace_stored_values(&self, values: HashMap<String, JsonValue>) {
|
||||
*self.stored_values.lock().await = values;
|
||||
}
|
||||
|
||||
async fn ensure_started(
|
||||
&self,
|
||||
) -> Result<tokio::sync::OwnedMutexGuard<Option<CodeModeProcess>>, std::io::Error> {
|
||||
let mut process_slot = self.process.lock().await;
|
||||
let needs_spawn = match process_slot.as_mut() {
|
||||
Some(process) => !matches!(process.has_exited(), Ok(false)),
|
||||
None => true,
|
||||
};
|
||||
if needs_spawn {
|
||||
let node_path = resolve_compatible_node(self.js_repl_node_path.as_deref())
|
||||
.await
|
||||
.map_err(std::io::Error::other)?;
|
||||
*process_slot = Some(spawn_code_mode_process(&node_path).await?);
|
||||
}
|
||||
drop(process_slot);
|
||||
Ok(self.process.clone().lock_owned().await)
|
||||
}
|
||||
|
||||
pub(crate) async fn start_turn_worker(
|
||||
&self,
|
||||
session: &Arc<Session>,
|
||||
turn: &Arc<TurnContext>,
|
||||
tracker: &SharedTurnDiffTracker,
|
||||
) -> Option<CodeModeWorker> {
|
||||
if !turn.features.enabled(Feature::CodeMode) {
|
||||
return None;
|
||||
}
|
||||
let exec = ExecContext {
|
||||
session: Arc::clone(session),
|
||||
turn: Arc::clone(turn),
|
||||
tracker: Arc::clone(tracker),
|
||||
};
|
||||
let mut process_slot = match self.ensure_started().await {
|
||||
Ok(process_slot) => process_slot,
|
||||
Err(err) => {
|
||||
warn!("failed to start {PUBLIC_TOOL_NAME} worker for turn: {err}");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
let Some(process) = process_slot.as_mut() else {
|
||||
warn!(
|
||||
"failed to start {PUBLIC_TOOL_NAME} worker for turn: {PUBLIC_TOOL_NAME} runner failed to start"
|
||||
);
|
||||
return None;
|
||||
};
|
||||
Some(process.worker(exec))
|
||||
}
|
||||
|
||||
pub(crate) async fn allocate_session_id(&self) -> i32 {
|
||||
let mut next_session_id = self.next_session_id.lock().await;
|
||||
let session_id = *next_session_id;
|
||||
*next_session_id = next_session_id.saturating_add(1);
|
||||
session_id
|
||||
}
|
||||
|
||||
pub(crate) async fn allocate_request_id(&self) -> String {
|
||||
uuid::Uuid::new_v4().to_string()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
enum CodeModeToolKind {
|
||||
@@ -63,12 +263,24 @@ struct EnabledTool {
|
||||
#[derive(Serialize)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
enum HostToNodeMessage {
|
||||
Init {
|
||||
Start {
|
||||
request_id: String,
|
||||
session_id: i32,
|
||||
enabled_tools: Vec<EnabledTool>,
|
||||
stored_values: HashMap<String, JsonValue>,
|
||||
source: String,
|
||||
},
|
||||
Poll {
|
||||
request_id: String,
|
||||
session_id: i32,
|
||||
yield_time_ms: u64,
|
||||
},
|
||||
Terminate {
|
||||
request_id: String,
|
||||
session_id: i32,
|
||||
},
|
||||
Response {
|
||||
request_id: String,
|
||||
id: String,
|
||||
code_mode_result: JsonValue,
|
||||
},
|
||||
@@ -78,12 +290,19 @@ enum HostToNodeMessage {
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
enum NodeToHostMessage {
|
||||
ToolCall {
|
||||
id: String,
|
||||
name: String,
|
||||
#[serde(default)]
|
||||
input: Option<JsonValue>,
|
||||
#[serde(flatten)]
|
||||
tool_call: CodeModeToolCall,
|
||||
},
|
||||
Yielded {
|
||||
request_id: String,
|
||||
content_items: Vec<JsonValue>,
|
||||
},
|
||||
Terminated {
|
||||
request_id: String,
|
||||
content_items: Vec<JsonValue>,
|
||||
},
|
||||
Result {
|
||||
request_id: String,
|
||||
content_items: Vec<JsonValue>,
|
||||
stored_values: HashMap<String, JsonValue>,
|
||||
#[serde(default)]
|
||||
@@ -93,6 +312,18 @@ enum NodeToHostMessage {
|
||||
},
|
||||
}
|
||||
|
||||
enum CodeModeSessionProgress {
|
||||
Finished(FunctionToolOutput),
|
||||
Yielded { output: FunctionToolOutput },
|
||||
}
|
||||
|
||||
enum CodeModeExecutionStatus {
|
||||
Completed,
|
||||
Failed,
|
||||
Running(i32),
|
||||
Terminated,
|
||||
}
|
||||
|
||||
pub(crate) fn instructions(config: &Config) -> Option<String> {
|
||||
if !config.features.enabled(Feature::CodeMode) {
|
||||
return None;
|
||||
@@ -113,7 +344,10 @@ pub(crate) fn instructions(config: &Config) -> Option<String> {
|
||||
));
|
||||
section.push_str("- Import nested tools from `tools.js`, for example `import { exec_command } from \"tools.js\"` or `import { ALL_TOOLS } from \"tools.js\"` to inspect the available `{ module, name, description }` entries. Namespaced tools are also available from `tools/<namespace...>.js`; MCP tools use `tools/mcp/<server>.js`, for example `import { append_notebook_logs_chart } from \"tools/mcp/ologs.js\"`. Nested tool calls resolve to their code-mode result values.\n");
|
||||
section.push_str(&format!(
|
||||
"- Import `{{ output_text, output_image, set_max_output_tokens_per_exec_call, store, load }}` from `@openai/code_mode` (or `\"openai/code_mode\"`). `output_text(value)` surfaces text back to the model and stringifies non-string objects with `JSON.stringify(...)` when possible. `output_image(imageUrl)` appends an `input_image` content item for `http(s)` or `data:` URLs. `store(key, value)` persists JSON-serializable values across `{PUBLIC_TOOL_NAME}` calls in the current session, and `load(key)` returns a cloned stored value or `undefined`. `set_max_output_tokens_per_exec_call(value)` sets the token budget used to truncate the final Rust-side result of the current `{PUBLIC_TOOL_NAME}` execution; the default is `10000`. This guards the overall `{PUBLIC_TOOL_NAME}` output, not individual nested tool invocations. The returned content starts with a separate `Script completed` or `Script failed` text item that includes wall time. When truncation happens, the final text may include `Total output lines:` and the usual `…N tokens truncated…` marker.\n",
|
||||
"- Import `{{ output_text, output_image, set_max_output_tokens_per_exec_call, set_yield_time, store, load }}` from `@openai/code_mode` (or `\"openai/code_mode\"`). `output_text(value)` surfaces text back to the model and stringifies non-string objects with `JSON.stringify(...)` when possible. `output_image(imageUrl)` appends an `input_image` content item for `http(s)` or `data:` URLs. `store(key, value)` persists JSON-serializable values across `{PUBLIC_TOOL_NAME}` calls in the current session, and `load(key)` returns a cloned stored value or `undefined`. `set_max_output_tokens_per_exec_call(value)` sets the token budget used to truncate direct `{PUBLIC_TOOL_NAME}` returns; `{WAIT_TOOL_NAME}` uses its own `max_tokens` argument instead and defaults to `10000`. `set_yield_time(value)` asks `{PUBLIC_TOOL_NAME}` to return early if the script is still running after that many milliseconds so `{WAIT_TOOL_NAME}` can resume it later. The returned content starts with a separate `Script completed`, `Script failed`, or `Script running with session ID …` text item that includes wall time. When truncation happens, the final text may include `Total output lines:` and the usual `…N tokens truncated…` marker.\n",
|
||||
));
|
||||
section.push_str(&format!(
|
||||
"- If `{PUBLIC_TOOL_NAME}` returns `Script running with session ID …`, call `{WAIT_TOOL_NAME}` with that `session_id` to keep waiting for more output, completion, or termination.\n",
|
||||
));
|
||||
section.push_str(
|
||||
"- Function tools require JSON object arguments. Freeform tools require raw strings.\n",
|
||||
@@ -136,186 +370,330 @@ pub(crate) async fn execute(
|
||||
tracker,
|
||||
};
|
||||
let enabled_tools = build_enabled_tools(&exec).await;
|
||||
let stored_values = exec.session.services.code_mode_store.stored_values().await;
|
||||
let service = &exec.session.services.code_mode_service;
|
||||
let stored_values = service.stored_values().await;
|
||||
let source = build_source(&code, &enabled_tools).map_err(FunctionCallError::RespondToModel)?;
|
||||
execute_node(exec, source, enabled_tools, stored_values)
|
||||
let session_id = service.allocate_session_id().await;
|
||||
let request_id = service.allocate_request_id().await;
|
||||
let process_slot = service
|
||||
.ensure_started()
|
||||
.await
|
||||
.map_err(FunctionCallError::RespondToModel)
|
||||
.map_err(|err| FunctionCallError::RespondToModel(err.to_string()))?;
|
||||
let started_at = std::time::Instant::now();
|
||||
let message = HostToNodeMessage::Start {
|
||||
request_id: request_id.clone(),
|
||||
session_id,
|
||||
enabled_tools,
|
||||
stored_values,
|
||||
source,
|
||||
};
|
||||
let result = {
|
||||
let mut process_slot = process_slot;
|
||||
let Some(process) = process_slot.as_mut() else {
|
||||
return Err(FunctionCallError::RespondToModel(format!(
|
||||
"{PUBLIC_TOOL_NAME} runner failed to start"
|
||||
)));
|
||||
};
|
||||
let message = process
|
||||
.send(&request_id, &message)
|
||||
.await
|
||||
.map_err(|err| err.to_string());
|
||||
let message = match message {
|
||||
Ok(message) => message,
|
||||
Err(error) => return Err(FunctionCallError::RespondToModel(error)),
|
||||
};
|
||||
handle_node_message(&exec, session_id, message, None, started_at).await
|
||||
};
|
||||
match result {
|
||||
Ok(CodeModeSessionProgress::Finished(output))
|
||||
| Ok(CodeModeSessionProgress::Yielded { output }) => Ok(output),
|
||||
Err(error) => Err(FunctionCallError::RespondToModel(error)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn execute_node(
|
||||
exec: ExecContext,
|
||||
source: String,
|
||||
enabled_tools: Vec<EnabledTool>,
|
||||
stored_values: HashMap<String, JsonValue>,
|
||||
) -> Result<FunctionToolOutput, String> {
|
||||
let node_path = resolve_compatible_node(exec.turn.config.js_repl_node_path.as_deref()).await?;
|
||||
pub(crate) async fn wait(
|
||||
session: Arc<Session>,
|
||||
turn: Arc<TurnContext>,
|
||||
tracker: SharedTurnDiffTracker,
|
||||
session_id: i32,
|
||||
yield_time_ms: u64,
|
||||
max_output_tokens: Option<usize>,
|
||||
terminate: bool,
|
||||
) -> Result<FunctionToolOutput, FunctionCallError> {
|
||||
let exec = ExecContext {
|
||||
session,
|
||||
turn,
|
||||
tracker,
|
||||
};
|
||||
let request_id = exec
|
||||
.session
|
||||
.services
|
||||
.code_mode_service
|
||||
.allocate_request_id()
|
||||
.await;
|
||||
let started_at = std::time::Instant::now();
|
||||
let message = if terminate {
|
||||
HostToNodeMessage::Terminate {
|
||||
request_id: request_id.clone(),
|
||||
session_id,
|
||||
}
|
||||
} else {
|
||||
HostToNodeMessage::Poll {
|
||||
request_id: request_id.clone(),
|
||||
session_id,
|
||||
yield_time_ms,
|
||||
}
|
||||
};
|
||||
let process_slot = exec
|
||||
.session
|
||||
.services
|
||||
.code_mode_service
|
||||
.ensure_started()
|
||||
.await
|
||||
.map_err(|err| FunctionCallError::RespondToModel(err.to_string()))?;
|
||||
let result = {
|
||||
let mut process_slot = process_slot;
|
||||
let Some(process) = process_slot.as_mut() else {
|
||||
return Err(FunctionCallError::RespondToModel(format!(
|
||||
"{PUBLIC_TOOL_NAME} runner failed to start"
|
||||
)));
|
||||
};
|
||||
if !matches!(process.has_exited(), Ok(false)) {
|
||||
return Err(FunctionCallError::RespondToModel(format!(
|
||||
"{PUBLIC_TOOL_NAME} runner failed to start"
|
||||
)));
|
||||
}
|
||||
let message = process
|
||||
.send(&request_id, &message)
|
||||
.await
|
||||
.map_err(|err| err.to_string());
|
||||
let message = match message {
|
||||
Ok(message) => message,
|
||||
Err(error) => return Err(FunctionCallError::RespondToModel(error)),
|
||||
};
|
||||
handle_node_message(
|
||||
&exec,
|
||||
session_id,
|
||||
message,
|
||||
Some(max_output_tokens),
|
||||
started_at,
|
||||
)
|
||||
.await
|
||||
};
|
||||
match result {
|
||||
Ok(CodeModeSessionProgress::Finished(output))
|
||||
| Ok(CodeModeSessionProgress::Yielded { output }) => Ok(output),
|
||||
Err(error) => Err(FunctionCallError::RespondToModel(error)),
|
||||
}
|
||||
}
|
||||
|
||||
let env = create_env(&exec.turn.shell_environment_policy, None);
|
||||
let mut cmd = tokio::process::Command::new(&node_path);
|
||||
async fn handle_node_message(
|
||||
exec: &ExecContext,
|
||||
session_id: i32,
|
||||
message: NodeToHostMessage,
|
||||
poll_max_output_tokens: Option<Option<usize>>,
|
||||
started_at: std::time::Instant,
|
||||
) -> Result<CodeModeSessionProgress, String> {
|
||||
match message {
|
||||
NodeToHostMessage::ToolCall { .. } => Err(format!(
|
||||
"{PUBLIC_TOOL_NAME} received an unexpected tool call response"
|
||||
)),
|
||||
NodeToHostMessage::Yielded { content_items, .. } => {
|
||||
let mut delta_items = output_content_items_from_json_values(content_items)?;
|
||||
delta_items = truncate_code_mode_result(delta_items, poll_max_output_tokens.flatten());
|
||||
prepend_script_status(
|
||||
&mut delta_items,
|
||||
CodeModeExecutionStatus::Running(session_id),
|
||||
started_at.elapsed(),
|
||||
);
|
||||
Ok(CodeModeSessionProgress::Yielded {
|
||||
output: FunctionToolOutput::from_content(delta_items, Some(true)),
|
||||
})
|
||||
}
|
||||
NodeToHostMessage::Terminated { content_items, .. } => {
|
||||
let mut delta_items = output_content_items_from_json_values(content_items)?;
|
||||
delta_items = truncate_code_mode_result(delta_items, poll_max_output_tokens.flatten());
|
||||
prepend_script_status(
|
||||
&mut delta_items,
|
||||
CodeModeExecutionStatus::Terminated,
|
||||
started_at.elapsed(),
|
||||
);
|
||||
Ok(CodeModeSessionProgress::Finished(
|
||||
FunctionToolOutput::from_content(delta_items, Some(true)),
|
||||
))
|
||||
}
|
||||
NodeToHostMessage::Result {
|
||||
content_items,
|
||||
stored_values,
|
||||
error_text,
|
||||
max_output_tokens_per_exec_call,
|
||||
..
|
||||
} => {
|
||||
exec.session
|
||||
.services
|
||||
.code_mode_service
|
||||
.replace_stored_values(stored_values)
|
||||
.await;
|
||||
let mut delta_items = output_content_items_from_json_values(content_items)?;
|
||||
let success = error_text.is_none();
|
||||
if let Some(error_text) = error_text {
|
||||
delta_items.push(FunctionCallOutputContentItem::InputText {
|
||||
text: format!("Script error:\n{error_text}"),
|
||||
});
|
||||
}
|
||||
|
||||
let mut delta_items = truncate_code_mode_result(
|
||||
delta_items,
|
||||
poll_max_output_tokens.unwrap_or(max_output_tokens_per_exec_call),
|
||||
);
|
||||
prepend_script_status(
|
||||
&mut delta_items,
|
||||
if success {
|
||||
CodeModeExecutionStatus::Completed
|
||||
} else {
|
||||
CodeModeExecutionStatus::Failed
|
||||
},
|
||||
started_at.elapsed(),
|
||||
);
|
||||
Ok(CodeModeSessionProgress::Finished(
|
||||
FunctionToolOutput::from_content(delta_items, Some(success)),
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn spawn_code_mode_process(
|
||||
node_path: &std::path::Path,
|
||||
) -> Result<CodeModeProcess, std::io::Error> {
|
||||
let mut cmd = tokio::process::Command::new(node_path);
|
||||
cmd.arg("--experimental-vm-modules");
|
||||
cmd.arg("--eval");
|
||||
cmd.arg(CODE_MODE_RUNNER_SOURCE);
|
||||
cmd.current_dir(&exec.turn.cwd);
|
||||
cmd.env_clear();
|
||||
cmd.envs(env);
|
||||
cmd.stdin(std::process::Stdio::piped())
|
||||
.stdout(std::process::Stdio::piped())
|
||||
.stderr(std::process::Stdio::piped())
|
||||
.kill_on_drop(true);
|
||||
|
||||
let mut child = cmd
|
||||
.spawn()
|
||||
.map_err(|err| format!("failed to start {PUBLIC_TOOL_NAME} Node runtime: {err}"))?;
|
||||
let stdout = child
|
||||
.stdout
|
||||
.take()
|
||||
.ok_or_else(|| format!("{PUBLIC_TOOL_NAME} runner missing stdout"))?;
|
||||
let stderr = child
|
||||
.stderr
|
||||
.take()
|
||||
.ok_or_else(|| format!("{PUBLIC_TOOL_NAME} runner missing stderr"))?;
|
||||
let mut stdin = child
|
||||
let mut child = cmd.spawn().map_err(std::io::Error::other)?;
|
||||
let stdout = child.stdout.take().ok_or_else(|| {
|
||||
std::io::Error::other(format!("{PUBLIC_TOOL_NAME} runner missing stdout"))
|
||||
})?;
|
||||
let stderr = child.stderr.take().ok_or_else(|| {
|
||||
std::io::Error::other(format!("{PUBLIC_TOOL_NAME} runner missing stderr"))
|
||||
})?;
|
||||
let stdin = child
|
||||
.stdin
|
||||
.take()
|
||||
.ok_or_else(|| format!("{PUBLIC_TOOL_NAME} runner missing stdin"))?;
|
||||
.ok_or_else(|| std::io::Error::other(format!("{PUBLIC_TOOL_NAME} runner missing stdin")))?;
|
||||
let stdin = Arc::new(Mutex::new(stdin));
|
||||
let response_waiters = Arc::new(Mutex::new(HashMap::<
|
||||
String,
|
||||
oneshot::Sender<NodeToHostMessage>,
|
||||
>::new()));
|
||||
let (tool_call_tx, tool_call_rx) = mpsc::unbounded_channel();
|
||||
|
||||
let stderr_task = tokio::spawn(async move {
|
||||
tokio::spawn(async move {
|
||||
let mut reader = BufReader::new(stderr);
|
||||
let mut buf = Vec::new();
|
||||
let _ = reader.read_to_end(&mut buf).await;
|
||||
String::from_utf8_lossy(&buf).trim().to_string()
|
||||
match reader.read_to_end(&mut buf).await {
|
||||
Ok(_) => {
|
||||
let stderr = String::from_utf8_lossy(&buf).trim().to_string();
|
||||
if !stderr.is_empty() {
|
||||
warn!("{PUBLIC_TOOL_NAME} runner stderr: {stderr}");
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("failed to read {PUBLIC_TOOL_NAME} stderr: {err}");
|
||||
}
|
||||
}
|
||||
});
|
||||
let stdout_task = tokio::spawn({
|
||||
let response_waiters = Arc::clone(&response_waiters);
|
||||
let tool_call_tx = tool_call_tx.clone();
|
||||
async move {
|
||||
let mut stdout_lines = BufReader::new(stdout).lines();
|
||||
loop {
|
||||
let line = match stdout_lines.next_line().await {
|
||||
Ok(line) => line,
|
||||
Err(err) => {
|
||||
warn!("failed to read {PUBLIC_TOOL_NAME} stdout: {err}");
|
||||
break;
|
||||
}
|
||||
};
|
||||
let Some(line) = line else {
|
||||
break;
|
||||
};
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
let message: NodeToHostMessage = match serde_json::from_str(&line) {
|
||||
Ok(message) => message,
|
||||
Err(err) => {
|
||||
warn!("failed to parse {PUBLIC_TOOL_NAME} stdout message: {err}");
|
||||
break;
|
||||
}
|
||||
};
|
||||
match message {
|
||||
NodeToHostMessage::ToolCall { tool_call } => {
|
||||
let _ = tool_call_tx.send(tool_call);
|
||||
}
|
||||
message => {
|
||||
let request_id = message_request_id(&message).to_string();
|
||||
if let Some(waiter) = response_waiters.lock().await.remove(&request_id) {
|
||||
let _ = waiter.send(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
response_waiters.lock().await.clear();
|
||||
}
|
||||
});
|
||||
|
||||
write_message(
|
||||
&mut stdin,
|
||||
&HostToNodeMessage::Init {
|
||||
enabled_tools: enabled_tools.clone(),
|
||||
stored_values,
|
||||
source,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut stdout_lines = BufReader::new(stdout).lines();
|
||||
let mut pending_result = None;
|
||||
while let Some(line) = stdout_lines
|
||||
.next_line()
|
||||
.await
|
||||
.map_err(|err| format!("failed to read {PUBLIC_TOOL_NAME} runner stdout: {err}"))?
|
||||
{
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
let message: NodeToHostMessage = serde_json::from_str(&line).map_err(|err| {
|
||||
format!("invalid {PUBLIC_TOOL_NAME} runner message: {err}; line={line}")
|
||||
})?;
|
||||
match message {
|
||||
NodeToHostMessage::ToolCall { id, name, input } => {
|
||||
let response = HostToNodeMessage::Response {
|
||||
id,
|
||||
code_mode_result: call_nested_tool(exec.clone(), name, input).await,
|
||||
};
|
||||
write_message(&mut stdin, &response).await?;
|
||||
}
|
||||
NodeToHostMessage::Result {
|
||||
content_items,
|
||||
stored_values,
|
||||
error_text,
|
||||
max_output_tokens_per_exec_call,
|
||||
} => {
|
||||
exec.session
|
||||
.services
|
||||
.code_mode_store
|
||||
.replace_stored_values(stored_values)
|
||||
.await;
|
||||
pending_result = Some((
|
||||
output_content_items_from_json_values(content_items)?,
|
||||
error_text,
|
||||
max_output_tokens_per_exec_call,
|
||||
));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
drop(stdin);
|
||||
|
||||
let status = child
|
||||
.wait()
|
||||
.await
|
||||
.map_err(|err| format!("failed to wait for {PUBLIC_TOOL_NAME} runner: {err}"))?;
|
||||
let stderr = stderr_task
|
||||
.await
|
||||
.map_err(|err| format!("failed to collect {PUBLIC_TOOL_NAME} stderr: {err}"))?;
|
||||
let wall_time = started_at.elapsed();
|
||||
let success = status.success();
|
||||
|
||||
let Some((mut content_items, error_text, max_output_tokens_per_exec_call)) = pending_result
|
||||
else {
|
||||
let message = if stderr.is_empty() {
|
||||
format!("{PUBLIC_TOOL_NAME} runner exited without returning a result (status {status})")
|
||||
} else {
|
||||
stderr
|
||||
};
|
||||
return Err(message);
|
||||
};
|
||||
|
||||
if !success {
|
||||
let error_text = error_text.unwrap_or_else(|| {
|
||||
if stderr.is_empty() {
|
||||
format!("Process exited with status {status}")
|
||||
} else {
|
||||
stderr
|
||||
}
|
||||
});
|
||||
content_items.push(FunctionCallOutputContentItem::InputText {
|
||||
text: format!("Script error:\n{error_text}"),
|
||||
});
|
||||
}
|
||||
|
||||
let mut content_items =
|
||||
truncate_code_mode_result(content_items, max_output_tokens_per_exec_call);
|
||||
prepend_script_status(&mut content_items, success, wall_time);
|
||||
Ok(FunctionToolOutput::from_content(
|
||||
content_items,
|
||||
Some(success),
|
||||
))
|
||||
Ok(CodeModeProcess {
|
||||
child,
|
||||
stdin,
|
||||
stdout_task,
|
||||
response_waiters,
|
||||
tool_call_rx: Arc::new(Mutex::new(tool_call_rx)),
|
||||
})
|
||||
}
|
||||
|
||||
async fn write_message(
|
||||
stdin: &mut tokio::process::ChildStdin,
|
||||
stdin: &Arc<Mutex<tokio::process::ChildStdin>>,
|
||||
message: &HostToNodeMessage,
|
||||
) -> Result<(), String> {
|
||||
let line = serde_json::to_string(message)
|
||||
.map_err(|err| format!("failed to serialize {PUBLIC_TOOL_NAME} message: {err}"))?;
|
||||
stdin
|
||||
.write_all(line.as_bytes())
|
||||
.await
|
||||
.map_err(|err| format!("failed to write {PUBLIC_TOOL_NAME} message: {err}"))?;
|
||||
stdin
|
||||
.write_all(b"\n")
|
||||
.await
|
||||
.map_err(|err| format!("failed to write {PUBLIC_TOOL_NAME} message newline: {err}"))?;
|
||||
stdin
|
||||
.flush()
|
||||
.await
|
||||
.map_err(|err| format!("failed to flush {PUBLIC_TOOL_NAME} message: {err}"))
|
||||
) -> Result<(), std::io::Error> {
|
||||
let line = serde_json::to_string(message).map_err(std::io::Error::other)?;
|
||||
let mut stdin = stdin.lock().await;
|
||||
stdin.write_all(line.as_bytes()).await?;
|
||||
stdin.write_all(b"\n").await?;
|
||||
stdin.flush().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn message_request_id(message: &NodeToHostMessage) -> &str {
|
||||
match message {
|
||||
NodeToHostMessage::ToolCall { tool_call } => &tool_call.request_id,
|
||||
NodeToHostMessage::Yielded { request_id, .. }
|
||||
| NodeToHostMessage::Terminated { request_id, .. }
|
||||
| NodeToHostMessage::Result { request_id, .. } => request_id,
|
||||
}
|
||||
}
|
||||
|
||||
fn prepend_script_status(
|
||||
content_items: &mut Vec<FunctionCallOutputContentItem>,
|
||||
success: bool,
|
||||
status: CodeModeExecutionStatus,
|
||||
wall_time: Duration,
|
||||
) {
|
||||
let wall_time_seconds = ((wall_time.as_secs_f32()) * 10.0).round() / 10.0;
|
||||
let header = format!(
|
||||
"{}\nWall time {wall_time_seconds:.1} seconds\nOutput:\n",
|
||||
if success {
|
||||
"Script completed"
|
||||
} else {
|
||||
"Script failed"
|
||||
match status {
|
||||
CodeModeExecutionStatus::Completed => "Script completed".to_string(),
|
||||
CodeModeExecutionStatus::Failed => "Script failed".to_string(),
|
||||
CodeModeExecutionStatus::Running(session_id) => {
|
||||
format!("Script running with session ID {session_id}")
|
||||
}
|
||||
CodeModeExecutionStatus::Terminated => "Script terminated".to_string(),
|
||||
}
|
||||
);
|
||||
content_items.insert(0, FunctionCallOutputContentItem::InputText { text: header });
|
||||
@@ -365,7 +743,7 @@ async fn build_enabled_tools(exec: &ExecContext) -> Vec<EnabledTool> {
|
||||
|
||||
fn enabled_tool_from_spec(spec: ToolSpec) -> Option<EnabledTool> {
|
||||
let tool_name = spec.name().to_string();
|
||||
if tool_name == PUBLIC_TOOL_NAME {
|
||||
if tool_name == PUBLIC_TOOL_NAME || tool_name == WAIT_TOOL_NAME {
|
||||
return None;
|
||||
}
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,16 +1,35 @@
|
||||
use async_trait::async_trait;
|
||||
use serde::Deserialize;
|
||||
|
||||
use crate::features::Feature;
|
||||
use crate::function_tool::FunctionCallError;
|
||||
use crate::tools::code_mode;
|
||||
use crate::tools::code_mode::DEFAULT_WAIT_YIELD_TIME_MS;
|
||||
use crate::tools::code_mode::PUBLIC_TOOL_NAME;
|
||||
use crate::tools::code_mode::WAIT_TOOL_NAME;
|
||||
use crate::tools::context::FunctionToolOutput;
|
||||
use crate::tools::context::ToolInvocation;
|
||||
use crate::tools::context::ToolPayload;
|
||||
use crate::tools::handlers::parse_arguments;
|
||||
use crate::tools::registry::ToolHandler;
|
||||
use crate::tools::registry::ToolKind;
|
||||
|
||||
pub struct CodeModeHandler;
|
||||
pub struct CodeModeWaitHandler;
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct ExecWaitArgs {
|
||||
session_id: i32,
|
||||
#[serde(default = "default_wait_yield_time_ms")]
|
||||
yield_time_ms: u64,
|
||||
#[serde(default)]
|
||||
max_tokens: Option<usize>,
|
||||
#[serde(default)]
|
||||
terminate: bool,
|
||||
}
|
||||
|
||||
fn default_wait_yield_time_ms() -> u64 {
|
||||
DEFAULT_WAIT_YIELD_TIME_MS
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ToolHandler for CodeModeHandler {
|
||||
@@ -29,25 +48,57 @@ impl ToolHandler for CodeModeHandler {
|
||||
session,
|
||||
turn,
|
||||
tracker,
|
||||
tool_name,
|
||||
payload,
|
||||
..
|
||||
} = invocation;
|
||||
|
||||
if !session.features().enabled(Feature::CodeMode) {
|
||||
return Err(FunctionCallError::RespondToModel(format!(
|
||||
"{PUBLIC_TOOL_NAME} is disabled by feature flag"
|
||||
)));
|
||||
}
|
||||
|
||||
let code = match payload {
|
||||
ToolPayload::Custom { input } => input,
|
||||
_ => {
|
||||
return Err(FunctionCallError::RespondToModel(format!(
|
||||
"{PUBLIC_TOOL_NAME} expects raw JavaScript source text"
|
||||
)));
|
||||
match payload {
|
||||
ToolPayload::Custom { input } if tool_name == PUBLIC_TOOL_NAME => {
|
||||
code_mode::execute(session, turn, tracker, input).await
|
||||
}
|
||||
};
|
||||
|
||||
code_mode::execute(session, turn, tracker, code).await
|
||||
_ => Err(FunctionCallError::RespondToModel(format!(
|
||||
"{PUBLIC_TOOL_NAME} expects raw JavaScript source text"
|
||||
))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ToolHandler for CodeModeWaitHandler {
|
||||
type Output = FunctionToolOutput;
|
||||
|
||||
fn kind(&self) -> ToolKind {
|
||||
ToolKind::Function
|
||||
}
|
||||
|
||||
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
|
||||
let ToolInvocation {
|
||||
session,
|
||||
turn,
|
||||
tracker,
|
||||
tool_name,
|
||||
payload,
|
||||
..
|
||||
} = invocation;
|
||||
|
||||
match payload {
|
||||
ToolPayload::Function { arguments } if tool_name == WAIT_TOOL_NAME => {
|
||||
let args: ExecWaitArgs = parse_arguments(&arguments)?;
|
||||
code_mode::wait(
|
||||
session,
|
||||
turn,
|
||||
tracker,
|
||||
args.session_id,
|
||||
args.yield_time_ms,
|
||||
args.max_tokens,
|
||||
args.terminate,
|
||||
)
|
||||
.await
|
||||
}
|
||||
_ => Err(FunctionCallError::RespondToModel(format!(
|
||||
"{WAIT_TOOL_NAME} expects JSON arguments"
|
||||
))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@ use crate::sandboxing::normalize_additional_permissions;
|
||||
pub use apply_patch::ApplyPatchHandler;
|
||||
pub use artifacts::ArtifactsHandler;
|
||||
pub use code_mode::CodeModeHandler;
|
||||
pub use code_mode::CodeModeWaitHandler;
|
||||
use codex_protocol::models::PermissionProfile;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
pub use dynamic::DynamicToolHandler;
|
||||
|
||||
@@ -8,7 +8,9 @@ use crate::features::Features;
|
||||
use crate::mcp_connection_manager::ToolInfo;
|
||||
use crate::models_manager::collaboration_mode_presets::CollaborationModesConfig;
|
||||
use crate::original_image_detail::can_request_original_image_detail;
|
||||
use crate::tools::code_mode::DEFAULT_WAIT_YIELD_TIME_MS;
|
||||
use crate::tools::code_mode::PUBLIC_TOOL_NAME;
|
||||
use crate::tools::code_mode::WAIT_TOOL_NAME;
|
||||
use crate::tools::code_mode_description::augment_tool_spec_for_code_mode;
|
||||
use crate::tools::handlers::PLAN_TOOL;
|
||||
use crate::tools::handlers::TOOL_SEARCH_DEFAULT_LIMIT;
|
||||
@@ -579,6 +581,55 @@ fn create_write_stdin_tool() -> ToolSpec {
|
||||
})
|
||||
}
|
||||
|
||||
fn create_exec_wait_tool() -> ToolSpec {
|
||||
let properties = BTreeMap::from([
|
||||
(
|
||||
"session_id".to_string(),
|
||||
JsonSchema::Number {
|
||||
description: Some("Identifier of the running exec session.".to_string()),
|
||||
},
|
||||
),
|
||||
(
|
||||
"yield_time_ms".to_string(),
|
||||
JsonSchema::Number {
|
||||
description: Some(
|
||||
"How long to wait (in milliseconds) for more output before yielding again."
|
||||
.to_string(),
|
||||
),
|
||||
},
|
||||
),
|
||||
(
|
||||
"max_tokens".to_string(),
|
||||
JsonSchema::Number {
|
||||
description: Some(
|
||||
"Maximum number of output tokens to return for this wait call.".to_string(),
|
||||
),
|
||||
},
|
||||
),
|
||||
(
|
||||
"terminate".to_string(),
|
||||
JsonSchema::Boolean {
|
||||
description: Some("Whether to terminate the running exec session.".to_string()),
|
||||
},
|
||||
),
|
||||
]);
|
||||
|
||||
ToolSpec::Function(ResponsesApiTool {
|
||||
name: WAIT_TOOL_NAME.to_string(),
|
||||
description: format!(
|
||||
"Waits on a yielded `{PUBLIC_TOOL_NAME}` session and returns new output or completion."
|
||||
),
|
||||
strict: false,
|
||||
parameters: JsonSchema::Object {
|
||||
properties,
|
||||
required: Some(vec!["session_id".to_string()]),
|
||||
additional_properties: Some(false.into()),
|
||||
},
|
||||
output_schema: None,
|
||||
defer_loading: None,
|
||||
})
|
||||
}
|
||||
|
||||
fn create_shell_tool(request_permission_enabled: bool) -> ToolSpec {
|
||||
let mut properties = BTreeMap::from([
|
||||
(
|
||||
@@ -1692,7 +1743,7 @@ source: /[\s\S]+/
|
||||
enabled_tool_names.join(", ")
|
||||
};
|
||||
let description = format!(
|
||||
"Runs JavaScript in a Node-backed `node:vm` context. This is a freeform tool: send raw JavaScript source text (no JSON/quotes/markdown fences). Direct tool calls remain available while `{PUBLIC_TOOL_NAME}` is enabled. Inside JavaScript, import nested tools from `tools.js`, for example `import {{ exec_command }} from \"tools.js\"` or `import {{ ALL_TOOLS }} from \"tools.js\"` to inspect the available `{{ module, name, description }}` entries. Namespaced tools are also available from `tools/<namespace...>.js`; MCP tools use `tools/mcp/<server>.js`, for example `import {{ append_notebook_logs_chart }} from \"tools/mcp/ologs.js\"`. Nested tool calls resolve to their code-mode result values. Import `{{ output_text, output_image, set_max_output_tokens_per_exec_call, store, load }}` from `\"@openai/code_mode\"` (or `\"openai/code_mode\"`); `output_text(value)` surfaces text back to the model and stringifies non-string objects when possible, `output_image(imageUrl)` appends an `input_image` content item for `http(s)` or `data:` URLs, `store(key, value)` persists JSON-serializable values across `{PUBLIC_TOOL_NAME}` calls in the current session, `load(key)` returns a cloned stored value or `undefined`, and `set_max_output_tokens_per_exec_call(value)` sets the token budget used to truncate the final Rust-side result of the current `{PUBLIC_TOOL_NAME}` execution. The default is `10000`. This guards the overall `{PUBLIC_TOOL_NAME}` output, not individual nested tool invocations. The returned content starts with a separate `Script completed` or `Script failed` text item that includes wall time. When truncation happens, the final text may include `Total output lines:` and the usual `…N tokens truncated…` marker. Function tools require JSON object arguments. Freeform tools require raw strings. `add_content(value)` remains available for compatibility with a content item, content-item array, or string. Structured nested-tool results should be converted to text first, for example with `JSON.stringify(...)`. Only content passed to `output_text(...)`, `output_image(...)`, or `add_content(value)` is surfaced back to the model. Enabled nested tools: {enabled_list}."
|
||||
"Runs JavaScript in a Node-backed `node:vm` context. This is a freeform tool: send raw JavaScript source text (no JSON/quotes/markdown fences). Direct tool calls remain available while `{PUBLIC_TOOL_NAME}` is enabled. Inside JavaScript, import nested tools from `tools.js`, for example `import {{ exec_command }} from \"tools.js\"` or `import {{ ALL_TOOLS }} from \"tools.js\"` to inspect the available `{{ module, name, description }}` entries. Namespaced tools are also available from `tools/<namespace...>.js`; MCP tools use `tools/mcp/<server>.js`, for example `import {{ append_notebook_logs_chart }} from \"tools/mcp/ologs.js\"`. Nested tool calls resolve to their code-mode result values. Import `{{ output_text, output_image, set_max_output_tokens_per_exec_call, set_yield_time, store, load }}` from `\"@openai/code_mode\"` (or `\"openai/code_mode\"`); `output_text(value)` surfaces text back to the model and stringifies non-string objects when possible, `output_image(imageUrl)` appends an `input_image` content item for `http(s)` or `data:` URLs, `store(key, value)` persists JSON-serializable values across `{PUBLIC_TOOL_NAME}` calls in the current session, `load(key)` returns a cloned stored value or `undefined`, `set_max_output_tokens_per_exec_call(value)` sets the token budget used to truncate direct `{PUBLIC_TOOL_NAME}` returns, and `{WAIT_TOOL_NAME}` uses its own `max_tokens` argument with a default of `10000`. `set_yield_time(value)` asks `{PUBLIC_TOOL_NAME}` to return early if the script is still running after that many milliseconds so `{WAIT_TOOL_NAME}` can resume it later. The default wait timeout for `{WAIT_TOOL_NAME}` is {DEFAULT_WAIT_YIELD_TIME_MS}. The returned content starts with a separate `Script completed`, `Script failed`, or `Script running with session ID …` text item that includes wall time. When truncation happens, the final text may include `Total output lines:` and the usual `…N tokens truncated…` marker. Function tools require JSON object arguments. Freeform tools require raw strings. `add_content(value)` remains available for compatibility with a content item, content-item array, or string. Structured nested-tool results should be converted to text first, for example with `JSON.stringify(...)`. Only content passed to `output_text(...)`, `output_image(...)`, or `add_content(value)` is surfaced back to the model. Enabled nested tools: {enabled_list}."
|
||||
);
|
||||
|
||||
ToolSpec::Freeform(FreeformTool {
|
||||
@@ -1707,7 +1758,9 @@ source: /[\s\S]+/
|
||||
}
|
||||
|
||||
fn is_code_mode_nested_tool(spec: &ToolSpec) -> bool {
|
||||
spec.name() != PUBLIC_TOOL_NAME && matches!(spec, ToolSpec::Function(_) | ToolSpec::Freeform(_))
|
||||
spec.name() != PUBLIC_TOOL_NAME
|
||||
&& spec.name() != WAIT_TOOL_NAME
|
||||
&& matches!(spec, ToolSpec::Function(_) | ToolSpec::Freeform(_))
|
||||
}
|
||||
|
||||
fn create_list_mcp_resources_tool() -> ToolSpec {
|
||||
@@ -2092,6 +2145,7 @@ pub(crate) fn build_specs(
|
||||
use crate::tools::handlers::ApplyPatchHandler;
|
||||
use crate::tools::handlers::ArtifactsHandler;
|
||||
use crate::tools::handlers::CodeModeHandler;
|
||||
use crate::tools::handlers::CodeModeWaitHandler;
|
||||
use crate::tools::handlers::DynamicToolHandler;
|
||||
use crate::tools::handlers::GrepFilesHandler;
|
||||
use crate::tools::handlers::JsReplHandler;
|
||||
@@ -2128,6 +2182,7 @@ pub(crate) fn build_specs(
|
||||
default_mode_request_user_input: config.default_mode_request_user_input,
|
||||
});
|
||||
let code_mode_handler = Arc::new(CodeModeHandler);
|
||||
let code_mode_wait_handler = Arc::new(CodeModeWaitHandler);
|
||||
let js_repl_handler = Arc::new(JsReplHandler);
|
||||
let js_repl_reset_handler = Arc::new(JsReplResetHandler);
|
||||
let artifacts_handler = Arc::new(ArtifactsHandler);
|
||||
@@ -2157,6 +2212,13 @@ pub(crate) fn build_specs(
|
||||
config.code_mode_enabled,
|
||||
);
|
||||
builder.register_handler(PUBLIC_TOOL_NAME, code_mode_handler);
|
||||
push_tool_spec(
|
||||
&mut builder,
|
||||
create_exec_wait_tool(),
|
||||
false,
|
||||
config.code_mode_enabled,
|
||||
);
|
||||
builder.register_handler(WAIT_TOOL_NAME, code_mode_wait_handler);
|
||||
}
|
||||
|
||||
match &config.shell_type {
|
||||
|
||||
@@ -21,6 +21,7 @@ use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
use wiremock::MockServer;
|
||||
|
||||
@@ -32,6 +33,16 @@ fn custom_tool_output_items(req: &ResponsesRequest, call_id: &str) -> Vec<Value>
|
||||
.clone()
|
||||
}
|
||||
|
||||
fn function_tool_output_items(req: &ResponsesRequest, call_id: &str) -> Vec<Value> {
|
||||
match req.function_call_output(call_id).get("output") {
|
||||
Some(Value::Array(items)) => items.clone(),
|
||||
Some(Value::String(text)) => {
|
||||
vec![serde_json::json!({ "type": "input_text", "text": text })]
|
||||
}
|
||||
_ => panic!("function tool output should be serialized as text or content items"),
|
||||
}
|
||||
}
|
||||
|
||||
fn text_item(items: &[Value], index: usize) -> &str {
|
||||
items[index]
|
||||
.get("text")
|
||||
@@ -39,6 +50,23 @@ fn text_item(items: &[Value], index: usize) -> &str {
|
||||
.expect("content item should be input_text")
|
||||
}
|
||||
|
||||
fn extract_running_session_id(text: &str) -> i32 {
|
||||
text.strip_prefix("Script running with session ID ")
|
||||
.and_then(|rest| rest.split('\n').next())
|
||||
.expect("running header should contain a session ID")
|
||||
.parse()
|
||||
.expect("session ID should parse as i32")
|
||||
}
|
||||
|
||||
fn wait_for_file_source(path: &Path) -> Result<String> {
|
||||
let quoted_path = shlex::try_join([path.to_string_lossy().as_ref()])?;
|
||||
let command = format!("if [ -f {quoted_path} ]; then printf ready; fi");
|
||||
Ok(format!(
|
||||
r#"while ((await exec_command({{ cmd: {command:?} }})).output !== "ready") {{
|
||||
}}"#
|
||||
))
|
||||
}
|
||||
|
||||
fn custom_tool_output_body_and_success(
|
||||
req: &ResponsesRequest,
|
||||
call_id: &str,
|
||||
@@ -289,6 +317,799 @@ Error:\ boom\n
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn code_mode_can_yield_and_resume_with_exec_wait() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(move |config| {
|
||||
let _ = config.features.enable(Feature::CodeMode);
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
let phase_2_gate = test.workspace_path("code-mode-phase-2.ready");
|
||||
let phase_3_gate = test.workspace_path("code-mode-phase-3.ready");
|
||||
let phase_2_wait = wait_for_file_source(&phase_2_gate)?;
|
||||
let phase_3_wait = wait_for_file_source(&phase_3_gate)?;
|
||||
|
||||
let code = format!(
|
||||
r#"
|
||||
import {{ output_text, set_yield_time }} from "@openai/code_mode";
|
||||
import {{ exec_command }} from "tools.js";
|
||||
|
||||
output_text("phase 1");
|
||||
set_yield_time(10);
|
||||
{phase_2_wait}
|
||||
output_text("phase 2");
|
||||
{phase_3_wait}
|
||||
output_text("phase 3");
|
||||
"#
|
||||
);
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_custom_tool_call("call-1", "exec", &code),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let first_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-1", "waiting"),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("start the long exec").await?;
|
||||
|
||||
let first_request = first_completion.single_request();
|
||||
let first_items = custom_tool_output_items(&first_request, "call-1");
|
||||
assert_eq!(first_items.len(), 2);
|
||||
assert_regex_match(
|
||||
concat!(
|
||||
r"(?s)\A",
|
||||
r"Script running with session ID \d+\nWall time \d+\.\d seconds\nOutput:\n\z"
|
||||
),
|
||||
text_item(&first_items, 0),
|
||||
);
|
||||
assert_eq!(text_item(&first_items, 1), "phase 1");
|
||||
let session_id = extract_running_session_id(text_item(&first_items, 0));
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-3"),
|
||||
responses::ev_function_call(
|
||||
"call-2",
|
||||
"exec_wait",
|
||||
&serde_json::to_string(&serde_json::json!({
|
||||
"session_id": session_id,
|
||||
"yield_time_ms": 1_000,
|
||||
}))?,
|
||||
),
|
||||
ev_completed("resp-3"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let second_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-2", "still waiting"),
|
||||
ev_completed("resp-4"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
fs::write(&phase_2_gate, "ready")?;
|
||||
test.submit_turn("wait again").await?;
|
||||
|
||||
let second_request = second_completion.single_request();
|
||||
let second_items = function_tool_output_items(&second_request, "call-2");
|
||||
assert_eq!(second_items.len(), 2);
|
||||
assert_regex_match(
|
||||
concat!(
|
||||
r"(?s)\A",
|
||||
r"Script running with session ID \d+\nWall time \d+\.\d seconds\nOutput:\n\z"
|
||||
),
|
||||
text_item(&second_items, 0),
|
||||
);
|
||||
assert_eq!(
|
||||
extract_running_session_id(text_item(&second_items, 0)),
|
||||
session_id
|
||||
);
|
||||
assert_eq!(text_item(&second_items, 1), "phase 2");
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-5"),
|
||||
responses::ev_function_call(
|
||||
"call-3",
|
||||
"exec_wait",
|
||||
&serde_json::to_string(&serde_json::json!({
|
||||
"session_id": session_id,
|
||||
"yield_time_ms": 1_000,
|
||||
}))?,
|
||||
),
|
||||
ev_completed("resp-5"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let third_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-3", "done"),
|
||||
ev_completed("resp-6"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
fs::write(&phase_3_gate, "ready")?;
|
||||
test.submit_turn("wait for completion").await?;
|
||||
|
||||
let third_request = third_completion.single_request();
|
||||
let third_items = function_tool_output_items(&third_request, "call-3");
|
||||
assert_eq!(third_items.len(), 2);
|
||||
assert_regex_match(
|
||||
concat!(
|
||||
r"(?s)\A",
|
||||
r"Script completed\nWall time \d+\.\d seconds\nOutput:\n\z"
|
||||
),
|
||||
text_item(&third_items, 0),
|
||||
);
|
||||
assert_eq!(text_item(&third_items, 1), "phase 3");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn code_mode_can_run_multiple_yielded_sessions() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(move |config| {
|
||||
let _ = config.features.enable(Feature::CodeMode);
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
let session_a_gate = test.workspace_path("code-mode-session-a.ready");
|
||||
let session_b_gate = test.workspace_path("code-mode-session-b.ready");
|
||||
let session_a_wait = wait_for_file_source(&session_a_gate)?;
|
||||
let session_b_wait = wait_for_file_source(&session_b_gate)?;
|
||||
|
||||
let session_a_code = format!(
|
||||
r#"
|
||||
import {{ output_text, set_yield_time }} from "@openai/code_mode";
|
||||
import {{ exec_command }} from "tools.js";
|
||||
|
||||
output_text("session a start");
|
||||
set_yield_time(10);
|
||||
{session_a_wait}
|
||||
output_text("session a done");
|
||||
"#
|
||||
);
|
||||
let session_b_code = format!(
|
||||
r#"
|
||||
import {{ output_text, set_yield_time }} from "@openai/code_mode";
|
||||
import {{ exec_command }} from "tools.js";
|
||||
|
||||
output_text("session b start");
|
||||
set_yield_time(10);
|
||||
{session_b_wait}
|
||||
output_text("session b done");
|
||||
"#
|
||||
);
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_custom_tool_call("call-1", "exec", &session_a_code),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let first_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-1", "session a waiting"),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("start session a").await?;
|
||||
|
||||
let first_request = first_completion.single_request();
|
||||
let first_items = custom_tool_output_items(&first_request, "call-1");
|
||||
assert_eq!(first_items.len(), 2);
|
||||
let session_a_id = extract_running_session_id(text_item(&first_items, 0));
|
||||
assert_eq!(text_item(&first_items, 1), "session a start");
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-3"),
|
||||
ev_custom_tool_call("call-2", "exec", &session_b_code),
|
||||
ev_completed("resp-3"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let second_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-2", "session b waiting"),
|
||||
ev_completed("resp-4"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("start session b").await?;
|
||||
|
||||
let second_request = second_completion.single_request();
|
||||
let second_items = custom_tool_output_items(&second_request, "call-2");
|
||||
assert_eq!(second_items.len(), 2);
|
||||
let session_b_id = extract_running_session_id(text_item(&second_items, 0));
|
||||
assert_eq!(text_item(&second_items, 1), "session b start");
|
||||
assert_ne!(session_a_id, session_b_id);
|
||||
|
||||
fs::write(&session_a_gate, "ready")?;
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-5"),
|
||||
responses::ev_function_call(
|
||||
"call-3",
|
||||
"exec_wait",
|
||||
&serde_json::to_string(&serde_json::json!({
|
||||
"session_id": session_a_id,
|
||||
"yield_time_ms": 1_000,
|
||||
}))?,
|
||||
),
|
||||
ev_completed("resp-5"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let third_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-3", "session a done"),
|
||||
ev_completed("resp-6"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("wait session a").await?;
|
||||
|
||||
let third_request = third_completion.single_request();
|
||||
let third_items = function_tool_output_items(&third_request, "call-3");
|
||||
assert_eq!(third_items.len(), 2);
|
||||
assert_regex_match(
|
||||
concat!(
|
||||
r"(?s)\A",
|
||||
r"Script completed\nWall time \d+\.\d seconds\nOutput:\n\z"
|
||||
),
|
||||
text_item(&third_items, 0),
|
||||
);
|
||||
assert_eq!(text_item(&third_items, 1), "session a done");
|
||||
|
||||
fs::write(&session_b_gate, "ready")?;
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-7"),
|
||||
responses::ev_function_call(
|
||||
"call-4",
|
||||
"exec_wait",
|
||||
&serde_json::to_string(&serde_json::json!({
|
||||
"session_id": session_b_id,
|
||||
"yield_time_ms": 1_000,
|
||||
}))?,
|
||||
),
|
||||
ev_completed("resp-7"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let fourth_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-4", "session b done"),
|
||||
ev_completed("resp-8"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("wait session b").await?;
|
||||
|
||||
let fourth_request = fourth_completion.single_request();
|
||||
let fourth_items = function_tool_output_items(&fourth_request, "call-4");
|
||||
assert_eq!(fourth_items.len(), 2);
|
||||
assert_regex_match(
|
||||
concat!(
|
||||
r"(?s)\A",
|
||||
r"Script completed\nWall time \d+\.\d seconds\nOutput:\n\z"
|
||||
),
|
||||
text_item(&fourth_items, 0),
|
||||
);
|
||||
assert_eq!(text_item(&fourth_items, 1), "session b done");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn code_mode_exec_wait_can_terminate_and_continue() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(move |config| {
|
||||
let _ = config.features.enable(Feature::CodeMode);
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
let termination_gate = test.workspace_path("code-mode-terminate.ready");
|
||||
let termination_wait = wait_for_file_source(&termination_gate)?;
|
||||
|
||||
let code = format!(
|
||||
r#"
|
||||
import {{ output_text, set_yield_time }} from "@openai/code_mode";
|
||||
import {{ exec_command }} from "tools.js";
|
||||
|
||||
output_text("phase 1");
|
||||
set_yield_time(10);
|
||||
{termination_wait}
|
||||
output_text("phase 2");
|
||||
"#
|
||||
);
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_custom_tool_call("call-1", "exec", &code),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let first_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-1", "waiting"),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("start the long exec").await?;
|
||||
|
||||
let first_request = first_completion.single_request();
|
||||
let first_items = custom_tool_output_items(&first_request, "call-1");
|
||||
assert_eq!(first_items.len(), 2);
|
||||
let session_id = extract_running_session_id(text_item(&first_items, 0));
|
||||
assert_eq!(text_item(&first_items, 1), "phase 1");
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-3"),
|
||||
responses::ev_function_call(
|
||||
"call-2",
|
||||
"exec_wait",
|
||||
&serde_json::to_string(&serde_json::json!({
|
||||
"session_id": session_id,
|
||||
"terminate": true,
|
||||
}))?,
|
||||
),
|
||||
ev_completed("resp-3"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let second_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-2", "terminated"),
|
||||
ev_completed("resp-4"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("terminate it").await?;
|
||||
|
||||
let second_request = second_completion.single_request();
|
||||
let second_items = function_tool_output_items(&second_request, "call-2");
|
||||
assert_eq!(second_items.len(), 1);
|
||||
assert_regex_match(
|
||||
concat!(
|
||||
r"(?s)\A",
|
||||
r"Script terminated\nWall time \d+\.\d seconds\nOutput:\n\z"
|
||||
),
|
||||
text_item(&second_items, 0),
|
||||
);
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-5"),
|
||||
ev_custom_tool_call(
|
||||
"call-3",
|
||||
"exec",
|
||||
r#"
|
||||
import { output_text } from "@openai/code_mode";
|
||||
|
||||
output_text("after terminate");
|
||||
"#,
|
||||
),
|
||||
ev_completed("resp-5"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let third_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-3", "done"),
|
||||
ev_completed("resp-6"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("run another exec").await?;
|
||||
|
||||
let third_request = third_completion.single_request();
|
||||
let third_items = custom_tool_output_items(&third_request, "call-3");
|
||||
assert_eq!(third_items.len(), 2);
|
||||
assert_regex_match(
|
||||
concat!(
|
||||
r"(?s)\A",
|
||||
r"Script completed\nWall time \d+\.\d seconds\nOutput:\n\z"
|
||||
),
|
||||
text_item(&third_items, 0),
|
||||
);
|
||||
assert_eq!(text_item(&third_items, 1), "after terminate");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn code_mode_exec_wait_returns_error_for_unknown_session() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(move |config| {
|
||||
let _ = config.features.enable(Feature::CodeMode);
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
responses::ev_function_call(
|
||||
"call-1",
|
||||
"exec_wait",
|
||||
&serde_json::to_string(&serde_json::json!({
|
||||
"session_id": 999_999,
|
||||
"yield_time_ms": 1_000,
|
||||
}))?,
|
||||
),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-1", "done"),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("wait on an unknown exec session").await?;
|
||||
|
||||
let request = completion.single_request();
|
||||
let (_, success) = request
|
||||
.function_call_output_content_and_success("call-1")
|
||||
.expect("function tool output should be present");
|
||||
assert_ne!(success, Some(true));
|
||||
|
||||
let items = function_tool_output_items(&request, "call-1");
|
||||
assert_eq!(items.len(), 2);
|
||||
assert_regex_match(
|
||||
concat!(
|
||||
r"(?s)\A",
|
||||
r"Script failed\nWall time \d+\.\d seconds\nOutput:\n\z"
|
||||
),
|
||||
text_item(&items, 0),
|
||||
);
|
||||
assert_eq!(
|
||||
text_item(&items, 1),
|
||||
"Script error:\nexec session 999999 not found"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn code_mode_exec_wait_terminate_returns_completed_session_if_it_finished_in_background()
|
||||
-> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(move |config| {
|
||||
let _ = config.features.enable(Feature::CodeMode);
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
let session_a_gate = test.workspace_path("code-mode-session-a-finished.ready");
|
||||
let session_b_gate = test.workspace_path("code-mode-session-b-blocked.ready");
|
||||
let session_a_wait = wait_for_file_source(&session_a_gate)?;
|
||||
let session_b_wait = wait_for_file_source(&session_b_gate)?;
|
||||
|
||||
let session_a_code = format!(
|
||||
r#"
|
||||
import {{ output_text, set_yield_time }} from "@openai/code_mode";
|
||||
import {{ exec_command }} from "tools.js";
|
||||
|
||||
output_text("session a start");
|
||||
set_yield_time(10);
|
||||
{session_a_wait}
|
||||
output_text("session a done");
|
||||
"#
|
||||
);
|
||||
let session_b_code = format!(
|
||||
r#"
|
||||
import {{ output_text, set_yield_time }} from "@openai/code_mode";
|
||||
import {{ exec_command }} from "tools.js";
|
||||
|
||||
output_text("session b start");
|
||||
set_yield_time(10);
|
||||
{session_b_wait}
|
||||
output_text("session b done");
|
||||
"#
|
||||
);
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_custom_tool_call("call-1", "exec", &session_a_code),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let first_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-1", "session a waiting"),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("start session a").await?;
|
||||
|
||||
let first_request = first_completion.single_request();
|
||||
let first_items = custom_tool_output_items(&first_request, "call-1");
|
||||
assert_eq!(first_items.len(), 2);
|
||||
let session_a_id = extract_running_session_id(text_item(&first_items, 0));
|
||||
assert_eq!(text_item(&first_items, 1), "session a start");
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-3"),
|
||||
ev_custom_tool_call("call-2", "exec", &session_b_code),
|
||||
ev_completed("resp-3"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let second_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-2", "session b waiting"),
|
||||
ev_completed("resp-4"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("start session b").await?;
|
||||
|
||||
let second_request = second_completion.single_request();
|
||||
let second_items = custom_tool_output_items(&second_request, "call-2");
|
||||
assert_eq!(second_items.len(), 2);
|
||||
let session_b_id = extract_running_session_id(text_item(&second_items, 0));
|
||||
assert_eq!(text_item(&second_items, 1), "session b start");
|
||||
|
||||
fs::write(&session_a_gate, "ready")?;
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-5"),
|
||||
responses::ev_function_call(
|
||||
"call-3",
|
||||
"exec_wait",
|
||||
&serde_json::to_string(&serde_json::json!({
|
||||
"session_id": session_b_id,
|
||||
"yield_time_ms": 1_000,
|
||||
}))?,
|
||||
),
|
||||
ev_completed("resp-5"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let third_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-3", "session b still waiting"),
|
||||
ev_completed("resp-6"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("wait session b").await?;
|
||||
|
||||
let third_request = third_completion.single_request();
|
||||
let third_items = function_tool_output_items(&third_request, "call-3");
|
||||
assert_eq!(third_items.len(), 1);
|
||||
assert_regex_match(
|
||||
concat!(
|
||||
r"(?s)\A",
|
||||
r"Script running with session ID \d+\nWall time \d+\.\d seconds\nOutput:\n\z"
|
||||
),
|
||||
text_item(&third_items, 0),
|
||||
);
|
||||
assert_eq!(
|
||||
extract_running_session_id(text_item(&third_items, 0)),
|
||||
session_b_id
|
||||
);
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-7"),
|
||||
responses::ev_function_call(
|
||||
"call-4",
|
||||
"exec_wait",
|
||||
&serde_json::to_string(&serde_json::json!({
|
||||
"session_id": session_a_id,
|
||||
"terminate": true,
|
||||
}))?,
|
||||
),
|
||||
ev_completed("resp-7"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let fourth_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-4", "session a already done"),
|
||||
ev_completed("resp-8"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("terminate session a").await?;
|
||||
|
||||
let fourth_request = fourth_completion.single_request();
|
||||
let fourth_items = function_tool_output_items(&fourth_request, "call-4");
|
||||
assert_eq!(fourth_items.len(), 1);
|
||||
assert_regex_match(
|
||||
concat!(
|
||||
r"(?s)\A",
|
||||
r"Script terminated\nWall time \d+\.\d seconds\nOutput:\n\z"
|
||||
),
|
||||
text_item(&fourth_items, 0),
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn code_mode_exec_wait_uses_its_own_max_tokens_budget() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(move |config| {
|
||||
let _ = config.features.enable(Feature::CodeMode);
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
let completion_gate = test.workspace_path("code-mode-max-tokens.ready");
|
||||
let completion_wait = wait_for_file_source(&completion_gate)?;
|
||||
|
||||
let code = format!(
|
||||
r#"
|
||||
import {{ output_text, set_max_output_tokens_per_exec_call, set_yield_time }} from "@openai/code_mode";
|
||||
import {{ exec_command }} from "tools.js";
|
||||
|
||||
output_text("phase 1");
|
||||
set_max_output_tokens_per_exec_call(100);
|
||||
set_yield_time(10);
|
||||
{completion_wait}
|
||||
output_text("token one token two token three token four token five token six token seven");
|
||||
"#
|
||||
);
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_custom_tool_call("call-1", "exec", &code),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let first_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-1", "waiting"),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("start the long exec").await?;
|
||||
|
||||
let first_request = first_completion.single_request();
|
||||
let first_items = custom_tool_output_items(&first_request, "call-1");
|
||||
assert_eq!(first_items.len(), 2);
|
||||
assert_eq!(text_item(&first_items, 1), "phase 1");
|
||||
let session_id = extract_running_session_id(text_item(&first_items, 0));
|
||||
|
||||
fs::write(&completion_gate, "ready")?;
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-3"),
|
||||
responses::ev_function_call(
|
||||
"call-2",
|
||||
"exec_wait",
|
||||
&serde_json::to_string(&serde_json::json!({
|
||||
"session_id": session_id,
|
||||
"yield_time_ms": 1_000,
|
||||
"max_tokens": 6,
|
||||
}))?,
|
||||
),
|
||||
ev_completed("resp-3"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let second_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-2", "done"),
|
||||
ev_completed("resp-4"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("wait for completion").await?;
|
||||
|
||||
let second_request = second_completion.single_request();
|
||||
let second_items = function_tool_output_items(&second_request, "call-2");
|
||||
assert_eq!(second_items.len(), 2);
|
||||
assert_regex_match(
|
||||
concat!(
|
||||
r"(?s)\A",
|
||||
r"Script completed\nWall time \d+\.\d seconds\nOutput:\n\z"
|
||||
),
|
||||
text_item(&second_items, 0),
|
||||
);
|
||||
let expected_pattern = r#"(?sx)
|
||||
\A
|
||||
Total\ output\ lines:\ 1\n
|
||||
\n
|
||||
.*…\d+\ tokens\ truncated….*
|
||||
\z
|
||||
"#;
|
||||
assert_regex_match(expected_pattern, text_item(&second_items, 1));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn code_mode_can_output_serialized_text_via_openai_code_mode_module() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
Reference in New Issue
Block a user