mirror of
https://github.com/openai/codex.git
synced 2026-04-26 07:35:29 +00:00
Support piped stdin in exec process API
Add pipe_stdin to process/start so non-tty processes can opt into a writable stdin pipe for process/write. Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
@@ -12,6 +12,7 @@ use codex_exec_server::ExecProcess;
|
||||
use codex_exec_server::ProcessId;
|
||||
use codex_exec_server::ReadResponse;
|
||||
use codex_exec_server::StartedExecProcess;
|
||||
use codex_exec_server::WriteStatus;
|
||||
use pretty_assertions::assert_eq;
|
||||
use test_case::test_case;
|
||||
use tokio::sync::watch;
|
||||
@@ -54,6 +55,7 @@ async fn assert_exec_process_starts_and_exits(use_remote: bool) -> Result<()> {
|
||||
env_policy: /*env_policy*/ None,
|
||||
env: Default::default(),
|
||||
tty: false,
|
||||
pipe_stdin: false,
|
||||
arg0: None,
|
||||
})
|
||||
.await?;
|
||||
@@ -131,6 +133,7 @@ async fn assert_exec_process_streams_output(use_remote: bool) -> Result<()> {
|
||||
env_policy: /*env_policy*/ None,
|
||||
env: Default::default(),
|
||||
tty: false,
|
||||
pipe_stdin: false,
|
||||
arg0: None,
|
||||
})
|
||||
.await?;
|
||||
@@ -164,6 +167,7 @@ async fn assert_exec_process_write_then_read(use_remote: bool) -> Result<()> {
|
||||
env_policy: /*env_policy*/ None,
|
||||
env: Default::default(),
|
||||
tty: true,
|
||||
pipe_stdin: false,
|
||||
arg0: None,
|
||||
})
|
||||
.await?;
|
||||
@@ -184,6 +188,73 @@ async fn assert_exec_process_write_then_read(use_remote: bool) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn assert_exec_process_write_then_read_without_tty(use_remote: bool) -> Result<()> {
|
||||
let context = create_process_context(use_remote).await?;
|
||||
let process_id = "proc-stdin-pipe".to_string();
|
||||
let session = context
|
||||
.backend
|
||||
.start(ExecParams {
|
||||
process_id: process_id.clone().into(),
|
||||
argv: vec![
|
||||
"/bin/sh".to_string(),
|
||||
"-c".to_string(),
|
||||
"IFS= read line; printf 'from-stdin:%s\\n' \"$line\"".to_string(),
|
||||
],
|
||||
cwd: std::env::current_dir()?,
|
||||
env_policy: /*env_policy*/ None,
|
||||
env: Default::default(),
|
||||
tty: false,
|
||||
pipe_stdin: true,
|
||||
arg0: None,
|
||||
})
|
||||
.await?;
|
||||
assert_eq!(session.process.process_id().as_str(), process_id);
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
let write_response = session.process.write(b"hello\n".to_vec()).await?;
|
||||
assert_eq!(write_response.status, WriteStatus::Accepted);
|
||||
let StartedExecProcess { process } = session;
|
||||
let wake_rx = process.subscribe_wake();
|
||||
let actual = collect_process_output_from_reads(process, wake_rx).await?;
|
||||
|
||||
assert_eq!(actual, ("from-stdin:hello\n".to_string(), Some(0), true));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn assert_exec_process_rejects_write_without_pipe_stdin(use_remote: bool) -> Result<()> {
|
||||
let context = create_process_context(use_remote).await?;
|
||||
let process_id = "proc-stdin-closed".to_string();
|
||||
let session = context
|
||||
.backend
|
||||
.start(ExecParams {
|
||||
process_id: process_id.clone().into(),
|
||||
argv: vec![
|
||||
"/bin/sh".to_string(),
|
||||
"-c".to_string(),
|
||||
"sleep 0.3; if IFS= read -r line; then printf 'read:%s\\n' \"$line\"; else printf 'eof\\n'; fi".to_string(),
|
||||
],
|
||||
cwd: std::env::current_dir()?,
|
||||
env_policy: /*env_policy*/ None,
|
||||
env: Default::default(),
|
||||
tty: false,
|
||||
pipe_stdin: false,
|
||||
arg0: None,
|
||||
})
|
||||
.await?;
|
||||
assert_eq!(session.process.process_id().as_str(), process_id);
|
||||
|
||||
let write_response = session.process.write(b"ignored\n".to_vec()).await?;
|
||||
assert_eq!(write_response.status, WriteStatus::StdinClosed);
|
||||
let StartedExecProcess { process } = session;
|
||||
let wake_rx = process.subscribe_wake();
|
||||
let (output, exit_code, closed) = collect_process_output_from_reads(process, wake_rx).await?;
|
||||
|
||||
assert_eq!(output, "eof\n");
|
||||
assert_eq!(exit_code, Some(0));
|
||||
assert!(closed);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn assert_exec_process_preserves_queued_events_before_subscribe(
|
||||
use_remote: bool,
|
||||
) -> Result<()> {
|
||||
@@ -201,6 +272,7 @@ async fn assert_exec_process_preserves_queued_events_before_subscribe(
|
||||
env_policy: /*env_policy*/ None,
|
||||
env: Default::default(),
|
||||
tty: false,
|
||||
pipe_stdin: false,
|
||||
arg0: None,
|
||||
})
|
||||
.await?;
|
||||
@@ -234,6 +306,7 @@ async fn remote_exec_process_reports_transport_disconnect() -> Result<()> {
|
||||
env_policy: /*env_policy*/ None,
|
||||
env: Default::default(),
|
||||
tty: false,
|
||||
pipe_stdin: false,
|
||||
arg0: None,
|
||||
})
|
||||
.await?;
|
||||
@@ -289,6 +362,24 @@ async fn exec_process_write_then_read(use_remote: bool) -> Result<()> {
|
||||
assert_exec_process_write_then_read(use_remote).await
|
||||
}
|
||||
|
||||
#[test_case(false ; "local")]
|
||||
#[test_case(true ; "remote")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
// Serialize tests that launch a real exec-server process through the full CLI.
|
||||
#[serial_test::serial(remote_exec_server)]
|
||||
async fn exec_process_write_then_read_without_tty(use_remote: bool) -> Result<()> {
|
||||
assert_exec_process_write_then_read_without_tty(use_remote).await
|
||||
}
|
||||
|
||||
#[test_case(false ; "local")]
|
||||
#[test_case(true ; "remote")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
// Serialize tests that launch a real exec-server process through the full CLI.
|
||||
#[serial_test::serial(remote_exec_server)]
|
||||
async fn exec_process_rejects_write_without_pipe_stdin(use_remote: bool) -> Result<()> {
|
||||
assert_exec_process_rejects_write_without_pipe_stdin(use_remote).await
|
||||
}
|
||||
|
||||
#[test_case(false ; "local")]
|
||||
#[test_case(true ; "remote")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
|
||||
Reference in New Issue
Block a user