mirror of
https://github.com/openai/codex.git
synced 2026-05-02 10:26:45 +00:00
1532 lines
60 KiB
Rust
1532 lines
60 KiB
Rust
//! Thread routing, buffering, and app-server operation submission for the TUI app.
|
|
//!
|
|
//! This module manages active thread channels, routes server requests and notifications into those
|
|
//! channels, submits thread-scoped operations through the app server, and replays buffered events
|
|
//! when the visible thread changes.
|
|
|
|
use super::*;
|
|
|
|
impl App {
|
|
pub(super) async fn shutdown_current_thread(&mut self, app_server: &mut AppServerSession) {
|
|
if let Some(thread_id) = self.chat_widget.thread_id() {
|
|
// Clear any in-flight rollback guard when switching threads.
|
|
self.backtrack.pending_rollback = None;
|
|
if let Err(err) = app_server.thread_unsubscribe(thread_id).await {
|
|
tracing::warn!("failed to unsubscribe thread {thread_id}: {err}");
|
|
}
|
|
self.abort_thread_event_listener(thread_id);
|
|
}
|
|
}
|
|
|
|
pub(super) fn abort_thread_event_listener(&mut self, thread_id: ThreadId) {
|
|
if let Some(handle) = self.thread_event_listener_tasks.remove(&thread_id) {
|
|
handle.abort();
|
|
}
|
|
}
|
|
|
|
pub(super) fn abort_all_thread_event_listeners(&mut self) {
|
|
for handle in self
|
|
.thread_event_listener_tasks
|
|
.drain()
|
|
.map(|(_, handle)| handle)
|
|
{
|
|
handle.abort();
|
|
}
|
|
}
|
|
|
|
pub(super) fn ensure_thread_channel(&mut self, thread_id: ThreadId) -> &mut ThreadEventChannel {
|
|
self.thread_event_channels
|
|
.entry(thread_id)
|
|
.or_insert_with(|| ThreadEventChannel::new(THREAD_EVENT_CHANNEL_CAPACITY))
|
|
}
|
|
|
|
pub(super) async fn set_thread_active(&mut self, thread_id: ThreadId, active: bool) {
|
|
if let Some(channel) = self.thread_event_channels.get_mut(&thread_id) {
|
|
let mut store = channel.store.lock().await;
|
|
store.active = active;
|
|
}
|
|
}
|
|
|
|
pub(super) async fn activate_thread_channel(&mut self, thread_id: ThreadId) {
|
|
if self.active_thread_id.is_some() {
|
|
return;
|
|
}
|
|
self.set_thread_active(thread_id, /*active*/ true).await;
|
|
let receiver = if let Some(channel) = self.thread_event_channels.get_mut(&thread_id) {
|
|
channel.receiver.take()
|
|
} else {
|
|
None
|
|
};
|
|
self.active_thread_id = Some(thread_id);
|
|
self.active_thread_rx = receiver;
|
|
self.refresh_pending_thread_approvals().await;
|
|
}
|
|
|
|
pub(super) async fn store_active_thread_receiver(&mut self) {
|
|
let Some(active_id) = self.active_thread_id else {
|
|
return;
|
|
};
|
|
let input_state = self.chat_widget.capture_thread_input_state();
|
|
if let Some(channel) = self.thread_event_channels.get_mut(&active_id) {
|
|
let receiver = self.active_thread_rx.take();
|
|
let mut store = channel.store.lock().await;
|
|
store.active = false;
|
|
store.input_state = input_state;
|
|
if let Some(receiver) = receiver {
|
|
channel.receiver = Some(receiver);
|
|
}
|
|
}
|
|
}
|
|
|
|
pub(super) async fn activate_thread_for_replay(
|
|
&mut self,
|
|
thread_id: ThreadId,
|
|
) -> Option<(mpsc::Receiver<ThreadBufferedEvent>, ThreadEventSnapshot)> {
|
|
let channel = self.thread_event_channels.get_mut(&thread_id)?;
|
|
let receiver = channel.receiver.take()?;
|
|
let mut store = channel.store.lock().await;
|
|
store.active = true;
|
|
let snapshot = store.snapshot();
|
|
Some((receiver, snapshot))
|
|
}
|
|
|
|
pub(super) async fn clear_active_thread(&mut self) {
|
|
if let Some(active_id) = self.active_thread_id.take() {
|
|
self.set_thread_active(active_id, /*active*/ false).await;
|
|
}
|
|
self.active_thread_rx = None;
|
|
self.refresh_pending_thread_approvals().await;
|
|
}
|
|
|
|
pub(super) async fn note_thread_outbound_op(&mut self, thread_id: ThreadId, op: &AppCommand) {
|
|
let Some(channel) = self.thread_event_channels.get(&thread_id) else {
|
|
return;
|
|
};
|
|
let mut store = channel.store.lock().await;
|
|
store.note_outbound_op(op);
|
|
}
|
|
|
|
pub(super) async fn note_active_thread_outbound_op(&mut self, op: &AppCommand) {
|
|
if !ThreadEventStore::op_can_change_pending_replay_state(op) {
|
|
return;
|
|
}
|
|
let Some(thread_id) = self.active_thread_id else {
|
|
return;
|
|
};
|
|
self.note_thread_outbound_op(thread_id, op).await;
|
|
}
|
|
|
|
pub(super) async fn active_turn_id_for_thread(&self, thread_id: ThreadId) -> Option<String> {
|
|
let channel = self.thread_event_channels.get(&thread_id)?;
|
|
let store = channel.store.lock().await;
|
|
store.active_turn_id().map(ToOwned::to_owned)
|
|
}
|
|
|
|
pub(super) fn thread_label(&self, thread_id: ThreadId) -> String {
|
|
let is_primary = self.primary_thread_id == Some(thread_id);
|
|
let fallback_label = if is_primary {
|
|
"Main [default]".to_string()
|
|
} else {
|
|
let thread_id = thread_id.to_string();
|
|
let short_id: String = thread_id.chars().take(8).collect();
|
|
format!("Agent ({short_id})")
|
|
};
|
|
if let Some(entry) = self.agent_navigation.get(&thread_id) {
|
|
let label = format_agent_picker_item_name(
|
|
entry.agent_nickname.as_deref(),
|
|
entry.agent_role.as_deref(),
|
|
is_primary,
|
|
);
|
|
if label == "Agent" {
|
|
let thread_id = thread_id.to_string();
|
|
let short_id: String = thread_id.chars().take(8).collect();
|
|
format!("{label} ({short_id})")
|
|
} else {
|
|
label
|
|
}
|
|
} else {
|
|
fallback_label
|
|
}
|
|
}
|
|
|
|
/// Returns the thread whose transcript is currently on screen.
|
|
///
|
|
/// `active_thread_id` is the source of truth during steady state, but the widget can briefly
|
|
/// lag behind thread bookkeeping during transitions. The footer label and adjacent-thread
|
|
/// navigation both follow what the user is actually looking at, not whichever thread most
|
|
/// recently began switching.
|
|
pub(super) fn current_displayed_thread_id(&self) -> Option<ThreadId> {
|
|
self.active_thread_id.or(self.chat_widget.thread_id())
|
|
}
|
|
|
|
pub(super) fn ignore_same_thread_resume(
|
|
&mut self,
|
|
target_session: &crate::resume_picker::SessionTarget,
|
|
) -> bool {
|
|
if self.active_thread_id != Some(target_session.thread_id) {
|
|
return false;
|
|
};
|
|
|
|
self.chat_widget.add_info_message(
|
|
format!("Already viewing {}.", target_session.display_label()),
|
|
/*hint*/ None,
|
|
);
|
|
true
|
|
}
|
|
|
|
/// Mirrors the visible thread into the contextual footer row.
|
|
///
|
|
/// The footer sometimes shows ambient context instead of an instructional hint. In multi-agent
|
|
/// sessions, that contextual row includes the currently viewed agent label. The label is
|
|
/// intentionally hidden until there is more than one known thread so single-thread sessions do
|
|
/// not spend footer space restating that the user is already on the main conversation.
|
|
pub(super) fn sync_active_agent_label(&mut self) {
|
|
let label = self
|
|
.agent_navigation
|
|
.active_agent_label(self.current_displayed_thread_id(), self.primary_thread_id);
|
|
self.chat_widget.set_active_agent_label(label);
|
|
self.sync_side_thread_ui();
|
|
}
|
|
|
|
pub(super) async fn thread_cwd(&self, thread_id: ThreadId) -> Option<AbsolutePathBuf> {
|
|
let channel = self.thread_event_channels.get(&thread_id)?;
|
|
let store = channel.store.lock().await;
|
|
store.session.as_ref().map(|session| session.cwd.clone())
|
|
}
|
|
|
|
async fn thread_file_change_changes(
|
|
&self,
|
|
thread_id: ThreadId,
|
|
turn_id: &str,
|
|
item_id: &str,
|
|
) -> Option<Vec<codex_app_server_protocol::FileUpdateChange>> {
|
|
let channel = self.thread_event_channels.get(&thread_id)?;
|
|
let store = channel.store.lock().await;
|
|
store.file_change_changes(turn_id, item_id)
|
|
}
|
|
|
|
pub(super) async fn interactive_request_for_thread_request(
|
|
&self,
|
|
thread_id: ThreadId,
|
|
request: &ServerRequest,
|
|
) -> Option<ThreadInteractiveRequest> {
|
|
let thread_label = Some(self.thread_label(thread_id));
|
|
match request {
|
|
ServerRequest::CommandExecutionRequestApproval { params, .. } => {
|
|
let network_approval_context = params
|
|
.network_approval_context
|
|
.clone()
|
|
.map(network_approval_context_to_core);
|
|
let additional_permissions = params.additional_permissions.clone().map(Into::into);
|
|
let proposed_execpolicy_amendment = params
|
|
.proposed_execpolicy_amendment
|
|
.clone()
|
|
.map(codex_app_server_protocol::ExecPolicyAmendment::into_core);
|
|
let proposed_network_policy_amendments = params
|
|
.proposed_network_policy_amendments
|
|
.clone()
|
|
.map(|amendments| {
|
|
amendments
|
|
.into_iter()
|
|
.map(codex_app_server_protocol::NetworkPolicyAmendment::into_core)
|
|
.collect::<Vec<_>>()
|
|
});
|
|
Some(ThreadInteractiveRequest::Approval(ApprovalRequest::Exec {
|
|
thread_id,
|
|
thread_label,
|
|
id: params
|
|
.approval_id
|
|
.clone()
|
|
.unwrap_or_else(|| params.item_id.clone()),
|
|
command: params
|
|
.command
|
|
.as_deref()
|
|
.map(split_command_string)
|
|
.unwrap_or_default(),
|
|
reason: params.reason.clone(),
|
|
available_decisions: params
|
|
.available_decisions
|
|
.clone()
|
|
.map(|decisions| {
|
|
decisions
|
|
.into_iter()
|
|
.map(command_execution_decision_to_review_decision)
|
|
.collect()
|
|
})
|
|
.unwrap_or_else(|| {
|
|
default_exec_approval_decisions(
|
|
network_approval_context.as_ref(),
|
|
proposed_execpolicy_amendment.as_ref(),
|
|
proposed_network_policy_amendments.as_deref(),
|
|
additional_permissions.as_ref(),
|
|
)
|
|
}),
|
|
network_approval_context,
|
|
additional_permissions,
|
|
}))
|
|
}
|
|
ServerRequest::FileChangeRequestApproval { params, .. } => Some(
|
|
ThreadInteractiveRequest::Approval(ApprovalRequest::ApplyPatch {
|
|
thread_id,
|
|
thread_label,
|
|
id: params.item_id.clone(),
|
|
reason: params.reason.clone(),
|
|
cwd: self
|
|
.thread_cwd(thread_id)
|
|
.await
|
|
.unwrap_or_else(|| self.config.cwd.clone()),
|
|
changes: self
|
|
.thread_file_change_changes(thread_id, ¶ms.turn_id, ¶ms.item_id)
|
|
.await
|
|
.map(crate::app_server_approval_conversions::file_update_changes_to_core)
|
|
.unwrap_or_default(),
|
|
}),
|
|
),
|
|
ServerRequest::McpServerElicitationRequest { request_id, params } => {
|
|
if let Some(request) = McpServerElicitationFormRequest::from_app_server_request(
|
|
thread_id,
|
|
app_server_request_id_to_mcp_request_id(request_id),
|
|
params.clone(),
|
|
) {
|
|
Some(ThreadInteractiveRequest::McpServerElicitation(request))
|
|
} else {
|
|
Some(ThreadInteractiveRequest::Approval(
|
|
ApprovalRequest::McpElicitation {
|
|
thread_id,
|
|
thread_label,
|
|
server_name: params.server_name.clone(),
|
|
request_id: app_server_request_id_to_mcp_request_id(request_id),
|
|
message: match ¶ms.request {
|
|
codex_app_server_protocol::McpServerElicitationRequest::Form {
|
|
message,
|
|
..
|
|
}
|
|
| codex_app_server_protocol::McpServerElicitationRequest::Url {
|
|
message,
|
|
..
|
|
} => message.clone(),
|
|
},
|
|
},
|
|
))
|
|
}
|
|
}
|
|
ServerRequest::PermissionsRequestApproval { params, .. } => Some(
|
|
ThreadInteractiveRequest::Approval(ApprovalRequest::Permissions {
|
|
thread_id,
|
|
thread_label,
|
|
call_id: params.item_id.clone(),
|
|
reason: params.reason.clone(),
|
|
permissions: params.permissions.clone().into(),
|
|
}),
|
|
),
|
|
_ => None,
|
|
}
|
|
}
|
|
|
|
pub(super) fn push_thread_interactive_request(&mut self, request: ThreadInteractiveRequest) {
|
|
match request {
|
|
ThreadInteractiveRequest::Approval(request) => {
|
|
self.render_inactive_patch_preview(&request);
|
|
self.chat_widget.push_approval_request(request);
|
|
}
|
|
ThreadInteractiveRequest::McpServerElicitation(request) => {
|
|
self.chat_widget
|
|
.push_mcp_server_elicitation_request(request);
|
|
}
|
|
}
|
|
}
|
|
|
|
fn render_inactive_patch_preview(&mut self, request: &ApprovalRequest) {
|
|
let ApprovalRequest::ApplyPatch {
|
|
thread_label,
|
|
cwd,
|
|
changes,
|
|
..
|
|
} = request
|
|
else {
|
|
return;
|
|
};
|
|
if thread_label.is_none() || changes.is_empty() {
|
|
return;
|
|
}
|
|
self.chat_widget
|
|
.add_to_history(history_cell::new_patch_event(changes.clone(), cwd));
|
|
}
|
|
|
|
pub(super) async fn pending_inactive_thread_requests(&self) -> Vec<(ThreadId, ServerRequest)> {
|
|
let channels: Vec<(ThreadId, Arc<Mutex<ThreadEventStore>>)> = self
|
|
.thread_event_channels
|
|
.iter()
|
|
.map(|(thread_id, channel)| (*thread_id, Arc::clone(&channel.store)))
|
|
.collect();
|
|
|
|
let mut requests = Vec::new();
|
|
for (thread_id, store) in channels {
|
|
if Some(thread_id) == self.active_thread_id {
|
|
continue;
|
|
}
|
|
|
|
let store = store.lock().await;
|
|
requests.extend(
|
|
store
|
|
.pending_replay_requests()
|
|
.into_iter()
|
|
.map(|request| (thread_id, request)),
|
|
);
|
|
}
|
|
requests
|
|
}
|
|
|
|
pub(super) async fn surface_pending_inactive_thread_interactive_requests(&mut self) {
|
|
if self.active_side_parent_thread_id().is_some() {
|
|
return;
|
|
}
|
|
|
|
let requests = self.pending_inactive_thread_requests().await;
|
|
for (thread_id, request) in requests {
|
|
if let Some(request) = self
|
|
.interactive_request_for_thread_request(thread_id, &request)
|
|
.await
|
|
{
|
|
self.push_thread_interactive_request(request);
|
|
}
|
|
}
|
|
}
|
|
|
|
pub(super) async fn submit_active_thread_op(
|
|
&mut self,
|
|
app_server: &mut AppServerSession,
|
|
op: AppCommand,
|
|
) -> Result<()> {
|
|
let Some(thread_id) = self.active_thread_id else {
|
|
self.chat_widget
|
|
.add_error_message("No active thread is available.".to_string());
|
|
return Ok(());
|
|
};
|
|
|
|
self.submit_thread_op(app_server, thread_id, op).await
|
|
}
|
|
|
|
pub(super) async fn submit_thread_op(
|
|
&mut self,
|
|
app_server: &mut AppServerSession,
|
|
thread_id: ThreadId,
|
|
op: AppCommand,
|
|
) -> Result<()> {
|
|
crate::session_log::log_outbound_op(&op);
|
|
|
|
if self.try_handle_local_history_op(thread_id, &op).await? {
|
|
return Ok(());
|
|
}
|
|
|
|
if self
|
|
.try_resolve_app_server_request(app_server, thread_id, &op)
|
|
.await?
|
|
{
|
|
return Ok(());
|
|
}
|
|
|
|
if self
|
|
.try_submit_active_thread_op_via_app_server(app_server, thread_id, &op)
|
|
.await?
|
|
{
|
|
if ThreadEventStore::op_can_change_pending_replay_state(&op) {
|
|
self.note_thread_outbound_op(thread_id, &op).await;
|
|
self.refresh_pending_thread_approvals().await;
|
|
self.refresh_side_parent_status_from_store(thread_id).await;
|
|
}
|
|
return Ok(());
|
|
}
|
|
|
|
self.chat_widget
|
|
.add_error_message(format!("Not available in TUI yet for thread {thread_id}."));
|
|
Ok(())
|
|
}
|
|
|
|
/// Spawn a background task that fetches MCP server status from the app-server
|
|
/// via paginated RPCs, then delivers the result back through
|
|
/// `AppEvent::McpInventoryLoaded`.
|
|
///
|
|
/// The spawned task is fire-and-forget: no `JoinHandle` is stored, so a stale
|
|
/// result may arrive after the user has moved on. We currently accept that
|
|
/// tradeoff because the effect is limited to stale inventory output in history,
|
|
/// while request-token invalidation would add cross-cutting async state for a
|
|
/// low-severity path.
|
|
pub(super) async fn try_handle_local_history_op(
|
|
&mut self,
|
|
thread_id: ThreadId,
|
|
op: &AppCommand,
|
|
) -> Result<bool> {
|
|
match op.view() {
|
|
AppCommandView::Other(Op::AddToHistory { text }) => {
|
|
let text = text.clone();
|
|
let config = self.chat_widget.config_ref().clone();
|
|
tokio::spawn(async move {
|
|
if let Err(err) = append_message_history_entry(&text, &thread_id, &config).await
|
|
{
|
|
tracing::warn!(
|
|
thread_id = %thread_id,
|
|
error = %err,
|
|
"failed to append to message history"
|
|
);
|
|
}
|
|
});
|
|
Ok(true)
|
|
}
|
|
AppCommandView::Other(Op::GetHistoryEntryRequest { offset, log_id }) => {
|
|
let offset = *offset;
|
|
let log_id = *log_id;
|
|
let config = self.chat_widget.config_ref().clone();
|
|
let app_event_tx = self.app_event_tx.clone();
|
|
tokio::spawn(async move {
|
|
let entry_opt = tokio::task::spawn_blocking(move || {
|
|
lookup_message_history_entry(log_id, offset, &config)
|
|
})
|
|
.await
|
|
.unwrap_or_else(|err| {
|
|
tracing::warn!(error = %err, "history lookup task failed");
|
|
None
|
|
});
|
|
|
|
app_event_tx.send(AppEvent::ThreadHistoryEntryResponse {
|
|
thread_id,
|
|
event: GetHistoryEntryResponseEvent {
|
|
offset,
|
|
log_id,
|
|
entry: entry_opt.map(|entry| {
|
|
codex_protocol::message_history::HistoryEntry {
|
|
conversation_id: entry.session_id,
|
|
ts: entry.ts,
|
|
text: entry.text,
|
|
}
|
|
}),
|
|
},
|
|
});
|
|
});
|
|
Ok(true)
|
|
}
|
|
_ => Ok(false),
|
|
}
|
|
}
|
|
|
|
pub(super) async fn try_submit_active_thread_op_via_app_server(
|
|
&mut self,
|
|
app_server: &mut AppServerSession,
|
|
thread_id: ThreadId,
|
|
op: &AppCommand,
|
|
) -> Result<bool> {
|
|
match op.view() {
|
|
AppCommandView::Interrupt => {
|
|
if let Some(turn_id) = self.active_turn_id_for_thread(thread_id).await {
|
|
app_server.turn_interrupt(thread_id, turn_id).await?;
|
|
} else {
|
|
app_server.startup_interrupt(thread_id).await?;
|
|
}
|
|
Ok(true)
|
|
}
|
|
AppCommandView::UserTurn {
|
|
items,
|
|
cwd,
|
|
approval_policy,
|
|
approvals_reviewer,
|
|
sandbox_policy,
|
|
permission_profile: _,
|
|
model,
|
|
effort,
|
|
summary,
|
|
service_tier,
|
|
final_output_json_schema,
|
|
collaboration_mode,
|
|
personality,
|
|
} => {
|
|
let mut should_start_turn = true;
|
|
if let Some(turn_id) = self.active_turn_id_for_thread(thread_id).await {
|
|
let mut steer_turn_id = turn_id;
|
|
let mut retried_after_turn_mismatch = false;
|
|
loop {
|
|
match app_server
|
|
.turn_steer(thread_id, steer_turn_id.clone(), items.to_vec())
|
|
.await
|
|
{
|
|
Ok(_) => return Ok(true),
|
|
Err(error) => {
|
|
if let Some(turn_error) =
|
|
active_turn_not_steerable_turn_error(&error)
|
|
{
|
|
if !self.chat_widget.enqueue_rejected_steer() {
|
|
self.chat_widget.add_error_message(turn_error.message);
|
|
}
|
|
return Ok(true);
|
|
}
|
|
match active_turn_steer_race(&error) {
|
|
Some(ActiveTurnSteerRace::Missing) => {
|
|
if let Some(channel) =
|
|
self.thread_event_channels.get(&thread_id)
|
|
{
|
|
let mut store = channel.store.lock().await;
|
|
store.clear_active_turn_id();
|
|
}
|
|
should_start_turn = true;
|
|
break;
|
|
}
|
|
Some(ActiveTurnSteerRace::ExpectedTurnMismatch {
|
|
actual_turn_id,
|
|
}) if !retried_after_turn_mismatch
|
|
&& actual_turn_id != steer_turn_id =>
|
|
{
|
|
// Review flows can swap the active turn before the TUI
|
|
// processes the corresponding notification. Retry once with
|
|
// the server-reported turn id so non-steerable review turns
|
|
// still fall through to the existing queueing behavior.
|
|
if let Some(channel) =
|
|
self.thread_event_channels.get(&thread_id)
|
|
{
|
|
let mut store = channel.store.lock().await;
|
|
store.active_turn_id = Some(actual_turn_id.clone());
|
|
}
|
|
steer_turn_id = actual_turn_id;
|
|
retried_after_turn_mismatch = true;
|
|
}
|
|
Some(ActiveTurnSteerRace::ExpectedTurnMismatch {
|
|
actual_turn_id,
|
|
}) => {
|
|
if let Some(channel) =
|
|
self.thread_event_channels.get(&thread_id)
|
|
{
|
|
let mut store = channel.store.lock().await;
|
|
store.active_turn_id = Some(actual_turn_id);
|
|
}
|
|
return Err(error.into());
|
|
}
|
|
None => return Err(error.into()),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if should_start_turn {
|
|
app_server
|
|
.turn_start(
|
|
thread_id,
|
|
items.to_vec(),
|
|
cwd.clone(),
|
|
approval_policy,
|
|
approvals_reviewer
|
|
.unwrap_or(self.chat_widget.config_ref().approvals_reviewer),
|
|
sandbox_policy.clone(),
|
|
runtime_permission_profile_for_turn_start(
|
|
self.runtime_sandbox_policy_override.as_ref(),
|
|
sandbox_policy,
|
|
),
|
|
model.to_string(),
|
|
effort,
|
|
*summary,
|
|
*service_tier,
|
|
collaboration_mode.clone(),
|
|
*personality,
|
|
final_output_json_schema.clone(),
|
|
)
|
|
.await?;
|
|
}
|
|
Ok(true)
|
|
}
|
|
AppCommandView::ListSkills { cwds, force_reload } => {
|
|
self.handle_skills_list_result(
|
|
app_server
|
|
.skills_list(codex_app_server_protocol::SkillsListParams {
|
|
cwds: cwds.to_vec(),
|
|
force_reload,
|
|
per_cwd_extra_user_roots: None,
|
|
})
|
|
.await,
|
|
"failed to refresh skills",
|
|
);
|
|
Ok(true)
|
|
}
|
|
AppCommandView::Compact => {
|
|
app_server.thread_compact_start(thread_id).await?;
|
|
Ok(true)
|
|
}
|
|
AppCommandView::SetThreadName { name } => {
|
|
app_server
|
|
.thread_set_name(thread_id, name.to_string())
|
|
.await?;
|
|
Ok(true)
|
|
}
|
|
AppCommandView::ThreadRollback { num_turns } => {
|
|
let response = match app_server.thread_rollback(thread_id, num_turns).await {
|
|
Ok(response) => response,
|
|
Err(err) => {
|
|
self.handle_backtrack_rollback_failed();
|
|
return Err(err);
|
|
}
|
|
};
|
|
self.handle_thread_rollback_response(thread_id, num_turns, &response)
|
|
.await;
|
|
Ok(true)
|
|
}
|
|
AppCommandView::Review { review_request } => {
|
|
app_server
|
|
.review_start(thread_id, review_request.clone())
|
|
.await?;
|
|
Ok(true)
|
|
}
|
|
AppCommandView::CleanBackgroundTerminals => {
|
|
app_server
|
|
.thread_background_terminals_clean(thread_id)
|
|
.await?;
|
|
Ok(true)
|
|
}
|
|
AppCommandView::RealtimeConversationStart(params) => {
|
|
app_server
|
|
.thread_realtime_start(thread_id, params.clone())
|
|
.await?;
|
|
Ok(true)
|
|
}
|
|
AppCommandView::RealtimeConversationAudio(params) => {
|
|
app_server
|
|
.thread_realtime_audio(thread_id, params.clone())
|
|
.await?;
|
|
Ok(true)
|
|
}
|
|
AppCommandView::RealtimeConversationText(params) => {
|
|
app_server
|
|
.thread_realtime_text(thread_id, params.clone())
|
|
.await?;
|
|
Ok(true)
|
|
}
|
|
AppCommandView::RealtimeConversationClose => {
|
|
app_server.thread_realtime_stop(thread_id).await?;
|
|
Ok(true)
|
|
}
|
|
AppCommandView::RunUserShellCommand { command } => {
|
|
app_server
|
|
.thread_shell_command(thread_id, command.to_string())
|
|
.await?;
|
|
Ok(true)
|
|
}
|
|
AppCommandView::ReloadUserConfig => {
|
|
app_server.reload_user_config().await?;
|
|
self.refresh_in_memory_config_from_disk().await?;
|
|
Ok(true)
|
|
}
|
|
AppCommandView::OverrideTurnContext { .. } => Ok(true),
|
|
AppCommandView::Other(Op::ApproveGuardianDeniedAction { event }) => {
|
|
app_server
|
|
.thread_approve_guardian_denied_action(thread_id, event)
|
|
.await?;
|
|
Ok(true)
|
|
}
|
|
_ => Ok(false),
|
|
}
|
|
}
|
|
|
|
pub(super) fn handle_skills_list_result(
|
|
&mut self,
|
|
result: Result<SkillsListResponse>,
|
|
failure_message: &str,
|
|
) {
|
|
match result {
|
|
Ok(response) => self.handle_skills_list_response(response),
|
|
Err(err) => {
|
|
tracing::warn!("{failure_message}: {err:#}");
|
|
self.chat_widget
|
|
.add_error_message(format!("{failure_message}: {err:#}"));
|
|
}
|
|
}
|
|
}
|
|
|
|
pub(super) async fn try_resolve_app_server_request(
|
|
&mut self,
|
|
app_server: &AppServerSession,
|
|
thread_id: ThreadId,
|
|
op: &AppCommand,
|
|
) -> Result<bool> {
|
|
let Some(resolution) = self
|
|
.pending_app_server_requests
|
|
.take_resolution(op)
|
|
.map_err(|err| color_eyre::eyre::eyre!(err))?
|
|
else {
|
|
return Ok(false);
|
|
};
|
|
|
|
match app_server
|
|
.resolve_server_request(resolution.request_id, resolution.result)
|
|
.await
|
|
{
|
|
Ok(()) => {
|
|
if ThreadEventStore::op_can_change_pending_replay_state(op) {
|
|
self.note_thread_outbound_op(thread_id, op).await;
|
|
self.refresh_pending_thread_approvals().await;
|
|
self.refresh_side_parent_status_from_store(thread_id).await;
|
|
}
|
|
Ok(true)
|
|
}
|
|
Err(err) => {
|
|
self.chat_widget.add_error_message(format!(
|
|
"Failed to resolve app-server request for thread {thread_id}: {err}"
|
|
));
|
|
Ok(false)
|
|
}
|
|
}
|
|
}
|
|
|
|
pub(super) async fn refresh_pending_thread_approvals(&mut self) {
|
|
let side_parent_thread_id = self.active_side_parent_thread_id();
|
|
let channels: Vec<(ThreadId, Arc<Mutex<ThreadEventStore>>)> = self
|
|
.thread_event_channels
|
|
.iter()
|
|
.map(|(thread_id, channel)| (*thread_id, Arc::clone(&channel.store)))
|
|
.collect();
|
|
|
|
let mut pending_thread_ids = Vec::new();
|
|
for (thread_id, store) in channels {
|
|
if Some(thread_id) == self.active_thread_id || Some(thread_id) == side_parent_thread_id
|
|
{
|
|
continue;
|
|
}
|
|
|
|
let store = store.lock().await;
|
|
if store.has_pending_thread_approvals() {
|
|
pending_thread_ids.push(thread_id);
|
|
}
|
|
}
|
|
|
|
pending_thread_ids.sort_by_key(ThreadId::to_string);
|
|
|
|
let threads = pending_thread_ids
|
|
.into_iter()
|
|
.map(|thread_id| self.thread_label(thread_id))
|
|
.collect();
|
|
|
|
self.chat_widget.set_pending_thread_approvals(threads);
|
|
}
|
|
|
|
pub(super) async fn refresh_side_parent_status_from_store(&mut self, thread_id: ThreadId) {
|
|
let Some(channel) = self.thread_event_channels.get(&thread_id) else {
|
|
return;
|
|
};
|
|
let status = {
|
|
let store = channel.store.lock().await;
|
|
store.side_parent_pending_status()
|
|
};
|
|
if let Some(status) = status {
|
|
self.set_side_parent_status(thread_id, Some(status));
|
|
} else {
|
|
self.clear_side_parent_action_status(thread_id);
|
|
}
|
|
}
|
|
|
|
pub(super) async fn enqueue_thread_notification(
|
|
&mut self,
|
|
thread_id: ThreadId,
|
|
notification: ServerNotification,
|
|
) -> Result<()> {
|
|
let inferred_session = self
|
|
.infer_session_for_thread_notification(thread_id, ¬ification)
|
|
.await;
|
|
let (sender, store) = {
|
|
let channel = self.ensure_thread_channel(thread_id);
|
|
(channel.sender.clone(), Arc::clone(&channel.store))
|
|
};
|
|
|
|
let (should_send, pending_status) = {
|
|
let mut guard = store.lock().await;
|
|
if guard.session.is_none()
|
|
&& let Some(session) = inferred_session
|
|
{
|
|
guard.session = Some(session);
|
|
}
|
|
guard.push_notification(notification.clone());
|
|
(guard.active, guard.side_parent_pending_status())
|
|
};
|
|
let notification_status_change = SideParentStatusChange::for_notification(¬ification);
|
|
|
|
if should_send {
|
|
match sender.try_send(ThreadBufferedEvent::Notification(notification)) {
|
|
Ok(()) => {}
|
|
Err(TrySendError::Full(event)) => {
|
|
tokio::spawn(async move {
|
|
if let Err(err) = sender.send(event).await {
|
|
tracing::warn!("thread {thread_id} event channel closed: {err}");
|
|
}
|
|
});
|
|
}
|
|
Err(TrySendError::Closed(_)) => {
|
|
tracing::warn!("thread {thread_id} event channel closed");
|
|
}
|
|
}
|
|
}
|
|
if let Some(status) = pending_status {
|
|
self.set_side_parent_status(thread_id, Some(status));
|
|
} else if let Some(change) = notification_status_change {
|
|
self.apply_side_parent_status_change(thread_id, change);
|
|
}
|
|
self.refresh_pending_thread_approvals().await;
|
|
Ok(())
|
|
}
|
|
|
|
/// Eagerly fetches nickname and role for receiver threads referenced by a collab notification.
|
|
///
|
|
/// This runs on every buffered thread notification before it reaches rendering. For each
|
|
/// receiver thread id that the navigation cache does not yet have metadata for, it issues a
|
|
/// `thread/read` RPC and registers the result in both `AgentNavigationState` and the
|
|
/// `ChatWidget` metadata map. Threads that already have a nickname or role cached are skipped,
|
|
/// so the cost is at most one RPC per thread over the lifetime of a session.
|
|
///
|
|
/// Failures are logged and silently ignored -- the worst outcome is that a rendered item shows
|
|
/// a thread id instead of a human-readable name, which is the same behavior the TUI had before
|
|
/// this change.
|
|
pub(super) async fn hydrate_collab_agent_metadata_for_notification(
|
|
&mut self,
|
|
app_server: &mut AppServerSession,
|
|
notification: &ServerNotification,
|
|
) {
|
|
let Some(receiver_thread_ids) = collab_receiver_thread_ids(notification) else {
|
|
return;
|
|
};
|
|
|
|
for receiver_thread_id in receiver_thread_ids {
|
|
let Ok(thread_id) = ThreadId::from_string(receiver_thread_id) else {
|
|
tracing::warn!(
|
|
thread_id = receiver_thread_id,
|
|
"ignoring collab receiver with invalid thread id during metadata hydration"
|
|
);
|
|
continue;
|
|
};
|
|
|
|
if self
|
|
.agent_navigation
|
|
.get(&thread_id)
|
|
.is_some_and(|entry| entry.agent_nickname.is_some() || entry.agent_role.is_some())
|
|
{
|
|
continue;
|
|
}
|
|
|
|
match app_server
|
|
.thread_read(thread_id, /*include_turns*/ false)
|
|
.await
|
|
{
|
|
Ok(thread) => {
|
|
self.upsert_agent_picker_thread(
|
|
thread_id,
|
|
thread.agent_nickname,
|
|
thread.agent_role,
|
|
/*is_closed*/ false,
|
|
);
|
|
}
|
|
Err(err) => {
|
|
tracing::warn!(
|
|
thread_id = %thread_id,
|
|
error = %err,
|
|
"failed to hydrate collab receiver thread metadata"
|
|
);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
pub(super) async fn infer_session_for_thread_notification(
|
|
&mut self,
|
|
thread_id: ThreadId,
|
|
notification: &ServerNotification,
|
|
) -> Option<ThreadSessionState> {
|
|
let ServerNotification::ThreadStarted(notification) = notification else {
|
|
return None;
|
|
};
|
|
let mut session = self.primary_session_configured.clone()?;
|
|
session.thread_id = thread_id;
|
|
session.thread_name = notification.thread.name.clone();
|
|
session.model_provider_id = notification.thread.model_provider.clone();
|
|
session.cwd = notification.thread.cwd.clone();
|
|
let rollout_path = notification.thread.path.clone();
|
|
if let Some(model) =
|
|
read_session_model(&self.config, thread_id, rollout_path.as_deref()).await
|
|
{
|
|
session.model = model;
|
|
} else if rollout_path.is_some() {
|
|
session.model.clear();
|
|
}
|
|
session.history_log_id = 0;
|
|
session.history_entry_count = 0;
|
|
session.rollout_path = rollout_path;
|
|
self.upsert_agent_picker_thread(
|
|
thread_id,
|
|
notification.thread.agent_nickname.clone(),
|
|
notification.thread.agent_role.clone(),
|
|
/*is_closed*/ false,
|
|
);
|
|
Some(session)
|
|
}
|
|
|
|
pub(super) async fn enqueue_thread_request(
|
|
&mut self,
|
|
thread_id: ThreadId,
|
|
request: ServerRequest,
|
|
) -> Result<()> {
|
|
let inactive_interactive_request = if self.active_thread_id != Some(thread_id) {
|
|
self.interactive_request_for_thread_request(thread_id, &request)
|
|
.await
|
|
} else {
|
|
None
|
|
};
|
|
let (sender, store) = {
|
|
let channel = self.ensure_thread_channel(thread_id);
|
|
(channel.sender.clone(), Arc::clone(&channel.store))
|
|
};
|
|
|
|
let (should_send, pending_status) = {
|
|
let mut guard = store.lock().await;
|
|
guard.push_request(request.clone());
|
|
(guard.active, guard.side_parent_pending_status())
|
|
};
|
|
let request_status = SideParentStatus::for_request(&request);
|
|
|
|
if should_send {
|
|
match sender.try_send(ThreadBufferedEvent::Request(request)) {
|
|
Ok(()) => {}
|
|
Err(TrySendError::Full(event)) => {
|
|
tokio::spawn(async move {
|
|
if let Err(err) = sender.send(event).await {
|
|
tracing::warn!("thread {thread_id} event channel closed: {err}");
|
|
}
|
|
});
|
|
}
|
|
Err(TrySendError::Closed(_)) => {
|
|
tracing::warn!("thread {thread_id} event channel closed");
|
|
}
|
|
}
|
|
} else if self.active_side_parent_thread_id().is_none()
|
|
&& let Some(request) = inactive_interactive_request
|
|
{
|
|
self.push_thread_interactive_request(request);
|
|
}
|
|
if let Some(status) = pending_status.or(request_status) {
|
|
self.set_side_parent_status(thread_id, Some(status));
|
|
}
|
|
self.refresh_pending_thread_approvals().await;
|
|
Ok(())
|
|
}
|
|
|
|
pub(super) async fn enqueue_thread_history_entry_response(
|
|
&mut self,
|
|
thread_id: ThreadId,
|
|
event: GetHistoryEntryResponseEvent,
|
|
) -> Result<()> {
|
|
let (sender, store) = {
|
|
let channel = self.ensure_thread_channel(thread_id);
|
|
(channel.sender.clone(), Arc::clone(&channel.store))
|
|
};
|
|
|
|
let should_send = {
|
|
let mut guard = store.lock().await;
|
|
guard
|
|
.buffer
|
|
.push_back(ThreadBufferedEvent::HistoryEntryResponse(event.clone()));
|
|
if guard.buffer.len() > guard.capacity
|
|
&& let Some(removed) = guard.buffer.pop_front()
|
|
&& let ThreadBufferedEvent::Request(request) = &removed
|
|
{
|
|
guard
|
|
.pending_interactive_replay
|
|
.note_evicted_server_request(request);
|
|
}
|
|
guard.active
|
|
};
|
|
|
|
if should_send {
|
|
match sender.try_send(ThreadBufferedEvent::HistoryEntryResponse(event)) {
|
|
Ok(()) => {}
|
|
Err(TrySendError::Full(event)) => {
|
|
tokio::spawn(async move {
|
|
if let Err(err) = sender.send(event).await {
|
|
tracing::warn!("thread {thread_id} event channel closed: {err}");
|
|
}
|
|
});
|
|
}
|
|
Err(TrySendError::Closed(_)) => {
|
|
tracing::warn!("thread {thread_id} event channel closed");
|
|
}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
pub(super) async fn enqueue_primary_thread_session(
|
|
&mut self,
|
|
session: ThreadSessionState,
|
|
turns: Vec<Turn>,
|
|
) -> Result<()> {
|
|
let thread_id = session.thread_id;
|
|
self.primary_thread_id = Some(thread_id);
|
|
self.primary_session_configured = Some(session.clone());
|
|
self.upsert_agent_picker_thread(
|
|
thread_id, /*agent_nickname*/ None, /*agent_role*/ None,
|
|
/*is_closed*/ false,
|
|
);
|
|
let channel = self.ensure_thread_channel(thread_id);
|
|
{
|
|
let mut store = channel.store.lock().await;
|
|
store.set_session(session.clone(), turns.clone());
|
|
}
|
|
self.activate_thread_channel(thread_id).await;
|
|
self.chat_widget
|
|
.set_initial_user_message_submit_suppressed(/*suppressed*/ true);
|
|
self.chat_widget.handle_thread_session(session);
|
|
let should_buffer_initial_replay =
|
|
self.terminal_resize_reflow_enabled() && !turns.is_empty();
|
|
if should_buffer_initial_replay {
|
|
self.app_event_tx
|
|
.send(AppEvent::BeginInitialHistoryReplayBuffer);
|
|
}
|
|
self.chat_widget
|
|
.replay_thread_turns(turns, ReplayKind::ResumeInitialMessages);
|
|
if should_buffer_initial_replay {
|
|
self.app_event_tx
|
|
.send(AppEvent::EndInitialHistoryReplayBuffer);
|
|
}
|
|
let pending = std::mem::take(&mut self.pending_primary_events);
|
|
for pending_event in pending {
|
|
match pending_event {
|
|
ThreadBufferedEvent::Notification(notification) => {
|
|
self.enqueue_thread_notification(thread_id, notification)
|
|
.await?;
|
|
}
|
|
ThreadBufferedEvent::Request(request) => {
|
|
self.enqueue_thread_request(thread_id, request).await?;
|
|
}
|
|
ThreadBufferedEvent::HistoryEntryResponse(event) => {
|
|
self.enqueue_thread_history_entry_response(thread_id, event)
|
|
.await?;
|
|
}
|
|
ThreadBufferedEvent::FeedbackSubmission(event) => {
|
|
self.enqueue_thread_feedback_event(thread_id, event).await;
|
|
}
|
|
}
|
|
}
|
|
self.chat_widget
|
|
.set_initial_user_message_submit_suppressed(/*suppressed*/ false);
|
|
self.chat_widget.submit_initial_user_message_if_pending();
|
|
Ok(())
|
|
}
|
|
|
|
pub(super) async fn enqueue_primary_thread_notification(
|
|
&mut self,
|
|
notification: ServerNotification,
|
|
) -> Result<()> {
|
|
if let Some(thread_id) = self.primary_thread_id {
|
|
return self
|
|
.enqueue_thread_notification(thread_id, notification)
|
|
.await;
|
|
}
|
|
self.pending_primary_events
|
|
.push_back(ThreadBufferedEvent::Notification(notification));
|
|
Ok(())
|
|
}
|
|
|
|
pub(super) async fn enqueue_primary_thread_request(
|
|
&mut self,
|
|
request: ServerRequest,
|
|
) -> Result<()> {
|
|
if let Some(thread_id) = self.primary_thread_id {
|
|
return self.enqueue_thread_request(thread_id, request).await;
|
|
}
|
|
self.pending_primary_events
|
|
.push_back(ThreadBufferedEvent::Request(request));
|
|
Ok(())
|
|
}
|
|
|
|
pub(super) async fn refresh_snapshot_session_if_needed(
|
|
&mut self,
|
|
app_server: &mut AppServerSession,
|
|
thread_id: ThreadId,
|
|
is_replay_only: bool,
|
|
snapshot: &mut ThreadEventSnapshot,
|
|
) {
|
|
if !self.should_refresh_snapshot_session(thread_id, is_replay_only, snapshot) {
|
|
return;
|
|
}
|
|
|
|
match app_server
|
|
.resume_thread(self.config.clone(), thread_id)
|
|
.await
|
|
{
|
|
Ok(started) => {
|
|
self.apply_refreshed_snapshot_thread(thread_id, started, snapshot)
|
|
.await
|
|
}
|
|
Err(err) => {
|
|
tracing::warn!(
|
|
thread_id = %thread_id,
|
|
error = %err,
|
|
"failed to refresh inferred thread session before replay"
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
pub(super) fn should_refresh_snapshot_session(
|
|
&self,
|
|
thread_id: ThreadId,
|
|
is_replay_only: bool,
|
|
snapshot: &ThreadEventSnapshot,
|
|
) -> bool {
|
|
!is_replay_only
|
|
&& !self.side_threads.contains_key(&thread_id)
|
|
&& snapshot.session.as_ref().is_none_or(|session| {
|
|
session.model.trim().is_empty() || session.rollout_path.is_none()
|
|
})
|
|
}
|
|
|
|
pub(super) async fn apply_refreshed_snapshot_thread(
|
|
&mut self,
|
|
thread_id: ThreadId,
|
|
started: AppServerStartedThread,
|
|
snapshot: &mut ThreadEventSnapshot,
|
|
) {
|
|
let AppServerStartedThread { session, turns } = started;
|
|
if let Some(channel) = self.thread_event_channels.get(&thread_id) {
|
|
let mut store = channel.store.lock().await;
|
|
store.set_session(session.clone(), turns.clone());
|
|
store.rebase_buffer_after_session_refresh();
|
|
}
|
|
snapshot.session = Some(session);
|
|
snapshot.turns = turns;
|
|
snapshot
|
|
.events
|
|
.retain(ThreadEventStore::event_survives_session_refresh);
|
|
}
|
|
|
|
/// Opens the `/agent` picker after refreshing cached labels for known threads.
|
|
///
|
|
/// The picker state is derived from long-lived thread channels plus best-effort metadata
|
|
/// refreshes from the backend. Refresh failures are treated as "thread is only inspectable by
|
|
/// historical id now" and converted into closed picker entries instead of deleting them, so
|
|
/// the stable traversal order remains intact for review and keyboard navigation.
|
|
pub(super) async fn drain_active_thread_events(&mut self, tui: &mut tui::Tui) -> Result<()> {
|
|
let Some(mut rx) = self.active_thread_rx.take() else {
|
|
return Ok(());
|
|
};
|
|
|
|
let mut disconnected = false;
|
|
loop {
|
|
match rx.try_recv() {
|
|
Ok(event) => self.handle_thread_event_now(event),
|
|
Err(TryRecvError::Empty) => break,
|
|
Err(TryRecvError::Disconnected) => {
|
|
disconnected = true;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if !disconnected {
|
|
self.active_thread_rx = Some(rx);
|
|
} else {
|
|
self.clear_active_thread().await;
|
|
}
|
|
|
|
if self.backtrack_render_pending {
|
|
tui.frame_requester().schedule_frame();
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// Returns `(closed_thread_id, primary_thread_id)` when a non-primary active
|
|
/// thread has died and we should fail over to the primary thread.
|
|
///
|
|
/// A user-requested shutdown (`ExitMode::ShutdownFirst`) sets
|
|
/// `pending_shutdown_exit_thread_id`; matching shutdown completions are ignored
|
|
/// here so Ctrl+C-like exits don't accidentally resurrect the main thread.
|
|
///
|
|
/// Failover is only eligible when all of these are true:
|
|
/// 1. the event is `thread/closed`;
|
|
/// 2. the active thread differs from the primary thread;
|
|
/// 3. the active thread is not the pending shutdown-exit thread.
|
|
pub(super) fn active_non_primary_shutdown_target(
|
|
&self,
|
|
notification: &ServerNotification,
|
|
) -> Option<(ThreadId, ThreadId)> {
|
|
if !matches!(notification, ServerNotification::ThreadClosed(_)) {
|
|
return None;
|
|
}
|
|
let active_thread_id = self.active_thread_id?;
|
|
let primary_thread_id = self.primary_thread_id?;
|
|
if self.pending_shutdown_exit_thread_id == Some(active_thread_id) {
|
|
return None;
|
|
}
|
|
(active_thread_id != primary_thread_id).then_some((active_thread_id, primary_thread_id))
|
|
}
|
|
|
|
pub(super) fn replay_thread_snapshot(
|
|
&mut self,
|
|
snapshot: ThreadEventSnapshot,
|
|
resume_restored_queue: bool,
|
|
) {
|
|
let suppress_replay_notices =
|
|
replay_filter::snapshot_has_pending_interactive_request(&snapshot);
|
|
if let Some(session) = snapshot.session {
|
|
if self.side_threads.contains_key(&session.thread_id) {
|
|
self.chat_widget.handle_side_thread_session(session);
|
|
} else if suppress_replay_notices {
|
|
self.chat_widget.handle_thread_session_quiet(session);
|
|
} else {
|
|
self.chat_widget.handle_thread_session(session);
|
|
}
|
|
}
|
|
self.chat_widget
|
|
.set_queue_autosend_suppressed(/*suppressed*/ true);
|
|
self.chat_widget
|
|
.restore_thread_input_state(snapshot.input_state);
|
|
if !snapshot.turns.is_empty() {
|
|
self.chat_widget
|
|
.replay_thread_turns(snapshot.turns, ReplayKind::ThreadSnapshot);
|
|
}
|
|
for event in snapshot.events {
|
|
if suppress_replay_notices && replay_filter::event_is_notice(&event) {
|
|
continue;
|
|
}
|
|
self.handle_thread_event_replay(event);
|
|
}
|
|
self.chat_widget
|
|
.set_queue_autosend_suppressed(/*suppressed*/ false);
|
|
self.chat_widget
|
|
.set_initial_user_message_submit_suppressed(/*suppressed*/ false);
|
|
self.chat_widget.submit_initial_user_message_if_pending();
|
|
if resume_restored_queue {
|
|
self.chat_widget.maybe_send_next_queued_input();
|
|
}
|
|
self.refresh_status_line();
|
|
}
|
|
|
|
pub(super) fn should_wait_for_initial_session(session_selection: &SessionSelection) -> bool {
|
|
matches!(
|
|
session_selection,
|
|
SessionSelection::StartFresh | SessionSelection::Exit
|
|
)
|
|
}
|
|
|
|
pub(super) fn should_handle_active_thread_events(
|
|
waiting_for_initial_session_configured: bool,
|
|
has_active_thread_receiver: bool,
|
|
) -> bool {
|
|
has_active_thread_receiver && !waiting_for_initial_session_configured
|
|
}
|
|
|
|
pub(super) fn should_stop_waiting_for_initial_session(
|
|
waiting_for_initial_session_configured: bool,
|
|
primary_thread_id: Option<ThreadId>,
|
|
) -> bool {
|
|
waiting_for_initial_session_configured && primary_thread_id.is_some()
|
|
}
|
|
|
|
#[allow(clippy::too_many_arguments)]
|
|
pub(super) fn handle_skills_list_response(&mut self, response: SkillsListResponse) {
|
|
let response = list_skills_response_to_core(response);
|
|
let cwd = self.chat_widget.config_ref().cwd.clone();
|
|
let errors = errors_for_cwd(&cwd, &response);
|
|
emit_skill_load_warnings(&self.app_event_tx, &errors);
|
|
self.chat_widget.handle_skills_list_response(response);
|
|
}
|
|
|
|
pub(super) async fn handle_thread_rollback_response(
|
|
&mut self,
|
|
thread_id: ThreadId,
|
|
num_turns: u32,
|
|
response: &ThreadRollbackResponse,
|
|
) {
|
|
if let Some(channel) = self.thread_event_channels.get(&thread_id) {
|
|
let mut store = channel.store.lock().await;
|
|
store.apply_thread_rollback(response);
|
|
}
|
|
if self.active_thread_id == Some(thread_id)
|
|
&& let Some(mut rx) = self.active_thread_rx.take()
|
|
{
|
|
let mut disconnected = false;
|
|
loop {
|
|
match rx.try_recv() {
|
|
Ok(_) => {}
|
|
Err(TryRecvError::Empty) => break,
|
|
Err(TryRecvError::Disconnected) => {
|
|
disconnected = true;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if !disconnected {
|
|
self.active_thread_rx = Some(rx);
|
|
} else {
|
|
self.clear_active_thread().await;
|
|
}
|
|
}
|
|
self.handle_backtrack_rollback_succeeded(num_turns);
|
|
}
|
|
|
|
pub(super) fn handle_thread_event_now(&mut self, event: ThreadBufferedEvent) {
|
|
let needs_refresh = matches!(
|
|
&event,
|
|
ThreadBufferedEvent::Notification(ServerNotification::TurnStarted(_))
|
|
| ThreadBufferedEvent::Notification(ServerNotification::ThreadTokenUsageUpdated(_))
|
|
);
|
|
match event {
|
|
ThreadBufferedEvent::Notification(notification) => {
|
|
self.chat_widget
|
|
.handle_server_notification(notification, /*replay_kind*/ None);
|
|
}
|
|
ThreadBufferedEvent::Request(request) => {
|
|
if self
|
|
.pending_app_server_requests
|
|
.contains_server_request(&request)
|
|
{
|
|
self.chat_widget
|
|
.handle_server_request(request, /*replay_kind*/ None);
|
|
}
|
|
}
|
|
ThreadBufferedEvent::HistoryEntryResponse(event) => {
|
|
self.chat_widget.handle_history_entry_response(event);
|
|
}
|
|
ThreadBufferedEvent::FeedbackSubmission(event) => {
|
|
self.handle_feedback_thread_event(event);
|
|
}
|
|
}
|
|
if needs_refresh {
|
|
self.refresh_status_line();
|
|
}
|
|
}
|
|
|
|
pub(super) fn handle_thread_event_replay(&mut self, event: ThreadBufferedEvent) {
|
|
match event {
|
|
ThreadBufferedEvent::Notification(notification) => self
|
|
.chat_widget
|
|
.handle_server_notification(notification, Some(ReplayKind::ThreadSnapshot)),
|
|
ThreadBufferedEvent::Request(request) => self
|
|
.chat_widget
|
|
.handle_server_request(request, Some(ReplayKind::ThreadSnapshot)),
|
|
ThreadBufferedEvent::HistoryEntryResponse(event) => {
|
|
self.chat_widget.handle_history_entry_response(event)
|
|
}
|
|
ThreadBufferedEvent::FeedbackSubmission(event) => {
|
|
self.handle_feedback_thread_event(event);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Handles an event emitted by the currently active thread.
|
|
///
|
|
/// This function enforces shutdown intent routing: unexpected non-primary
|
|
/// thread shutdowns fail over to the primary thread, while user-requested
|
|
/// app exits consume only the tracked shutdown completion and then proceed.
|
|
pub(super) async fn handle_active_thread_event(
|
|
&mut self,
|
|
tui: &mut tui::Tui,
|
|
app_server: &mut AppServerSession,
|
|
event: ThreadBufferedEvent,
|
|
) -> Result<()> {
|
|
// Capture this before any potential thread switch: we only want to clear
|
|
// the exit marker when the currently active thread acknowledges shutdown.
|
|
let pending_shutdown_exit_completed = matches!(
|
|
&event,
|
|
ThreadBufferedEvent::Notification(ServerNotification::ThreadClosed(_))
|
|
) && self.pending_shutdown_exit_thread_id
|
|
== self.active_thread_id;
|
|
|
|
// Processing order matters:
|
|
//
|
|
// 1. handle unexpected non-primary shutdown failover first;
|
|
// 2. clear pending exit marker for matching shutdown;
|
|
// 3. forward the event through normal handling.
|
|
//
|
|
// This preserves the mental model that user-requested exits do not trigger
|
|
// failover, while true sub-agent deaths still do.
|
|
if let ThreadBufferedEvent::Notification(notification) = &event
|
|
&& let Some((closed_thread_id, primary_thread_id)) =
|
|
self.active_non_primary_shutdown_target(notification)
|
|
{
|
|
self.mark_agent_picker_thread_closed(closed_thread_id);
|
|
if self.side_threads.contains_key(&closed_thread_id) {
|
|
self.discard_closed_side_thread(closed_thread_id).await;
|
|
self.select_agent_thread(tui, app_server, primary_thread_id)
|
|
.await?;
|
|
} else {
|
|
self.select_agent_thread_and_discard_side(tui, app_server, primary_thread_id)
|
|
.await?;
|
|
}
|
|
if self.active_thread_id == Some(primary_thread_id) {
|
|
self.chat_widget.add_info_message(
|
|
format!(
|
|
"Agent thread {closed_thread_id} closed. Switched back to main thread."
|
|
),
|
|
/*hint*/ None,
|
|
);
|
|
} else {
|
|
self.clear_active_thread().await;
|
|
self.chat_widget.add_error_message(format!(
|
|
"Agent thread {closed_thread_id} closed. Failed to switch back to main thread {primary_thread_id}.",
|
|
));
|
|
}
|
|
return Ok(());
|
|
}
|
|
|
|
if pending_shutdown_exit_completed {
|
|
// Clear only after seeing the shutdown completion for the tracked
|
|
// thread, so unrelated shutdowns cannot consume this marker.
|
|
self.pending_shutdown_exit_thread_id = None;
|
|
}
|
|
if let ThreadBufferedEvent::Notification(notification) = &event {
|
|
self.hydrate_collab_agent_metadata_for_notification(app_server, notification)
|
|
.await;
|
|
}
|
|
|
|
self.handle_thread_event_now(event);
|
|
if self.backtrack_render_pending {
|
|
tui.frame_requester().schedule_frame();
|
|
}
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
fn runtime_permission_profile_for_turn_start(
|
|
runtime_sandbox_policy_override: Option<&SandboxPolicy>,
|
|
sandbox_policy: &SandboxPolicy,
|
|
) -> Option<codex_protocol::models::PermissionProfile> {
|
|
runtime_sandbox_policy_override?;
|
|
match sandbox_policy {
|
|
SandboxPolicy::ExternalSandbox { .. } => None,
|
|
SandboxPolicy::ReadOnly { .. }
|
|
| SandboxPolicy::WorkspaceWrite { .. }
|
|
| SandboxPolicy::DangerFullAccess => Some(
|
|
codex_protocol::models::PermissionProfile::from_legacy_sandbox_policy(sandbox_policy),
|
|
),
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use codex_protocol::protocol::SandboxPolicy;
|
|
use pretty_assertions::assert_eq;
|
|
|
|
#[test]
|
|
fn runtime_permission_profile_for_turn_start_only_when_sandbox_was_overridden() {
|
|
let sandbox_policy = SandboxPolicy::DangerFullAccess;
|
|
|
|
assert_eq!(
|
|
runtime_permission_profile_for_turn_start(
|
|
/*runtime_sandbox_policy_override*/ None,
|
|
&sandbox_policy,
|
|
),
|
|
None
|
|
);
|
|
|
|
let profile =
|
|
runtime_permission_profile_for_turn_start(Some(&sandbox_policy), &sandbox_policy)
|
|
.expect("runtime sandbox override should send active permissions");
|
|
|
|
assert_eq!(
|
|
profile,
|
|
codex_protocol::models::PermissionProfile::from_legacy_sandbox_policy(&sandbox_policy)
|
|
);
|
|
}
|
|
}
|