feat: propagate approval request of unsubscribed threads (#9232)

A thread can now be spawned by another thread. In order to process the
approval requests of such sub-threads, we need to detect those event and
show them in the TUI.

This is a temporary solution while the UX is being figured out. This PR
should be reverted once done
This commit is contained in:
jif-oai
2026-01-16 11:23:01 +01:00
committed by GitHub
parent 0cce6ebd83
commit f5b3e738fb
2 changed files with 180 additions and 14 deletions

View File

@@ -15,6 +15,8 @@ use crate::external_editor;
use crate::file_search::FileSearchManager;
use crate::history_cell;
use crate::history_cell::HistoryCell;
#[cfg(not(debug_assertions))]
use crate::history_cell::UpdateAvailableHistoryCell;
use crate::model_migration::ModelMigrationOutcome;
use crate::model_migration::migration_copy_for_models;
use crate::model_migration::run_model_migration_prompt;
@@ -37,6 +39,7 @@ use codex_core::models_manager::manager::RefreshStrategy;
use codex_core::models_manager::model_presets::HIDE_GPT_5_1_CODEX_MAX_MIGRATION_PROMPT_CONFIG;
use codex_core::models_manager::model_presets::HIDE_GPT5_1_MIGRATION_PROMPT_CONFIG;
use codex_core::protocol::DeprecationNoticeEvent;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::FinalOutput;
use codex_core::protocol::ListSkillsResponseEvent;
@@ -58,6 +61,8 @@ use ratatui::text::Line;
use ratatui::widgets::Paragraph;
use ratatui::widgets::Wrap;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
@@ -66,11 +71,9 @@ use std::sync::atomic::Ordering;
use std::thread;
use std::time::Duration;
use tokio::select;
use tokio::sync::broadcast;
use tokio::sync::mpsc::unbounded_channel;
#[cfg(not(debug_assertions))]
use crate::history_cell::UpdateAvailableHistoryCell;
const EXTERNAL_EDITOR_HINT: &str = "Save and close external editor to continue.";
#[derive(Debug, Clone)]
@@ -347,6 +350,13 @@ pub(crate) struct App {
// One-shot suppression of the next world-writable scan after user confirmation.
skip_world_writable_scan_once: bool,
// TODO(jif) drop once new UX is here.
// Track external agent approvals spawned via AgentControl.
/// Map routed approval IDs to their originating external threads and original IDs.
external_approval_routes: HashMap<String, (ThreadId, String)>,
/// Buffered Codex events while external approvals are pending.
paused_codex_events: VecDeque<Event>,
}
impl App {
@@ -496,6 +506,8 @@ impl App {
pending_update_action: None,
suppress_shutdown_complete: false,
skip_world_writable_scan_once: false,
external_approval_routes: HashMap::new(),
paused_codex_events: VecDeque::new(),
};
// On startup, if Agent mode (workspace-write) or ReadOnly is active, warn about world-writable dirs on Windows.
@@ -548,6 +560,9 @@ impl App {
tui.frame_requester().schedule_frame();
let mut thread_created_rx = thread_manager.subscribe_thread_created();
let mut listen_for_threads = true;
let exit_reason = loop {
let control = select! {
Some(event) = app_event_rx.recv() => {
@@ -556,6 +571,21 @@ impl App {
Some(event) = tui_events.next() => {
app.handle_tui_event(tui, event).await?
}
// Listen on new thread creation due to collab tools.
created = thread_created_rx.recv(), if listen_for_threads => {
match created {
Ok(thread_id) => {
app.handle_thread_created(thread_id).await?;
}
Err(broadcast::error::RecvError::Lagged(_)) => {
tracing::warn!("thread_created receiver lagged; skipping resync");
}
Err(broadcast::error::RecvError::Closed) => {
listen_for_threads = false;
}
}
AppRunControl::Continue
}
};
match control {
AppRunControl::Continue => {}
@@ -846,18 +876,15 @@ impl App {
self.chat_widget.on_commit_tick();
}
AppEvent::CodexEvent(event) => {
if self.suppress_shutdown_complete
&& matches!(event.msg, EventMsg::ShutdownComplete)
{
self.suppress_shutdown_complete = false;
if !self.external_approval_routes.is_empty() {
// Store the events while the approval is pending.
self.paused_codex_events.push_back(event);
return Ok(AppRunControl::Continue);
}
if let EventMsg::ListSkillsResponse(response) = &event.msg {
let cwd = self.chat_widget.config_ref().cwd.clone();
let errors = errors_for_cwd(&cwd, response);
emit_skill_load_warnings(&self.app_event_tx, &errors);
}
self.chat_widget.handle_codex_event(event);
self.handle_codex_event_now(event);
}
AppEvent::ExternalApprovalRequest { thread_id, event } => {
self.handle_external_approval_request(thread_id, event);
}
AppEvent::Exit(mode) => match mode {
ExitMode::ShutdownFirst => self.chat_widget.submit_op(Op::Shutdown),
@@ -868,7 +895,54 @@ impl App {
AppEvent::FatalExitRequest(message) => {
return Ok(AppRunControl::Exit(ExitReason::Fatal(message)));
}
AppEvent::CodexOp(op) => self.chat_widget.submit_op(op),
AppEvent::CodexOp(op) => match op {
// Catch potential approvals coming from an external thread and treat them
// directly. This support both command and patch approval. In such case
// the approval get transferred to the corresponding thread and the external
// approval map (`external_approval_routes`) is updated.
Op::ExecApproval { id, decision } => {
if let Some((thread_id, original_id)) =
self.external_approval_routes.remove(&id)
{
// Approval of a sub-agent.
self.forward_external_op(
thread_id,
Op::ExecApproval {
id: original_id,
decision,
},
)
.await;
self.finish_external_approval();
} else {
// This is an approval but not external.
self.chat_widget
.submit_op(Op::ExecApproval { id, decision });
}
}
Op::PatchApproval { id, decision } => {
if let Some((thread_id, original_id)) =
self.external_approval_routes.remove(&id)
{
// Approval of a sub-agent.
self.forward_external_op(
thread_id,
Op::PatchApproval {
id: original_id,
decision,
},
)
.await;
self.finish_external_approval();
} else {
// This is an approval but not external.
self.chat_widget
.submit_op(Op::PatchApproval { id, decision });
}
}
// Standard path where this is not an external approval response.
_ => self.chat_widget.submit_op(op),
},
AppEvent::DiffResult(text) => {
// Clear the in-progress state in the bottom pane
self.chat_widget.on_diff_complete();
@@ -1343,6 +1417,89 @@ impl App {
Ok(AppRunControl::Continue)
}
fn handle_codex_event_now(&mut self, event: Event) {
if self.suppress_shutdown_complete && matches!(event.msg, EventMsg::ShutdownComplete) {
self.suppress_shutdown_complete = false;
return;
}
if let EventMsg::ListSkillsResponse(response) = &event.msg {
let cwd = self.chat_widget.config_ref().cwd.clone();
let errors = errors_for_cwd(&cwd, response);
emit_skill_load_warnings(&self.app_event_tx, &errors);
}
self.chat_widget.handle_codex_event(event);
}
/// Routes external approval request events through the chat widget by
/// rewriting the event id to include the originating thread.
///
/// `thread_id` is the external thread that issued the approval request.
/// `event` is the approval request event whose id is rewritten so replies
/// can be routed back to the correct thread.
fn handle_external_approval_request(&mut self, thread_id: ThreadId, mut event: Event) {
match &mut event.msg {
EventMsg::ExecApprovalRequest(_) | EventMsg::ApplyPatchApprovalRequest(_) => {
let original_id = event.id.clone();
let routing_id = format!("{thread_id}:{original_id}");
self.external_approval_routes
.insert(routing_id.clone(), (thread_id, original_id));
event.id = routing_id;
}
_ => return,
}
self.chat_widget.handle_codex_event(event);
}
async fn forward_external_op(&self, thread_id: ThreadId, op: Op) {
let thread = match self.server.get_thread(thread_id).await {
Ok(thread) => thread,
Err(err) => {
tracing::warn!("failed to find thread {thread_id} for approval response: {err}");
return;
}
};
if let Err(err) = thread.submit(op).await {
tracing::warn!("failed to submit approval response to thread {thread_id}: {err}");
}
}
fn finish_external_approval(&mut self) {
if self.external_approval_routes.is_empty() {
while let Some(event) = self.paused_codex_events.pop_front() {
self.handle_codex_event_now(event);
}
}
}
async fn handle_thread_created(&mut self, thread_id: ThreadId) -> Result<()> {
let thread = match self.server.get_thread(thread_id).await {
Ok(thread) => thread,
Err(err) => {
tracing::warn!("failed to attach listener for thread {thread_id}: {err}");
return Ok(());
}
};
let app_event_tx = self.app_event_tx.clone();
tokio::spawn(async move {
loop {
let event = match thread.next_event().await {
Ok(event) => event,
Err(err) => {
tracing::debug!("external thread {thread_id} listener stopped: {err}");
break;
}
};
match event.msg {
EventMsg::ExecApprovalRequest(_) | EventMsg::ApplyPatchApprovalRequest(_) => {
app_event_tx.send(AppEvent::ExternalApprovalRequest { thread_id, event });
}
_ => {}
}
}
});
Ok(())
}
fn reasoning_label(reasoning_effort: Option<ReasoningEffortConfig>) -> &'static str {
match reasoning_effort {
Some(ReasoningEffortConfig::Minimal) => "minimal",
@@ -1598,6 +1755,8 @@ mod tests {
pending_update_action: None,
suppress_shutdown_complete: false,
skip_world_writable_scan_once: false,
external_approval_routes: HashMap::new(),
paused_codex_events: VecDeque::new(),
}
}
@@ -1638,6 +1797,8 @@ mod tests {
pending_update_action: None,
suppress_shutdown_complete: false,
skip_world_writable_scan_once: false,
external_approval_routes: HashMap::new(),
paused_codex_events: VecDeque::new(),
},
rx,
op_rx,

View File

@@ -14,6 +14,7 @@ use codex_common::approval_presets::ApprovalPreset;
use codex_core::protocol::Event;
use codex_core::protocol::RateLimitSnapshot;
use codex_file_search::FileMatch;
use codex_protocol::ThreadId;
use codex_protocol::openai_models::ModelPreset;
use crate::bottom_pane::ApprovalRequest;
@@ -41,6 +42,10 @@ pub(crate) enum WindowsSandboxFallbackReason {
#[derive(Debug)]
pub(crate) enum AppEvent {
CodexEvent(Event),
ExternalApprovalRequest {
thread_id: ThreadId,
event: Event,
},
/// Start a new session.
NewSession,