Files
codex/codex-rs/tui/src/app/pending_interactive_replay.rs
Michael Bolin 3ca0e7673b feat: run zsh fork shell tool via shell-escalation (#12649)
## Why

This PR switches the `shell_command` zsh-fork path over to
`codex-shell-escalation` so the new shell tool can use the shared
exec-wrapper/escalation protocol instead of the `zsh_exec_bridge`
implementation that was introduced in
https://github.com/openai/codex/pull/12052. `zsh_exec_bridge` relied on
UNIX domain sockets, which are not as tamper-proof as the FD-based
approach in `codex-shell-escalation`.

## What Changed

- Added a Unix zsh-fork runtime adapter in `core`
  (`core/src/tools/runtimes/shell/unix_escalation.rs`) that:
  - runs zsh-fork commands through
    `codex_shell_escalation::run_escalate_server`
  - bridges exec-policy / approval decisions into `ShellActionProvider`
  - executes escalated commands via a `ShellCommandExecutor` that calls
    `process_exec_tool_call`
- Updated `ShellRuntime` / `ShellCommandHandler` / tool spec wiring to
select a `shell_command` backend (`classic` vs `zsh-fork`) while leaving
the generic `shell` tool path unchanged.
- Removed the `zsh_exec_bridge`-based session service and deleted
`core/src/zsh_exec_bridge/mod.rs`.
- Moved exec-wrapper entrypoint dispatch to `arg0` by handling the
`codex-execve-wrapper` arg0 alias there, and removed the old
`codex_core::maybe_run_zsh_exec_wrapper_mode()` hooks from `cli` and
`app-server` mains.
- Added the needed `codex-shell-escalation` dependencies for `core` and
`arg0`.

## Tests

- `cargo test -p codex-core
shell_zsh_fork_prefers_shell_command_over_unified_exec`
- `cargo test -p codex-app-server turn_start_shell_zsh_fork --
  --nocapture`
  - verifies zsh-fork command execution and approval flows through the new
    backend
  - includes subcommand approve/decline coverage using the shared zsh
    DotSlash fixture in `app-server/tests/suite/zsh`
- To test manually, I added the following to `~/.codex/config.toml`:

```toml
zsh_path = "/Users/mbolin/code/codex3/codex-rs/app-server/tests/suite/zsh"

[features]
shell_zsh_fork = true
```

Then I ran `just c` to run the dev build of Codex with these changes and
sent it the message:

```
run `echo $0`
```

And it replied with:

```
  echo $0 printed:

  /Users/mbolin/code/codex3/codex-rs/app-server/tests/suite/zsh

  In this tool context, $0 reflects the script path used to invoke the shell, not just zsh.
```

so the tool appears to be wired up correctly.

## Notes

- The zsh subcommand-decline integration test now uses `rm` under a
`WorkspaceWrite` sandbox. The previous `/usr/bin/true` scenario is
auto-allowed by the new `shell-escalation` policy path, which no longer
produces subcommand approval prompts.
2026-02-24 10:31:08 -08:00

582 lines
22 KiB
Rust

use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use std::collections::HashMap;
use std::collections::HashSet;
// Hashable identity of one elicitation request: the server name paired with the request id.
// Used as the element type of `PendingInteractiveReplayState::elicitation_requests`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ElicitationRequestKey {
// Server name carried by the elicitation request event / resolve op.
server_name: String,
// Request id carried by the elicitation request event / resolve op.
request_id: codex_protocol::mcp::RequestId,
}
impl ElicitationRequestKey {
// Builds a key from owned parts. Callers in this file clone the fields out of the
// event/op they are inspecting before calling this.
fn new(server_name: String, request_id: codex_protocol::mcp::RequestId) -> Self {
Self {
server_name,
request_id,
}
}
}
#[derive(Debug, Default)]
// Tracks which interactive prompts are still unresolved in the thread-event buffer.
//
// Thread snapshots are replayed when switching threads/agents. Most events should replay
// verbatim, but interactive prompts (approvals, request_user_input, MCP elicitations) must
// only replay if they are still pending. This state is updated from:
// - inbound events (`note_event`)
// - outbound ops that resolve a prompt (`note_outbound_op`)
// - buffer eviction (`note_evicted_event`)
//
// We keep both fast lookup sets (for snapshot filtering by call_id/request key) and
// turn-indexed queues/vectors so `TurnComplete`/`TurnAborted` can clear stale prompts tied
// to a turn. `request_user_input` removal is FIFO because the overlay answers queued prompts
// in FIFO order for a shared `turn_id`.
pub(super) struct PendingInteractiveReplayState {
// call_ids of exec approval requests that are still pending (snapshot filter lookup).
exec_approval_call_ids: HashSet<String>,
// The same exec approval call_ids grouped by turn_id, in the order they were noted.
exec_approval_call_ids_by_turn_id: HashMap<String, Vec<String>>,
// call_ids of patch approval requests that are still pending (snapshot filter lookup).
patch_approval_call_ids: HashSet<String>,
// The same patch approval call_ids grouped by turn_id, in the order they were noted.
patch_approval_call_ids_by_turn_id: HashMap<String, Vec<String>>,
// Pending elicitation requests keyed by (server_name, request_id). There is no per-turn
// index for these, so turn completion/abort does not clear them.
elicitation_requests: HashSet<ElicitationRequestKey>,
// call_ids of request_user_input prompts that are still pending (snapshot filter lookup).
request_user_input_call_ids: HashSet<String>,
// Per-turn queue of request_user_input call_ids; `note_outbound_op` pops from the front
// when an answer for that turn is sent.
request_user_input_call_ids_by_turn_id: HashMap<String, Vec<String>>,
}
impl PendingInteractiveReplayState {
// Returns true for exactly the op variants that `note_outbound_op` acts on (the four
// prompt-resolving ops plus `Op::Shutdown`); every other op leaves this state untouched.
// Keep this list in sync with the arms of `note_outbound_op`.
pub(super) fn op_can_change_state(op: &Op) -> bool {
matches!(
op,
Op::ExecApproval { .. }
| Op::PatchApproval { .. }
| Op::ResolveElicitation { .. }
| Op::UserInputAnswer { .. }
| Op::Shutdown
)
}
// Applies an outbound op to the pending-prompt bookkeeping.
//
// An approval, elicitation resolution, or user-input answer resolves the prompt it refers
// to, so that prompt is dropped from the lookup set and from the per-turn index.
// `Op::Shutdown` clears everything. Any other op is ignored.
pub(super) fn note_outbound_op(&mut self, op: &Op) {
    match op {
        Op::ExecApproval { id, turn_id, .. } => {
            self.exec_approval_call_ids.remove(id);
            match turn_id {
                // Fast path: the op names the turn, so only that entry is touched.
                Some(turn_id) => Self::remove_call_id_from_turn_map_entry(
                    &mut self.exec_approval_call_ids_by_turn_id,
                    turn_id,
                    id,
                ),
                // Without a turn id the entry cannot be addressed directly. Scan every
                // turn (as `Op::PatchApproval` does) so the resolved call_id does not
                // linger in the per-turn index.
                None => Self::remove_call_id_from_turn_map(
                    &mut self.exec_approval_call_ids_by_turn_id,
                    id,
                ),
            }
        }
        // `Op::PatchApproval` carries no turn id here, so scan all turns for the call_id.
        Op::PatchApproval { id, .. } => {
            self.patch_approval_call_ids.remove(id);
            Self::remove_call_id_from_turn_map(
                &mut self.patch_approval_call_ids_by_turn_id,
                id,
            );
        }
        Op::ResolveElicitation {
            server_name,
            request_id,
            ..
        } => {
            self.elicitation_requests
                .remove(&ElicitationRequestKey::new(
                    server_name.clone(),
                    request_id.clone(),
                ));
        }
        // `Op::UserInputAnswer` identifies the turn, not the prompt call_id. The UI
        // answers queued prompts for the same turn in FIFO order, so remove the oldest
        // queued call_id for that turn.
        Op::UserInputAnswer { id, .. } => {
            if let Some(call_ids) = self.request_user_input_call_ids_by_turn_id.get_mut(id) {
                if !call_ids.is_empty() {
                    let answered_call_id = call_ids.remove(0);
                    self.request_user_input_call_ids.remove(&answered_call_id);
                }
                // Drop the turn entry once its queue is drained so the map does not
                // accumulate empty vectors.
                if call_ids.is_empty() {
                    self.request_user_input_call_ids_by_turn_id.remove(id);
                }
            }
        }
        Op::Shutdown => self.clear(),
        _ => {}
    }
}
pub(super) fn note_event(&mut self, event: &Event) {
match &event.msg {
EventMsg::ExecApprovalRequest(ev) => {
self.exec_approval_call_ids.insert(ev.call_id.clone());
self.exec_approval_call_ids_by_turn_id
.entry(ev.turn_id.clone())
.or_default()
.push(ev.call_id.clone());
}
EventMsg::ExecCommandBegin(ev) => {
self.exec_approval_call_ids.remove(&ev.call_id);
Self::remove_call_id_from_turn_map(
&mut self.exec_approval_call_ids_by_turn_id,
&ev.call_id,
);
}
EventMsg::ApplyPatchApprovalRequest(ev) => {
self.patch_approval_call_ids.insert(ev.call_id.clone());
self.patch_approval_call_ids_by_turn_id
.entry(ev.turn_id.clone())
.or_default()
.push(ev.call_id.clone());
}
EventMsg::PatchApplyBegin(ev) => {
self.patch_approval_call_ids.remove(&ev.call_id);
Self::remove_call_id_from_turn_map(
&mut self.patch_approval_call_ids_by_turn_id,
&ev.call_id,
);
}
EventMsg::ElicitationRequest(ev) => {
self.elicitation_requests.insert(ElicitationRequestKey::new(
ev.server_name.clone(),
ev.id.clone(),
));
}
EventMsg::RequestUserInput(ev) => {
self.request_user_input_call_ids.insert(ev.call_id.clone());
self.request_user_input_call_ids_by_turn_id
.entry(ev.turn_id.clone())
.or_default()
.push(ev.call_id.clone());
}
// A turn ending (normally or aborted/replaced) invalidates any unresolved
// turn-scoped approvals and request_user_input prompts from that turn.
EventMsg::TurnComplete(ev) => {
self.clear_exec_approval_turn(&ev.turn_id);
self.clear_patch_approval_turn(&ev.turn_id);
self.clear_request_user_input_turn(&ev.turn_id);
}
EventMsg::TurnAborted(ev) => {
if let Some(turn_id) = &ev.turn_id {
self.clear_exec_approval_turn(turn_id);
self.clear_patch_approval_turn(turn_id);
self.clear_request_user_input_turn(turn_id);
}
}
EventMsg::ShutdownComplete => self.clear(),
_ => {}
}
}
// Forgets a prompt whose originating event has been evicted from the buffer.
//
// For the four prompt event kinds, the prompt is removed from its lookup set and (where one
// exists) from the per-turn index entry named by the event. Other events are ignored.
pub(super) fn note_evicted_event(&mut self, event: &Event) {
    match &event.msg {
        EventMsg::ExecApprovalRequest(ev) => {
            self.exec_approval_call_ids.remove(&ev.call_id);
            Self::remove_call_id_from_turn_map_entry(
                &mut self.exec_approval_call_ids_by_turn_id,
                &ev.turn_id,
                &ev.call_id,
            );
        }
        EventMsg::ApplyPatchApprovalRequest(ev) => {
            self.patch_approval_call_ids.remove(&ev.call_id);
            Self::remove_call_id_from_turn_map_entry(
                &mut self.patch_approval_call_ids_by_turn_id,
                &ev.turn_id,
                &ev.call_id,
            );
        }
        EventMsg::ElicitationRequest(ev) => {
            self.elicitation_requests
                .remove(&ElicitationRequestKey::new(
                    ev.server_name.clone(),
                    ev.id.clone(),
                ));
        }
        // Uses the same shared helper as the approval arms: remove the call_id from this
        // turn's queue and drop the turn entry if the queue becomes empty.
        EventMsg::RequestUserInput(ev) => {
            self.request_user_input_call_ids.remove(&ev.call_id);
            Self::remove_call_id_from_turn_map_entry(
                &mut self.request_user_input_call_ids_by_turn_id,
                &ev.turn_id,
                &ev.call_id,
            );
        }
        _ => {}
    }
}
// Snapshot filter: a prompt event is replayed only while it is still recorded as pending.
// Every non-prompt event is always replayed.
pub(super) fn should_replay_snapshot_event(&self, event: &Event) -> bool {
    match &event.msg {
        EventMsg::ExecApprovalRequest(request) => {
            self.exec_approval_call_ids.contains(&request.call_id)
        }
        EventMsg::ApplyPatchApprovalRequest(request) => {
            self.patch_approval_call_ids.contains(&request.call_id)
        }
        EventMsg::ElicitationRequest(request) => {
            let key =
                ElicitationRequestKey::new(request.server_name.clone(), request.id.clone());
            self.elicitation_requests.contains(&key)
        }
        EventMsg::RequestUserInput(request) => {
            self.request_user_input_call_ids.contains(&request.call_id)
        }
        _ => true,
    }
}
// Drops every request_user_input prompt queued under `turn_id`: the turn's queue is taken
// out of the per-turn map and each call_id in it is removed from the lookup set.
fn clear_request_user_input_turn(&mut self, turn_id: &str) {
    let Some(stale_call_ids) = self.request_user_input_call_ids_by_turn_id.remove(turn_id)
    else {
        return;
    };
    for stale_call_id in &stale_call_ids {
        self.request_user_input_call_ids.remove(stale_call_id);
    }
}
// Drops every exec approval recorded under `turn_id`: the turn's list is taken out of the
// per-turn map and each call_id in it is removed from the lookup set.
fn clear_exec_approval_turn(&mut self, turn_id: &str) {
    let Some(stale_call_ids) = self.exec_approval_call_ids_by_turn_id.remove(turn_id) else {
        return;
    };
    for stale_call_id in &stale_call_ids {
        self.exec_approval_call_ids.remove(stale_call_id);
    }
}
// Drops every patch approval recorded under `turn_id`: the turn's list is taken out of the
// per-turn map and each call_id in it is removed from the lookup set.
fn clear_patch_approval_turn(&mut self, turn_id: &str) {
    let Some(stale_call_ids) = self.patch_approval_call_ids_by_turn_id.remove(turn_id) else {
        return;
    };
    for stale_call_id in &stale_call_ids {
        self.patch_approval_call_ids.remove(stale_call_id);
    }
}
// Removes `call_id` from the list of every turn, then drops any turn whose list is empty.
// Used when the caller does not know which turn the call_id belongs to.
fn remove_call_id_from_turn_map(
    call_ids_by_turn_id: &mut HashMap<String, Vec<String>>,
    call_id: &str,
) {
    // Pass 1: strip the call_id from each turn's list.
    for queued_call_ids in call_ids_by_turn_id.values_mut() {
        queued_call_ids.retain(|candidate| candidate != call_id);
    }
    // Pass 2: prune turns that have nothing left.
    call_ids_by_turn_id.retain(|_, queued_call_ids| !queued_call_ids.is_empty());
}
// Removes `call_id` from the list stored under `turn_id` only, and drops that turn's entry
// if the list ends up empty. Other turns are left untouched. Does nothing if `turn_id` has
// no entry.
fn remove_call_id_from_turn_map_entry(
    call_ids_by_turn_id: &mut HashMap<String, Vec<String>>,
    turn_id: &str,
    call_id: &str,
) {
    let Some(queued_call_ids) = call_ids_by_turn_id.get_mut(turn_id) else {
        return;
    };
    queued_call_ids.retain(|candidate| candidate != call_id);
    if queued_call_ids.is_empty() {
        call_ids_by_turn_id.remove(turn_id);
    }
}
// Forgets every pending prompt by emptying all lookup sets and all per-turn indexes.
fn clear(&mut self) {
    // call_id lookup sets
    for pending_call_ids in [
        &mut self.exec_approval_call_ids,
        &mut self.patch_approval_call_ids,
        &mut self.request_user_input_call_ids,
    ] {
        pending_call_ids.clear();
    }
    // per-turn indexes
    for call_ids_by_turn_id in [
        &mut self.exec_approval_call_ids_by_turn_id,
        &mut self.patch_approval_call_ids_by_turn_id,
        &mut self.request_user_input_call_ids_by_turn_id,
    ] {
        call_ids_by_turn_id.clear();
    }
    self.elicitation_requests.clear();
}
}
#[cfg(test)]
mod tests {
use super::super::ThreadEventStore;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::TurnAbortReason;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use std::path::PathBuf;
// Scenario: one RequestUserInput event is pushed and no answer op is noted.
// Expectation: the snapshot contains exactly that one RequestUserInput event.
#[test]
fn thread_event_snapshot_keeps_pending_request_user_input() {
let mut store = ThreadEventStore::new(8);
let request = Event {
id: "ev-1".to_string(),
msg: EventMsg::RequestUserInput(
codex_protocol::request_user_input::RequestUserInputEvent {
call_id: "call-1".to_string(),
turn_id: "turn-1".to_string(),
questions: Vec::new(),
},
),
};
store.push_event(request);
let snapshot = store.snapshot();
assert_eq!(snapshot.events.len(), 1);
assert!(matches!(
snapshot.events.first().map(|event| &event.msg),
Some(EventMsg::RequestUserInput(_))
));
}
// Scenario: a RequestUserInput event for "turn-1" is pushed, then a UserInputAnswer op
// whose id is that same turn id is noted.
// Expectation: the snapshot is empty.
#[test]
fn thread_event_snapshot_drops_resolved_request_user_input_after_user_answer() {
let mut store = ThreadEventStore::new(8);
store.push_event(Event {
id: "ev-1".to_string(),
msg: EventMsg::RequestUserInput(
codex_protocol::request_user_input::RequestUserInputEvent {
call_id: "call-1".to_string(),
turn_id: "turn-1".to_string(),
questions: Vec::new(),
},
),
});
// The answer op is addressed by turn id, not by the prompt's call_id.
store.note_outbound_op(&Op::UserInputAnswer {
id: "turn-1".to_string(),
response: codex_protocol::request_user_input::RequestUserInputResponse {
answers: HashMap::new(),
},
});
let snapshot = store.snapshot();
assert!(
snapshot.events.is_empty(),
"resolved request_user_input prompt should not replay on thread switch"
);
}
// Scenario: an ExecApprovalRequest event is pushed, then an ExecApproval op is noted whose
// id equals the event's call_id ("call-1", not the approval_id) and whose turn_id matches.
// Expectation: the snapshot is empty.
#[test]
fn thread_event_snapshot_drops_resolved_exec_approval_after_outbound_approval_call_id() {
let mut store = ThreadEventStore::new(8);
store.push_event(Event {
id: "ev-1".to_string(),
msg: EventMsg::ExecApprovalRequest(
codex_protocol::protocol::ExecApprovalRequestEvent {
call_id: "call-1".to_string(),
approval_id: Some("approval-1".to_string()),
turn_id: "turn-1".to_string(),
command: vec!["echo".to_string(), "hi".to_string()],
cwd: PathBuf::from("/tmp"),
reason: None,
network_approval_context: None,
proposed_execpolicy_amendment: None,
proposed_network_policy_amendments: None,
additional_permissions: None,
parsed_cmd: Vec::new(),
},
),
});
store.note_outbound_op(&Op::ExecApproval {
id: "call-1".to_string(),
turn_id: Some("turn-1".to_string()),
decision: codex_protocol::protocol::ReviewDecision::Approved,
});
let snapshot = store.snapshot();
assert!(
snapshot.events.is_empty(),
"resolved exec approval prompt should not replay on thread switch"
);
}
// Scenario: prompt "call-1" is pushed and answered, then a second prompt "call-2" arrives
// for the same turn.
// Expectation: the snapshot contains only the "call-2" prompt.
#[test]
fn thread_event_snapshot_drops_answered_request_user_input_for_multi_prompt_turn() {
let mut store = ThreadEventStore::new(8);
store.push_event(Event {
id: "ev-1".to_string(),
msg: EventMsg::RequestUserInput(
codex_protocol::request_user_input::RequestUserInputEvent {
call_id: "call-1".to_string(),
turn_id: "turn-1".to_string(),
questions: Vec::new(),
},
),
});
// Answer the first prompt before the second one arrives.
store.note_outbound_op(&Op::UserInputAnswer {
id: "turn-1".to_string(),
response: codex_protocol::request_user_input::RequestUserInputResponse {
answers: HashMap::new(),
},
});
store.push_event(Event {
id: "ev-2".to_string(),
msg: EventMsg::RequestUserInput(
codex_protocol::request_user_input::RequestUserInputEvent {
call_id: "call-2".to_string(),
turn_id: "turn-1".to_string(),
questions: Vec::new(),
},
),
});
let snapshot = store.snapshot();
assert_eq!(snapshot.events.len(), 1);
assert!(matches!(
snapshot.events.first().map(|event| &event.msg),
Some(EventMsg::RequestUserInput(ev)) if ev.call_id == "call-2"
));
}
// Scenario: two prompts ("call-1", then "call-2") are queued for the same turn before a
// single UserInputAnswer op for that turn is noted.
// Expectation: only the "call-2" prompt remains in the snapshot, i.e. the answer resolved
// the older prompt.
#[test]
fn thread_event_snapshot_keeps_newer_request_user_input_pending_when_same_turn_has_queue() {
let mut store = ThreadEventStore::new(8);
store.push_event(Event {
id: "ev-1".to_string(),
msg: EventMsg::RequestUserInput(
codex_protocol::request_user_input::RequestUserInputEvent {
call_id: "call-1".to_string(),
turn_id: "turn-1".to_string(),
questions: Vec::new(),
},
),
});
store.push_event(Event {
id: "ev-2".to_string(),
msg: EventMsg::RequestUserInput(
codex_protocol::request_user_input::RequestUserInputEvent {
call_id: "call-2".to_string(),
turn_id: "turn-1".to_string(),
questions: Vec::new(),
},
),
});
// One answer for the turn while two prompts are queued.
store.note_outbound_op(&Op::UserInputAnswer {
id: "turn-1".to_string(),
response: codex_protocol::request_user_input::RequestUserInputResponse {
answers: HashMap::new(),
},
});
let snapshot = store.snapshot();
assert_eq!(snapshot.events.len(), 1);
assert!(matches!(
snapshot.events.first().map(|event| &event.msg),
Some(EventMsg::RequestUserInput(ev)) if ev.call_id == "call-2"
));
}
// Scenario: an ApplyPatchApprovalRequest event is pushed, then a PatchApproval op with the
// same call_id is noted (the op carries no turn id).
// Expectation: the snapshot is empty.
#[test]
fn thread_event_snapshot_drops_resolved_patch_approval_after_outbound_approval() {
let mut store = ThreadEventStore::new(8);
store.push_event(Event {
id: "ev-1".to_string(),
msg: EventMsg::ApplyPatchApprovalRequest(
codex_protocol::protocol::ApplyPatchApprovalRequestEvent {
call_id: "call-1".to_string(),
turn_id: "turn-1".to_string(),
changes: HashMap::new(),
reason: None,
grant_root: None,
},
),
});
store.note_outbound_op(&Op::PatchApproval {
id: "call-1".to_string(),
decision: codex_protocol::protocol::ReviewDecision::Approved,
});
let snapshot = store.snapshot();
assert!(
snapshot.events.is_empty(),
"resolved patch approval prompt should not replay on thread switch"
);
}
// Scenario: an exec approval request and a patch approval request are both pending for
// "turn-1" when a TurnAborted event naming that turn is pushed.
// Expectation: the snapshot contains neither approval request. Other events (such as the
// TurnAborted event itself) are not constrained by this assertion.
#[test]
fn thread_event_snapshot_drops_pending_approvals_when_turn_aborts() {
let mut store = ThreadEventStore::new(8);
store.push_event(Event {
id: "ev-1".to_string(),
msg: EventMsg::ExecApprovalRequest(
codex_protocol::protocol::ExecApprovalRequestEvent {
call_id: "exec-call-1".to_string(),
approval_id: Some("approval-1".to_string()),
turn_id: "turn-1".to_string(),
command: vec!["echo".to_string(), "hi".to_string()],
cwd: PathBuf::from("/tmp"),
reason: None,
network_approval_context: None,
proposed_execpolicy_amendment: None,
proposed_network_policy_amendments: None,
additional_permissions: None,
parsed_cmd: Vec::new(),
},
),
});
store.push_event(Event {
id: "ev-2".to_string(),
msg: EventMsg::ApplyPatchApprovalRequest(
codex_protocol::protocol::ApplyPatchApprovalRequestEvent {
call_id: "patch-call-1".to_string(),
turn_id: "turn-1".to_string(),
changes: HashMap::new(),
reason: None,
grant_root: None,
},
),
});
// The abort names the turn both approvals belong to.
store.push_event(Event {
id: "ev-3".to_string(),
msg: EventMsg::TurnAborted(codex_protocol::protocol::TurnAbortedEvent {
turn_id: Some("turn-1".to_string()),
reason: TurnAbortReason::Replaced,
}),
});
let snapshot = store.snapshot();
assert!(snapshot.events.iter().all(|event| {
!matches!(
&event.msg,
EventMsg::ExecApprovalRequest(_) | EventMsg::ApplyPatchApprovalRequest(_)
)
}));
}
// Scenario: an ElicitationRequest event is pushed, then a ResolveElicitation op with the
// same server name and request id is noted.
// Expectation: the snapshot is empty.
#[test]
fn thread_event_snapshot_drops_resolved_elicitation_after_outbound_resolution() {
let mut store = ThreadEventStore::new(8);
let request_id = codex_protocol::mcp::RequestId::String("request-1".to_string());
store.push_event(Event {
id: "ev-1".to_string(),
msg: EventMsg::ElicitationRequest(codex_protocol::approvals::ElicitationRequestEvent {
server_name: "server-1".to_string(),
id: request_id.clone(),
message: "Please confirm".to_string(),
}),
});
store.note_outbound_op(&Op::ResolveElicitation {
server_name: "server-1".to_string(),
request_id,
decision: codex_protocol::approvals::ElicitationAction::Accept,
});
let snapshot = store.snapshot();
assert!(
snapshot.events.is_empty(),
"resolved elicitation prompt should not replay on thread switch"
);
}
}