From f30f39b28b13e7048bfca7f7e8d4022e7840f443 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Fri, 23 Jan 2026 13:57:59 +0100 Subject: [PATCH] feat: tui beta for collab (#9690) https://github.com/user-attachments/assets/1ca07e7a-3d82-40da-a5b0-8ab2eef0bb69 --- codex-rs/tui/src/app.rs | 611 ++++++++++++++++++++------- codex-rs/tui/src/app_event.rs | 8 +- codex-rs/tui/src/chatwidget.rs | 138 ++++++ codex-rs/tui/src/chatwidget/agent.rs | 15 + codex-rs/tui/src/custom_terminal.rs | 13 + codex-rs/tui/src/slash_command.rs | 3 + 6 files changed, 642 insertions(+), 146 deletions(-) diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index 8f945436b6..fa24e110f5 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -7,6 +7,9 @@ use crate::app_event::WindowsSandboxEnableMode; use crate::app_event::WindowsSandboxFallbackReason; use crate::app_event_sender::AppEventSender; use crate::bottom_pane::ApprovalRequest; +use crate::bottom_pane::SelectionItem; +use crate::bottom_pane::SelectionViewParams; +use crate::bottom_pane::popup_consts::standard_popup_hint_line; use crate::chatwidget::ChatWidget; use crate::chatwidget::ExternalEditorState; use crate::diff_render::DiffSummary; @@ -50,9 +53,11 @@ use codex_core::protocol::SkillErrorInfo; use codex_core::protocol::TokenUsage; use codex_otel::OtelManager; use codex_protocol::ThreadId; +use codex_protocol::items::TurnItem; use codex_protocol::openai_models::ModelPreset; use codex_protocol::openai_models::ModelUpgrade; use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig; +use codex_protocol::protocol::SessionConfiguredEvent; use color_eyre::eyre::Result; use color_eyre::eyre::WrapErr; use crossterm::event::KeyCode; @@ -64,6 +69,7 @@ use ratatui::widgets::Paragraph; use ratatui::widgets::Wrap; use std::collections::BTreeMap; use std::collections::HashMap; +use std::collections::HashSet; use std::collections::VecDeque; use std::path::Path; use std::path::PathBuf; @@ -74,10 +80,14 @@ use std::thread; use std::time::Duration; use std::time::Instant; use tokio::select; +use tokio::sync::Mutex; use tokio::sync::broadcast; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::mpsc::unbounded_channel; const EXTERNAL_EDITOR_HINT: &str = "Save and close external editor to continue."; +const THREAD_EVENT_CHANNEL_CAPACITY: usize = 1024; #[derive(Debug, Clone)] pub struct AppExitInfo { @@ -168,6 +178,117 @@ struct SessionSummary { resume_command: Option, } +#[derive(Debug, Clone)] +struct ThreadEventSnapshot { + session_configured: Option, + events: Vec, +} + +#[derive(Debug)] +struct ThreadEventStore { + session_configured: Option, + buffer: VecDeque, + user_message_ids: HashSet, + capacity: usize, + active: bool, +} + +impl ThreadEventStore { + fn new(capacity: usize) -> Self { + Self { + session_configured: None, + buffer: VecDeque::new(), + user_message_ids: HashSet::new(), + capacity, + active: false, + } + } + + fn new_with_session_configured(capacity: usize, event: Event) -> Self { + let mut store = Self::new(capacity); + store.session_configured = Some(event); + store + } + + fn push_event(&mut self, event: Event) { + match &event.msg { + EventMsg::SessionConfigured(_) => { + self.session_configured = Some(event); + return; + } + EventMsg::ItemCompleted(completed) => { + if let TurnItem::UserMessage(item) = &completed.item { + if !event.id.is_empty() && self.user_message_ids.contains(&event.id) { + return; + } + let legacy = Event { + id: event.id, + msg: item.as_legacy_event(), + }; + self.push_legacy_event(legacy); + return; + } + } + _ => {} + } + + self.push_legacy_event(event); + } + + fn push_legacy_event(&mut self, event: Event) { + if let EventMsg::UserMessage(_) = &event.msg + && !event.id.is_empty() + && !self.user_message_ids.insert(event.id.clone()) + { + return; + } + self.buffer.push_back(event); + if self.buffer.len() > self.capacity + && let Some(removed) = self.buffer.pop_front() + && matches!(removed.msg, EventMsg::UserMessage(_)) + && !removed.id.is_empty() + { + self.user_message_ids.remove(&removed.id); + } + } + + fn snapshot(&self) -> ThreadEventSnapshot { + ThreadEventSnapshot { + session_configured: self.session_configured.clone(), + events: self.buffer.iter().cloned().collect(), + } + } +} + +#[derive(Debug)] +struct ThreadEventChannel { + sender: mpsc::Sender, + receiver: Option>, + store: Arc>, +} + +impl ThreadEventChannel { + fn new(capacity: usize) -> Self { + let (sender, receiver) = mpsc::channel(capacity); + Self { + sender, + receiver: Some(receiver), + store: Arc::new(Mutex::new(ThreadEventStore::new(capacity))), + } + } + + fn new_with_session_configured(capacity: usize, event: Event) -> Self { + let (sender, receiver) = mpsc::channel(capacity); + Self { + sender, + receiver: Some(receiver), + store: Arc::new(Mutex::new(ThreadEventStore::new_with_session_configured( + capacity, event, + ))), + } + } +} + fn should_show_model_migration_prompt( current_model: &str, target_model: &str, @@ -368,12 +489,12 @@ pub(crate) struct App { windows_sandbox: WindowsSandboxState, - // TODO(jif) drop once new UX is here. - // Track external agent approvals spawned via AgentControl. - /// Map routed approval IDs to their originating external threads and original IDs. - external_approval_routes: HashMap, - /// Buffered Codex events while external approvals are pending. - paused_codex_events: VecDeque, + thread_event_channels: HashMap, + active_thread_id: Option, + active_thread_rx: Option>, + primary_thread_id: Option, + primary_session_configured: Option, + pending_primary_events: VecDeque, } #[derive(Default)] @@ -415,6 +536,248 @@ impl App { } } + fn ensure_thread_channel(&mut self, thread_id: ThreadId) -> &mut ThreadEventChannel { + self.thread_event_channels + .entry(thread_id) + .or_insert_with(|| ThreadEventChannel::new(THREAD_EVENT_CHANNEL_CAPACITY)) + } + + async fn set_thread_active(&mut self, thread_id: ThreadId, active: bool) { + if let Some(channel) = self.thread_event_channels.get_mut(&thread_id) { + let mut store = channel.store.lock().await; + store.active = active; + } + } + + async fn activate_thread_channel(&mut self, thread_id: ThreadId) { + if self.active_thread_id.is_some() { + return; + } + self.set_thread_active(thread_id, true).await; + let receiver = if let Some(channel) = self.thread_event_channels.get_mut(&thread_id) { + channel.receiver.take() + } else { + None + }; + self.active_thread_id = Some(thread_id); + self.active_thread_rx = receiver; + } + + async fn store_active_thread_receiver(&mut self) { + let Some(active_id) = self.active_thread_id else { + return; + }; + let Some(receiver) = self.active_thread_rx.take() else { + return; + }; + if let Some(channel) = self.thread_event_channels.get_mut(&active_id) { + let mut store = channel.store.lock().await; + store.active = false; + channel.receiver = Some(receiver); + } + } + + async fn activate_thread_for_replay( + &mut self, + thread_id: ThreadId, + ) -> Option<(mpsc::Receiver, ThreadEventSnapshot)> { + let channel = self.thread_event_channels.get_mut(&thread_id)?; + let receiver = channel.receiver.take()?; + let mut store = channel.store.lock().await; + store.active = true; + let snapshot = store.snapshot(); + Some((receiver, snapshot)) + } + + async fn clear_active_thread(&mut self) { + if let Some(active_id) = self.active_thread_id.take() { + self.set_thread_active(active_id, false).await; + } + self.active_thread_rx = None; + } + + async fn enqueue_thread_event(&mut self, thread_id: ThreadId, event: Event) -> Result<()> { + let (sender, store) = { + let channel = self.ensure_thread_channel(thread_id); + (channel.sender.clone(), Arc::clone(&channel.store)) + }; + + let should_send = { + let mut guard = store.lock().await; + guard.push_event(event.clone()); + guard.active + }; + + if should_send && let Err(err) = sender.send(event).await { + tracing::warn!("thread {thread_id} event channel closed: {err}"); + } + Ok(()) + } + + async fn enqueue_primary_event(&mut self, event: Event) -> Result<()> { + if let Some(thread_id) = self.primary_thread_id { + return self.enqueue_thread_event(thread_id, event).await; + } + + if let EventMsg::SessionConfigured(session) = &event.msg { + let thread_id = session.session_id; + self.primary_thread_id = Some(thread_id); + self.primary_session_configured = Some(session.clone()); + self.ensure_thread_channel(thread_id); + self.activate_thread_channel(thread_id).await; + + let pending = std::mem::take(&mut self.pending_primary_events); + for pending_event in pending { + self.enqueue_thread_event(thread_id, pending_event).await?; + } + self.enqueue_thread_event(thread_id, event).await?; + } else { + self.pending_primary_events.push_back(event); + } + Ok(()) + } + + fn open_agent_picker(&mut self) { + if self.thread_event_channels.is_empty() { + self.chat_widget + .add_info_message("No agents available yet.".to_string(), None); + return; + } + + let mut thread_ids: Vec = self.thread_event_channels.keys().cloned().collect(); + thread_ids.sort_by_key(ToString::to_string); + + let mut initial_selected_idx = None; + let items: Vec = thread_ids + .iter() + .enumerate() + .map(|(idx, thread_id)| { + if self.active_thread_id == Some(*thread_id) { + initial_selected_idx = Some(idx); + } + let id = *thread_id; + SelectionItem { + name: thread_id.to_string(), + is_current: self.active_thread_id == Some(*thread_id), + actions: vec![Box::new(move |tx| { + tx.send(AppEvent::SelectAgentThread(id)); + })], + dismiss_on_select: true, + search_value: Some(thread_id.to_string()), + ..Default::default() + } + }) + .collect(); + + self.chat_widget.show_selection_view(SelectionViewParams { + title: Some("Agents".to_string()), + subtitle: Some("Select a thread to focus".to_string()), + footer_hint: Some(standard_popup_hint_line()), + items, + initial_selected_idx, + ..Default::default() + }); + } + + async fn select_agent_thread(&mut self, tui: &mut tui::Tui, thread_id: ThreadId) -> Result<()> { + if self.active_thread_id == Some(thread_id) { + return Ok(()); + } + + let thread = match self.server.get_thread(thread_id).await { + Ok(thread) => thread, + Err(err) => { + self.chat_widget.add_error_message(format!( + "Failed to attach to agent thread {thread_id}: {err}" + )); + return Ok(()); + } + }; + + let previous_thread_id = self.active_thread_id; + self.store_active_thread_receiver().await; + self.active_thread_id = None; + let Some((receiver, snapshot)) = self.activate_thread_for_replay(thread_id).await else { + self.chat_widget + .add_error_message(format!("Agent thread {thread_id} is already active.")); + if let Some(previous_thread_id) = previous_thread_id { + self.activate_thread_channel(previous_thread_id).await; + } + return Ok(()); + }; + + self.active_thread_id = Some(thread_id); + self.active_thread_rx = Some(receiver); + + let init = self.chatwidget_init_for_forked_or_resumed_thread(tui, self.config.clone()); + let codex_op_tx = crate::chatwidget::spawn_op_forwarder(thread); + self.chat_widget = ChatWidget::new_with_op_sender(init, codex_op_tx); + + self.reset_for_thread_switch(tui)?; + self.replay_thread_snapshot(snapshot); + self.drain_active_thread_events(tui).await?; + + Ok(()) + } + + fn reset_for_thread_switch(&mut self, tui: &mut tui::Tui) -> Result<()> { + self.overlay = None; + self.transcript_cells.clear(); + self.deferred_history_lines.clear(); + self.has_emitted_history_lines = false; + self.backtrack = BacktrackState::default(); + self.backtrack_render_pending = false; + tui.terminal.clear_scrollback()?; + tui.terminal.clear()?; + Ok(()) + } + + fn reset_thread_event_state(&mut self) { + self.thread_event_channels.clear(); + self.active_thread_id = None; + self.active_thread_rx = None; + self.primary_thread_id = None; + self.pending_primary_events.clear(); + } + + async fn drain_active_thread_events(&mut self, tui: &mut tui::Tui) -> Result<()> { + let Some(mut rx) = self.active_thread_rx.take() else { + return Ok(()); + }; + + let mut disconnected = false; + loop { + match rx.try_recv() { + Ok(event) => self.handle_codex_event_now(event), + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + disconnected = true; + break; + } + } + } + + if !disconnected { + self.active_thread_rx = Some(rx); + } else { + self.clear_active_thread().await; + } + + if self.backtrack_render_pending { + tui.frame_requester().schedule_frame(); + } + Ok(()) + } + + fn replay_thread_snapshot(&mut self, snapshot: ThreadEventSnapshot) { + if let Some(event) = snapshot.session_configured { + self.handle_codex_event_replay(event); + } + for event in snapshot.events { + self.handle_codex_event_replay(event); + } + } + #[allow(clippy::too_many_arguments)] pub async fn run( tui: &mut tui::Tui, @@ -587,8 +950,12 @@ impl App { pending_update_action: None, suppress_shutdown_complete: false, windows_sandbox: WindowsSandboxState::default(), - external_approval_routes: HashMap::new(), - paused_codex_events: VecDeque::new(), + thread_event_channels: HashMap::new(), + active_thread_id: None, + active_thread_rx: None, + primary_thread_id: None, + primary_session_configured: None, + pending_primary_events: VecDeque::new(), }; // On startup, if Agent mode (workspace-write) or ReadOnly is active, warn about world-writable dirs on Windows. @@ -649,6 +1016,20 @@ impl App { Some(event) = app_event_rx.recv() => { app.handle_event(tui, event).await? } + active = async { + if let Some(rx) = app.active_thread_rx.as_mut() { + rx.recv().await + } else { + None + } + }, if app.active_thread_rx.is_some() => { + if let Some(event) = active { + app.handle_active_thread_event(tui, event)?; + } else { + app.clear_active_thread().await; + } + AppRunControl::Continue + } Some(event) = tui_events.next() => { app.handle_tui_event(tui, event).await? } @@ -759,6 +1140,7 @@ impl App { otel_manager: self.otel_manager.clone(), }; self.chat_widget = ChatWidget::new(init, self.server.clone()); + self.reset_thread_event_state(); if let Some(summary) = summary { let mut lines: Vec> = vec![summary.usage_line.clone().into()]; if let Some(command) = summary.resume_command { @@ -803,6 +1185,7 @@ impl App { resumed.thread, resumed.session_configured, ); + self.reset_thread_event_state(); if let Some(summary) = summary { let mut lines: Vec> = vec![summary.usage_line.clone().into()]; @@ -852,6 +1235,7 @@ impl App { forked.thread, forked.session_configured, ); + self.reset_thread_event_state(); if let Some(summary) = summary { let mut lines: Vec> = vec![summary.usage_line.clone().into()]; @@ -928,18 +1312,7 @@ impl App { self.chat_widget.on_commit_tick(); } AppEvent::CodexEvent(event) => { - if !self.external_approval_routes.is_empty() { - // Store the events while the approval is pending. - self.paused_codex_events.push_back(event); - return Ok(AppRunControl::Continue); - } - self.handle_codex_event_now(event); - if self.backtrack_render_pending { - tui.frame_requester().schedule_frame(); - } - } - AppEvent::ExternalApprovalRequest { thread_id, event } => { - self.handle_external_approval_request(thread_id, event); + self.enqueue_primary_event(event).await?; } AppEvent::Exit(mode) => match mode { ExitMode::ShutdownFirst => self.chat_widget.submit_op(Op::Shutdown), @@ -950,72 +1323,9 @@ impl App { AppEvent::FatalExitRequest(message) => { return Ok(AppRunControl::Exit(ExitReason::Fatal(message))); } - AppEvent::CodexOp(op) => match op { - // Catch potential approvals coming from an external thread and treat them - // directly. This support both command and patch approval. In such case - // the approval get transferred to the corresponding thread and the external - // approval map (`external_approval_routes`) is updated. - Op::ExecApproval { id, decision } => { - if let Some((thread_id, original_id)) = - self.external_approval_routes.remove(&id) - { - // Approval of a sub-agent. - self.forward_external_op( - thread_id, - Op::ExecApproval { - id: original_id, - decision, - }, - ) - .await; - self.finish_external_approval(); - } else { - // This is an approval but not external. - self.chat_widget - .submit_op(Op::ExecApproval { id, decision }); - } - } - Op::PatchApproval { id, decision } => { - if let Some((thread_id, original_id)) = - self.external_approval_routes.remove(&id) - { - // Approval of a sub-agent. - self.forward_external_op( - thread_id, - Op::PatchApproval { - id: original_id, - decision, - }, - ) - .await; - self.finish_external_approval(); - } else { - // This is an approval but not external. - self.chat_widget - .submit_op(Op::PatchApproval { id, decision }); - } - } - Op::UserInputAnswer { id, response } => { - if let Some((thread_id, original_id)) = - self.external_approval_routes.remove(&id) - { - self.forward_external_op( - thread_id, - Op::UserInputAnswer { - id: original_id, - response, - }, - ) - .await; - self.finish_external_approval(); - } else { - self.chat_widget - .submit_op(Op::UserInputAnswer { id, response }); - } - } - // Standard path where this is not an external approval response. - _ => self.chat_widget.submit_op(op), - }, + AppEvent::CodexOp(op) => { + self.chat_widget.submit_op(op); + } AppEvent::DiffResult(text) => { // Clear the in-progress state in the bottom pane self.chat_widget.on_diff_complete(); @@ -1481,6 +1791,12 @@ impl App { AppEvent::OpenApprovalsPopup => { self.chat_widget.open_approvals_popup(); } + AppEvent::OpenAgentPicker => { + self.open_agent_picker(); + } + AppEvent::SelectAgentThread(thread_id) => { + self.select_agent_thread(tui, thread_id).await?; + } AppEvent::OpenSkillsList => { self.chat_widget.open_skills_list(); } @@ -1584,56 +1900,23 @@ impl App { self.chat_widget.handle_codex_event(event); } - /// Routes external approval request events through the chat widget by - /// rewriting the event id to include the originating thread. - /// - /// `thread_id` is the external thread that issued the approval request. - /// `event` is the approval request event whose id is rewritten so replies - /// can be routed back to the correct thread. - fn handle_external_approval_request(&mut self, thread_id: ThreadId, mut event: Event) { - match &mut event.msg { - EventMsg::RequestUserInput(ev) => { - let original_id = ev.turn_id.clone(); - let routing_id = format!("{thread_id}:{original_id}"); - self.external_approval_routes - .insert(routing_id.clone(), (thread_id, original_id)); - ev.turn_id = routing_id.clone(); - event.id = routing_id; - } - EventMsg::ExecApprovalRequest(_) | EventMsg::ApplyPatchApprovalRequest(_) => { - let original_id = event.id.clone(); - let routing_id = format!("{thread_id}:{original_id}"); - self.external_approval_routes - .insert(routing_id.clone(), (thread_id, original_id)); - event.id = routing_id; - } - _ => return, - } - self.chat_widget.handle_codex_event(event); + fn handle_codex_event_replay(&mut self, event: Event) { + self.handle_backtrack_event(&event.msg); + self.chat_widget.handle_codex_event_replay(event); } - async fn forward_external_op(&self, thread_id: ThreadId, op: Op) { - let thread = match self.server.get_thread(thread_id).await { - Ok(thread) => thread, - Err(err) => { - tracing::warn!("failed to find thread {thread_id} for approval response: {err}"); - return; - } - }; - if let Err(err) = thread.submit(op).await { - tracing::warn!("failed to submit approval response to thread {thread_id}: {err}"); - } - } - - fn finish_external_approval(&mut self) { - if self.external_approval_routes.is_empty() { - while let Some(event) = self.paused_codex_events.pop_front() { - self.handle_codex_event_now(event); - } + fn handle_active_thread_event(&mut self, tui: &mut tui::Tui, event: Event) -> Result<()> { + self.handle_codex_event_now(event); + if self.backtrack_render_pending { + tui.frame_requester().schedule_frame(); } + Ok(()) } async fn handle_thread_created(&mut self, thread_id: ThreadId) -> Result<()> { + if self.thread_event_channels.contains_key(&thread_id) { + return Ok(()); + } let thread = match self.server.get_thread(thread_id).await { Ok(thread) => thread, Err(err) => { @@ -1641,7 +1924,15 @@ impl App { return Ok(()); } }; - let app_event_tx = self.app_event_tx.clone(); + let event = Event { + id: String::new(), + msg: EventMsg::SessionConfigured(self.session_configured_for_thread(thread_id)), + }; + let channel = + ThreadEventChannel::new_with_session_configured(THREAD_EVENT_CHANNEL_CAPACITY, event); + let sender = channel.sender.clone(); + let store = Arc::clone(&channel.store); + self.thread_event_channels.insert(thread_id, channel); tokio::spawn(async move { loop { let event = match thread.next_event().await { @@ -1651,19 +1942,47 @@ impl App { break; } }; - match event.msg { - EventMsg::ExecApprovalRequest(_) - | EventMsg::ApplyPatchApprovalRequest(_) - | EventMsg::RequestUserInput(_) => { - app_event_tx.send(AppEvent::ExternalApprovalRequest { thread_id, event }); - } - _ => {} + let should_send = { + let mut guard = store.lock().await; + guard.push_event(event.clone()); + guard.active + }; + if should_send && let Err(err) = sender.send(event).await { + tracing::debug!("external thread {thread_id} channel closed: {err}"); + break; } } }); Ok(()) } + fn session_configured_for_thread(&self, thread_id: ThreadId) -> SessionConfiguredEvent { + let mut session_configured = + self.primary_session_configured + .clone() + .unwrap_or_else(|| SessionConfiguredEvent { + session_id: thread_id, + forked_from_id: None, + model: self.chat_widget.current_model().to_string(), + model_provider_id: self.config.model_provider_id.clone(), + approval_policy: *self.config.approval_policy.get(), + sandbox_policy: self.config.sandbox_policy.get().clone(), + cwd: self.config.cwd.clone(), + reasoning_effort: None, + history_log_id: 0, + history_entry_count: 0, + initial_messages: None, + rollout_path: PathBuf::new(), + }); + session_configured.session_id = thread_id; + session_configured.forked_from_id = None; + session_configured.history_log_id = 0; + session_configured.history_entry_count = 0; + session_configured.initial_messages = None; + session_configured.rollout_path = PathBuf::new(); + session_configured + } + fn reasoning_label(reasoning_effort: Option) -> &'static str { match reasoning_effort { Some(ReasoningEffortConfig::Minimal) => "minimal", @@ -1926,8 +2245,12 @@ mod tests { pending_update_action: None, suppress_shutdown_complete: false, windows_sandbox: WindowsSandboxState::default(), - external_approval_routes: HashMap::new(), - paused_codex_events: VecDeque::new(), + thread_event_channels: HashMap::new(), + active_thread_id: None, + active_thread_rx: None, + primary_thread_id: None, + primary_session_configured: None, + pending_primary_events: VecDeque::new(), } } @@ -1970,8 +2293,12 @@ mod tests { pending_update_action: None, suppress_shutdown_complete: false, windows_sandbox: WindowsSandboxState::default(), - external_approval_routes: HashMap::new(), - paused_codex_events: VecDeque::new(), + thread_event_channels: HashMap::new(), + active_thread_id: None, + active_thread_rx: None, + primary_thread_id: None, + primary_session_configured: None, + pending_primary_events: VecDeque::new(), }, rx, op_rx, diff --git a/codex-rs/tui/src/app_event.rs b/codex-rs/tui/src/app_event.rs index b74899795c..f6ed6e8abc 100644 --- a/codex-rs/tui/src/app_event.rs +++ b/codex-rs/tui/src/app_event.rs @@ -43,10 +43,10 @@ pub(crate) enum WindowsSandboxFallbackReason { #[derive(Debug)] pub(crate) enum AppEvent { CodexEvent(Event), - ExternalApprovalRequest { - thread_id: ThreadId, - event: Event, - }, + /// Open the agent picker for switching active threads. + OpenAgentPicker, + /// Switch the active thread to the selected agent. + SelectAgentThread(ThreadId), /// Start a new session. NewSession, diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 93158c41cc..5a96e4e381 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -178,6 +178,7 @@ use self::interrupts::InterruptManager; mod agent; use self::agent::spawn_agent; use self::agent::spawn_agent_from_existing; +pub(crate) use self::agent::spawn_op_forwarder; mod session_header; use self::session_header::SessionHeader; mod skills; @@ -2030,6 +2031,127 @@ impl ChatWidget { widget } + pub(crate) fn new_with_op_sender( + common: ChatWidgetInit, + codex_op_tx: UnboundedSender, + ) -> Self { + let ChatWidgetInit { + config, + frame_requester, + app_event_tx, + initial_user_message, + enhanced_keys_supported, + auth_manager, + models_manager, + feedback, + is_first_run, + model, + otel_manager, + } = common; + let model = model.filter(|m| !m.trim().is_empty()); + let mut config = config; + config.model = model.clone(); + let mut rng = rand::rng(); + let placeholder = PLACEHOLDERS[rng.random_range(0..PLACEHOLDERS.len())].to_string(); + + let model_for_header = model.unwrap_or_else(|| DEFAULT_MODEL_DISPLAY_NAME.to_string()); + let fallback_custom = Settings { + model: model_for_header.clone(), + reasoning_effort: None, + developer_instructions: None, + }; + let stored_collaboration_mode = if config.features.enabled(Feature::CollaborationModes) { + initial_collaboration_mode( + models_manager.as_ref(), + fallback_custom, + config.experimental_mode, + ) + } else { + CollaborationMode::Custom(fallback_custom) + }; + + let active_cell = Some(Self::placeholder_session_header_cell( + &config, + config.features.enabled(Feature::CollaborationModes), + stored_collaboration_mode.clone(), + )); + + let mut widget = Self { + app_event_tx: app_event_tx.clone(), + frame_requester: frame_requester.clone(), + codex_op_tx, + bottom_pane: BottomPane::new(BottomPaneParams { + frame_requester, + app_event_tx, + has_input_focus: true, + enhanced_keys_supported, + placeholder_text: placeholder, + disable_paste_burst: config.disable_paste_burst, + animations_enabled: config.animations, + skills: None, + }), + active_cell, + active_cell_revision: 0, + config, + skills_all: Vec::new(), + skills_initial_state: None, + stored_collaboration_mode, + auth_manager, + models_manager, + otel_manager, + session_header: SessionHeader::new(model_for_header), + initial_user_message, + token_info: None, + rate_limit_snapshot: None, + plan_type: None, + rate_limit_warnings: RateLimitWarningState::default(), + rate_limit_switch_prompt: RateLimitSwitchPromptState::default(), + rate_limit_poller: None, + stream_controller: None, + running_commands: HashMap::new(), + suppressed_exec_calls: HashSet::new(), + last_unified_wait: None, + unified_exec_wait_streak: None, + task_complete_pending: false, + unified_exec_processes: Vec::new(), + agent_turn_running: false, + mcp_startup_status: None, + interrupts: InterruptManager::new(), + reasoning_buffer: String::new(), + full_reasoning_buffer: String::new(), + current_status_header: String::from("Working"), + retry_status_header: None, + thread_id: None, + forked_from: None, + saw_plan_update_this_turn: false, + queued_user_messages: VecDeque::new(), + show_welcome_banner: is_first_run, + suppress_session_configured_redraw: false, + pending_notification: None, + quit_shortcut_expires_at: None, + quit_shortcut_key: None, + is_review_mode: false, + pre_review_token_info: None, + needs_final_message_separator: false, + had_work_activity: false, + last_separator_elapsed_secs: None, + last_rendered_width: std::cell::Cell::new(None), + feedback, + current_rollout_path: None, + external_editor_state: ExternalEditorState::Closed, + }; + + widget.prefetch_rate_limits(); + widget + .bottom_pane + .set_steer_enabled(widget.config.features.enabled(Feature::Steer)); + widget.bottom_pane.set_collaboration_modes_enabled( + widget.config.features.enabled(Feature::CollaborationModes), + ); + + widget + } + /// Create a ChatWidget attached to an existing conversation (e.g., a fork). pub(crate) fn new_from_existing( common: ChatWidgetInit, @@ -2316,6 +2438,11 @@ impl ChatWidget { self.bottom_pane.set_footer_hint_override(items); } + pub(crate) fn show_selection_view(&mut self, params: SelectionViewParams) { + self.bottom_pane.show_selection_view(params); + self.request_redraw(); + } + pub(crate) fn can_launch_external_editor(&self) -> bool { self.bottom_pane.can_launch_external_editor() } @@ -2380,6 +2507,9 @@ impl ChatWidget { self.open_collaboration_modes_popup(); } } + SlashCommand::Agent => { + self.app_event_tx.send(AppEvent::OpenAgentPicker); + } SlashCommand::Approvals => { self.open_approvals_popup(); } @@ -2748,6 +2878,14 @@ impl ChatWidget { self.dispatch_event_msg(Some(id), msg, false); } + pub(crate) fn handle_codex_event_replay(&mut self, event: Event) { + let Event { msg, .. } = event; + if matches!(msg, EventMsg::ShutdownComplete) { + return; + } + self.dispatch_event_msg(None, msg, true); + } + /// Dispatch a protocol `EventMsg` to the appropriate handler. /// /// `id` is `Some` for live events and `None` for replayed events from diff --git a/codex-rs/tui/src/chatwidget/agent.rs b/codex-rs/tui/src/chatwidget/agent.rs index b24233c2af..c902e5c638 100644 --- a/codex-rs/tui/src/chatwidget/agent.rs +++ b/codex-rs/tui/src/chatwidget/agent.rs @@ -105,3 +105,18 @@ pub(crate) fn spawn_agent_from_existing( codex_op_tx } + +/// Spawn an op-forwarding loop for an existing thread without subscribing to events. +pub(crate) fn spawn_op_forwarder(thread: std::sync::Arc) -> UnboundedSender { + let (codex_op_tx, mut codex_op_rx) = unbounded_channel::(); + + tokio::spawn(async move { + while let Some(op) = codex_op_rx.recv().await { + if let Err(e) = thread.submit(op).await { + tracing::error!("failed to submit op: {e}"); + } + } + }); + + codex_op_tx +} diff --git a/codex-rs/tui/src/custom_terminal.rs b/codex-rs/tui/src/custom_terminal.rs index 46d16a83f0..da2f6d5f2e 100644 --- a/codex-rs/tui/src/custom_terminal.rs +++ b/codex-rs/tui/src/custom_terminal.rs @@ -381,6 +381,19 @@ where Ok(()) } + /// Clear terminal scrollback (if supported) and force a full redraw. + pub fn clear_scrollback(&mut self) -> io::Result<()> { + if self.viewport_area.is_empty() { + return Ok(()); + } + self.backend + .set_cursor_position(self.viewport_area.as_position())?; + queue!(self.backend, Clear(crossterm::terminal::ClearType::Purge))?; + std::io::Write::flush(&mut self.backend)?; + self.previous_buffer_mut().reset(); + Ok(()) + } + /// Clears the inactive buffer and swaps it with the current buffer pub fn swap_buffers(&mut self) { self.previous_buffer_mut().reset(); diff --git a/codex-rs/tui/src/slash_command.rs b/codex-rs/tui/src/slash_command.rs index f825f32fa8..2562d5e689 100644 --- a/codex-rs/tui/src/slash_command.rs +++ b/codex-rs/tui/src/slash_command.rs @@ -26,6 +26,7 @@ pub enum SlashCommand { Init, Compact, Collab, + Agent, // Undo, Diff, Mention, @@ -60,6 +61,7 @@ impl SlashCommand { SlashCommand::Ps => "list background terminals", SlashCommand::Model => "choose what model and reasoning effort to use", SlashCommand::Collab => "change collaboration mode (experimental)", + SlashCommand::Agent => "switch the active agent thread", SlashCommand::Approvals => "choose what Codex can do without approval", SlashCommand::Permissions => "choose what Codex is allowed to do", SlashCommand::ElevateSandbox => "set up elevated agent sandbox", @@ -105,6 +107,7 @@ impl SlashCommand { SlashCommand::Rollout => true, SlashCommand::TestApproval => true, SlashCommand::Collab => true, + SlashCommand::Agent => true, } }