feat: approval for sub-agent in the TUI (#12995)

<img width="766" height="290" alt="Screenshot 2026-02-27 at 10 50 48"
src="https://github.com/user-attachments/assets/3bc96cd9-ed2c-4d67-a317-8f7b60abbbb1"
/>
This commit is contained in:
jif-oai
2026-02-28 14:07:07 +01:00
committed by GitHub
parent 83177ed7a8
commit 2b38b4e03b
7 changed files with 669 additions and 87 deletions

View File

@@ -106,6 +106,7 @@ use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::unbounded_channel;
use tokio::task::JoinHandle;
use toml::Value as TomlValue;
mod pending_interactive_replay;
@@ -684,6 +685,7 @@ pub(crate) struct App {
windows_sandbox: WindowsSandboxState,
thread_event_channels: HashMap<ThreadId, ThreadEventChannel>,
thread_event_listener_tasks: HashMap<ThreadId, JoinHandle<()>>,
agent_picker_threads: HashMap<ThreadId, AgentPickerThreadEntry>,
active_thread_id: Option<ThreadId>,
active_thread_rx: Option<mpsc::Receiver<Event>>,
@@ -863,6 +865,23 @@ impl App {
self.suppress_shutdown_complete = true;
self.chat_widget.submit_op(Op::Shutdown);
self.server.remove_thread(&thread_id).await;
self.abort_thread_event_listener(thread_id);
}
}
fn abort_thread_event_listener(&mut self, thread_id: ThreadId) {
if let Some(handle) = self.thread_event_listener_tasks.remove(&thread_id) {
handle.abort();
}
}
fn abort_all_thread_event_listeners(&mut self) {
for handle in self
.thread_event_listener_tasks
.drain()
.map(|(_, handle)| handle)
{
handle.abort();
}
}
@@ -928,6 +947,14 @@ impl App {
self.refresh_pending_thread_approvals().await;
}
async fn note_thread_outbound_op(&mut self, thread_id: ThreadId, op: &Op) {
let Some(channel) = self.thread_event_channels.get(&thread_id) else {
return;
};
let mut store = channel.store.lock().await;
store.note_outbound_op(op);
}
async fn note_active_thread_outbound_op(&mut self, op: &Op) {
if !ThreadEventStore::op_can_change_pending_replay_state(op) {
return;
@@ -935,11 +962,113 @@ impl App {
let Some(thread_id) = self.active_thread_id else {
return;
};
let Some(channel) = self.thread_event_channels.get(&thread_id) else {
return;
self.note_thread_outbound_op(thread_id, op).await;
}
fn thread_label(&self, thread_id: ThreadId) -> String {
let is_primary = self.primary_thread_id == Some(thread_id);
let fallback_label = if is_primary {
"Main [default]".to_string()
} else {
let thread_id = thread_id.to_string();
let short_id: String = thread_id.chars().take(8).collect();
format!("Agent ({short_id})")
};
let mut store = channel.store.lock().await;
store.note_outbound_op(op);
if let Some(entry) = self.agent_picker_threads.get(&thread_id) {
let label = format_agent_picker_item_name(
entry.agent_nickname.as_deref(),
entry.agent_role.as_deref(),
is_primary,
);
if label == "Agent" {
let thread_id = thread_id.to_string();
let short_id: String = thread_id.chars().take(8).collect();
format!("{label} ({short_id})")
} else {
label
}
} else {
fallback_label
}
}
async fn thread_cwd(&self, thread_id: ThreadId) -> Option<PathBuf> {
let channel = self.thread_event_channels.get(&thread_id)?;
let store = channel.store.lock().await;
match store.session_configured.as_ref().map(|event| &event.msg) {
Some(EventMsg::SessionConfigured(session)) => Some(session.cwd.clone()),
_ => None,
}
}
async fn approval_request_for_thread_event(
&self,
thread_id: ThreadId,
event: &Event,
) -> Option<ApprovalRequest> {
let thread_label = Some(self.thread_label(thread_id));
match &event.msg {
EventMsg::ExecApprovalRequest(ev) => Some(ApprovalRequest::Exec {
thread_id,
thread_label,
id: ev.effective_approval_id(),
command: ev.command.clone(),
reason: ev.reason.clone(),
available_decisions: ev.effective_available_decisions(),
network_approval_context: ev.network_approval_context.clone(),
additional_permissions: ev.additional_permissions.clone(),
}),
EventMsg::ApplyPatchApprovalRequest(ev) => Some(ApprovalRequest::ApplyPatch {
thread_id,
thread_label,
id: ev.call_id.clone(),
reason: ev.reason.clone(),
cwd: self
.thread_cwd(thread_id)
.await
.unwrap_or_else(|| self.config.cwd.clone()),
changes: ev.changes.clone(),
}),
EventMsg::ElicitationRequest(ev) => Some(ApprovalRequest::McpElicitation {
thread_id,
thread_label,
server_name: ev.server_name.clone(),
request_id: ev.id.clone(),
message: ev.message.clone(),
}),
_ => None,
}
}
async fn submit_op_to_thread(&mut self, thread_id: ThreadId, op: Op) {
let replay_state_op =
ThreadEventStore::op_can_change_pending_replay_state(&op).then(|| op.clone());
let submitted = if self.active_thread_id == Some(thread_id) {
self.chat_widget.submit_op(op)
} else {
crate::session_log::log_outbound_op(&op);
match self.server.get_thread(thread_id).await {
Ok(thread) => match thread.submit(op).await {
Ok(_) => true,
Err(err) => {
self.chat_widget.add_error_message(format!(
"Failed to submit op to thread {thread_id}: {err}"
));
false
}
},
Err(err) => {
self.chat_widget.add_error_message(format!(
"Failed to find thread {thread_id} for approval response: {err}"
));
false
}
}
};
if submitted && let Some(op) = replay_state_op.as_ref() {
self.note_thread_outbound_op(thread_id, op).await;
self.refresh_pending_thread_approvals().await;
}
}
async fn refresh_pending_thread_approvals(&mut self) {
@@ -965,32 +1094,7 @@ impl App {
let threads = pending_thread_ids
.into_iter()
.map(|thread_id| {
let is_primary = self.primary_thread_id == Some(thread_id);
let fallback_label = if is_primary {
"Main [default]".to_string()
} else {
let thread_id = thread_id.to_string();
let short_id: String = thread_id.chars().take(8).collect();
format!("Agent ({short_id})")
};
if let Some(entry) = self.agent_picker_threads.get(&thread_id) {
let label = format_agent_picker_item_name(
entry.agent_nickname.as_deref(),
entry.agent_role.as_deref(),
is_primary,
);
if label == "Agent" {
let thread_id = thread_id.to_string();
let short_id: String = thread_id.chars().take(8).collect();
format!("{label} ({short_id})")
} else {
label
}
} else {
fallback_label
}
})
.map(|thread_id| self.thread_label(thread_id))
.collect();
self.chat_widget.set_pending_thread_approvals(threads);
@@ -999,6 +1103,12 @@ impl App {
async fn enqueue_thread_event(&mut self, thread_id: ThreadId, event: Event) -> Result<()> {
let refresh_pending_thread_approvals =
ThreadEventStore::event_can_change_pending_thread_approvals(&event);
let inactive_approval_request = if self.active_thread_id != Some(thread_id) {
self.approval_request_for_thread_event(thread_id, &event)
.await
} else {
None
};
let (sender, store) = {
let channel = self.ensure_thread_channel(thread_id);
(channel.sender.clone(), Arc::clone(&channel.store))
@@ -1027,6 +1137,8 @@ impl App {
tracing::warn!("thread {thread_id} event channel closed");
}
}
} else if let Some(request) = inactive_approval_request {
self.chat_widget.push_approval_request(request);
}
if refresh_pending_thread_approvals {
self.refresh_pending_thread_approvals().await;
@@ -1034,6 +1146,19 @@ impl App {
Ok(())
}
async fn handle_routed_thread_event(
&mut self,
thread_id: ThreadId,
event: Event,
) -> Result<()> {
if !self.thread_event_channels.contains_key(&thread_id) {
tracing::debug!("dropping stale event for untracked thread {thread_id}");
return Ok(());
}
self.enqueue_thread_event(thread_id, event).await
}
async fn enqueue_primary_event(&mut self, event: Event) -> Result<()> {
if let Some(thread_id) = self.primary_thread_id {
return self.enqueue_thread_event(thread_id, event).await;
@@ -1045,12 +1170,12 @@ impl App {
self.primary_session_configured = Some(session.clone());
self.ensure_thread_channel(thread_id);
self.activate_thread_channel(thread_id).await;
self.enqueue_thread_event(thread_id, event).await?;
let pending = std::mem::take(&mut self.pending_primary_events);
for pending_event in pending {
self.enqueue_thread_event(thread_id, pending_event).await?;
}
self.enqueue_thread_event(thread_id, event).await?;
} else {
self.pending_primary_events.push_back(event);
}
@@ -1227,6 +1352,7 @@ impl App {
}
fn reset_thread_event_state(&mut self) {
self.abort_all_thread_event_listeners();
self.thread_event_channels.clear();
self.agent_picker_threads.clear();
self.active_thread_id = None;
@@ -1597,6 +1723,7 @@ impl App {
pending_shutdown_exit_thread_id: None,
windows_sandbox: WindowsSandboxState::default(),
thread_event_channels: HashMap::new(),
thread_event_listener_tasks: HashMap::new(),
agent_picker_threads: HashMap::new(),
active_thread_id: None,
active_thread_rx: None,
@@ -2011,6 +2138,9 @@ impl App {
AppEvent::CodexEvent(event) => {
self.enqueue_primary_event(event).await?;
}
AppEvent::ThreadEvent { thread_id, event } => {
self.handle_routed_thread_event(thread_id, event).await?;
}
AppEvent::Exit(mode) => {
return Ok(self.handle_exit_mode(mode));
}
@@ -2026,6 +2156,9 @@ impl App {
self.refresh_pending_thread_approvals().await;
}
}
AppEvent::SubmitThreadOp { thread_id, op } => {
self.submit_op_to_thread(thread_id, op).await;
}
AppEvent::DiffResult(text) => {
// Clear the in-progress state in the bottom pane
self.chat_widget.on_diff_complete();
@@ -2823,9 +2956,6 @@ impl App {
AppEvent::OpenAgentPicker => {
self.open_agent_picker().await;
}
AppEvent::RefreshPendingThreadApprovals => {
self.refresh_pending_thread_approvals().await;
}
AppEvent::SelectAgentThread(thread_id) => {
self.select_agent_thread(tui, thread_id).await?;
}
@@ -3184,11 +3314,9 @@ impl App {
};
let channel =
ThreadEventChannel::new_with_session_configured(THREAD_EVENT_CHANNEL_CAPACITY, event);
let sender = channel.sender.clone();
let store = Arc::clone(&channel.store);
let app_event_tx = self.app_event_tx.clone();
self.thread_event_channels.insert(thread_id, channel);
tokio::spawn(async move {
let listener_handle = tokio::spawn(async move {
loop {
let event = match thread.next_event().await {
Ok(event) => event,
@@ -3197,22 +3325,11 @@ impl App {
break;
}
};
let refresh_pending_thread_approvals =
ThreadEventStore::event_can_change_pending_thread_approvals(&event);
let should_send = {
let mut guard = store.lock().await;
guard.push_event(event.clone());
guard.active
};
if refresh_pending_thread_approvals {
app_event_tx.send(AppEvent::RefreshPendingThreadApprovals);
}
if should_send && let Err(err) = sender.send(event).await {
tracing::debug!("external thread {thread_id} channel closed: {err}");
break;
}
app_event_tx.send(AppEvent::ThreadEvent { thread_id, event });
}
});
self.thread_event_listener_tasks
.insert(thread_id, listener_handle);
Ok(())
}
@@ -3612,6 +3729,152 @@ mod tests {
);
}
#[tokio::test]
async fn enqueue_primary_event_delivers_session_configured_before_buffered_approval()
-> Result<()> {
let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await;
let thread_id = ThreadId::new();
let approval_event = Event {
id: "approval-event".to_string(),
msg: EventMsg::ExecApprovalRequest(
codex_protocol::protocol::ExecApprovalRequestEvent {
call_id: "call-1".to_string(),
approval_id: None,
turn_id: "turn-1".to_string(),
command: vec!["echo".to_string(), "hello".to_string()],
cwd: PathBuf::from("/tmp/project"),
reason: Some("needs approval".to_string()),
network_approval_context: None,
proposed_execpolicy_amendment: None,
proposed_network_policy_amendments: None,
additional_permissions: None,
available_decisions: None,
parsed_cmd: Vec::new(),
},
),
};
let session_configured_event = Event {
id: "session-configured".to_string(),
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
session_id: thread_id,
forked_from_id: None,
thread_name: None,
model: "gpt-test".to_string(),
model_provider_id: "test-provider".to_string(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
cwd: PathBuf::from("/tmp/project"),
reasoning_effort: None,
history_log_id: 0,
history_entry_count: 0,
initial_messages: None,
network_proxy: None,
rollout_path: Some(PathBuf::new()),
}),
};
app.enqueue_primary_event(approval_event.clone()).await?;
app.enqueue_primary_event(session_configured_event.clone())
.await?;
let rx = app
.active_thread_rx
.as_mut()
.expect("primary thread receiver should be active");
let first_event = time::timeout(Duration::from_millis(50), rx.recv())
.await
.expect("timed out waiting for session configured event")
.expect("channel closed unexpectedly");
let second_event = time::timeout(Duration::from_millis(50), rx.recv())
.await
.expect("timed out waiting for buffered approval event")
.expect("channel closed unexpectedly");
assert!(matches!(first_event.msg, EventMsg::SessionConfigured(_)));
assert!(matches!(second_event.msg, EventMsg::ExecApprovalRequest(_)));
app.handle_codex_event_now(first_event);
app.handle_codex_event_now(second_event);
app.chat_widget
.handle_key_event(KeyEvent::new(KeyCode::Char('y'), KeyModifiers::NONE));
while let Ok(app_event) = app_event_rx.try_recv() {
if let AppEvent::SubmitThreadOp {
thread_id: op_thread_id,
..
} = app_event
{
assert_eq!(op_thread_id, thread_id);
return Ok(());
}
}
panic!("expected approval action to submit a thread-scoped op");
}
#[tokio::test]
async fn routed_thread_event_does_not_recreate_channel_after_reset() -> Result<()> {
let mut app = make_test_app().await;
let thread_id = ThreadId::new();
app.thread_event_channels.insert(
thread_id,
ThreadEventChannel::new(THREAD_EVENT_CHANNEL_CAPACITY),
);
app.reset_thread_event_state();
app.handle_routed_thread_event(
thread_id,
Event {
id: "stale-event".to_string(),
msg: EventMsg::ShutdownComplete,
},
)
.await?;
assert!(
!app.thread_event_channels.contains_key(&thread_id),
"stale routed events should not recreate cleared thread channels"
);
assert_eq!(app.active_thread_id, None);
assert_eq!(app.primary_thread_id, None);
Ok(())
}
#[tokio::test]
async fn reset_thread_event_state_aborts_listener_tasks() {
struct NotifyOnDrop(Option<tokio::sync::oneshot::Sender<()>>);
impl Drop for NotifyOnDrop {
fn drop(&mut self) {
if let Some(tx) = self.0.take() {
let _ = tx.send(());
}
}
}
let mut app = make_test_app().await;
let thread_id = ThreadId::new();
let (started_tx, started_rx) = tokio::sync::oneshot::channel();
let (dropped_tx, dropped_rx) = tokio::sync::oneshot::channel();
let handle = tokio::spawn(async move {
let _notify_on_drop = NotifyOnDrop(Some(dropped_tx));
let _ = started_tx.send(());
std::future::pending::<()>().await;
});
app.thread_event_listener_tasks.insert(thread_id, handle);
started_rx
.await
.expect("listener task should report it started");
app.reset_thread_event_state();
assert_eq!(app.thread_event_listener_tasks.is_empty(), true);
time::timeout(Duration::from_millis(50), dropped_rx)
.await
.expect("timed out waiting for listener task abort")
.expect("listener task drop notification should succeed");
}
#[tokio::test]
async fn enqueue_thread_event_does_not_block_when_channel_full() -> Result<()> {
let mut app = make_test_app().await;
@@ -3761,6 +4024,85 @@ mod tests {
assert!(app.chat_widget.pending_thread_approvals().is_empty());
}
#[tokio::test]
async fn inactive_thread_approval_bubbles_into_active_view() -> Result<()> {
let mut app = make_test_app().await;
let main_thread_id =
ThreadId::from_string("00000000-0000-0000-0000-000000000011").expect("valid thread");
let agent_thread_id =
ThreadId::from_string("00000000-0000-0000-0000-000000000022").expect("valid thread");
app.primary_thread_id = Some(main_thread_id);
app.active_thread_id = Some(main_thread_id);
app.thread_event_channels
.insert(main_thread_id, ThreadEventChannel::new(1));
app.thread_event_channels.insert(
agent_thread_id,
ThreadEventChannel::new_with_session_configured(
1,
Event {
id: String::new(),
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
session_id: agent_thread_id,
forked_from_id: None,
thread_name: None,
model: "gpt-5".to_string(),
model_provider_id: "test-provider".to_string(),
approval_policy: AskForApproval::OnRequest,
sandbox_policy: SandboxPolicy::new_workspace_write_policy(),
cwd: PathBuf::from("/tmp/agent"),
reasoning_effort: None,
history_log_id: 0,
history_entry_count: 0,
initial_messages: None,
network_proxy: None,
rollout_path: Some(PathBuf::from("/tmp/agent-rollout.jsonl")),
}),
},
),
);
app.agent_picker_threads.insert(
agent_thread_id,
AgentPickerThreadEntry {
agent_nickname: Some("Robie".to_string()),
agent_role: Some("explorer".to_string()),
is_closed: false,
},
);
app.enqueue_thread_event(
agent_thread_id,
Event {
id: "ev-approval".to_string(),
msg: EventMsg::ExecApprovalRequest(
codex_protocol::protocol::ExecApprovalRequestEvent {
call_id: "call-approval".to_string(),
approval_id: None,
turn_id: "turn-approval".to_string(),
command: vec!["echo".to_string(), "hi".to_string()],
cwd: PathBuf::from("/tmp/agent"),
reason: Some("need approval".to_string()),
network_approval_context: None,
proposed_execpolicy_amendment: None,
proposed_network_policy_amendments: None,
additional_permissions: None,
available_decisions: None,
parsed_cmd: Vec::new(),
},
),
},
)
.await?;
assert_eq!(app.chat_widget.has_active_view(), true);
assert_eq!(
app.chat_widget.pending_thread_approvals(),
&["Robie [explorer]".to_string()]
);
Ok(())
}
#[test]
fn agent_picker_item_name_snapshot() {
let thread_id =
@@ -4041,6 +4383,7 @@ mod tests {
pending_shutdown_exit_thread_id: None,
windows_sandbox: WindowsSandboxState::default(),
thread_event_channels: HashMap::new(),
thread_event_listener_tasks: HashMap::new(),
agent_picker_threads: HashMap::new(),
active_thread_id: None,
active_thread_rx: None,
@@ -4100,6 +4443,7 @@ mod tests {
pending_shutdown_exit_thread_id: None,
windows_sandbox: WindowsSandboxState::default(),
thread_event_channels: HashMap::new(),
thread_event_listener_tasks: HashMap::new(),
agent_picker_threads: HashMap::new(),
active_thread_id: None,
active_thread_rx: None,