mirror of
https://github.com/openai/codex.git
synced 2026-02-01 22:47:52 +00:00
feat: tui beta for collab (#9690)
https://github.com/user-attachments/assets/1ca07e7a-3d82-40da-a5b0-8ab2eef0bb69
This commit is contained in:
@@ -7,6 +7,9 @@ use crate::app_event::WindowsSandboxEnableMode;
|
||||
use crate::app_event::WindowsSandboxFallbackReason;
|
||||
use crate::app_event_sender::AppEventSender;
|
||||
use crate::bottom_pane::ApprovalRequest;
|
||||
use crate::bottom_pane::SelectionItem;
|
||||
use crate::bottom_pane::SelectionViewParams;
|
||||
use crate::bottom_pane::popup_consts::standard_popup_hint_line;
|
||||
use crate::chatwidget::ChatWidget;
|
||||
use crate::chatwidget::ExternalEditorState;
|
||||
use crate::diff_render::DiffSummary;
|
||||
@@ -50,9 +53,11 @@ use codex_core::protocol::SkillErrorInfo;
|
||||
use codex_core::protocol::TokenUsage;
|
||||
use codex_otel::OtelManager;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::openai_models::ModelPreset;
|
||||
use codex_protocol::openai_models::ModelUpgrade;
|
||||
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
|
||||
use codex_protocol::protocol::SessionConfiguredEvent;
|
||||
use color_eyre::eyre::Result;
|
||||
use color_eyre::eyre::WrapErr;
|
||||
use crossterm::event::KeyCode;
|
||||
@@ -64,6 +69,7 @@ use ratatui::widgets::Paragraph;
|
||||
use ratatui::widgets::Wrap;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::collections::VecDeque;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
@@ -74,10 +80,14 @@ use std::thread;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use tokio::select;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::mpsc::error::TryRecvError;
|
||||
use tokio::sync::mpsc::unbounded_channel;
|
||||
|
||||
const EXTERNAL_EDITOR_HINT: &str = "Save and close external editor to continue.";
|
||||
const THREAD_EVENT_CHANNEL_CAPACITY: usize = 1024;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AppExitInfo {
|
||||
@@ -168,6 +178,117 @@ struct SessionSummary {
|
||||
resume_command: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct ThreadEventSnapshot {
|
||||
session_configured: Option<Event>,
|
||||
events: Vec<Event>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ThreadEventStore {
|
||||
session_configured: Option<Event>,
|
||||
buffer: VecDeque<Event>,
|
||||
user_message_ids: HashSet<String>,
|
||||
capacity: usize,
|
||||
active: bool,
|
||||
}
|
||||
|
||||
impl ThreadEventStore {
|
||||
fn new(capacity: usize) -> Self {
|
||||
Self {
|
||||
session_configured: None,
|
||||
buffer: VecDeque::new(),
|
||||
user_message_ids: HashSet::new(),
|
||||
capacity,
|
||||
active: false,
|
||||
}
|
||||
}
|
||||
|
||||
fn new_with_session_configured(capacity: usize, event: Event) -> Self {
|
||||
let mut store = Self::new(capacity);
|
||||
store.session_configured = Some(event);
|
||||
store
|
||||
}
|
||||
|
||||
fn push_event(&mut self, event: Event) {
|
||||
match &event.msg {
|
||||
EventMsg::SessionConfigured(_) => {
|
||||
self.session_configured = Some(event);
|
||||
return;
|
||||
}
|
||||
EventMsg::ItemCompleted(completed) => {
|
||||
if let TurnItem::UserMessage(item) = &completed.item {
|
||||
if !event.id.is_empty() && self.user_message_ids.contains(&event.id) {
|
||||
return;
|
||||
}
|
||||
let legacy = Event {
|
||||
id: event.id,
|
||||
msg: item.as_legacy_event(),
|
||||
};
|
||||
self.push_legacy_event(legacy);
|
||||
return;
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
self.push_legacy_event(event);
|
||||
}
|
||||
|
||||
fn push_legacy_event(&mut self, event: Event) {
|
||||
if let EventMsg::UserMessage(_) = &event.msg
|
||||
&& !event.id.is_empty()
|
||||
&& !self.user_message_ids.insert(event.id.clone())
|
||||
{
|
||||
return;
|
||||
}
|
||||
self.buffer.push_back(event);
|
||||
if self.buffer.len() > self.capacity
|
||||
&& let Some(removed) = self.buffer.pop_front()
|
||||
&& matches!(removed.msg, EventMsg::UserMessage(_))
|
||||
&& !removed.id.is_empty()
|
||||
{
|
||||
self.user_message_ids.remove(&removed.id);
|
||||
}
|
||||
}
|
||||
|
||||
fn snapshot(&self) -> ThreadEventSnapshot {
|
||||
ThreadEventSnapshot {
|
||||
session_configured: self.session_configured.clone(),
|
||||
events: self.buffer.iter().cloned().collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ThreadEventChannel {
|
||||
sender: mpsc::Sender<Event>,
|
||||
receiver: Option<mpsc::Receiver<Event>>,
|
||||
store: Arc<Mutex<ThreadEventStore>>,
|
||||
}
|
||||
|
||||
impl ThreadEventChannel {
|
||||
fn new(capacity: usize) -> Self {
|
||||
let (sender, receiver) = mpsc::channel(capacity);
|
||||
Self {
|
||||
sender,
|
||||
receiver: Some(receiver),
|
||||
store: Arc::new(Mutex::new(ThreadEventStore::new(capacity))),
|
||||
}
|
||||
}
|
||||
|
||||
fn new_with_session_configured(capacity: usize, event: Event) -> Self {
|
||||
let (sender, receiver) = mpsc::channel(capacity);
|
||||
Self {
|
||||
sender,
|
||||
receiver: Some(receiver),
|
||||
store: Arc::new(Mutex::new(ThreadEventStore::new_with_session_configured(
|
||||
capacity, event,
|
||||
))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn should_show_model_migration_prompt(
|
||||
current_model: &str,
|
||||
target_model: &str,
|
||||
@@ -368,12 +489,12 @@ pub(crate) struct App {
|
||||
|
||||
windows_sandbox: WindowsSandboxState,
|
||||
|
||||
// TODO(jif) drop once new UX is here.
|
||||
// Track external agent approvals spawned via AgentControl.
|
||||
/// Map routed approval IDs to their originating external threads and original IDs.
|
||||
external_approval_routes: HashMap<String, (ThreadId, String)>,
|
||||
/// Buffered Codex events while external approvals are pending.
|
||||
paused_codex_events: VecDeque<Event>,
|
||||
thread_event_channels: HashMap<ThreadId, ThreadEventChannel>,
|
||||
active_thread_id: Option<ThreadId>,
|
||||
active_thread_rx: Option<mpsc::Receiver<Event>>,
|
||||
primary_thread_id: Option<ThreadId>,
|
||||
primary_session_configured: Option<SessionConfiguredEvent>,
|
||||
pending_primary_events: VecDeque<Event>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
@@ -415,6 +536,248 @@ impl App {
|
||||
}
|
||||
}
|
||||
|
||||
fn ensure_thread_channel(&mut self, thread_id: ThreadId) -> &mut ThreadEventChannel {
|
||||
self.thread_event_channels
|
||||
.entry(thread_id)
|
||||
.or_insert_with(|| ThreadEventChannel::new(THREAD_EVENT_CHANNEL_CAPACITY))
|
||||
}
|
||||
|
||||
async fn set_thread_active(&mut self, thread_id: ThreadId, active: bool) {
|
||||
if let Some(channel) = self.thread_event_channels.get_mut(&thread_id) {
|
||||
let mut store = channel.store.lock().await;
|
||||
store.active = active;
|
||||
}
|
||||
}
|
||||
|
||||
async fn activate_thread_channel(&mut self, thread_id: ThreadId) {
|
||||
if self.active_thread_id.is_some() {
|
||||
return;
|
||||
}
|
||||
self.set_thread_active(thread_id, true).await;
|
||||
let receiver = if let Some(channel) = self.thread_event_channels.get_mut(&thread_id) {
|
||||
channel.receiver.take()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
self.active_thread_id = Some(thread_id);
|
||||
self.active_thread_rx = receiver;
|
||||
}
|
||||
|
||||
async fn store_active_thread_receiver(&mut self) {
|
||||
let Some(active_id) = self.active_thread_id else {
|
||||
return;
|
||||
};
|
||||
let Some(receiver) = self.active_thread_rx.take() else {
|
||||
return;
|
||||
};
|
||||
if let Some(channel) = self.thread_event_channels.get_mut(&active_id) {
|
||||
let mut store = channel.store.lock().await;
|
||||
store.active = false;
|
||||
channel.receiver = Some(receiver);
|
||||
}
|
||||
}
|
||||
|
||||
async fn activate_thread_for_replay(
|
||||
&mut self,
|
||||
thread_id: ThreadId,
|
||||
) -> Option<(mpsc::Receiver<Event>, ThreadEventSnapshot)> {
|
||||
let channel = self.thread_event_channels.get_mut(&thread_id)?;
|
||||
let receiver = channel.receiver.take()?;
|
||||
let mut store = channel.store.lock().await;
|
||||
store.active = true;
|
||||
let snapshot = store.snapshot();
|
||||
Some((receiver, snapshot))
|
||||
}
|
||||
|
||||
async fn clear_active_thread(&mut self) {
|
||||
if let Some(active_id) = self.active_thread_id.take() {
|
||||
self.set_thread_active(active_id, false).await;
|
||||
}
|
||||
self.active_thread_rx = None;
|
||||
}
|
||||
|
||||
async fn enqueue_thread_event(&mut self, thread_id: ThreadId, event: Event) -> Result<()> {
|
||||
let (sender, store) = {
|
||||
let channel = self.ensure_thread_channel(thread_id);
|
||||
(channel.sender.clone(), Arc::clone(&channel.store))
|
||||
};
|
||||
|
||||
let should_send = {
|
||||
let mut guard = store.lock().await;
|
||||
guard.push_event(event.clone());
|
||||
guard.active
|
||||
};
|
||||
|
||||
if should_send && let Err(err) = sender.send(event).await {
|
||||
tracing::warn!("thread {thread_id} event channel closed: {err}");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn enqueue_primary_event(&mut self, event: Event) -> Result<()> {
|
||||
if let Some(thread_id) = self.primary_thread_id {
|
||||
return self.enqueue_thread_event(thread_id, event).await;
|
||||
}
|
||||
|
||||
if let EventMsg::SessionConfigured(session) = &event.msg {
|
||||
let thread_id = session.session_id;
|
||||
self.primary_thread_id = Some(thread_id);
|
||||
self.primary_session_configured = Some(session.clone());
|
||||
self.ensure_thread_channel(thread_id);
|
||||
self.activate_thread_channel(thread_id).await;
|
||||
|
||||
let pending = std::mem::take(&mut self.pending_primary_events);
|
||||
for pending_event in pending {
|
||||
self.enqueue_thread_event(thread_id, pending_event).await?;
|
||||
}
|
||||
self.enqueue_thread_event(thread_id, event).await?;
|
||||
} else {
|
||||
self.pending_primary_events.push_back(event);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn open_agent_picker(&mut self) {
|
||||
if self.thread_event_channels.is_empty() {
|
||||
self.chat_widget
|
||||
.add_info_message("No agents available yet.".to_string(), None);
|
||||
return;
|
||||
}
|
||||
|
||||
let mut thread_ids: Vec<ThreadId> = self.thread_event_channels.keys().cloned().collect();
|
||||
thread_ids.sort_by_key(ToString::to_string);
|
||||
|
||||
let mut initial_selected_idx = None;
|
||||
let items: Vec<SelectionItem> = thread_ids
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(idx, thread_id)| {
|
||||
if self.active_thread_id == Some(*thread_id) {
|
||||
initial_selected_idx = Some(idx);
|
||||
}
|
||||
let id = *thread_id;
|
||||
SelectionItem {
|
||||
name: thread_id.to_string(),
|
||||
is_current: self.active_thread_id == Some(*thread_id),
|
||||
actions: vec![Box::new(move |tx| {
|
||||
tx.send(AppEvent::SelectAgentThread(id));
|
||||
})],
|
||||
dismiss_on_select: true,
|
||||
search_value: Some(thread_id.to_string()),
|
||||
..Default::default()
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
self.chat_widget.show_selection_view(SelectionViewParams {
|
||||
title: Some("Agents".to_string()),
|
||||
subtitle: Some("Select a thread to focus".to_string()),
|
||||
footer_hint: Some(standard_popup_hint_line()),
|
||||
items,
|
||||
initial_selected_idx,
|
||||
..Default::default()
|
||||
});
|
||||
}
|
||||
|
||||
async fn select_agent_thread(&mut self, tui: &mut tui::Tui, thread_id: ThreadId) -> Result<()> {
|
||||
if self.active_thread_id == Some(thread_id) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let thread = match self.server.get_thread(thread_id).await {
|
||||
Ok(thread) => thread,
|
||||
Err(err) => {
|
||||
self.chat_widget.add_error_message(format!(
|
||||
"Failed to attach to agent thread {thread_id}: {err}"
|
||||
));
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let previous_thread_id = self.active_thread_id;
|
||||
self.store_active_thread_receiver().await;
|
||||
self.active_thread_id = None;
|
||||
let Some((receiver, snapshot)) = self.activate_thread_for_replay(thread_id).await else {
|
||||
self.chat_widget
|
||||
.add_error_message(format!("Agent thread {thread_id} is already active."));
|
||||
if let Some(previous_thread_id) = previous_thread_id {
|
||||
self.activate_thread_channel(previous_thread_id).await;
|
||||
}
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
self.active_thread_id = Some(thread_id);
|
||||
self.active_thread_rx = Some(receiver);
|
||||
|
||||
let init = self.chatwidget_init_for_forked_or_resumed_thread(tui, self.config.clone());
|
||||
let codex_op_tx = crate::chatwidget::spawn_op_forwarder(thread);
|
||||
self.chat_widget = ChatWidget::new_with_op_sender(init, codex_op_tx);
|
||||
|
||||
self.reset_for_thread_switch(tui)?;
|
||||
self.replay_thread_snapshot(snapshot);
|
||||
self.drain_active_thread_events(tui).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn reset_for_thread_switch(&mut self, tui: &mut tui::Tui) -> Result<()> {
|
||||
self.overlay = None;
|
||||
self.transcript_cells.clear();
|
||||
self.deferred_history_lines.clear();
|
||||
self.has_emitted_history_lines = false;
|
||||
self.backtrack = BacktrackState::default();
|
||||
self.backtrack_render_pending = false;
|
||||
tui.terminal.clear_scrollback()?;
|
||||
tui.terminal.clear()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn reset_thread_event_state(&mut self) {
|
||||
self.thread_event_channels.clear();
|
||||
self.active_thread_id = None;
|
||||
self.active_thread_rx = None;
|
||||
self.primary_thread_id = None;
|
||||
self.pending_primary_events.clear();
|
||||
}
|
||||
|
||||
async fn drain_active_thread_events(&mut self, tui: &mut tui::Tui) -> Result<()> {
|
||||
let Some(mut rx) = self.active_thread_rx.take() else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let mut disconnected = false;
|
||||
loop {
|
||||
match rx.try_recv() {
|
||||
Ok(event) => self.handle_codex_event_now(event),
|
||||
Err(TryRecvError::Empty) => break,
|
||||
Err(TryRecvError::Disconnected) => {
|
||||
disconnected = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !disconnected {
|
||||
self.active_thread_rx = Some(rx);
|
||||
} else {
|
||||
self.clear_active_thread().await;
|
||||
}
|
||||
|
||||
if self.backtrack_render_pending {
|
||||
tui.frame_requester().schedule_frame();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn replay_thread_snapshot(&mut self, snapshot: ThreadEventSnapshot) {
|
||||
if let Some(event) = snapshot.session_configured {
|
||||
self.handle_codex_event_replay(event);
|
||||
}
|
||||
for event in snapshot.events {
|
||||
self.handle_codex_event_replay(event);
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn run(
|
||||
tui: &mut tui::Tui,
|
||||
@@ -587,8 +950,12 @@ impl App {
|
||||
pending_update_action: None,
|
||||
suppress_shutdown_complete: false,
|
||||
windows_sandbox: WindowsSandboxState::default(),
|
||||
external_approval_routes: HashMap::new(),
|
||||
paused_codex_events: VecDeque::new(),
|
||||
thread_event_channels: HashMap::new(),
|
||||
active_thread_id: None,
|
||||
active_thread_rx: None,
|
||||
primary_thread_id: None,
|
||||
primary_session_configured: None,
|
||||
pending_primary_events: VecDeque::new(),
|
||||
};
|
||||
|
||||
// On startup, if Agent mode (workspace-write) or ReadOnly is active, warn about world-writable dirs on Windows.
|
||||
@@ -649,6 +1016,20 @@ impl App {
|
||||
Some(event) = app_event_rx.recv() => {
|
||||
app.handle_event(tui, event).await?
|
||||
}
|
||||
active = async {
|
||||
if let Some(rx) = app.active_thread_rx.as_mut() {
|
||||
rx.recv().await
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}, if app.active_thread_rx.is_some() => {
|
||||
if let Some(event) = active {
|
||||
app.handle_active_thread_event(tui, event)?;
|
||||
} else {
|
||||
app.clear_active_thread().await;
|
||||
}
|
||||
AppRunControl::Continue
|
||||
}
|
||||
Some(event) = tui_events.next() => {
|
||||
app.handle_tui_event(tui, event).await?
|
||||
}
|
||||
@@ -759,6 +1140,7 @@ impl App {
|
||||
otel_manager: self.otel_manager.clone(),
|
||||
};
|
||||
self.chat_widget = ChatWidget::new(init, self.server.clone());
|
||||
self.reset_thread_event_state();
|
||||
if let Some(summary) = summary {
|
||||
let mut lines: Vec<Line<'static>> = vec![summary.usage_line.clone().into()];
|
||||
if let Some(command) = summary.resume_command {
|
||||
@@ -803,6 +1185,7 @@ impl App {
|
||||
resumed.thread,
|
||||
resumed.session_configured,
|
||||
);
|
||||
self.reset_thread_event_state();
|
||||
if let Some(summary) = summary {
|
||||
let mut lines: Vec<Line<'static>> =
|
||||
vec![summary.usage_line.clone().into()];
|
||||
@@ -852,6 +1235,7 @@ impl App {
|
||||
forked.thread,
|
||||
forked.session_configured,
|
||||
);
|
||||
self.reset_thread_event_state();
|
||||
if let Some(summary) = summary {
|
||||
let mut lines: Vec<Line<'static>> =
|
||||
vec![summary.usage_line.clone().into()];
|
||||
@@ -928,18 +1312,7 @@ impl App {
|
||||
self.chat_widget.on_commit_tick();
|
||||
}
|
||||
AppEvent::CodexEvent(event) => {
|
||||
if !self.external_approval_routes.is_empty() {
|
||||
// Store the events while the approval is pending.
|
||||
self.paused_codex_events.push_back(event);
|
||||
return Ok(AppRunControl::Continue);
|
||||
}
|
||||
self.handle_codex_event_now(event);
|
||||
if self.backtrack_render_pending {
|
||||
tui.frame_requester().schedule_frame();
|
||||
}
|
||||
}
|
||||
AppEvent::ExternalApprovalRequest { thread_id, event } => {
|
||||
self.handle_external_approval_request(thread_id, event);
|
||||
self.enqueue_primary_event(event).await?;
|
||||
}
|
||||
AppEvent::Exit(mode) => match mode {
|
||||
ExitMode::ShutdownFirst => self.chat_widget.submit_op(Op::Shutdown),
|
||||
@@ -950,72 +1323,9 @@ impl App {
|
||||
AppEvent::FatalExitRequest(message) => {
|
||||
return Ok(AppRunControl::Exit(ExitReason::Fatal(message)));
|
||||
}
|
||||
AppEvent::CodexOp(op) => match op {
|
||||
// Catch potential approvals coming from an external thread and treat them
|
||||
// directly. This support both command and patch approval. In such case
|
||||
// the approval get transferred to the corresponding thread and the external
|
||||
// approval map (`external_approval_routes`) is updated.
|
||||
Op::ExecApproval { id, decision } => {
|
||||
if let Some((thread_id, original_id)) =
|
||||
self.external_approval_routes.remove(&id)
|
||||
{
|
||||
// Approval of a sub-agent.
|
||||
self.forward_external_op(
|
||||
thread_id,
|
||||
Op::ExecApproval {
|
||||
id: original_id,
|
||||
decision,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
self.finish_external_approval();
|
||||
} else {
|
||||
// This is an approval but not external.
|
||||
self.chat_widget
|
||||
.submit_op(Op::ExecApproval { id, decision });
|
||||
}
|
||||
}
|
||||
Op::PatchApproval { id, decision } => {
|
||||
if let Some((thread_id, original_id)) =
|
||||
self.external_approval_routes.remove(&id)
|
||||
{
|
||||
// Approval of a sub-agent.
|
||||
self.forward_external_op(
|
||||
thread_id,
|
||||
Op::PatchApproval {
|
||||
id: original_id,
|
||||
decision,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
self.finish_external_approval();
|
||||
} else {
|
||||
// This is an approval but not external.
|
||||
self.chat_widget
|
||||
.submit_op(Op::PatchApproval { id, decision });
|
||||
}
|
||||
}
|
||||
Op::UserInputAnswer { id, response } => {
|
||||
if let Some((thread_id, original_id)) =
|
||||
self.external_approval_routes.remove(&id)
|
||||
{
|
||||
self.forward_external_op(
|
||||
thread_id,
|
||||
Op::UserInputAnswer {
|
||||
id: original_id,
|
||||
response,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
self.finish_external_approval();
|
||||
} else {
|
||||
self.chat_widget
|
||||
.submit_op(Op::UserInputAnswer { id, response });
|
||||
}
|
||||
}
|
||||
// Standard path where this is not an external approval response.
|
||||
_ => self.chat_widget.submit_op(op),
|
||||
},
|
||||
AppEvent::CodexOp(op) => {
|
||||
self.chat_widget.submit_op(op);
|
||||
}
|
||||
AppEvent::DiffResult(text) => {
|
||||
// Clear the in-progress state in the bottom pane
|
||||
self.chat_widget.on_diff_complete();
|
||||
@@ -1481,6 +1791,12 @@ impl App {
|
||||
AppEvent::OpenApprovalsPopup => {
|
||||
self.chat_widget.open_approvals_popup();
|
||||
}
|
||||
AppEvent::OpenAgentPicker => {
|
||||
self.open_agent_picker();
|
||||
}
|
||||
AppEvent::SelectAgentThread(thread_id) => {
|
||||
self.select_agent_thread(tui, thread_id).await?;
|
||||
}
|
||||
AppEvent::OpenSkillsList => {
|
||||
self.chat_widget.open_skills_list();
|
||||
}
|
||||
@@ -1584,56 +1900,23 @@ impl App {
|
||||
self.chat_widget.handle_codex_event(event);
|
||||
}
|
||||
|
||||
/// Routes external approval request events through the chat widget by
|
||||
/// rewriting the event id to include the originating thread.
|
||||
///
|
||||
/// `thread_id` is the external thread that issued the approval request.
|
||||
/// `event` is the approval request event whose id is rewritten so replies
|
||||
/// can be routed back to the correct thread.
|
||||
fn handle_external_approval_request(&mut self, thread_id: ThreadId, mut event: Event) {
|
||||
match &mut event.msg {
|
||||
EventMsg::RequestUserInput(ev) => {
|
||||
let original_id = ev.turn_id.clone();
|
||||
let routing_id = format!("{thread_id}:{original_id}");
|
||||
self.external_approval_routes
|
||||
.insert(routing_id.clone(), (thread_id, original_id));
|
||||
ev.turn_id = routing_id.clone();
|
||||
event.id = routing_id;
|
||||
}
|
||||
EventMsg::ExecApprovalRequest(_) | EventMsg::ApplyPatchApprovalRequest(_) => {
|
||||
let original_id = event.id.clone();
|
||||
let routing_id = format!("{thread_id}:{original_id}");
|
||||
self.external_approval_routes
|
||||
.insert(routing_id.clone(), (thread_id, original_id));
|
||||
event.id = routing_id;
|
||||
}
|
||||
_ => return,
|
||||
}
|
||||
self.chat_widget.handle_codex_event(event);
|
||||
fn handle_codex_event_replay(&mut self, event: Event) {
|
||||
self.handle_backtrack_event(&event.msg);
|
||||
self.chat_widget.handle_codex_event_replay(event);
|
||||
}
|
||||
|
||||
async fn forward_external_op(&self, thread_id: ThreadId, op: Op) {
|
||||
let thread = match self.server.get_thread(thread_id).await {
|
||||
Ok(thread) => thread,
|
||||
Err(err) => {
|
||||
tracing::warn!("failed to find thread {thread_id} for approval response: {err}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
if let Err(err) = thread.submit(op).await {
|
||||
tracing::warn!("failed to submit approval response to thread {thread_id}: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
fn finish_external_approval(&mut self) {
|
||||
if self.external_approval_routes.is_empty() {
|
||||
while let Some(event) = self.paused_codex_events.pop_front() {
|
||||
self.handle_codex_event_now(event);
|
||||
}
|
||||
fn handle_active_thread_event(&mut self, tui: &mut tui::Tui, event: Event) -> Result<()> {
|
||||
self.handle_codex_event_now(event);
|
||||
if self.backtrack_render_pending {
|
||||
tui.frame_requester().schedule_frame();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_thread_created(&mut self, thread_id: ThreadId) -> Result<()> {
|
||||
if self.thread_event_channels.contains_key(&thread_id) {
|
||||
return Ok(());
|
||||
}
|
||||
let thread = match self.server.get_thread(thread_id).await {
|
||||
Ok(thread) => thread,
|
||||
Err(err) => {
|
||||
@@ -1641,7 +1924,15 @@ impl App {
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
let app_event_tx = self.app_event_tx.clone();
|
||||
let event = Event {
|
||||
id: String::new(),
|
||||
msg: EventMsg::SessionConfigured(self.session_configured_for_thread(thread_id)),
|
||||
};
|
||||
let channel =
|
||||
ThreadEventChannel::new_with_session_configured(THREAD_EVENT_CHANNEL_CAPACITY, event);
|
||||
let sender = channel.sender.clone();
|
||||
let store = Arc::clone(&channel.store);
|
||||
self.thread_event_channels.insert(thread_id, channel);
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let event = match thread.next_event().await {
|
||||
@@ -1651,19 +1942,47 @@ impl App {
|
||||
break;
|
||||
}
|
||||
};
|
||||
match event.msg {
|
||||
EventMsg::ExecApprovalRequest(_)
|
||||
| EventMsg::ApplyPatchApprovalRequest(_)
|
||||
| EventMsg::RequestUserInput(_) => {
|
||||
app_event_tx.send(AppEvent::ExternalApprovalRequest { thread_id, event });
|
||||
}
|
||||
_ => {}
|
||||
let should_send = {
|
||||
let mut guard = store.lock().await;
|
||||
guard.push_event(event.clone());
|
||||
guard.active
|
||||
};
|
||||
if should_send && let Err(err) = sender.send(event).await {
|
||||
tracing::debug!("external thread {thread_id} channel closed: {err}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn session_configured_for_thread(&self, thread_id: ThreadId) -> SessionConfiguredEvent {
|
||||
let mut session_configured =
|
||||
self.primary_session_configured
|
||||
.clone()
|
||||
.unwrap_or_else(|| SessionConfiguredEvent {
|
||||
session_id: thread_id,
|
||||
forked_from_id: None,
|
||||
model: self.chat_widget.current_model().to_string(),
|
||||
model_provider_id: self.config.model_provider_id.clone(),
|
||||
approval_policy: *self.config.approval_policy.get(),
|
||||
sandbox_policy: self.config.sandbox_policy.get().clone(),
|
||||
cwd: self.config.cwd.clone(),
|
||||
reasoning_effort: None,
|
||||
history_log_id: 0,
|
||||
history_entry_count: 0,
|
||||
initial_messages: None,
|
||||
rollout_path: PathBuf::new(),
|
||||
});
|
||||
session_configured.session_id = thread_id;
|
||||
session_configured.forked_from_id = None;
|
||||
session_configured.history_log_id = 0;
|
||||
session_configured.history_entry_count = 0;
|
||||
session_configured.initial_messages = None;
|
||||
session_configured.rollout_path = PathBuf::new();
|
||||
session_configured
|
||||
}
|
||||
|
||||
fn reasoning_label(reasoning_effort: Option<ReasoningEffortConfig>) -> &'static str {
|
||||
match reasoning_effort {
|
||||
Some(ReasoningEffortConfig::Minimal) => "minimal",
|
||||
@@ -1926,8 +2245,12 @@ mod tests {
|
||||
pending_update_action: None,
|
||||
suppress_shutdown_complete: false,
|
||||
windows_sandbox: WindowsSandboxState::default(),
|
||||
external_approval_routes: HashMap::new(),
|
||||
paused_codex_events: VecDeque::new(),
|
||||
thread_event_channels: HashMap::new(),
|
||||
active_thread_id: None,
|
||||
active_thread_rx: None,
|
||||
primary_thread_id: None,
|
||||
primary_session_configured: None,
|
||||
pending_primary_events: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1970,8 +2293,12 @@ mod tests {
|
||||
pending_update_action: None,
|
||||
suppress_shutdown_complete: false,
|
||||
windows_sandbox: WindowsSandboxState::default(),
|
||||
external_approval_routes: HashMap::new(),
|
||||
paused_codex_events: VecDeque::new(),
|
||||
thread_event_channels: HashMap::new(),
|
||||
active_thread_id: None,
|
||||
active_thread_rx: None,
|
||||
primary_thread_id: None,
|
||||
primary_session_configured: None,
|
||||
pending_primary_events: VecDeque::new(),
|
||||
},
|
||||
rx,
|
||||
op_rx,
|
||||
|
||||
@@ -43,10 +43,10 @@ pub(crate) enum WindowsSandboxFallbackReason {
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum AppEvent {
|
||||
CodexEvent(Event),
|
||||
ExternalApprovalRequest {
|
||||
thread_id: ThreadId,
|
||||
event: Event,
|
||||
},
|
||||
/// Open the agent picker for switching active threads.
|
||||
OpenAgentPicker,
|
||||
/// Switch the active thread to the selected agent.
|
||||
SelectAgentThread(ThreadId),
|
||||
|
||||
/// Start a new session.
|
||||
NewSession,
|
||||
|
||||
@@ -178,6 +178,7 @@ use self::interrupts::InterruptManager;
|
||||
mod agent;
|
||||
use self::agent::spawn_agent;
|
||||
use self::agent::spawn_agent_from_existing;
|
||||
pub(crate) use self::agent::spawn_op_forwarder;
|
||||
mod session_header;
|
||||
use self::session_header::SessionHeader;
|
||||
mod skills;
|
||||
@@ -2030,6 +2031,127 @@ impl ChatWidget {
|
||||
widget
|
||||
}
|
||||
|
||||
pub(crate) fn new_with_op_sender(
|
||||
common: ChatWidgetInit,
|
||||
codex_op_tx: UnboundedSender<Op>,
|
||||
) -> Self {
|
||||
let ChatWidgetInit {
|
||||
config,
|
||||
frame_requester,
|
||||
app_event_tx,
|
||||
initial_user_message,
|
||||
enhanced_keys_supported,
|
||||
auth_manager,
|
||||
models_manager,
|
||||
feedback,
|
||||
is_first_run,
|
||||
model,
|
||||
otel_manager,
|
||||
} = common;
|
||||
let model = model.filter(|m| !m.trim().is_empty());
|
||||
let mut config = config;
|
||||
config.model = model.clone();
|
||||
let mut rng = rand::rng();
|
||||
let placeholder = PLACEHOLDERS[rng.random_range(0..PLACEHOLDERS.len())].to_string();
|
||||
|
||||
let model_for_header = model.unwrap_or_else(|| DEFAULT_MODEL_DISPLAY_NAME.to_string());
|
||||
let fallback_custom = Settings {
|
||||
model: model_for_header.clone(),
|
||||
reasoning_effort: None,
|
||||
developer_instructions: None,
|
||||
};
|
||||
let stored_collaboration_mode = if config.features.enabled(Feature::CollaborationModes) {
|
||||
initial_collaboration_mode(
|
||||
models_manager.as_ref(),
|
||||
fallback_custom,
|
||||
config.experimental_mode,
|
||||
)
|
||||
} else {
|
||||
CollaborationMode::Custom(fallback_custom)
|
||||
};
|
||||
|
||||
let active_cell = Some(Self::placeholder_session_header_cell(
|
||||
&config,
|
||||
config.features.enabled(Feature::CollaborationModes),
|
||||
stored_collaboration_mode.clone(),
|
||||
));
|
||||
|
||||
let mut widget = Self {
|
||||
app_event_tx: app_event_tx.clone(),
|
||||
frame_requester: frame_requester.clone(),
|
||||
codex_op_tx,
|
||||
bottom_pane: BottomPane::new(BottomPaneParams {
|
||||
frame_requester,
|
||||
app_event_tx,
|
||||
has_input_focus: true,
|
||||
enhanced_keys_supported,
|
||||
placeholder_text: placeholder,
|
||||
disable_paste_burst: config.disable_paste_burst,
|
||||
animations_enabled: config.animations,
|
||||
skills: None,
|
||||
}),
|
||||
active_cell,
|
||||
active_cell_revision: 0,
|
||||
config,
|
||||
skills_all: Vec::new(),
|
||||
skills_initial_state: None,
|
||||
stored_collaboration_mode,
|
||||
auth_manager,
|
||||
models_manager,
|
||||
otel_manager,
|
||||
session_header: SessionHeader::new(model_for_header),
|
||||
initial_user_message,
|
||||
token_info: None,
|
||||
rate_limit_snapshot: None,
|
||||
plan_type: None,
|
||||
rate_limit_warnings: RateLimitWarningState::default(),
|
||||
rate_limit_switch_prompt: RateLimitSwitchPromptState::default(),
|
||||
rate_limit_poller: None,
|
||||
stream_controller: None,
|
||||
running_commands: HashMap::new(),
|
||||
suppressed_exec_calls: HashSet::new(),
|
||||
last_unified_wait: None,
|
||||
unified_exec_wait_streak: None,
|
||||
task_complete_pending: false,
|
||||
unified_exec_processes: Vec::new(),
|
||||
agent_turn_running: false,
|
||||
mcp_startup_status: None,
|
||||
interrupts: InterruptManager::new(),
|
||||
reasoning_buffer: String::new(),
|
||||
full_reasoning_buffer: String::new(),
|
||||
current_status_header: String::from("Working"),
|
||||
retry_status_header: None,
|
||||
thread_id: None,
|
||||
forked_from: None,
|
||||
saw_plan_update_this_turn: false,
|
||||
queued_user_messages: VecDeque::new(),
|
||||
show_welcome_banner: is_first_run,
|
||||
suppress_session_configured_redraw: false,
|
||||
pending_notification: None,
|
||||
quit_shortcut_expires_at: None,
|
||||
quit_shortcut_key: None,
|
||||
is_review_mode: false,
|
||||
pre_review_token_info: None,
|
||||
needs_final_message_separator: false,
|
||||
had_work_activity: false,
|
||||
last_separator_elapsed_secs: None,
|
||||
last_rendered_width: std::cell::Cell::new(None),
|
||||
feedback,
|
||||
current_rollout_path: None,
|
||||
external_editor_state: ExternalEditorState::Closed,
|
||||
};
|
||||
|
||||
widget.prefetch_rate_limits();
|
||||
widget
|
||||
.bottom_pane
|
||||
.set_steer_enabled(widget.config.features.enabled(Feature::Steer));
|
||||
widget.bottom_pane.set_collaboration_modes_enabled(
|
||||
widget.config.features.enabled(Feature::CollaborationModes),
|
||||
);
|
||||
|
||||
widget
|
||||
}
|
||||
|
||||
/// Create a ChatWidget attached to an existing conversation (e.g., a fork).
|
||||
pub(crate) fn new_from_existing(
|
||||
common: ChatWidgetInit,
|
||||
@@ -2316,6 +2438,11 @@ impl ChatWidget {
|
||||
self.bottom_pane.set_footer_hint_override(items);
|
||||
}
|
||||
|
||||
pub(crate) fn show_selection_view(&mut self, params: SelectionViewParams) {
|
||||
self.bottom_pane.show_selection_view(params);
|
||||
self.request_redraw();
|
||||
}
|
||||
|
||||
pub(crate) fn can_launch_external_editor(&self) -> bool {
|
||||
self.bottom_pane.can_launch_external_editor()
|
||||
}
|
||||
@@ -2380,6 +2507,9 @@ impl ChatWidget {
|
||||
self.open_collaboration_modes_popup();
|
||||
}
|
||||
}
|
||||
SlashCommand::Agent => {
|
||||
self.app_event_tx.send(AppEvent::OpenAgentPicker);
|
||||
}
|
||||
SlashCommand::Approvals => {
|
||||
self.open_approvals_popup();
|
||||
}
|
||||
@@ -2748,6 +2878,14 @@ impl ChatWidget {
|
||||
self.dispatch_event_msg(Some(id), msg, false);
|
||||
}
|
||||
|
||||
pub(crate) fn handle_codex_event_replay(&mut self, event: Event) {
|
||||
let Event { msg, .. } = event;
|
||||
if matches!(msg, EventMsg::ShutdownComplete) {
|
||||
return;
|
||||
}
|
||||
self.dispatch_event_msg(None, msg, true);
|
||||
}
|
||||
|
||||
/// Dispatch a protocol `EventMsg` to the appropriate handler.
|
||||
///
|
||||
/// `id` is `Some` for live events and `None` for replayed events from
|
||||
|
||||
@@ -105,3 +105,18 @@ pub(crate) fn spawn_agent_from_existing(
|
||||
|
||||
codex_op_tx
|
||||
}
|
||||
|
||||
/// Spawn an op-forwarding loop for an existing thread without subscribing to events.
|
||||
pub(crate) fn spawn_op_forwarder(thread: std::sync::Arc<CodexThread>) -> UnboundedSender<Op> {
|
||||
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some(op) = codex_op_rx.recv().await {
|
||||
if let Err(e) = thread.submit(op).await {
|
||||
tracing::error!("failed to submit op: {e}");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
codex_op_tx
|
||||
}
|
||||
|
||||
@@ -381,6 +381,19 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Clear terminal scrollback (if supported) and force a full redraw.
|
||||
pub fn clear_scrollback(&mut self) -> io::Result<()> {
|
||||
if self.viewport_area.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
self.backend
|
||||
.set_cursor_position(self.viewport_area.as_position())?;
|
||||
queue!(self.backend, Clear(crossterm::terminal::ClearType::Purge))?;
|
||||
std::io::Write::flush(&mut self.backend)?;
|
||||
self.previous_buffer_mut().reset();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Clears the inactive buffer and swaps it with the current buffer
|
||||
pub fn swap_buffers(&mut self) {
|
||||
self.previous_buffer_mut().reset();
|
||||
|
||||
@@ -26,6 +26,7 @@ pub enum SlashCommand {
|
||||
Init,
|
||||
Compact,
|
||||
Collab,
|
||||
Agent,
|
||||
// Undo,
|
||||
Diff,
|
||||
Mention,
|
||||
@@ -60,6 +61,7 @@ impl SlashCommand {
|
||||
SlashCommand::Ps => "list background terminals",
|
||||
SlashCommand::Model => "choose what model and reasoning effort to use",
|
||||
SlashCommand::Collab => "change collaboration mode (experimental)",
|
||||
SlashCommand::Agent => "switch the active agent thread",
|
||||
SlashCommand::Approvals => "choose what Codex can do without approval",
|
||||
SlashCommand::Permissions => "choose what Codex is allowed to do",
|
||||
SlashCommand::ElevateSandbox => "set up elevated agent sandbox",
|
||||
@@ -105,6 +107,7 @@ impl SlashCommand {
|
||||
SlashCommand::Rollout => true,
|
||||
SlashCommand::TestApproval => true,
|
||||
SlashCommand::Collab => true,
|
||||
SlashCommand::Agent => true,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user