mirror of
https://github.com/openai/codex.git
synced 2026-04-19 20:24:50 +00:00
Compare commits
1 Commits
codex-debu
...
jif/unifie
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9b06d83279 |
@@ -27,18 +27,35 @@ pub(crate) struct ExecCommandSession {
|
||||
|
||||
/// Tracks whether the underlying process has exited.
|
||||
exit_status: std::sync::Arc<std::sync::atomic::AtomicBool>,
|
||||
|
||||
/// Captures the process exit code once the child terminates.
|
||||
exit_code: std::sync::Arc<StdMutex<Option<i32>>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ExecCommandSessionParams {
|
||||
pub writer_tx: mpsc::Sender<Vec<u8>>,
|
||||
pub output_tx: broadcast::Sender<Vec<u8>>,
|
||||
pub killer: Box<dyn portable_pty::ChildKiller + Send + Sync>,
|
||||
pub reader_handle: JoinHandle<()>,
|
||||
pub writer_handle: JoinHandle<()>,
|
||||
pub wait_handle: JoinHandle<()>,
|
||||
pub exit_status: std::sync::Arc<std::sync::atomic::AtomicBool>,
|
||||
pub exit_code: std::sync::Arc<StdMutex<Option<i32>>>,
|
||||
}
|
||||
|
||||
impl ExecCommandSession {
|
||||
pub(crate) fn new(
|
||||
writer_tx: mpsc::Sender<Vec<u8>>,
|
||||
output_tx: broadcast::Sender<Vec<u8>>,
|
||||
killer: Box<dyn portable_pty::ChildKiller + Send + Sync>,
|
||||
reader_handle: JoinHandle<()>,
|
||||
writer_handle: JoinHandle<()>,
|
||||
wait_handle: JoinHandle<()>,
|
||||
exit_status: std::sync::Arc<std::sync::atomic::AtomicBool>,
|
||||
) -> (Self, broadcast::Receiver<Vec<u8>>) {
|
||||
pub(crate) fn new(params: ExecCommandSessionParams) -> (Self, broadcast::Receiver<Vec<u8>>) {
|
||||
let ExecCommandSessionParams {
|
||||
writer_tx,
|
||||
output_tx,
|
||||
killer,
|
||||
reader_handle,
|
||||
writer_handle,
|
||||
wait_handle,
|
||||
exit_status,
|
||||
exit_code,
|
||||
} = params;
|
||||
let initial_output_rx = output_tx.subscribe();
|
||||
(
|
||||
Self {
|
||||
@@ -49,6 +66,7 @@ impl ExecCommandSession {
|
||||
writer_handle: StdMutex::new(Some(writer_handle)),
|
||||
wait_handle: StdMutex::new(Some(wait_handle)),
|
||||
exit_status,
|
||||
exit_code,
|
||||
},
|
||||
initial_output_rx,
|
||||
)
|
||||
@@ -65,6 +83,10 @@ impl ExecCommandSession {
|
||||
pub(crate) fn has_exited(&self) -> bool {
|
||||
self.exit_status.load(std::sync::atomic::Ordering::SeqCst)
|
||||
}
|
||||
|
||||
pub(crate) fn exit_code(&self) -> Option<i32> {
|
||||
self.exit_code.lock().ok().and_then(|guard| *guard)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ExecCommandSession {
|
||||
|
||||
@@ -7,6 +7,7 @@ mod session_manager;
|
||||
pub use exec_command_params::ExecCommandParams;
|
||||
pub use exec_command_params::WriteStdinParams;
|
||||
pub(crate) use exec_command_session::ExecCommandSession;
|
||||
pub(crate) use exec_command_session::ExecCommandSessionParams;
|
||||
pub use responses_api::EXEC_COMMAND_TOOL_NAME;
|
||||
pub use responses_api::WRITE_STDIN_TOOL_NAME;
|
||||
pub use responses_api::create_exec_command_tool_for_responses_api;
|
||||
|
||||
@@ -19,6 +19,7 @@ use tokio::time::timeout;
|
||||
use crate::exec_command::exec_command_params::ExecCommandParams;
|
||||
use crate::exec_command::exec_command_params::WriteStdinParams;
|
||||
use crate::exec_command::exec_command_session::ExecCommandSession;
|
||||
use crate::exec_command::exec_command_session::ExecCommandSessionParams;
|
||||
use crate::exec_command::session_id::SessionId;
|
||||
use crate::truncate::truncate_middle;
|
||||
|
||||
@@ -318,17 +319,22 @@ async fn create_exec_command_session(
|
||||
let (exit_tx, exit_rx) = oneshot::channel::<i32>();
|
||||
let exit_status = Arc::new(AtomicBool::new(false));
|
||||
let wait_exit_status = exit_status.clone();
|
||||
let exit_code = Arc::new(StdMutex::new(None));
|
||||
let exit_code_for_wait = exit_code.clone();
|
||||
let wait_handle = tokio::task::spawn_blocking(move || {
|
||||
let code = match child.wait() {
|
||||
Ok(status) => status.exit_code() as i32,
|
||||
Err(_) => -1,
|
||||
};
|
||||
wait_exit_status.store(true, std::sync::atomic::Ordering::SeqCst);
|
||||
if let Ok(mut guard) = exit_code_for_wait.lock() {
|
||||
*guard = Some(code);
|
||||
}
|
||||
let _ = exit_tx.send(code);
|
||||
});
|
||||
|
||||
// Create and store the session with channels.
|
||||
let (session, initial_output_rx) = ExecCommandSession::new(
|
||||
let (session, initial_output_rx) = ExecCommandSession::new(ExecCommandSessionParams {
|
||||
writer_tx,
|
||||
output_tx,
|
||||
killer,
|
||||
@@ -336,7 +342,8 @@ async fn create_exec_command_session(
|
||||
writer_handle,
|
||||
wait_handle,
|
||||
exit_status,
|
||||
);
|
||||
exit_code,
|
||||
});
|
||||
Ok((session, initial_output_rx, exit_rx))
|
||||
}
|
||||
|
||||
|
||||
@@ -42,6 +42,11 @@ pub mod token_data;
|
||||
mod truncate;
|
||||
mod unified_exec;
|
||||
mod user_instructions;
|
||||
pub use crate::unified_exec::UnifiedExecError;
|
||||
pub use crate::unified_exec::UnifiedExecMode;
|
||||
pub use crate::unified_exec::UnifiedExecRequest;
|
||||
pub use crate::unified_exec::UnifiedExecResult;
|
||||
pub use crate::unified_exec::UnifiedExecSessionManager;
|
||||
pub use model_provider_info::BUILT_IN_OSS_MODEL_PROVIDER_ID;
|
||||
pub use model_provider_info::ModelProviderInfo;
|
||||
pub use model_provider_info::WireApi;
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
use async_trait::async_trait;
|
||||
use serde::Deserialize;
|
||||
use serde_json::Value as JsonValue;
|
||||
use std::borrow::Cow;
|
||||
use std::convert::TryFrom;
|
||||
|
||||
use crate::function_tool::FunctionCallError;
|
||||
use crate::tools::context::ToolInvocation;
|
||||
@@ -7,17 +10,37 @@ use crate::tools::context::ToolOutput;
|
||||
use crate::tools::context::ToolPayload;
|
||||
use crate::tools::registry::ToolHandler;
|
||||
use crate::tools::registry::ToolKind;
|
||||
use crate::unified_exec::UnifiedExecError;
|
||||
use crate::unified_exec::UnifiedExecMode;
|
||||
use crate::unified_exec::UnifiedExecRequest;
|
||||
use crate::unified_exec::UnifiedExecResult;
|
||||
|
||||
pub struct UnifiedExecHandler;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct UnifiedExecArgs {
|
||||
input: Vec<String>,
|
||||
#[serde(default)]
|
||||
session_id: Option<String>,
|
||||
cmd: Option<String>,
|
||||
#[serde(default)]
|
||||
timeout_ms: Option<u64>,
|
||||
session_id: Option<JsonValue>,
|
||||
#[serde(default)]
|
||||
chars: Option<String>,
|
||||
#[serde(default)]
|
||||
yield_time_ms: Option<u64>,
|
||||
#[serde(default)]
|
||||
max_output_tokens: Option<usize>,
|
||||
#[serde(default)]
|
||||
output_chunk_id: Option<bool>,
|
||||
#[serde(default)]
|
||||
output_wall_time: Option<bool>,
|
||||
#[serde(default)]
|
||||
output_json: Option<bool>,
|
||||
#[serde(default)]
|
||||
shell: Option<String>,
|
||||
#[serde(default)]
|
||||
login: Option<bool>,
|
||||
#[serde(default)]
|
||||
cwd: Option<String>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -54,56 +77,114 @@ impl ToolHandler for UnifiedExecHandler {
|
||||
};
|
||||
|
||||
let UnifiedExecArgs {
|
||||
input,
|
||||
cmd,
|
||||
session_id,
|
||||
timeout_ms,
|
||||
chars,
|
||||
yield_time_ms,
|
||||
max_output_tokens,
|
||||
output_chunk_id,
|
||||
output_wall_time,
|
||||
output_json,
|
||||
shell,
|
||||
login,
|
||||
cwd,
|
||||
} = args;
|
||||
|
||||
let parsed_session_id = if let Some(session_id) = session_id {
|
||||
match session_id.parse::<i32>() {
|
||||
Ok(parsed) => Some(parsed),
|
||||
Err(output) => {
|
||||
return Err(FunctionCallError::RespondToModel(format!(
|
||||
"invalid session_id: {session_id} due to error {output:?}"
|
||||
)));
|
||||
}
|
||||
let chars = chars.unwrap_or_default();
|
||||
|
||||
let mode = if let Some(raw_session_id) = session_id {
|
||||
if cmd.is_some() {
|
||||
return Err(FunctionCallError::RespondToModel(
|
||||
"provide either cmd or session_id, not both".to_string(),
|
||||
));
|
||||
}
|
||||
let session_id = parse_session_id(raw_session_id)?;
|
||||
UnifiedExecMode::Write {
|
||||
session_id,
|
||||
chars: chars.as_str(),
|
||||
yield_time_ms,
|
||||
max_output_tokens,
|
||||
}
|
||||
} else {
|
||||
None
|
||||
let cmd_value = cmd.ok_or_else(|| {
|
||||
FunctionCallError::RespondToModel(
|
||||
"cmd is required when session_id is not provided".to_string(),
|
||||
)
|
||||
})?;
|
||||
UnifiedExecMode::Start {
|
||||
cmd: Cow::Owned(cmd_value),
|
||||
yield_time_ms,
|
||||
max_output_tokens,
|
||||
shell: shell.as_deref(),
|
||||
login,
|
||||
cwd: cwd.as_deref(),
|
||||
}
|
||||
};
|
||||
|
||||
let request = UnifiedExecRequest {
|
||||
session_id: parsed_session_id,
|
||||
input_chunks: &input,
|
||||
timeout_ms,
|
||||
mode,
|
||||
output_chunk_id,
|
||||
output_wall_time,
|
||||
output_json,
|
||||
};
|
||||
|
||||
let value = session
|
||||
let result = session
|
||||
.run_unified_exec_request(request)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!("unified exec failed: {err:?}"))
|
||||
})?;
|
||||
.map_err(map_unified_exec_error)?;
|
||||
|
||||
#[derive(serde::Serialize)]
|
||||
struct SerializedUnifiedExecResult {
|
||||
session_id: Option<String>,
|
||||
output: String,
|
||||
}
|
||||
|
||||
let content = serde_json::to_string(&SerializedUnifiedExecResult {
|
||||
session_id: value.session_id.map(|id| id.to_string()),
|
||||
output: value.output,
|
||||
})
|
||||
.map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"failed to serialize unified exec output: {err:?}"
|
||||
))
|
||||
})?;
|
||||
|
||||
Ok(ToolOutput::Function {
|
||||
content,
|
||||
success: Some(true),
|
||||
})
|
||||
Ok(tool_output_from_result(result))
|
||||
}
|
||||
}
|
||||
|
||||
fn tool_output_from_result(result: UnifiedExecResult) -> ToolOutput {
|
||||
let content = result.content.into_string();
|
||||
ToolOutput::Function {
|
||||
content,
|
||||
success: Some(true),
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_session_id(value: JsonValue) -> Result<i32, FunctionCallError> {
|
||||
match value {
|
||||
JsonValue::Number(num) => {
|
||||
if let Some(int) = num.as_i64() {
|
||||
i32::try_from(int).map_err(|_| {
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"session_id value {int} exceeds i32 range"
|
||||
))
|
||||
})
|
||||
} else {
|
||||
Err(FunctionCallError::RespondToModel(
|
||||
"session_id must be an integer".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
JsonValue::String(text) => text.parse::<i32>().map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!("invalid session_id '{text}': {err}"))
|
||||
}),
|
||||
other => Err(FunctionCallError::RespondToModel(format!(
|
||||
"session_id must be a string or integer, got {other}"
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
fn map_unified_exec_error(err: UnifiedExecError) -> FunctionCallError {
|
||||
match err {
|
||||
UnifiedExecError::SessionExited {
|
||||
session_id,
|
||||
exit_code,
|
||||
} => {
|
||||
let detail = exit_code
|
||||
.map(|code| format!(" with code {code}"))
|
||||
.unwrap_or_default();
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"session {session_id} has already exited{detail}. Start a new session with cmd."
|
||||
))
|
||||
}
|
||||
UnifiedExecError::WriteToStdin { session_id } => FunctionCallError::RespondToModel(
|
||||
format!("failed to write to session {session_id}; the process may have exited"),
|
||||
),
|
||||
other => FunctionCallError::RespondToModel(format!("unified exec failed: {other}")),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -142,13 +142,10 @@ impl From<JsonSchema> for AdditionalProperties {
|
||||
fn create_unified_exec_tool() -> ToolSpec {
|
||||
let mut properties = BTreeMap::new();
|
||||
properties.insert(
|
||||
"input".to_string(),
|
||||
JsonSchema::Array {
|
||||
items: Box::new(JsonSchema::String { description: None }),
|
||||
"cmd".to_string(),
|
||||
JsonSchema::String {
|
||||
description: Some(
|
||||
"When no session_id is provided, treat the array as the command and arguments \
|
||||
to launch. When session_id is set, concatenate the strings (in order) and write \
|
||||
them to the session's stdin."
|
||||
"Command to execute when starting a new session. Mutually exclusive with session_id."
|
||||
.to_string(),
|
||||
),
|
||||
},
|
||||
@@ -157,30 +154,87 @@ fn create_unified_exec_tool() -> ToolSpec {
|
||||
"session_id".to_string(),
|
||||
JsonSchema::String {
|
||||
description: Some(
|
||||
"Identifier for an existing interactive session. If omitted, a new command \
|
||||
is spawned."
|
||||
"Identifier for an existing interactive session. When provided, cmd must be omitted and chars will be written to the session's stdin."
|
||||
.to_string(),
|
||||
),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"timeout_ms".to_string(),
|
||||
JsonSchema::Number {
|
||||
"chars".to_string(),
|
||||
JsonSchema::String {
|
||||
description: Some(
|
||||
"Maximum time in milliseconds to wait for output after writing the input."
|
||||
"Characters to write to an interactive session. Use an empty string to poll for output without sending input."
|
||||
.to_string(),
|
||||
),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"yield_time_ms".to_string(),
|
||||
JsonSchema::Number {
|
||||
description: Some(
|
||||
"Milliseconds to wait for output before returning (clamped between 250 and 30_000)."
|
||||
.to_string(),
|
||||
),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"max_output_tokens".to_string(),
|
||||
JsonSchema::Number {
|
||||
description: Some(
|
||||
"Approximate maximum number of output tokens before truncation.".to_string(),
|
||||
),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"output_chunk_id".to_string(),
|
||||
JsonSchema::Boolean {
|
||||
description: Some("Whether to include a chunk identifier in responses.".to_string()),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"output_wall_time".to_string(),
|
||||
JsonSchema::Boolean {
|
||||
description: Some(
|
||||
"Whether to include wall-clock timing metadata in responses.".to_string(),
|
||||
),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"output_json".to_string(),
|
||||
JsonSchema::Boolean {
|
||||
description: Some(
|
||||
"Return structured JSON instead of formatted text when true.".to_string(),
|
||||
),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"shell".to_string(),
|
||||
JsonSchema::String {
|
||||
description: Some("Override the shell executable (default /bin/bash).".to_string()),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"login".to_string(),
|
||||
JsonSchema::Boolean {
|
||||
description: Some(
|
||||
"Whether to start the shell as a login shell (default true).".to_string(),
|
||||
),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"cwd".to_string(),
|
||||
JsonSchema::String {
|
||||
description: Some("Working directory for the command.".to_string()),
|
||||
},
|
||||
);
|
||||
|
||||
ToolSpec::Function(ResponsesApiTool {
|
||||
name: "unified_exec".to_string(),
|
||||
description:
|
||||
"Runs a command in a PTY. Provide a session_id to reuse an existing interactive session.".to_string(),
|
||||
description: "Runs commands in a PTY and streams output, supporting interactive sessions with stdin writes.".to_string(),
|
||||
strict: false,
|
||||
parameters: JsonSchema::Object {
|
||||
properties,
|
||||
required: Some(vec!["input".to_string()]),
|
||||
required: None,
|
||||
additional_properties: Some(false.into()),
|
||||
},
|
||||
})
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub(crate) enum UnifiedExecError {
|
||||
pub enum UnifiedExecError {
|
||||
#[error("Failed to create unified exec session: {pty_error}")]
|
||||
CreateSession {
|
||||
#[source]
|
||||
@@ -9,10 +9,19 @@ pub(crate) enum UnifiedExecError {
|
||||
},
|
||||
#[error("Unknown session id {session_id}")]
|
||||
UnknownSessionId { session_id: i32 },
|
||||
#[error("failed to write to stdin")]
|
||||
WriteToStdin,
|
||||
#[error("failed to write to stdin for session {session_id}")]
|
||||
WriteToStdin { session_id: i32 },
|
||||
#[error("missing command line for unified exec request")]
|
||||
MissingCommandLine,
|
||||
#[error("spawned process did not report a process id")]
|
||||
MissingProcessId,
|
||||
#[error("spawned process id {process_id} does not fit in i32")]
|
||||
ProcessIdOverflow { process_id: u32 },
|
||||
#[error("session {session_id} has already exited")]
|
||||
SessionExited {
|
||||
session_id: i32,
|
||||
exit_code: Option<i32>,
|
||||
},
|
||||
}
|
||||
|
||||
impl UnifiedExecError {
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,409 +1,286 @@
|
||||
#![cfg(not(target_os = "windows"))]
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use anyhow::Result;
|
||||
use codex_core::features::Feature;
|
||||
use codex_core::protocol::AskForApproval;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::InputItem;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::protocol::SandboxPolicy;
|
||||
use codex_protocol::config_types::ReasoningSummary;
|
||||
use core_test_support::responses::ev_assistant_message;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_function_call;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
use core_test_support::responses::mount_sse_sequence;
|
||||
use core_test_support::responses::sse;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use codex_core::UnifiedExecMode;
|
||||
use codex_core::UnifiedExecRequest;
|
||||
use codex_core::UnifiedExecSessionManager;
|
||||
#[cfg(unix)]
|
||||
use core_test_support::skip_if_sandbox;
|
||||
use core_test_support::test_codex::TestCodex;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use serde_json::Value;
|
||||
use tokio::time::Duration;
|
||||
|
||||
fn extract_output_text(item: &Value) -> Option<&str> {
|
||||
item.get("output").and_then(|value| match value {
|
||||
Value::String(text) => Some(text.as_str()),
|
||||
Value::Object(obj) => obj.get("content").and_then(Value::as_str),
|
||||
_ => None,
|
||||
})
|
||||
}
|
||||
|
||||
fn collect_tool_outputs(bodies: &[Value]) -> Result<HashMap<String, Value>> {
|
||||
let mut outputs = HashMap::new();
|
||||
for body in bodies {
|
||||
if let Some(items) = body.get("input").and_then(Value::as_array) {
|
||||
for item in items {
|
||||
if item.get("type").and_then(Value::as_str) != Some("function_call_output") {
|
||||
continue;
|
||||
}
|
||||
if let Some(call_id) = item.get("call_id").and_then(Value::as_str) {
|
||||
let content = extract_output_text(item)
|
||||
.ok_or_else(|| anyhow::anyhow!("missing tool output content"))?;
|
||||
let trimmed = content.trim();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let parsed: Value = serde_json::from_str(trimmed).map_err(|err| {
|
||||
anyhow::anyhow!("failed to parse tool output content {trimmed:?}: {err}")
|
||||
})?;
|
||||
outputs.insert(call_id.to_string(), parsed);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(outputs)
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn unified_exec_reuses_session_via_stdin() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
async fn unified_exec_manager_supports_interactive_cat() -> Result<()> {
|
||||
skip_if_sandbox!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.features.enable(Feature::UnifiedExec);
|
||||
});
|
||||
let TestCodex {
|
||||
codex,
|
||||
cwd,
|
||||
session_configured,
|
||||
..
|
||||
} = builder.build(&server).await?;
|
||||
|
||||
let first_call_id = "uexec-start";
|
||||
let first_args = serde_json::json!({
|
||||
"input": ["/bin/cat"],
|
||||
"timeout_ms": 200,
|
||||
});
|
||||
|
||||
let second_call_id = "uexec-stdin";
|
||||
let second_args = serde_json::json!({
|
||||
"input": ["hello unified exec\n"],
|
||||
"session_id": "0",
|
||||
"timeout_ms": 500,
|
||||
});
|
||||
|
||||
let responses = vec![
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_function_call(
|
||||
first_call_id,
|
||||
"unified_exec",
|
||||
&serde_json::to_string(&first_args)?,
|
||||
),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-2"),
|
||||
ev_function_call(
|
||||
second_call_id,
|
||||
"unified_exec",
|
||||
&serde_json::to_string(&second_args)?,
|
||||
),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-1", "all done"),
|
||||
ev_completed("resp-3"),
|
||||
]),
|
||||
];
|
||||
mount_sse_sequence(&server, responses).await;
|
||||
|
||||
let session_model = session_configured.model.clone();
|
||||
|
||||
codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![InputItem::Text {
|
||||
text: "run unified exec".into(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: cwd.path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
||||
model: session_model,
|
||||
effort: None,
|
||||
summary: ReasoningSummary::Auto,
|
||||
let manager = UnifiedExecSessionManager::default();
|
||||
let result = manager
|
||||
.handle_request(UnifiedExecRequest {
|
||||
mode: UnifiedExecMode::Start {
|
||||
cmd: std::borrow::Cow::Borrowed("cat"),
|
||||
yield_time_ms: Some(200),
|
||||
max_output_tokens: Some(1_000),
|
||||
shell: Some("/bin/sh"),
|
||||
login: Some(false),
|
||||
cwd: None,
|
||||
},
|
||||
output_chunk_id: Some(true),
|
||||
output_wall_time: Some(true),
|
||||
output_json: Some(false),
|
||||
})
|
||||
.await?;
|
||||
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
|
||||
let session_id = result.metadata.session_id.expect("expected session id");
|
||||
let poll = manager
|
||||
.handle_request(UnifiedExecRequest {
|
||||
mode: UnifiedExecMode::Write {
|
||||
session_id,
|
||||
chars: "hello unified exec\n",
|
||||
yield_time_ms: Some(500),
|
||||
max_output_tokens: Some(1_000),
|
||||
},
|
||||
output_chunk_id: Some(false),
|
||||
output_wall_time: Some(false),
|
||||
output_json: Some(true),
|
||||
})
|
||||
.await?;
|
||||
|
||||
let requests = server.received_requests().await.expect("recorded requests");
|
||||
assert!(!requests.is_empty(), "expected at least one POST request");
|
||||
|
||||
let bodies = requests
|
||||
.iter()
|
||||
.map(|req| req.body_json::<Value>().expect("request json"))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let outputs = collect_tool_outputs(&bodies)?;
|
||||
|
||||
let start_output = outputs
|
||||
.get(first_call_id)
|
||||
.expect("missing first unified_exec output");
|
||||
let session_id = start_output["session_id"].as_str().unwrap_or_default();
|
||||
assert!(
|
||||
!session_id.is_empty(),
|
||||
"expected session id in first unified_exec response"
|
||||
);
|
||||
assert!(
|
||||
start_output["output"]
|
||||
.as_str()
|
||||
.unwrap_or_default()
|
||||
.is_empty()
|
||||
);
|
||||
|
||||
let reuse_output = outputs
|
||||
.get(second_call_id)
|
||||
.expect("missing reused unified_exec output");
|
||||
assert_eq!(
|
||||
reuse_output["session_id"].as_str().unwrap_or_default(),
|
||||
session_id
|
||||
);
|
||||
let echoed = reuse_output["output"].as_str().unwrap_or_default();
|
||||
assert!(
|
||||
echoed.contains("hello unified exec"),
|
||||
"expected echoed output, got {echoed:?}"
|
||||
);
|
||||
let output = poll.content.into_string();
|
||||
assert!(output.contains("hello unified exec"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn unified_exec_streams_after_lagged_output() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
async fn unified_exec_manager_streams_large_output() -> Result<()> {
|
||||
skip_if_sandbox!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.use_experimental_unified_exec_tool = true;
|
||||
config.features.enable(Feature::UnifiedExec);
|
||||
});
|
||||
let TestCodex {
|
||||
codex,
|
||||
cwd,
|
||||
session_configured,
|
||||
..
|
||||
} = builder.build(&server).await?;
|
||||
|
||||
let manager = UnifiedExecSessionManager::default();
|
||||
let script = r#"python3 - <<'PY'
|
||||
import sys
|
||||
import time
|
||||
|
||||
chunk = b'x' * (1 << 20)
|
||||
for _ in range(4):
|
||||
sys.stdout.buffer.write(chunk)
|
||||
sys.stdout.flush()
|
||||
|
||||
time.sleep(0.2)
|
||||
for _ in range(5):
|
||||
for _ in range(3):
|
||||
sys.stdout.write("TAIL-MARKER\n")
|
||||
sys.stdout.flush()
|
||||
time.sleep(0.05)
|
||||
|
||||
time.sleep(0.2)
|
||||
PY
|
||||
"#;
|
||||
|
||||
let first_call_id = "uexec-lag-start";
|
||||
let first_args = serde_json::json!({
|
||||
"input": ["/bin/sh", "-c", script],
|
||||
"timeout_ms": 25,
|
||||
});
|
||||
|
||||
let second_call_id = "uexec-lag-poll";
|
||||
let second_args = serde_json::json!({
|
||||
"input": Vec::<String>::new(),
|
||||
"session_id": "0",
|
||||
"timeout_ms": 2_000,
|
||||
});
|
||||
|
||||
let responses = vec![
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_function_call(
|
||||
first_call_id,
|
||||
"unified_exec",
|
||||
&serde_json::to_string(&first_args)?,
|
||||
),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-2"),
|
||||
ev_function_call(
|
||||
second_call_id,
|
||||
"unified_exec",
|
||||
&serde_json::to_string(&second_args)?,
|
||||
),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-1", "lag handled"),
|
||||
ev_completed("resp-3"),
|
||||
]),
|
||||
];
|
||||
mount_sse_sequence(&server, responses).await;
|
||||
|
||||
let session_model = session_configured.model.clone();
|
||||
|
||||
codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![InputItem::Text {
|
||||
text: "exercise lag handling".into(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: cwd.path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
||||
model: session_model,
|
||||
effort: None,
|
||||
summary: ReasoningSummary::Auto,
|
||||
let start = manager
|
||||
.handle_request(UnifiedExecRequest {
|
||||
mode: UnifiedExecMode::Start {
|
||||
cmd: std::borrow::Cow::Borrowed(script),
|
||||
yield_time_ms: Some(500),
|
||||
max_output_tokens: Some(5_000),
|
||||
shell: Some("/bin/sh"),
|
||||
login: Some(false),
|
||||
cwd: None,
|
||||
},
|
||||
output_chunk_id: None,
|
||||
output_wall_time: None,
|
||||
output_json: Some(false),
|
||||
})
|
||||
.await?;
|
||||
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
|
||||
|
||||
let requests = server.received_requests().await.expect("recorded requests");
|
||||
assert!(!requests.is_empty(), "expected at least one POST request");
|
||||
|
||||
let bodies = requests
|
||||
.iter()
|
||||
.map(|req| req.body_json::<Value>().expect("request json"))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let outputs = collect_tool_outputs(&bodies)?;
|
||||
|
||||
let start_output = outputs
|
||||
.get(first_call_id)
|
||||
.expect("missing initial unified_exec output");
|
||||
let session_id = start_output["session_id"].as_str().unwrap_or_default();
|
||||
assert!(
|
||||
!session_id.is_empty(),
|
||||
"expected session id from initial unified_exec response"
|
||||
);
|
||||
|
||||
let poll_output = outputs
|
||||
.get(second_call_id)
|
||||
.expect("missing poll unified_exec output");
|
||||
let poll_text = poll_output["output"].as_str().unwrap_or_default();
|
||||
assert!(
|
||||
poll_text.contains("TAIL-MARKER"),
|
||||
"expected poll output to contain tail marker, got {poll_text:?}"
|
||||
);
|
||||
let output = start.content.into_string();
|
||||
assert!(output.contains("TAIL-MARKER"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn unified_exec_timeout_and_followup_poll() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
async fn unified_exec_manager_handles_timeout_then_poll() -> Result<()> {
|
||||
skip_if_sandbox!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.features.enable(Feature::UnifiedExec);
|
||||
});
|
||||
let TestCodex {
|
||||
codex,
|
||||
cwd,
|
||||
session_configured,
|
||||
..
|
||||
} = builder.build(&server).await?;
|
||||
|
||||
let first_call_id = "uexec-timeout";
|
||||
let first_args = serde_json::json!({
|
||||
"input": ["/bin/sh", "-c", "sleep 0.1; echo ready"],
|
||||
"timeout_ms": 10,
|
||||
});
|
||||
|
||||
let second_call_id = "uexec-poll";
|
||||
let second_args = serde_json::json!({
|
||||
"input": Vec::<String>::new(),
|
||||
"session_id": "0",
|
||||
"timeout_ms": 800,
|
||||
});
|
||||
|
||||
let responses = vec![
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_function_call(
|
||||
first_call_id,
|
||||
"unified_exec",
|
||||
&serde_json::to_string(&first_args)?,
|
||||
),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-2"),
|
||||
ev_function_call(
|
||||
second_call_id,
|
||||
"unified_exec",
|
||||
&serde_json::to_string(&second_args)?,
|
||||
),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-1", "done"),
|
||||
ev_completed("resp-3"),
|
||||
]),
|
||||
];
|
||||
mount_sse_sequence(&server, responses).await;
|
||||
|
||||
let session_model = session_configured.model.clone();
|
||||
|
||||
codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![InputItem::Text {
|
||||
text: "check timeout".into(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: cwd.path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
||||
model: session_model,
|
||||
effort: None,
|
||||
summary: ReasoningSummary::Auto,
|
||||
let manager = UnifiedExecSessionManager::default();
|
||||
let result = manager
|
||||
.handle_request(UnifiedExecRequest {
|
||||
mode: UnifiedExecMode::Start {
|
||||
cmd: std::borrow::Cow::Borrowed("sleep 0.1; echo ready"),
|
||||
yield_time_ms: Some(10),
|
||||
max_output_tokens: Some(1_000),
|
||||
shell: Some("/bin/sh"),
|
||||
login: Some(false),
|
||||
cwd: None,
|
||||
},
|
||||
output_chunk_id: None,
|
||||
output_wall_time: None,
|
||||
output_json: Some(false),
|
||||
})
|
||||
.await?;
|
||||
|
||||
loop {
|
||||
let event = codex.next_event().await.expect("event");
|
||||
if matches!(event.msg, EventMsg::TaskComplete(_)) {
|
||||
break;
|
||||
if let Some(session_id) = result.metadata.session_id {
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
match manager
|
||||
.handle_request(UnifiedExecRequest {
|
||||
mode: UnifiedExecMode::Write {
|
||||
session_id,
|
||||
chars: "",
|
||||
yield_time_ms: Some(500),
|
||||
max_output_tokens: Some(1_000),
|
||||
},
|
||||
output_chunk_id: None,
|
||||
output_wall_time: None,
|
||||
output_json: Some(false),
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(poll) => assert!(poll.content.into_string().contains("ready")),
|
||||
Err(codex_core::UnifiedExecError::SessionExited { .. }) => {}
|
||||
Err(other) => return Err(other.into()),
|
||||
}
|
||||
} else {
|
||||
assert!(result.content.into_string().contains("ready"));
|
||||
}
|
||||
|
||||
let requests = server.received_requests().await.expect("recorded requests");
|
||||
assert!(!requests.is_empty(), "expected at least one POST request");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
let bodies = requests
|
||||
.iter()
|
||||
.map(|req| req.body_json::<Value>().expect("request json"))
|
||||
.collect::<Vec<_>>();
|
||||
#[cfg(unix)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn unified_exec_json_output_matches_metadata() -> Result<()> {
|
||||
skip_if_sandbox!(Ok(()));
|
||||
|
||||
let outputs = collect_tool_outputs(&bodies)?;
|
||||
let manager = UnifiedExecSessionManager::default();
|
||||
let command = "printf 'ready\\n' && read dummy";
|
||||
|
||||
let first_output = outputs.get(first_call_id).expect("missing timeout output");
|
||||
assert_eq!(first_output["session_id"], "0");
|
||||
assert!(
|
||||
first_output["output"]
|
||||
.as_str()
|
||||
.unwrap_or_default()
|
||||
.is_empty()
|
||||
let result = manager
|
||||
.handle_request(UnifiedExecRequest {
|
||||
mode: UnifiedExecMode::Start {
|
||||
cmd: std::borrow::Cow::Borrowed(command),
|
||||
yield_time_ms: Some(500),
|
||||
max_output_tokens: Some(1_000),
|
||||
shell: Some("/bin/bash"),
|
||||
login: Some(false),
|
||||
cwd: None,
|
||||
},
|
||||
output_chunk_id: Some(true),
|
||||
output_wall_time: Some(true),
|
||||
output_json: Some(true),
|
||||
})
|
||||
.await?;
|
||||
|
||||
let codex_core::UnifiedExecResult { content, metadata } = result;
|
||||
|
||||
let body = content.into_string();
|
||||
let json: Value = serde_json::from_str(&body)?;
|
||||
|
||||
assert!(json.get("chunk_id").is_some());
|
||||
assert!(json.get("wall_time").is_some());
|
||||
|
||||
let session_id = metadata.session_id.expect("expected running session");
|
||||
assert_eq!(json["session_id"].as_i64(), Some(i64::from(session_id)));
|
||||
|
||||
let output = json["output"]
|
||||
.as_object()
|
||||
.expect("output is object with numbered lines");
|
||||
assert_eq!(
|
||||
output.get("1").and_then(Value::as_str),
|
||||
Some("ready"),
|
||||
"expected first output line to contain ready"
|
||||
);
|
||||
|
||||
let poll_output = outputs.get(second_call_id).expect("missing poll output");
|
||||
let output_text = poll_output["output"].as_str().unwrap_or_default();
|
||||
assert_eq!(metadata.exec_cmd.as_deref(), Some(command));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn unified_exec_respects_output_preferences() -> Result<()> {
|
||||
skip_if_sandbox!(Ok(()));
|
||||
|
||||
let manager = UnifiedExecSessionManager::default();
|
||||
|
||||
let result = manager
|
||||
.handle_request(UnifiedExecRequest {
|
||||
mode: UnifiedExecMode::Start {
|
||||
cmd: std::borrow::Cow::Borrowed("printf 'ready\\n' && read dummy"),
|
||||
yield_time_ms: Some(500),
|
||||
max_output_tokens: Some(1_000),
|
||||
shell: Some("/bin/bash"),
|
||||
login: Some(false),
|
||||
cwd: None,
|
||||
},
|
||||
output_chunk_id: Some(false),
|
||||
output_wall_time: Some(false),
|
||||
output_json: Some(false),
|
||||
})
|
||||
.await?;
|
||||
|
||||
let codex_core::UnifiedExecResult { content, metadata } = result;
|
||||
|
||||
assert_eq!(
|
||||
metadata.exec_cmd.as_deref(),
|
||||
Some("printf 'ready\\n' && read dummy")
|
||||
);
|
||||
assert!(
|
||||
output_text.contains("ready"),
|
||||
"expected ready output, got {output_text:?}"
|
||||
metadata.session_id.is_some(),
|
||||
"session should remain active when waiting for stdin input"
|
||||
);
|
||||
|
||||
let text = content.into_string();
|
||||
assert!(
|
||||
!text.contains("Chunk ID:"),
|
||||
"chunk metadata should be omitted when output_chunk_id is false: {text}"
|
||||
);
|
||||
assert!(
|
||||
!text.contains("Wall time:"),
|
||||
"wall time metadata should be omitted when output_wall_time is false: {text}"
|
||||
);
|
||||
assert!(
|
||||
text.contains("Process running with session ID"),
|
||||
"expected running-session metadata in textual response: {text}"
|
||||
);
|
||||
assert!(
|
||||
text.contains("ready"),
|
||||
"expected command output to appear in textual response: {text}"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn unified_exec_reports_truncation_metadata() -> Result<()> {
|
||||
skip_if_sandbox!(Ok(()));
|
||||
|
||||
let manager = UnifiedExecSessionManager::default();
|
||||
let script = r#"python3 - <<'PY'
|
||||
import sys
|
||||
sys.stdout.write("X" * 2048)
|
||||
sys.stdout.flush()
|
||||
PY
|
||||
"#;
|
||||
|
||||
let result = manager
|
||||
.handle_request(UnifiedExecRequest {
|
||||
mode: UnifiedExecMode::Start {
|
||||
cmd: std::borrow::Cow::Borrowed(script),
|
||||
yield_time_ms: Some(500),
|
||||
max_output_tokens: Some(1),
|
||||
shell: Some("/bin/sh"),
|
||||
login: Some(false),
|
||||
cwd: None,
|
||||
},
|
||||
output_chunk_id: Some(true),
|
||||
output_wall_time: Some(true),
|
||||
output_json: Some(false),
|
||||
})
|
||||
.await?;
|
||||
|
||||
let codex_core::UnifiedExecResult { content, metadata } = result;
|
||||
|
||||
assert!(
|
||||
metadata.original_token_count.is_some_and(|count| count > 0),
|
||||
"expected original_token_count metadata when truncation occurs"
|
||||
);
|
||||
|
||||
let text = content.into_string();
|
||||
assert!(
|
||||
text.contains("tokens truncated"),
|
||||
"expected truncation notice in textual output: {text}"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user