diff --git a/codex-rs/tui_app_server/src/app/app_server_adapter.rs b/codex-rs/tui_app_server/src/app/app_server_adapter.rs index d3c6c90a76..250987dbb8 100644 --- a/codex-rs/tui_app_server/src/app/app_server_adapter.rs +++ b/codex-rs/tui_app_server/src/app/app_server_adapter.rs @@ -18,9 +18,11 @@ use crate::app_server_session::app_server_rate_limit_snapshot_to_core; use crate::app_server_session::status_account_display_from_auth_mode; use crate::local_chatgpt_auth::load_local_chatgpt_auth; use codex_app_server_client::AppServerEvent; +use codex_app_server_protocol::AuthMode; use codex_app_server_protocol::ChatgptAuthTokensRefreshParams; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::JSONRPCNotification; +use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::Thread; @@ -86,69 +88,13 @@ impl App { "app-server event consumer lagged; dropping ignored events" ); } - AppServerEvent::ServerNotification(notification) => match notification { - ServerNotification::ServerRequestResolved(notification) => { - self.pending_app_server_requests - .resolve_notification(¬ification.request_id); - } - ServerNotification::AccountRateLimitsUpdated(notification) => { - self.chat_widget.on_rate_limit_snapshot(Some( - app_server_rate_limit_snapshot_to_core(notification.rate_limits), - )); - } - ServerNotification::AccountUpdated(notification) => { - self.chat_widget.update_account_state( - status_account_display_from_auth_mode( - notification.auth_mode, - notification.plan_type, - ), - notification.plan_type, - matches!( - notification.auth_mode, - Some(codex_app_server_protocol::AuthMode::Chatgpt) - | Some(codex_app_server_protocol::AuthMode::ChatgptAuthTokens) - ), - ); - } - notification => { - if !app_server_client.is_remote() - && matches!( - notification, - ServerNotification::TurnCompleted(_) - | ServerNotification::ThreadRealtimeItemAdded(_) - | ServerNotification::ThreadRealtimeOutputAudioDelta(_) - | ServerNotification::ThreadRealtimeError(_) - ) - { - return; - } - if let Some((thread_id, events)) = - server_notification_thread_events(notification) - { - for event in events { - if self.primary_thread_id.is_none() - || matches!(event.msg, EventMsg::SessionConfigured(_)) - && self.primary_thread_id == Some(thread_id) - { - if let Err(err) = self.enqueue_primary_event(event).await { - tracing::warn!( - "failed to enqueue primary app-server server notification: {err}" - ); - } - } else if let Err(err) = - self.enqueue_thread_event(thread_id, event).await - { - tracing::warn!( - "failed to enqueue app-server server notification for {thread_id}: {err}" - ); - } - } - } - } - }, + AppServerEvent::ServerNotification(notification) => { + self.handle_server_notification_event(app_server_client, notification) + .await; + } AppServerEvent::LegacyNotification(notification) => { if let Some((thread_id, legacy_notification)) = - legacy_thread_notification(notification.clone()) + legacy_thread_notification(notification) { let result = match legacy_notification { LegacyThreadNotification::Warning(message) => { @@ -174,23 +120,8 @@ impl App { if let Err(err) = result { tracing::warn!("failed to enqueue app-server legacy notification: {err}"); } - } else if let Some((thread_id, event)) = legacy_thread_event(notification.params) { - self.pending_app_server_requests.note_legacy_event(&event); - if legacy_event_is_shadowed_by_server_notification(&event.msg) { - return; - } - if self.primary_thread_id.is_none() - || matches!(event.msg, EventMsg::SessionConfigured(_)) - && self.primary_thread_id == Some(thread_id) - { - if let Err(err) = self.enqueue_primary_event(event).await { - tracing::warn!("failed to enqueue primary app-server event: {err}"); - } - } else if let Err(err) = self.enqueue_thread_event(thread_id, event).await { - tracing::warn!( - "failed to enqueue app-server thread event for {thread_id}: {err}" - ); - } + } else { + tracing::debug!("ignoring legacy app-server notification in tui_app_server"); } } AppServerEvent::ServerRequest(request) => { @@ -203,28 +134,8 @@ impl App { .await; return; } - if let Some(unsupported) = self - .pending_app_server_requests - .note_server_request(&request) - { - tracing::warn!( - request_id = ?unsupported.request_id, - message = unsupported.message, - "rejecting unsupported app-server request" - ); - self.chat_widget - .add_error_message(unsupported.message.clone()); - if let Err(err) = self - .reject_app_server_request( - app_server_client, - unsupported.request_id, - unsupported.message, - ) - .await - { - tracing::warn!("{err}"); - } - } + self.handle_server_request_event(app_server_client, request) + .await; } AppServerEvent::Disconnected { message } => { tracing::warn!("app-server event stream disconnected: {message}"); @@ -234,10 +145,118 @@ impl App { } } + async fn handle_server_notification_event( + &mut self, + _app_server_client: &AppServerSession, + notification: ServerNotification, + ) { + match ¬ification { + ServerNotification::ServerRequestResolved(notification) => { + self.pending_app_server_requests + .resolve_notification(¬ification.request_id); + } + ServerNotification::AccountRateLimitsUpdated(notification) => { + self.chat_widget.on_rate_limit_snapshot(Some( + app_server_rate_limit_snapshot_to_core(notification.rate_limits.clone()), + )); + return; + } + ServerNotification::AccountUpdated(notification) => { + self.chat_widget.update_account_state( + status_account_display_from_auth_mode( + notification.auth_mode, + notification.plan_type, + ), + notification.plan_type, + matches!( + notification.auth_mode, + Some(AuthMode::Chatgpt) | Some(AuthMode::ChatgptAuthTokens) + ), + ); + return; + } + _ => {} + } + + match server_notification_thread_target(¬ification) { + ServerNotificationThreadTarget::Thread(thread_id) => { + let result = if self.primary_thread_id == Some(thread_id) + || self.primary_thread_id.is_none() + { + self.enqueue_primary_thread_notification(notification).await + } else { + self.enqueue_thread_notification(thread_id, notification) + .await + }; + + if let Err(err) = result { + tracing::warn!("failed to enqueue app-server notification: {err}"); + } + return; + } + ServerNotificationThreadTarget::InvalidThreadId(thread_id) => { + tracing::warn!( + thread_id, + "ignoring app-server notification with invalid thread_id" + ); + return; + } + ServerNotificationThreadTarget::Global => {} + } + + self.chat_widget + .handle_server_notification(notification, /*replay_kind*/ None); + } + + async fn handle_server_request_event( + &mut self, + app_server_client: &AppServerSession, + request: ServerRequest, + ) { + if let Some(unsupported) = self + .pending_app_server_requests + .note_server_request(&request) + { + tracing::warn!( + request_id = ?unsupported.request_id, + message = unsupported.message, + "rejecting unsupported app-server request" + ); + self.chat_widget + .add_error_message(unsupported.message.clone()); + if let Err(err) = self + .reject_app_server_request( + app_server_client, + unsupported.request_id, + unsupported.message, + ) + .await + { + tracing::warn!("{err}"); + } + return; + } + + let Some(thread_id) = server_request_thread_id(&request) else { + tracing::warn!("ignoring threadless app-server request"); + return; + }; + + let result = + if self.primary_thread_id == Some(thread_id) || self.primary_thread_id.is_none() { + self.enqueue_primary_thread_request(request).await + } else { + self.enqueue_thread_request(thread_id, request).await + }; + if let Err(err) = result { + tracing::warn!("failed to enqueue app-server request: {err}"); + } + } + async fn handle_chatgpt_auth_tokens_refresh_request( &mut self, app_server_client: &AppServerSession, - request_id: codex_app_server_protocol::RequestId, + request_id: RequestId, params: ChatgptAuthTokensRefreshParams, ) { let config = self.config.clone(); @@ -318,6 +337,143 @@ impl App { } } +fn server_request_thread_id(request: &ServerRequest) -> Option { + match request { + ServerRequest::CommandExecutionRequestApproval { params, .. } => { + ThreadId::from_string(¶ms.thread_id).ok() + } + ServerRequest::FileChangeRequestApproval { params, .. } => { + ThreadId::from_string(¶ms.thread_id).ok() + } + ServerRequest::ToolRequestUserInput { params, .. } => { + ThreadId::from_string(¶ms.thread_id).ok() + } + ServerRequest::McpServerElicitationRequest { params, .. } => { + ThreadId::from_string(¶ms.thread_id).ok() + } + ServerRequest::PermissionsRequestApproval { params, .. } => { + ThreadId::from_string(¶ms.thread_id).ok() + } + ServerRequest::DynamicToolCall { params, .. } => { + ThreadId::from_string(¶ms.thread_id).ok() + } + ServerRequest::ChatgptAuthTokensRefresh { .. } + | ServerRequest::ApplyPatchApproval { .. } + | ServerRequest::ExecCommandApproval { .. } => None, + } +} + +#[derive(Debug, PartialEq, Eq)] +enum ServerNotificationThreadTarget { + Thread(ThreadId), + InvalidThreadId(String), + Global, +} + +fn server_notification_thread_target( + notification: &ServerNotification, +) -> ServerNotificationThreadTarget { + let thread_id = match notification { + ServerNotification::Error(notification) => Some(notification.thread_id.as_str()), + ServerNotification::ThreadStarted(notification) => Some(notification.thread.id.as_str()), + ServerNotification::ThreadStatusChanged(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ThreadArchived(notification) => Some(notification.thread_id.as_str()), + ServerNotification::ThreadUnarchived(notification) => Some(notification.thread_id.as_str()), + ServerNotification::ThreadClosed(notification) => Some(notification.thread_id.as_str()), + ServerNotification::ThreadNameUpdated(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ThreadTokenUsageUpdated(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::TurnStarted(notification) => Some(notification.thread_id.as_str()), + ServerNotification::HookStarted(notification) => Some(notification.thread_id.as_str()), + ServerNotification::TurnCompleted(notification) => Some(notification.thread_id.as_str()), + ServerNotification::HookCompleted(notification) => Some(notification.thread_id.as_str()), + ServerNotification::TurnDiffUpdated(notification) => Some(notification.thread_id.as_str()), + ServerNotification::TurnPlanUpdated(notification) => Some(notification.thread_id.as_str()), + ServerNotification::ItemStarted(notification) => Some(notification.thread_id.as_str()), + ServerNotification::ItemGuardianApprovalReviewStarted(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ItemGuardianApprovalReviewCompleted(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ItemCompleted(notification) => Some(notification.thread_id.as_str()), + ServerNotification::RawResponseItemCompleted(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::AgentMessageDelta(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::PlanDelta(notification) => Some(notification.thread_id.as_str()), + ServerNotification::CommandExecutionOutputDelta(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::TerminalInteraction(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::FileChangeOutputDelta(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ServerRequestResolved(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::McpToolCallProgress(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ReasoningSummaryTextDelta(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ReasoningSummaryPartAdded(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ReasoningTextDelta(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ContextCompacted(notification) => Some(notification.thread_id.as_str()), + ServerNotification::ModelRerouted(notification) => Some(notification.thread_id.as_str()), + ServerNotification::ThreadRealtimeStarted(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ThreadRealtimeItemAdded(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ThreadRealtimeOutputAudioDelta(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ThreadRealtimeError(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ThreadRealtimeClosed(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::SkillsChanged(_) + | ServerNotification::McpServerOauthLoginCompleted(_) + | ServerNotification::AccountUpdated(_) + | ServerNotification::AccountRateLimitsUpdated(_) + | ServerNotification::AppListUpdated(_) + | ServerNotification::DeprecationNotice(_) + | ServerNotification::ConfigWarning(_) + | ServerNotification::FuzzyFileSearchSessionUpdated(_) + | ServerNotification::FuzzyFileSearchSessionCompleted(_) + | ServerNotification::CommandExecOutputDelta(_) + | ServerNotification::WindowsWorldWritableWarning(_) + | ServerNotification::WindowsSandboxSetupCompleted(_) + | ServerNotification::AccountLoginCompleted(_) => None, + }; + + match thread_id { + Some(thread_id) => match ThreadId::from_string(thread_id) { + Ok(thread_id) => ServerNotificationThreadTarget::Thread(thread_id), + Err(_) => ServerNotificationThreadTarget::InvalidThreadId(thread_id.to_string()), + }, + None => ServerNotificationThreadTarget::Global, + } +} + fn resolve_chatgpt_auth_tokens_refresh_response( codex_home: &std::path::Path, auth_credentials_store_mode: codex_core::auth::AuthCredentialsStoreMode, @@ -365,22 +521,6 @@ pub(super) fn thread_snapshot_events( .collect() } -fn legacy_thread_event(params: Option) -> Option<(ThreadId, Event)> { - let Value::Object(mut params) = params? else { - return None; - }; - let thread_id = params - .remove("conversationId") - .and_then(|value| serde_json::from_value::(value).ok()) - .and_then(|value| ThreadId::from_string(&value).ok()); - let event = serde_json::from_value::(Value::Object(params)).ok()?; - let thread_id = thread_id.or(match &event.msg { - EventMsg::SessionConfigured(session) => Some(session.session_id), - _ => None, - })?; - Some((thread_id, event)) -} - fn legacy_thread_notification( notification: JSONRPCNotification, ) -> Option<(ThreadId, LegacyThreadNotification)> { @@ -423,27 +563,6 @@ fn legacy_thread_notification( } } -fn legacy_event_is_shadowed_by_server_notification(msg: &EventMsg) -> bool { - matches!( - msg, - EventMsg::TokenCount(_) - | EventMsg::Error(_) - | EventMsg::ThreadNameUpdated(_) - | EventMsg::TurnStarted(_) - | EventMsg::ItemStarted(_) - | EventMsg::ItemCompleted(_) - | EventMsg::ExecCommandBegin(_) - | EventMsg::ExecCommandOutputDelta(_) - | EventMsg::ExecCommandEnd(_) - | EventMsg::AgentMessageDelta(_) - | EventMsg::PlanDelta(_) - | EventMsg::AgentReasoningDelta(_) - | EventMsg::AgentReasoningRawContentDelta(_) - | EventMsg::RealtimeConversationStarted(_) - | EventMsg::RealtimeConversationClosed(_) - ) -} - fn server_notification_thread_events( notification: ServerNotification, ) -> Option<(ThreadId, Vec)> { @@ -777,13 +896,33 @@ fn thread_item_to_core(item: &ThreadItem) -> Option { .map(codex_app_server_protocol::UserInput::into_core) .collect(), })), - ThreadItem::AgentMessage { id, text, phase } => { - Some(TurnItem::AgentMessage(AgentMessageItem { - id: id.clone(), - content: vec![AgentMessageContent::Text { text: text.clone() }], - phase: phase.clone(), - })) - } + ThreadItem::AgentMessage { + id, + text, + phase, + memory_citation, + } => Some(TurnItem::AgentMessage(AgentMessageItem { + id: id.clone(), + content: vec![AgentMessageContent::Text { text: text.clone() }], + phase: phase.clone(), + memory_citation: memory_citation.clone().map(|citation| { + codex_protocol::memory_citation::MemoryCitation { + entries: citation + .entries + .into_iter() + .map( + |entry| codex_protocol::memory_citation::MemoryCitationEntry { + path: entry.path, + line_start: entry.line_start, + line_end: entry.line_end, + note: entry.note, + }, + ) + .collect(), + rollout_ids: citation.thread_ids, + } + }), + })), ThreadItem::Plan { id, text } => Some(TurnItem::Plan(PlanItem { id: id.clone(), text: text.clone(), @@ -1193,6 +1332,7 @@ mod tests { id: item_id, text: "Hello from your coding assistant.".to_string(), phase: Some(MessagePhase::FinalAnswer), + memory_citation: None, }, thread_id: thread_id.clone(), turn_id: turn_id.clone(), @@ -1217,7 +1357,9 @@ mod tests { ); assert_eq!(completed.turn_id, turn_id); match &completed.item { - TurnItem::AgentMessage(AgentMessageItem { id, content, phase }) => { + TurnItem::AgentMessage(AgentMessageItem { + id, content, phase, .. + }) => { assert_eq!(id, "msg_123"); let [AgentMessageContent::Text { text }] = content.as_slice() else { panic!("expected a single text content item"); @@ -1589,6 +1731,7 @@ mod tests { id: "assistant-1".to_string(), text: "hi".to_string(), phase: Some(MessagePhase::FinalAnswer), + memory_citation: None, }, ], status: TurnStatus::Completed, diff --git a/codex-rs/tui_app_server/src/chatwidget.rs b/codex-rs/tui_app_server/src/chatwidget.rs index 27d56ba116..8c7b7af3ef 100644 --- a/codex-rs/tui_app_server/src/chatwidget.rs +++ b/codex-rs/tui_app_server/src/chatwidget.rs @@ -5556,9 +5556,11 @@ impl ChatWidget { command, cwd, process_id, + source, status, command_actions, aggregated_output, + formatted_output, exit_code, duration_ms, } => { @@ -5576,10 +5578,11 @@ impl ChatWidget { .into_iter() .map(codex_app_server_protocol::CommandAction::into_core) .collect(), - source: ExecCommandSource::Agent, + source: source.to_core(), interaction_input: None, }); } else { + let aggregated_output = aggregated_output.unwrap_or_default(); self.on_exec_command_end(ExecCommandEndEvent { call_id: id, process_id, @@ -5590,16 +5593,20 @@ impl ChatWidget { .into_iter() .map(codex_app_server_protocol::CommandAction::into_core) .collect(), - source: ExecCommandSource::Agent, + source: source.to_core(), interaction_input: None, stdout: String::new(), stderr: String::new(), - aggregated_output: aggregated_output.unwrap_or_default(), + aggregated_output: aggregated_output.clone(), exit_code: exit_code.unwrap_or_default(), duration: Duration::from_millis( duration_ms.unwrap_or_default().max(0) as u64 ), - formatted_output: String::new(), + formatted_output: if formatted_output.is_empty() { + aggregated_output + } else { + formatted_output + }, status: match status { codex_app_server_protocol::CommandExecutionStatus::Completed => { codex_protocol::protocol::ExecCommandStatus::Completed @@ -5879,7 +5886,7 @@ impl ChatWidget { self.on_exec_command_output_delta(ExecCommandOutputDeltaEvent { call_id: notification.item_id, stream: codex_protocol::protocol::ExecOutputStream::Stdout, - chunk: notification.delta.into_bytes(), + chunk: notification.delta, }); } ServerNotification::FileChangeOutputDelta(notification) => { @@ -6138,6 +6145,7 @@ impl ChatWidget { command, cwd, process_id, + source, command_actions, .. } => { @@ -6151,7 +6159,7 @@ impl ChatWidget { .into_iter() .map(codex_app_server_protocol::CommandAction::into_core) .collect(), - source: ExecCommandSource::Agent, + source: source.to_core(), interaction_input: None, }); }