diff --git a/codex-rs/core/src/exec_command/exec_command_session.rs b/codex-rs/core/src/exec_command/exec_command_session.rs index 31b3c929b2..4f91576527 100644 --- a/codex-rs/core/src/exec_command/exec_command_session.rs +++ b/codex-rs/core/src/exec_command/exec_command_session.rs @@ -27,18 +27,35 @@ pub(crate) struct ExecCommandSession { /// Tracks whether the underlying process has exited. exit_status: std::sync::Arc, + + /// Captures the process exit code once the child terminates. + exit_code: std::sync::Arc>>, +} + +#[derive(Debug)] +pub(crate) struct ExecCommandSessionParams { + pub writer_tx: mpsc::Sender>, + pub output_tx: broadcast::Sender>, + pub killer: Box, + pub reader_handle: JoinHandle<()>, + pub writer_handle: JoinHandle<()>, + pub wait_handle: JoinHandle<()>, + pub exit_status: std::sync::Arc, + pub exit_code: std::sync::Arc>>, } impl ExecCommandSession { - pub(crate) fn new( - writer_tx: mpsc::Sender>, - output_tx: broadcast::Sender>, - killer: Box, - reader_handle: JoinHandle<()>, - writer_handle: JoinHandle<()>, - wait_handle: JoinHandle<()>, - exit_status: std::sync::Arc, - ) -> (Self, broadcast::Receiver>) { + pub(crate) fn new(params: ExecCommandSessionParams) -> (Self, broadcast::Receiver>) { + let ExecCommandSessionParams { + writer_tx, + output_tx, + killer, + reader_handle, + writer_handle, + wait_handle, + exit_status, + exit_code, + } = params; let initial_output_rx = output_tx.subscribe(); ( Self { @@ -49,6 +66,7 @@ impl ExecCommandSession { writer_handle: StdMutex::new(Some(writer_handle)), wait_handle: StdMutex::new(Some(wait_handle)), exit_status, + exit_code, }, initial_output_rx, ) @@ -65,6 +83,10 @@ impl ExecCommandSession { pub(crate) fn has_exited(&self) -> bool { self.exit_status.load(std::sync::atomic::Ordering::SeqCst) } + + pub(crate) fn exit_code(&self) -> Option { + self.exit_code.lock().ok().and_then(|guard| *guard) + } } impl Drop for ExecCommandSession { diff --git a/codex-rs/core/src/exec_command/mod.rs b/codex-rs/core/src/exec_command/mod.rs index 2cfd022510..46ca33eb2c 100644 --- a/codex-rs/core/src/exec_command/mod.rs +++ b/codex-rs/core/src/exec_command/mod.rs @@ -7,6 +7,7 @@ mod session_manager; pub use exec_command_params::ExecCommandParams; pub use exec_command_params::WriteStdinParams; pub(crate) use exec_command_session::ExecCommandSession; +pub(crate) use exec_command_session::ExecCommandSessionParams; pub use responses_api::EXEC_COMMAND_TOOL_NAME; pub use responses_api::WRITE_STDIN_TOOL_NAME; pub use responses_api::create_exec_command_tool_for_responses_api; diff --git a/codex-rs/core/src/exec_command/session_manager.rs b/codex-rs/core/src/exec_command/session_manager.rs index cd1c5329a2..52f909a15d 100644 --- a/codex-rs/core/src/exec_command/session_manager.rs +++ b/codex-rs/core/src/exec_command/session_manager.rs @@ -19,6 +19,7 @@ use tokio::time::timeout; use crate::exec_command::exec_command_params::ExecCommandParams; use crate::exec_command::exec_command_params::WriteStdinParams; use crate::exec_command::exec_command_session::ExecCommandSession; +use crate::exec_command::exec_command_session::ExecCommandSessionParams; use crate::exec_command::session_id::SessionId; use crate::truncate::truncate_middle; @@ -318,17 +319,22 @@ async fn create_exec_command_session( let (exit_tx, exit_rx) = oneshot::channel::(); let exit_status = Arc::new(AtomicBool::new(false)); let wait_exit_status = exit_status.clone(); + let exit_code = Arc::new(StdMutex::new(None)); + let exit_code_for_wait = exit_code.clone(); let wait_handle = tokio::task::spawn_blocking(move || { let code = match child.wait() { Ok(status) => status.exit_code() as i32, Err(_) => -1, }; wait_exit_status.store(true, std::sync::atomic::Ordering::SeqCst); + if let Ok(mut guard) = exit_code_for_wait.lock() { + *guard = Some(code); + } let _ = exit_tx.send(code); }); // Create and store the session with channels. - let (session, initial_output_rx) = ExecCommandSession::new( + let (session, initial_output_rx) = ExecCommandSession::new(ExecCommandSessionParams { writer_tx, output_tx, killer, @@ -336,7 +342,8 @@ async fn create_exec_command_session( writer_handle, wait_handle, exit_status, - ); + exit_code, + }); Ok((session, initial_output_rx, exit_rx)) } diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 8ddf8bff9c..e4bdf1d882 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -42,6 +42,11 @@ pub mod token_data; mod truncate; mod unified_exec; mod user_instructions; +pub use crate::unified_exec::UnifiedExecError; +pub use crate::unified_exec::UnifiedExecMode; +pub use crate::unified_exec::UnifiedExecRequest; +pub use crate::unified_exec::UnifiedExecResult; +pub use crate::unified_exec::UnifiedExecSessionManager; pub use model_provider_info::BUILT_IN_OSS_MODEL_PROVIDER_ID; pub use model_provider_info::ModelProviderInfo; pub use model_provider_info::WireApi; diff --git a/codex-rs/core/src/tools/handlers/unified_exec.rs b/codex-rs/core/src/tools/handlers/unified_exec.rs index ce47dded3c..c6be77a9c5 100644 --- a/codex-rs/core/src/tools/handlers/unified_exec.rs +++ b/codex-rs/core/src/tools/handlers/unified_exec.rs @@ -1,5 +1,8 @@ use async_trait::async_trait; use serde::Deserialize; +use serde_json::Value as JsonValue; +use std::borrow::Cow; +use std::convert::TryFrom; use crate::function_tool::FunctionCallError; use crate::tools::context::ToolInvocation; @@ -7,17 +10,37 @@ use crate::tools::context::ToolOutput; use crate::tools::context::ToolPayload; use crate::tools::registry::ToolHandler; use crate::tools::registry::ToolKind; +use crate::unified_exec::UnifiedExecError; +use crate::unified_exec::UnifiedExecMode; use crate::unified_exec::UnifiedExecRequest; +use crate::unified_exec::UnifiedExecResult; pub struct UnifiedExecHandler; #[derive(Deserialize)] struct UnifiedExecArgs { - input: Vec, #[serde(default)] - session_id: Option, + cmd: Option, #[serde(default)] - timeout_ms: Option, + session_id: Option, + #[serde(default)] + chars: Option, + #[serde(default)] + yield_time_ms: Option, + #[serde(default)] + max_output_tokens: Option, + #[serde(default)] + output_chunk_id: Option, + #[serde(default)] + output_wall_time: Option, + #[serde(default)] + output_json: Option, + #[serde(default)] + shell: Option, + #[serde(default)] + login: Option, + #[serde(default)] + cwd: Option, } #[async_trait] @@ -54,56 +77,114 @@ impl ToolHandler for UnifiedExecHandler { }; let UnifiedExecArgs { - input, + cmd, session_id, - timeout_ms, + chars, + yield_time_ms, + max_output_tokens, + output_chunk_id, + output_wall_time, + output_json, + shell, + login, + cwd, } = args; - let parsed_session_id = if let Some(session_id) = session_id { - match session_id.parse::() { - Ok(parsed) => Some(parsed), - Err(output) => { - return Err(FunctionCallError::RespondToModel(format!( - "invalid session_id: {session_id} due to error {output:?}" - ))); - } + let chars = chars.unwrap_or_default(); + + let mode = if let Some(raw_session_id) = session_id { + if cmd.is_some() { + return Err(FunctionCallError::RespondToModel( + "provide either cmd or session_id, not both".to_string(), + )); + } + let session_id = parse_session_id(raw_session_id)?; + UnifiedExecMode::Write { + session_id, + chars: chars.as_str(), + yield_time_ms, + max_output_tokens, } } else { - None + let cmd_value = cmd.ok_or_else(|| { + FunctionCallError::RespondToModel( + "cmd is required when session_id is not provided".to_string(), + ) + })?; + UnifiedExecMode::Start { + cmd: Cow::Owned(cmd_value), + yield_time_ms, + max_output_tokens, + shell: shell.as_deref(), + login, + cwd: cwd.as_deref(), + } }; let request = UnifiedExecRequest { - session_id: parsed_session_id, - input_chunks: &input, - timeout_ms, + mode, + output_chunk_id, + output_wall_time, + output_json, }; - let value = session + let result = session .run_unified_exec_request(request) .await - .map_err(|err| { - FunctionCallError::RespondToModel(format!("unified exec failed: {err:?}")) - })?; + .map_err(map_unified_exec_error)?; - #[derive(serde::Serialize)] - struct SerializedUnifiedExecResult { - session_id: Option, - output: String, - } - - let content = serde_json::to_string(&SerializedUnifiedExecResult { - session_id: value.session_id.map(|id| id.to_string()), - output: value.output, - }) - .map_err(|err| { - FunctionCallError::RespondToModel(format!( - "failed to serialize unified exec output: {err:?}" - )) - })?; - - Ok(ToolOutput::Function { - content, - success: Some(true), - }) + Ok(tool_output_from_result(result)) + } +} + +fn tool_output_from_result(result: UnifiedExecResult) -> ToolOutput { + let content = result.content.into_string(); + ToolOutput::Function { + content, + success: Some(true), + } +} + +fn parse_session_id(value: JsonValue) -> Result { + match value { + JsonValue::Number(num) => { + if let Some(int) = num.as_i64() { + i32::try_from(int).map_err(|_| { + FunctionCallError::RespondToModel(format!( + "session_id value {int} exceeds i32 range" + )) + }) + } else { + Err(FunctionCallError::RespondToModel( + "session_id must be an integer".to_string(), + )) + } + } + JsonValue::String(text) => text.parse::().map_err(|err| { + FunctionCallError::RespondToModel(format!("invalid session_id '{text}': {err}")) + }), + other => Err(FunctionCallError::RespondToModel(format!( + "session_id must be a string or integer, got {other}" + ))), + } +} + +fn map_unified_exec_error(err: UnifiedExecError) -> FunctionCallError { + match err { + UnifiedExecError::SessionExited { + session_id, + exit_code, + } => { + let detail = exit_code + .map(|code| format!(" with code {code}")) + .unwrap_or_default(); + FunctionCallError::RespondToModel(format!( + "session {session_id} has already exited{detail}. Start a new session with cmd." + )) + } + UnifiedExecError::WriteToStdin { session_id } => FunctionCallError::RespondToModel( + format!("failed to write to session {session_id}; the process may have exited"), + ), + other => FunctionCallError::RespondToModel(format!("unified exec failed: {other}")), } } diff --git a/codex-rs/core/src/tools/spec.rs b/codex-rs/core/src/tools/spec.rs index 5352051917..fa060e28e2 100644 --- a/codex-rs/core/src/tools/spec.rs +++ b/codex-rs/core/src/tools/spec.rs @@ -142,13 +142,10 @@ impl From for AdditionalProperties { fn create_unified_exec_tool() -> ToolSpec { let mut properties = BTreeMap::new(); properties.insert( - "input".to_string(), - JsonSchema::Array { - items: Box::new(JsonSchema::String { description: None }), + "cmd".to_string(), + JsonSchema::String { description: Some( - "When no session_id is provided, treat the array as the command and arguments \ - to launch. When session_id is set, concatenate the strings (in order) and write \ - them to the session's stdin." + "Command to execute when starting a new session. Mutually exclusive with session_id." .to_string(), ), }, @@ -157,30 +154,87 @@ fn create_unified_exec_tool() -> ToolSpec { "session_id".to_string(), JsonSchema::String { description: Some( - "Identifier for an existing interactive session. If omitted, a new command \ - is spawned." + "Identifier for an existing interactive session. When provided, cmd must be omitted and chars will be written to the session's stdin." .to_string(), ), }, ); properties.insert( - "timeout_ms".to_string(), - JsonSchema::Number { + "chars".to_string(), + JsonSchema::String { description: Some( - "Maximum time in milliseconds to wait for output after writing the input." + "Characters to write to an interactive session. Use an empty string to poll for output without sending input." .to_string(), ), }, ); + properties.insert( + "yield_time_ms".to_string(), + JsonSchema::Number { + description: Some( + "Milliseconds to wait for output before returning (clamped between 250 and 30_000)." + .to_string(), + ), + }, + ); + properties.insert( + "max_output_tokens".to_string(), + JsonSchema::Number { + description: Some( + "Approximate maximum number of output tokens before truncation.".to_string(), + ), + }, + ); + properties.insert( + "output_chunk_id".to_string(), + JsonSchema::Boolean { + description: Some("Whether to include a chunk identifier in responses.".to_string()), + }, + ); + properties.insert( + "output_wall_time".to_string(), + JsonSchema::Boolean { + description: Some( + "Whether to include wall-clock timing metadata in responses.".to_string(), + ), + }, + ); + properties.insert( + "output_json".to_string(), + JsonSchema::Boolean { + description: Some( + "Return structured JSON instead of formatted text when true.".to_string(), + ), + }, + ); + properties.insert( + "shell".to_string(), + JsonSchema::String { + description: Some("Override the shell executable (default /bin/bash).".to_string()), + }, + ); + properties.insert( + "login".to_string(), + JsonSchema::Boolean { + description: Some( + "Whether to start the shell as a login shell (default true).".to_string(), + ), + }, + ); + properties.insert( + "cwd".to_string(), + JsonSchema::String { + description: Some("Working directory for the command.".to_string()), + }, + ); ToolSpec::Function(ResponsesApiTool { name: "unified_exec".to_string(), - description: - "Runs a command in a PTY. Provide a session_id to reuse an existing interactive session.".to_string(), + description: "Runs commands in a PTY and streams output, supporting interactive sessions with stdin writes.".to_string(), strict: false, parameters: JsonSchema::Object { properties, - required: Some(vec!["input".to_string()]), + required: None, additional_properties: Some(false.into()), }, }) diff --git a/codex-rs/core/src/unified_exec/errors.rs b/codex-rs/core/src/unified_exec/errors.rs index 6bf5bf7ec5..7f1c0c31be 100644 --- a/codex-rs/core/src/unified_exec/errors.rs +++ b/codex-rs/core/src/unified_exec/errors.rs @@ -1,7 +1,7 @@ use thiserror::Error; #[derive(Debug, Error)] -pub(crate) enum UnifiedExecError { +pub enum UnifiedExecError { #[error("Failed to create unified exec session: {pty_error}")] CreateSession { #[source] @@ -9,10 +9,19 @@ pub(crate) enum UnifiedExecError { }, #[error("Unknown session id {session_id}")] UnknownSessionId { session_id: i32 }, - #[error("failed to write to stdin")] - WriteToStdin, + #[error("failed to write to stdin for session {session_id}")] + WriteToStdin { session_id: i32 }, #[error("missing command line for unified exec request")] MissingCommandLine, + #[error("spawned process did not report a process id")] + MissingProcessId, + #[error("spawned process id {process_id} does not fit in i32")] + ProcessIdOverflow { process_id: u32 }, + #[error("session {session_id} has already exited")] + SessionExited { + session_id: i32, + exit_code: Option, + }, } impl UnifiedExecError { diff --git a/codex-rs/core/src/unified_exec/mod.rs b/codex-rs/core/src/unified_exec/mod.rs index a8f754c92e..d35ea72bde 100644 --- a/codex-rs/core/src/unified_exec/mod.rs +++ b/codex-rs/core/src/unified_exec/mod.rs @@ -1,151 +1,186 @@ use portable_pty::CommandBuilder; use portable_pty::PtySize; use portable_pty::native_pty_system; +use rand::Rng; +use rand::distr::Alphanumeric; +use serde_json::Map as JsonMap; +use serde_json::Value as JsonValue; +use serde_json::json; +use std::borrow::Cow; use std::collections::HashMap; -use std::collections::VecDeque; +use std::convert::TryFrom; use std::io::ErrorKind; use std::io::Read; use std::sync::Arc; use std::sync::Mutex as StdMutex; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::AtomicI32; use std::sync::atomic::Ordering; use tokio::sync::Mutex; -use tokio::sync::Notify; +use tokio::sync::broadcast; use tokio::sync::mpsc; -use tokio::task::JoinHandle; use tokio::time::Duration; use tokio::time::Instant; use crate::exec_command::ExecCommandSession; +use crate::exec_command::ExecCommandSessionParams; use crate::truncate::truncate_middle; mod errors; -pub(crate) use errors::UnifiedExecError; +pub use errors::UnifiedExecError; -const DEFAULT_TIMEOUT_MS: u64 = 1_000; -const MAX_TIMEOUT_MS: u64 = 60_000; -const UNIFIED_EXEC_OUTPUT_MAX_BYTES: usize = 128 * 1024; // 128 KiB +const MIN_YIELD_TIME_MS: u64 = 250; +const MAX_YIELD_TIME_MS: u64 = 30_000; +const DEFAULT_EXEC_YIELD_TIME_MS: u64 = 10_000; +const DEFAULT_WRITE_YIELD_TIME_MS: u64 = 250; +const DEFAULT_MAX_OUTPUT_TOKENS: usize = 10_000; +const PIPE_READ_LIMIT: usize = 1024 * 1024; +const POST_WRITE_SETTLE_MS: u64 = 100; + +#[derive(Debug, Clone)] +pub enum UnifiedExecMode<'a> { + Start { + cmd: Cow<'a, str>, + yield_time_ms: Option, + max_output_tokens: Option, + shell: Option<&'a str>, + login: Option, + cwd: Option<&'a str>, + }, + Write { + session_id: i32, + chars: &'a str, + yield_time_ms: Option, + max_output_tokens: Option, + }, +} + +#[derive(Debug, Clone)] +pub struct UnifiedExecRequest<'a> { + pub mode: UnifiedExecMode<'a>, + pub output_chunk_id: Option, + pub output_wall_time: Option, + pub output_json: Option, +} + +#[derive(Debug, Clone)] +pub struct UnifiedExecResult { + pub content: UnifiedExecContent, + pub metadata: UnifiedExecMetadata, +} + +#[derive(Debug, Clone)] +pub enum UnifiedExecContent { + Text(String), + Json(String), +} + +impl UnifiedExecContent { + pub fn into_string(self) -> String { + match self { + Self::Text(s) | Self::Json(s) => s, + } + } + + pub fn as_str(&self) -> &str { + match self { + Self::Text(s) | Self::Json(s) => s, + } + } + + pub fn is_json(&self) -> bool { + matches!(self, Self::Json(_)) + } +} + +#[derive(Debug, Clone)] +pub struct UnifiedExecMetadata { + pub chunk_id: String, + pub session_id: Option, + pub exit_code: Option, + pub wall_time_seconds: f64, + pub original_token_count: Option, + pub exec_cmd: Option, +} + +#[derive(Debug, Clone, Copy)] +struct OutputPreferences { + chunk_id: bool, + wall_time: bool, + json: bool, +} + +impl OutputPreferences { + fn from_request(request: &UnifiedExecRequest<'_>) -> Self { + Self { + chunk_id: request.output_chunk_id.unwrap_or(true), + wall_time: request.output_wall_time.unwrap_or(true), + json: request.output_json.unwrap_or(false), + } + } +} + +#[derive(Debug, Clone, Copy)] +struct StartSessionParams<'a> { + cmd: &'a str, + yield_time_ms: Option, + max_output_tokens: Option, + shell: Option<&'a str>, + login: Option, + cwd: Option<&'a str>, + preferences: OutputPreferences, +} #[derive(Debug)] -pub(crate) struct UnifiedExecRequest<'a> { - pub session_id: Option, - pub input_chunks: &'a [String], - pub timeout_ms: Option, +struct BuildResultParams { + process_id: i32, + exit_code: Option, + truncated_tokens: Option, + output: String, + wall_time: f64, + preferences: OutputPreferences, + keep_session: bool, + exec_cmd: Option, } -#[derive(Debug, Clone, PartialEq)] -pub(crate) struct UnifiedExecResult { - pub session_id: Option, - pub output: String, +#[derive(Debug)] +pub struct UnifiedExecSessionManager { + sessions: Mutex>>, } -#[derive(Debug, Default)] -pub(crate) struct UnifiedExecSessionManager { - next_session_id: AtomicI32, - sessions: Mutex>, +impl Default for UnifiedExecSessionManager { + fn default() -> Self { + Self { + sessions: Mutex::new(HashMap::new()), + } + } } #[derive(Debug)] struct ManagedUnifiedExecSession { session: ExecCommandSession, - output_buffer: OutputBuffer, - /// Notifies waiters whenever new output has been appended to - /// `output_buffer`, allowing clients to poll for fresh data. - output_notify: Arc, - output_task: JoinHandle<()>, + output_rx: Mutex>>, + process_id: i32, } -#[derive(Debug, Default)] -struct OutputBufferState { - chunks: VecDeque>, - total_bytes: usize, -} - -impl OutputBufferState { - fn push_chunk(&mut self, chunk: Vec) { - self.total_bytes = self.total_bytes.saturating_add(chunk.len()); - self.chunks.push_back(chunk); - - let mut excess = self - .total_bytes - .saturating_sub(UNIFIED_EXEC_OUTPUT_MAX_BYTES); - - while excess > 0 { - match self.chunks.front_mut() { - Some(front) if excess >= front.len() => { - excess -= front.len(); - self.total_bytes = self.total_bytes.saturating_sub(front.len()); - self.chunks.pop_front(); - } - Some(front) => { - front.drain(..excess); - self.total_bytes = self.total_bytes.saturating_sub(excess); - break; - } - None => break, - } - } - } - - fn drain(&mut self) -> Vec> { - let drained: Vec> = self.chunks.drain(..).collect(); - self.total_bytes = 0; - drained - } -} - -type OutputBuffer = Arc>; -type OutputHandles = (OutputBuffer, Arc); - impl ManagedUnifiedExecSession { fn new( session: ExecCommandSession, - initial_output_rx: tokio::sync::broadcast::Receiver>, + initial_output_rx: broadcast::Receiver>, + process_id: i32, ) -> Self { - let output_buffer = Arc::new(Mutex::new(OutputBufferState::default())); - let output_notify = Arc::new(Notify::new()); - let mut receiver = initial_output_rx; - let buffer_clone = Arc::clone(&output_buffer); - let notify_clone = Arc::clone(&output_notify); - let output_task = tokio::spawn(async move { - loop { - match receiver.recv().await { - Ok(chunk) => { - let mut guard = buffer_clone.lock().await; - guard.push_chunk(chunk); - drop(guard); - notify_clone.notify_waiters(); - } - // If we lag behind the broadcast buffer, skip missed - // messages but keep the task alive to continue streaming. - Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => { - continue; - } - // When the sender closes, exit the task. - Err(tokio::sync::broadcast::error::RecvError::Closed) => break, - } - } - }); - Self { session, - output_buffer, - output_notify, - output_task, + output_rx: Mutex::new(initial_output_rx), + process_id, } } - fn writer_sender(&self) -> mpsc::Sender> { + fn writer(&self) -> mpsc::Sender> { self.session.writer_sender() } - fn output_handles(&self) -> OutputHandles { - ( - Arc::clone(&self.output_buffer), - Arc::clone(&self.output_notify), - ) + fn exit_code(&self) -> Option { + self.session.exit_code() } fn has_exited(&self) -> bool { @@ -153,175 +188,383 @@ impl ManagedUnifiedExecSession { } } -impl Drop for ManagedUnifiedExecSession { - fn drop(&mut self) { - self.output_task.abort(); - } -} - impl UnifiedExecSessionManager { pub async fn handle_request( &self, request: UnifiedExecRequest<'_>, ) -> Result { - let (timeout_ms, timeout_warning) = match request.timeout_ms { - Some(requested) if requested > MAX_TIMEOUT_MS => ( - MAX_TIMEOUT_MS, - Some(format!( - "Warning: requested timeout {requested}ms exceeds maximum of {MAX_TIMEOUT_MS}ms; clamping to {MAX_TIMEOUT_MS}ms.\n" - )), - ), - Some(requested) => (requested, None), - None => (DEFAULT_TIMEOUT_MS, None), - }; - - let mut new_session: Option = None; - let session_id; - let writer_tx; - let output_buffer; - let output_notify; - - if let Some(existing_id) = request.session_id { - let mut sessions = self.sessions.lock().await; - match sessions.get(&existing_id) { - Some(session) => { - if session.has_exited() { - sessions.remove(&existing_id); - return Err(UnifiedExecError::UnknownSessionId { - session_id: existing_id, - }); - } - let (buffer, notify) = session.output_handles(); - session_id = existing_id; - writer_tx = session.writer_sender(); - output_buffer = buffer; - output_notify = notify; - } - None => { - return Err(UnifiedExecError::UnknownSessionId { - session_id: existing_id, - }); - } + let preferences = OutputPreferences::from_request(&request); + match request.mode { + UnifiedExecMode::Start { + cmd, + yield_time_ms, + max_output_tokens, + shell, + login, + cwd, + } => { + self.start_session(StartSessionParams { + cmd: cmd.as_ref(), + yield_time_ms, + max_output_tokens, + shell, + login, + cwd, + preferences, + }) + .await } - drop(sessions); - } else { - let command = request.input_chunks.to_vec(); - let new_id = self.next_session_id.fetch_add(1, Ordering::SeqCst); - let (session, initial_output_rx) = create_unified_exec_session(&command).await?; - let managed_session = ManagedUnifiedExecSession::new(session, initial_output_rx); - let (buffer, notify) = managed_session.output_handles(); - writer_tx = managed_session.writer_sender(); - output_buffer = buffer; - output_notify = notify; - session_id = new_id; - new_session = Some(managed_session); - }; - - if request.session_id.is_some() { - let joined_input = request.input_chunks.join(" "); - if !joined_input.is_empty() && writer_tx.send(joined_input.into_bytes()).await.is_err() - { - return Err(UnifiedExecError::WriteToStdin); + UnifiedExecMode::Write { + session_id, + chars, + yield_time_ms, + max_output_tokens, + } => { + self.write_to_session( + session_id, + chars, + yield_time_ms, + max_output_tokens, + preferences, + ) + .await } } + } - let mut collected: Vec = Vec::with_capacity(4096); - let start = Instant::now(); - let deadline = start + Duration::from_millis(timeout_ms); + async fn start_session( + &self, + params: StartSessionParams<'_>, + ) -> Result { + let StartSessionParams { + cmd, + yield_time_ms, + max_output_tokens, + shell, + login, + cwd, + preferences, + } = params; + if cmd.trim().is_empty() { + return Err(UnifiedExecError::MissingCommandLine); + } - loop { - let drained_chunks; - let mut wait_for_output = None; - { - let mut guard = output_buffer.lock().await; - drained_chunks = guard.drain(); - if drained_chunks.is_empty() { - wait_for_output = Some(output_notify.notified()); - } + let yield_ms = clamp_yield_time(yield_time_ms, DEFAULT_EXEC_YIELD_TIME_MS); + let max_tokens = normalize_max_output_tokens(max_output_tokens); + let shell = shell.unwrap_or("/bin/bash"); + let login = login.unwrap_or(true); + + let (session, initial_output_rx, process_id) = + create_unified_exec_session(cmd, shell, login, cwd).await?; + let managed = Arc::new(ManagedUnifiedExecSession::new( + session, + initial_output_rx, + process_id, + )); + + let output_start = Instant::now(); + let (output, truncated_tokens) = collect_output(&managed, yield_ms, max_tokens).await; + let wall_time = output_start.elapsed().as_secs_f64(); + let exit_code = managed.exit_code(); + let should_keep_session = exit_code.is_none(); + + if should_keep_session { + self.sessions + .lock() + .await + .insert(managed.process_id, managed.clone()); + } + + Ok(build_result(BuildResultParams { + process_id: managed.process_id, + exit_code, + truncated_tokens, + output, + wall_time, + preferences, + keep_session: should_keep_session, + exec_cmd: Some(cmd.to_string()), + })) + } + + async fn write_to_session( + &self, + session_id: i32, + chars: &str, + yield_time_ms: Option, + max_output_tokens: Option, + preferences: OutputPreferences, + ) -> Result { + let managed = { + let sessions = self.sessions.lock().await; + sessions + .get(&session_id) + .cloned() + .ok_or(UnifiedExecError::UnknownSessionId { session_id })? + }; + + if managed.has_exited() { + let exit_code = managed.exit_code(); + self.sessions.lock().await.remove(&session_id); + return Err(UnifiedExecError::SessionExited { + session_id, + exit_code, + }); + } + + if !chars.is_empty() + && managed + .writer() + .send(chars.as_bytes().to_vec()) + .await + .is_err() + { + self.sessions.lock().await.remove(&session_id); + return Err(UnifiedExecError::WriteToStdin { session_id }); + } + + if !chars.is_empty() { + tokio::time::sleep(Duration::from_millis(POST_WRITE_SETTLE_MS)).await; + } + + let yield_ms = clamp_yield_time(yield_time_ms, DEFAULT_WRITE_YIELD_TIME_MS); + let max_tokens = normalize_max_output_tokens(max_output_tokens); + let output_start = Instant::now(); + let (output, truncated_tokens) = collect_output(&managed, yield_ms, max_tokens).await; + let wall_time = output_start.elapsed().as_secs_f64(); + let exit_code = managed.exit_code(); + let should_keep_session = exit_code.is_none(); + + if !should_keep_session { + self.sessions.lock().await.remove(&session_id); + } + + Ok(build_result(BuildResultParams { + process_id: managed.process_id, + exit_code, + truncated_tokens, + output, + wall_time, + preferences, + keep_session: should_keep_session, + exec_cmd: None, + })) + } +} + +fn clamp_yield_time(value: Option, default_value: u64) -> u64 { + let requested = value.unwrap_or(default_value); + requested.clamp(MIN_YIELD_TIME_MS, MAX_YIELD_TIME_MS) +} + +fn normalize_max_output_tokens(value: Option) -> usize { + let requested = value.unwrap_or(DEFAULT_MAX_OUTPUT_TOKENS); + requested.max(1) +} + +fn random_chunk_id() -> String { + let mut rng = rand::rng(); + std::iter::repeat_with(|| rng.sample(Alphanumeric) as char) + .take(6) + .collect() +} + +async fn collect_output( + session: &ManagedUnifiedExecSession, + yield_time_ms: u64, + max_output_tokens: usize, +) -> (String, Option) { + let deadline = Instant::now() + Duration::from_millis(yield_time_ms); + let mut collected: Vec = Vec::with_capacity(4096); + let mut receiver = session.output_rx.lock().await; + + let mut observed_output = false; + loop { + if collected.len() >= PIPE_READ_LIMIT { + break; + } + + let remaining = deadline.saturating_duration_since(Instant::now()); + if remaining.is_zero() { + break; + } + + match tokio::time::timeout(remaining, receiver.recv()).await { + Ok(Ok(chunk)) => { + push_limited(&mut collected, &chunk, PIPE_READ_LIMIT); + observed_output = true; } - - if drained_chunks.is_empty() { - let remaining = deadline.saturating_duration_since(Instant::now()); - if remaining == Duration::ZERO { - break; - } - - let notified = wait_for_output.unwrap_or_else(|| output_notify.notified()); - tokio::pin!(notified); - tokio::select! { - _ = &mut notified => {} - _ = tokio::time::sleep(remaining) => break, - } + Ok(Err(broadcast::error::RecvError::Lagged(_))) => { continue; } - - for chunk in drained_chunks { - collected.extend_from_slice(&chunk); - } - - if Instant::now() >= deadline { + Ok(Err(broadcast::error::RecvError::Closed)) | Err(_) => { break; } } - let (output, _maybe_tokens) = truncate_middle( - &String::from_utf8_lossy(&collected), - UNIFIED_EXEC_OUTPUT_MAX_BYTES, - ); - let output = if let Some(warning) = timeout_warning { - format!("{warning}{output}") - } else { - output - }; + break; + } - let should_store_session = if let Some(session) = new_session.as_ref() { - !session.has_exited() - } else if request.session_id.is_some() { - let mut sessions = self.sessions.lock().await; - if let Some(existing) = sessions.get(&session_id) { - if existing.has_exited() { - sessions.remove(&session_id); - false - } else { - true - } - } else { - false + if observed_output { + while collected.len() < PIPE_READ_LIMIT { + match receiver.try_recv() { + Ok(chunk) => push_limited(&mut collected, &chunk, PIPE_READ_LIMIT), + Err(broadcast::error::TryRecvError::Lagged(_)) => continue, + Err(broadcast::error::TryRecvError::Empty) => break, + Err(broadcast::error::TryRecvError::Closed) => break, } - } else { - true - }; - - if should_store_session { - if let Some(session) = new_session { - self.sessions.lock().await.insert(session_id, session); - } - Ok(UnifiedExecResult { - session_id: Some(session_id), - output, - }) - } else { - Ok(UnifiedExecResult { - session_id: None, - output, - }) } } + + drop(receiver); + + let output = String::from_utf8_lossy(&collected).to_string(); + let cap_bytes = (max_output_tokens as u64) + .saturating_mul(4) + .min(PIPE_READ_LIMIT as u64) as usize; + truncate_middle(&output, cap_bytes) +} + +fn push_limited(buffer: &mut Vec, chunk: &[u8], limit: usize) { + if buffer.len() >= limit { + return; + } + let available = limit - buffer.len(); + if available == 0 { + return; + } + if chunk.len() <= available { + buffer.extend_from_slice(chunk); + } else { + buffer.extend_from_slice(&chunk[..available]); + } } -async fn create_unified_exec_session( - command: &[String], -) -> Result< - ( - ExecCommandSession, - tokio::sync::broadcast::Receiver>, - ), - UnifiedExecError, -> { - if command.is_empty() { - return Err(UnifiedExecError::MissingCommandLine); +fn build_result(params: BuildResultParams) -> UnifiedExecResult { + let BuildResultParams { + process_id, + exit_code, + truncated_tokens, + output, + wall_time, + preferences, + keep_session, + exec_cmd, + } = params; + let chunk_id = random_chunk_id(); + let content = if preferences.json { + UnifiedExecContent::Json(build_json_body( + &chunk_id, + process_id, + exit_code, + truncated_tokens, + wall_time, + &output, + preferences, + )) + } else { + UnifiedExecContent::Text(build_text_body( + &chunk_id, + process_id, + exit_code, + truncated_tokens, + wall_time, + &output, + preferences, + )) + }; + + let metadata = UnifiedExecMetadata { + chunk_id, + session_id: if keep_session { Some(process_id) } else { None }, + exit_code, + wall_time_seconds: wall_time, + original_token_count: truncated_tokens, + exec_cmd, + }; + + UnifiedExecResult { content, metadata } +} + +fn build_text_body( + chunk_id: &str, + process_id: i32, + exit_code: Option, + truncated_tokens: Option, + wall_time: f64, + output: &str, + preferences: OutputPreferences, +) -> String { + let mut parts = Vec::new(); + if preferences.chunk_id { + parts.push(format!("Chunk ID: {chunk_id}\n")); + } + if preferences.wall_time { + parts.push(format!("Wall time: {wall_time:.3} seconds\n")); + } + match exit_code { + Some(code) => parts.push(format!("Process exited with code {code}\n")), + None => parts.push(format!("Process running with session ID {process_id}\n")), + } + if let Some(tokens) = truncated_tokens { + parts.push(format!( + "Warning: truncated output (original token count: {tokens})\n" + )); + } + parts.push("Output:\n".to_string()); + parts.push(output.to_string()); + parts.concat() +} + +fn build_json_body( + chunk_id: &str, + process_id: i32, + exit_code: Option, + truncated_tokens: Option, + wall_time: f64, + output: &str, + preferences: OutputPreferences, +) -> String { + let mut map: JsonMap = JsonMap::new(); + + if preferences.chunk_id { + map.insert( + "chunk_id".to_string(), + JsonValue::String(chunk_id.to_string()), + ); } + if preferences.wall_time { + let rounded = (wall_time * 1000.0).round() / 1000.0; + map.insert("wall_time".to_string(), json!(rounded)); + } + + if let Some(code) = exit_code { + map.insert("exit_code".to_string(), json!(code)); + } else { + map.insert("session_id".to_string(), json!(process_id)); + } + + if let Some(tokens) = truncated_tokens { + map.insert("original_token_count".to_string(), json!(tokens)); + } + + let lines: JsonMap = output + .lines() + .enumerate() + .map(|(idx, line)| ((idx + 1).to_string(), JsonValue::String(line.to_string()))) + .collect(); + map.insert("output".to_string(), JsonValue::Object(lines)); + + JsonValue::Object(map).to_string() +} + +async fn create_unified_exec_session( + cmd: &str, + shell: &str, + login: bool, + cwd: Option<&str>, +) -> Result<(ExecCommandSession, broadcast::Receiver>, i32), UnifiedExecError> { let pty_system = native_pty_system(); let pair = pty_system @@ -333,10 +576,19 @@ async fn create_unified_exec_session( }) .map_err(UnifiedExecError::create_session)?; - // Safe thanks to the check at the top of the function. - let mut command_builder = CommandBuilder::new(command[0].clone()); - for arg in &command[1..] { - command_builder.arg(arg); + let mut command_builder = CommandBuilder::new(shell); + command_builder.arg(if login { "-lc" } else { "-c" }); + command_builder.arg(cmd); + command_builder.env("NO_COLOR", "1"); + command_builder.env("TERM", "dumb"); + command_builder.env("LANG", "C.UTF-8"); + command_builder.env("LC_CTYPE", "C.UTF-8"); + command_builder.env("LC_ALL", "C.UTF-8"); + command_builder.env("COLORTERM", ""); + command_builder.env("PAGER", "cat"); + command_builder.env("GIT_PAGER", "cat"); + if let Some(dir) = cwd { + command_builder.cwd(dir); } let mut child = pair @@ -344,9 +596,15 @@ async fn create_unified_exec_session( .spawn_command(command_builder) .map_err(UnifiedExecError::create_session)?; let killer = child.clone_killer(); + let raw_pid = child + .process_id() + .ok_or(UnifiedExecError::MissingProcessId)?; + let process_id = i32::try_from(raw_pid).map_err(|_| UnifiedExecError::ProcessIdOverflow { + process_id: raw_pid, + })?; let (writer_tx, mut writer_rx) = mpsc::channel::>(128); - let (output_tx, _) = tokio::sync::broadcast::channel::>(256); + let (output_tx, _) = broadcast::channel::>(256); let mut reader = pair .master @@ -393,14 +651,23 @@ async fn create_unified_exec_session( } }); - let exit_status = Arc::new(AtomicBool::new(false)); + let exit_status = Arc::new(std::sync::atomic::AtomicBool::new(false)); let wait_exit_status = Arc::clone(&exit_status); + let exit_code = Arc::new(StdMutex::new(None)); + let exit_code_handle = Arc::clone(&exit_code); let wait_handle = tokio::task::spawn_blocking(move || { - let _ = child.wait(); + let result = child.wait(); wait_exit_status.store(true, Ordering::SeqCst); + let code = result + .ok() + .and_then(|status| i32::try_from(status.exit_code()).ok()) + .unwrap_or(-1); + if let Ok(mut guard) = exit_code_handle.lock() { + *guard = Some(code); + } }); - let (session, initial_output_rx) = ExecCommandSession::new( + let (session, initial_output_rx) = ExecCommandSession::new(ExecCommandSessionParams { writer_tx, output_tx, killer, @@ -408,259 +675,116 @@ async fn create_unified_exec_session( writer_handle, wait_handle, exit_status, - ); - Ok((session, initial_output_rx)) + exit_code, + }); + + Ok((session, initial_output_rx, process_id)) } #[cfg(test)] mod tests { use super::*; + #[cfg(unix)] use core_test_support::skip_if_sandbox; - #[test] - fn push_chunk_trims_only_excess_bytes() { - let mut buffer = OutputBufferState::default(); - buffer.push_chunk(vec![b'a'; UNIFIED_EXEC_OUTPUT_MAX_BYTES]); - buffer.push_chunk(vec![b'b']); - buffer.push_chunk(vec![b'c']); - - assert_eq!(buffer.total_bytes, UNIFIED_EXEC_OUTPUT_MAX_BYTES); - assert_eq!(buffer.chunks.len(), 3); - assert_eq!( - buffer.chunks.front().unwrap().len(), - UNIFIED_EXEC_OUTPUT_MAX_BYTES - 2 - ); - assert_eq!(buffer.chunks.pop_back().unwrap(), vec![b'c']); - assert_eq!(buffer.chunks.pop_back().unwrap(), vec![b'b']); - } - #[cfg(unix)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn unified_exec_persists_across_requests_jif() -> Result<(), UnifiedExecError> { + async fn start_and_poll_session() -> Result<(), UnifiedExecError> { skip_if_sandbox!(Ok(())); - let manager = UnifiedExecSessionManager::default(); - - let open_shell = manager - .handle_request(UnifiedExecRequest { - session_id: None, - input_chunks: &["bash".to_string(), "-i".to_string()], - timeout_ms: Some(2_500), - }) - .await?; - let session_id = open_shell.session_id.expect("expected session_id"); - - manager - .handle_request(UnifiedExecRequest { - session_id: Some(session_id), - input_chunks: &[ - "export".to_string(), - "CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string(), - ], - timeout_ms: Some(2_500), - }) - .await?; - - let out_2 = manager - .handle_request(UnifiedExecRequest { - session_id: Some(session_id), - input_chunks: &["echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()], - timeout_ms: Some(2_500), - }) - .await?; - assert!(out_2.output.contains("codex")); - - Ok(()) - } - - #[cfg(unix)] - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn multi_unified_exec_sessions() -> Result<(), UnifiedExecError> { - skip_if_sandbox!(Ok(())); - - let manager = UnifiedExecSessionManager::default(); - - let shell_a = manager - .handle_request(UnifiedExecRequest { - session_id: None, - input_chunks: &["/bin/bash".to_string(), "-i".to_string()], - timeout_ms: Some(2_500), - }) - .await?; - let session_a = shell_a.session_id.expect("expected session id"); - - manager - .handle_request(UnifiedExecRequest { - session_id: Some(session_a), - input_chunks: &["export CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string()], - timeout_ms: Some(2_500), - }) - .await?; - - let out_2 = manager - .handle_request(UnifiedExecRequest { - session_id: None, - input_chunks: &[ - "echo".to_string(), - "$CODEX_INTERACTIVE_SHELL_VAR\n".to_string(), - ], - timeout_ms: Some(2_500), - }) - .await?; - assert!(!out_2.output.contains("codex")); - - let out_3 = manager - .handle_request(UnifiedExecRequest { - session_id: Some(session_a), - input_chunks: &["echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()], - timeout_ms: Some(2_500), - }) - .await?; - assert!(out_3.output.contains("codex")); - - Ok(()) - } - - #[cfg(unix)] - #[tokio::test] - async fn unified_exec_timeouts() -> Result<(), UnifiedExecError> { - skip_if_sandbox!(Ok(())); - - let manager = UnifiedExecSessionManager::default(); - - let open_shell = manager - .handle_request(UnifiedExecRequest { - session_id: None, - input_chunks: &["bash".to_string(), "-i".to_string()], - timeout_ms: Some(2_500), - }) - .await?; - let session_id = open_shell.session_id.expect("expected session id"); - - manager - .handle_request(UnifiedExecRequest { - session_id: Some(session_id), - input_chunks: &[ - "export".to_string(), - "CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string(), - ], - timeout_ms: Some(2_500), - }) - .await?; - - let out_2 = manager - .handle_request(UnifiedExecRequest { - session_id: Some(session_id), - input_chunks: &["sleep 5 && echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()], - timeout_ms: Some(10), - }) - .await?; - assert!(!out_2.output.contains("codex")); - - tokio::time::sleep(Duration::from_secs(7)).await; - - let empty = Vec::new(); - let out_3 = manager - .handle_request(UnifiedExecRequest { - session_id: Some(session_id), - input_chunks: &empty, - timeout_ms: Some(100), - }) - .await?; - - assert!(out_3.output.contains("codex")); - - Ok(()) - } - - #[cfg(unix)] - #[tokio::test] - #[ignore] // Ignored while we have a better way to test this. - async fn requests_with_large_timeout_are_capped() -> Result<(), UnifiedExecError> { - let manager = UnifiedExecSessionManager::default(); - - let result = manager - .handle_request(UnifiedExecRequest { - session_id: None, - input_chunks: &["echo".to_string(), "codex".to_string()], - timeout_ms: Some(120_000), - }) - .await?; - - assert!(result.output.starts_with( - "Warning: requested timeout 120000ms exceeds maximum of 60000ms; clamping to 60000ms.\n" - )); - assert!(result.output.contains("codex")); - - Ok(()) - } - - #[cfg(unix)] - #[tokio::test] - #[ignore] // Ignored while we have a better way to test this. - async fn completed_commands_do_not_persist_sessions() -> Result<(), UnifiedExecError> { let manager = UnifiedExecSessionManager::default(); let result = manager .handle_request(UnifiedExecRequest { - session_id: None, - input_chunks: &["/bin/echo".to_string(), "codex".to_string()], - timeout_ms: Some(2_500), + mode: UnifiedExecMode::Start { + cmd: Cow::Borrowed("printf 'ready\\n' && read dummy"), + yield_time_ms: Some(1_000), + max_output_tokens: Some(1_000), + shell: Some("/bin/bash"), + login: Some(false), + cwd: None, + }, + output_chunk_id: Some(true), + output_wall_time: Some(true), + output_json: Some(false), }) .await?; - assert!(result.session_id.is_none()); - assert!(result.output.contains("codex")); + assert!(result.metadata.session_id.is_some()); + assert!(result.content.as_str().contains("ready")); - assert!(manager.sessions.lock().await.is_empty()); + let session_id = result.metadata.session_id.unwrap(); + let poll = manager + .handle_request(UnifiedExecRequest { + mode: UnifiedExecMode::Write { + session_id, + chars: "", + yield_time_ms: Some(500), + max_output_tokens: Some(1_000), + }, + output_chunk_id: Some(false), + output_wall_time: Some(false), + output_json: Some(true), + }) + .await?; + + assert!(poll.content.is_json()); + assert!(poll.metadata.session_id.is_some()); Ok(()) } #[cfg(unix)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn reusing_completed_session_returns_unknown_session() -> Result<(), UnifiedExecError> { + async fn session_cleans_up_after_exit() -> Result<(), UnifiedExecError> { skip_if_sandbox!(Ok(())); let manager = UnifiedExecSessionManager::default(); - - let open_shell = manager + let result = manager .handle_request(UnifiedExecRequest { - session_id: None, - input_chunks: &["/bin/bash".to_string(), "-i".to_string()], - timeout_ms: Some(2_500), - }) - .await?; - let session_id = open_shell.session_id.expect("expected session id"); - - manager - .handle_request(UnifiedExecRequest { - session_id: Some(session_id), - input_chunks: &["exit\n".to_string()], - timeout_ms: Some(2_500), + mode: UnifiedExecMode::Start { + cmd: Cow::Borrowed("echo done"), + yield_time_ms: Some(1_000), + max_output_tokens: Some(1_000), + shell: Some("/bin/bash"), + login: Some(false), + cwd: None, + }, + output_chunk_id: None, + output_wall_time: None, + output_json: Some(false), }) .await?; - tokio::time::sleep(Duration::from_millis(200)).await; - - let err = manager - .handle_request(UnifiedExecRequest { - session_id: Some(session_id), - input_chunks: &[], - timeout_ms: Some(100), - }) - .await - .expect_err("expected unknown session error"); - - match err { - UnifiedExecError::UnknownSessionId { session_id: err_id } => { - assert_eq!(err_id, session_id); + if let Some(session_id) = result.metadata.session_id { + tokio::time::sleep(Duration::from_millis(100)).await; + match manager + .handle_request(UnifiedExecRequest { + mode: UnifiedExecMode::Write { + session_id, + chars: "", + yield_time_ms: Some(250), + max_output_tokens: Some(1_000), + }, + output_chunk_id: None, + output_wall_time: None, + output_json: Some(false), + }) + .await + { + Ok(poll) => { + assert!(poll.metadata.session_id.is_none()); + assert!(poll.content.into_string().contains("done")); + } + Err(UnifiedExecError::SessionExited { exit_code, .. }) => { + assert_eq!(exit_code, Some(0)); + } + Err(other) => return Err(other), } - other => panic!("expected UnknownSessionId, got {other:?}"), + } else { + assert!(result.content.into_string().contains("done")); } - assert!(!manager.sessions.lock().await.contains_key(&session_id)); - Ok(()) } } diff --git a/codex-rs/core/tests/suite/unified_exec.rs b/codex-rs/core/tests/suite/unified_exec.rs index 6298ab06de..af2443b644 100644 --- a/codex-rs/core/tests/suite/unified_exec.rs +++ b/codex-rs/core/tests/suite/unified_exec.rs @@ -1,409 +1,286 @@ #![cfg(not(target_os = "windows"))] -use std::collections::HashMap; - use anyhow::Result; -use codex_core::features::Feature; -use codex_core::protocol::AskForApproval; -use codex_core::protocol::EventMsg; -use codex_core::protocol::InputItem; -use codex_core::protocol::Op; -use codex_core::protocol::SandboxPolicy; -use codex_protocol::config_types::ReasoningSummary; -use core_test_support::responses::ev_assistant_message; -use core_test_support::responses::ev_completed; -use core_test_support::responses::ev_function_call; -use core_test_support::responses::ev_response_created; -use core_test_support::responses::mount_sse_sequence; -use core_test_support::responses::sse; -use core_test_support::responses::start_mock_server; -use core_test_support::skip_if_no_network; +use codex_core::UnifiedExecMode; +use codex_core::UnifiedExecRequest; +use codex_core::UnifiedExecSessionManager; +#[cfg(unix)] use core_test_support::skip_if_sandbox; -use core_test_support::test_codex::TestCodex; -use core_test_support::test_codex::test_codex; -use core_test_support::wait_for_event; use serde_json::Value; +use tokio::time::Duration; -fn extract_output_text(item: &Value) -> Option<&str> { - item.get("output").and_then(|value| match value { - Value::String(text) => Some(text.as_str()), - Value::Object(obj) => obj.get("content").and_then(Value::as_str), - _ => None, - }) -} - -fn collect_tool_outputs(bodies: &[Value]) -> Result> { - let mut outputs = HashMap::new(); - for body in bodies { - if let Some(items) = body.get("input").and_then(Value::as_array) { - for item in items { - if item.get("type").and_then(Value::as_str) != Some("function_call_output") { - continue; - } - if let Some(call_id) = item.get("call_id").and_then(Value::as_str) { - let content = extract_output_text(item) - .ok_or_else(|| anyhow::anyhow!("missing tool output content"))?; - let trimmed = content.trim(); - if trimmed.is_empty() { - continue; - } - let parsed: Value = serde_json::from_str(trimmed).map_err(|err| { - anyhow::anyhow!("failed to parse tool output content {trimmed:?}: {err}") - })?; - outputs.insert(call_id.to_string(), parsed); - } - } - } - } - Ok(outputs) -} - +#[cfg(unix)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn unified_exec_reuses_session_via_stdin() -> Result<()> { - skip_if_no_network!(Ok(())); +async fn unified_exec_manager_supports_interactive_cat() -> Result<()> { skip_if_sandbox!(Ok(())); - let server = start_mock_server().await; - - let mut builder = test_codex().with_config(|config| { - config.features.enable(Feature::UnifiedExec); - }); - let TestCodex { - codex, - cwd, - session_configured, - .. - } = builder.build(&server).await?; - - let first_call_id = "uexec-start"; - let first_args = serde_json::json!({ - "input": ["/bin/cat"], - "timeout_ms": 200, - }); - - let second_call_id = "uexec-stdin"; - let second_args = serde_json::json!({ - "input": ["hello unified exec\n"], - "session_id": "0", - "timeout_ms": 500, - }); - - let responses = vec![ - sse(vec![ - ev_response_created("resp-1"), - ev_function_call( - first_call_id, - "unified_exec", - &serde_json::to_string(&first_args)?, - ), - ev_completed("resp-1"), - ]), - sse(vec![ - ev_response_created("resp-2"), - ev_function_call( - second_call_id, - "unified_exec", - &serde_json::to_string(&second_args)?, - ), - ev_completed("resp-2"), - ]), - sse(vec![ - ev_assistant_message("msg-1", "all done"), - ev_completed("resp-3"), - ]), - ]; - mount_sse_sequence(&server, responses).await; - - let session_model = session_configured.model.clone(); - - codex - .submit(Op::UserTurn { - items: vec![InputItem::Text { - text: "run unified exec".into(), - }], - final_output_json_schema: None, - cwd: cwd.path().to_path_buf(), - approval_policy: AskForApproval::Never, - sandbox_policy: SandboxPolicy::DangerFullAccess, - model: session_model, - effort: None, - summary: ReasoningSummary::Auto, + let manager = UnifiedExecSessionManager::default(); + let result = manager + .handle_request(UnifiedExecRequest { + mode: UnifiedExecMode::Start { + cmd: std::borrow::Cow::Borrowed("cat"), + yield_time_ms: Some(200), + max_output_tokens: Some(1_000), + shell: Some("/bin/sh"), + login: Some(false), + cwd: None, + }, + output_chunk_id: Some(true), + output_wall_time: Some(true), + output_json: Some(false), }) .await?; - wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; + let session_id = result.metadata.session_id.expect("expected session id"); + let poll = manager + .handle_request(UnifiedExecRequest { + mode: UnifiedExecMode::Write { + session_id, + chars: "hello unified exec\n", + yield_time_ms: Some(500), + max_output_tokens: Some(1_000), + }, + output_chunk_id: Some(false), + output_wall_time: Some(false), + output_json: Some(true), + }) + .await?; - let requests = server.received_requests().await.expect("recorded requests"); - assert!(!requests.is_empty(), "expected at least one POST request"); - - let bodies = requests - .iter() - .map(|req| req.body_json::().expect("request json")) - .collect::>(); - - let outputs = collect_tool_outputs(&bodies)?; - - let start_output = outputs - .get(first_call_id) - .expect("missing first unified_exec output"); - let session_id = start_output["session_id"].as_str().unwrap_or_default(); - assert!( - !session_id.is_empty(), - "expected session id in first unified_exec response" - ); - assert!( - start_output["output"] - .as_str() - .unwrap_or_default() - .is_empty() - ); - - let reuse_output = outputs - .get(second_call_id) - .expect("missing reused unified_exec output"); - assert_eq!( - reuse_output["session_id"].as_str().unwrap_or_default(), - session_id - ); - let echoed = reuse_output["output"].as_str().unwrap_or_default(); - assert!( - echoed.contains("hello unified exec"), - "expected echoed output, got {echoed:?}" - ); + let output = poll.content.into_string(); + assert!(output.contains("hello unified exec")); Ok(()) } +#[cfg(unix)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn unified_exec_streams_after_lagged_output() -> Result<()> { - skip_if_no_network!(Ok(())); +async fn unified_exec_manager_streams_large_output() -> Result<()> { skip_if_sandbox!(Ok(())); - let server = start_mock_server().await; - - let mut builder = test_codex().with_config(|config| { - config.use_experimental_unified_exec_tool = true; - config.features.enable(Feature::UnifiedExec); - }); - let TestCodex { - codex, - cwd, - session_configured, - .. - } = builder.build(&server).await?; - + let manager = UnifiedExecSessionManager::default(); let script = r#"python3 - <<'PY' import sys -import time - -chunk = b'x' * (1 << 20) -for _ in range(4): - sys.stdout.buffer.write(chunk) - sys.stdout.flush() - -time.sleep(0.2) -for _ in range(5): +for _ in range(3): sys.stdout.write("TAIL-MARKER\n") sys.stdout.flush() - time.sleep(0.05) - -time.sleep(0.2) PY "#; - let first_call_id = "uexec-lag-start"; - let first_args = serde_json::json!({ - "input": ["/bin/sh", "-c", script], - "timeout_ms": 25, - }); - - let second_call_id = "uexec-lag-poll"; - let second_args = serde_json::json!({ - "input": Vec::::new(), - "session_id": "0", - "timeout_ms": 2_000, - }); - - let responses = vec![ - sse(vec![ - ev_response_created("resp-1"), - ev_function_call( - first_call_id, - "unified_exec", - &serde_json::to_string(&first_args)?, - ), - ev_completed("resp-1"), - ]), - sse(vec![ - ev_response_created("resp-2"), - ev_function_call( - second_call_id, - "unified_exec", - &serde_json::to_string(&second_args)?, - ), - ev_completed("resp-2"), - ]), - sse(vec![ - ev_assistant_message("msg-1", "lag handled"), - ev_completed("resp-3"), - ]), - ]; - mount_sse_sequence(&server, responses).await; - - let session_model = session_configured.model.clone(); - - codex - .submit(Op::UserTurn { - items: vec![InputItem::Text { - text: "exercise lag handling".into(), - }], - final_output_json_schema: None, - cwd: cwd.path().to_path_buf(), - approval_policy: AskForApproval::Never, - sandbox_policy: SandboxPolicy::DangerFullAccess, - model: session_model, - effort: None, - summary: ReasoningSummary::Auto, + let start = manager + .handle_request(UnifiedExecRequest { + mode: UnifiedExecMode::Start { + cmd: std::borrow::Cow::Borrowed(script), + yield_time_ms: Some(500), + max_output_tokens: Some(5_000), + shell: Some("/bin/sh"), + login: Some(false), + cwd: None, + }, + output_chunk_id: None, + output_wall_time: None, + output_json: Some(false), }) .await?; - wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; - - let requests = server.received_requests().await.expect("recorded requests"); - assert!(!requests.is_empty(), "expected at least one POST request"); - - let bodies = requests - .iter() - .map(|req| req.body_json::().expect("request json")) - .collect::>(); - - let outputs = collect_tool_outputs(&bodies)?; - - let start_output = outputs - .get(first_call_id) - .expect("missing initial unified_exec output"); - let session_id = start_output["session_id"].as_str().unwrap_or_default(); - assert!( - !session_id.is_empty(), - "expected session id from initial unified_exec response" - ); - - let poll_output = outputs - .get(second_call_id) - .expect("missing poll unified_exec output"); - let poll_text = poll_output["output"].as_str().unwrap_or_default(); - assert!( - poll_text.contains("TAIL-MARKER"), - "expected poll output to contain tail marker, got {poll_text:?}" - ); + let output = start.content.into_string(); + assert!(output.contains("TAIL-MARKER")); Ok(()) } +#[cfg(unix)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn unified_exec_timeout_and_followup_poll() -> Result<()> { - skip_if_no_network!(Ok(())); +async fn unified_exec_manager_handles_timeout_then_poll() -> Result<()> { skip_if_sandbox!(Ok(())); - let server = start_mock_server().await; - - let mut builder = test_codex().with_config(|config| { - config.features.enable(Feature::UnifiedExec); - }); - let TestCodex { - codex, - cwd, - session_configured, - .. - } = builder.build(&server).await?; - - let first_call_id = "uexec-timeout"; - let first_args = serde_json::json!({ - "input": ["/bin/sh", "-c", "sleep 0.1; echo ready"], - "timeout_ms": 10, - }); - - let second_call_id = "uexec-poll"; - let second_args = serde_json::json!({ - "input": Vec::::new(), - "session_id": "0", - "timeout_ms": 800, - }); - - let responses = vec![ - sse(vec![ - ev_response_created("resp-1"), - ev_function_call( - first_call_id, - "unified_exec", - &serde_json::to_string(&first_args)?, - ), - ev_completed("resp-1"), - ]), - sse(vec![ - ev_response_created("resp-2"), - ev_function_call( - second_call_id, - "unified_exec", - &serde_json::to_string(&second_args)?, - ), - ev_completed("resp-2"), - ]), - sse(vec![ - ev_assistant_message("msg-1", "done"), - ev_completed("resp-3"), - ]), - ]; - mount_sse_sequence(&server, responses).await; - - let session_model = session_configured.model.clone(); - - codex - .submit(Op::UserTurn { - items: vec![InputItem::Text { - text: "check timeout".into(), - }], - final_output_json_schema: None, - cwd: cwd.path().to_path_buf(), - approval_policy: AskForApproval::Never, - sandbox_policy: SandboxPolicy::DangerFullAccess, - model: session_model, - effort: None, - summary: ReasoningSummary::Auto, + let manager = UnifiedExecSessionManager::default(); + let result = manager + .handle_request(UnifiedExecRequest { + mode: UnifiedExecMode::Start { + cmd: std::borrow::Cow::Borrowed("sleep 0.1; echo ready"), + yield_time_ms: Some(10), + max_output_tokens: Some(1_000), + shell: Some("/bin/sh"), + login: Some(false), + cwd: None, + }, + output_chunk_id: None, + output_wall_time: None, + output_json: Some(false), }) .await?; - loop { - let event = codex.next_event().await.expect("event"); - if matches!(event.msg, EventMsg::TaskComplete(_)) { - break; + if let Some(session_id) = result.metadata.session_id { + tokio::time::sleep(Duration::from_millis(200)).await; + match manager + .handle_request(UnifiedExecRequest { + mode: UnifiedExecMode::Write { + session_id, + chars: "", + yield_time_ms: Some(500), + max_output_tokens: Some(1_000), + }, + output_chunk_id: None, + output_wall_time: None, + output_json: Some(false), + }) + .await + { + Ok(poll) => assert!(poll.content.into_string().contains("ready")), + Err(codex_core::UnifiedExecError::SessionExited { .. }) => {} + Err(other) => return Err(other.into()), } + } else { + assert!(result.content.into_string().contains("ready")); } - let requests = server.received_requests().await.expect("recorded requests"); - assert!(!requests.is_empty(), "expected at least one POST request"); + Ok(()) +} - let bodies = requests - .iter() - .map(|req| req.body_json::().expect("request json")) - .collect::>(); +#[cfg(unix)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn unified_exec_json_output_matches_metadata() -> Result<()> { + skip_if_sandbox!(Ok(())); - let outputs = collect_tool_outputs(&bodies)?; + let manager = UnifiedExecSessionManager::default(); + let command = "printf 'ready\\n' && read dummy"; - let first_output = outputs.get(first_call_id).expect("missing timeout output"); - assert_eq!(first_output["session_id"], "0"); - assert!( - first_output["output"] - .as_str() - .unwrap_or_default() - .is_empty() + let result = manager + .handle_request(UnifiedExecRequest { + mode: UnifiedExecMode::Start { + cmd: std::borrow::Cow::Borrowed(command), + yield_time_ms: Some(500), + max_output_tokens: Some(1_000), + shell: Some("/bin/bash"), + login: Some(false), + cwd: None, + }, + output_chunk_id: Some(true), + output_wall_time: Some(true), + output_json: Some(true), + }) + .await?; + + let codex_core::UnifiedExecResult { content, metadata } = result; + + let body = content.into_string(); + let json: Value = serde_json::from_str(&body)?; + + assert!(json.get("chunk_id").is_some()); + assert!(json.get("wall_time").is_some()); + + let session_id = metadata.session_id.expect("expected running session"); + assert_eq!(json["session_id"].as_i64(), Some(i64::from(session_id))); + + let output = json["output"] + .as_object() + .expect("output is object with numbered lines"); + assert_eq!( + output.get("1").and_then(Value::as_str), + Some("ready"), + "expected first output line to contain ready" ); - let poll_output = outputs.get(second_call_id).expect("missing poll output"); - let output_text = poll_output["output"].as_str().unwrap_or_default(); + assert_eq!(metadata.exec_cmd.as_deref(), Some(command)); + + Ok(()) +} + +#[cfg(unix)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn unified_exec_respects_output_preferences() -> Result<()> { + skip_if_sandbox!(Ok(())); + + let manager = UnifiedExecSessionManager::default(); + + let result = manager + .handle_request(UnifiedExecRequest { + mode: UnifiedExecMode::Start { + cmd: std::borrow::Cow::Borrowed("printf 'ready\\n' && read dummy"), + yield_time_ms: Some(500), + max_output_tokens: Some(1_000), + shell: Some("/bin/bash"), + login: Some(false), + cwd: None, + }, + output_chunk_id: Some(false), + output_wall_time: Some(false), + output_json: Some(false), + }) + .await?; + + let codex_core::UnifiedExecResult { content, metadata } = result; + + assert_eq!( + metadata.exec_cmd.as_deref(), + Some("printf 'ready\\n' && read dummy") + ); assert!( - output_text.contains("ready"), - "expected ready output, got {output_text:?}" + metadata.session_id.is_some(), + "session should remain active when waiting for stdin input" + ); + + let text = content.into_string(); + assert!( + !text.contains("Chunk ID:"), + "chunk metadata should be omitted when output_chunk_id is false: {text}" + ); + assert!( + !text.contains("Wall time:"), + "wall time metadata should be omitted when output_wall_time is false: {text}" + ); + assert!( + text.contains("Process running with session ID"), + "expected running-session metadata in textual response: {text}" + ); + assert!( + text.contains("ready"), + "expected command output to appear in textual response: {text}" + ); + + Ok(()) +} + +#[cfg(unix)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn unified_exec_reports_truncation_metadata() -> Result<()> { + skip_if_sandbox!(Ok(())); + + let manager = UnifiedExecSessionManager::default(); + let script = r#"python3 - <<'PY' +import sys +sys.stdout.write("X" * 2048) +sys.stdout.flush() +PY +"#; + + let result = manager + .handle_request(UnifiedExecRequest { + mode: UnifiedExecMode::Start { + cmd: std::borrow::Cow::Borrowed(script), + yield_time_ms: Some(500), + max_output_tokens: Some(1), + shell: Some("/bin/sh"), + login: Some(false), + cwd: None, + }, + output_chunk_id: Some(true), + output_wall_time: Some(true), + output_json: Some(false), + }) + .await?; + + let codex_core::UnifiedExecResult { content, metadata } = result; + + assert!( + metadata.original_token_count.is_some_and(|count| count > 0), + "expected original_token_count metadata when truncation occurs" + ); + + let text = content.into_string(); + assert!( + text.contains("tokens truncated"), + "expected truncation notice in textual output: {text}" ); Ok(())