mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
refactor: prepare unified exec for zsh-fork backend
This commit is contained in:
@@ -5,7 +5,8 @@ Executes shell requests under the orchestrator: asks for approval when needed,
|
||||
builds a CommandSpec, and runs it under the current SandboxAttempt.
|
||||
*/
|
||||
#[cfg(unix)]
|
||||
mod unix_escalation;
|
||||
pub(crate) mod unix_escalation;
|
||||
pub(crate) mod zsh_fork_backend;
|
||||
|
||||
use crate::command_canonicalization::canonicalize_command_for_approval;
|
||||
use crate::exec::ExecToolCallOutput;
|
||||
@@ -80,7 +81,6 @@ pub(crate) enum ShellRuntimeBackend {
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct ShellRuntime {
|
||||
#[cfg_attr(not(unix), allow(dead_code))]
|
||||
backend: ShellRuntimeBackend,
|
||||
}
|
||||
|
||||
@@ -215,9 +215,8 @@ impl ToolRuntime<ShellRequest, ExecToolCallOutput> for ShellRuntime {
|
||||
command
|
||||
};
|
||||
|
||||
#[cfg(unix)]
|
||||
if self.backend == ShellRuntimeBackend::ShellCommandZshFork {
|
||||
match unix_escalation::try_run_zsh_fork(req, attempt, ctx, &command).await? {
|
||||
match zsh_fork_backend::maybe_run_shell_command(req, attempt, ctx, &command).await? {
|
||||
Some(out) => return Ok(out),
|
||||
None => {
|
||||
tracing::warn!(
|
||||
|
||||
@@ -6,6 +6,7 @@ use crate::exec::ExecToolCallOutput;
|
||||
use crate::exec::SandboxType;
|
||||
use crate::exec::is_likely_sandbox_denied;
|
||||
use crate::features::Feature;
|
||||
use crate::sandboxing::ExecRequest;
|
||||
use crate::sandboxing::SandboxPermissions;
|
||||
use crate::shell::ShellType;
|
||||
use crate::skills::SkillMetadata;
|
||||
@@ -36,6 +37,7 @@ use codex_shell_escalation::EscalationDecision;
|
||||
use codex_shell_escalation::EscalationExecution;
|
||||
use codex_shell_escalation::EscalationPermissions;
|
||||
use codex_shell_escalation::EscalationPolicy;
|
||||
use codex_shell_escalation::EscalationSession;
|
||||
use codex_shell_escalation::ExecParams;
|
||||
use codex_shell_escalation::ExecResult;
|
||||
use codex_shell_escalation::Permissions as EscalatedPermissions;
|
||||
@@ -51,6 +53,11 @@ use tokio::sync::RwLock;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Result of `prepare_unified_exec_zsh_fork`: everything a caller needs to
/// spawn a unified-exec command through the zsh-fork backend.
pub(crate) struct PreparedUnifiedExecZshFork {
|
||||
/// The incoming request with the escalation session's environment merged in.
pub(crate) exec_request: ExecRequest,
|
||||
/// Escalation session started for this request.
pub(crate) escalation_session: EscalationSession,
|
||||
}
|
||||
|
||||
pub(super) async fn try_run_zsh_fork(
|
||||
req: &ShellRequest,
|
||||
attempt: &SandboxAttempt<'_>,
|
||||
@@ -95,7 +102,7 @@ pub(super) async fn try_run_zsh_fork(
|
||||
justification,
|
||||
arg0,
|
||||
} = sandbox_exec_request;
|
||||
let ParsedShellCommand { script, login } = extract_shell_script(&command)?;
|
||||
let ParsedShellCommand { script, login, .. } = extract_shell_script(&command)?;
|
||||
let effective_timeout = Duration::from_millis(
|
||||
req.timeout_ms
|
||||
.unwrap_or(crate::exec::DEFAULT_EXEC_COMMAND_TIMEOUT_MS),
|
||||
@@ -172,6 +179,103 @@ pub(super) async fn try_run_zsh_fork(
|
||||
map_exec_result(attempt.sandbox, exec_result).map(Some)
|
||||
}
|
||||
|
||||
pub(crate) async fn prepare_unified_exec_zsh_fork(
|
||||
req: &crate::tools::runtimes::unified_exec::UnifiedExecRequest,
|
||||
attempt: &SandboxAttempt<'_>,
|
||||
ctx: &ToolCtx,
|
||||
exec_request: ExecRequest,
|
||||
) -> Result<Option<PreparedUnifiedExecZshFork>, ToolError> {
|
||||
let Some(shell_zsh_path) = ctx.session.services.shell_zsh_path.as_ref() else {
|
||||
tracing::warn!("ZshFork backend specified, but shell_zsh_path is not configured.");
|
||||
return Ok(None);
|
||||
};
|
||||
if !ctx.session.features().enabled(Feature::ShellZshFork) {
|
||||
tracing::warn!("ZshFork backend specified, but ShellZshFork feature is not enabled.");
|
||||
return Ok(None);
|
||||
}
|
||||
if !matches!(ctx.session.user_shell().shell_type, ShellType::Zsh) {
|
||||
tracing::warn!("ZshFork backend specified, but user shell is not Zsh.");
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let parsed = match extract_shell_script(&exec_request.command) {
|
||||
Ok(parsed) => parsed,
|
||||
Err(err) => {
|
||||
tracing::warn!("ZshFork unified exec fallback: {err:?}");
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
if parsed.program != shell_zsh_path.to_string_lossy() {
|
||||
tracing::warn!(
|
||||
"ZshFork backend specified, but unified exec command targets `{}` instead of `{}`.",
|
||||
parsed.program,
|
||||
shell_zsh_path.display(),
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let exec_policy = Arc::new(RwLock::new(
|
||||
ctx.session.services.exec_policy.current().as_ref().clone(),
|
||||
));
|
||||
let command_executor = CoreShellCommandExecutor {
|
||||
command: exec_request.command.clone(),
|
||||
cwd: exec_request.cwd.clone(),
|
||||
sandbox_policy: exec_request.sandbox_policy.clone(),
|
||||
sandbox: exec_request.sandbox,
|
||||
env: exec_request.env.clone(),
|
||||
network: exec_request.network.clone(),
|
||||
windows_sandbox_level: exec_request.windows_sandbox_level,
|
||||
sandbox_permissions: exec_request.sandbox_permissions,
|
||||
justification: exec_request.justification.clone(),
|
||||
arg0: exec_request.arg0.clone(),
|
||||
sandbox_policy_cwd: ctx.turn.cwd.clone(),
|
||||
macos_seatbelt_profile_extensions: ctx
|
||||
.turn
|
||||
.config
|
||||
.permissions
|
||||
.macos_seatbelt_profile_extensions
|
||||
.clone(),
|
||||
codex_linux_sandbox_exe: ctx.turn.codex_linux_sandbox_exe.clone(),
|
||||
use_linux_sandbox_bwrap: ctx.turn.features.enabled(Feature::UseLinuxSandboxBwrap),
|
||||
};
|
||||
let main_execve_wrapper_exe = ctx
|
||||
.session
|
||||
.services
|
||||
.main_execve_wrapper_exe
|
||||
.clone()
|
||||
.ok_or_else(|| {
|
||||
ToolError::Rejected(
|
||||
"zsh fork feature enabled, but execve wrapper is not configured".to_string(),
|
||||
)
|
||||
})?;
|
||||
let escalation_policy = CoreShellActionProvider {
|
||||
policy: Arc::clone(&exec_policy),
|
||||
session: Arc::clone(&ctx.session),
|
||||
turn: Arc::clone(&ctx.turn),
|
||||
call_id: ctx.call_id.clone(),
|
||||
approval_policy: ctx.turn.approval_policy.value(),
|
||||
sandbox_policy: attempt.policy.clone(),
|
||||
sandbox_permissions: req.sandbox_permissions,
|
||||
prompt_permissions: req.additional_permissions.clone(),
|
||||
stopwatch: Stopwatch::unlimited(),
|
||||
};
|
||||
|
||||
let escalate_server = EscalateServer::new(
|
||||
shell_zsh_path.clone(),
|
||||
main_execve_wrapper_exe,
|
||||
escalation_policy,
|
||||
);
|
||||
let escalation_session = escalate_server
|
||||
.start_session(CancellationToken::new(), Arc::new(command_executor))
|
||||
.map_err(|err| ToolError::Rejected(err.to_string()))?;
|
||||
let mut exec_request = exec_request;
|
||||
exec_request.env.extend(escalation_session.env().clone());
|
||||
Ok(Some(PreparedUnifiedExecZshFork {
|
||||
exec_request,
|
||||
escalation_session,
|
||||
}))
|
||||
}
|
||||
|
||||
struct CoreShellActionProvider {
|
||||
policy: Arc<RwLock<Policy>>,
|
||||
session: Arc<crate::codex::Session>,
|
||||
@@ -809,6 +913,7 @@ impl CoreShellCommandExecutor {
|
||||
|
||||
/// Pieces of a `<program> -c|-lc <script>` invocation, as located by
/// `extract_shell_script`.
#[derive(Debug, Eq, PartialEq)]
|
||||
struct ParsedShellCommand {
|
||||
/// Argument immediately preceding the `-c`/`-lc` flag.
program: String,
|
||||
/// Argument immediately following the flag.
script: String,
|
||||
/// `true` when the flag was `-lc`, `false` when it was `-c`.
login: bool,
|
||||
}
|
||||
@@ -817,12 +922,20 @@ fn extract_shell_script(command: &[String]) -> Result<ParsedShellCommand, ToolEr
|
||||
// Commands reaching zsh-fork can be wrapped by environment/sandbox helpers, so
|
||||
// we search for the first `-c`/`-lc` triple anywhere in the argv rather
|
||||
// than assuming it is the first positional form.
|
||||
if let Some((script, login)) = command.windows(3).find_map(|parts| match parts {
|
||||
[_, flag, script] if flag == "-c" => Some((script.to_owned(), false)),
|
||||
[_, flag, script] if flag == "-lc" => Some((script.to_owned(), true)),
|
||||
if let Some((program, script, login)) = command.windows(3).find_map(|parts| match parts {
|
||||
[program, flag, script] if flag == "-c" => {
|
||||
Some((program.to_owned(), script.to_owned(), false))
|
||||
}
|
||||
[program, flag, script] if flag == "-lc" => {
|
||||
Some((program.to_owned(), script.to_owned(), true))
|
||||
}
|
||||
_ => None,
|
||||
}) {
|
||||
return Ok(ParsedShellCommand { script, login });
|
||||
return Ok(ParsedShellCommand {
|
||||
program,
|
||||
script,
|
||||
login,
|
||||
});
|
||||
}
|
||||
|
||||
Err(ToolError::Rejected(
|
||||
|
||||
@@ -64,6 +64,7 @@ fn extract_shell_script_preserves_login_flag() {
|
||||
assert_eq!(
|
||||
extract_shell_script(&["/bin/zsh".into(), "-lc".into(), "echo hi".into()]).unwrap(),
|
||||
ParsedShellCommand {
|
||||
program: "/bin/zsh".to_string(),
|
||||
script: "echo hi".to_string(),
|
||||
login: true,
|
||||
}
|
||||
@@ -71,6 +72,7 @@ fn extract_shell_script_preserves_login_flag() {
|
||||
assert_eq!(
|
||||
extract_shell_script(&["/bin/zsh".into(), "-c".into(), "echo hi".into()]).unwrap(),
|
||||
ParsedShellCommand {
|
||||
program: "/bin/zsh".to_string(),
|
||||
script: "echo hi".to_string(),
|
||||
login: false,
|
||||
}
|
||||
@@ -89,6 +91,7 @@ fn extract_shell_script_supports_wrapped_command_prefixes() {
|
||||
])
|
||||
.unwrap(),
|
||||
ParsedShellCommand {
|
||||
program: "/bin/zsh".to_string(),
|
||||
script: "echo hello".to_string(),
|
||||
login: true,
|
||||
}
|
||||
@@ -105,6 +108,7 @@ fn extract_shell_script_supports_wrapped_command_prefixes() {
|
||||
])
|
||||
.unwrap(),
|
||||
ParsedShellCommand {
|
||||
program: "/bin/zsh".to_string(),
|
||||
script: "pwd".to_string(),
|
||||
login: false,
|
||||
}
|
||||
|
||||
115
codex-rs/core/src/tools/runtimes/shell/zsh_fork_backend.rs
Normal file
115
codex-rs/core/src/tools/runtimes/shell/zsh_fork_backend.rs
Normal file
@@ -0,0 +1,115 @@
|
||||
use super::ShellRequest;
|
||||
use crate::exec::ExecToolCallOutput;
|
||||
use crate::sandboxing::ExecRequest;
|
||||
use crate::tools::runtimes::unified_exec::UnifiedExecRequest;
|
||||
use crate::tools::sandboxing::SandboxAttempt;
|
||||
use crate::tools::sandboxing::ToolCtx;
|
||||
use crate::tools::sandboxing::ToolError;
|
||||
use crate::unified_exec::SpawnLifecycleHandle;
|
||||
|
||||
/// Output of `maybe_prepare_unified_exec`: the pieces unified exec needs to
/// spawn a command through the zsh-fork backend.
pub(crate) struct PreparedUnifiedExecSpawn {
|
||||
/// Transformed request to spawn.
pub(crate) exec_request: ExecRequest,
|
||||
/// Lifecycle hook that must accompany the request when the session is opened.
pub(crate) spawn_lifecycle: SpawnLifecycleHandle,
|
||||
}
|
||||
|
||||
/// Runs the zsh-fork shell-command backend when this request should be handled
|
||||
/// by executable-level escalation instead of the default shell runtime.
|
||||
///
|
||||
/// Returns `Ok(None)` when the current platform or request shape should fall
|
||||
/// back to the normal shell-command path.
|
||||
pub(crate) async fn maybe_run_shell_command(
|
||||
req: &ShellRequest,
|
||||
attempt: &SandboxAttempt<'_>,
|
||||
ctx: &ToolCtx,
|
||||
command: &[String],
|
||||
) -> Result<Option<ExecToolCallOutput>, ToolError> {
|
||||
// `imp` is selected by `cfg`: the unix build forwards to
// `unix_escalation::try_run_zsh_fork`; every other build returns `Ok(None)`.
imp::maybe_run_shell_command(req, attempt, ctx, command).await
|
||||
}
|
||||
|
||||
/// Prepares unified exec to launch through the zsh-fork backend when the
|
||||
/// request matches a wrapped `zsh -c/-lc` command on a supported platform.
|
||||
///
|
||||
/// Returns the transformed `ExecRequest` plus a spawn lifecycle that keeps the
|
||||
/// escalation server alive for the session and performs post-spawn cleanup.
|
||||
/// Returns `Ok(None)` when unified exec should use its normal spawn path.
|
||||
pub(crate) async fn maybe_prepare_unified_exec(
|
||||
req: &UnifiedExecRequest,
|
||||
attempt: &SandboxAttempt<'_>,
|
||||
ctx: &ToolCtx,
|
||||
exec_request: ExecRequest,
|
||||
) -> Result<Option<PreparedUnifiedExecSpawn>, ToolError> {
|
||||
// `imp` is selected by `cfg`: the unix build wraps
// `unix_escalation::prepare_unified_exec_zsh_fork`; every other build
// returns `Ok(None)`.
imp::maybe_prepare_unified_exec(req, attempt, ctx, exec_request).await
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
mod imp {
|
||||
use super::*;
|
||||
use crate::tools::runtimes::shell::unix_escalation;
|
||||
use crate::unified_exec::SpawnLifecycle;
|
||||
use codex_shell_escalation::EscalationSession;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ZshForkSpawnLifecycle {
|
||||
escalation_session: EscalationSession,
|
||||
}
|
||||
|
||||
impl SpawnLifecycle for ZshForkSpawnLifecycle {
|
||||
fn after_spawn(&mut self) {
|
||||
self.escalation_session.close_client_socket();
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn maybe_run_shell_command(
|
||||
req: &ShellRequest,
|
||||
attempt: &SandboxAttempt<'_>,
|
||||
ctx: &ToolCtx,
|
||||
command: &[String],
|
||||
) -> Result<Option<ExecToolCallOutput>, ToolError> {
|
||||
unix_escalation::try_run_zsh_fork(req, attempt, ctx, command).await
|
||||
}
|
||||
|
||||
pub(super) async fn maybe_prepare_unified_exec(
|
||||
req: &UnifiedExecRequest,
|
||||
attempt: &SandboxAttempt<'_>,
|
||||
ctx: &ToolCtx,
|
||||
exec_request: ExecRequest,
|
||||
) -> Result<Option<PreparedUnifiedExecSpawn>, ToolError> {
|
||||
let Some(prepared) =
|
||||
unix_escalation::prepare_unified_exec_zsh_fork(req, attempt, ctx, exec_request).await?
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
Ok(Some(PreparedUnifiedExecSpawn {
|
||||
exec_request: prepared.exec_request,
|
||||
spawn_lifecycle: Box::new(ZshForkSpawnLifecycle {
|
||||
escalation_session: prepared.escalation_session,
|
||||
}),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(unix))]
mod imp {
    //! Non-unix stand-in: the zsh-fork backend is never used, so both entry
    //! points report "fall back" by returning `Ok(None)`.

    use super::*;

    /// Always returns `Ok(None)`; the arguments are unused.
    pub(super) async fn maybe_run_shell_command(
        _req: &ShellRequest,
        _attempt: &SandboxAttempt<'_>,
        _ctx: &ToolCtx,
        _command: &[String],
    ) -> Result<Option<ExecToolCallOutput>, ToolError> {
        Ok(None)
    }

    /// Always returns `Ok(None)`; the arguments are unused.
    pub(super) async fn maybe_prepare_unified_exec(
        _req: &UnifiedExecRequest,
        _attempt: &SandboxAttempt<'_>,
        _ctx: &ToolCtx,
        _exec_request: ExecRequest,
    ) -> Result<Option<PreparedUnifiedExecSpawn>, ToolError> {
        Ok(None)
    }
}
|
||||
@@ -16,6 +16,7 @@ use crate::tools::network_approval::NetworkApprovalMode;
|
||||
use crate::tools::network_approval::NetworkApprovalSpec;
|
||||
use crate::tools::runtimes::build_command_spec;
|
||||
use crate::tools::runtimes::maybe_wrap_shell_lc_with_snapshot;
|
||||
use crate::tools::runtimes::shell::zsh_fork_backend;
|
||||
use crate::tools::sandboxing::Approvable;
|
||||
use crate::tools::sandboxing::ApprovalCtx;
|
||||
use crate::tools::sandboxing::ExecApprovalRequirement;
|
||||
@@ -28,6 +29,8 @@ use crate::tools::sandboxing::ToolError;
|
||||
use crate::tools::sandboxing::ToolRuntime;
|
||||
use crate::tools::sandboxing::sandbox_override_for_first_attempt;
|
||||
use crate::tools::sandboxing::with_cached_approval;
|
||||
use crate::tools::spec::UnifiedExecBackendConfig;
|
||||
use crate::unified_exec::NoopSpawnLifecycle;
|
||||
use crate::unified_exec::UnifiedExecError;
|
||||
use crate::unified_exec::UnifiedExecProcess;
|
||||
use crate::unified_exec::UnifiedExecProcessManager;
|
||||
@@ -63,11 +66,12 @@ pub struct UnifiedExecApprovalKey {
|
||||
|
||||
/// Tool runtime that turns a `UnifiedExecRequest` into a `UnifiedExecProcess`.
pub struct UnifiedExecRuntime<'a> {
|
||||
/// Process manager used to open the session for a prepared exec request.
manager: &'a UnifiedExecProcessManager,
|
||||
/// Launch backend; when this is `ZshFork` the runtime first tries the
/// zsh-fork path and falls back to direct execution if it does not apply.
backend: UnifiedExecBackendConfig,
|
||||
}
|
||||
|
||||
impl<'a> UnifiedExecRuntime<'a> {
|
||||
pub fn new(manager: &'a UnifiedExecProcessManager) -> Self {
|
||||
Self { manager }
|
||||
pub fn new(manager: &'a UnifiedExecProcessManager, backend: UnifiedExecBackendConfig) -> Self {
|
||||
Self { manager, backend }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -184,6 +188,47 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
|
||||
if let Some(network) = req.network.as_ref() {
|
||||
network.apply_to_env(&mut env);
|
||||
}
|
||||
if self.backend == UnifiedExecBackendConfig::ZshFork {
|
||||
let spec = build_command_spec(
|
||||
&command,
|
||||
&req.cwd,
|
||||
&env,
|
||||
ExecExpiration::DefaultTimeout,
|
||||
req.sandbox_permissions,
|
||||
req.additional_permissions.clone(),
|
||||
req.justification.clone(),
|
||||
)
|
||||
.map_err(|_| ToolError::Rejected("missing command line for PTY".to_string()))?;
|
||||
let exec_env = attempt
|
||||
.env_for(spec, req.network.as_ref())
|
||||
.map_err(|err| ToolError::Codex(err.into()))?;
|
||||
match zsh_fork_backend::maybe_prepare_unified_exec(req, attempt, ctx, exec_env).await? {
|
||||
Some(prepared) => {
|
||||
return self
|
||||
.manager
|
||||
.open_session_with_exec_env(
|
||||
&prepared.exec_request,
|
||||
req.tty,
|
||||
prepared.spawn_lifecycle,
|
||||
)
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
UnifiedExecError::SandboxDenied { output, .. } => {
|
||||
ToolError::Codex(CodexErr::Sandbox(SandboxErr::Denied {
|
||||
output: Box::new(output),
|
||||
network_policy_decision: None,
|
||||
}))
|
||||
}
|
||||
other => ToolError::Rejected(other.to_string()),
|
||||
});
|
||||
}
|
||||
None => {
|
||||
tracing::warn!(
|
||||
"UnifiedExec ZshFork backend specified, but conditions for using it were not met, falling back to direct execution",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
let spec = build_command_spec(
|
||||
&command,
|
||||
&req.cwd,
|
||||
@@ -198,7 +243,7 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
|
||||
.env_for(spec, req.network.as_ref())
|
||||
.map_err(|err| ToolError::Codex(err.into()))?;
|
||||
self.manager
|
||||
.open_session_with_exec_env(&exec_env, req.tty)
|
||||
.open_session_with_exec_env(&exec_env, req.tty, Box::new(NoopSpawnLifecycle))
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
UnifiedExecError::SandboxDenied { output, .. } => {
|
||||
|
||||
@@ -42,10 +42,17 @@ pub enum ShellCommandBackendConfig {
|
||||
ZshFork,
|
||||
}
|
||||
|
||||
/// Backend used to launch unified exec commands.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
|
||||
pub enum UnifiedExecBackendConfig {
|
||||
/// Spawn the command directly.
Direct,
|
||||
/// Try the zsh-fork backend first. `ToolsConfig` selects this when both the
/// `ShellTool` and `ShellZshFork` features are enabled.
ZshFork,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct ToolsConfig {
|
||||
pub shell_type: ConfigShellToolType,
|
||||
shell_command_backend: ShellCommandBackendConfig,
|
||||
pub unified_exec_backend: UnifiedExecBackendConfig,
|
||||
pub allow_login_shell: bool,
|
||||
pub apply_patch_tool_type: Option<ApplyPatchToolType>,
|
||||
pub web_search_mode: Option<WebSearchMode>,
|
||||
@@ -94,6 +101,12 @@ impl ToolsConfig {
|
||||
} else {
|
||||
ShellCommandBackendConfig::Classic
|
||||
};
|
||||
let unified_exec_backend =
|
||||
if features.enabled(Feature::ShellTool) && features.enabled(Feature::ShellZshFork) {
|
||||
UnifiedExecBackendConfig::ZshFork
|
||||
} else {
|
||||
UnifiedExecBackendConfig::Direct
|
||||
};
|
||||
|
||||
let shell_type = if !features.enabled(Feature::ShellTool) {
|
||||
ConfigShellToolType::Disabled
|
||||
@@ -132,6 +145,7 @@ impl ToolsConfig {
|
||||
Self {
|
||||
shell_type,
|
||||
shell_command_backend,
|
||||
unified_exec_backend,
|
||||
allow_login_shell: true,
|
||||
apply_patch_tool_type,
|
||||
web_search_mode: *web_search_mode,
|
||||
@@ -2727,6 +2741,10 @@ mod tests {
|
||||
tools_config.shell_command_backend,
|
||||
ShellCommandBackendConfig::ZshFork
|
||||
);
|
||||
assert_eq!(
|
||||
tools_config.unified_exec_backend,
|
||||
UnifiedExecBackendConfig::ZshFork
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -49,6 +49,10 @@ pub(crate) fn set_deterministic_process_ids_for_tests(enabled: bool) {
|
||||
}
|
||||
|
||||
pub(crate) use errors::UnifiedExecError;
|
||||
pub(crate) use process::NoopSpawnLifecycle;
|
||||
#[cfg(unix)]
|
||||
pub(crate) use process::SpawnLifecycle;
|
||||
pub(crate) use process::SpawnLifecycleHandle;
|
||||
pub(crate) use process::UnifiedExecProcess;
|
||||
|
||||
pub(crate) const MIN_YIELD_TIME_MS: u64 = 250;
|
||||
|
||||
@@ -24,6 +24,17 @@ use super::UNIFIED_EXEC_OUTPUT_MAX_TOKENS;
|
||||
use super::UnifiedExecError;
|
||||
use super::head_tail_buffer::HeadTailBuffer;
|
||||
|
||||
/// Hook object that travels with a unified exec process.
///
/// The process manager calls `after_spawn` once the child has been spawned,
/// then stores the handle inside the resulting `UnifiedExecProcess`.
pub(crate) trait SpawnLifecycle: std::fmt::Debug + Send + Sync {
|
||||
/// Called once after a successful spawn. The default does nothing.
fn after_spawn(&mut self) {}
|
||||
}
|
||||
|
||||
/// Owned, type-erased `SpawnLifecycle` as passed to the process manager.
pub(crate) type SpawnLifecycleHandle = Box<dyn SpawnLifecycle>;
|
||||
|
||||
/// `SpawnLifecycle` that does nothing; used for the direct spawn path.
#[derive(Debug, Default)]
|
||||
pub(crate) struct NoopSpawnLifecycle;
|
||||
|
||||
// Relies entirely on the trait's default (empty) `after_spawn`.
impl SpawnLifecycle for NoopSpawnLifecycle {}
|
||||
|
||||
pub(crate) type OutputBuffer = Arc<Mutex<HeadTailBuffer>>;
|
||||
pub(crate) struct OutputHandles {
|
||||
pub(crate) output_buffer: OutputBuffer,
|
||||
@@ -44,6 +55,7 @@ pub(crate) struct UnifiedExecProcess {
|
||||
output_drained: Arc<Notify>,
|
||||
output_task: JoinHandle<()>,
|
||||
sandbox_type: SandboxType,
|
||||
_spawn_lifecycle: SpawnLifecycleHandle,
|
||||
}
|
||||
|
||||
impl UnifiedExecProcess {
|
||||
@@ -51,6 +63,7 @@ impl UnifiedExecProcess {
|
||||
process_handle: ExecCommandSession,
|
||||
initial_output_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
|
||||
sandbox_type: SandboxType,
|
||||
spawn_lifecycle: SpawnLifecycleHandle,
|
||||
) -> Self {
|
||||
let output_buffer = Arc::new(Mutex::new(HeadTailBuffer::default()));
|
||||
let output_notify = Arc::new(Notify::new());
|
||||
@@ -92,6 +105,7 @@ impl UnifiedExecProcess {
|
||||
output_drained,
|
||||
output_task,
|
||||
sandbox_type,
|
||||
_spawn_lifecycle: spawn_lifecycle,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -196,13 +210,14 @@ impl UnifiedExecProcess {
|
||||
pub(super) async fn from_spawned(
|
||||
spawned: SpawnedPty,
|
||||
sandbox_type: SandboxType,
|
||||
spawn_lifecycle: SpawnLifecycleHandle,
|
||||
) -> Result<Self, UnifiedExecError> {
|
||||
let SpawnedPty {
|
||||
session: process_handle,
|
||||
output_rx,
|
||||
mut exit_rx,
|
||||
} = spawned;
|
||||
let managed = Self::new(process_handle, output_rx, sandbox_type);
|
||||
let managed = Self::new(process_handle, output_rx, sandbox_type, spawn_lifecycle);
|
||||
|
||||
let exit_ready = matches!(exit_rx.try_recv(), Ok(_) | Err(TryRecvError::Closed));
|
||||
|
||||
|
||||
@@ -49,6 +49,7 @@ use crate::unified_exec::generate_chunk_id;
|
||||
use crate::unified_exec::head_tail_buffer::HeadTailBuffer;
|
||||
use crate::unified_exec::process::OutputBuffer;
|
||||
use crate::unified_exec::process::OutputHandles;
|
||||
use crate::unified_exec::process::SpawnLifecycleHandle;
|
||||
use crate::unified_exec::process::UnifiedExecProcess;
|
||||
use crate::unified_exec::resolve_max_tokens;
|
||||
|
||||
@@ -528,6 +529,7 @@ impl UnifiedExecProcessManager {
|
||||
&self,
|
||||
env: &ExecRequest,
|
||||
tty: bool,
|
||||
mut spawn_lifecycle: SpawnLifecycleHandle,
|
||||
) -> Result<UnifiedExecProcess, UnifiedExecError> {
|
||||
let (program, args) = env
|
||||
.command
|
||||
@@ -555,7 +557,8 @@ impl UnifiedExecProcessManager {
|
||||
};
|
||||
let spawned =
|
||||
spawn_result.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
|
||||
UnifiedExecProcess::from_spawned(spawned, env.sandbox).await
|
||||
spawn_lifecycle.after_spawn();
|
||||
UnifiedExecProcess::from_spawned(spawned, env.sandbox, spawn_lifecycle).await
|
||||
}
|
||||
|
||||
pub(super) async fn open_session_with_sandbox(
|
||||
@@ -569,7 +572,8 @@ impl UnifiedExecProcessManager {
|
||||
Some(context.session.conversation_id),
|
||||
));
|
||||
let mut orchestrator = ToolOrchestrator::new();
|
||||
let mut runtime = UnifiedExecRuntime::new(self);
|
||||
let mut runtime =
|
||||
UnifiedExecRuntime::new(self, context.turn.tools_config.unified_exec_backend);
|
||||
let exec_approval_requirement = context
|
||||
.session
|
||||
.services
|
||||
|
||||
@@ -14,6 +14,8 @@ pub use unix::EscalationPermissions;
|
||||
#[cfg(unix)]
|
||||
pub use unix::EscalationPolicy;
|
||||
#[cfg(unix)]
|
||||
pub use unix::EscalationSession;
|
||||
#[cfg(unix)]
|
||||
pub use unix::ExecParams;
|
||||
#[cfg(unix)]
|
||||
pub use unix::ExecResult;
|
||||
|
||||
@@ -3,11 +3,15 @@ use std::os::fd::AsRawFd;
|
||||
use std::path::PathBuf;
|
||||
use std::process::Stdio;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context as _;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use socket2::Socket;
|
||||
use tokio::process::Command;
|
||||
use tokio::task::AbortHandle;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::unix::escalate_protocol::ESCALATE_SOCKET_ENV_VAR;
|
||||
@@ -82,6 +86,35 @@ pub struct PreparedExec {
|
||||
pub arg0: Option<String>,
|
||||
}
|
||||
|
||||
/// Handle for a running escalation session created by
/// `EscalateServer::start_session`.
///
/// Dropping the handle aborts the server task and every worker task recorded
/// in `worker_abort_handles`.
#[derive(Debug)]
|
||||
pub struct EscalationSession {
|
||||
/// Environment variables a shell needs in order to reach this session.
env: HashMap<String, String>,
|
||||
/// Task running the session's server loop.
task: JoinHandle<anyhow::Result<()>>,
|
||||
/// Client end of the socket pair; `None` once `close_client_socket` has run.
client_socket: Option<Socket>,
|
||||
/// Abort handles for the per-request worker tasks spawned by the server loop.
worker_abort_handles: Arc<Mutex<Vec<AbortHandle>>>,
|
||||
}
|
||||
|
||||
impl EscalationSession {
|
||||
pub fn env(&self) -> &HashMap<String, String> {
|
||||
&self.env
|
||||
}
|
||||
|
||||
pub fn close_client_socket(&mut self) {
|
||||
self.client_socket.take();
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for EscalationSession {
|
||||
fn drop(&mut self) {
|
||||
self.task.abort();
|
||||
if let Ok(mut handles) = self.worker_abort_handles.lock() {
|
||||
for handle in handles.drain(..) {
|
||||
handle.abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct EscalateServer {
|
||||
bash_path: PathBuf,
|
||||
execve_wrapper: PathBuf,
|
||||
@@ -106,16 +139,51 @@ impl EscalateServer {
|
||||
cancel_rx: CancellationToken,
|
||||
command_executor: Arc<dyn ShellCommandExecutor>,
|
||||
) -> anyhow::Result<ExecResult> {
|
||||
let session = self.start_session(cancel_rx.clone(), Arc::clone(&command_executor))?;
|
||||
let command = vec![
|
||||
self.bash_path.to_string_lossy().to_string(),
|
||||
if params.login == Some(false) {
|
||||
"-c".to_string()
|
||||
} else {
|
||||
"-lc".to_string()
|
||||
},
|
||||
params.command,
|
||||
];
|
||||
let workdir = AbsolutePathBuf::try_from(params.workdir)?;
|
||||
let result = command_executor
|
||||
.run(
|
||||
command,
|
||||
workdir.to_path_buf(),
|
||||
session.env().clone(),
|
||||
cancel_rx,
|
||||
)
|
||||
.await?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Starts an escalation session and returns the environment overlay a shell
|
||||
/// needs in order to route intercepted execs through this server.
|
||||
///
|
||||
/// This does not spawn the shell itself. Callers own process creation and
|
||||
/// only use the returned environment plus the session lifetime handle.
|
||||
pub fn start_session(
|
||||
&self,
|
||||
cancellation_token: CancellationToken,
|
||||
command_executor: Arc<dyn ShellCommandExecutor>,
|
||||
) -> anyhow::Result<EscalationSession> {
|
||||
let (escalate_server, escalate_client) = AsyncDatagramSocket::pair()?;
|
||||
let client_socket = escalate_client.into_inner();
|
||||
// Only the client endpoint should cross exec into the wrapper process.
|
||||
client_socket.set_cloexec(false)?;
|
||||
let escalate_task = tokio::spawn(escalate_task(
|
||||
let worker_abort_handles = Arc::new(Mutex::new(Vec::new()));
|
||||
let task = tokio::spawn(escalate_task(
|
||||
escalate_server,
|
||||
Arc::clone(&self.policy),
|
||||
Arc::clone(&command_executor),
|
||||
cancellation_token,
|
||||
Arc::clone(&worker_abort_handles),
|
||||
));
|
||||
let mut env = std::env::vars().collect::<HashMap<String, String>>();
|
||||
let mut env = HashMap::new();
|
||||
env.insert(
|
||||
ESCALATE_SOCKET_ENV_VAR.to_string(),
|
||||
client_socket.as_raw_fd().to_string(),
|
||||
@@ -128,22 +196,12 @@ impl EscalateServer {
|
||||
LEGACY_BASH_EXEC_WRAPPER_ENV_VAR.to_string(),
|
||||
self.execve_wrapper.to_string_lossy().to_string(),
|
||||
);
|
||||
|
||||
let command = vec![
|
||||
self.bash_path.to_string_lossy().to_string(),
|
||||
if params.login == Some(false) {
|
||||
"-c".to_string()
|
||||
} else {
|
||||
"-lc".to_string()
|
||||
},
|
||||
params.command,
|
||||
];
|
||||
let workdir = AbsolutePathBuf::try_from(params.workdir)?;
|
||||
let result = command_executor
|
||||
.run(command, workdir.to_path_buf(), env, cancel_rx)
|
||||
.await?;
|
||||
escalate_task.abort();
|
||||
Ok(result)
|
||||
Ok(EscalationSession {
|
||||
env,
|
||||
task,
|
||||
client_socket: Some(client_socket),
|
||||
worker_abort_handles,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -151,9 +209,14 @@ async fn escalate_task(
|
||||
socket: AsyncDatagramSocket,
|
||||
policy: Arc<dyn EscalationPolicy>,
|
||||
command_executor: Arc<dyn ShellCommandExecutor>,
|
||||
cancellation_token: CancellationToken,
|
||||
worker_abort_handles: Arc<Mutex<Vec<AbortHandle>>>,
|
||||
) -> anyhow::Result<()> {
|
||||
loop {
|
||||
let (_, mut fds) = socket.receive_with_fds().await?;
|
||||
let (_, mut fds) = tokio::select! {
|
||||
received = socket.receive_with_fds() => received?,
|
||||
_ = cancellation_token.cancelled() => return Ok(()),
|
||||
};
|
||||
if fds.len() != 1 {
|
||||
tracing::error!("expected 1 fd in datagram handshake, got {}", fds.len());
|
||||
continue;
|
||||
@@ -161,13 +224,22 @@ async fn escalate_task(
|
||||
let stream_socket = AsyncSocket::from_fd(fds.remove(0))?;
|
||||
let policy = Arc::clone(&policy);
|
||||
let command_executor = Arc::clone(&command_executor);
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) =
|
||||
handle_escalate_session_with_policy(stream_socket, policy, command_executor).await
|
||||
let cancellation_token = cancellation_token.clone();
|
||||
let worker = tokio::spawn(async move {
|
||||
if let Err(err) = handle_escalate_session_with_policy(
|
||||
stream_socket,
|
||||
policy,
|
||||
command_executor,
|
||||
cancellation_token,
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::error!("escalate session failed: {err:?}");
|
||||
}
|
||||
});
|
||||
if let Ok(mut handles) = worker_abort_handles.lock() {
|
||||
handles.push(worker.abort_handle());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -175,18 +247,24 @@ async fn handle_escalate_session_with_policy(
|
||||
socket: AsyncSocket,
|
||||
policy: Arc<dyn EscalationPolicy>,
|
||||
command_executor: Arc<dyn ShellCommandExecutor>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let EscalateRequest {
|
||||
file,
|
||||
argv,
|
||||
workdir,
|
||||
env,
|
||||
} = socket.receive::<EscalateRequest>().await?;
|
||||
} = tokio::select! {
|
||||
request = socket.receive::<EscalateRequest>() => request?,
|
||||
_ = cancellation_token.cancelled() => return Ok(()),
|
||||
};
|
||||
let program = AbsolutePathBuf::resolve_path_against_base(file, workdir.as_path())?;
|
||||
let decision = policy
|
||||
.determine_action(&program, &argv, &workdir)
|
||||
.await
|
||||
.context("failed to determine escalation action")?;
|
||||
let decision = tokio::select! {
|
||||
decision = policy.determine_action(&program, &argv, &workdir) => {
|
||||
decision.context("failed to determine escalation action")?
|
||||
}
|
||||
_ = cancellation_token.cancelled() => return Ok(()),
|
||||
};
|
||||
|
||||
tracing::debug!("decided {decision:?} for {program:?} {argv:?} {workdir:?}");
|
||||
|
||||
@@ -204,10 +282,12 @@ async fn handle_escalate_session_with_policy(
|
||||
action: EscalateAction::Escalate,
|
||||
})
|
||||
.await?;
|
||||
let (msg, fds) = socket
|
||||
.receive_with_fds::<SuperExecMessage>()
|
||||
.await
|
||||
.context("failed to receive SuperExecMessage")?;
|
||||
let (msg, fds) = tokio::select! {
|
||||
message = socket.receive_with_fds::<SuperExecMessage>() => {
|
||||
message.context("failed to receive SuperExecMessage")?
|
||||
}
|
||||
_ = cancellation_token.cancelled() => return Ok(()),
|
||||
};
|
||||
if fds.len() != msg.fds.len() {
|
||||
return Err(anyhow::anyhow!(
|
||||
"mismatched number of fds in SuperExecMessage: {} in the message, {} from the control message",
|
||||
@@ -231,9 +311,10 @@ async fn handle_escalate_session_with_policy(
|
||||
cwd,
|
||||
env,
|
||||
arg0,
|
||||
} = command_executor
|
||||
.prepare_escalated_exec(&program, &argv, &workdir, env, execution)
|
||||
.await?;
|
||||
} = tokio::select! {
|
||||
prepared = command_executor.prepare_escalated_exec(&program, &argv, &workdir, env, execution) => prepared?,
|
||||
_ = cancellation_token.cancelled() => return Ok(()),
|
||||
};
|
||||
let (program, args) = command
|
||||
.split_first()
|
||||
.ok_or_else(|| anyhow::anyhow!("prepared escalated command must not be empty"))?;
|
||||
@@ -245,7 +326,8 @@ async fn handle_escalate_session_with_policy(
|
||||
.current_dir(&cwd)
|
||||
.stdin(Stdio::null())
|
||||
.stdout(Stdio::null())
|
||||
.stderr(Stdio::null());
|
||||
.stderr(Stdio::null())
|
||||
.kill_on_drop(true);
|
||||
unsafe {
|
||||
command.pre_exec(move || {
|
||||
for (dst_fd, src_fd) in msg.fds.iter().zip(&fds) {
|
||||
@@ -255,7 +337,13 @@ async fn handle_escalate_session_with_policy(
|
||||
});
|
||||
}
|
||||
let mut child = command.spawn()?;
|
||||
let exit_status = child.wait().await?;
|
||||
let exit_status = tokio::select! {
|
||||
status = child.wait() => status?,
|
||||
_ = cancellation_token.cancelled() => {
|
||||
let _ = child.start_kill();
|
||||
child.wait().await?
|
||||
}
|
||||
};
|
||||
socket
|
||||
.send(SuperExecResult {
|
||||
exit_code: exit_status.code().unwrap_or(127),
|
||||
@@ -282,6 +370,13 @@ mod tests {
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::LazyLock;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::Instant;
|
||||
use tokio::time::sleep;
|
||||
|
||||
static ESCALATE_SERVER_TEST_LOCK: LazyLock<tokio::sync::Mutex<()>> =
|
||||
LazyLock::new(|| tokio::sync::Mutex::new(()));
|
||||
|
||||
struct DeterministicEscalationPolicy {
|
||||
decision: EscalationDecision,
|
||||
@@ -390,8 +485,89 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
async fn wait_for_pid_file(pid_file: &std::path::Path) -> anyhow::Result<i32> {
|
||||
let deadline = Instant::now() + Duration::from_secs(5);
|
||||
loop {
|
||||
if let Ok(contents) = std::fs::read_to_string(pid_file) {
|
||||
return Ok(contents.trim().parse()?);
|
||||
}
|
||||
if Instant::now() >= deadline {
|
||||
return Err(anyhow::anyhow!(
|
||||
"timed out waiting for pid file {}",
|
||||
pid_file.display()
|
||||
));
|
||||
}
|
||||
sleep(Duration::from_millis(20)).await;
|
||||
}
|
||||
}
|
||||
|
||||
fn process_exists(pid: i32) -> bool {
|
||||
let rc = unsafe { libc::kill(pid, 0) };
|
||||
if rc == 0 {
|
||||
return true;
|
||||
}
|
||||
std::io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
|
||||
}
|
||||
|
||||
async fn wait_for_process_exit(pid: i32) -> anyhow::Result<()> {
|
||||
let deadline = Instant::now() + Duration::from_secs(5);
|
||||
loop {
|
||||
if !process_exists(pid) {
|
||||
return Ok(());
|
||||
}
|
||||
if Instant::now() >= deadline {
|
||||
return Err(anyhow::anyhow!("timed out waiting for pid {pid} to exit"));
|
||||
}
|
||||
sleep(Duration::from_millis(20)).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Verifies that `start_session()` returns only the wrapper/socket env
|
||||
/// overlay and does not need to touch the configured shell or wrapper
|
||||
/// executable paths.
|
||||
///
|
||||
/// The `/bin/bash` and `/tmp/codex-execve-wrapper` values here are
|
||||
/// intentionally fake sentinels: this test asserts that the paths are
|
||||
/// copied into the exported environment and that the socket fd stays valid
|
||||
/// until `close_client_socket()` is called.
|
||||
#[tokio::test]
|
||||
async fn start_session_exposes_wrapper_env_overlay() -> anyhow::Result<()> {
|
||||
let _guard = ESCALATE_SERVER_TEST_LOCK.lock().await;
|
||||
let execve_wrapper = PathBuf::from("/tmp/codex-execve-wrapper");
|
||||
let execve_wrapper_str = execve_wrapper.to_string_lossy().to_string();
|
||||
let server = EscalateServer::new(
|
||||
PathBuf::from("/bin/bash"),
|
||||
execve_wrapper.clone(),
|
||||
DeterministicEscalationPolicy {
|
||||
decision: EscalationDecision::run(),
|
||||
},
|
||||
);
|
||||
|
||||
let mut session = server.start_session(
|
||||
CancellationToken::new(),
|
||||
Arc::new(ForwardingShellCommandExecutor),
|
||||
)?;
|
||||
let env = session.env();
|
||||
assert_eq!(env.get(EXEC_WRAPPER_ENV_VAR), Some(&execve_wrapper_str));
|
||||
assert_eq!(
|
||||
env.get(LEGACY_BASH_EXEC_WRAPPER_ENV_VAR),
|
||||
Some(&execve_wrapper_str)
|
||||
);
|
||||
let socket_fd = env
|
||||
.get(ESCALATE_SOCKET_ENV_VAR)
|
||||
.expect("session should export shell escalation socket");
|
||||
let socket_fd = socket_fd.parse::<i32>()?;
|
||||
assert!(socket_fd >= 0);
|
||||
assert_ne!(unsafe { libc::fcntl(socket_fd, libc::F_GETFD) }, -1);
|
||||
session.close_client_socket();
|
||||
assert_eq!(unsafe { libc::fcntl(socket_fd, libc::F_GETFD) }, -1);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn handle_escalate_session_respects_run_in_sandbox_decision() -> anyhow::Result<()> {
|
||||
let _guard = ESCALATE_SERVER_TEST_LOCK.lock().await;
|
||||
let (server, client) = AsyncSocket::pair()?;
|
||||
let server_task = tokio::spawn(handle_escalate_session_with_policy(
|
||||
server,
|
||||
@@ -399,6 +575,7 @@ mod tests {
|
||||
decision: EscalationDecision::run(),
|
||||
}),
|
||||
Arc::new(ForwardingShellCommandExecutor),
|
||||
CancellationToken::new(),
|
||||
));
|
||||
|
||||
let mut env = HashMap::new();
|
||||
@@ -429,6 +606,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn handle_escalate_session_resolves_relative_file_against_request_workdir()
|
||||
-> anyhow::Result<()> {
|
||||
let _guard = ESCALATE_SERVER_TEST_LOCK.lock().await;
|
||||
let (server, client) = AsyncSocket::pair()?;
|
||||
let tmp = tempfile::TempDir::new()?;
|
||||
let workdir = tmp.path().join("workspace");
|
||||
@@ -442,6 +620,7 @@ mod tests {
|
||||
expected_workdir: workdir.clone(),
|
||||
}),
|
||||
Arc::new(ForwardingShellCommandExecutor),
|
||||
CancellationToken::new(),
|
||||
));
|
||||
|
||||
client
|
||||
@@ -465,6 +644,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn handle_escalate_session_executes_escalated_command() -> anyhow::Result<()> {
|
||||
let _guard = ESCALATE_SERVER_TEST_LOCK.lock().await;
|
||||
let (server, client) = AsyncSocket::pair()?;
|
||||
let server_task = tokio::spawn(handle_escalate_session_with_policy(
|
||||
server,
|
||||
@@ -472,6 +652,7 @@ mod tests {
|
||||
decision: EscalationDecision::escalate(EscalationExecution::Unsandboxed),
|
||||
}),
|
||||
Arc::new(ForwardingShellCommandExecutor),
|
||||
CancellationToken::new(),
|
||||
));
|
||||
|
||||
client
|
||||
@@ -507,6 +688,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn handle_escalate_session_passes_permissions_to_executor() -> anyhow::Result<()> {
|
||||
let _guard = ESCALATE_SERVER_TEST_LOCK.lock().await;
|
||||
let (server, client) = AsyncSocket::pair()?;
|
||||
let server_task = tokio::spawn(handle_escalate_session_with_policy(
|
||||
server,
|
||||
@@ -524,6 +706,7 @@ mod tests {
|
||||
..Default::default()
|
||||
}),
|
||||
}),
|
||||
CancellationToken::new(),
|
||||
));
|
||||
|
||||
client
|
||||
@@ -552,4 +735,85 @@ mod tests {
|
||||
|
||||
server_task.await?
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn dropping_session_aborts_intercept_workers_and_kills_spawned_child()
|
||||
-> anyhow::Result<()> {
|
||||
let _guard = ESCALATE_SERVER_TEST_LOCK.lock().await;
|
||||
let tmp = TempDir::new()?;
|
||||
let pid_file = tmp.path().join("escalated-child.pid");
|
||||
let pid_file_display = pid_file.display().to_string();
|
||||
assert!(
|
||||
!pid_file_display.contains('\''),
|
||||
"test temp path should not contain single quotes: {pid_file_display}"
|
||||
);
|
||||
let server = EscalateServer::new(
|
||||
PathBuf::from("/bin/bash"),
|
||||
PathBuf::from("/tmp/codex-execve-wrapper"),
|
||||
DeterministicEscalationPolicy {
|
||||
decision: EscalationDecision::escalate(EscalationExecution::Unsandboxed),
|
||||
},
|
||||
);
|
||||
|
||||
let session = server.start_session(
|
||||
CancellationToken::new(),
|
||||
Arc::new(ForwardingShellCommandExecutor),
|
||||
)?;
|
||||
let socket_fd = session
|
||||
.env()
|
||||
.get(ESCALATE_SOCKET_ENV_VAR)
|
||||
.expect("session should export shell escalation socket")
|
||||
.parse::<i32>()?;
|
||||
let dup_socket_fd = unsafe { libc::dup(socket_fd) };
|
||||
assert!(dup_socket_fd >= 0, "expected dup() to succeed");
|
||||
let handshake_client = unsafe { AsyncDatagramSocket::from_raw_fd(dup_socket_fd) }?;
|
||||
let (server_stream, client_stream) = AsyncSocket::pair()?;
|
||||
let server_stream_fd = server_stream.into_inner().into();
|
||||
handshake_client
|
||||
.send_with_fds(&[0], &[server_stream_fd])
|
||||
.await
|
||||
.context("failed to send handshake datagram")?;
|
||||
|
||||
client_stream
|
||||
.send(EscalateRequest {
|
||||
file: PathBuf::from("/bin/sh"),
|
||||
argv: vec![
|
||||
"sh".to_string(),
|
||||
"-c".to_string(),
|
||||
format!("echo $$ > '{pid_file_display}' && exec /bin/sleep 100"),
|
||||
],
|
||||
workdir: AbsolutePathBuf::current_dir()?,
|
||||
env: HashMap::new(),
|
||||
})
|
||||
.await
|
||||
.context("failed to send EscalateRequest")?;
|
||||
|
||||
let response = client_stream
|
||||
.receive::<EscalateResponse>()
|
||||
.await
|
||||
.context("failed to receive EscalateResponse")?;
|
||||
assert_eq!(
|
||||
EscalateResponse {
|
||||
action: EscalateAction::Escalate,
|
||||
},
|
||||
response
|
||||
);
|
||||
|
||||
client_stream
|
||||
.send_with_fds(SuperExecMessage { fds: Vec::new() }, &[])
|
||||
.await
|
||||
.context("failed to send SuperExecMessage")?;
|
||||
|
||||
let pid = wait_for_pid_file(&pid_file).await?;
|
||||
assert!(
|
||||
process_exists(pid),
|
||||
"expected spawned child pid {pid} to exist"
|
||||
);
|
||||
|
||||
drop(session);
|
||||
|
||||
wait_for_process_exit(pid).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,6 +66,7 @@ pub use self::escalate_protocol::EscalateAction;
|
||||
pub use self::escalate_protocol::EscalationDecision;
|
||||
pub use self::escalate_protocol::EscalationExecution;
|
||||
pub use self::escalate_server::EscalateServer;
|
||||
pub use self::escalate_server::EscalationSession;
|
||||
pub use self::escalate_server::ExecParams;
|
||||
pub use self::escalate_server::ExecResult;
|
||||
pub use self::escalate_server::PreparedExec;
|
||||
|
||||
@@ -9,7 +9,7 @@ use tokio_util::sync::CancellationToken;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Stopwatch {
|
||||
limit: Duration,
|
||||
limit: Option<Duration>,
|
||||
inner: Arc<Mutex<StopwatchState>>,
|
||||
notify: Arc<Notify>,
|
||||
}
|
||||
@@ -30,13 +30,27 @@ impl Stopwatch {
|
||||
active_pauses: 0,
|
||||
})),
|
||||
notify: Arc::new(Notify::new()),
|
||||
limit,
|
||||
limit: Some(limit),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn unlimited() -> Self {
|
||||
Self {
|
||||
inner: Arc::new(Mutex::new(StopwatchState {
|
||||
elapsed: Duration::ZERO,
|
||||
running_since: Some(Instant::now()),
|
||||
active_pauses: 0,
|
||||
})),
|
||||
notify: Arc::new(Notify::new()),
|
||||
limit: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn cancellation_token(&self) -> CancellationToken {
|
||||
let limit = self.limit;
|
||||
let token = CancellationToken::new();
|
||||
let Some(limit) = self.limit else {
|
||||
return token;
|
||||
};
|
||||
let cancel = token.clone();
|
||||
let inner = Arc::clone(&self.inner);
|
||||
let notify = Arc::clone(&self.notify);
|
||||
@@ -208,4 +222,16 @@ mod tests {
|
||||
// Now the stopwatch should resume and hit the limit shortly after.
|
||||
token.cancelled().await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn unlimited_stopwatch_never_cancels() {
|
||||
let stopwatch = Stopwatch::unlimited();
|
||||
let token = stopwatch.cancellation_token();
|
||||
|
||||
assert!(
|
||||
timeout(Duration::from_millis(30), token.cancelled())
|
||||
.await
|
||||
.is_err()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user