mirror of
https://github.com/openai/codex.git
synced 2026-03-06 06:33:21 +00:00
Compare commits
2 Commits
latest-alp
...
pr13432
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
995ca7d929 | ||
|
|
deee4fd3e8 |
@@ -11,7 +11,6 @@ use app_test_support::McpProcess;
|
||||
use app_test_support::create_final_assistant_message_sse_response;
|
||||
use app_test_support::create_mock_responses_server_sequence;
|
||||
use app_test_support::create_mock_responses_server_sequence_unchecked;
|
||||
use app_test_support::create_shell_command_sse_response;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::CommandAction;
|
||||
use codex_app_server_protocol::CommandExecutionApprovalDecision;
|
||||
@@ -35,9 +34,11 @@ use codex_core::features::Feature;
|
||||
use core_test_support::responses;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use std::collections::BTreeMap;
|
||||
use std::path::Path;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::sleep;
|
||||
use tokio::time::timeout;
|
||||
|
||||
#[cfg(windows)]
|
||||
@@ -61,10 +62,8 @@ async fn turn_start_shell_zsh_fork_executes_command_v2() -> Result<()> {
|
||||
};
|
||||
eprintln!("using zsh path for zsh-fork test: {}", zsh_path.display());
|
||||
|
||||
let responses = vec![create_shell_command_sse_response(
|
||||
vec!["echo".to_string(), "hi".to_string()],
|
||||
None,
|
||||
Some(5000),
|
||||
let responses = vec![create_zsh_fork_exec_command_sse_response(
|
||||
"echo hi",
|
||||
"call-zsh-fork",
|
||||
)?];
|
||||
let server = create_mock_responses_server_sequence(responses).await;
|
||||
@@ -74,7 +73,7 @@ async fn turn_start_shell_zsh_fork_executes_command_v2() -> Result<()> {
|
||||
"never",
|
||||
&BTreeMap::from([
|
||||
(Feature::ShellZshFork, true),
|
||||
(Feature::UnifiedExec, false),
|
||||
(Feature::UnifiedExec, true),
|
||||
(Feature::ShellSnapshot, false),
|
||||
]),
|
||||
&zsh_path,
|
||||
@@ -172,14 +171,8 @@ async fn turn_start_shell_zsh_fork_exec_approval_decline_v2() -> Result<()> {
|
||||
eprintln!("using zsh path for zsh-fork test: {}", zsh_path.display());
|
||||
|
||||
let responses = vec![
|
||||
create_shell_command_sse_response(
|
||||
vec![
|
||||
"python3".to_string(),
|
||||
"-c".to_string(),
|
||||
"print(42)".to_string(),
|
||||
],
|
||||
None,
|
||||
Some(5000),
|
||||
create_zsh_fork_exec_command_sse_response(
|
||||
"python3 -c 'print(42)'",
|
||||
"call-zsh-fork-decline",
|
||||
)?,
|
||||
create_final_assistant_message_sse_response("done")?,
|
||||
@@ -191,7 +184,7 @@ async fn turn_start_shell_zsh_fork_exec_approval_decline_v2() -> Result<()> {
|
||||
"untrusted",
|
||||
&BTreeMap::from([
|
||||
(Feature::ShellZshFork, true),
|
||||
(Feature::UnifiedExec, false),
|
||||
(Feature::UnifiedExec, true),
|
||||
(Feature::ShellSnapshot, false),
|
||||
]),
|
||||
&zsh_path,
|
||||
@@ -307,14 +300,8 @@ async fn turn_start_shell_zsh_fork_exec_approval_cancel_v2() -> Result<()> {
|
||||
};
|
||||
eprintln!("using zsh path for zsh-fork test: {}", zsh_path.display());
|
||||
|
||||
let responses = vec![create_shell_command_sse_response(
|
||||
vec![
|
||||
"python3".to_string(),
|
||||
"-c".to_string(),
|
||||
"print(42)".to_string(),
|
||||
],
|
||||
None,
|
||||
Some(5000),
|
||||
let responses = vec![create_zsh_fork_exec_command_sse_response(
|
||||
"python3 -c 'print(42)'",
|
||||
"call-zsh-fork-cancel",
|
||||
)?];
|
||||
let server = create_mock_responses_server_sequence(responses).await;
|
||||
@@ -324,7 +311,7 @@ async fn turn_start_shell_zsh_fork_exec_approval_cancel_v2() -> Result<()> {
|
||||
"untrusted",
|
||||
&BTreeMap::from([
|
||||
(Feature::ShellZshFork, true),
|
||||
(Feature::UnifiedExec, false),
|
||||
(Feature::UnifiedExec, true),
|
||||
(Feature::ShellSnapshot, false),
|
||||
]),
|
||||
&zsh_path,
|
||||
@@ -422,6 +409,181 @@ async fn turn_start_shell_zsh_fork_exec_approval_cancel_v2() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_shell_zsh_fork_interrupt_kills_approved_subcommand_v2() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let tmp = TempDir::new()?;
|
||||
let codex_home = tmp.path().join("codex_home");
|
||||
std::fs::create_dir(&codex_home)?;
|
||||
let workspace = tmp.path().join("workspace");
|
||||
std::fs::create_dir(&workspace)?;
|
||||
let pid_file = workspace.join("approved-subcommand.pid");
|
||||
let pid_file_display = pid_file.display().to_string();
|
||||
assert!(
|
||||
!pid_file_display.contains('\''),
|
||||
"test workspace path should not contain single quotes: {pid_file_display}"
|
||||
);
|
||||
|
||||
let Some(zsh_path) = find_test_zsh_path()? else {
|
||||
eprintln!("skipping zsh fork interrupt cleanup test: no zsh executable found");
|
||||
return Ok(());
|
||||
};
|
||||
if !supports_exec_wrapper_intercept(&zsh_path) {
|
||||
eprintln!(
|
||||
"skipping zsh fork interrupt cleanup test: zsh does not support EXEC_WRAPPER intercepts ({})",
|
||||
zsh_path.display()
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
let zsh_path_display = zsh_path.display().to_string();
|
||||
eprintln!("using zsh path for zsh-fork test: {zsh_path_display}");
|
||||
|
||||
let shell_command =
|
||||
format!("/bin/sh -c 'echo $$ > \"{pid_file_display}\" && exec /bin/sleep 100'");
|
||||
let tool_call_arguments = serde_json::to_string(&json!({
|
||||
"cmd": shell_command,
|
||||
"yield_time_ms": 30_000,
|
||||
}))?;
|
||||
let response = responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_function_call(
|
||||
"call-zsh-fork-interrupt-cleanup",
|
||||
"exec_command",
|
||||
&tool_call_arguments,
|
||||
),
|
||||
responses::ev_completed("resp-1"),
|
||||
]);
|
||||
let no_op_response = responses::sse(vec![
|
||||
responses::ev_response_created("resp-2"),
|
||||
responses::ev_completed("resp-2"),
|
||||
]);
|
||||
let server =
|
||||
create_mock_responses_server_sequence_unchecked(vec![response, no_op_response]).await;
|
||||
create_config_toml(
|
||||
&codex_home,
|
||||
&server.uri(),
|
||||
"untrusted",
|
||||
&BTreeMap::from([
|
||||
(Feature::ShellZshFork, true),
|
||||
(Feature::UnifiedExec, true),
|
||||
(Feature::ShellSnapshot, false),
|
||||
]),
|
||||
&zsh_path,
|
||||
)?;
|
||||
|
||||
let mut mcp = create_zsh_test_mcp_process(&codex_home, &workspace).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let start_id = mcp
|
||||
.send_thread_start_request(ThreadStartParams {
|
||||
model: Some("mock-model".to_string()),
|
||||
cwd: Some(workspace.to_string_lossy().into_owned()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let start_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
||||
|
||||
let turn_id = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: "run the long-lived command".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
cwd: Some(workspace.clone()),
|
||||
approval_policy: Some(codex_app_server_protocol::AskForApproval::UnlessTrusted),
|
||||
sandbox_policy: Some(codex_app_server_protocol::SandboxPolicy::WorkspaceWrite {
|
||||
writable_roots: vec![workspace.clone().try_into()?],
|
||||
read_only_access: codex_app_server_protocol::ReadOnlyAccess::FullAccess,
|
||||
network_access: false,
|
||||
exclude_tmpdir_env_var: false,
|
||||
exclude_slash_tmp: false,
|
||||
}),
|
||||
model: Some("mock-model".to_string()),
|
||||
effort: Some(codex_protocol::openai_models::ReasoningEffort::Medium),
|
||||
summary: Some(codex_protocol::config_types::ReasoningSummary::Auto),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let turn_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
||||
)
|
||||
.await??;
|
||||
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
|
||||
|
||||
let mut saw_target_approval = false;
|
||||
while !saw_target_approval {
|
||||
let server_req = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_request_message(),
|
||||
)
|
||||
.await??;
|
||||
let ServerRequest::CommandExecutionRequestApproval { request_id, params } = server_req
|
||||
else {
|
||||
panic!("expected CommandExecutionRequestApproval request");
|
||||
};
|
||||
let approval_command = params.command.clone().unwrap_or_default();
|
||||
saw_target_approval = approval_command.contains("/bin/sh")
|
||||
&& approval_command.contains(&pid_file_display)
|
||||
&& !approval_command.contains(&zsh_path_display);
|
||||
mcp.send_response(
|
||||
request_id,
|
||||
serde_json::to_value(CommandExecutionRequestApprovalResponse {
|
||||
decision: CommandExecutionApprovalDecision::Accept,
|
||||
})?,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let pid = timeout(DEFAULT_READ_TIMEOUT, async {
|
||||
loop {
|
||||
if let Ok(contents) = std::fs::read_to_string(&pid_file) {
|
||||
return Ok::<i32, anyhow::Error>(contents.trim().parse()?);
|
||||
}
|
||||
sleep(std::time::Duration::from_millis(20)).await;
|
||||
}
|
||||
})
|
||||
.await??;
|
||||
let still_running = std::process::Command::new("/bin/kill")
|
||||
.args(["-0", &pid.to_string()])
|
||||
.status()?
|
||||
.success();
|
||||
assert!(
|
||||
still_running,
|
||||
"expected approved intercepted subprocess pid {pid} to be running before interrupt"
|
||||
);
|
||||
|
||||
mcp.interrupt_turn_and_wait_for_aborted(
|
||||
thread.id.clone(),
|
||||
turn.id.clone(),
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
)
|
||||
.await?;
|
||||
|
||||
timeout(DEFAULT_READ_TIMEOUT, async {
|
||||
loop {
|
||||
let still_running = std::process::Command::new("/bin/kill")
|
||||
.args(["-0", &pid.to_string()])
|
||||
.status()?
|
||||
.success();
|
||||
if !still_running {
|
||||
return Ok::<(), anyhow::Error>(());
|
||||
}
|
||||
sleep(std::time::Duration::from_millis(20)).await;
|
||||
}
|
||||
})
|
||||
.await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
@@ -453,16 +615,15 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
|
||||
first_file.display(),
|
||||
second_file.display()
|
||||
);
|
||||
let tool_call_arguments = serde_json::to_string(&serde_json::json!({
|
||||
"command": shell_command,
|
||||
"workdir": serde_json::Value::Null,
|
||||
"timeout_ms": 5000
|
||||
let tool_call_arguments = serde_json::to_string(&json!({
|
||||
"cmd": shell_command,
|
||||
"yield_time_ms": 5000,
|
||||
}))?;
|
||||
let response = responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_function_call(
|
||||
"call-zsh-fork-subcommand-decline",
|
||||
"shell_command",
|
||||
"exec_command",
|
||||
&tool_call_arguments,
|
||||
),
|
||||
responses::ev_completed("resp-1"),
|
||||
@@ -483,7 +644,7 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
|
||||
"untrusted",
|
||||
&BTreeMap::from([
|
||||
(Feature::ShellZshFork, true),
|
||||
(Feature::UnifiedExec, false),
|
||||
(Feature::UnifiedExec, true),
|
||||
(Feature::ShellSnapshot, false),
|
||||
]),
|
||||
&zsh_path,
|
||||
@@ -694,6 +855,21 @@ async fn create_zsh_test_mcp_process(codex_home: &Path, zdotdir: &Path) -> Resul
|
||||
McpProcess::new_with_env(codex_home, &[("ZDOTDIR", Some(zdotdir.as_str()))]).await
|
||||
}
|
||||
|
||||
fn create_zsh_fork_exec_command_sse_response(
|
||||
command: &str,
|
||||
call_id: &str,
|
||||
) -> anyhow::Result<String> {
|
||||
let tool_call_arguments = serde_json::to_string(&json!({
|
||||
"cmd": command,
|
||||
"yield_time_ms": 5000,
|
||||
}))?;
|
||||
Ok(responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_function_call(call_id, "exec_command", &tool_call_arguments),
|
||||
responses::ev_completed("resp-1"),
|
||||
]))
|
||||
}
|
||||
|
||||
fn create_config_toml(
|
||||
codex_home: &Path,
|
||||
server_uri: &str,
|
||||
|
||||
@@ -48,7 +48,6 @@ use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use uuid::Uuid;
|
||||
|
||||
@@ -106,9 +105,6 @@ pub(super) async fn try_run_zsh_fork(
|
||||
req.timeout_ms
|
||||
.unwrap_or(crate::exec::DEFAULT_EXEC_COMMAND_TIMEOUT_MS),
|
||||
);
|
||||
let exec_policy = Arc::new(RwLock::new(
|
||||
ctx.session.services.exec_policy.current().as_ref().clone(),
|
||||
));
|
||||
let command_executor = CoreShellCommandExecutor {
|
||||
command,
|
||||
cwd: sandbox_cwd,
|
||||
@@ -153,7 +149,6 @@ pub(super) async fn try_run_zsh_fork(
|
||||
let stopwatch = Stopwatch::new(effective_timeout);
|
||||
let cancel_token = stopwatch.cancellation_token();
|
||||
let escalation_policy = CoreShellActionProvider {
|
||||
policy: Arc::clone(&exec_policy),
|
||||
session: Arc::clone(&ctx.session),
|
||||
turn: Arc::clone(&ctx.turn),
|
||||
call_id: ctx.call_id.clone(),
|
||||
@@ -213,20 +208,30 @@ pub(crate) async fn prepare_unified_exec_zsh_fork(
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let exec_policy = Arc::new(RwLock::new(
|
||||
ctx.session.services.exec_policy.current().as_ref().clone(),
|
||||
));
|
||||
let ExecRequest {
|
||||
command,
|
||||
cwd,
|
||||
env,
|
||||
network,
|
||||
expiration: _expiration,
|
||||
sandbox,
|
||||
windows_sandbox_level,
|
||||
sandbox_permissions,
|
||||
sandbox_policy,
|
||||
justification,
|
||||
arg0,
|
||||
} = &exec_request;
|
||||
let command_executor = CoreShellCommandExecutor {
|
||||
command: exec_request.command.clone(),
|
||||
cwd: exec_request.cwd.clone(),
|
||||
sandbox_policy: exec_request.sandbox_policy.clone(),
|
||||
sandbox: exec_request.sandbox,
|
||||
env: exec_request.env.clone(),
|
||||
network: exec_request.network.clone(),
|
||||
windows_sandbox_level: exec_request.windows_sandbox_level,
|
||||
sandbox_permissions: exec_request.sandbox_permissions,
|
||||
justification: exec_request.justification.clone(),
|
||||
arg0: exec_request.arg0.clone(),
|
||||
command: command.clone(),
|
||||
cwd: cwd.clone(),
|
||||
sandbox_policy: sandbox_policy.clone(),
|
||||
sandbox: *sandbox,
|
||||
env: env.clone(),
|
||||
network: network.clone(),
|
||||
windows_sandbox_level: *windows_sandbox_level,
|
||||
sandbox_permissions: *sandbox_permissions,
|
||||
justification: justification.clone(),
|
||||
arg0: arg0.clone(),
|
||||
sandbox_policy_cwd: ctx.turn.cwd.clone(),
|
||||
macos_seatbelt_profile_extensions: ctx
|
||||
.turn
|
||||
@@ -248,7 +253,6 @@ pub(crate) async fn prepare_unified_exec_zsh_fork(
|
||||
)
|
||||
})?;
|
||||
let escalation_policy = CoreShellActionProvider {
|
||||
policy: Arc::clone(&exec_policy),
|
||||
session: Arc::clone(&ctx.session),
|
||||
turn: Arc::clone(&ctx.turn),
|
||||
call_id: ctx.call_id.clone(),
|
||||
@@ -276,7 +280,6 @@ pub(crate) async fn prepare_unified_exec_zsh_fork(
|
||||
}
|
||||
|
||||
struct CoreShellActionProvider {
|
||||
policy: Arc<RwLock<Policy>>,
|
||||
session: Arc<crate::codex::Session>,
|
||||
turn: Arc<crate::codex::TurnContext>,
|
||||
call_id: String,
|
||||
@@ -592,18 +595,16 @@ impl EscalationPolicy for CoreShellActionProvider {
|
||||
.await;
|
||||
}
|
||||
|
||||
let evaluation = {
|
||||
let policy = self.policy.read().await;
|
||||
evaluate_intercepted_exec_policy(
|
||||
&policy,
|
||||
program,
|
||||
argv,
|
||||
self.approval_policy,
|
||||
&self.sandbox_policy,
|
||||
self.sandbox_permissions,
|
||||
ENABLE_INTERCEPTED_EXEC_POLICY_SHELL_WRAPPER_PARSING,
|
||||
)
|
||||
};
|
||||
let policy = self.session.services.exec_policy.current();
|
||||
let evaluation = evaluate_intercepted_exec_policy(
|
||||
policy.as_ref(),
|
||||
program,
|
||||
argv,
|
||||
self.approval_policy,
|
||||
&self.sandbox_policy,
|
||||
self.sandbox_permissions,
|
||||
ENABLE_INTERCEPTED_EXEC_POLICY_SHELL_WRAPPER_PARSING,
|
||||
);
|
||||
// When true, means the Evaluation was due to *.rules, not the
|
||||
// fallback function.
|
||||
let decision_driven_by_policy =
|
||||
|
||||
@@ -48,12 +48,23 @@ mod imp {
|
||||
use crate::unified_exec::SpawnLifecycle;
|
||||
use codex_shell_escalation::EscalationSession;
|
||||
|
||||
const ESCALATE_SOCKET_ENV_VAR: &str = "CODEX_ESCALATE_SOCKET";
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ZshForkSpawnLifecycle {
|
||||
escalation_session: EscalationSession,
|
||||
}
|
||||
|
||||
impl SpawnLifecycle for ZshForkSpawnLifecycle {
|
||||
fn inherited_fds(&self) -> Vec<i32> {
|
||||
self.escalation_session
|
||||
.env()
|
||||
.get(ESCALATE_SOCKET_ENV_VAR)
|
||||
.and_then(|fd| fd.parse().ok())
|
||||
.into_iter()
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn after_spawn(&mut self) {
|
||||
self.escalation_session.close_client_socket();
|
||||
}
|
||||
|
||||
@@ -119,8 +119,6 @@ impl ToolsConfig {
|
||||
|
||||
let shell_type = if !features.enabled(Feature::ShellTool) {
|
||||
ConfigShellToolType::Disabled
|
||||
} else if features.enabled(Feature::ShellZshFork) {
|
||||
ConfigShellToolType::ShellCommand
|
||||
} else if features.enabled(Feature::UnifiedExec) {
|
||||
// If ConPTY not supported (for old Windows versions), fallback on ShellCommand.
|
||||
if codex_utils_pty::conpty_supported() {
|
||||
@@ -128,6 +126,8 @@ impl ToolsConfig {
|
||||
} else {
|
||||
ConfigShellToolType::ShellCommand
|
||||
}
|
||||
} else if features.enabled(Feature::ShellZshFork) {
|
||||
ConfigShellToolType::ShellCommand
|
||||
} else {
|
||||
model_info.shell_type
|
||||
};
|
||||
@@ -2816,7 +2816,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn shell_zsh_fork_prefers_shell_command_over_unified_exec() {
|
||||
fn shell_zsh_fork_uses_unified_exec_when_enabled() {
|
||||
let config = test_config();
|
||||
let model_info = ModelsManager::construct_model_info_offline_for_tests("o3", &config);
|
||||
let mut features = Features::with_defaults();
|
||||
@@ -2830,7 +2830,7 @@ mod tests {
|
||||
session_source: SessionSource::Cli,
|
||||
});
|
||||
|
||||
assert_eq!(tools_config.shell_type, ConfigShellToolType::ShellCommand);
|
||||
assert_eq!(tools_config.shell_type, ConfigShellToolType::UnifiedExec);
|
||||
assert_eq!(
|
||||
tools_config.shell_command_backend,
|
||||
ShellCommandBackendConfig::ZshFork
|
||||
|
||||
@@ -25,6 +25,10 @@ use super::UnifiedExecError;
|
||||
use super::head_tail_buffer::HeadTailBuffer;
|
||||
|
||||
pub(crate) trait SpawnLifecycle: std::fmt::Debug + Send + Sync {
|
||||
fn inherited_fds(&self) -> Vec<i32> {
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
fn after_spawn(&mut self) {}
|
||||
}
|
||||
|
||||
|
||||
@@ -535,14 +535,16 @@ impl UnifiedExecProcessManager {
|
||||
.command
|
||||
.split_first()
|
||||
.ok_or(UnifiedExecError::MissingCommandLine)?;
|
||||
let inherited_fds = spawn_lifecycle.inherited_fds();
|
||||
|
||||
let spawn_result = if tty {
|
||||
codex_utils_pty::pty::spawn_process(
|
||||
codex_utils_pty::pty::spawn_process_with_inherited_fds(
|
||||
program,
|
||||
args,
|
||||
env.cwd.as_path(),
|
||||
&env.env,
|
||||
&env.arg0,
|
||||
&inherited_fds,
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use anyhow::Context;
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
use std::process::Stdio;
|
||||
use std::time::Duration;
|
||||
|
||||
pub async fn wait_for_pid_file(path: &Path) -> anyhow::Result<String> {
|
||||
@@ -24,6 +25,7 @@ pub async fn wait_for_pid_file(path: &Path) -> anyhow::Result<String> {
|
||||
pub fn process_is_alive(pid: &str) -> anyhow::Result<bool> {
|
||||
let status = std::process::Command::new("kill")
|
||||
.args(["-0", pid])
|
||||
.stderr(Stdio::null())
|
||||
.status()
|
||||
.context("failed to probe process liveness with kill -0")?;
|
||||
Ok(status.success())
|
||||
|
||||
@@ -91,6 +91,29 @@ where
|
||||
builder.build(server).await
|
||||
}
|
||||
|
||||
pub async fn build_unified_exec_zsh_fork_test<F>(
|
||||
server: &wiremock::MockServer,
|
||||
runtime: ZshForkRuntime,
|
||||
approval_policy: AskForApproval,
|
||||
sandbox_policy: SandboxPolicy,
|
||||
pre_build_hook: F,
|
||||
) -> Result<TestCodex>
|
||||
where
|
||||
F: FnOnce(&Path) + Send + 'static,
|
||||
{
|
||||
let mut builder = test_codex()
|
||||
.with_pre_build_hook(pre_build_hook)
|
||||
.with_config(move |config| {
|
||||
runtime.apply_to_config(config, approval_policy, sandbox_policy);
|
||||
config.use_experimental_unified_exec_tool = true;
|
||||
config
|
||||
.features
|
||||
.enable(Feature::UnifiedExec)
|
||||
.expect("test config should allow feature update");
|
||||
});
|
||||
builder.build(server).await
|
||||
}
|
||||
|
||||
fn find_test_zsh_path() -> Result<Option<PathBuf>> {
|
||||
let repo_root = codex_utils_cargo_bin::repo_root()?;
|
||||
let dotslash_zsh = repo_root.join("codex-rs/app-server/tests/suite/zsh");
|
||||
|
||||
@@ -35,6 +35,7 @@ use core_test_support::test_codex::TestCodex;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use core_test_support::wait_for_event_with_timeout;
|
||||
use core_test_support::zsh_fork::build_unified_exec_zsh_fork_test;
|
||||
use core_test_support::zsh_fork::build_zsh_fork_test;
|
||||
use core_test_support::zsh_fork::restrictive_workspace_write_policy;
|
||||
use core_test_support::zsh_fork::zsh_fork_runtime;
|
||||
@@ -1985,6 +1986,158 @@ async fn approving_execpolicy_amendment_persists_policy_and_skips_future_prompts
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
#[cfg(unix)]
|
||||
async fn unified_exec_zsh_fork_execpolicy_amendment_skips_later_subcommands() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let Some(runtime) = zsh_fork_runtime("unified exec zsh-fork execpolicy amendment test")? else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let approval_policy = AskForApproval::UnlessTrusted;
|
||||
let sandbox_policy = SandboxPolicy::new_read_only_policy();
|
||||
let server = start_mock_server().await;
|
||||
let test = build_unified_exec_zsh_fork_test(
|
||||
&server,
|
||||
runtime,
|
||||
approval_policy,
|
||||
sandbox_policy.clone(),
|
||||
|_| {},
|
||||
)
|
||||
.await?;
|
||||
let allow_prefix_path = test.cwd.path().join("allow-prefix-zsh-fork.txt");
|
||||
let _ = fs::remove_file(&allow_prefix_path);
|
||||
|
||||
let call_id = "allow-prefix-zsh-fork";
|
||||
let command = "touch allow-prefix-zsh-fork.txt && touch allow-prefix-zsh-fork.txt";
|
||||
let event = exec_command_event(
|
||||
call_id,
|
||||
command,
|
||||
Some(1_000),
|
||||
SandboxPermissions::UseDefault,
|
||||
None,
|
||||
)?;
|
||||
let _ = mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-zsh-fork-allow-prefix-1"),
|
||||
event,
|
||||
ev_completed("resp-zsh-fork-allow-prefix-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let results = mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-zsh-fork-allow-prefix-1", "done"),
|
||||
ev_completed("resp-zsh-fork-allow-prefix-2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
submit_turn(
|
||||
&test,
|
||||
"allow-prefix-zsh-fork",
|
||||
approval_policy,
|
||||
sandbox_policy,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let expected_execpolicy_amendment = ExecPolicyAmendment::new(vec![
|
||||
"touch".to_string(),
|
||||
"allow-prefix-zsh-fork.txt".to_string(),
|
||||
]);
|
||||
let mut saw_parent_approval = false;
|
||||
let mut saw_subcommand_approval = false;
|
||||
loop {
|
||||
let event = wait_for_event(&test.codex, |event| {
|
||||
matches!(
|
||||
event,
|
||||
EventMsg::ExecApprovalRequest(_) | EventMsg::TurnComplete(_)
|
||||
)
|
||||
})
|
||||
.await;
|
||||
|
||||
match event {
|
||||
EventMsg::TurnComplete(_) => break,
|
||||
EventMsg::ExecApprovalRequest(approval) => {
|
||||
let command_parts = approval.command.clone();
|
||||
let last_arg = command_parts.last().map(String::as_str).unwrap_or_default();
|
||||
if last_arg == command {
|
||||
assert!(
|
||||
!saw_parent_approval,
|
||||
"unexpected duplicate parent approval: {command_parts:?}"
|
||||
);
|
||||
saw_parent_approval = true;
|
||||
test.codex
|
||||
.submit(Op::ExecApproval {
|
||||
id: approval.effective_approval_id(),
|
||||
turn_id: None,
|
||||
decision: ReviewDecision::Approved,
|
||||
})
|
||||
.await?;
|
||||
continue;
|
||||
}
|
||||
|
||||
let is_touch_subcommand = command_parts
|
||||
.iter()
|
||||
.any(|part| part == "allow-prefix-zsh-fork.txt")
|
||||
&& command_parts
|
||||
.first()
|
||||
.is_some_and(|part| part.ends_with("/touch") || part == "touch");
|
||||
if is_touch_subcommand {
|
||||
assert!(
|
||||
!saw_subcommand_approval,
|
||||
"execpolicy amendment should suppress later matching subcommand approvals: {command_parts:?}"
|
||||
);
|
||||
saw_subcommand_approval = true;
|
||||
assert_eq!(
|
||||
approval.proposed_execpolicy_amendment,
|
||||
Some(expected_execpolicy_amendment.clone())
|
||||
);
|
||||
test.codex
|
||||
.submit(Op::ExecApproval {
|
||||
id: approval.effective_approval_id(),
|
||||
turn_id: None,
|
||||
decision: ReviewDecision::ApprovedExecpolicyAmendment {
|
||||
proposed_execpolicy_amendment: expected_execpolicy_amendment
|
||||
.clone(),
|
||||
},
|
||||
})
|
||||
.await?;
|
||||
continue;
|
||||
}
|
||||
|
||||
test.codex
|
||||
.submit(Op::ExecApproval {
|
||||
id: approval.effective_approval_id(),
|
||||
turn_id: None,
|
||||
decision: ReviewDecision::Approved,
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
other => panic!("unexpected event: {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
assert!(saw_parent_approval, "expected parent unified-exec approval");
|
||||
assert!(
|
||||
saw_subcommand_approval,
|
||||
"expected at least one intercepted touch approval"
|
||||
);
|
||||
|
||||
let result = parse_result(&results.single_request().function_call_output(call_id));
|
||||
assert_eq!(result.exit_code.unwrap_or(0), 0);
|
||||
assert!(
|
||||
allow_prefix_path.exists(),
|
||||
"expected touch command to complete after approving the first intercepted subcommand; output: {}",
|
||||
result.stdout
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
#[cfg(unix)]
|
||||
async fn matched_prefix_rule_runs_unsandboxed_under_zsh_fork() -> Result<()> {
|
||||
|
||||
@@ -18,6 +18,7 @@ use core_test_support::skip_if_no_network;
|
||||
use core_test_support::test_codex::TestCodex;
|
||||
use core_test_support::wait_for_event;
|
||||
use core_test_support::wait_for_event_match;
|
||||
use core_test_support::zsh_fork::build_unified_exec_zsh_fork_test;
|
||||
use core_test_support::zsh_fork::build_zsh_fork_test;
|
||||
use core_test_support::zsh_fork::restrictive_workspace_write_policy;
|
||||
use core_test_support::zsh_fork::zsh_fork_runtime;
|
||||
@@ -48,6 +49,13 @@ fn shell_command_arguments(command: &str) -> Result<String> {
|
||||
}))?)
|
||||
}
|
||||
|
||||
fn exec_command_arguments(command: &str) -> Result<String> {
|
||||
Ok(serde_json::to_string(&json!({
|
||||
"cmd": command,
|
||||
"yield_time_ms": 500,
|
||||
}))?)
|
||||
}
|
||||
|
||||
async fn submit_turn_with_policies(
|
||||
test: &TestCodex,
|
||||
prompt: &str,
|
||||
@@ -87,6 +95,38 @@ echo 'zsh-fork-stderr' >&2
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn write_repo_skill_with_shell_script_contents(
|
||||
repo_root: &Path,
|
||||
name: &str,
|
||||
script_name: &str,
|
||||
script_contents: &str,
|
||||
) -> Result<PathBuf> {
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
|
||||
let skill_dir = repo_root.join(".agents").join("skills").join(name);
|
||||
let scripts_dir = skill_dir.join("scripts");
|
||||
fs::create_dir_all(&scripts_dir)?;
|
||||
fs::write(repo_root.join(".git"), "gitdir: here")?;
|
||||
fs::write(
|
||||
skill_dir.join("SKILL.md"),
|
||||
format!(
|
||||
r#"---
|
||||
name: {name}
|
||||
description: {name} skill
|
||||
---
|
||||
"#
|
||||
),
|
||||
)?;
|
||||
|
||||
let script_path = scripts_dir.join(script_name);
|
||||
fs::write(&script_path, script_contents)?;
|
||||
let mut permissions = fs::metadata(&script_path)?.permissions();
|
||||
permissions.set_mode(0o755);
|
||||
fs::set_permissions(&script_path, permissions)?;
|
||||
Ok(script_path)
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn write_skill_with_shell_script_contents(
|
||||
home: &Path,
|
||||
@@ -265,6 +305,168 @@ permissions:
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn unified_exec_zsh_fork_prompts_for_skill_script_execution() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let Some(runtime) = zsh_fork_runtime("unified exec zsh-fork skill prompt test")? else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let tool_call_id = "uexec-zsh-fork-skill-call";
|
||||
let test = build_unified_exec_zsh_fork_test(
|
||||
&server,
|
||||
runtime,
|
||||
AskForApproval::OnRequest,
|
||||
SandboxPolicy::new_workspace_write_policy(),
|
||||
|home| {
|
||||
write_skill_with_shell_script(home, "mbolin-test-skill", "hello-mbolin.sh").unwrap();
|
||||
write_skill_metadata(
|
||||
home,
|
||||
"mbolin-test-skill",
|
||||
r#"
|
||||
permissions:
|
||||
file_system:
|
||||
read:
|
||||
- "./data"
|
||||
write:
|
||||
- "./output"
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
let (script_path_str, command) = skill_script_command(&test, "hello-mbolin.sh")?;
|
||||
let arguments = exec_command_arguments(&command)?;
|
||||
let mocks =
|
||||
mount_function_call_agent_response(&server, tool_call_id, &arguments, "exec_command").await;
|
||||
|
||||
submit_turn_with_policies(
|
||||
&test,
|
||||
"use $mbolin-test-skill",
|
||||
AskForApproval::OnRequest,
|
||||
SandboxPolicy::new_workspace_write_policy(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let approval = wait_for_exec_approval_request(&test)
|
||||
.await
|
||||
.expect("expected exec approval request before completion");
|
||||
assert_eq!(approval.call_id, tool_call_id);
|
||||
assert_eq!(approval.command, vec![script_path_str.clone()]);
|
||||
assert_eq!(
|
||||
approval.available_decisions,
|
||||
Some(vec![
|
||||
ReviewDecision::Approved,
|
||||
ReviewDecision::ApprovedForSession,
|
||||
ReviewDecision::Abort,
|
||||
])
|
||||
);
|
||||
assert_eq!(
|
||||
approval.additional_permissions,
|
||||
Some(PermissionProfile {
|
||||
file_system: Some(FileSystemPermissions {
|
||||
read: Some(vec![absolute_path(
|
||||
&test.codex_home_path().join("skills/mbolin-test-skill/data"),
|
||||
)]),
|
||||
write: Some(vec![absolute_path(
|
||||
&test
|
||||
.codex_home_path()
|
||||
.join("skills/mbolin-test-skill/output"),
|
||||
)]),
|
||||
}),
|
||||
..Default::default()
|
||||
})
|
||||
);
|
||||
|
||||
test.codex
|
||||
.submit(Op::ExecApproval {
|
||||
id: approval.effective_approval_id(),
|
||||
turn_id: None,
|
||||
decision: ReviewDecision::Denied,
|
||||
})
|
||||
.await?;
|
||||
|
||||
wait_for_turn_complete(&test).await;
|
||||
|
||||
let call_output = mocks
|
||||
.completion
|
||||
.single_request()
|
||||
.function_call_output(tool_call_id);
|
||||
let output = call_output["output"].as_str().unwrap_or_default();
|
||||
assert!(
|
||||
output.contains("Execution denied: User denied execution"),
|
||||
"expected rejection marker in function_call_output: {output:?}"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn unified_exec_zsh_fork_keeps_skill_loading_pinned_to_turn_cwd() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let Some(runtime) = zsh_fork_runtime("unified exec zsh-fork turn cwd skill test")? else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let tool_call_id = "uexec-zsh-fork-repo-skill-call";
|
||||
let test = build_unified_exec_zsh_fork_test(
|
||||
&server,
|
||||
runtime,
|
||||
AskForApproval::OnRequest,
|
||||
SandboxPolicy::new_workspace_write_policy(),
|
||||
|_| {},
|
||||
)
|
||||
.await?;
|
||||
|
||||
let repo_root = test.cwd_path().join("repo");
|
||||
let script_path = write_repo_skill_with_shell_script_contents(
|
||||
&repo_root,
|
||||
"repo-skill",
|
||||
"repo-skill.sh",
|
||||
"#!/bin/sh\necho 'repo-skill-output'\n",
|
||||
)?;
|
||||
let script_path_quoted = shlex::try_join([script_path.to_string_lossy().as_ref()])?;
|
||||
let repo_root_quoted = shlex::try_join([repo_root.to_string_lossy().as_ref()])?;
|
||||
let command = format!("cd {repo_root_quoted} && {script_path_quoted}");
|
||||
let arguments = exec_command_arguments(&command)?;
|
||||
let mocks =
|
||||
mount_function_call_agent_response(&server, tool_call_id, &arguments, "exec_command").await;
|
||||
|
||||
submit_turn_with_policies(
|
||||
&test,
|
||||
"run the repo skill after changing directories",
|
||||
AskForApproval::OnRequest,
|
||||
SandboxPolicy::new_workspace_write_policy(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let approval = wait_for_exec_approval_request(&test).await;
|
||||
assert!(
|
||||
approval.is_none(),
|
||||
"changing directories inside unified exec should not load repo-local skills from the shell cwd",
|
||||
);
|
||||
|
||||
let call_output = mocks
|
||||
.completion
|
||||
.single_request()
|
||||
.function_call_output(tool_call_id);
|
||||
let output = call_output["output"].as_str().unwrap_or_default();
|
||||
assert!(
|
||||
output.contains("repo-skill-output"),
|
||||
"expected repo skill script to run without skill-specific approval when only the shell cwd changes: {output:?}"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Look for `additional_permissions == None`, then verify that both the first
|
||||
/// run and the cached session-approval rerun stay inside the turn sandbox.
|
||||
#[cfg(unix)]
|
||||
|
||||
@@ -32,6 +32,10 @@ use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use core_test_support::wait_for_event_match;
|
||||
use core_test_support::wait_for_event_with_timeout;
|
||||
#[cfg(unix)]
|
||||
use core_test_support::zsh_fork::build_unified_exec_zsh_fork_test;
|
||||
#[cfg(unix)]
|
||||
use core_test_support::zsh_fork::zsh_fork_runtime;
|
||||
use pretty_assertions::assert_eq;
|
||||
use regex_lite::Regex;
|
||||
use serde_json::Value;
|
||||
@@ -1323,7 +1327,6 @@ async fn exec_command_reports_chunk_and_exit_metadata() -> Result<()> {
|
||||
.into_iter()
|
||||
.map(|request| request.body_json())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let outputs = collect_tool_outputs(&bodies)?;
|
||||
let metadata = outputs
|
||||
.get(call_id)
|
||||
@@ -1445,7 +1448,6 @@ async fn unified_exec_defaults_to_pipe() -> Result<()> {
|
||||
.into_iter()
|
||||
.map(|request| request.body_json())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let outputs = collect_tool_outputs(&bodies)?;
|
||||
let output = outputs
|
||||
.get(call_id)
|
||||
@@ -1539,7 +1541,6 @@ async fn unified_exec_can_enable_tty() -> Result<()> {
|
||||
.into_iter()
|
||||
.map(|request| request.body_json())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let outputs = collect_tool_outputs(&bodies)?;
|
||||
let output = outputs
|
||||
.get(call_id)
|
||||
@@ -1624,7 +1625,6 @@ async fn unified_exec_respects_early_exit_notifications() -> Result<()> {
|
||||
.into_iter()
|
||||
.map(|request| request.body_json())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let outputs = collect_tool_outputs(&bodies)?;
|
||||
let output = outputs
|
||||
.get(call_id)
|
||||
@@ -1823,6 +1823,227 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
#[cfg(unix)]
|
||||
async fn unified_exec_zsh_fork_keeps_python_repl_attached_to_zsh_session() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let Some(runtime) = zsh_fork_runtime("unified exec zsh-fork tty session test")? else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let python = match which("python3") {
|
||||
Ok(path) => path,
|
||||
Err(_) => {
|
||||
eprintln!("python3 not found in PATH, skipping zsh-fork python repl test.");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let test = build_unified_exec_zsh_fork_test(
|
||||
&server,
|
||||
runtime,
|
||||
AskForApproval::Never,
|
||||
SandboxPolicy::new_workspace_write_policy(),
|
||||
|_| {},
|
||||
)
|
||||
.await?;
|
||||
|
||||
let start_call_id = "uexec-zsh-fork-python-start";
|
||||
let send_call_id = "uexec-zsh-fork-python-pid";
|
||||
let exit_call_id = "uexec-zsh-fork-python-exit";
|
||||
|
||||
let start_args = serde_json::json!({
|
||||
"cmd": python.display().to_string(),
|
||||
"yield_time_ms": 500,
|
||||
"tty": true,
|
||||
});
|
||||
let send_args = serde_json::json!({
|
||||
"chars": "import os; print(os.getpid())\r\n",
|
||||
"session_id": 1000,
|
||||
"yield_time_ms": 500,
|
||||
});
|
||||
let exit_args = serde_json::json!({
|
||||
"chars": "import sys; sys.exit(0)\r\n",
|
||||
"session_id": 1000,
|
||||
"yield_time_ms": 500,
|
||||
});
|
||||
|
||||
let responses = vec![
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_function_call(
|
||||
start_call_id,
|
||||
"exec_command",
|
||||
&serde_json::to_string(&start_args)?,
|
||||
),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-2"),
|
||||
ev_function_call(
|
||||
send_call_id,
|
||||
"write_stdin",
|
||||
&serde_json::to_string(&send_args)?,
|
||||
),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-1", "python is running"),
|
||||
ev_completed("resp-3"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-4"),
|
||||
ev_function_call(
|
||||
exit_call_id,
|
||||
"write_stdin",
|
||||
&serde_json::to_string(&exit_args)?,
|
||||
),
|
||||
ev_completed("resp-4"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-2", "all done"),
|
||||
ev_completed("resp-5"),
|
||||
]),
|
||||
];
|
||||
let request_log = mount_sse_sequence(&server, responses).await;
|
||||
|
||||
test.codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: "test unified exec zsh-fork tty behavior".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: test.cwd_path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::new_workspace_write_policy(),
|
||||
model: test.session_configured.model.clone(),
|
||||
effort: None,
|
||||
summary: None,
|
||||
service_tier: None,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
wait_for_event(&test.codex, |event| {
|
||||
matches!(event, EventMsg::TurnComplete(_))
|
||||
})
|
||||
.await;
|
||||
|
||||
let requests = request_log.requests();
|
||||
assert!(!requests.is_empty(), "expected at least one POST request");
|
||||
let bodies = requests
|
||||
.into_iter()
|
||||
.map(|request| request.body_json())
|
||||
.collect::<Vec<_>>();
|
||||
let outputs = collect_tool_outputs(&bodies)?;
|
||||
|
||||
let start_output = outputs
|
||||
.get(start_call_id)
|
||||
.expect("missing start output for exec_command");
|
||||
let process_id = start_output
|
||||
.process_id
|
||||
.clone()
|
||||
.expect("expected process id from exec_command");
|
||||
assert!(
|
||||
start_output.exit_code.is_none(),
|
||||
"initial exec_command should leave the PTY session running"
|
||||
);
|
||||
|
||||
let send_output = outputs
|
||||
.get(send_call_id)
|
||||
.expect("missing write_stdin output");
|
||||
let normalized = send_output.output.replace("\r\n", "\n");
|
||||
let python_pid = Regex::new(r"(?m)^(\d+)$")
|
||||
.expect("valid python pid regex")
|
||||
.captures(&normalized)
|
||||
.and_then(|captures| captures.get(1))
|
||||
.map(|value| value.as_str().to_string())
|
||||
.with_context(|| format!("missing python pid in output {normalized:?}"))?;
|
||||
assert!(
|
||||
process_is_alive(&python_pid)?,
|
||||
"python process should still be alive after printing its pid, got output {normalized:?}"
|
||||
);
|
||||
assert_eq!(send_output.process_id.as_deref(), Some(process_id.as_str()));
|
||||
assert!(
|
||||
send_output.exit_code.is_none(),
|
||||
"write_stdin should not report an exit code while the process is still running"
|
||||
);
|
||||
|
||||
let zsh_pid = std::process::Command::new("ps")
|
||||
.args(["-o", "ppid=", "-p", &python_pid])
|
||||
.output()
|
||||
.context("failed to look up python parent pid")?;
|
||||
let zsh_pid = String::from_utf8(zsh_pid.stdout)
|
||||
.context("python parent pid output is not UTF-8")?
|
||||
.trim()
|
||||
.to_string();
|
||||
assert!(
|
||||
!zsh_pid.is_empty(),
|
||||
"expected python parent pid to identify the zsh session"
|
||||
);
|
||||
assert!(
|
||||
process_is_alive(&zsh_pid)?,
|
||||
"expected zsh parent process {zsh_pid} to still be alive"
|
||||
);
|
||||
|
||||
let zsh_command = std::process::Command::new("ps")
|
||||
.args(["-o", "command=", "-p", &zsh_pid])
|
||||
.output()
|
||||
.context("failed to look up zsh parent command")?;
|
||||
let zsh_command =
|
||||
String::from_utf8(zsh_command.stdout).context("zsh parent command output is not UTF-8")?;
|
||||
assert!(
|
||||
zsh_command.contains("zsh"),
|
||||
"expected python parent command to be zsh, got {zsh_command:?}"
|
||||
);
|
||||
|
||||
test.codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: "shut down the python repl".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: test.cwd_path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::new_workspace_write_policy(),
|
||||
model: test.session_configured.model.clone(),
|
||||
effort: None,
|
||||
summary: None,
|
||||
service_tier: None,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
wait_for_event(&test.codex, |event| {
|
||||
matches!(event, EventMsg::TurnComplete(_))
|
||||
})
|
||||
.await;
|
||||
|
||||
let requests = request_log.requests();
|
||||
assert!(!requests.is_empty(), "expected at least one POST request");
|
||||
let bodies = requests
|
||||
.into_iter()
|
||||
.map(|request| request.body_json())
|
||||
.collect::<Vec<_>>();
|
||||
let outputs = collect_tool_outputs(&bodies)?;
|
||||
let exit_output = outputs
|
||||
.get(exit_call_id)
|
||||
.expect("missing exit output after requesting python shutdown");
|
||||
assert!(
|
||||
exit_output.exit_code.is_none() || exit_output.exit_code == Some(0),
|
||||
"exit request should either leave cleanup to the background watcher or report success directly, got {exit_output:?}"
|
||||
);
|
||||
wait_for_process_exit(&python_pid).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn unified_exec_emits_end_event_when_session_dies_via_stdin() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::io;
|
||||
use std::os::fd::AsFd;
|
||||
use std::os::fd::AsRawFd;
|
||||
use std::os::fd::FromRawFd as _;
|
||||
use std::os::fd::OwnedFd;
|
||||
|
||||
use anyhow::Context as _;
|
||||
@@ -28,6 +28,12 @@ fn get_escalate_client() -> anyhow::Result<AsyncDatagramSocket> {
|
||||
Ok(unsafe { AsyncDatagramSocket::from_raw_fd(client_fd) }?)
|
||||
}
|
||||
|
||||
fn duplicate_fd_for_transfer(fd: impl AsFd, name: &str) -> anyhow::Result<OwnedFd> {
|
||||
fd.as_fd()
|
||||
.try_clone_to_owned()
|
||||
.with_context(|| format!("failed to duplicate {name} for escalation transfer"))
|
||||
}
|
||||
|
||||
pub async fn run_shell_escalation_execve_wrapper(
|
||||
file: String,
|
||||
argv: Vec<String>,
|
||||
@@ -62,11 +68,18 @@ pub async fn run_shell_escalation_execve_wrapper(
|
||||
.context("failed to receive EscalateResponse")?;
|
||||
match message.action {
|
||||
EscalateAction::Escalate => {
|
||||
// TODO: maybe we should send ALL open FDs (except the escalate client)?
|
||||
// Duplicate stdio before transferring ownership to the server. The
|
||||
// wrapper must keep using its own stdin/stdout/stderr until the
|
||||
// escalated child takes over.
|
||||
let destination_fds = [
|
||||
io::stdin().as_raw_fd(),
|
||||
io::stdout().as_raw_fd(),
|
||||
io::stderr().as_raw_fd(),
|
||||
];
|
||||
let fds_to_send = [
|
||||
unsafe { OwnedFd::from_raw_fd(io::stdin().as_raw_fd()) },
|
||||
unsafe { OwnedFd::from_raw_fd(io::stdout().as_raw_fd()) },
|
||||
unsafe { OwnedFd::from_raw_fd(io::stderr().as_raw_fd()) },
|
||||
duplicate_fd_for_transfer(io::stdin(), "stdin")?,
|
||||
duplicate_fd_for_transfer(io::stdout(), "stdout")?,
|
||||
duplicate_fd_for_transfer(io::stderr(), "stderr")?,
|
||||
];
|
||||
|
||||
// TODO: also forward signals over the super-exec socket
|
||||
@@ -74,7 +87,7 @@ pub async fn run_shell_escalation_execve_wrapper(
|
||||
client
|
||||
.send_with_fds(
|
||||
SuperExecMessage {
|
||||
fds: fds_to_send.iter().map(AsRawFd::as_raw_fd).collect(),
|
||||
fds: destination_fds.into_iter().collect(),
|
||||
},
|
||||
&fds_to_send,
|
||||
)
|
||||
@@ -115,3 +128,23 @@ pub async fn run_shell_escalation_execve_wrapper(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::os::fd::AsRawFd;
|
||||
use std::os::unix::net::UnixStream;
|
||||
|
||||
#[test]
|
||||
fn duplicate_fd_for_transfer_does_not_close_original() {
|
||||
let (left, _right) = UnixStream::pair().expect("socket pair");
|
||||
let original_fd = left.as_raw_fd();
|
||||
|
||||
let duplicate = duplicate_fd_for_transfer(&left, "test fd").expect("duplicate fd");
|
||||
assert_ne!(duplicate.as_raw_fd(), original_fd);
|
||||
|
||||
drop(duplicate);
|
||||
|
||||
assert_ne!(unsafe { libc::fcntl(original_fd, libc::F_GETFD) }, -1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
use core::fmt;
|
||||
use std::any::Any;
|
||||
use std::io;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex as StdMutex;
|
||||
|
||||
use portable_pty::MasterPty;
|
||||
use portable_pty::SlavePty;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
@@ -17,8 +16,8 @@ pub(crate) trait ChildTerminator: Send + Sync {
|
||||
}
|
||||
|
||||
pub struct PtyHandles {
|
||||
pub _slave: Option<Box<dyn SlavePty + Send>>,
|
||||
pub _master: Box<dyn MasterPty + Send>,
|
||||
pub _slave: Option<Box<dyn Any + Send>>,
|
||||
pub _master: Box<dyn Any + Send>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for PtyHandles {
|
||||
|
||||
@@ -1,6 +1,18 @@
|
||||
use std::collections::HashMap;
|
||||
#[cfg(unix)]
|
||||
use std::fs::File;
|
||||
use std::io::ErrorKind;
|
||||
#[cfg(unix)]
|
||||
use std::os::fd::FromRawFd;
|
||||
#[cfg(unix)]
|
||||
use std::os::fd::RawFd;
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::process::CommandExt;
|
||||
use std::path::Path;
|
||||
#[cfg(unix)]
|
||||
use std::process::Command as StdCommand;
|
||||
#[cfg(unix)]
|
||||
use std::process::Stdio;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex as StdMutex;
|
||||
@@ -60,6 +72,18 @@ impl ChildTerminator for PtyChildTerminator {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
struct RawPidTerminator {
|
||||
process_group_id: u32,
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
impl ChildTerminator for RawPidTerminator {
|
||||
fn kill(&mut self) -> std::io::Result<()> {
|
||||
crate::process_group::kill_process_group(self.process_group_id)
|
||||
}
|
||||
}
|
||||
|
||||
fn platform_native_pty_system() -> Box<dyn portable_pty::PtySystem + Send> {
|
||||
#[cfg(windows)]
|
||||
{
|
||||
@@ -79,11 +103,42 @@ pub async fn spawn_process(
|
||||
cwd: &Path,
|
||||
env: &HashMap<String, String>,
|
||||
arg0: &Option<String>,
|
||||
) -> Result<SpawnedProcess> {
|
||||
spawn_process_with_inherited_fds(program, args, cwd, env, arg0, &[]).await
|
||||
}
|
||||
|
||||
/// Spawn a process attached to a PTY, preserving any inherited file
|
||||
/// descriptors listed in `inherited_fds` across exec on Unix.
|
||||
pub async fn spawn_process_with_inherited_fds(
|
||||
program: &str,
|
||||
args: &[String],
|
||||
cwd: &Path,
|
||||
env: &HashMap<String, String>,
|
||||
arg0: &Option<String>,
|
||||
inherited_fds: &[i32],
|
||||
) -> Result<SpawnedProcess> {
|
||||
if program.is_empty() {
|
||||
anyhow::bail!("missing program for PTY spawn");
|
||||
}
|
||||
|
||||
#[cfg(not(unix))]
|
||||
let _ = inherited_fds;
|
||||
|
||||
#[cfg(unix)]
|
||||
if !inherited_fds.is_empty() {
|
||||
return spawn_process_preserving_fds(program, args, cwd, env, arg0, inherited_fds).await;
|
||||
}
|
||||
|
||||
spawn_process_portable(program, args, cwd, env, arg0).await
|
||||
}
|
||||
|
||||
async fn spawn_process_portable(
|
||||
program: &str,
|
||||
args: &[String],
|
||||
cwd: &Path,
|
||||
env: &HashMap<String, String>,
|
||||
arg0: &Option<String>,
|
||||
) -> Result<SpawnedProcess> {
|
||||
let pty_system = platform_native_pty_system();
|
||||
let pair = pty_system.openpty(PtySize {
|
||||
rows: 24,
|
||||
@@ -167,11 +222,11 @@ pub async fn spawn_process(
|
||||
|
||||
let handles = PtyHandles {
|
||||
_slave: if cfg!(windows) {
|
||||
Some(pair.slave)
|
||||
Some(Box::new(pair.slave))
|
||||
} else {
|
||||
None
|
||||
},
|
||||
_master: pair.master,
|
||||
_master: Box::new(pair.master),
|
||||
};
|
||||
|
||||
let (handle, output_rx) = ProcessHandle::new(
|
||||
@@ -198,3 +253,221 @@ pub async fn spawn_process(
|
||||
exit_rx,
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
async fn spawn_process_preserving_fds(
|
||||
program: &str,
|
||||
args: &[String],
|
||||
cwd: &Path,
|
||||
env: &HashMap<String, String>,
|
||||
arg0: &Option<String>,
|
||||
inherited_fds: &[RawFd],
|
||||
) -> Result<SpawnedProcess> {
|
||||
let (master, slave) = open_unix_pty()?;
|
||||
let mut command = StdCommand::new(arg0.as_ref().unwrap_or(&program.to_string()));
|
||||
command.current_dir(cwd);
|
||||
command.env_clear();
|
||||
for arg in args {
|
||||
command.arg(arg);
|
||||
}
|
||||
for (key, value) in env {
|
||||
command.env(key, value);
|
||||
}
|
||||
|
||||
let stdin = slave.try_clone()?;
|
||||
let stdout = slave.try_clone()?;
|
||||
let stderr = slave.try_clone()?;
|
||||
let inherited_fds = inherited_fds.to_vec();
|
||||
|
||||
unsafe {
|
||||
command
|
||||
.stdin(Stdio::from(stdin))
|
||||
.stdout(Stdio::from(stdout))
|
||||
.stderr(Stdio::from(stderr))
|
||||
.pre_exec(move || {
|
||||
for signo in &[
|
||||
libc::SIGCHLD,
|
||||
libc::SIGHUP,
|
||||
libc::SIGINT,
|
||||
libc::SIGQUIT,
|
||||
libc::SIGTERM,
|
||||
libc::SIGALRM,
|
||||
] {
|
||||
libc::signal(*signo, libc::SIG_DFL);
|
||||
}
|
||||
|
||||
let empty_set: libc::sigset_t = std::mem::zeroed();
|
||||
libc::sigprocmask(libc::SIG_SETMASK, &empty_set, std::ptr::null_mut());
|
||||
|
||||
if libc::setsid() == -1 {
|
||||
return Err(std::io::Error::last_os_error());
|
||||
}
|
||||
|
||||
#[allow(clippy::cast_lossless)]
|
||||
if libc::ioctl(0, libc::TIOCSCTTY as _, 0) == -1 {
|
||||
return Err(std::io::Error::last_os_error());
|
||||
}
|
||||
|
||||
close_random_fds_except(&inherited_fds);
|
||||
Ok(())
|
||||
});
|
||||
}
|
||||
|
||||
let mut child = command.spawn()?;
|
||||
drop(slave);
|
||||
let process_group_id = child.id();
|
||||
|
||||
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
|
||||
let (output_tx, _) = broadcast::channel::<Vec<u8>>(256);
|
||||
let initial_output_rx = output_tx.subscribe();
|
||||
|
||||
let mut reader = master.try_clone()?;
|
||||
let output_tx_clone = output_tx.clone();
|
||||
let reader_handle: JoinHandle<()> = tokio::task::spawn_blocking(move || {
|
||||
let mut buf = [0u8; 8_192];
|
||||
loop {
|
||||
match std::io::Read::read(&mut reader, &mut buf) {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
let _ = output_tx_clone.send(buf[..n].to_vec());
|
||||
}
|
||||
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
|
||||
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
|
||||
std::thread::sleep(Duration::from_millis(5));
|
||||
continue;
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let writer = Arc::new(tokio::sync::Mutex::new(master.try_clone()?));
|
||||
let writer_handle: JoinHandle<()> = tokio::spawn({
|
||||
let writer = Arc::clone(&writer);
|
||||
async move {
|
||||
while let Some(bytes) = writer_rx.recv().await {
|
||||
let mut guard = writer.lock().await;
|
||||
use std::io::Write;
|
||||
let _ = guard.write_all(&bytes);
|
||||
let _ = guard.flush();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let (exit_tx, exit_rx) = oneshot::channel::<i32>();
|
||||
let exit_status = Arc::new(AtomicBool::new(false));
|
||||
let wait_exit_status = Arc::clone(&exit_status);
|
||||
let exit_code = Arc::new(StdMutex::new(None));
|
||||
let wait_exit_code = Arc::clone(&exit_code);
|
||||
let wait_handle: JoinHandle<()> = tokio::task::spawn_blocking(move || {
|
||||
let code = match child.wait() {
|
||||
Ok(status) => status.code().unwrap_or(-1),
|
||||
Err(_) => -1,
|
||||
};
|
||||
wait_exit_status.store(true, std::sync::atomic::Ordering::SeqCst);
|
||||
if let Ok(mut guard) = wait_exit_code.lock() {
|
||||
*guard = Some(code);
|
||||
}
|
||||
let _ = exit_tx.send(code);
|
||||
});
|
||||
|
||||
let handles = PtyHandles {
|
||||
_slave: None,
|
||||
_master: Box::new(master),
|
||||
};
|
||||
|
||||
let (handle, output_rx) = ProcessHandle::new(
|
||||
writer_tx,
|
||||
output_tx,
|
||||
initial_output_rx,
|
||||
Box::new(RawPidTerminator { process_group_id }),
|
||||
reader_handle,
|
||||
Vec::new(),
|
||||
writer_handle,
|
||||
wait_handle,
|
||||
exit_status,
|
||||
exit_code,
|
||||
Some(handles),
|
||||
);
|
||||
|
||||
Ok(SpawnedProcess {
|
||||
session: handle,
|
||||
output_rx,
|
||||
exit_rx,
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn open_unix_pty() -> Result<(File, File)> {
|
||||
let mut master: RawFd = -1;
|
||||
let mut slave: RawFd = -1;
|
||||
let mut size = libc::winsize {
|
||||
ws_row: 24,
|
||||
ws_col: 80,
|
||||
ws_xpixel: 0,
|
||||
ws_ypixel: 0,
|
||||
};
|
||||
let winp = std::ptr::addr_of_mut!(size);
|
||||
|
||||
let result = unsafe {
|
||||
libc::openpty(
|
||||
&mut master,
|
||||
&mut slave,
|
||||
std::ptr::null_mut(),
|
||||
std::ptr::null_mut(),
|
||||
winp,
|
||||
)
|
||||
};
|
||||
if result != 0 {
|
||||
anyhow::bail!("failed to openpty: {:?}", std::io::Error::last_os_error());
|
||||
}
|
||||
|
||||
set_cloexec(master)?;
|
||||
set_cloexec(slave)?;
|
||||
|
||||
Ok(unsafe { (File::from_raw_fd(master), File::from_raw_fd(slave)) })
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn set_cloexec(fd: RawFd) -> std::io::Result<()> {
|
||||
let flags = unsafe { libc::fcntl(fd, libc::F_GETFD) };
|
||||
if flags == -1 {
|
||||
return Err(std::io::Error::last_os_error());
|
||||
}
|
||||
let result = unsafe { libc::fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC) };
|
||||
if result == -1 {
|
||||
return Err(std::io::Error::last_os_error());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn close_random_fds_except(preserved_fds: &[RawFd]) {
|
||||
if let Ok(dir) = std::fs::read_dir("/dev/fd") {
|
||||
let mut fds = Vec::new();
|
||||
for entry in dir {
|
||||
let num = entry
|
||||
.ok()
|
||||
.map(|entry| entry.file_name())
|
||||
.and_then(|name| name.into_string().ok())
|
||||
.and_then(|name| name.parse::<RawFd>().ok());
|
||||
if let Some(num) = num {
|
||||
if num <= 2 || preserved_fds.contains(&num) {
|
||||
continue;
|
||||
}
|
||||
// Keep CLOEXEC descriptors open so std::process can still use
|
||||
// its internal exec-error pipe to report spawn failures.
|
||||
let flags = unsafe { libc::fcntl(num, libc::F_GETFD) };
|
||||
if flags == -1 || flags & libc::FD_CLOEXEC != 0 {
|
||||
continue;
|
||||
}
|
||||
fds.push(num);
|
||||
}
|
||||
}
|
||||
for fd in fds {
|
||||
unsafe {
|
||||
libc::close(fd);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,8 @@ use std::path::Path;
|
||||
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[cfg(unix)]
|
||||
use crate::pty::spawn_process_with_inherited_fds;
|
||||
use crate::spawn_pipe_process;
|
||||
use crate::spawn_pty_process;
|
||||
|
||||
@@ -183,16 +185,26 @@ async fn wait_for_marker_pid(
|
||||
collected.extend_from_slice(&chunk);
|
||||
|
||||
let text = String::from_utf8_lossy(&collected);
|
||||
if let Some(marker_idx) = text.find(marker) {
|
||||
let suffix = &text[marker_idx + marker.len()..];
|
||||
let digits: String = suffix
|
||||
let mut offset = 0;
|
||||
while let Some(pos) = text[offset..].find(marker) {
|
||||
let marker_start = offset + pos;
|
||||
let suffix = &text[marker_start + marker.len()..];
|
||||
let digits_len = suffix
|
||||
.chars()
|
||||
.skip_while(|ch| !ch.is_ascii_digit())
|
||||
.take_while(char::is_ascii_digit)
|
||||
.collect();
|
||||
if !digits.is_empty() {
|
||||
return Ok(digits.parse()?);
|
||||
.map(char::len_utf8)
|
||||
.sum::<usize>();
|
||||
if digits_len == 0 {
|
||||
offset = marker_start + marker.len();
|
||||
continue;
|
||||
}
|
||||
|
||||
let pid_str = &suffix[..digits_len];
|
||||
let trailing = &suffix[digits_len..];
|
||||
if trailing.is_empty() {
|
||||
break;
|
||||
}
|
||||
return Ok(pid_str.parse()?);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -444,3 +456,166 @@ async fn pty_terminate_kills_background_children_in_same_process_group() -> anyh
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn pty_spawn_can_preserve_inherited_fds() -> anyhow::Result<()> {
|
||||
use std::io::Read;
|
||||
use std::os::fd::AsRawFd;
|
||||
use std::os::fd::FromRawFd;
|
||||
|
||||
let mut fds = [0; 2];
|
||||
let result = unsafe { libc::pipe(fds.as_mut_ptr()) };
|
||||
if result != 0 {
|
||||
return Err(std::io::Error::last_os_error().into());
|
||||
}
|
||||
|
||||
let mut read_end = unsafe { std::fs::File::from_raw_fd(fds[0]) };
|
||||
let write_end = unsafe { std::fs::File::from_raw_fd(fds[1]) };
|
||||
|
||||
let mut env_map: HashMap<String, String> = std::env::vars().collect();
|
||||
env_map.insert(
|
||||
"PRESERVED_FD".to_string(),
|
||||
write_end.as_raw_fd().to_string(),
|
||||
);
|
||||
|
||||
let script = "printf __preserved__ >\"/dev/fd/$PRESERVED_FD\"";
|
||||
let spawned = spawn_process_with_inherited_fds(
|
||||
"/bin/sh",
|
||||
&["-c".to_string(), script.to_string()],
|
||||
Path::new("."),
|
||||
&env_map,
|
||||
&None,
|
||||
&[write_end.as_raw_fd()],
|
||||
)
|
||||
.await?;
|
||||
|
||||
drop(write_end);
|
||||
|
||||
let (_, code) = collect_output_until_exit(spawned.output_rx, spawned.exit_rx, 2_000).await;
|
||||
assert_eq!(code, 0, "expected preserved-fd PTY child to exit cleanly");
|
||||
|
||||
let mut pipe_output = String::new();
|
||||
read_end.read_to_string(&mut pipe_output)?;
|
||||
assert_eq!(pipe_output, "__preserved__");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn pty_preserving_inherited_fds_keeps_python_repl_running() -> anyhow::Result<()> {
|
||||
use std::os::fd::AsRawFd;
|
||||
use std::os::fd::FromRawFd;
|
||||
|
||||
let Some(python) = find_python() else {
|
||||
eprintln!(
|
||||
"python not found; skipping pty_preserving_inherited_fds_keeps_python_repl_running"
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let mut fds = [0; 2];
|
||||
let result = unsafe { libc::pipe(fds.as_mut_ptr()) };
|
||||
if result != 0 {
|
||||
return Err(std::io::Error::last_os_error().into());
|
||||
}
|
||||
|
||||
let read_end = unsafe { std::fs::File::from_raw_fd(fds[0]) };
|
||||
let preserved_fd = unsafe { std::fs::File::from_raw_fd(fds[1]) };
|
||||
|
||||
let mut env_map: HashMap<String, String> = std::env::vars().collect();
|
||||
env_map.insert(
|
||||
"PRESERVED_FD".to_string(),
|
||||
preserved_fd.as_raw_fd().to_string(),
|
||||
);
|
||||
|
||||
let spawned = spawn_process_with_inherited_fds(
|
||||
&python,
|
||||
&[],
|
||||
Path::new("."),
|
||||
&env_map,
|
||||
&None,
|
||||
&[preserved_fd.as_raw_fd()],
|
||||
)
|
||||
.await?;
|
||||
drop(read_end);
|
||||
drop(preserved_fd);
|
||||
|
||||
let writer = spawned.session.writer_sender();
|
||||
let mut output_rx = spawned.output_rx;
|
||||
let newline = "\n";
|
||||
let mut output = wait_for_python_repl_ready(&writer, &mut output_rx, 5_000, newline).await?;
|
||||
let marker = "__codex_preserved_py_pid:";
|
||||
writer
|
||||
.send(format!("import os; print('{marker}' + str(os.getpid())){newline}").into_bytes())
|
||||
.await?;
|
||||
|
||||
let python_pid = match wait_for_marker_pid(&mut output_rx, marker, 2_000).await {
|
||||
Ok(pid) => pid,
|
||||
Err(err) => {
|
||||
spawned.session.terminate();
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
assert!(
|
||||
process_exists(python_pid)?,
|
||||
"expected python pid {python_pid} to stay alive after prompt output"
|
||||
);
|
||||
|
||||
writer.send(format!("exit(){newline}").into_bytes()).await?;
|
||||
let (remaining_output, code) =
|
||||
collect_output_until_exit(output_rx, spawned.exit_rx, 5_000).await;
|
||||
output.extend_from_slice(&remaining_output);
|
||||
|
||||
assert_eq!(code, 0, "expected python to exit cleanly");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn pty_spawn_with_inherited_fds_reports_exec_failures() -> anyhow::Result<()> {
|
||||
use std::os::fd::AsRawFd;
|
||||
use std::os::fd::FromRawFd;
|
||||
|
||||
let mut fds = [0; 2];
|
||||
let result = unsafe { libc::pipe(fds.as_mut_ptr()) };
|
||||
if result != 0 {
|
||||
return Err(std::io::Error::last_os_error().into());
|
||||
}
|
||||
|
||||
let read_end = unsafe { std::fs::File::from_raw_fd(fds[0]) };
|
||||
let write_end = unsafe { std::fs::File::from_raw_fd(fds[1]) };
|
||||
|
||||
let env_map: HashMap<String, String> = std::env::vars().collect();
|
||||
let spawn_result = spawn_process_with_inherited_fds(
|
||||
"/definitely/missing/command",
|
||||
&[],
|
||||
Path::new("."),
|
||||
&env_map,
|
||||
&None,
|
||||
&[write_end.as_raw_fd()],
|
||||
)
|
||||
.await;
|
||||
|
||||
drop(read_end);
|
||||
drop(write_end);
|
||||
|
||||
let err = match spawn_result {
|
||||
Ok(spawned) => {
|
||||
spawned.session.terminate();
|
||||
anyhow::bail!("missing executable unexpectedly spawned");
|
||||
}
|
||||
Err(err) => err,
|
||||
};
|
||||
let err_text = err.to_string();
|
||||
assert!(
|
||||
err_text.contains("No such file")
|
||||
|| err_text.contains("not found")
|
||||
|| err_text.contains("os error 2"),
|
||||
"expected spawn error for missing executable, got: {err_text}",
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user