This commit is contained in:
jimmyfraiture
2025-09-07 16:53:29 -07:00
parent f00c6819d1
commit 16236b699e
4 changed files with 252 additions and 101 deletions

View File

@@ -21,6 +21,7 @@ use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use futures::prelude::*;
use mcp_types::CallToolResult;
use serde::Deserialize;
use serde::Serialize;
use serde_json;
use tokio::sync::oneshot;
@@ -1997,62 +1998,10 @@ async fn handle_response_item(
.clone()
.or_else(|| session_id.clone())
.unwrap_or_else(|| format!("ishell:{}", Uuid::new_v4()));
let parsed_session_id = match session_id {
Some(value) => match value.parse::<i32>() {
Ok(parsed) => Some(parsed),
Err(_) => {
return Ok(Some(ResponseInputItem::FunctionCallOutput {
call_id,
output: FunctionCallOutputPayload {
content: format!("invalid session_id: {value}"),
success: Some(false),
},
}));
}
},
None => None,
};
let request = crate::ishell::InteractiveShellRequest {
session_id: parsed_session_id,
input: &arguments,
timeout_ms,
};
let result = sess.ishell_manager.handle_request(request).await;
let output_payload = match result {
Ok(value) => {
#[derive(serde::Serialize)]
struct SerializedIshellResult<'a> {
session_id: Option<i32>,
output: &'a str,
}
match serde_json::to_string(&SerializedIshellResult {
session_id: value.session_id,
output: &value.output,
}) {
Ok(serialized) => FunctionCallOutputPayload {
content: serialized,
success: Some(true),
},
Err(err) => FunctionCallOutputPayload {
content: format!("failed to serialize interactive shell output: {err}"),
success: Some(false),
},
}
}
Err(err) => FunctionCallOutputPayload {
content: err.to_string(),
success: Some(false),
},
};
Some(ResponseInputItem::FunctionCallOutput {
call_id,
output: output_payload,
})
Some(
handle_ishell_tool_call(sess, call_id, session_id.clone(), arguments, timeout_ms)
.await,
)
}
ResponseItem::CustomToolCall {
id: _,
@@ -2098,6 +2047,72 @@ async fn handle_response_item(
Ok(output)
}
/// Executes one interactive-shell tool call and wraps the outcome in a
/// `ResponseInputItem::FunctionCallOutput` carrying the given `call_id`.
///
/// * `session_id` - optional textual session id; when present it must parse
///   as an `i32`, otherwise a failed output is returned without contacting
///   the shell manager.
/// * `arguments` - input chunks forwarded to the shell manager as
///   `input_chunks`.
/// * `timeout_ms` - forwarded unchanged to the shell manager.
///
/// On success the payload content is a JSON object with the `session_id` and
/// `output` fields and `success` is `Some(true)`. Every failure path (bad
/// session id, manager error, serialization error) yields `success:
/// Some(false)` with a human-readable message as content.
async fn handle_ishell_tool_call(
    sess: &Session,
    call_id: String,
    session_id: Option<String>,
    arguments: Vec<String>,
    timeout_ms: Option<u64>,
) -> ResponseInputItem {
    /// JSON shape returned to the model for a successful request.
    #[derive(Serialize)]
    struct SerializedIshellResult<'a> {
        session_id: Option<i32>,
        output: &'a str,
    }

    // Single place where unsuccessful payloads are built.
    let failure = |content: String| FunctionCallOutputPayload {
        content,
        success: Some(false),
    };

    let parsed_session_id = match session_id {
        None => None,
        Some(session_id) => match session_id.parse::<i32>() {
            Ok(id) => Some(id),
            Err(output) => {
                // Bail out early: the request never reaches the shell manager.
                return ResponseInputItem::FunctionCallOutput {
                    call_id,
                    output: failure(format!(
                        "invalid session_id: {session_id} due to error {output}"
                    )),
                };
            }
        },
    };

    let request = crate::ishell::InteractiveShellRequest {
        session_id: parsed_session_id,
        input_chunks: &arguments,
        timeout_ms,
    };

    let output = match sess.ishell_manager.handle_request(request).await {
        Ok(value) => {
            let body = SerializedIshellResult {
                session_id: value.session_id,
                output: &value.output,
            };
            match serde_json::to_string(&body) {
                Ok(content) => FunctionCallOutputPayload {
                    content,
                    success: Some(true),
                },
                Err(err) => failure(format!(
                    "failed to serialize interactive shell output: {err}"
                )),
            }
        }
        Err(err) => failure(err.to_string()),
    };

    ResponseInputItem::FunctionCallOutput { call_id, output }
}
async fn handle_function_call(
sess: &Session,
turn_context: &TurnContext,
@@ -2125,6 +2140,32 @@ async fn handle_function_call(
)
.await
}
"unified_exec" => {
#[derive(Deserialize)]
struct UnifiedExecArgs {
input: Vec<String>,
#[serde(default)]
session_id: Option<String>,
#[serde(default)]
timeout_ms: Option<u64>,
}
let args = match serde_json::from_str::<UnifiedExecArgs>(&arguments) {
Ok(args) => args,
Err(err) => {
return ResponseInputItem::FunctionCallOutput {
call_id,
output: FunctionCallOutputPayload {
content: format!("failed to parse function arguments: {err}"),
success: Some(false),
},
};
}
};
handle_ishell_tool_call(sess, call_id, args.session_id, args.input, args.timeout_ms)
.await
}
"view_image" => {
#[derive(serde::Deserialize)]
struct SeeImageArgs {

View File

@@ -3,8 +3,11 @@ use portable_pty::PtySize;
use portable_pty::native_pty_system;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::env;
use std::io::ErrorKind;
use std::io::Read;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::AtomicBool;
@@ -28,7 +31,7 @@ const ISHELL_OUTPUT_MAX_BYTES: usize = 16 * 1024; // 16 KiB
#[derive(Debug)]
pub(crate) struct InteractiveShellRequest<'a> {
pub session_id: Option<i32>,
pub input: &'a str,
pub input_chunks: &'a [String],
pub timeout_ms: Option<u64>,
}
@@ -106,6 +109,7 @@ impl InteractiveShellSessionManager {
&self,
request: InteractiveShellRequest<'_>,
) -> Result<InteractiveShellResult, error::InteractiveShellError> {
tracing::error!("In the exec");
// todo update the errors
let timeout_ms = request.timeout_ms.unwrap_or(DEFAULT_TIMEOUT_MS);
@@ -132,7 +136,7 @@ impl InteractiveShellSessionManager {
}
}
} else {
let command = parse_command_line(request.input)?;
let command = command_from_chunks(request.input_chunks)?;
let new_id = self.next_session_id.fetch_add(1, Ordering::SeqCst);
let session = create_shell_session(&command).await?;
let managed_session = ManagedInteractiveSession::new(session);
@@ -144,14 +148,12 @@ impl InteractiveShellSessionManager {
new_session = Some(managed_session);
};
if request.session_id.is_some()
&& !request.input.is_empty()
&& writer_tx
.send(request.input.as_bytes().to_vec())
.await
.is_err()
{
return Err(error::InteractiveShellError::WriteToStdin);
if request.session_id.is_some() {
let joined_input = join_input_chunks(request.input_chunks);
if !joined_input.is_empty() && writer_tx.send(joined_input.into_bytes()).await.is_err()
{
return Err(error::InteractiveShellError::WriteToStdin);
}
}
let mut collected: Vec<u8> = Vec::with_capacity(4096);
@@ -249,7 +251,8 @@ async fn create_shell_session(
})
.map_err(error::InteractiveShellError::create_session)?;
let mut command_builder = CommandBuilder::new(&command[0]);
let resolved_command = resolve_command_path(&command[0])?;
let mut command_builder = CommandBuilder::new(&resolved_command);
for arg in &command[1..] {
command_builder.arg(arg);
}
@@ -326,6 +329,106 @@ async fn create_shell_session(
))
}
fn resolve_command_path(command: &str) -> Result<String, error::InteractiveShellError> {
if command.is_empty() {
return Err(error::InteractiveShellError::MissingCommandLine);
}
if is_explicit_path(command) {
return ensure_executable(Path::new(command))
.then_some(command.to_string())
.ok_or_else(|| error::InteractiveShellError::CommandNotFound {
command: command.to_string(),
});
}
if let Some(resolved) = find_in_path(command) {
return Ok(resolved.to_string_lossy().to_string());
}
Err(error::InteractiveShellError::CommandNotFound {
command: command.to_string(),
})
}
fn command_from_chunks(chunks: &[String]) -> Result<Vec<String>, error::InteractiveShellError> {
match chunks {
[] => Err(error::InteractiveShellError::MissingCommandLine),
[single] => parse_command_line(single),
_ => Ok(chunks.to_vec()),
}
}
/// Concatenates the input chunks, in order and without any separator, into
/// the text that is written to a session's stdin.
///
/// An empty slice yields an empty string; a single chunk yields a copy of
/// that chunk.
fn join_input_chunks(chunks: &[String]) -> String {
    chunks.concat()
}
/// Reports whether `command` names a location rather than a bare program
/// name: it is either an absolute path or has more than one path component
/// (for example `./tool` or `bin/tool`).
fn is_explicit_path(command: &str) -> bool {
    let candidate = Path::new(command);
    if candidate.is_absolute() {
        return true;
    }
    // A second component exists exactly when there is more than one.
    candidate.components().nth(1).is_some()
}
/// Searches the directories listed in the `PATH` environment variable, in
/// order, for the first candidate that `ensure_executable` accepts.
///
/// Returns `None` when `PATH` is not set or no candidate qualifies.
fn find_in_path(command: &str) -> Option<PathBuf> {
    let search_path = env::var_os("PATH")?;
    for dir in env::split_paths(&search_path) {
        for candidate in candidate_paths(dir, command) {
            if ensure_executable(&candidate) {
                return Some(candidate);
            }
        }
    }
    None
}
/// Builds the file paths to probe for `command` inside `dir` by appending the
/// command to the directory and expanding the result with
/// `build_platform_candidates`.
fn candidate_paths(dir: PathBuf, command: &str) -> Vec<PathBuf> {
    let mut base = dir;
    base.push(command);
    build_platform_candidates(base)
}
/// Unix: the candidate is probed exactly as given.
#[cfg(unix)]
fn build_platform_candidates(candidate: PathBuf) -> Vec<PathBuf> {
    Vec::from([candidate])
}

