mirror of
https://github.com/openai/codex.git
synced 2026-04-26 07:35:29 +00:00
Add pushed exec process events
Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
@@ -7,8 +7,10 @@ use std::sync::Arc;
|
||||
use anyhow::Result;
|
||||
use codex_exec_server::Environment;
|
||||
use codex_exec_server::ExecBackend;
|
||||
use codex_exec_server::ExecOutputStream;
|
||||
use codex_exec_server::ExecParams;
|
||||
use codex_exec_server::ExecProcess;
|
||||
use codex_exec_server::ExecProcessEvent;
|
||||
use codex_exec_server::ExecStdinMode;
|
||||
use codex_exec_server::ProcessId;
|
||||
use codex_exec_server::ReadResponse;
|
||||
@@ -117,6 +119,40 @@ async fn collect_process_output_from_reads(
|
||||
Ok((output, exit_code, true))
|
||||
}
|
||||
|
||||
async fn collect_process_output_from_events(
|
||||
session: Arc<dyn ExecProcess>,
|
||||
) -> Result<(String, String, Option<i32>, bool)> {
|
||||
let mut events = session.subscribe_events();
|
||||
let mut stdout = String::new();
|
||||
let mut stderr = String::new();
|
||||
let mut exit_code = None;
|
||||
loop {
|
||||
match timeout(Duration::from_secs(2), events.recv()).await?? {
|
||||
ExecProcessEvent::Output(chunk) => match chunk.stream {
|
||||
ExecOutputStream::Stdout | ExecOutputStream::Pty => {
|
||||
stdout.push_str(&String::from_utf8_lossy(&chunk.chunk.into_inner()));
|
||||
}
|
||||
ExecOutputStream::Stderr => {
|
||||
stderr.push_str(&String::from_utf8_lossy(&chunk.chunk.into_inner()));
|
||||
}
|
||||
},
|
||||
ExecProcessEvent::Exited {
|
||||
seq: _,
|
||||
exit_code: code,
|
||||
} => {
|
||||
exit_code = Some(code);
|
||||
}
|
||||
ExecProcessEvent::Closed { seq: _ } => {
|
||||
drop(session);
|
||||
return Ok((stdout, stderr, exit_code, true));
|
||||
}
|
||||
ExecProcessEvent::Failed(message) => {
|
||||
anyhow::bail!("process failed before closed state: {message}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn assert_exec_process_streams_output(use_remote: bool) -> Result<()> {
|
||||
let context = create_process_context(use_remote).await?;
|
||||
let process_id = "proc-stream".to_string();
|
||||
@@ -148,6 +184,42 @@ async fn assert_exec_process_streams_output(use_remote: bool) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn assert_exec_process_pushes_events(use_remote: bool) -> Result<()> {
|
||||
let context = create_process_context(use_remote).await?;
|
||||
let process_id = "proc-events".to_string();
|
||||
let session = context
|
||||
.backend
|
||||
.start(ExecParams {
|
||||
process_id: process_id.clone().into(),
|
||||
argv: vec![
|
||||
"/bin/sh".to_string(),
|
||||
"-c".to_string(),
|
||||
"sleep 0.05; printf 'event output\\n'; printf 'event err\\n' >&2".to_string(),
|
||||
],
|
||||
cwd: std::env::current_dir()?,
|
||||
env_policy: /*env_policy*/ None,
|
||||
env: Default::default(),
|
||||
tty: false,
|
||||
stdin: ExecStdinMode::Closed,
|
||||
arg0: None,
|
||||
})
|
||||
.await?;
|
||||
assert_eq!(session.process.process_id().as_str(), process_id);
|
||||
|
||||
let StartedExecProcess { process } = session;
|
||||
let actual = collect_process_output_from_events(process).await?;
|
||||
assert_eq!(
|
||||
actual,
|
||||
(
|
||||
"event output\n".to_string(),
|
||||
"event err\n".to_string(),
|
||||
Some(0),
|
||||
true
|
||||
)
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn assert_exec_process_write_then_read(use_remote: bool) -> Result<()> {
|
||||
let context = create_process_context(use_remote).await?;
|
||||
let process_id = "proc-stdin".to_string();
|
||||
@@ -238,15 +310,25 @@ async fn remote_exec_process_reports_transport_disconnect() -> Result<()> {
|
||||
})
|
||||
.await?;
|
||||
|
||||
let process = Arc::clone(&session.process);
|
||||
let mut events = process.subscribe_events();
|
||||
let server = context
|
||||
.server
|
||||
.as_mut()
|
||||
.expect("remote context should include exec-server harness");
|
||||
server.shutdown().await?;
|
||||
|
||||
let mut wake_rx = session.process.subscribe_wake();
|
||||
let response =
|
||||
read_process_until_change(session.process, &mut wake_rx, /*after_seq*/ None).await?;
|
||||
let event = timeout(Duration::from_secs(2), events.recv()).await??;
|
||||
let ExecProcessEvent::Failed(event_message) = event else {
|
||||
anyhow::bail!("expected process failure event, got {event:?}");
|
||||
};
|
||||
assert!(
|
||||
event_message.starts_with("exec-server transport disconnected"),
|
||||
"unexpected failure event: {event_message}"
|
||||
);
|
||||
|
||||
let mut wake_rx = process.subscribe_wake();
|
||||
let response = read_process_until_change(process, &mut wake_rx, /*after_seq*/ None).await?;
|
||||
let message = response
|
||||
.failure
|
||||
.expect("disconnect should surface as a failure");
|
||||
@@ -280,6 +362,15 @@ async fn exec_process_streams_output(use_remote: bool) -> Result<()> {
|
||||
assert_exec_process_streams_output(use_remote).await
|
||||
}
|
||||
|
||||
#[test_case(false ; "local")]
|
||||
#[test_case(true ; "remote")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
// Serialize tests that launch a real exec-server process through the full CLI.
|
||||
#[serial_test::serial(remote_exec_server)]
|
||||
async fn exec_process_pushes_events(use_remote: bool) -> Result<()> {
|
||||
assert_exec_process_pushes_events(use_remote).await
|
||||
}
|
||||
|
||||
#[test_case(false ; "local")]
|
||||
#[test_case(true ; "remote")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
|
||||
Reference in New Issue
Block a user