Compare commits

...

1 Commits

Author SHA1 Message Date
jif-oai
9b06d83279 V1 2025-10-18 23:18:51 +02:00
9 changed files with 1079 additions and 899 deletions

View File

@@ -27,18 +27,35 @@ pub(crate) struct ExecCommandSession {
/// Tracks whether the underlying process has exited.
exit_status: std::sync::Arc<std::sync::atomic::AtomicBool>,
/// Captures the process exit code once the child terminates.
exit_code: std::sync::Arc<StdMutex<Option<i32>>>,
}
/// Bundle of channels, task handles and shared exit state that is passed to
/// `ExecCommandSession::new` as a single argument instead of a long
/// positional parameter list.
#[derive(Debug)]
pub(crate) struct ExecCommandSessionParams {
/// Sender half of a byte channel.
/// NOTE(review): presumably carries bytes destined for the process's stdin
/// — confirm against the writer task.
pub writer_tx: mpsc::Sender<Vec<u8>>,
/// Broadcast sender for byte chunks; `ExecCommandSession::new` subscribes
/// to it to obtain the initial receiver it returns.
pub output_tx: broadcast::Sender<Vec<u8>>,
/// `portable_pty` killer handle for the child process.
pub killer: Box<dyn portable_pty::ChildKiller + Send + Sync>,
/// Join handle of the reader task.
/// NOTE(review): presumably the task forwarding PTY output — confirm.
pub reader_handle: JoinHandle<()>,
/// Join handle of the writer task.
pub writer_handle: JoinHandle<()>,
/// Join handle of the task that waits for the child to terminate.
pub wait_handle: JoinHandle<()>,
/// Tracks whether the underlying process has exited.
pub exit_status: std::sync::Arc<std::sync::atomic::AtomicBool>,
/// Captures the process exit code once the child terminates; `None` until
/// then.
pub exit_code: std::sync::Arc<StdMutex<Option<i32>>>,
}
impl ExecCommandSession {
pub(crate) fn new(
writer_tx: mpsc::Sender<Vec<u8>>,
output_tx: broadcast::Sender<Vec<u8>>,
killer: Box<dyn portable_pty::ChildKiller + Send + Sync>,
reader_handle: JoinHandle<()>,
writer_handle: JoinHandle<()>,
wait_handle: JoinHandle<()>,
exit_status: std::sync::Arc<std::sync::atomic::AtomicBool>,
) -> (Self, broadcast::Receiver<Vec<u8>>) {
pub(crate) fn new(params: ExecCommandSessionParams) -> (Self, broadcast::Receiver<Vec<u8>>) {
let ExecCommandSessionParams {
writer_tx,
output_tx,
killer,
reader_handle,
writer_handle,
wait_handle,
exit_status,
exit_code,
} = params;
let initial_output_rx = output_tx.subscribe();
(
Self {
@@ -49,6 +66,7 @@ impl ExecCommandSession {
writer_handle: StdMutex::new(Some(writer_handle)),
wait_handle: StdMutex::new(Some(wait_handle)),
exit_status,
exit_code,
},
initial_output_rx,
)
@@ -65,6 +83,10 @@ impl ExecCommandSession {
/// Reports whether the exit flag has been set, using a sequentially
/// consistent atomic load.
pub(crate) fn has_exited(&self) -> bool {
    use std::sync::atomic::Ordering;

    let exited_flag = &self.exit_status;
    exited_flag.load(Ordering::SeqCst)
}
/// Returns the recorded exit code, if any.
///
/// Yields `None` when no code has been stored yet, and also when the
/// mutex guarding the value is poisoned.
pub(crate) fn exit_code(&self) -> Option<i32> {
    match self.exit_code.lock() {
        Ok(stored) => *stored,
        Err(_) => None,
    }
}
}
impl Drop for ExecCommandSession {

View File

@@ -7,6 +7,7 @@ mod session_manager;
pub use exec_command_params::ExecCommandParams;
pub use exec_command_params::WriteStdinParams;
pub(crate) use exec_command_session::ExecCommandSession;
pub(crate) use exec_command_session::ExecCommandSessionParams;
pub use responses_api::EXEC_COMMAND_TOOL_NAME;
pub use responses_api::WRITE_STDIN_TOOL_NAME;
pub use responses_api::create_exec_command_tool_for_responses_api;

View File

@@ -19,6 +19,7 @@ use tokio::time::timeout;
use crate::exec_command::exec_command_params::ExecCommandParams;
use crate::exec_command::exec_command_params::WriteStdinParams;
use crate::exec_command::exec_command_session::ExecCommandSession;
use crate::exec_command::exec_command_session::ExecCommandSessionParams;
use crate::exec_command::session_id::SessionId;
use crate::truncate::truncate_middle;
@@ -318,17 +319,22 @@ async fn create_exec_command_session(
let (exit_tx, exit_rx) = oneshot::channel::<i32>();
let exit_status = Arc::new(AtomicBool::new(false));
let wait_exit_status = exit_status.clone();
let exit_code = Arc::new(StdMutex::new(None));
let exit_code_for_wait = exit_code.clone();
let wait_handle = tokio::task::spawn_blocking(move || {
let code = match child.wait() {
Ok(status) => status.exit_code() as i32,
Err(_) => -1,
};
wait_exit_status.store(true, std::sync::atomic::Ordering::SeqCst);
if let Ok(mut guard) = exit_code_for_wait.lock() {
*guard = Some(code);
}
let _ = exit_tx.send(code);
});
// Create and store the session with channels.
let (session, initial_output_rx) = ExecCommandSession::new(
let (session, initial_output_rx) = ExecCommandSession::new(ExecCommandSessionParams {
writer_tx,
output_tx,
killer,
@@ -336,7 +342,8 @@ async fn create_exec_command_session(
writer_handle,
wait_handle,
exit_status,
);
exit_code,
});
Ok((session, initial_output_rx, exit_rx))
}

View File

@@ -42,6 +42,11 @@ pub mod token_data;
mod truncate;
mod unified_exec;
mod user_instructions;
pub use crate::unified_exec::UnifiedExecError;
pub use crate::unified_exec::UnifiedExecMode;
pub use crate::unified_exec::UnifiedExecRequest;
pub use crate::unified_exec::UnifiedExecResult;
pub use crate::unified_exec::UnifiedExecSessionManager;
pub use model_provider_info::BUILT_IN_OSS_MODEL_PROVIDER_ID;
pub use model_provider_info::ModelProviderInfo;
pub use model_provider_info::WireApi;

View File

@@ -1,5 +1,8 @@
use async_trait::async_trait;
use serde::Deserialize;
use serde_json::Value as JsonValue;
use std::borrow::Cow;
use std::convert::TryFrom;
use crate::function_tool::FunctionCallError;
use crate::tools::context::ToolInvocation;
@@ -7,17 +10,37 @@ use crate::tools::context::ToolOutput;
use crate::tools::context::ToolPayload;
use crate::tools::registry::ToolHandler;
use crate::tools::registry::ToolKind;
use crate::unified_exec::UnifiedExecError;
use crate::unified_exec::UnifiedExecMode;
use crate::unified_exec::UnifiedExecRequest;
use crate::unified_exec::UnifiedExecResult;
pub struct UnifiedExecHandler;
#[derive(Deserialize)]
struct UnifiedExecArgs {
input: Vec<String>,
#[serde(default)]
session_id: Option<String>,
cmd: Option<String>,
#[serde(default)]
timeout_ms: Option<u64>,
session_id: Option<JsonValue>,
#[serde(default)]
chars: Option<String>,
#[serde(default)]
yield_time_ms: Option<u64>,
#[serde(default)]
max_output_tokens: Option<usize>,
#[serde(default)]
output_chunk_id: Option<bool>,
#[serde(default)]
output_wall_time: Option<bool>,
#[serde(default)]
output_json: Option<bool>,
#[serde(default)]
shell: Option<String>,
#[serde(default)]
login: Option<bool>,
#[serde(default)]
cwd: Option<String>,
}
#[async_trait]
@@ -54,56 +77,114 @@ impl ToolHandler for UnifiedExecHandler {
};
let UnifiedExecArgs {
input,
cmd,
session_id,
timeout_ms,
chars,
yield_time_ms,
max_output_tokens,
output_chunk_id,
output_wall_time,
output_json,
shell,
login,
cwd,
} = args;
let parsed_session_id = if let Some(session_id) = session_id {
match session_id.parse::<i32>() {
Ok(parsed) => Some(parsed),
Err(output) => {
return Err(FunctionCallError::RespondToModel(format!(
"invalid session_id: {session_id} due to error {output:?}"
)));
}
let chars = chars.unwrap_or_default();
let mode = if let Some(raw_session_id) = session_id {
if cmd.is_some() {
return Err(FunctionCallError::RespondToModel(
"provide either cmd or session_id, not both".to_string(),
));
}
let session_id = parse_session_id(raw_session_id)?;
UnifiedExecMode::Write {
session_id,
chars: chars.as_str(),
yield_time_ms,
max_output_tokens,
}
} else {
None
let cmd_value = cmd.ok_or_else(|| {
FunctionCallError::RespondToModel(
"cmd is required when session_id is not provided".to_string(),
)
})?;
UnifiedExecMode::Start {
cmd: Cow::Owned(cmd_value),
yield_time_ms,
max_output_tokens,
shell: shell.as_deref(),
login,
cwd: cwd.as_deref(),
}
};
let request = UnifiedExecRequest {
session_id: parsed_session_id,
input_chunks: &input,
timeout_ms,
mode,
output_chunk_id,
output_wall_time,
output_json,
};
let value = session
let result = session
.run_unified_exec_request(request)
.await
.map_err(|err| {
FunctionCallError::RespondToModel(format!("unified exec failed: {err:?}"))
})?;
.map_err(map_unified_exec_error)?;
#[derive(serde::Serialize)]
struct SerializedUnifiedExecResult {
session_id: Option<String>,
output: String,
}
let content = serde_json::to_string(&SerializedUnifiedExecResult {
session_id: value.session_id.map(|id| id.to_string()),
output: value.output,
})
.map_err(|err| {
FunctionCallError::RespondToModel(format!(
"failed to serialize unified exec output: {err:?}"
))
})?;
Ok(ToolOutput::Function {
content,
success: Some(true),
})
Ok(tool_output_from_result(result))
}
}
/// Wraps a unified exec result as a function tool output.
///
/// The result's content is converted to a string and the output is always
/// marked as successful.
fn tool_output_from_result(result: UnifiedExecResult) -> ToolOutput {
    ToolOutput::Function {
        content: result.content.into_string(),
        success: Some(true),
    }
}
/// Converts a JSON `session_id` argument into an `i32`.
///
/// Accepts either a JSON number that is an integer within the `i32` range or
/// a JSON string that parses as an `i32`.
///
/// # Errors
///
/// Returns `FunctionCallError::RespondToModel` when the number is not
/// representable as `i64`, does not fit in `i32`, the string fails to parse,
/// or the value is any other JSON type.
fn parse_session_id(value: JsonValue) -> Result<i32, FunctionCallError> {
    match value {
        JsonValue::Number(num) => {
            // `as_i64` is `None` for floats and for integers beyond `i64`.
            let Some(int) = num.as_i64() else {
                return Err(FunctionCallError::RespondToModel(
                    "session_id must be an integer".to_string(),
                ));
            };
            match i32::try_from(int) {
                Ok(id) => Ok(id),
                Err(_) => Err(FunctionCallError::RespondToModel(format!(
                    "session_id value {int} exceeds i32 range"
                ))),
            }
        }
        JsonValue::String(text) => match text.parse::<i32>() {
            Ok(id) => Ok(id),
            Err(err) => Err(FunctionCallError::RespondToModel(format!(
                "invalid session_id '{text}': {err}"
            ))),
        },
        other => {
            let message = format!("session_id must be a string or integer, got {other}");
            Err(FunctionCallError::RespondToModel(message))
        }
    }
}
/// Translates a `UnifiedExecError` into a `FunctionCallError::RespondToModel`
/// carrying a human-readable message.
///
/// `SessionExited` and `WriteToStdin` get dedicated wording; every other
/// variant falls back to its `Display` text prefixed with
/// `unified exec failed: `.
fn map_unified_exec_error(err: UnifiedExecError) -> FunctionCallError {
    let message = match err {
        UnifiedExecError::SessionExited {
            session_id,
            exit_code,
        } => {
            // Only mention the exit code when one was recorded.
            let detail = match exit_code {
                Some(code) => format!(" with code {code}"),
                None => String::new(),
            };
            format!(
                "session {session_id} has already exited{detail}. Start a new session with cmd."
            )
        }
        UnifiedExecError::WriteToStdin { session_id } => {
            format!("failed to write to session {session_id}; the process may have exited")
        }
        other => format!("unified exec failed: {other}"),
    };
    FunctionCallError::RespondToModel(message)
}

View File

@@ -142,13 +142,10 @@ impl From<JsonSchema> for AdditionalProperties {
fn create_unified_exec_tool() -> ToolSpec {
let mut properties = BTreeMap::new();
properties.insert(
"input".to_string(),
JsonSchema::Array {
items: Box::new(JsonSchema::String { description: None }),
"cmd".to_string(),
JsonSchema::String {
description: Some(
"When no session_id is provided, treat the array as the command and arguments \
to launch. When session_id is set, concatenate the strings (in order) and write \
them to the session's stdin."
"Command to execute when starting a new session. Mutually exclusive with session_id."
.to_string(),
),
},
@@ -157,30 +154,87 @@ fn create_unified_exec_tool() -> ToolSpec {
"session_id".to_string(),
JsonSchema::String {
description: Some(
"Identifier for an existing interactive session. If omitted, a new command \
is spawned."
"Identifier for an existing interactive session. When provided, cmd must be omitted and chars will be written to the session's stdin."
.to_string(),
),
},
);
properties.insert(
"timeout_ms".to_string(),
JsonSchema::Number {
"chars".to_string(),
JsonSchema::String {
description: Some(
"Maximum time in milliseconds to wait for output after writing the input."
"Characters to write to an interactive session. Use an empty string to poll for output without sending input."
.to_string(),
),
},
);
properties.insert(
"yield_time_ms".to_string(),
JsonSchema::Number {
description: Some(
"Milliseconds to wait for output before returning (clamped between 250 and 30_000)."
.to_string(),
),
},
);
properties.insert(
"max_output_tokens".to_string(),
JsonSchema::Number {
description: Some(
"Approximate maximum number of output tokens before truncation.".to_string(),
),
},
);
properties.insert(
"output_chunk_id".to_string(),
JsonSchema::Boolean {
description: Some("Whether to include a chunk identifier in responses.".to_string()),
},
);
properties.insert(
"output_wall_time".to_string(),
JsonSchema::Boolean {
description: Some(
"Whether to include wall-clock timing metadata in responses.".to_string(),
),
},
);
properties.insert(
"output_json".to_string(),
JsonSchema::Boolean {
description: Some(
"Return structured JSON instead of formatted text when true.".to_string(),
),
},
);
properties.insert(
"shell".to_string(),
JsonSchema::String {
description: Some("Override the shell executable (default /bin/bash).".to_string()),
},
);
properties.insert(
"login".to_string(),
JsonSchema::Boolean {
description: Some(
"Whether to start the shell as a login shell (default true).".to_string(),
),
},
);
properties.insert(
"cwd".to_string(),
JsonSchema::String {
description: Some("Working directory for the command.".to_string()),
},
);
ToolSpec::Function(ResponsesApiTool {
name: "unified_exec".to_string(),
description:
"Runs a command in a PTY. Provide a session_id to reuse an existing interactive session.".to_string(),
description: "Runs commands in a PTY and streams output, supporting interactive sessions with stdin writes.".to_string(),
strict: false,
parameters: JsonSchema::Object {
properties,
required: Some(vec!["input".to_string()]),
required: None,
additional_properties: Some(false.into()),
},
})

View File

@@ -1,7 +1,7 @@
use thiserror::Error;
#[derive(Debug, Error)]
pub(crate) enum UnifiedExecError {
pub enum UnifiedExecError {
#[error("Failed to create unified exec session: {pty_error}")]
CreateSession {
#[source]
@@ -9,10 +9,19 @@ pub(crate) enum UnifiedExecError {
},
#[error("Unknown session id {session_id}")]
UnknownSessionId { session_id: i32 },
#[error("failed to write to stdin")]
WriteToStdin,
#[error("failed to write to stdin for session {session_id}")]
WriteToStdin { session_id: i32 },
#[error("missing command line for unified exec request")]
MissingCommandLine,
#[error("spawned process did not report a process id")]
MissingProcessId,
#[error("spawned process id {process_id} does not fit in i32")]
ProcessIdOverflow { process_id: u32 },
#[error("session {session_id} has already exited")]
SessionExited {
session_id: i32,
exit_code: Option<i32>,
},
}
impl UnifiedExecError {

File diff suppressed because it is too large Load Diff

View File

@@ -1,409 +1,286 @@
#![cfg(not(target_os = "windows"))]
use std::collections::HashMap;
use anyhow::Result;
use codex_core::features::Feature;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::EventMsg;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_core::protocol::SandboxPolicy;
use codex_protocol::config_types::ReasoningSummary;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use codex_core::UnifiedExecMode;
use codex_core::UnifiedExecRequest;
use codex_core::UnifiedExecSessionManager;
#[cfg(unix)]
use core_test_support::skip_if_sandbox;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use serde_json::Value;
use tokio::time::Duration;
/// Pulls the textual payload out of an item's `"output"` field.
///
/// A string output is returned directly; an object output yields its
/// `"content"` string. Anything else, or a missing field, gives `None`.
fn extract_output_text(item: &Value) -> Option<&str> {
    match item.get("output")? {
        Value::String(text) => Some(text.as_str()),
        Value::Object(fields) => fields.get("content")?.as_str(),
        _ => None,
    }
}
/// Gathers parsed tool outputs from request bodies, keyed by `call_id`.
///
/// Scans each body's `"input"` array for items whose `"type"` is
/// `"function_call_output"`. Items without a string `call_id`, or whose
/// trimmed output text is empty, are skipped. The remaining text is parsed
/// as JSON and stored under the item's `call_id`; a later entry with the
/// same id overwrites an earlier one.
///
/// # Errors
///
/// Fails when an item with a `call_id` has no extractable output text, or
/// when that text is not valid JSON.
fn collect_tool_outputs(bodies: &[Value]) -> Result<HashMap<String, Value>> {
    let mut outputs = HashMap::new();

    let call_outputs = bodies
        .iter()
        .filter_map(|body| body.get("input").and_then(Value::as_array))
        .flatten()
        .filter(|item| item.get("type").and_then(Value::as_str) == Some("function_call_output"));

    for item in call_outputs {
        let Some(call_id) = item.get("call_id").and_then(Value::as_str) else {
            continue;
        };

        let Some(content) = extract_output_text(item) else {
            return Err(anyhow::anyhow!("missing tool output content"));
        };

        let trimmed = content.trim();
        if trimmed.is_empty() {
            continue;
        }

        let parsed: Value = match serde_json::from_str(trimmed) {
            Ok(value) => value,
            Err(err) => {
                return Err(anyhow::anyhow!(
                    "failed to parse tool output content {trimmed:?}: {err}"
                ));
            }
        };
        outputs.insert(call_id.to_string(), parsed);
    }

    Ok(outputs)
}
#[cfg(unix)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_reuses_session_via_stdin() -> Result<()> {
skip_if_no_network!(Ok(()));
async fn unified_exec_manager_supports_interactive_cat() -> Result<()> {
skip_if_sandbox!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.features.enable(Feature::UnifiedExec);
});
let TestCodex {
codex,
cwd,
session_configured,
..
} = builder.build(&server).await?;
let first_call_id = "uexec-start";
let first_args = serde_json::json!({
"input": ["/bin/cat"],
"timeout_ms": 200,
});
let second_call_id = "uexec-stdin";
let second_args = serde_json::json!({
"input": ["hello unified exec\n"],
"session_id": "0",
"timeout_ms": 500,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(
first_call_id,
"unified_exec",
&serde_json::to_string(&first_args)?,
),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_function_call(
second_call_id,
"unified_exec",
&serde_json::to_string(&second_args)?,
),
ev_completed("resp-2"),
]),
sse(vec![
ev_assistant_message("msg-1", "all done"),
ev_completed("resp-3"),
]),
];
mount_sse_sequence(&server, responses).await;
let session_model = session_configured.model.clone();
codex
.submit(Op::UserTurn {
items: vec![InputItem::Text {
text: "run unified exec".into(),
}],
final_output_json_schema: None,
cwd: cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: session_model,
effort: None,
summary: ReasoningSummary::Auto,
let manager = UnifiedExecSessionManager::default();
let result = manager
.handle_request(UnifiedExecRequest {
mode: UnifiedExecMode::Start {
cmd: std::borrow::Cow::Borrowed("cat"),
yield_time_ms: Some(200),
max_output_tokens: Some(1_000),
shell: Some("/bin/sh"),
login: Some(false),
cwd: None,
},
output_chunk_id: Some(true),
output_wall_time: Some(true),
output_json: Some(false),
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
let session_id = result.metadata.session_id.expect("expected session id");
let poll = manager
.handle_request(UnifiedExecRequest {
mode: UnifiedExecMode::Write {
session_id,
chars: "hello unified exec\n",
yield_time_ms: Some(500),
max_output_tokens: Some(1_000),
},
output_chunk_id: Some(false),
output_wall_time: Some(false),
output_json: Some(true),
})
.await?;
let requests = server.received_requests().await.expect("recorded requests");
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = requests
.iter()
.map(|req| req.body_json::<Value>().expect("request json"))
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
let start_output = outputs
.get(first_call_id)
.expect("missing first unified_exec output");
let session_id = start_output["session_id"].as_str().unwrap_or_default();
assert!(
!session_id.is_empty(),
"expected session id in first unified_exec response"
);
assert!(
start_output["output"]
.as_str()
.unwrap_or_default()
.is_empty()
);
let reuse_output = outputs
.get(second_call_id)
.expect("missing reused unified_exec output");
assert_eq!(
reuse_output["session_id"].as_str().unwrap_or_default(),
session_id
);
let echoed = reuse_output["output"].as_str().unwrap_or_default();
assert!(
echoed.contains("hello unified exec"),
"expected echoed output, got {echoed:?}"
);
let output = poll.content.into_string();
assert!(output.contains("hello unified exec"));
Ok(())
}
#[cfg(unix)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_streams_after_lagged_output() -> Result<()> {
skip_if_no_network!(Ok(()));
async fn unified_exec_manager_streams_large_output() -> Result<()> {
skip_if_sandbox!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.use_experimental_unified_exec_tool = true;
config.features.enable(Feature::UnifiedExec);
});
let TestCodex {
codex,
cwd,
session_configured,
..
} = builder.build(&server).await?;
let manager = UnifiedExecSessionManager::default();
let script = r#"python3 - <<'PY'
import sys
import time
chunk = b'x' * (1 << 20)
for _ in range(4):
sys.stdout.buffer.write(chunk)
sys.stdout.flush()
time.sleep(0.2)
for _ in range(5):
for _ in range(3):
sys.stdout.write("TAIL-MARKER\n")
sys.stdout.flush()
time.sleep(0.05)
time.sleep(0.2)
PY
"#;
let first_call_id = "uexec-lag-start";
let first_args = serde_json::json!({
"input": ["/bin/sh", "-c", script],
"timeout_ms": 25,
});
let second_call_id = "uexec-lag-poll";
let second_args = serde_json::json!({
"input": Vec::<String>::new(),
"session_id": "0",
"timeout_ms": 2_000,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(
first_call_id,
"unified_exec",
&serde_json::to_string(&first_args)?,
),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_function_call(
second_call_id,
"unified_exec",
&serde_json::to_string(&second_args)?,
),
ev_completed("resp-2"),
]),
sse(vec![
ev_assistant_message("msg-1", "lag handled"),
ev_completed("resp-3"),
]),
];
mount_sse_sequence(&server, responses).await;
let session_model = session_configured.model.clone();
codex
.submit(Op::UserTurn {
items: vec![InputItem::Text {
text: "exercise lag handling".into(),
}],
final_output_json_schema: None,
cwd: cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: session_model,
effort: None,
summary: ReasoningSummary::Auto,
let start = manager
.handle_request(UnifiedExecRequest {
mode: UnifiedExecMode::Start {
cmd: std::borrow::Cow::Borrowed(script),
yield_time_ms: Some(500),
max_output_tokens: Some(5_000),
shell: Some("/bin/sh"),
login: Some(false),
cwd: None,
},
output_chunk_id: None,
output_wall_time: None,
output_json: Some(false),
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
let requests = server.received_requests().await.expect("recorded requests");
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = requests
.iter()
.map(|req| req.body_json::<Value>().expect("request json"))
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
let start_output = outputs
.get(first_call_id)
.expect("missing initial unified_exec output");
let session_id = start_output["session_id"].as_str().unwrap_or_default();
assert!(
!session_id.is_empty(),
"expected session id from initial unified_exec response"
);
let poll_output = outputs
.get(second_call_id)
.expect("missing poll unified_exec output");
let poll_text = poll_output["output"].as_str().unwrap_or_default();
assert!(
poll_text.contains("TAIL-MARKER"),
"expected poll output to contain tail marker, got {poll_text:?}"
);
let output = start.content.into_string();
assert!(output.contains("TAIL-MARKER"));
Ok(())
}
#[cfg(unix)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_timeout_and_followup_poll() -> Result<()> {
skip_if_no_network!(Ok(()));
async fn unified_exec_manager_handles_timeout_then_poll() -> Result<()> {
skip_if_sandbox!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.features.enable(Feature::UnifiedExec);
});
let TestCodex {
codex,
cwd,
session_configured,
..
} = builder.build(&server).await?;
let first_call_id = "uexec-timeout";
let first_args = serde_json::json!({
"input": ["/bin/sh", "-c", "sleep 0.1; echo ready"],
"timeout_ms": 10,
});
let second_call_id = "uexec-poll";
let second_args = serde_json::json!({
"input": Vec::<String>::new(),
"session_id": "0",
"timeout_ms": 800,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(
first_call_id,
"unified_exec",
&serde_json::to_string(&first_args)?,
),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_function_call(
second_call_id,
"unified_exec",
&serde_json::to_string(&second_args)?,
),
ev_completed("resp-2"),
]),
sse(vec![
ev_assistant_message("msg-1", "done"),
ev_completed("resp-3"),
]),
];
mount_sse_sequence(&server, responses).await;
let session_model = session_configured.model.clone();
codex
.submit(Op::UserTurn {
items: vec![InputItem::Text {
text: "check timeout".into(),
}],
final_output_json_schema: None,
cwd: cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: session_model,
effort: None,
summary: ReasoningSummary::Auto,
let manager = UnifiedExecSessionManager::default();
let result = manager
.handle_request(UnifiedExecRequest {
mode: UnifiedExecMode::Start {
cmd: std::borrow::Cow::Borrowed("sleep 0.1; echo ready"),
yield_time_ms: Some(10),
max_output_tokens: Some(1_000),
shell: Some("/bin/sh"),
login: Some(false),
cwd: None,
},
output_chunk_id: None,
output_wall_time: None,
output_json: Some(false),
})
.await?;
loop {
let event = codex.next_event().await.expect("event");
if matches!(event.msg, EventMsg::TaskComplete(_)) {
break;
if let Some(session_id) = result.metadata.session_id {
tokio::time::sleep(Duration::from_millis(200)).await;
match manager
.handle_request(UnifiedExecRequest {
mode: UnifiedExecMode::Write {
session_id,
chars: "",
yield_time_ms: Some(500),
max_output_tokens: Some(1_000),
},
output_chunk_id: None,
output_wall_time: None,
output_json: Some(false),
})
.await
{
Ok(poll) => assert!(poll.content.into_string().contains("ready")),
Err(codex_core::UnifiedExecError::SessionExited { .. }) => {}
Err(other) => return Err(other.into()),
}
} else {
assert!(result.content.into_string().contains("ready"));
}
let requests = server.received_requests().await.expect("recorded requests");
assert!(!requests.is_empty(), "expected at least one POST request");
Ok(())
}
let bodies = requests
.iter()
.map(|req| req.body_json::<Value>().expect("request json"))
.collect::<Vec<_>>();
#[cfg(unix)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_json_output_matches_metadata() -> Result<()> {
skip_if_sandbox!(Ok(()));
let outputs = collect_tool_outputs(&bodies)?;
let manager = UnifiedExecSessionManager::default();
let command = "printf 'ready\\n' && read dummy";
let first_output = outputs.get(first_call_id).expect("missing timeout output");
assert_eq!(first_output["session_id"], "0");
assert!(
first_output["output"]
.as_str()
.unwrap_or_default()
.is_empty()
let result = manager
.handle_request(UnifiedExecRequest {
mode: UnifiedExecMode::Start {
cmd: std::borrow::Cow::Borrowed(command),
yield_time_ms: Some(500),
max_output_tokens: Some(1_000),
shell: Some("/bin/bash"),
login: Some(false),
cwd: None,
},
output_chunk_id: Some(true),
output_wall_time: Some(true),
output_json: Some(true),
})
.await?;
let codex_core::UnifiedExecResult { content, metadata } = result;
let body = content.into_string();
let json: Value = serde_json::from_str(&body)?;
assert!(json.get("chunk_id").is_some());
assert!(json.get("wall_time").is_some());
let session_id = metadata.session_id.expect("expected running session");
assert_eq!(json["session_id"].as_i64(), Some(i64::from(session_id)));
let output = json["output"]
.as_object()
.expect("output is object with numbered lines");
assert_eq!(
output.get("1").and_then(Value::as_str),
Some("ready"),
"expected first output line to contain ready"
);
let poll_output = outputs.get(second_call_id).expect("missing poll output");
let output_text = poll_output["output"].as_str().unwrap_or_default();
assert_eq!(metadata.exec_cmd.as_deref(), Some(command));
Ok(())
}
#[cfg(unix)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_respects_output_preferences() -> Result<()> {
skip_if_sandbox!(Ok(()));
let manager = UnifiedExecSessionManager::default();
let result = manager
.handle_request(UnifiedExecRequest {
mode: UnifiedExecMode::Start {
cmd: std::borrow::Cow::Borrowed("printf 'ready\\n' && read dummy"),
yield_time_ms: Some(500),
max_output_tokens: Some(1_000),
shell: Some("/bin/bash"),
login: Some(false),
cwd: None,
},
output_chunk_id: Some(false),
output_wall_time: Some(false),
output_json: Some(false),
})
.await?;
let codex_core::UnifiedExecResult { content, metadata } = result;
assert_eq!(
metadata.exec_cmd.as_deref(),
Some("printf 'ready\\n' && read dummy")
);
assert!(
output_text.contains("ready"),
"expected ready output, got {output_text:?}"
metadata.session_id.is_some(),
"session should remain active when waiting for stdin input"
);
let text = content.into_string();
assert!(
!text.contains("Chunk ID:"),
"chunk metadata should be omitted when output_chunk_id is false: {text}"
);
assert!(
!text.contains("Wall time:"),
"wall time metadata should be omitted when output_wall_time is false: {text}"
);
assert!(
text.contains("Process running with session ID"),
"expected running-session metadata in textual response: {text}"
);
assert!(
text.contains("ready"),
"expected command output to appear in textual response: {text}"
);
Ok(())
}
#[cfg(unix)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_reports_truncation_metadata() -> Result<()> {
skip_if_sandbox!(Ok(()));
let manager = UnifiedExecSessionManager::default();
let script = r#"python3 - <<'PY'
import sys
sys.stdout.write("X" * 2048)
sys.stdout.flush()
PY
"#;
let result = manager
.handle_request(UnifiedExecRequest {
mode: UnifiedExecMode::Start {
cmd: std::borrow::Cow::Borrowed(script),
yield_time_ms: Some(500),
max_output_tokens: Some(1),
shell: Some("/bin/sh"),
login: Some(false),
cwd: None,
},
output_chunk_id: Some(true),
output_wall_time: Some(true),
output_json: Some(false),
})
.await?;
let codex_core::UnifiedExecResult { content, metadata } = result;
assert!(
metadata.original_token_count.is_some_and(|count| count > 0),
"expected original_token_count metadata when truncation occurs"
);
let text = content.into_string();
assert!(
text.contains("tokens truncated"),
"expected truncation notice in textual output: {text}"
);
Ok(())