/// Windows: expands a candidate into the file names to probe.
///
/// A candidate that already has an extension is returned alone. Otherwise one
/// candidate is produced per non-empty entry of the `;`-separated `PATHEXT`
/// variable (falling back to `.COM;.EXE;.BAT;.CMD` when the variable cannot
/// be read), with that entry set as the extension. If this yields nothing,
/// the bare candidate is returned.
#[cfg(windows)]
fn build_platform_candidates(candidate: PathBuf) -> Vec<PathBuf> {
    if candidate.extension().is_some() {
        return vec![candidate];
    }

    let pathext = env::var("PATHEXT").unwrap_or_else(|_| ".COM;.EXE;.BAT;.CMD".to_string());
    let expanded: Vec<PathBuf> = pathext
        .split(';')
        .filter(|ext| !ext.is_empty())
        // `with_extension` expects the extension without its leading dot.
        .map(|ext| candidate.with_extension(ext.trim_start_matches('.')))
        .collect();

    if expanded.is_empty() {
        vec![candidate]
    } else {
        expanded
    }
}
/// Returns `true` when `path` has readable metadata, refers to a regular
/// file, and passes the platform's `is_executable` check. Any metadata error
/// is treated as "not executable".
fn ensure_executable(path: &Path) -> bool {
    path.metadata()
        .map(|info| info.is_file() && is_executable(&info))
        .unwrap_or(false)
}
/// Unix: a file counts as executable when any execute bit (owner, group or
/// other) is set in its permission mode.
#[cfg(unix)]
fn is_executable(metadata: &std::fs::Metadata) -> bool {
    use std::os::unix::fs::PermissionsExt;

    const ANY_EXECUTE_BIT: u32 = 0o111;
    let mode = metadata.permissions().mode();
    (mode & ANY_EXECUTE_BIT) != 0
}

