Support waiting for code_mode sessions (#14295)

## Summary
- persist the code mode runner process in the session-scoped code mode
store
- switch the runner protocol from `init` to `start` with explicit
session ids
- handle runner-side session processing without the init waiter queue

## Validation
- just fmt
- cargo check -p codex-core
- node --check codex-rs/core/src/tools/code_mode_runner.cjs
This commit is contained in:
pakrym-oai
2026-03-11 23:13:54 -07:00
committed by GitHub
parent 367a8a2210
commit f6c6128fc7
9 changed files with 2059 additions and 497 deletions

View File

@@ -21,6 +21,7 @@ use pretty_assertions::assert_eq;
use serde_json::Value;
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::time::Duration;
use wiremock::MockServer;
@@ -32,6 +33,16 @@ fn custom_tool_output_items(req: &ResponsesRequest, call_id: &str) -> Vec<Value>
.clone()
}
fn function_tool_output_items(req: &ResponsesRequest, call_id: &str) -> Vec<Value> {
match req.function_call_output(call_id).get("output") {
Some(Value::Array(items)) => items.clone(),
Some(Value::String(text)) => {
vec![serde_json::json!({ "type": "input_text", "text": text })]
}
_ => panic!("function tool output should be serialized as text or content items"),
}
}
fn text_item(items: &[Value], index: usize) -> &str {
items[index]
.get("text")
@@ -39,6 +50,23 @@ fn text_item(items: &[Value], index: usize) -> &str {
.expect("content item should be input_text")
}
fn extract_running_session_id(text: &str) -> i32 {
text.strip_prefix("Script running with session ID ")
.and_then(|rest| rest.split('\n').next())
.expect("running header should contain a session ID")
.parse()
.expect("session ID should parse as i32")
}
fn wait_for_file_source(path: &Path) -> Result<String> {
let quoted_path = shlex::try_join([path.to_string_lossy().as_ref()])?;
let command = format!("if [ -f {quoted_path} ]; then printf ready; fi");
Ok(format!(
r#"while ((await exec_command({{ cmd: {command:?} }})).output !== "ready") {{
}}"#
))
}
fn custom_tool_output_body_and_success(
req: &ResponsesRequest,
call_id: &str,
@@ -289,6 +317,799 @@ Error:\ boom\n
Ok(())
}
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_can_yield_and_resume_with_exec_wait() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
let mut builder = test_codex().with_config(move |config| {
let _ = config.features.enable(Feature::CodeMode);
});
let test = builder.build(&server).await?;
let phase_2_gate = test.workspace_path("code-mode-phase-2.ready");
let phase_3_gate = test.workspace_path("code-mode-phase-3.ready");
let phase_2_wait = wait_for_file_source(&phase_2_gate)?;
let phase_3_wait = wait_for_file_source(&phase_3_gate)?;
let code = format!(
r#"
import {{ output_text, set_yield_time }} from "@openai/code_mode";
import {{ exec_command }} from "tools.js";
output_text("phase 1");
set_yield_time(10);
{phase_2_wait}
output_text("phase 2");
{phase_3_wait}
output_text("phase 3");
"#
);
responses::mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-1"),
ev_custom_tool_call("call-1", "exec", &code),
ev_completed("resp-1"),
]),
)
.await;
let first_completion = responses::mount_sse_once(
&server,
sse(vec![
ev_assistant_message("msg-1", "waiting"),
ev_completed("resp-2"),
]),
)
.await;
test.submit_turn("start the long exec").await?;
let first_request = first_completion.single_request();
let first_items = custom_tool_output_items(&first_request, "call-1");
assert_eq!(first_items.len(), 2);
assert_regex_match(
concat!(
r"(?s)\A",
r"Script running with session ID \d+\nWall time \d+\.\d seconds\nOutput:\n\z"
),
text_item(&first_items, 0),
);
assert_eq!(text_item(&first_items, 1), "phase 1");
let session_id = extract_running_session_id(text_item(&first_items, 0));
responses::mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-3"),
responses::ev_function_call(
"call-2",
"exec_wait",
&serde_json::to_string(&serde_json::json!({
"session_id": session_id,
"yield_time_ms": 1_000,
}))?,
),
ev_completed("resp-3"),
]),
)
.await;
let second_completion = responses::mount_sse_once(
&server,
sse(vec![
ev_assistant_message("msg-2", "still waiting"),
ev_completed("resp-4"),
]),
)
.await;
fs::write(&phase_2_gate, "ready")?;
test.submit_turn("wait again").await?;
let second_request = second_completion.single_request();
let second_items = function_tool_output_items(&second_request, "call-2");
assert_eq!(second_items.len(), 2);
assert_regex_match(
concat!(
r"(?s)\A",
r"Script running with session ID \d+\nWall time \d+\.\d seconds\nOutput:\n\z"
),
text_item(&second_items, 0),
);
assert_eq!(
extract_running_session_id(text_item(&second_items, 0)),
session_id
);
assert_eq!(text_item(&second_items, 1), "phase 2");
responses::mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-5"),
responses::ev_function_call(
"call-3",
"exec_wait",
&serde_json::to_string(&serde_json::json!({
"session_id": session_id,
"yield_time_ms": 1_000,
}))?,
),
ev_completed("resp-5"),
]),
)
.await;
let third_completion = responses::mount_sse_once(
&server,
sse(vec![
ev_assistant_message("msg-3", "done"),
ev_completed("resp-6"),
]),
)
.await;
fs::write(&phase_3_gate, "ready")?;
test.submit_turn("wait for completion").await?;
let third_request = third_completion.single_request();
let third_items = function_tool_output_items(&third_request, "call-3");
assert_eq!(third_items.len(), 2);
assert_regex_match(
concat!(
r"(?s)\A",
r"Script completed\nWall time \d+\.\d seconds\nOutput:\n\z"
),
text_item(&third_items, 0),
);
assert_eq!(text_item(&third_items, 1), "phase 3");
Ok(())
}
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_can_run_multiple_yielded_sessions() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
let mut builder = test_codex().with_config(move |config| {
let _ = config.features.enable(Feature::CodeMode);
});
let test = builder.build(&server).await?;
let session_a_gate = test.workspace_path("code-mode-session-a.ready");
let session_b_gate = test.workspace_path("code-mode-session-b.ready");
let session_a_wait = wait_for_file_source(&session_a_gate)?;
let session_b_wait = wait_for_file_source(&session_b_gate)?;
let session_a_code = format!(
r#"
import {{ output_text, set_yield_time }} from "@openai/code_mode";
import {{ exec_command }} from "tools.js";
output_text("session a start");
set_yield_time(10);
{session_a_wait}
output_text("session a done");
"#
);
let session_b_code = format!(
r#"
import {{ output_text, set_yield_time }} from "@openai/code_mode";
import {{ exec_command }} from "tools.js";
output_text("session b start");
set_yield_time(10);
{session_b_wait}
output_text("session b done");
"#
);
responses::mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-1"),
ev_custom_tool_call("call-1", "exec", &session_a_code),
ev_completed("resp-1"),
]),
)
.await;
let first_completion = responses::mount_sse_once(
&server,
sse(vec![
ev_assistant_message("msg-1", "session a waiting"),
ev_completed("resp-2"),
]),
)
.await;
test.submit_turn("start session a").await?;
let first_request = first_completion.single_request();
let first_items = custom_tool_output_items(&first_request, "call-1");
assert_eq!(first_items.len(), 2);
let session_a_id = extract_running_session_id(text_item(&first_items, 0));
assert_eq!(text_item(&first_items, 1), "session a start");
responses::mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-3"),
ev_custom_tool_call("call-2", "exec", &session_b_code),
ev_completed("resp-3"),
]),
)
.await;
let second_completion = responses::mount_sse_once(
&server,
sse(vec![
ev_assistant_message("msg-2", "session b waiting"),
ev_completed("resp-4"),
]),
)
.await;
test.submit_turn("start session b").await?;
let second_request = second_completion.single_request();
let second_items = custom_tool_output_items(&second_request, "call-2");
assert_eq!(second_items.len(), 2);
let session_b_id = extract_running_session_id(text_item(&second_items, 0));
assert_eq!(text_item(&second_items, 1), "session b start");
assert_ne!(session_a_id, session_b_id);
fs::write(&session_a_gate, "ready")?;
responses::mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-5"),
responses::ev_function_call(
"call-3",
"exec_wait",
&serde_json::to_string(&serde_json::json!({
"session_id": session_a_id,
"yield_time_ms": 1_000,
}))?,
),
ev_completed("resp-5"),
]),
)
.await;
let third_completion = responses::mount_sse_once(
&server,
sse(vec![
ev_assistant_message("msg-3", "session a done"),
ev_completed("resp-6"),
]),
)
.await;
test.submit_turn("wait session a").await?;
let third_request = third_completion.single_request();
let third_items = function_tool_output_items(&third_request, "call-3");
assert_eq!(third_items.len(), 2);
assert_regex_match(
concat!(
r"(?s)\A",
r"Script completed\nWall time \d+\.\d seconds\nOutput:\n\z"
),
text_item(&third_items, 0),
);
assert_eq!(text_item(&third_items, 1), "session a done");
fs::write(&session_b_gate, "ready")?;
responses::mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-7"),
responses::ev_function_call(
"call-4",
"exec_wait",
&serde_json::to_string(&serde_json::json!({
"session_id": session_b_id,
"yield_time_ms": 1_000,
}))?,
),
ev_completed("resp-7"),
]),
)
.await;
let fourth_completion = responses::mount_sse_once(
&server,
sse(vec![
ev_assistant_message("msg-4", "session b done"),
ev_completed("resp-8"),
]),
)
.await;
test.submit_turn("wait session b").await?;
let fourth_request = fourth_completion.single_request();
let fourth_items = function_tool_output_items(&fourth_request, "call-4");
assert_eq!(fourth_items.len(), 2);
assert_regex_match(
concat!(
r"(?s)\A",
r"Script completed\nWall time \d+\.\d seconds\nOutput:\n\z"
),
text_item(&fourth_items, 0),
);
assert_eq!(text_item(&fourth_items, 1), "session b done");
Ok(())
}
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_exec_wait_can_terminate_and_continue() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
let mut builder = test_codex().with_config(move |config| {
let _ = config.features.enable(Feature::CodeMode);
});
let test = builder.build(&server).await?;
let termination_gate = test.workspace_path("code-mode-terminate.ready");
let termination_wait = wait_for_file_source(&termination_gate)?;
let code = format!(
r#"
import {{ output_text, set_yield_time }} from "@openai/code_mode";
import {{ exec_command }} from "tools.js";
output_text("phase 1");
set_yield_time(10);
{termination_wait}
output_text("phase 2");
"#
);
responses::mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-1"),
ev_custom_tool_call("call-1", "exec", &code),
ev_completed("resp-1"),
]),
)
.await;
let first_completion = responses::mount_sse_once(
&server,
sse(vec![
ev_assistant_message("msg-1", "waiting"),
ev_completed("resp-2"),
]),
)
.await;
test.submit_turn("start the long exec").await?;
let first_request = first_completion.single_request();
let first_items = custom_tool_output_items(&first_request, "call-1");
assert_eq!(first_items.len(), 2);
let session_id = extract_running_session_id(text_item(&first_items, 0));
assert_eq!(text_item(&first_items, 1), "phase 1");
responses::mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-3"),
responses::ev_function_call(
"call-2",
"exec_wait",
&serde_json::to_string(&serde_json::json!({
"session_id": session_id,
"terminate": true,
}))?,
),
ev_completed("resp-3"),
]),
)
.await;
let second_completion = responses::mount_sse_once(
&server,
sse(vec![
ev_assistant_message("msg-2", "terminated"),
ev_completed("resp-4"),
]),
)
.await;
test.submit_turn("terminate it").await?;
let second_request = second_completion.single_request();
let second_items = function_tool_output_items(&second_request, "call-2");
assert_eq!(second_items.len(), 1);
assert_regex_match(
concat!(
r"(?s)\A",
r"Script terminated\nWall time \d+\.\d seconds\nOutput:\n\z"
),
text_item(&second_items, 0),
);
responses::mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-5"),
ev_custom_tool_call(
"call-3",
"exec",
r#"
import { output_text } from "@openai/code_mode";
output_text("after terminate");
"#,
),
ev_completed("resp-5"),
]),
)
.await;
let third_completion = responses::mount_sse_once(
&server,
sse(vec![
ev_assistant_message("msg-3", "done"),
ev_completed("resp-6"),
]),
)
.await;
test.submit_turn("run another exec").await?;
let third_request = third_completion.single_request();
let third_items = custom_tool_output_items(&third_request, "call-3");
assert_eq!(third_items.len(), 2);
assert_regex_match(
concat!(
r"(?s)\A",
r"Script completed\nWall time \d+\.\d seconds\nOutput:\n\z"
),
text_item(&third_items, 0),
);
assert_eq!(text_item(&third_items, 1), "after terminate");
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_exec_wait_returns_error_for_unknown_session() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
let mut builder = test_codex().with_config(move |config| {
let _ = config.features.enable(Feature::CodeMode);
});
let test = builder.build(&server).await?;
responses::mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-1"),
responses::ev_function_call(
"call-1",
"exec_wait",
&serde_json::to_string(&serde_json::json!({
"session_id": 999_999,
"yield_time_ms": 1_000,
}))?,
),
ev_completed("resp-1"),
]),
)
.await;
let completion = responses::mount_sse_once(
&server,
sse(vec![
ev_assistant_message("msg-1", "done"),
ev_completed("resp-2"),
]),
)
.await;
test.submit_turn("wait on an unknown exec session").await?;
let request = completion.single_request();
let (_, success) = request
.function_call_output_content_and_success("call-1")
.expect("function tool output should be present");
assert_ne!(success, Some(true));
let items = function_tool_output_items(&request, "call-1");
assert_eq!(items.len(), 2);
assert_regex_match(
concat!(
r"(?s)\A",
r"Script failed\nWall time \d+\.\d seconds\nOutput:\n\z"
),
text_item(&items, 0),
);
assert_eq!(
text_item(&items, 1),
"Script error:\nexec session 999999 not found"
);
Ok(())
}
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_exec_wait_terminate_returns_completed_session_if_it_finished_in_background()
-> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
let mut builder = test_codex().with_config(move |config| {
let _ = config.features.enable(Feature::CodeMode);
});
let test = builder.build(&server).await?;
let session_a_gate = test.workspace_path("code-mode-session-a-finished.ready");
let session_b_gate = test.workspace_path("code-mode-session-b-blocked.ready");
let session_a_wait = wait_for_file_source(&session_a_gate)?;
let session_b_wait = wait_for_file_source(&session_b_gate)?;
let session_a_code = format!(
r#"
import {{ output_text, set_yield_time }} from "@openai/code_mode";
import {{ exec_command }} from "tools.js";
output_text("session a start");
set_yield_time(10);
{session_a_wait}
output_text("session a done");
"#
);
let session_b_code = format!(
r#"
import {{ output_text, set_yield_time }} from "@openai/code_mode";
import {{ exec_command }} from "tools.js";
output_text("session b start");
set_yield_time(10);
{session_b_wait}
output_text("session b done");
"#
);
responses::mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-1"),
ev_custom_tool_call("call-1", "exec", &session_a_code),
ev_completed("resp-1"),
]),
)
.await;
let first_completion = responses::mount_sse_once(
&server,
sse(vec![
ev_assistant_message("msg-1", "session a waiting"),
ev_completed("resp-2"),
]),
)
.await;
test.submit_turn("start session a").await?;
let first_request = first_completion.single_request();
let first_items = custom_tool_output_items(&first_request, "call-1");
assert_eq!(first_items.len(), 2);
let session_a_id = extract_running_session_id(text_item(&first_items, 0));
assert_eq!(text_item(&first_items, 1), "session a start");
responses::mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-3"),
ev_custom_tool_call("call-2", "exec", &session_b_code),
ev_completed("resp-3"),
]),
)
.await;
let second_completion = responses::mount_sse_once(
&server,
sse(vec![
ev_assistant_message("msg-2", "session b waiting"),
ev_completed("resp-4"),
]),
)
.await;
test.submit_turn("start session b").await?;
let second_request = second_completion.single_request();
let second_items = custom_tool_output_items(&second_request, "call-2");
assert_eq!(second_items.len(), 2);
let session_b_id = extract_running_session_id(text_item(&second_items, 0));
assert_eq!(text_item(&second_items, 1), "session b start");
fs::write(&session_a_gate, "ready")?;
responses::mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-5"),
responses::ev_function_call(
"call-3",
"exec_wait",
&serde_json::to_string(&serde_json::json!({
"session_id": session_b_id,
"yield_time_ms": 1_000,
}))?,
),
ev_completed("resp-5"),
]),
)
.await;
let third_completion = responses::mount_sse_once(
&server,
sse(vec![
ev_assistant_message("msg-3", "session b still waiting"),
ev_completed("resp-6"),
]),
)
.await;
test.submit_turn("wait session b").await?;
let third_request = third_completion.single_request();
let third_items = function_tool_output_items(&third_request, "call-3");
assert_eq!(third_items.len(), 1);
assert_regex_match(
concat!(
r"(?s)\A",
r"Script running with session ID \d+\nWall time \d+\.\d seconds\nOutput:\n\z"
),
text_item(&third_items, 0),
);
assert_eq!(
extract_running_session_id(text_item(&third_items, 0)),
session_b_id
);
responses::mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-7"),
responses::ev_function_call(
"call-4",
"exec_wait",
&serde_json::to_string(&serde_json::json!({
"session_id": session_a_id,
"terminate": true,
}))?,
),
ev_completed("resp-7"),
]),
)
.await;
let fourth_completion = responses::mount_sse_once(
&server,
sse(vec![
ev_assistant_message("msg-4", "session a already done"),
ev_completed("resp-8"),
]),
)
.await;
test.submit_turn("terminate session a").await?;
let fourth_request = fourth_completion.single_request();
let fourth_items = function_tool_output_items(&fourth_request, "call-4");
assert_eq!(fourth_items.len(), 1);
assert_regex_match(
concat!(
r"(?s)\A",
r"Script terminated\nWall time \d+\.\d seconds\nOutput:\n\z"
),
text_item(&fourth_items, 0),
);
Ok(())
}
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_exec_wait_uses_its_own_max_tokens_budget() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
let mut builder = test_codex().with_config(move |config| {
let _ = config.features.enable(Feature::CodeMode);
});
let test = builder.build(&server).await?;
let completion_gate = test.workspace_path("code-mode-max-tokens.ready");
let completion_wait = wait_for_file_source(&completion_gate)?;
let code = format!(
r#"
import {{ output_text, set_max_output_tokens_per_exec_call, set_yield_time }} from "@openai/code_mode";
import {{ exec_command }} from "tools.js";
output_text("phase 1");
set_max_output_tokens_per_exec_call(100);
set_yield_time(10);
{completion_wait}
output_text("token one token two token three token four token five token six token seven");
"#
);
responses::mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-1"),
ev_custom_tool_call("call-1", "exec", &code),
ev_completed("resp-1"),
]),
)
.await;
let first_completion = responses::mount_sse_once(
&server,
sse(vec![
ev_assistant_message("msg-1", "waiting"),
ev_completed("resp-2"),
]),
)
.await;
test.submit_turn("start the long exec").await?;
let first_request = first_completion.single_request();
let first_items = custom_tool_output_items(&first_request, "call-1");
assert_eq!(first_items.len(), 2);
assert_eq!(text_item(&first_items, 1), "phase 1");
let session_id = extract_running_session_id(text_item(&first_items, 0));
fs::write(&completion_gate, "ready")?;
responses::mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-3"),
responses::ev_function_call(
"call-2",
"exec_wait",
&serde_json::to_string(&serde_json::json!({
"session_id": session_id,
"yield_time_ms": 1_000,
"max_tokens": 6,
}))?,
),
ev_completed("resp-3"),
]),
)
.await;
let second_completion = responses::mount_sse_once(
&server,
sse(vec![
ev_assistant_message("msg-2", "done"),
ev_completed("resp-4"),
]),
)
.await;
test.submit_turn("wait for completion").await?;
let second_request = second_completion.single_request();
let second_items = function_tool_output_items(&second_request, "call-2");
assert_eq!(second_items.len(), 2);
assert_regex_match(
concat!(
r"(?s)\A",
r"Script completed\nWall time \d+\.\d seconds\nOutput:\n\z"
),
text_item(&second_items, 0),
);
let expected_pattern = r#"(?sx)
\A
Total\ output\ lines:\ 1\n
\n
.*…\d+\ tokens\ truncated….*
\z
"#;
assert_regex_match(expected_pattern, text_item(&second_items, 1));
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_can_output_serialized_text_via_openai_code_mode_module() -> Result<()> {
skip_if_no_network!(Ok(()));