This commit is contained in:
jimmyfraiture
2025-09-07 17:20:34 -07:00
parent 16236b699e
commit e1809a8f13
6 changed files with 333 additions and 338 deletions

View File

@@ -63,7 +63,6 @@ use crate::exec_command::ExecSessionManager;
use crate::exec_command::WRITE_STDIN_TOOL_NAME;
use crate::exec_command::WriteStdinParams;
use crate::exec_env::create_env;
use crate::ishell::InteractiveShellSessionManager;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::mcp_tool_call::handle_mcp_tool_call;
use crate::model_family::find_family_for_model;
@@ -109,6 +108,7 @@ use crate::safety::assess_command_safety;
use crate::safety::assess_safety_for_untrusted_command;
use crate::shell;
use crate::turn_diff_tracker::TurnDiffTracker;
use crate::unified_exec::UnifiedExecSessionManager;
use crate::user_instructions::UserInstructions;
use crate::user_notification::UserNotification;
use crate::util::backoff;
@@ -275,7 +275,7 @@ pub(crate) struct Session {
/// Manager for external MCP servers/tools.
mcp_connection_manager: McpConnectionManager,
session_manager: ExecSessionManager,
ishell_manager: InteractiveShellSessionManager,
unified_exec_manager: UnifiedExecSessionManager,
/// External notifier command (will be passed as args to exec()). When
/// `None` this feature is disabled.
@@ -466,7 +466,7 @@ impl Session {
tx_event: tx_event.clone(),
mcp_connection_manager,
session_manager: ExecSessionManager::default(),
ishell_manager: InteractiveShellSessionManager::default(),
unified_exec_manager: UnifiedExecSessionManager::default(),
notify,
state: Mutex::new(state),
rollout: Mutex::new(Some(rollout_recorder)),
@@ -1997,10 +1997,16 @@ async fn handle_response_item(
let call_id = id
.clone()
.or_else(|| session_id.clone())
.unwrap_or_else(|| format!("ishell:{}", Uuid::new_v4()));
.unwrap_or_else(|| format!("unified_exec:{}", Uuid::new_v4()));
Some(
handle_ishell_tool_call(sess, call_id, session_id.clone(), arguments, timeout_ms)
.await,
handle_unified_exec_tool_call(
sess,
call_id,
session_id.clone(),
arguments,
timeout_ms,
)
.await,
)
}
ResponseItem::CustomToolCall {
@@ -2047,47 +2053,47 @@ async fn handle_response_item(
Ok(output)
}
async fn handle_ishell_tool_call(
async fn handle_unified_exec_tool_call(
sess: &Session,
call_id: String,
session_id: Option<String>,
arguments: Vec<String>,
timeout_ms: Option<u64>,
) -> ResponseInputItem {
let parsed_session_id = if let Some(session_id) = session_id {
match session_id.parse::<i32>() {
Ok(parsed) => Some(parsed),
Err(output) => return ResponseInputItem::FunctionCallOutput {
call_id: call_id.to_string(),
output: FunctionCallOutputPayload {
content: format!("invalid session_id: {session_id} due to error {output}"),
success: Some(false),
},
Err(output) => {
return ResponseInputItem::FunctionCallOutput {
call_id: call_id.to_string(),
output: FunctionCallOutputPayload {
content: format!("invalid session_id: {session_id} due to error {output}"),
success: Some(false),
},
};
}
}
} else {
None
};
let request = crate::ishell::InteractiveShellRequest {
let request = crate::unified_exec::UnifiedExecRequest {
session_id: parsed_session_id,
input_chunks: &arguments,
timeout_ms,
};
let result = sess.ishell_manager.handle_request(request).await;
let result = sess.unified_exec_manager.handle_request(request).await;
let output_payload = match result {
Ok(value) => {
#[derive(Serialize)]
struct SerializedIshellResult<'a> {
struct SerializedUnifiedExecResult<'a> {
session_id: Option<i32>,
output: &'a str,
}
match serde_json::to_string(&SerializedIshellResult {
match serde_json::to_string(&SerializedUnifiedExecResult {
session_id: value.session_id,
output: &value.output,
}) {
@@ -2096,13 +2102,13 @@ async fn handle_ishell_tool_call(
success: Some(true),
},
Err(err) => FunctionCallOutputPayload {
content: format!("failed to serialize interactive shell output: {err}"),
content: format!("failed to serialize unified exec output: {err}"),
success: Some(false),
},
}
}
Err(err) => FunctionCallOutputPayload {
content: err.to_string(),
content: format!("unified exec failed: {err}"),
success: Some(false),
},
};
@@ -2163,8 +2169,14 @@ async fn handle_function_call(
}
};
handle_ishell_tool_call(sess, call_id, args.session_id, args.input, args.timeout_ms)
.await
handle_unified_exec_tool_call(
sess,
call_id,
args.session_id,
args.input,
args.timeout_ms,
)
.await
}
"view_image" => {
#[derive(serde::Deserialize)]

View File

@@ -28,13 +28,13 @@ pub mod exec_env;
mod flags;
pub mod git_info;
mod is_safe_command;
mod ishell;
pub mod landlock;
mod mcp_connection_manager;
mod mcp_tool_call;
mod message_history;
mod model_provider_info;
pub mod parse_command;
mod unified_exec;
mod user_instructions;
pub use model_provider_info::BUILT_IN_OSS_MODEL_PROVIDER_ID;
pub use model_provider_info::ModelProviderInfo;

View File

@@ -0,0 +1,26 @@
use thiserror::Error;
#[derive(Debug, Error)]
pub(crate) enum UnifiedExecError {
#[error("Failed to create unified exec session: {pty_error}")]
CreateSession {
#[source]
pty_error: anyhow::Error,
},
#[error("Unknown session id {session_id}")]
UnknownSessionId { session_id: i32 },
#[error("failed to write to stdin")]
WriteToStdin,
#[error("missing command line for unified exec request")]
MissingCommandLine,
#[error("invalid command line: {command_line}")]
InvalidCommandLine { command_line: String },
#[error("command not found: {command}")]
CommandNotFound { command: String },
}
impl UnifiedExecError {
pub(crate) fn create_session(error: anyhow::Error) -> Self {
Self::CreateSession { pty_error: error }
}
}

View File

@@ -3,11 +3,8 @@ use portable_pty::PtySize;
use portable_pty::native_pty_system;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::env;
use std::io::ErrorKind;
use std::io::Read;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::AtomicBool;
@@ -22,33 +19,41 @@ use tokio::time::Instant;
use crate::exec_command::ExecCommandSession;
mod errors;
mod path;
mod truncation;
pub(crate) use errors::UnifiedExecError;
use path::command_from_chunks;
use path::join_input_chunks;
use path::resolve_command_path;
use truncation::truncate_middle;
const DEFAULT_TIMEOUT_MS: u64 = 250;
// Cap on how many bytes of interactive shell output we retain per request.
// If exceeded, output is truncated in the middle, preserving the beginning and end
// with an elision marker to indicate how much was removed.
const ISHELL_OUTPUT_MAX_BYTES: usize = 16 * 1024; // 16 KiB
const UNIFIED_EXEC_OUTPUT_MAX_BYTES: usize = 16 * 1024; // 16 KiB
#[derive(Debug)]
pub(crate) struct InteractiveShellRequest<'a> {
pub(crate) struct UnifiedExecRequest<'a> {
pub session_id: Option<i32>,
pub input_chunks: &'a [String],
pub timeout_ms: Option<u64>,
}
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct InteractiveShellResult {
pub(crate) struct UnifiedExecResult {
pub session_id: Option<i32>,
pub output: String,
}
#[derive(Debug, Default)]
pub(crate) struct InteractiveShellSessionManager {
pub(crate) struct UnifiedExecSessionManager {
next_session_id: AtomicI32,
sessions: Mutex<HashMap<i32, ManagedInteractiveSession>>,
sessions: Mutex<HashMap<i32, ManagedUnifiedExecSession>>,
}
#[derive(Debug)]
struct ManagedInteractiveSession {
struct ManagedUnifiedExecSession {
session: ExecCommandSession,
output_buffer: OutputBuffer,
output_notify: Arc<Notify>,
@@ -58,7 +63,7 @@ struct ManagedInteractiveSession {
type OutputBuffer = Arc<Mutex<VecDeque<Vec<u8>>>>;
type OutputHandles = (OutputBuffer, Arc<Notify>);
impl ManagedInteractiveSession {
impl ManagedUnifiedExecSession {
fn new(session: ExecCommandSession) -> Self {
let output_buffer = Arc::new(Mutex::new(VecDeque::new()));
let output_notify = Arc::new(Notify::new());
@@ -98,22 +103,22 @@ impl ManagedInteractiveSession {
}
}
impl Drop for ManagedInteractiveSession {
impl Drop for ManagedUnifiedExecSession {
fn drop(&mut self) {
self.output_task.abort();
}
}
impl InteractiveShellSessionManager {
impl UnifiedExecSessionManager {
pub async fn handle_request(
&self,
request: InteractiveShellRequest<'_>,
) -> Result<InteractiveShellResult, error::InteractiveShellError> {
request: UnifiedExecRequest<'_>,
) -> Result<UnifiedExecResult, UnifiedExecError> {
tracing::error!("In the exec");
// todo update the errors
let timeout_ms = request.timeout_ms.unwrap_or(DEFAULT_TIMEOUT_MS);
let mut new_session: Option<ManagedInteractiveSession> = None;
let mut new_session: Option<ManagedUnifiedExecSession> = None;
let session_id;
let writer_tx;
let output_buffer;
@@ -130,7 +135,7 @@ impl InteractiveShellSessionManager {
output_notify = notify;
}
None => {
return Err(error::InteractiveShellError::UnknownSessionId {
return Err(UnifiedExecError::UnknownSessionId {
session_id: existing_id,
});
}
@@ -138,8 +143,8 @@ impl InteractiveShellSessionManager {
} else {
let command = command_from_chunks(request.input_chunks)?;
let new_id = self.next_session_id.fetch_add(1, Ordering::SeqCst);
let session = create_shell_session(&command).await?;
let managed_session = ManagedInteractiveSession::new(session);
let session = create_unified_exec_session(&command).await?;
let managed_session = ManagedUnifiedExecSession::new(session);
let (buffer, notify) = managed_session.output_handles();
writer_tx = managed_session.writer_sender();
output_buffer = buffer;
@@ -152,7 +157,7 @@ impl InteractiveShellSessionManager {
let joined_input = join_input_chunks(request.input_chunks);
if !joined_input.is_empty() && writer_tx.send(joined_input.into_bytes()).await.is_err()
{
return Err(error::InteractiveShellError::WriteToStdin);
return Err(UnifiedExecError::WriteToStdin);
}
}
@@ -193,7 +198,7 @@ impl InteractiveShellSessionManager {
let (output, _maybe_tokens) = truncate_middle(
&String::from_utf8_lossy(&collected),
ISHELL_OUTPUT_MAX_BYTES,
UNIFIED_EXEC_OUTPUT_MAX_BYTES,
);
let should_store_session = if let Some(session) = new_session.as_ref() {
@@ -206,12 +211,12 @@ impl InteractiveShellSessionManager {
if let Some(session) = new_session {
self.sessions.lock().await.insert(session_id, session);
}
Ok(InteractiveShellResult {
Ok(UnifiedExecResult {
session_id: Some(session_id),
output,
})
} else {
Ok(InteractiveShellResult {
Ok(UnifiedExecResult {
session_id: None,
output,
})
@@ -219,25 +224,11 @@ impl InteractiveShellSessionManager {
}
}
pub(crate) fn parse_command_line(line: &str) -> Result<Vec<String>, error::InteractiveShellError> {
let trimmed = line.trim();
if trimmed.is_empty() {
return Err(error::InteractiveShellError::MissingCommandLine);
}
match shlex::split(trimmed) {
Some(parts) if !parts.is_empty() => Ok(parts),
_ => Err(error::InteractiveShellError::InvalidCommandLine {
command_line: trimmed.to_string(),
}),
}
}
async fn create_shell_session(
async fn create_unified_exec_session(
command: &[String],
) -> Result<ExecCommandSession, error::InteractiveShellError> {
) -> Result<ExecCommandSession, UnifiedExecError> {
if command.is_empty() {
return Err(error::InteractiveShellError::MissingCommandLine);
return Err(UnifiedExecError::MissingCommandLine);
}
let pty_system = native_pty_system();
@@ -249,7 +240,7 @@ async fn create_shell_session(
pixel_width: 0,
pixel_height: 0,
})
.map_err(error::InteractiveShellError::create_session)?;
.map_err(UnifiedExecError::create_session)?;
let resolved_command = resolve_command_path(&command[0])?;
let mut command_builder = CommandBuilder::new(&resolved_command);
@@ -260,7 +251,7 @@ async fn create_shell_session(
let mut child = pair
.slave
.spawn_command(command_builder)
.map_err(error::InteractiveShellError::create_session)?;
.map_err(UnifiedExecError::create_session)?;
let killer = child.clone_killer();
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
@@ -269,7 +260,7 @@ async fn create_shell_session(
let mut reader = pair
.master
.try_clone_reader()
.map_err(error::InteractiveShellError::create_session)?;
.map_err(UnifiedExecError::create_session)?;
let output_tx_clone = output_tx.clone();
let reader_handle = tokio::task::spawn_blocking(move || {
let mut buf = [0u8; 8192];
@@ -292,7 +283,7 @@ async fn create_shell_session(
let writer = pair
.master
.take_writer()
.map_err(error::InteractiveShellError::create_session)?;
.map_err(UnifiedExecError::create_session)?;
let writer = Arc::new(StdMutex::new(writer));
let writer_handle = tokio::spawn({
let writer = writer.clone();
@@ -329,231 +320,10 @@ async fn create_shell_session(
))
}
fn resolve_command_path(command: &str) -> Result<String, error::InteractiveShellError> {
if command.is_empty() {
return Err(error::InteractiveShellError::MissingCommandLine);
}
if is_explicit_path(command) {
return ensure_executable(Path::new(command))
.then_some(command.to_string())
.ok_or_else(|| error::InteractiveShellError::CommandNotFound {
command: command.to_string(),
});
}
if let Some(resolved) = find_in_path(command) {
return Ok(resolved.to_string_lossy().to_string());
}
Err(error::InteractiveShellError::CommandNotFound {
command: command.to_string(),
})
}
fn command_from_chunks(chunks: &[String]) -> Result<Vec<String>, error::InteractiveShellError> {
match chunks {
[] => Err(error::InteractiveShellError::MissingCommandLine),
[single] => parse_command_line(single),
_ => Ok(chunks.to_vec()),
}
}
fn join_input_chunks(chunks: &[String]) -> String {
match chunks {
[] => String::new(),
[single] => single.clone(),
_ => chunks.concat(),
}
}
fn is_explicit_path(command: &str) -> bool {
let path = Path::new(command);
path.is_absolute() || path.components().count() > 1
}
fn find_in_path(command: &str) -> Option<PathBuf> {
let path_var = env::var_os("PATH")?;
env::split_paths(&path_var)
.flat_map(|dir| candidate_paths(dir, command))
.find(|candidate| ensure_executable(candidate))
}
fn candidate_paths(dir: PathBuf, command: &str) -> Vec<PathBuf> {
build_platform_candidates(dir.join(command))
}
#[cfg(unix)]
fn build_platform_candidates(candidate: PathBuf) -> Vec<PathBuf> {
vec![candidate]
}
#[cfg(windows)]
fn build_platform_candidates(candidate: PathBuf) -> Vec<PathBuf> {
if candidate.extension().is_some() {
return vec![candidate];
}
let pathext = env::var("PATHEXT").unwrap_or_else(|_| ".COM;.EXE;.BAT;.CMD".to_string());
let mut candidates = Vec::new();
for ext in pathext.split(';') {
if ext.is_empty() {
continue;
}
let mut path_with_ext = candidate.clone();
let new_ext = ext.trim_start_matches('.');
path_with_ext.set_extension(new_ext);
candidates.push(path_with_ext);
}
if candidates.is_empty() {
candidates.push(candidate);
}
candidates
}
fn ensure_executable(path: &Path) -> bool {
match path.metadata() {
Ok(metadata) => metadata.is_file() && is_executable(&metadata),
Err(_) => false,
}
}
#[cfg(unix)]
fn is_executable(metadata: &std::fs::Metadata) -> bool {
use std::os::unix::fs::PermissionsExt;
metadata.permissions().mode() & 0o111 != 0
}
#[cfg(not(unix))]
fn is_executable(metadata: &std::fs::Metadata) -> bool {
metadata.is_file()
}
/// Truncate the middle of a UTF-8 string to at most `max_bytes` bytes,
/// preserving the beginning and the end. Returns the possibly truncated
/// string and `Some(original_token_count)` (estimated at 4 bytes/token)
/// if truncation occurred; otherwise returns the original string and `None`.
fn truncate_middle(s: &str, max_bytes: usize) -> (String, Option<u64>) {
if s.len() <= max_bytes {
return (s.to_string(), None);
}
let est_tokens = (s.len() as u64).div_ceil(4);
if max_bytes == 0 {
return (format!("{est_tokens} tokens truncated…"), Some(est_tokens));
}
fn truncate_on_boundary(input: &str, max_len: usize) -> &str {
if input.len() <= max_len {
return input;
}
let mut end = max_len;
while end > 0 && !input.is_char_boundary(end) {
end -= 1;
}
&input[..end]
}
fn pick_prefix_end(s: &str, left_budget: usize) -> usize {
if let Some(head) = s.get(..left_budget)
&& let Some(i) = head.rfind('\n')
{
return i + 1;
}
truncate_on_boundary(s, left_budget).len()
}
fn pick_suffix_start(s: &str, right_budget: usize) -> usize {
let start_tail = s.len().saturating_sub(right_budget);
if let Some(tail) = s.get(start_tail..)
&& let Some(i) = tail.find('\n')
{
return start_tail + i + 1;
}
let mut idx = start_tail.min(s.len());
while idx < s.len() && !s.is_char_boundary(idx) {
idx += 1;
}
idx
}
let mut guess_tokens = est_tokens;
for _ in 0..4 {
let marker = format!("{guess_tokens} tokens truncated…");
let marker_len = marker.len();
let keep_budget = max_bytes.saturating_sub(marker_len);
if keep_budget == 0 {
return (format!("{est_tokens} tokens truncated…"), Some(est_tokens));
}
let left_budget = keep_budget / 2;
let right_budget = keep_budget - left_budget;
let prefix_end = pick_prefix_end(s, left_budget);
let mut suffix_start = pick_suffix_start(s, right_budget);
if suffix_start < prefix_end {
suffix_start = prefix_end;
}
let kept_content_bytes = prefix_end + (s.len() - suffix_start);
let truncated_content_bytes = s.len().saturating_sub(kept_content_bytes);
let new_tokens = (truncated_content_bytes as u64).div_ceil(4);
if new_tokens == guess_tokens {
let mut out = String::with_capacity(marker_len + kept_content_bytes + 1);
out.push_str(&s[..prefix_end]);
out.push_str(&marker);
out.push('\n');
out.push_str(&s[suffix_start..]);
return (out, Some(est_tokens));
}
guess_tokens = new_tokens;
}
let marker = format!("{guess_tokens} tokens truncated…");
let marker_len = marker.len();
let keep_budget = max_bytes.saturating_sub(marker_len);
if keep_budget == 0 {
return (format!("{est_tokens} tokens truncated…"), Some(est_tokens));
}
let left_budget = keep_budget / 2;
let right_budget = keep_budget - left_budget;
let prefix_end = pick_prefix_end(s, left_budget);
let suffix_start = pick_suffix_start(s, right_budget);
let mut out = String::with_capacity(marker_len + prefix_end + (s.len() - suffix_start) + 1);
out.push_str(&s[..prefix_end]);
out.push_str(&marker);
out.push('\n');
out.push_str(&s[suffix_start..]);
(out, Some(est_tokens))
}
mod error {
#[derive(Debug, thiserror::Error)]
pub(crate) enum InteractiveShellError {
#[error("Failed to create interactive shell: {pty_error}")]
CreateSession {
#[source]
pty_error: anyhow::Error,
},
#[error("Unknown session id {session_id}")]
UnknownSessionId { session_id: i32 },
#[error("failed to write to stdin")]
WriteToStdin,
#[error("missing command line for interactive shell request")]
MissingCommandLine,
#[error("invalid command line: {command_line}")]
InvalidCommandLine { command_line: String },
#[error("command not found: {command}")]
CommandNotFound { command: String },
}
impl InteractiveShellError {
pub(crate) fn create_session(error: anyhow::Error) -> Self {
Self::CreateSession { pty_error: error }
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use super::path::parse_command_line;
#[test]
fn parse_command_line_splits_words() {
@@ -574,22 +344,16 @@ mod tests {
#[test]
fn parse_command_line_rejects_empty() {
let err = parse_command_line(" ").expect_err("expected error");
assert!(matches!(
err,
error::InteractiveShellError::MissingCommandLine
));
assert!(matches!(err, UnifiedExecError::MissingCommandLine));
}
#[cfg(unix)]
/// Ensures that environment state persists when reusing the same
/// interactive shell session across multiple requests.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn interactive_shell_persists_across_requests() -> Result<(), error::InteractiveShellError>
{
let manager = InteractiveShellSessionManager::default();
async fn unified_exec_persists_across_requests() -> Result<(), UnifiedExecError> {
let manager = UnifiedExecSessionManager::default();
let open_shell = manager
.handle_request(InteractiveShellRequest {
.handle_request(UnifiedExecRequest {
session_id: None,
input_chunks: &["/bin/bash".to_string(), "-i".to_string()],
timeout_ms: Some(1_500),
@@ -598,7 +362,7 @@ mod tests {
let session_id = open_shell.session_id.expect("expected session_id");
manager
.handle_request(InteractiveShellRequest {
.handle_request(UnifiedExecRequest {
session_id: Some(session_id),
input_chunks: &["export CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string()],
timeout_ms: Some(1_500),
@@ -606,7 +370,7 @@ mod tests {
.await?;
let out_2 = manager
.handle_request(InteractiveShellRequest {
.handle_request(UnifiedExecRequest {
session_id: Some(session_id),
input_chunks: &["echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
timeout_ms: Some(1_500),
@@ -619,14 +383,12 @@ mod tests {
}
#[cfg(unix)]
/// Verifies independent shell sessions maintain separate state while
/// previously created sessions continue to function.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn multi_interactive_shells() -> Result<(), error::InteractiveShellError> {
let manager = InteractiveShellSessionManager::default();
async fn multi_unified_exec_sessions() -> Result<(), UnifiedExecError> {
let manager = UnifiedExecSessionManager::default();
let shell_a = manager
.handle_request(InteractiveShellRequest {
.handle_request(UnifiedExecRequest {
session_id: None,
input_chunks: &["/bin/bash".to_string(), "-i".to_string()],
timeout_ms: Some(1_500),
@@ -635,7 +397,7 @@ mod tests {
let session_a = shell_a.session_id.expect("expected session id");
manager
.handle_request(InteractiveShellRequest {
.handle_request(UnifiedExecRequest {
session_id: Some(session_a),
input_chunks: &["export CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string()],
timeout_ms: Some(1_500),
@@ -643,7 +405,7 @@ mod tests {
.await?;
let out_2 = manager
.handle_request(InteractiveShellRequest {
.handle_request(UnifiedExecRequest {
session_id: None,
input_chunks: &["/bin/echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
timeout_ms: Some(1_500),
@@ -652,7 +414,7 @@ mod tests {
assert!(!out_2.output.contains("codex"));
let out_3 = manager
.handle_request(InteractiveShellRequest {
.handle_request(UnifiedExecRequest {
session_id: Some(session_a),
input_chunks: &["echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
timeout_ms: Some(1_500),
@@ -663,17 +425,13 @@ mod tests {
Ok(())
}
// todo add a test for the path
#[cfg(unix)]
/// Confirms that output emitted after an initial request times out can be
/// collected by a follow-up request against the same session.
#[tokio::test]
async fn interactive_shell_timeouts() -> Result<(), error::InteractiveShellError> {
let manager = InteractiveShellSessionManager::default();
async fn unified_exec_timeouts() -> Result<(), UnifiedExecError> {
let manager = UnifiedExecSessionManager::default();
let open_shell = manager
.handle_request(InteractiveShellRequest {
.handle_request(UnifiedExecRequest {
session_id: None,
input_chunks: &["/bin/bash".to_string(), "-i".to_string()],
timeout_ms: Some(1_500),
@@ -682,7 +440,7 @@ mod tests {
let session_id = open_shell.session_id.expect("expected session id");
manager
.handle_request(InteractiveShellRequest {
.handle_request(UnifiedExecRequest {
session_id: Some(session_id),
input_chunks: &["export CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string()],
timeout_ms: Some(1_500),
@@ -690,7 +448,7 @@ mod tests {
.await?;
let out_2 = manager
.handle_request(InteractiveShellRequest {
.handle_request(UnifiedExecRequest {
session_id: Some(session_id),
input_chunks: &["sleep 5 && echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
timeout_ms: Some(10),
@@ -698,12 +456,11 @@ mod tests {
.await?;
assert!(!out_2.output.contains("codex"));
// Wait for the end of the bash sleep (preventing the usage of tokio controlled clock).
tokio::time::sleep(Duration::from_secs(7)).await;
let empty = Vec::new();
let out_3 = manager
.handle_request(InteractiveShellRequest {
.handle_request(UnifiedExecRequest {
session_id: Some(session_id),
input_chunks: &empty,
timeout_ms: Some(100),
@@ -717,14 +474,10 @@ mod tests {
#[cfg(unix)]
#[tokio::test]
/// Ensure that commands which immediately complete do not create persistent
/// interactive shell sessions, preventing leftover empty sessions from
/// accumulating.
async fn completed_commands_do_not_persist_sessions() -> Result<(), error::InteractiveShellError>
{
let manager = InteractiveShellSessionManager::default();
async fn completed_commands_do_not_persist_sessions() -> Result<(), UnifiedExecError> {
let manager = UnifiedExecSessionManager::default();
let result = manager
.handle_request(InteractiveShellRequest {
.handle_request(UnifiedExecRequest {
session_id: None,
input_chunks: &["/bin/echo".to_string(), "codex".to_string()],
timeout_ms: Some(1_500),
@@ -741,27 +494,22 @@ mod tests {
#[test]
fn truncate_middle_no_newlines_fallback() {
// todo double check the truncation logic
// Long string without newlines forces a pure byte/char-boundary truncation.
let s = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
let max_bytes = 16; // force truncation
let max_bytes = 16;
let (out, original) = truncate_middle(s, max_bytes);
// For very small caps, we return a full, untruncated marker that may exceed the cap.
assert_eq!(out, "…16 tokens truncated…"); // todo rename the token word
// Original is ceil(62/4) = 16 tokens.
assert_eq!(out, "…16 tokens truncated…");
assert_eq!(original, Some(16));
}
#[test]
fn truncate_middle_prefers_newline_boundaries() {
// Build a multi-line string of 20 numbered lines (each "NNN\n").
let mut s = String::new();
for i in 1..=20 {
s.push_str(&format!("{i:03}\n"));
}
assert_eq!(s.len(), 80);
let max_bytes = 64; // force truncation while leaving room for head/tail
let max_bytes = 64;
assert_eq!(
truncate_middle(&s, max_bytes),
(

View File

@@ -0,0 +1,119 @@
use std::env;
use std::path::Path;
use std::path::PathBuf;
use super::errors::UnifiedExecError;
pub(crate) fn parse_command_line(line: &str) -> Result<Vec<String>, UnifiedExecError> {
let trimmed = line.trim();
if trimmed.is_empty() {
return Err(UnifiedExecError::MissingCommandLine);
}
match shlex::split(trimmed) {
Some(parts) if !parts.is_empty() => Ok(parts),
_ => Err(UnifiedExecError::InvalidCommandLine {
command_line: trimmed.to_string(),
}),
}
}
pub(crate) fn command_from_chunks(chunks: &[String]) -> Result<Vec<String>, UnifiedExecError> {
match chunks {
[] => Err(UnifiedExecError::MissingCommandLine),
[single] => parse_command_line(single),
_ => Ok(chunks.to_vec()),
}
}
pub(crate) fn join_input_chunks(chunks: &[String]) -> String {
match chunks {
[] => String::new(),
[single] => single.clone(),
_ => chunks.concat(),
}
}
pub(crate) fn resolve_command_path(command: &str) -> Result<String, UnifiedExecError> {
if command.is_empty() {
return Err(UnifiedExecError::MissingCommandLine);
}
if is_explicit_path(command) {
return ensure_executable(Path::new(command))
.then_some(command.to_string())
.ok_or_else(|| UnifiedExecError::CommandNotFound {
command: command.to_string(),
});
}
if let Some(resolved) = find_in_path(command) {
return Ok(resolved.to_string_lossy().to_string());
}
Err(UnifiedExecError::CommandNotFound {
command: command.to_string(),
})
}
fn is_explicit_path(command: &str) -> bool {
let path = Path::new(command);
path.is_absolute() || path.components().count() > 1
}
fn find_in_path(command: &str) -> Option<PathBuf> {
let path_var = env::var_os("PATH")?;
env::split_paths(&path_var)
.flat_map(|dir| candidate_paths(dir, command))
.find(|candidate| ensure_executable(candidate))
}
fn candidate_paths(dir: PathBuf, command: &str) -> Vec<PathBuf> {
build_platform_candidates(dir.join(command))
}
#[cfg(unix)]
fn build_platform_candidates(candidate: PathBuf) -> Vec<PathBuf> {
vec![candidate]
}
#[cfg(windows)]
fn build_platform_candidates(candidate: PathBuf) -> Vec<PathBuf> {
if candidate.extension().is_some() {
return vec![candidate];
}
let pathext = env::var("PATHEXT").unwrap_or_else(|_| ".COM;.EXE;.BAT;.CMD".to_string());
let mut candidates = Vec::new();
for ext in pathext.split(';') {
if ext.is_empty() {
continue;
}
let mut path_with_ext = candidate.clone();
let new_ext = ext.trim_start_matches('.');
path_with_ext.set_extension(new_ext);
candidates.push(path_with_ext);
}
if candidates.is_empty() {
candidates.push(candidate);
}
candidates
}
fn ensure_executable(path: &Path) -> bool {
match path.metadata() {
Ok(metadata) => metadata.is_file() && is_executable(&metadata),
Err(_) => false,
}
}
#[cfg(unix)]
fn is_executable(metadata: &std::fs::Metadata) -> bool {
use std::os::unix::fs::PermissionsExt;
metadata.permissions().mode() & 0o111 != 0
}
#[cfg(not(unix))]
fn is_executable(metadata: &std::fs::Metadata) -> bool {
metadata.is_file()
}

View File

@@ -0,0 +1,90 @@
pub(crate) fn truncate_middle(s: &str, max_bytes: usize) -> (String, Option<u64>) {
if s.len() <= max_bytes {
return (s.to_string(), None);
}
let est_tokens = (s.len() as u64).div_ceil(4);
if max_bytes == 0 {
return (format!("{est_tokens} tokens truncated…"), Some(est_tokens));
}
fn truncate_on_boundary(input: &str, max_len: usize) -> &str {
if input.len() <= max_len {
return input;
}
let mut end = max_len;
while end > 0 && !input.is_char_boundary(end) {
end -= 1;
}
&input[..end]
}
fn pick_prefix_end(s: &str, left_budget: usize) -> usize {
if let Some(head) = s.get(..left_budget)
&& let Some(i) = head.rfind('\n')
{
return i + 1;
}
truncate_on_boundary(s, left_budget).len()
}
fn pick_suffix_start(s: &str, right_budget: usize) -> usize {
let start_tail = s.len().saturating_sub(right_budget);
if let Some(tail) = s.get(start_tail..)
&& let Some(i) = tail.find('\n')
{
return start_tail + i + 1;
}
let mut idx = start_tail.min(s.len());
while idx < s.len() && !s.is_char_boundary(idx) {
idx += 1;
}
idx
}
let mut guess_tokens = est_tokens;
for _ in 0..4 {
let marker = format!("{guess_tokens} tokens truncated…");
let marker_len = marker.len();
let keep_budget = max_bytes.saturating_sub(marker_len);
if keep_budget == 0 {
return (format!("{est_tokens} tokens truncated…"), Some(est_tokens));
}
let left_budget = keep_budget / 2;
let right_budget = keep_budget - left_budget;
let prefix_end = pick_prefix_end(s, left_budget);
let mut suffix_start = pick_suffix_start(s, right_budget);
if suffix_start < prefix_end {
suffix_start = prefix_end;
}
let kept_content_bytes = prefix_end + (s.len() - suffix_start);
let truncated_content_bytes = s.len().saturating_sub(kept_content_bytes);
let new_tokens = (truncated_content_bytes as u64).div_ceil(4);
if new_tokens == guess_tokens {
let mut out = String::with_capacity(marker_len + kept_content_bytes + 1);
out.push_str(&s[..prefix_end]);
out.push_str(&marker);
out.push('\n');
out.push_str(&s[suffix_start..]);
return (out, Some(est_tokens));
}
guess_tokens = new_tokens;
}
let marker = format!("{guess_tokens} tokens truncated…");
let marker_len = marker.len();
let keep_budget = max_bytes.saturating_sub(marker_len);
if keep_budget == 0 {
return (format!("{est_tokens} tokens truncated…"), Some(est_tokens));
}
let left_budget = keep_budget / 2;
let right_budget = keep_budget - left_budget;
let prefix_end = pick_prefix_end(s, left_budget);
let suffix_start = pick_suffix_start(s, right_budget);
let mut out = String::with_capacity(marker_len + prefix_end + (s.len() - suffix_start) + 1);
out.push_str(&s[..prefix_end]);
out.push_str(&marker);
out.push('\n');
out.push_str(&s[suffix_start..]);
(out, Some(est_tokens))
}