/// Non-unix: no permission bits are inspected; every regular file is treated
/// as executable.
#[cfg(not(unix))]
fn is_executable(metadata: &std::fs::Metadata) -> bool {
    metadata.is_file()
}
/// Truncate the middle of a UTF-8 string to at most `max_bytes` bytes,
/// preserving the beginning and the end. Returns the possibly truncated
/// string and `Some(original_token_count)` (estimated at 4 bytes/token)
@@ -437,6 +540,8 @@ mod error {
MissingCommandLine,
#[error("invalid command line: {command_line}")]
InvalidCommandLine { command_line: String },
#[error("command not found: {command}")]
CommandNotFound { command: String },
}
impl InteractiveShellError {
@@ -486,7 +591,7 @@ mod tests {
let open_shell = manager
.handle_request(InteractiveShellRequest {
session_id: None,
input: "/bin/bash -i",
input_chunks: &["/bin/bash".to_string(), "-i".to_string()],
timeout_ms: Some(1_500),
})
.await?;
@@ -495,7 +600,7 @@ mod tests {
manager
.handle_request(InteractiveShellRequest {
session_id: Some(session_id),
input: "export CODEX_INTERACTIVE_SHELL_VAR=codex\n",
input_chunks: &["export CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string()],
timeout_ms: Some(1_500),
})
.await?;
@@ -503,7 +608,7 @@ mod tests {
let out_2 = manager
.handle_request(InteractiveShellRequest {
session_id: Some(session_id),
input: "echo $CODEX_INTERACTIVE_SHELL_VAR\n",
input_chunks: &["echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
timeout_ms: Some(1_500),
})
.await?;
@@ -523,7 +628,7 @@ mod tests {
let shell_a = manager
.handle_request(InteractiveShellRequest {
session_id: None,
input: "/bin/bash -i",
input_chunks: &["/bin/bash".to_string(), "-i".to_string()],
timeout_ms: Some(1_500),
})
.await?;
@@ -532,7 +637,7 @@ mod tests {
manager
.handle_request(InteractiveShellRequest {
session_id: Some(session_a),
input: "export CODEX_INTERACTIVE_SHELL_VAR=codex\n",
input_chunks: &["export CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string()],
timeout_ms: Some(1_500),
})
.await?;
@@ -540,7 +645,7 @@ mod tests {
let out_2 = manager
.handle_request(InteractiveShellRequest {
session_id: None,
input: "/bin/echo $CODEX_INTERACTIVE_SHELL_VAR\n",
input_chunks: &["/bin/echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
timeout_ms: Some(1_500),
})
.await?;
@@ -549,7 +654,7 @@ mod tests {
let out_3 = manager
.handle_request(InteractiveShellRequest {
session_id: Some(session_a),
input: "echo $CODEX_INTERACTIVE_SHELL_VAR\n",
input_chunks: &["echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
timeout_ms: Some(1_500),
})
.await?;
@@ -558,6 +663,8 @@ mod tests {
Ok(())
}
// todo add a test for the path
#[cfg(unix)]
/// Confirms that output emitted after an initial request times out can be
/// collected by a follow-up request against the same session.
@@ -568,7 +675,7 @@ mod tests {
let open_shell = manager
.handle_request(InteractiveShellRequest {
session_id: None,
input: "/bin/bash -i",
input_chunks: &["/bin/bash".to_string(), "-i".to_string()],
timeout_ms: Some(1_500),
})
.await?;
@@ -577,7 +684,7 @@ mod tests {
manager
.handle_request(InteractiveShellRequest {
session_id: Some(session_id),
input: "export CODEX_INTERACTIVE_SHELL_VAR=codex\n",
input_chunks: &["export CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string()],
timeout_ms: Some(1_500),
})
.await?;
@@ -585,7 +692,7 @@ mod tests {
let out_2 = manager
.handle_request(InteractiveShellRequest {
session_id: Some(session_id),
input: "sleep 5 && echo $CODEX_INTERACTIVE_SHELL_VAR\n",
input_chunks: &["sleep 5 && echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
timeout_ms: Some(10),
})
.await?;
@@ -594,10 +701,11 @@ mod tests {
// Wait for the end of the bash sleep (preventing the usage of tokio controlled clock).
tokio::time::sleep(Duration::from_secs(7)).await;
let empty = Vec::new();
let out_3 = manager
.handle_request(InteractiveShellRequest {
session_id: Some(session_id),
input: "",
input_chunks: &empty,
timeout_ms: Some(100),
})
.await?;
@@ -609,13 +717,16 @@ mod tests {
#[cfg(unix)]
#[tokio::test]
/// Ensure that commands which immediately complete do not create persistent
/// interactive shell sessions, preventing leftover empty sessions from
/// accumulating.
async fn completed_commands_do_not_persist_sessions() -> Result<(), error::InteractiveShellError>
{
let manager = InteractiveShellSessionManager::default();
let result = manager
.handle_request(InteractiveShellRequest {
session_id: None,
input: "/bin/echo codex",
input_chunks: &["/bin/echo".to_string(), "codex".to_string()],
timeout_ms: Some(1_500),
})
.await?;
@@ -630,12 +741,13 @@ mod tests {
#[test]
fn truncate_middle_no_newlines_fallback() {
// todo double check the truncation logic
// Long string without newlines forces a pure byte/char-boundary truncation.
let s = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
let max_bytes = 16; // force truncation
let (out, original) = truncate_middle(s, max_bytes);
// For very small caps, we return a full, untruncated marker that may exceed the cap.
assert_eq!(out, "…16 tokens truncated…");
assert_eq!(out, "…16 tokens truncated…"); // todo rename the token word
// Original is ceil(62/4) = 16 tokens.
assert_eq!(original, Some(16));
}

View File

@@ -204,11 +204,12 @@ fn create_unified_exec_tool() -> OpenAiTool {
let mut properties = BTreeMap::new();
properties.insert(
"input".to_string(),
JsonSchema::String {
JsonSchema::Array {
items: Box::new(JsonSchema::String { description: None }),
description: Some(
"When no session_id is provided, the entire input is treated as the command \
to launch. When a session_id is present, this string is written to the \
session's stdin."
"When no session_id is provided, treat the array as the command and arguments \
to launch. When session_id is set, concatenate the strings (in order) and write \
them to the session's stdin."
.to_string(),
),
},
@@ -625,22 +626,19 @@ pub(crate) fn get_openai_tools(
if config.include_view_image_tool {
tools.push(create_view_image_tool());
}
//
// if let Some(mcp_tools) = mcp_tools {
// // Ensure deterministic ordering to maximize prompt cache hits.
// // HashMap iteration order is non-deterministic, so sort by fully-qualified tool name.
// let mut entries: Vec<(String, mcp_types::Tool)> = mcp_tools.into_iter().collect();
// entries.sort_by(|a, b| a.0.cmp(&b.0));
//
// for (name, tool) in entries.into_iter() {
// match mcp_tool_to_openai_tool(name.clone(), tool.clone()) {
// Ok(converted_tool) => tools.push(OpenAiTool::Function(converted_tool)),
// Err(e) => {
// tracing::error!("Failed to convert {name:?} MCP tool to OpenAI tool: {e:?}");
// }
// }
// }
// }
if let Some(mcp_tools) = mcp_tools {
let mut entries: Vec<(String, mcp_types::Tool)> = mcp_tools.into_iter().collect();
entries.sort_by(|a, b| a.0.cmp(&b.0));
for (name, tool) in entries.into_iter() {
match mcp_tool_to_openai_tool(name.clone(), tool.clone()) {
Ok(converted_tool) => tools.push(OpenAiTool::Function(converted_tool)),
Err(e) => {
tracing::error!("Failed to convert {name:?} MCP tool to OpenAI tool: {e:?}");
}
}
}
}
tools
}

View File

@@ -122,7 +122,7 @@ pub enum ResponseItem {
session_id: Option<String>,
/// Characters that should be written to the interactive shell's
/// standard input.
arguments: String,
arguments: Vec<String>,
/// Maximum amount of time to wait for additional output after writing
/// to stdin.
timeout_ms: Option<u64>,