This commit is contained in:
jimmyfraiture
2025-09-07 15:22:24 -07:00
parent 4bd40bd27f
commit 909666b03d
5 changed files with 173 additions and 124 deletions

View File

@@ -1997,22 +1997,7 @@ async fn handle_response_item(
.clone()
.or_else(|| session_id.clone())
.unwrap_or_else(|| format!("ishell:{}", Uuid::new_v4()));
let spawn_command = match crate::ishell::spawn_command_for_shell(&sess.user_shell) {
Some(command) => command,
None => {
return Ok(Some(ResponseInputItem::FunctionCallOutput {
call_id,
output: FunctionCallOutputPayload {
content: "interactive shell is not available on this platform"
.to_string(),
success: Some(false),
},
}));
}
};
let parsed_session_id = match session_id {
// todo useless
Some(value) => match value.parse::<i32>() {
Ok(parsed) => Some(parsed),
Err(_) => {
@@ -2032,7 +2017,6 @@ async fn handle_response_item(
session_id: parsed_session_id,
input: &arguments,
timeout_ms,
spawn_command: &spawn_command,
};
let result = sess.ishell_manager.handle_request(request).await;
@@ -2041,7 +2025,7 @@ async fn handle_response_item(
Ok(value) => {
#[derive(serde::Serialize)]
struct SerializedIshellResult<'a> {
session_id: i32,
session_id: Option<i32>,
output: &'a str,
}

View File

@@ -24,6 +24,9 @@ pub(crate) struct ExecCommandSession {
/// JoinHandle for the child wait task.
wait_handle: StdMutex<Option<JoinHandle<()>>>,
/// Tracks whether the underlying process has exited.
exit_status: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
impl ExecCommandSession {
@@ -34,6 +37,7 @@ impl ExecCommandSession {
reader_handle: JoinHandle<()>,
writer_handle: JoinHandle<()>,
wait_handle: JoinHandle<()>,
exit_status: std::sync::Arc<std::sync::atomic::AtomicBool>,
) -> Self {
Self {
writer_tx,
@@ -42,6 +46,7 @@ impl ExecCommandSession {
reader_handle: StdMutex::new(Some(reader_handle)),
writer_handle: StdMutex::new(Some(writer_handle)),
wait_handle: StdMutex::new(Some(wait_handle)),
exit_status,
}
}
@@ -52,6 +57,10 @@ impl ExecCommandSession {
pub(crate) fn output_receiver(&self) -> broadcast::Receiver<Vec<u8>> {
self.output_tx.subscribe()
}
pub(crate) fn has_exited(&self) -> bool {
self.exit_status.load(std::sync::atomic::Ordering::SeqCst)
}
}
impl Drop for ExecCommandSession {

View File

@@ -327,11 +327,14 @@ async fn create_exec_command_session(
// Keep the child alive until it exits, then signal exit code.
let (exit_tx, exit_rx) = oneshot::channel::<i32>();
let exit_status = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let wait_exit_status = std::sync::Arc::clone(&exit_status);
let wait_handle = tokio::task::spawn_blocking(move || {
let code = match child.wait() {
Ok(status) => status.exit_code() as i32,
Err(_) => -1,
};
wait_exit_status.store(true, std::sync::atomic::Ordering::SeqCst);
let _ = exit_tx.send(code);
});
@@ -343,6 +346,7 @@ async fn create_exec_command_session(
reader_handle,
writer_handle,
wait_handle,
exit_status,
);
Ok((session, exit_rx))
}

View File

@@ -7,6 +7,7 @@ use std::io::ErrorKind;
use std::io::Read;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering;
use tokio::sync::Mutex;
@@ -17,7 +18,6 @@ use tokio::time::Duration;
use tokio::time::Instant;
use crate::exec_command::ExecCommandSession;
use crate::shell;
const DEFAULT_TIMEOUT_MS: u64 = 250;
// Cap on how many bytes of interactive shell output we retain per request.
@@ -25,35 +25,16 @@ const DEFAULT_TIMEOUT_MS: u64 = 250;
// with an elision marker to indicate how much was removed.
const ISHELL_OUTPUT_MAX_BYTES: usize = 16 * 1024; // 16 KiB
#[derive(Debug, Clone)]
pub(crate) struct ShellSpawnCommand {
pub program: String,
pub args: Vec<String>,
}
pub(crate) fn spawn_command_for_shell(user_shell: &shell::Shell) -> Option<ShellSpawnCommand> {
let mut invocation = user_shell.interactive_spawn_command()?;
if invocation.is_empty() {
return None;
}
let program = invocation.remove(0);
Some(ShellSpawnCommand {
program,
args: invocation,
})
}
#[derive(Debug)]
pub(crate) struct InteractiveShellRequest<'a> {
pub session_id: Option<i32>,
pub input: &'a str,
pub timeout_ms: Option<u64>,
pub spawn_command: &'a ShellSpawnCommand,
}
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct InteractiveShellResult {
pub session_id: i32,
pub session_id: Option<i32>,
pub output: String,
}
@@ -108,6 +89,10 @@ impl ManagedInteractiveSession {
Arc::clone(&self.output_notify),
)
}
fn has_exited(&self) -> bool {
self.session.has_exited()
}
}
impl Drop for ManagedInteractiveSession {
@@ -124,29 +109,43 @@ impl InteractiveShellSessionManager {
// todo update the errors
let timeout_ms = request.timeout_ms.unwrap_or(DEFAULT_TIMEOUT_MS);
let session_id = if let Some(id) = request.session_id {
id
} else {
let new_id = self.next_session_id.fetch_add(1, Ordering::SeqCst);
let session = create_shell_session(request.spawn_command).await?;
let managed_session = ManagedInteractiveSession::new(session);
self.sessions.lock().await.insert(new_id, managed_session);
new_id
};
let mut new_session: Option<ManagedInteractiveSession> = None;
let session_id;
let writer_tx;
let output_buffer;
let output_notify;
// todo get the in and out of the session
let (writer_tx, output_buffer, output_notify) = {
if let Some(existing_id) = request.session_id {
let sessions = self.sessions.lock().await;
match sessions.get(&session_id) {
match sessions.get(&existing_id) {
Some(session) => {
let (buffer, notify) = session.output_handles();
(session.writer_sender(), buffer, notify)
session_id = existing_id;
writer_tx = session.writer_sender();
output_buffer = buffer;
output_notify = notify;
}
None => {
return Err(error::InteractiveShellError::UnknownSessionId {
session_id: existing_id,
});
}
None => return Err(error::InteractiveShellError::UnknownSessionId { session_id }),
}
} else {
let command = parse_command_line(request.input)?;
let new_id = self.next_session_id.fetch_add(1, Ordering::SeqCst);
let session = create_shell_session(&command).await?;
let managed_session = ManagedInteractiveSession::new(session);
let (buffer, notify) = managed_session.output_handles();
writer_tx = managed_session.writer_sender();
output_buffer = buffer;
output_notify = notify;
session_id = new_id;
new_session = Some(managed_session);
};
if !request.input.is_empty()
if request.session_id.is_some()
&& !request.input.is_empty()
&& writer_tx
.send(request.input.as_bytes().to_vec())
.await
@@ -195,13 +194,50 @@ impl InteractiveShellSessionManager {
ISHELL_OUTPUT_MAX_BYTES,
);
Ok(InteractiveShellResult { session_id, output })
let should_store_session = if let Some(session) = new_session.as_ref() {
!session.has_exited()
} else {
true
};
if should_store_session {
if let Some(session) = new_session {
self.sessions.lock().await.insert(session_id, session);
}
Ok(InteractiveShellResult {
session_id: Some(session_id),
output,
})
} else {
Ok(InteractiveShellResult {
session_id: None,
output,
})
}
}
}
pub(crate) fn parse_command_line(line: &str) -> Result<Vec<String>, error::InteractiveShellError> {
let trimmed = line.trim();
if trimmed.is_empty() {
return Err(error::InteractiveShellError::MissingCommandLine);
}
match shlex::split(trimmed) {
Some(parts) if !parts.is_empty() => Ok(parts),
_ => Err(error::InteractiveShellError::InvalidCommandLine {
command_line: trimmed.to_string(),
}),
}
}
async fn create_shell_session(
command: &ShellSpawnCommand,
command: &[String],
) -> Result<ExecCommandSession, error::InteractiveShellError> {
if command.is_empty() {
return Err(error::InteractiveShellError::MissingCommandLine);
}
let pty_system = native_pty_system();
let pair = pty_system
@@ -213,8 +249,8 @@ async fn create_shell_session(
})
.map_err(error::InteractiveShellError::create_session)?;
let mut command_builder = CommandBuilder::new(&command.program);
for arg in &command.args {
let mut command_builder = CommandBuilder::new(&command[0]);
for arg in &command[1..] {
command_builder.arg(arg);
}
@@ -272,8 +308,11 @@ async fn create_shell_session(
}
});
let exit_status = Arc::new(AtomicBool::new(false));
let wait_exit_status = Arc::clone(&exit_status);
let wait_handle = tokio::task::spawn_blocking(move || {
let _ = child.wait();
wait_exit_status.store(true, Ordering::SeqCst);
});
Ok(ExecCommandSession::new(
@@ -283,6 +322,7 @@ async fn create_shell_session(
reader_handle,
writer_handle,
wait_handle,
exit_status,
))
}
@@ -393,6 +433,10 @@ mod error {
UnknownSessionId { session_id: i32 },
#[error("failed to write to stdin")]
WriteToStdin,
#[error("missing command line for interactive shell request")]
MissingCommandLine,
#[error("invalid command line: {command_line}")]
InvalidCommandLine { command_line: String },
}
impl InteractiveShellError {
@@ -407,23 +451,28 @@ mod tests {
use super::*;
#[test]
fn spawn_command_from_shell_unknown_returns_none() {
assert!(spawn_command_for_shell(&shell::Shell::Unknown).is_none());
fn parse_command_line_splits_words() {
assert_eq!(
parse_command_line("echo codex").unwrap(),
vec!["echo".to_string(), "codex".to_string()]
);
}
#[test]
fn spawn_command_from_shell_bash_uses_interactive_flag() {
let bash_shell: shell::BashShell = serde_json::from_value(serde_json::json!({
"shell_path": "/bin/bash",
"bashrc_path": "~/.bashrc",
}))
.expect("bash shell should deserialize");
fn parse_command_line_trims_whitespace() {
assert_eq!(
parse_command_line(" ls -la \n").unwrap(),
vec!["ls".to_string(), "-la".to_string()]
);
}
let cmd = spawn_command_for_shell(&shell::Shell::Bash(bash_shell))
.expect("expect a spawn command");
assert_eq!(cmd.program, "/bin/bash");
assert_eq!(cmd.args, vec!["-i"]);
#[test]
fn parse_command_line_rejects_empty() {
let err = parse_command_line(" ").expect_err("expected error");
assert!(matches!(
err,
error::InteractiveShellError::MissingCommandLine
));
}
#[cfg(unix)]
@@ -432,31 +481,30 @@ mod tests {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn interactive_shell_persists_across_requests() -> Result<(), error::InteractiveShellError>
{
let bash_shell: shell::BashShell = serde_json::from_value(serde_json::json!({
"shell_path": "/bin/bash",
"bashrc_path": "~/.bashrc",
}))
.expect("bash shell should deserialize");
let spawn_command = spawn_command_for_shell(&shell::Shell::Bash(bash_shell)).unwrap();
let manager = InteractiveShellSessionManager::default();
let out_1 = manager
let open_shell = manager
.handle_request(InteractiveShellRequest {
session_id: None,
input: "/bin/bash -i",
timeout_ms: Some(1_500),
})
.await?;
let session_id = open_shell.session_id.expect("expected session_id");
manager
.handle_request(InteractiveShellRequest {
session_id: Some(session_id),
input: "export CODEX_INTERACTIVE_SHELL_VAR=codex\n",
timeout_ms: Some(1_500),
spawn_command: &spawn_command,
})
.await?;
let out_2 = manager
.handle_request(InteractiveShellRequest {
session_id: Some(out_1.session_id),
session_id: Some(session_id),
input: "echo $CODEX_INTERACTIVE_SHELL_VAR\n",
timeout_ms: Some(1_500),
spawn_command: &spawn_command,
})
.await?;
@@ -470,41 +518,39 @@ mod tests {
/// previously created sessions continue to function.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn multi_interactive_shells() -> Result<(), error::InteractiveShellError> {
let bash_shell: shell::BashShell = serde_json::from_value(serde_json::json!({
"shell_path": "/bin/bash",
"bashrc_path": "~/.bashrc",
}))
.expect("bash shell should deserialize");
let spawn_command = spawn_command_for_shell(&shell::Shell::Bash(bash_shell)).unwrap();
let manager = InteractiveShellSessionManager::default();
let out_1 = manager
let shell_a = manager
.handle_request(InteractiveShellRequest {
session_id: None,
input: "/bin/bash -i",
timeout_ms: Some(1_500),
})
.await?;
let session_a = shell_a.session_id.expect("expected session id");
manager
.handle_request(InteractiveShellRequest {
session_id: Some(session_a),
input: "export CODEX_INTERACTIVE_SHELL_VAR=codex\n",
timeout_ms: Some(1_500),
spawn_command: &spawn_command,
})
.await?;
let out_2 = manager
.handle_request(InteractiveShellRequest {
session_id: None,
input: "echo $CODEX_INTERACTIVE_SHELL_VAR\n",
input: "/bin/echo $CODEX_INTERACTIVE_SHELL_VAR\n",
timeout_ms: Some(1_500),
spawn_command: &spawn_command,
})
.await?;
assert!(!out_2.output.contains("codex"));
let out_3 = manager
.handle_request(InteractiveShellRequest {
session_id: Some(out_1.session_id),
session_id: Some(session_a),
input: "echo $CODEX_INTERACTIVE_SHELL_VAR\n",
timeout_ms: Some(1_500),
spawn_command: &spawn_command,
})
.await?;
assert!(out_3.output.contains("codex"));
@@ -517,44 +563,42 @@ mod tests {
/// collected by a follow-up request against the same session.
#[tokio::test]
async fn interactive_shell_timeouts() -> Result<(), error::InteractiveShellError> {
let bash_shell: shell::BashShell = serde_json::from_value(serde_json::json!({
"shell_path": "/bin/bash",
"bashrc_path": "~/.bashrc",
}))
.expect("bash shell should deserialize");
let spawn_command = spawn_command_for_shell(&shell::Shell::Bash(bash_shell)).unwrap();
let manager = InteractiveShellSessionManager::default();
let out_1 = manager
let open_shell = manager
.handle_request(InteractiveShellRequest {
session_id: None,
input: "/bin/bash -i",
timeout_ms: Some(1_500),
})
.await?;
let session_id = open_shell.session_id.expect("expected session id");
manager
.handle_request(InteractiveShellRequest {
session_id: Some(session_id),
input: "export CODEX_INTERACTIVE_SHELL_VAR=codex\n",
timeout_ms: Some(1_500),
spawn_command: &spawn_command,
})
.await?;
let out_2 = manager
.handle_request(InteractiveShellRequest {
session_id: Some(out_1.session_id),
session_id: Some(session_id),
input: "sleep 5 && echo $CODEX_INTERACTIVE_SHELL_VAR\n",
timeout_ms: Some(10),
spawn_command: &spawn_command,
})
.await?;
assert!(!out_2.output.contains("codex"));
// Wait for the end of the bash sleep.
tokio::time::sleep(Duration::from_secs(6)).await;
tokio::time::sleep(Duration::from_secs(7)).await;
let out_3 = manager
.handle_request(InteractiveShellRequest {
session_id: Some(out_1.session_id),
session_id: Some(session_id),
input: "",
timeout_ms: Some(100),
spawn_command: &spawn_command,
})
.await?;
@@ -563,6 +607,27 @@ mod tests {
Ok(())
}
#[cfg(unix)]
#[tokio::test]
async fn completed_commands_do_not_persist_sessions() -> Result<(), error::InteractiveShellError>
{
let manager = InteractiveShellSessionManager::default();
let result = manager
.handle_request(InteractiveShellRequest {
session_id: None,
input: "/bin/echo codex",
timeout_ms: Some(1_500),
})
.await?;
assert!(result.session_id.is_none());
assert!(result.output.contains("codex"));
assert!(manager.sessions.lock().await.is_empty());
Ok(())
}
#[test]
fn truncate_middle_no_newlines_fallback() {
// Long string without newlines forces a pure byte/char-boundary truncation.

View File

@@ -30,19 +30,6 @@ pub enum Shell {
}
impl Shell {
pub(crate) fn interactive_spawn_command(&self) -> Option<Vec<String>> {
match self {
Shell::Zsh(zsh) => Some(vec![zsh.shell_path.clone(), "-i".to_string()]),
Shell::Bash(bash) => Some(vec![bash.shell_path.clone(), "-i".to_string()]),
Shell::PowerShell(ps) => Some(vec![
ps.exe.clone(),
"-NoLogo".to_string(),
"-NoProfile".to_string(),
]),
Shell::Unknown => None,
}
}
pub fn format_default_shell_invocation(&self, command: Vec<String>) -> Option<Vec<String>> {
match self {
Shell::Zsh(zsh) => {