diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index f1d6ca5da4..87d76909de 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -15,6 +15,8 @@ use crate::external_editor; use crate::file_search::FileSearchManager; use crate::history_cell; use crate::history_cell::HistoryCell; +#[cfg(not(debug_assertions))] +use crate::history_cell::UpdateAvailableHistoryCell; use crate::model_migration::ModelMigrationOutcome; use crate::model_migration::migration_copy_for_models; use crate::model_migration::run_model_migration_prompt; @@ -37,6 +39,7 @@ use codex_core::models_manager::manager::RefreshStrategy; use codex_core::models_manager::model_presets::HIDE_GPT_5_1_CODEX_MAX_MIGRATION_PROMPT_CONFIG; use codex_core::models_manager::model_presets::HIDE_GPT5_1_MIGRATION_PROMPT_CONFIG; use codex_core::protocol::DeprecationNoticeEvent; +use codex_core::protocol::Event; use codex_core::protocol::EventMsg; use codex_core::protocol::FinalOutput; use codex_core::protocol::ListSkillsResponseEvent; @@ -58,6 +61,8 @@ use ratatui::text::Line; use ratatui::widgets::Paragraph; use ratatui::widgets::Wrap; use std::collections::BTreeMap; +use std::collections::HashMap; +use std::collections::VecDeque; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; @@ -66,11 +71,9 @@ use std::sync::atomic::Ordering; use std::thread; use std::time::Duration; use tokio::select; +use tokio::sync::broadcast; use tokio::sync::mpsc::unbounded_channel; -#[cfg(not(debug_assertions))] -use crate::history_cell::UpdateAvailableHistoryCell; - const EXTERNAL_EDITOR_HINT: &str = "Save and close external editor to continue."; #[derive(Debug, Clone)] @@ -347,6 +350,13 @@ pub(crate) struct App { // One-shot suppression of the next world-writable scan after user confirmation. skip_world_writable_scan_once: bool, + + // TODO(jif) drop once new UX is here. + // Track external agent approvals spawned via AgentControl. + /// Map routed approval IDs to their originating external threads and original IDs. + external_approval_routes: HashMap, + /// Buffered Codex events while external approvals are pending. + paused_codex_events: VecDeque, } impl App { @@ -496,6 +506,8 @@ impl App { pending_update_action: None, suppress_shutdown_complete: false, skip_world_writable_scan_once: false, + external_approval_routes: HashMap::new(), + paused_codex_events: VecDeque::new(), }; // On startup, if Agent mode (workspace-write) or ReadOnly is active, warn about world-writable dirs on Windows. @@ -548,6 +560,9 @@ impl App { tui.frame_requester().schedule_frame(); + let mut thread_created_rx = thread_manager.subscribe_thread_created(); + let mut listen_for_threads = true; + let exit_reason = loop { let control = select! { Some(event) = app_event_rx.recv() => { @@ -556,6 +571,21 @@ impl App { Some(event) = tui_events.next() => { app.handle_tui_event(tui, event).await? } + // Listen on new thread creation due to collab tools. + created = thread_created_rx.recv(), if listen_for_threads => { + match created { + Ok(thread_id) => { + app.handle_thread_created(thread_id).await?; + } + Err(broadcast::error::RecvError::Lagged(_)) => { + tracing::warn!("thread_created receiver lagged; skipping resync"); + } + Err(broadcast::error::RecvError::Closed) => { + listen_for_threads = false; + } + } + AppRunControl::Continue + } }; match control { AppRunControl::Continue => {} @@ -846,18 +876,15 @@ impl App { self.chat_widget.on_commit_tick(); } AppEvent::CodexEvent(event) => { - if self.suppress_shutdown_complete - && matches!(event.msg, EventMsg::ShutdownComplete) - { - self.suppress_shutdown_complete = false; + if !self.external_approval_routes.is_empty() { + // Store the events while the approval is pending. + self.paused_codex_events.push_back(event); return Ok(AppRunControl::Continue); } - if let EventMsg::ListSkillsResponse(response) = &event.msg { - let cwd = self.chat_widget.config_ref().cwd.clone(); - let errors = errors_for_cwd(&cwd, response); - emit_skill_load_warnings(&self.app_event_tx, &errors); - } - self.chat_widget.handle_codex_event(event); + self.handle_codex_event_now(event); + } + AppEvent::ExternalApprovalRequest { thread_id, event } => { + self.handle_external_approval_request(thread_id, event); } AppEvent::Exit(mode) => match mode { ExitMode::ShutdownFirst => self.chat_widget.submit_op(Op::Shutdown), @@ -868,7 +895,54 @@ impl App { AppEvent::FatalExitRequest(message) => { return Ok(AppRunControl::Exit(ExitReason::Fatal(message))); } - AppEvent::CodexOp(op) => self.chat_widget.submit_op(op), + AppEvent::CodexOp(op) => match op { + // Catch potential approvals coming from an external thread and treat them + // directly. This support both command and patch approval. In such case + // the approval get transferred to the corresponding thread and the external + // approval map (`external_approval_routes`) is updated. + Op::ExecApproval { id, decision } => { + if let Some((thread_id, original_id)) = + self.external_approval_routes.remove(&id) + { + // Approval of a sub-agent. + self.forward_external_op( + thread_id, + Op::ExecApproval { + id: original_id, + decision, + }, + ) + .await; + self.finish_external_approval(); + } else { + // This is an approval but not external. + self.chat_widget + .submit_op(Op::ExecApproval { id, decision }); + } + } + Op::PatchApproval { id, decision } => { + if let Some((thread_id, original_id)) = + self.external_approval_routes.remove(&id) + { + // Approval of a sub-agent. + self.forward_external_op( + thread_id, + Op::PatchApproval { + id: original_id, + decision, + }, + ) + .await; + self.finish_external_approval(); + } else { + // This is an approval but not external. + self.chat_widget + .submit_op(Op::PatchApproval { id, decision }); + } + } + // Standard path where this is not an external approval response. + _ => self.chat_widget.submit_op(op), + }, AppEvent::DiffResult(text) => { // Clear the in-progress state in the bottom pane self.chat_widget.on_diff_complete(); @@ -1343,6 +1417,89 @@ impl App { Ok(AppRunControl::Continue) } + fn handle_codex_event_now(&mut self, event: Event) { + if self.suppress_shutdown_complete && matches!(event.msg, EventMsg::ShutdownComplete) { + self.suppress_shutdown_complete = false; + return; + } + if let EventMsg::ListSkillsResponse(response) = &event.msg { + let cwd = self.chat_widget.config_ref().cwd.clone(); + let errors = errors_for_cwd(&cwd, response); + emit_skill_load_warnings(&self.app_event_tx, &errors); + } + self.chat_widget.handle_codex_event(event); + } + + /// Routes external approval request events through the chat widget by + /// rewriting the event id to include the originating thread. + /// + /// `thread_id` is the external thread that issued the approval request. + /// `event` is the approval request event whose id is rewritten so replies + /// can be routed back to the correct thread. + fn handle_external_approval_request(&mut self, thread_id: ThreadId, mut event: Event) { + match &mut event.msg { + EventMsg::ExecApprovalRequest(_) | EventMsg::ApplyPatchApprovalRequest(_) => { + let original_id = event.id.clone(); + let routing_id = format!("{thread_id}:{original_id}"); + self.external_approval_routes + .insert(routing_id.clone(), (thread_id, original_id)); + event.id = routing_id; + } + _ => return, + } + self.chat_widget.handle_codex_event(event); + } + + async fn forward_external_op(&self, thread_id: ThreadId, op: Op) { + let thread = match self.server.get_thread(thread_id).await { + Ok(thread) => thread, + Err(err) => { + tracing::warn!("failed to find thread {thread_id} for approval response: {err}"); + return; + } + }; + if let Err(err) = thread.submit(op).await { + tracing::warn!("failed to submit approval response to thread {thread_id}: {err}"); + } + } + + fn finish_external_approval(&mut self) { + if self.external_approval_routes.is_empty() { + while let Some(event) = self.paused_codex_events.pop_front() { + self.handle_codex_event_now(event); + } + } + } + + async fn handle_thread_created(&mut self, thread_id: ThreadId) -> Result<()> { + let thread = match self.server.get_thread(thread_id).await { + Ok(thread) => thread, + Err(err) => { + tracing::warn!("failed to attach listener for thread {thread_id}: {err}"); + return Ok(()); + } + }; + let app_event_tx = self.app_event_tx.clone(); + tokio::spawn(async move { + loop { + let event = match thread.next_event().await { + Ok(event) => event, + Err(err) => { + tracing::debug!("external thread {thread_id} listener stopped: {err}"); + break; + } + }; + match event.msg { + EventMsg::ExecApprovalRequest(_) | EventMsg::ApplyPatchApprovalRequest(_) => { + app_event_tx.send(AppEvent::ExternalApprovalRequest { thread_id, event }); + } + _ => {} + } + } + }); + Ok(()) + } + fn reasoning_label(reasoning_effort: Option) -> &'static str { match reasoning_effort { Some(ReasoningEffortConfig::Minimal) => "minimal", @@ -1598,6 +1755,8 @@ mod tests { pending_update_action: None, suppress_shutdown_complete: false, skip_world_writable_scan_once: false, + external_approval_routes: HashMap::new(), + paused_codex_events: VecDeque::new(), } } @@ -1638,6 +1797,8 @@ mod tests { pending_update_action: None, suppress_shutdown_complete: false, skip_world_writable_scan_once: false, + external_approval_routes: HashMap::new(), + paused_codex_events: VecDeque::new(), }, rx, op_rx, diff --git a/codex-rs/tui/src/app_event.rs b/codex-rs/tui/src/app_event.rs index 6ce5d2cae5..785f3ae2ce 100644 --- a/codex-rs/tui/src/app_event.rs +++ b/codex-rs/tui/src/app_event.rs @@ -14,6 +14,7 @@ use codex_common::approval_presets::ApprovalPreset; use codex_core::protocol::Event; use codex_core::protocol::RateLimitSnapshot; use codex_file_search::FileMatch; +use codex_protocol::ThreadId; use codex_protocol::openai_models::ModelPreset; use crate::bottom_pane::ApprovalRequest; @@ -41,6 +42,10 @@ pub(crate) enum WindowsSandboxFallbackReason { #[derive(Debug)] pub(crate) enum AppEvent { CodexEvent(Event), + ExternalApprovalRequest { + thread_id: ThreadId, + event: Event, + }, /// Start a new session. NewSession,