From c70cdc108fed58eb356fc8a0d4b742308e74bf64 Mon Sep 17 00:00:00 2001 From: Eric Traut Date: Thu, 30 Apr 2026 10:52:19 -0700 Subject: [PATCH] Remove core protocol dependency [1/2] (#20324) ## Why This stack moves `codex-tui` away from the core protocol event surface and toward app-server API shapes plus TUI-owned local models. This first PR sets up the lower-risk foundation: it introduces the local model surface and extracts app-server event routing into focused TUI modules while preserving the existing behavior for the larger migration in PR2. This PR is part 1 of a 2-PR stack: 1. Add TUI-owned replacement models and extract app-server event routing. 2. Move the active TUI flow to app-server notifications and delete obsolete adapter code. ## What changed - Added TUI-owned approval, diff, session state, session resume, token usage, and user-message models. - Added `app/app_server_event_targets.rs` and `app/app_server_events.rs` to hold app-server event targeting and dispatch logic outside `app.rs`. - Updated app/status tests to use the local model layer and added focused routing coverage. - Boxed a few large async TUI test futures so this base layer remains checkable without overflowing the default test stack. ## Verification - `cargo check -p codex-tui --tests` --- codex-rs/tui/src/app.rs | 3 +- .../tui/src/app/app_server_event_targets.rs | 218 ++++++++++++ codex-rs/tui/src/app/app_server_events.rs | 208 ++++++++++++ codex-rs/tui/src/app/tests.rs | 303 +++++++++-------- codex-rs/tui/src/approval_events.rs | 119 +++++++ codex-rs/tui/src/chatwidget/user_messages.rs | 107 ++++++ codex-rs/tui/src/diff_model.rs | 21 ++ codex-rs/tui/src/session_resume.rs | 314 ++++++++++++++++++ codex-rs/tui/src/session_state.rs | 45 +++ codex-rs/tui/src/status/tests.rs | 1 + codex-rs/tui/src/token_usage.rs | 87 +++++ 11 files changed, 1294 insertions(+), 132 deletions(-) create mode 100644 codex-rs/tui/src/app/app_server_event_targets.rs create mode 100644 codex-rs/tui/src/app/app_server_events.rs create mode 100644 codex-rs/tui/src/approval_events.rs create mode 100644 codex-rs/tui/src/chatwidget/user_messages.rs create mode 100644 codex-rs/tui/src/diff_model.rs create mode 100644 codex-rs/tui/src/session_resume.rs create mode 100644 codex-rs/tui/src/session_state.rs create mode 100644 codex-rs/tui/src/token_usage.rs diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index 86e40ac5e9..e42a4c7354 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -183,7 +183,8 @@ use tokio::task::JoinHandle; use toml::Value as TomlValue; use uuid::Uuid; mod agent_navigation; -mod app_server_adapter; +mod app_server_event_targets; +mod app_server_events; pub(crate) mod app_server_requests; mod background_requests; mod config_persistence; diff --git a/codex-rs/tui/src/app/app_server_event_targets.rs b/codex-rs/tui/src/app/app_server_event_targets.rs new file mode 100644 index 0000000000..bc0567df51 --- /dev/null +++ b/codex-rs/tui/src/app/app_server_event_targets.rs @@ -0,0 +1,218 @@ +//! Thread targeting helpers for app-server requests and notifications. + +use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ServerRequest; +use codex_protocol::ThreadId; + +pub(super) fn server_request_thread_id(request: &ServerRequest) -> Option { + match request { + ServerRequest::CommandExecutionRequestApproval { params, .. } => { + ThreadId::from_string(¶ms.thread_id).ok() + } + ServerRequest::FileChangeRequestApproval { params, .. } => { + ThreadId::from_string(¶ms.thread_id).ok() + } + ServerRequest::ToolRequestUserInput { params, .. } => { + ThreadId::from_string(¶ms.thread_id).ok() + } + ServerRequest::McpServerElicitationRequest { params, .. } => { + ThreadId::from_string(¶ms.thread_id).ok() + } + ServerRequest::PermissionsRequestApproval { params, .. } => { + ThreadId::from_string(¶ms.thread_id).ok() + } + ServerRequest::DynamicToolCall { params, .. } => { + ThreadId::from_string(¶ms.thread_id).ok() + } + ServerRequest::ChatgptAuthTokensRefresh { .. } + | ServerRequest::ApplyPatchApproval { .. } + | ServerRequest::ExecCommandApproval { .. } => None, + } +} + +#[derive(Debug, PartialEq, Eq)] +pub(super) enum ServerNotificationThreadTarget { + Thread(ThreadId), + InvalidThreadId(String), + Global, +} + +pub(super) fn server_notification_thread_target( + notification: &ServerNotification, +) -> ServerNotificationThreadTarget { + let thread_id = match notification { + ServerNotification::Error(notification) => Some(notification.thread_id.as_str()), + ServerNotification::ThreadStarted(notification) => Some(notification.thread.id.as_str()), + ServerNotification::ThreadStatusChanged(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ThreadArchived(notification) => Some(notification.thread_id.as_str()), + ServerNotification::ThreadUnarchived(notification) => Some(notification.thread_id.as_str()), + ServerNotification::ThreadClosed(notification) => Some(notification.thread_id.as_str()), + ServerNotification::ThreadNameUpdated(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ThreadTokenUsageUpdated(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ThreadGoalUpdated(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ThreadGoalCleared(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::TurnStarted(notification) => Some(notification.thread_id.as_str()), + ServerNotification::HookStarted(notification) => Some(notification.thread_id.as_str()), + ServerNotification::TurnCompleted(notification) => Some(notification.thread_id.as_str()), + ServerNotification::HookCompleted(notification) => Some(notification.thread_id.as_str()), + ServerNotification::TurnDiffUpdated(notification) => Some(notification.thread_id.as_str()), + ServerNotification::TurnPlanUpdated(notification) => Some(notification.thread_id.as_str()), + ServerNotification::ItemStarted(notification) => Some(notification.thread_id.as_str()), + ServerNotification::ItemGuardianApprovalReviewStarted(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ItemGuardianApprovalReviewCompleted(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ItemCompleted(notification) => Some(notification.thread_id.as_str()), + ServerNotification::RawResponseItemCompleted(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::AgentMessageDelta(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::PlanDelta(notification) => Some(notification.thread_id.as_str()), + ServerNotification::CommandExecutionOutputDelta(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::TerminalInteraction(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::FileChangeOutputDelta(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::FileChangePatchUpdated(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ServerRequestResolved(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::McpToolCallProgress(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ReasoningSummaryTextDelta(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ReasoningSummaryPartAdded(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ReasoningTextDelta(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ContextCompacted(notification) => Some(notification.thread_id.as_str()), + ServerNotification::ModelRerouted(notification) => Some(notification.thread_id.as_str()), + ServerNotification::ModelVerification(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ThreadRealtimeStarted(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ThreadRealtimeItemAdded(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ThreadRealtimeTranscriptDelta(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ThreadRealtimeTranscriptDone(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ThreadRealtimeOutputAudioDelta(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ThreadRealtimeSdp(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ThreadRealtimeError(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ThreadRealtimeClosed(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::Warning(notification) => notification.thread_id.as_deref(), + ServerNotification::GuardianWarning(notification) => Some(notification.thread_id.as_str()), + ServerNotification::SkillsChanged(_) + | ServerNotification::McpServerStatusUpdated(_) + | ServerNotification::McpServerOauthLoginCompleted(_) + | ServerNotification::AccountUpdated(_) + | ServerNotification::AccountRateLimitsUpdated(_) + | ServerNotification::AppListUpdated(_) + | ServerNotification::RemoteControlStatusChanged(_) + | ServerNotification::ExternalAgentConfigImportCompleted(_) + | ServerNotification::DeprecationNotice(_) + | ServerNotification::ConfigWarning(_) + | ServerNotification::FuzzyFileSearchSessionUpdated(_) + | ServerNotification::FuzzyFileSearchSessionCompleted(_) + | ServerNotification::CommandExecOutputDelta(_) + | ServerNotification::FsChanged(_) + | ServerNotification::WindowsWorldWritableWarning(_) + | ServerNotification::WindowsSandboxSetupCompleted(_) + | ServerNotification::AccountLoginCompleted(_) => None, + }; + + match thread_id { + Some(thread_id) => match ThreadId::from_string(thread_id) { + Ok(thread_id) => ServerNotificationThreadTarget::Thread(thread_id), + Err(_) => ServerNotificationThreadTarget::InvalidThreadId(thread_id.to_string()), + }, + None => ServerNotificationThreadTarget::Global, + } +} + +#[cfg(test)] +mod tests { + use super::ServerNotificationThreadTarget; + use super::server_notification_thread_target; + use codex_app_server_protocol::GuardianWarningNotification; + use codex_app_server_protocol::ServerNotification; + use codex_app_server_protocol::WarningNotification; + use codex_protocol::ThreadId; + use pretty_assertions::assert_eq; + + #[test] + fn warning_notifications_without_threads_are_global() { + let notification = ServerNotification::Warning(WarningNotification { + thread_id: None, + message: "warning".to_string(), + }); + + let target = server_notification_thread_target(¬ification); + + assert_eq!(target, ServerNotificationThreadTarget::Global); + } + + #[test] + fn warning_notifications_route_to_threads_when_thread_id_is_present() { + let thread_id = ThreadId::new(); + let notification = ServerNotification::Warning(WarningNotification { + thread_id: Some(thread_id.to_string()), + message: "warning".to_string(), + }); + + let target = server_notification_thread_target(¬ification); + + assert_eq!(target, ServerNotificationThreadTarget::Thread(thread_id)); + } + + #[test] + fn guardian_warning_notifications_route_to_threads() { + let thread_id = ThreadId::new(); + let notification = ServerNotification::GuardianWarning(GuardianWarningNotification { + thread_id: thread_id.to_string(), + message: "warning".to_string(), + }); + + let target = server_notification_thread_target(¬ification); + + assert_eq!(target, ServerNotificationThreadTarget::Thread(thread_id)); + } +} diff --git a/codex-rs/tui/src/app/app_server_events.rs b/codex-rs/tui/src/app/app_server_events.rs new file mode 100644 index 0000000000..05b76a7a13 --- /dev/null +++ b/codex-rs/tui/src/app/app_server_events.rs @@ -0,0 +1,208 @@ +//! App-server event stream handling for the TUI app. + +use super::App; +use super::app_server_event_targets::ServerNotificationThreadTarget; +use super::app_server_event_targets::server_notification_thread_target; +use super::app_server_event_targets::server_request_thread_id; +use crate::app_command::AppCommand; +use crate::app_event::AppEvent; +use crate::app_server_session::AppServerSession; +use crate::app_server_session::app_server_rate_limit_snapshot_to_core; +use crate::app_server_session::status_account_display_from_auth_mode; +use codex_app_server_client::AppServerEvent; +use codex_app_server_protocol::AuthMode; +use codex_app_server_protocol::JSONRPCErrorError; +use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ServerRequest; + +impl App { + fn refresh_mcp_startup_expected_servers_from_config(&mut self) { + let enabled_config_mcp_servers: Vec = self + .chat_widget + .config_ref() + .mcp_servers + .get() + .iter() + .filter_map(|(name, server)| server.enabled.then_some(name.clone())) + .collect(); + self.chat_widget + .set_mcp_startup_expected_servers(enabled_config_mcp_servers); + } + + pub(super) async fn handle_app_server_event( + &mut self, + app_server_client: &AppServerSession, + event: AppServerEvent, + ) { + match event { + AppServerEvent::Lagged { skipped } => { + tracing::warn!( + skipped, + "app-server event consumer lagged; dropping ignored events" + ); + self.refresh_mcp_startup_expected_servers_from_config(); + self.chat_widget.finish_mcp_startup_after_lag(); + } + AppServerEvent::ServerNotification(notification) => { + self.handle_server_notification_event(app_server_client, notification) + .await; + } + AppServerEvent::ServerRequest(request) => { + self.handle_server_request_event(app_server_client, request) + .await; + } + AppServerEvent::Disconnected { message } => { + tracing::warn!("app-server event stream disconnected: {message}"); + self.chat_widget.add_error_message(message.clone()); + self.app_event_tx.send(AppEvent::FatalExitRequest(message)); + } + } + } + + async fn handle_server_notification_event( + &mut self, + app_server_client: &AppServerSession, + notification: ServerNotification, + ) { + match ¬ification { + ServerNotification::ServerRequestResolved(notification) => { + if let Some(request) = self + .pending_app_server_requests + .resolve_notification(¬ification.request_id) + { + self.chat_widget.dismiss_app_server_request(&request); + } + } + ServerNotification::McpServerStatusUpdated(_) => { + self.refresh_mcp_startup_expected_servers_from_config(); + } + ServerNotification::AccountRateLimitsUpdated(notification) => { + self.chat_widget.on_rate_limit_snapshot(Some( + app_server_rate_limit_snapshot_to_core(notification.rate_limits.clone()), + )); + return; + } + ServerNotification::AccountUpdated(notification) => { + self.chat_widget.update_account_state( + status_account_display_from_auth_mode( + notification.auth_mode, + notification.plan_type, + ), + notification.plan_type, + matches!( + notification.auth_mode, + Some(AuthMode::Chatgpt) | Some(AuthMode::ChatgptAuthTokens) + ), + ); + return; + } + ServerNotification::ExternalAgentConfigImportCompleted(_) => { + let cwd = self.chat_widget.config_ref().cwd.to_path_buf(); + if let Err(err) = self.refresh_in_memory_config_from_disk().await { + tracing::warn!( + error = %err, + "failed to refresh config after external agent config import" + ); + } + self.chat_widget.refresh_plugin_mentions(); + self.chat_widget.submit_op(AppCommand::reload_user_config()); + self.fetch_plugins_list(app_server_client, cwd); + return; + } + _ => {} + } + + match server_notification_thread_target(¬ification) { + ServerNotificationThreadTarget::Thread(thread_id) => { + let result = if self.primary_thread_id == Some(thread_id) + || self.primary_thread_id.is_none() + { + self.enqueue_primary_thread_notification(notification).await + } else { + self.enqueue_thread_notification(thread_id, notification) + .await + }; + + if let Err(err) = result { + tracing::warn!("failed to enqueue app-server notification: {err}"); + } + return; + } + ServerNotificationThreadTarget::InvalidThreadId(thread_id) => { + tracing::warn!( + thread_id, + "ignoring app-server notification with invalid thread_id" + ); + return; + } + ServerNotificationThreadTarget::Global => {} + } + + self.chat_widget + .handle_server_notification(notification, /*replay_kind*/ None); + } + + async fn handle_server_request_event( + &mut self, + app_server_client: &AppServerSession, + request: ServerRequest, + ) { + if let Some(unsupported) = self + .pending_app_server_requests + .note_server_request(&request) + { + tracing::warn!( + request_id = ?unsupported.request_id, + message = unsupported.message, + "rejecting unsupported app-server request" + ); + self.chat_widget + .add_error_message(unsupported.message.clone()); + if let Err(err) = self + .reject_app_server_request( + app_server_client, + unsupported.request_id, + unsupported.message, + ) + .await + { + tracing::warn!("{err}"); + } + return; + } + + let Some(thread_id) = server_request_thread_id(&request) else { + tracing::warn!("ignoring threadless app-server request"); + return; + }; + + let result = + if self.primary_thread_id == Some(thread_id) || self.primary_thread_id.is_none() { + self.enqueue_primary_thread_request(request).await + } else { + self.enqueue_thread_request(thread_id, request).await + }; + if let Err(err) = result { + tracing::warn!("failed to enqueue app-server request: {err}"); + } + } + + async fn reject_app_server_request( + &self, + app_server_client: &AppServerSession, + request_id: codex_app_server_protocol::RequestId, + reason: String, + ) -> std::result::Result<(), String> { + app_server_client + .reject_server_request( + request_id, + JSONRPCErrorError { + code: -32000, + message: reason, + data: None, + }, + ) + .await + .map_err(|err| format!("failed to reject app-server request: {err}")) + } +} diff --git a/codex-rs/tui/src/app/tests.rs b/codex-rs/tui/src/app/tests.rs index 756c50869d..3e5d3f741b 100644 --- a/codex-rs/tui/src/app/tests.rs +++ b/codex-rs/tui/src/app/tests.rs @@ -1227,15 +1227,17 @@ async fn token_usage_update_refreshes_status_line_with_runtime_context_window() #[tokio::test] async fn open_agent_picker_keeps_missing_threads_for_replay() -> Result<()> { - let mut app = make_test_app().await; - let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref()) - .await - .expect("embedded app server"); + let mut app = Box::pin(make_test_app()).await; + let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker( + app.chat_widget.config_ref(), + )) + .await + .expect("embedded app server"); let thread_id = ThreadId::new(); app.thread_event_channels .insert(thread_id, ThreadEventChannel::new(/*capacity*/ 1)); - app.open_agent_picker(&mut app_server).await; + Box::pin(app.open_agent_picker(&mut app_server)).await; assert_eq!(app.thread_event_channels.contains_key(&thread_id), true); assert_eq!( @@ -1252,10 +1254,12 @@ async fn open_agent_picker_keeps_missing_threads_for_replay() -> Result<()> { #[tokio::test] async fn open_agent_picker_preserves_cached_metadata_for_replay_threads() -> Result<()> { - let mut app = make_test_app().await; - let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref()) - .await - .expect("embedded app server"); + let mut app = Box::pin(make_test_app()).await; + let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker( + app.chat_widget.config_ref(), + )) + .await + .expect("embedded app server"); let thread_id = ThreadId::new(); app.thread_event_channels .insert(thread_id, ThreadEventChannel::new(/*capacity*/ 1)); @@ -1266,7 +1270,7 @@ async fn open_agent_picker_preserves_cached_metadata_for_replay_threads() -> Res /*is_closed*/ true, ); - app.open_agent_picker(&mut app_server).await; + Box::pin(app.open_agent_picker(&mut app_server)).await; assert_eq!(app.thread_event_channels.contains_key(&thread_id), true); assert_eq!( @@ -1282,10 +1286,12 @@ async fn open_agent_picker_preserves_cached_metadata_for_replay_threads() -> Res #[tokio::test] async fn open_agent_picker_prunes_terminal_metadata_only_threads() -> Result<()> { - let mut app = make_test_app().await; - let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref()) - .await - .expect("embedded app server"); + let mut app = Box::pin(make_test_app()).await; + let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker( + app.chat_widget.config_ref(), + )) + .await + .expect("embedded app server"); let thread_id = ThreadId::new(); app.agent_navigation.upsert( thread_id, @@ -1294,7 +1300,7 @@ async fn open_agent_picker_prunes_terminal_metadata_only_threads() -> Result<()> /*is_closed*/ false, ); - app.open_agent_picker(&mut app_server).await; + Box::pin(app.open_agent_picker(&mut app_server)).await; assert_eq!(app.agent_navigation.get(&thread_id), None); assert!(app.agent_navigation.is_empty()); @@ -1303,10 +1309,12 @@ async fn open_agent_picker_prunes_terminal_metadata_only_threads() -> Result<()> #[tokio::test] async fn open_agent_picker_marks_terminal_read_errors_closed() -> Result<()> { - let mut app = make_test_app().await; - let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref()) - .await - .expect("embedded app server"); + let mut app = Box::pin(make_test_app()).await; + let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker( + app.chat_widget.config_ref(), + )) + .await + .expect("embedded app server"); let thread_id = ThreadId::new(); app.thread_event_channels .insert(thread_id, ThreadEventChannel::new(/*capacity*/ 1)); @@ -1317,7 +1325,7 @@ async fn open_agent_picker_marks_terminal_read_errors_closed() -> Result<()> { /*is_closed*/ false, ); - app.open_agent_picker(&mut app_server).await; + Box::pin(app.open_agent_picker(&mut app_server)).await; assert_eq!( app.agent_navigation.get(&thread_id), @@ -1332,10 +1340,12 @@ async fn open_agent_picker_marks_terminal_read_errors_closed() -> Result<()> { #[tokio::test] async fn open_agent_picker_marks_loaded_threads_open() -> Result<()> { - let mut app = make_test_app().await; - let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref()) - .await - .expect("embedded app server"); + let mut app = Box::pin(make_test_app()).await; + let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker( + app.chat_widget.config_ref(), + )) + .await + .expect("embedded app server"); let started = app_server .start_thread(app.chat_widget.config_ref()) .await?; @@ -1343,7 +1353,7 @@ async fn open_agent_picker_marks_loaded_threads_open() -> Result<()> { app.thread_event_channels .insert(thread_id, ThreadEventChannel::new(/*capacity*/ 1)); - app.open_agent_picker(&mut app_server).await; + Box::pin(app.open_agent_picker(&mut app_server)).await; assert_eq!( app.agent_navigation.get(&thread_id), @@ -1356,65 +1366,87 @@ async fn open_agent_picker_marks_loaded_threads_open() -> Result<()> { Ok(()) } -#[tokio::test] -async fn attach_live_thread_for_selection_rejects_empty_non_ephemeral_fallback_threads() --> Result<()> { - let mut app = make_test_app().await; - let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref()) - .await - .expect("embedded app server"); - let started = app_server - .start_thread(app.chat_widget.config_ref()) - .await?; - let thread_id = started.session.thread_id; - app.agent_navigation.upsert( - thread_id, - Some("Scout".to_string()), - Some("worker".to_string()), - /*is_closed*/ false, - ); +#[test] +fn attach_live_thread_for_selection_rejects_empty_non_ephemeral_fallback_threads() -> Result<()> { + const WORKER_THREADS: usize = 1; + const TEST_STACK_SIZE_BYTES: usize = 8 * 1024 * 1024; - let err = app - .attach_live_thread_for_selection(&mut app_server, thread_id) - .await - .expect_err("empty fallback should not attach as a blank replay-only thread"); + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(WORKER_THREADS) + .thread_stack_size(TEST_STACK_SIZE_BYTES) + .enable_all() + .build()?; - assert_eq!( - err.to_string(), - format!("Agent thread {thread_id} is not yet available for replay or live attach.") - ); - assert!(!app.thread_event_channels.contains_key(&thread_id)); - Ok(()) + runtime.block_on(async { + let config = { + let app = make_test_app().await; + app.chat_widget.config_ref().clone() + }; + let mut app_server = crate::start_embedded_app_server_for_picker(&config) + .await + .expect("embedded app server"); + let started = app_server.start_thread(&config).await?; + let thread_id = started.session.thread_id; + let mut app = make_test_app().await; + app.agent_navigation.upsert( + thread_id, + Some("Scout".to_string()), + Some("worker".to_string()), + /*is_closed*/ false, + ); + + let err = app + .attach_live_thread_for_selection(&mut app_server, thread_id) + .await + .expect_err("empty fallback should not attach as a blank replay-only thread"); + + assert_eq!( + err.to_string(), + format!("Agent thread {thread_id} is not yet available for replay or live attach.") + ); + assert!(!app.thread_event_channels.contains_key(&thread_id)); + Ok(()) + }) } -#[tokio::test] -async fn attach_live_thread_for_selection_rejects_unmaterialized_fallback_threads() -> Result<()> { - let mut app = make_test_app().await; - let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref()) - .await - .expect("embedded app server"); - let mut ephemeral_config = app.chat_widget.config_ref().clone(); - ephemeral_config.ephemeral = true; - let started = app_server.start_thread(&ephemeral_config).await?; - let thread_id = started.session.thread_id; - app.agent_navigation.upsert( - thread_id, - Some("Scout".to_string()), - Some("worker".to_string()), - /*is_closed*/ false, - ); +#[test] +fn attach_live_thread_for_selection_rejects_unmaterialized_fallback_threads() -> Result<()> { + const WORKER_THREADS: usize = 1; + const TEST_STACK_SIZE_BYTES: usize = 8 * 1024 * 1024; - let err = app - .attach_live_thread_for_selection(&mut app_server, thread_id) - .await - .expect_err("ephemeral fallback should not attach as a blank live thread"); + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(WORKER_THREADS) + .thread_stack_size(TEST_STACK_SIZE_BYTES) + .enable_all() + .build()?; - assert_eq!( - err.to_string(), - format!("Agent thread {thread_id} is not yet available for replay or live attach.") - ); - assert!(!app.thread_event_channels.contains_key(&thread_id)); - Ok(()) + runtime.block_on(async { + let mut app = make_test_app().await; + let mut app_server = + crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref()).await?; + let mut ephemeral_config = app.chat_widget.config_ref().clone(); + ephemeral_config.ephemeral = true; + let started = app_server.start_thread(&ephemeral_config).await?; + let thread_id = started.session.thread_id; + app.agent_navigation.upsert( + thread_id, + Some("Scout".to_string()), + Some("worker".to_string()), + /*is_closed*/ false, + ); + + let err = app + .attach_live_thread_for_selection(&mut app_server, thread_id) + .await + .expect_err("ephemeral fallback should not attach as a blank live thread"); + + assert_eq!( + err.to_string(), + format!("Agent thread {thread_id} is not yet available for replay or live attach.") + ); + assert!(!app.thread_event_channels.contains_key(&thread_id)); + Ok(()) + }) } #[tokio::test] @@ -1445,10 +1477,12 @@ async fn should_attach_live_thread_for_selection_skips_closed_metadata_only_thre #[tokio::test] async fn refresh_agent_picker_thread_liveness_prunes_closed_metadata_only_threads() -> Result<()> { - let mut app = make_test_app().await; - let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref()) - .await - .expect("embedded app server"); + let mut app = Box::pin(make_test_app()).await; + let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker( + app.chat_widget.config_ref(), + )) + .await + .expect("embedded app server"); let thread_id = ThreadId::new(); app.agent_navigation.upsert( thread_id, @@ -1457,9 +1491,8 @@ async fn refresh_agent_picker_thread_liveness_prunes_closed_metadata_only_thread /*is_closed*/ false, ); - let is_available = app - .refresh_agent_picker_thread_liveness(&mut app_server, thread_id) - .await; + let is_available = + Box::pin(app.refresh_agent_picker_thread_liveness(&mut app_server, thread_id)).await; assert!(!is_available); assert_eq!(app.agent_navigation.get(&thread_id), None); @@ -1469,13 +1502,15 @@ async fn refresh_agent_picker_thread_liveness_prunes_closed_metadata_only_thread #[tokio::test] async fn open_agent_picker_prompts_to_enable_multi_agent_when_disabled() -> Result<()> { - let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await; - let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref()) - .await - .expect("embedded app server"); + let (mut app, mut app_event_rx, _op_rx) = Box::pin(make_test_app_with_channels()).await; + let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker( + app.chat_widget.config_ref(), + )) + .await + .expect("embedded app server"); let _ = app.config.features.disable(Feature::Collab); - app.open_agent_picker(&mut app_server).await; + Box::pin(app.open_agent_picker(&mut app_server)).await; app.chat_widget .handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE)); @@ -1499,16 +1534,16 @@ async fn open_agent_picker_prompts_to_enable_multi_agent_when_disabled() -> Resu #[tokio::test] async fn update_memory_settings_persists_and_updates_widget_config() -> Result<()> { - let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await; + let (mut app, _app_event_rx, _op_rx) = Box::pin(make_test_app_with_channels()).await; let codex_home = tempdir()?; app.config.codex_home = codex_home.path().to_path_buf().abs(); - let mut app_server = crate::start_embedded_app_server_for_picker(&app.config).await?; + let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker(&app.config)).await?; - app.update_memory_settings_with_app_server( + Box::pin(app.update_memory_settings_with_app_server( &mut app_server, /*use_memories*/ false, /*generate_memories*/ false, - ) + )) .await; assert!(!app.config.memories.use_memories); @@ -1542,22 +1577,22 @@ async fn update_memory_settings_persists_and_updates_widget_config() -> Result<( #[tokio::test] async fn update_memory_settings_updates_current_thread_memory_mode() -> Result<()> { - let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await; + let (mut app, _app_event_rx, _op_rx) = Box::pin(make_test_app_with_channels()).await; let codex_home = tempdir()?; app.config.codex_home = codex_home.path().to_path_buf().abs(); // Seed the previous setting so this test exercises the thread-mode update path. app.config.memories.generate_memories = true; - let mut app_server = crate::start_embedded_app_server_for_picker(&app.config).await?; + let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker(&app.config)).await?; let started = app_server.start_thread(&app.config).await?; let thread_id = started.session.thread_id; app.active_thread_id = Some(thread_id); - app.update_memory_settings_with_app_server( + Box::pin(app.update_memory_settings_with_app_server( &mut app_server, /*use_memories*/ true, /*generate_memories*/ false, - ) + )) .await; let state_db = codex_state::StateRuntime::init( @@ -1578,7 +1613,7 @@ async fn update_memory_settings_updates_current_thread_memory_mode() -> Result<( #[tokio::test] async fn reset_memories_clears_local_memory_directories() -> Result<()> { - let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await; + let (mut app, _app_event_rx, _op_rx) = Box::pin(make_test_app_with_channels()).await; let codex_home = tempdir()?; app.config.codex_home = codex_home.path().to_path_buf().abs(); app.config.sqlite_home = codex_home.path().to_path_buf(); @@ -1594,9 +1629,9 @@ async fn reset_memories_clears_local_memory_directories() -> Result<()> { )?; std::fs::write(extensions_root.join("stale.txt"), "stale extension\n")?; - let mut app_server = crate::start_embedded_app_server_for_picker(&app.config).await?; + let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker(&app.config)).await?; - app.reset_memories_with_app_server(&mut app_server).await; + Box::pin(app.reset_memories_with_app_server(&mut app_server)).await; assert_eq!(std::fs::read_dir(&memory_root)?.count(), 0); @@ -2138,15 +2173,17 @@ async fn update_feature_flags_disabling_guardian_in_profile_keeps_inherited_non_ #[tokio::test] async fn open_agent_picker_allows_existing_agent_threads_when_feature_is_disabled() -> Result<()> { - let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await; - let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref()) - .await - .expect("embedded app server"); + let (mut app, mut app_event_rx, _op_rx) = Box::pin(make_test_app_with_channels()).await; + let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker( + app.chat_widget.config_ref(), + )) + .await + .expect("embedded app server"); let thread_id = ThreadId::new(); app.thread_event_channels .insert(thread_id, ThreadEventChannel::new(/*capacity*/ 1)); - app.open_agent_picker(&mut app_server).await; + Box::pin(app.open_agent_picker(&mut app_server)).await; app.chat_widget .handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE)); @@ -4856,7 +4893,7 @@ async fn thread_rollback_response_discards_queued_active_thread_events() { #[tokio::test] async fn new_session_requests_shutdown_for_previous_conversation() { - let (mut app, mut app_event_rx, mut op_rx) = make_test_app_with_channels().await; + let (mut app, mut app_event_rx, mut op_rx) = Box::pin(make_test_app_with_channels()).await; let thread_id = ThreadId::new(); let event = SessionConfiguredEvent { @@ -4887,10 +4924,12 @@ async fn new_session_requests_shutdown_for_previous_conversation() { while app_event_rx.try_recv().is_ok() {} while op_rx.try_recv().is_ok() {} - let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref()) - .await - .expect("embedded app server"); - app.shutdown_current_thread(&mut app_server).await; + let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker( + app.chat_widget.config_ref(), + )) + .await + .expect("embedded app server"); + Box::pin(app.shutdown_current_thread(&mut app_server)).await; assert!( op_rx.try_recv().is_err(), @@ -4904,12 +4943,12 @@ async fn shutdown_first_exit_returns_immediate_exit_when_shutdown_submit_fails() let thread_id = ThreadId::new(); app.active_thread_id = Some(thread_id); - let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref()) - .await - .expect("embedded app server"); - let control = app - .handle_exit_mode(&mut app_server, ExitMode::ShutdownFirst) - .await; + let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker( + app.chat_widget.config_ref(), + )) + .await + .expect("embedded app server"); + let control = Box::pin(app.handle_exit_mode(&mut app_server, ExitMode::ShutdownFirst)).await; assert_eq!(app.pending_shutdown_exit_thread_id, None); assert!(matches!( @@ -4920,16 +4959,16 @@ async fn shutdown_first_exit_returns_immediate_exit_when_shutdown_submit_fails() #[tokio::test] async fn shutdown_first_exit_uses_app_server_shutdown_without_submitting_op() { - let (mut app, _app_event_rx, mut op_rx) = make_test_app_with_channels().await; + let (mut app, _app_event_rx, mut op_rx) = Box::pin(make_test_app_with_channels()).await; let thread_id = ThreadId::new(); app.active_thread_id = Some(thread_id); - let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref()) - .await - .expect("embedded app server"); - let control = app - .handle_exit_mode(&mut app_server, ExitMode::ShutdownFirst) - .await; + let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker( + app.chat_widget.config_ref(), + )) + .await + .expect("embedded app server"); + let control = Box::pin(app.handle_exit_mode(&mut app_server, ExitMode::ShutdownFirst)).await; assert_eq!(app.pending_shutdown_exit_thread_id, None); assert!(matches!( @@ -4945,9 +4984,11 @@ async fn shutdown_first_exit_uses_app_server_shutdown_without_submitting_op() { #[tokio::test] async fn interrupt_without_active_turn_is_treated_as_handled() { let mut app = make_test_app().await; - let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref()) - .await - .expect("embedded app server"); + let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker( + app.chat_widget.config_ref(), + )) + .await + .expect("embedded app server"); let started = app_server .start_thread(app.chat_widget.config_ref()) .await @@ -4958,10 +4999,10 @@ async fn interrupt_without_active_turn_is_treated_as_handled() { .expect("primary thread should be registered"); let op = AppCommand::interrupt(); - let handled = app - .try_submit_active_thread_op_via_app_server(&mut app_server, thread_id, &op) - .await - .expect("interrupt submission should not fail"); + let handled = + Box::pin(app.try_submit_active_thread_op_via_app_server(&mut app_server, thread_id, &op)) + .await + .expect("interrupt submission should not fail"); assert_eq!(handled, true); } diff --git a/codex-rs/tui/src/approval_events.rs b/codex-rs/tui/src/approval_events.rs new file mode 100644 index 0000000000..57e0959d56 --- /dev/null +++ b/codex-rs/tui/src/approval_events.rs @@ -0,0 +1,119 @@ +//! TUI-owned approval request models used while rendering and queueing prompts. +//! +//! These structs normalize app-server request params into the shape the TUI +//! needs while an approval may be deferred behind streaming output. Exec +//! approvals keep app-server decision and permission types; patch approvals add +//! the file-change display model collected from nearby thread items. + +use std::collections::HashMap; +use std::path::PathBuf; + +use crate::diff_model::FileChange; +use codex_app_server_protocol::AdditionalPermissionProfile; +use codex_app_server_protocol::CommandExecutionApprovalDecision; +use codex_app_server_protocol::ExecPolicyAmendment; +use codex_app_server_protocol::NetworkApprovalContext; +use codex_app_server_protocol::NetworkPolicyAmendment; +use codex_app_server_protocol::NetworkPolicyRuleAction; +use codex_utils_absolute_path::AbsolutePathBuf; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub(crate) struct ExecApprovalRequestEvent { + pub(crate) call_id: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub(crate) approval_id: Option, + #[serde(default)] + pub(crate) turn_id: String, + pub(crate) command: Vec, + pub(crate) cwd: AbsolutePathBuf, + pub(crate) reason: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub(crate) proposed_execpolicy_amendment: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub(crate) proposed_network_policy_amendments: Option>, + #[serde(default)] + pub(crate) available_decisions: Option>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub(crate) network_approval_context: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub(crate) additional_permissions: Option, +} + +impl ExecApprovalRequestEvent { + pub(crate) fn effective_approval_id(&self) -> String { + self.approval_id + .clone() + .unwrap_or_else(|| self.call_id.clone()) + } + + pub(crate) fn effective_available_decisions(&self) -> Vec { + match &self.available_decisions { + Some(decisions) => decisions.clone(), + None => Self::default_available_decisions( + self.network_approval_context.as_ref(), + self.proposed_execpolicy_amendment.as_ref(), + self.proposed_network_policy_amendments.as_deref(), + self.additional_permissions.as_ref(), + ), + } + } + + pub(crate) fn default_available_decisions( + network_approval_context: Option<&NetworkApprovalContext>, + proposed_execpolicy_amendment: Option<&ExecPolicyAmendment>, + proposed_network_policy_amendments: Option<&[NetworkPolicyAmendment]>, + additional_permissions: Option<&AdditionalPermissionProfile>, + ) -> Vec { + if network_approval_context.is_some() { + let mut decisions = vec![ + CommandExecutionApprovalDecision::Accept, + CommandExecutionApprovalDecision::AcceptForSession, + ]; + if let Some(amendment) = proposed_network_policy_amendments.and_then(|amendments| { + amendments + .iter() + .find(|amendment| amendment.action == NetworkPolicyRuleAction::Allow) + }) { + decisions.push( + CommandExecutionApprovalDecision::ApplyNetworkPolicyAmendment { + network_policy_amendment: amendment.clone(), + }, + ); + } + decisions.push(CommandExecutionApprovalDecision::Cancel); + return decisions; + } + + if additional_permissions.is_some() { + return vec![ + CommandExecutionApprovalDecision::Accept, + CommandExecutionApprovalDecision::Cancel, + ]; + } + + let mut decisions = vec![CommandExecutionApprovalDecision::Accept]; + if let Some(prefix) = proposed_execpolicy_amendment { + decisions.push( + CommandExecutionApprovalDecision::AcceptWithExecpolicyAmendment { + execpolicy_amendment: prefix.clone(), + }, + ); + } + decisions.push(CommandExecutionApprovalDecision::Cancel); + decisions + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub(crate) struct ApplyPatchApprovalRequestEvent { + pub(crate) call_id: String, + #[serde(default)] + pub(crate) turn_id: String, + pub(crate) changes: HashMap, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub(crate) reason: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub(crate) grant_root: Option, +} diff --git a/codex-rs/tui/src/chatwidget/user_messages.rs b/codex-rs/tui/src/chatwidget/user_messages.rs new file mode 100644 index 0000000000..a49a4da3b6 --- /dev/null +++ b/codex-rs/tui/src/chatwidget/user_messages.rs @@ -0,0 +1,107 @@ +//! User-message display models and helpers for the chat widget. +//! +//! The app-server preserves user input as structured chunks, while chat history +//! renders a single prompt row. This module owns that display projection and +//! the small compare key used to suppress duplicate rows for pending steers. + +use std::path::PathBuf; + +use codex_app_server_protocol::UserInput; +use codex_protocol::user_input::TextElement; + +use super::ChatWidget; +use super::append_text_with_rebased_elements; + +#[derive(Clone, Debug, PartialEq)] +pub(super) struct UserMessageDisplay { + pub(super) message: String, + pub(super) remote_image_urls: Vec, + pub(super) local_images: Vec, + pub(super) text_elements: Vec, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(super) struct PendingSteerCompareKey { + pub(super) message: String, + pub(super) image_count: usize, +} + +impl ChatWidget { + pub(super) fn user_message_display_from_parts( + message: String, + text_elements: Vec, + local_images: Vec, + remote_image_urls: Vec, + ) -> UserMessageDisplay { + UserMessageDisplay { + message, + remote_image_urls, + local_images, + text_elements, + } + } + + /// Build the compare key for a submitted pending steer without invoking the + /// expensive request-serialization path. Pending steers only need to match the + /// committed app-server `UserMessage` item emitted after input drains, which + /// preserves flattened text and total image count. + pub(super) fn pending_steer_compare_key_from_items( + items: &[UserInput], + ) -> PendingSteerCompareKey { + let mut message = String::new(); + let mut image_count = 0; + + for item in items { + match item { + UserInput::Text { text, .. } => message.push_str(text), + UserInput::Image { .. } | UserInput::LocalImage { .. } => image_count += 1, + UserInput::Skill { .. } | UserInput::Mention { .. } => {} + } + } + + PendingSteerCompareKey { + message, + image_count, + } + } + + pub(super) fn user_message_display_from_inputs(items: &[UserInput]) -> UserMessageDisplay { + let mut message = String::new(); + let mut remote_image_urls = Vec::new(); + let mut local_images = Vec::new(); + let mut text_elements = Vec::new(); + + for item in items { + match item { + UserInput::Text { + text, + text_elements: current_text_elements, + } => append_text_with_rebased_elements( + &mut message, + &mut text_elements, + text, + current_text_elements.iter().map(|element| { + let range = element.byte_range.clone(); + TextElement::new( + range.clone().into(), + element + .placeholder() + .or_else(|| text.get(range.start..range.end)) + .map(str::to_string), + ) + }), + ), + UserInput::Image { url } => remote_image_urls.push(url.clone()), + UserInput::LocalImage { path } => local_images.push(path.clone()), + UserInput::Skill { .. } | UserInput::Mention { .. } => {} + } + } + + Self::user_message_display_from_parts( + message, + text_elements, + local_images, + remote_image_urls, + ) + } +} diff --git a/codex-rs/tui/src/diff_model.rs b/codex-rs/tui/src/diff_model.rs new file mode 100644 index 0000000000..3b7055ea17 --- /dev/null +++ b/codex-rs/tui/src/diff_model.rs @@ -0,0 +1,21 @@ +//! Minimal file-change model used by TUI diff rendering and approval previews. + +use std::path::PathBuf; + +use serde::Deserialize; +use serde::Serialize; + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub(crate) enum FileChange { + Add { + content: String, + }, + Delete { + content: String, + }, + Update { + unified_diff: String, + move_path: Option, + }, +} diff --git a/codex-rs/tui/src/session_resume.rs b/codex-rs/tui/src/session_resume.rs new file mode 100644 index 0000000000..169a096d1e --- /dev/null +++ b/codex-rs/tui/src/session_resume.rs @@ -0,0 +1,314 @@ +//! Resolve saved-session state needed before resuming or forking a thread. +//! +//! The app-server API owns normal thread lifecycle data. This module coordinates +//! the TUI-specific cwd prompt and falls back to local rollout metadata only +//! before the app server has resumed the selected thread. + +use std::io; +use std::path::Path; +use std::path::PathBuf; + +use crate::cwd_prompt; +use crate::cwd_prompt::CwdPromptAction; +use crate::cwd_prompt::CwdPromptOutcome; +use crate::cwd_prompt::CwdSelection; +use crate::legacy_core::config::Config; +use crate::tui::Tui; +use codex_protocol::ThreadId; +use codex_rollout::state_db::get_state_db; +use codex_utils_path as path_utils; +use serde::Deserialize; +use serde_json::Value; +use tokio::io::AsyncBufReadExt; + +#[derive(Default)] +struct RolloutResumeState { + thread_id: Option, + cwd: Option, + model: Option, +} + +#[derive(Deserialize)] +struct SessionMetadata { + id: ThreadId, + cwd: PathBuf, +} + +#[derive(Deserialize)] +struct TurnContextResumeState { + cwd: PathBuf, + model: String, +} + +#[derive(Deserialize)] +struct RawRecord { + #[serde(rename = "type")] + item_type: String, + payload: Option, +} + +pub(crate) enum ResolveCwdOutcome { + Continue(Option), + Exit, +} + +pub(crate) async fn resolve_session_thread_id( + path: &Path, + id_str_if_uuid: Option<&str>, +) -> Option { + match id_str_if_uuid { + Some(id_str) => ThreadId::from_string(id_str).ok(), + None => read_rollout_resume_state(path) + .await + .ok() + .and_then(|state| state.thread_id), + } +} + +pub(crate) async fn read_session_model( + config: &Config, + thread_id: ThreadId, + path: Option<&Path>, +) -> Option { + if let Some(state_db_ctx) = get_state_db(config).await + && let Ok(Some(metadata)) = state_db_ctx.get_thread(thread_id).await + && let Some(model) = metadata.model + { + return Some(model); + } + + let path = path?; + read_rollout_resume_state(path) + .await + .ok() + .and_then(|state| state.model) +} + +pub(crate) async fn resolve_cwd_for_resume_or_fork( + tui: &mut Tui, + config: &Config, + current_cwd: &Path, + thread_id: ThreadId, + path: Option<&Path>, + action: CwdPromptAction, + allow_prompt: bool, +) -> color_eyre::Result { + let Some(history_cwd) = read_session_cwd(config, thread_id, path).await else { + return Ok(ResolveCwdOutcome::Continue(None)); + }; + if allow_prompt && cwds_differ(current_cwd, &history_cwd) { + let selection_outcome = + cwd_prompt::run_cwd_selection_prompt(tui, action, current_cwd, &history_cwd).await?; + return Ok(match selection_outcome { + CwdPromptOutcome::Selection(CwdSelection::Current) => { + ResolveCwdOutcome::Continue(Some(current_cwd.to_path_buf())) + } + CwdPromptOutcome::Selection(CwdSelection::Session) => { + ResolveCwdOutcome::Continue(Some(history_cwd)) + } + CwdPromptOutcome::Exit => ResolveCwdOutcome::Exit, + }); + } + Ok(ResolveCwdOutcome::Continue(Some(history_cwd))) +} + +async fn read_session_cwd( + config: &Config, + thread_id: ThreadId, + path: Option<&Path>, +) -> Option { + if let Some(state_db_ctx) = get_state_db(config).await + && let Ok(Some(metadata)) = state_db_ctx.get_thread(thread_id).await + { + return Some(metadata.cwd); + } + + let path = path?; + match read_rollout_resume_state(path).await { + Ok(state) => state.cwd, + Err(err) => { + let rollout_path = path.display().to_string(); + tracing::warn!( + %rollout_path, + %err, + "Failed to read session metadata from rollout" + ); + None + } + } +} + +pub(crate) fn cwds_differ(current_cwd: &Path, session_cwd: &Path) -> bool { + !path_utils::paths_match_after_normalization(current_cwd, session_cwd) +} + +async fn read_rollout_resume_state(path: &Path) -> io::Result { + let file = tokio::fs::File::open(path).await?; + let reader = tokio::io::BufReader::new(file); + let mut lines = reader.lines(); + let mut state = RolloutResumeState::default(); + let mut saw_record = false; + + while let Some(line) = lines.next_line().await? { + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + let Ok(record) = serde_json::from_str::(trimmed) else { + continue; + }; + saw_record = true; + let Some(payload) = record.payload else { + continue; + }; + + match record.item_type.as_str() { + "session_meta" if state.thread_id.is_none() => { + if let Ok(metadata) = serde_json::from_value::(payload) { + state.thread_id = Some(metadata.id); + state.cwd.get_or_insert(metadata.cwd); + } + } + "turn_context" => { + if let Ok(turn_context) = serde_json::from_value::(payload) + { + state.cwd = Some(turn_context.cwd); + state.model = Some(turn_context.model); + } + } + _ => {} + } + } + + if saw_record { + Ok(state) + } else { + Err(io::Error::other(format!( + "rollout at {} is empty", + path.display() + ))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + use tempfile::TempDir; + + fn rollout_line( + timestamp: &str, + item_type: &str, + payload: serde_json::Value, + ) -> serde_json::Value { + serde_json::json!({ + "timestamp": timestamp, + "type": item_type, + "payload": payload, + }) + } + + fn write_rollout_lines(path: &Path, lines: &[serde_json::Value]) -> std::io::Result<()> { + let mut text = String::new(); + for line in lines { + text.push_str(&serde_json::to_string(line).expect("serialize rollout")); + text.push('\n'); + } + std::fs::write(path, text) + } + + #[tokio::test] + async fn rollout_resume_state_prefers_latest_turn_context() -> std::io::Result<()> { + let temp_dir = TempDir::new()?; + let thread_id = ThreadId::new(); + let original = temp_dir.path().join("original"); + let latest = temp_dir.path().join("latest"); + let rollout_path = temp_dir.path().join("rollout.jsonl"); + write_rollout_lines( + &rollout_path, + &[ + rollout_line( + "t0", + "session_meta", + serde_json::json!({ + "id": thread_id, + "cwd": original, + "originator": "test", + "cli_version": "test", + }), + ), + rollout_line( + "t1", + "turn_context", + serde_json::json!({ "cwd": temp_dir.path().join("middle"), "model": "middle" }), + ), + rollout_line( + "t2", + "turn_context", + serde_json::json!({ "cwd": latest.clone(), "model": "latest" }), + ), + ], + )?; + + let state = read_rollout_resume_state(&rollout_path).await?; + + assert_eq!(state.thread_id, Some(thread_id)); + assert_eq!(state.cwd, Some(latest)); + assert_eq!(state.model, Some("latest".to_string())); + Ok(()) + } + + #[tokio::test] + async fn rollout_resume_state_falls_back_to_session_meta() -> std::io::Result<()> { + let temp_dir = TempDir::new()?; + let thread_id = ThreadId::new(); + let cwd = temp_dir.path().join("session"); + let rollout_path = temp_dir.path().join("rollout.jsonl"); + write_rollout_lines( + &rollout_path, + &[rollout_line( + "t0", + "session_meta", + serde_json::json!({ + "id": thread_id, + "cwd": cwd.clone(), + "originator": "test", + "cli_version": "test", + }), + )], + )?; + + let state = read_rollout_resume_state(&rollout_path).await?; + + assert_eq!(state.thread_id, Some(thread_id)); + assert_eq!(state.cwd, Some(cwd)); + assert_eq!(state.model, None); + Ok(()) + } + + #[tokio::test] + async fn rollout_resume_state_skips_malformed_lines() -> std::io::Result<()> { + let temp_dir = TempDir::new()?; + let thread_id = ThreadId::new(); + let cwd = temp_dir.path().join("session"); + let rollout_path = temp_dir.path().join("rollout.jsonl"); + let valid_line = serde_json::to_string(&rollout_line( + "t0", + "session_meta", + serde_json::json!({ + "id": thread_id, + "cwd": cwd.clone(), + "originator": "test", + "cli_version": "test", + }), + )) + .expect("serialize rollout line"); + std::fs::write(&rollout_path, format!("{valid_line}\n{{"))?; + + let state = read_rollout_resume_state(&rollout_path).await?; + + assert_eq!(state.thread_id, Some(thread_id)); + assert_eq!(state.cwd, Some(cwd)); + Ok(()) + } +} diff --git a/codex-rs/tui/src/session_state.rs b/codex-rs/tui/src/session_state.rs new file mode 100644 index 0000000000..ec0f7789d7 --- /dev/null +++ b/codex-rs/tui/src/session_state.rs @@ -0,0 +1,45 @@ +//! Canonical TUI session state shared across app-server routing, chat display, and status UI. +//! +//! The app-server API is the boundary for session lifecycle events. Once those responses enter +//! TUI, this module holds the small internal state shape used by app orchestration and widgets. + +use std::path::PathBuf; + +use codex_app_server_protocol::AskForApproval; +use codex_protocol::ThreadId; +use codex_protocol::models::ActivePermissionProfile; +use codex_protocol::models::PermissionProfile; +use codex_utils_absolute_path::AbsolutePathBuf; + +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct SessionNetworkProxyRuntime { + pub(crate) http_addr: String, + pub(crate) socks_addr: String, +} + +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct ThreadSessionState { + pub(crate) thread_id: ThreadId, + pub(crate) forked_from_id: Option, + pub(crate) fork_parent_title: Option, + pub(crate) thread_name: Option, + pub(crate) model: String, + pub(crate) model_provider_id: String, + pub(crate) service_tier: Option, + pub(crate) approval_policy: AskForApproval, + pub(crate) approvals_reviewer: codex_protocol::config_types::ApprovalsReviewer, + /// Canonical active permissions for this session. Legacy app-server + /// responses are converted to a profile at ingestion time using the + /// response cwd so cached sessions do not reinterpret cwd-bound grants. + pub(crate) permission_profile: PermissionProfile, + /// Named or implicit built-in profile that produced `permission_profile`, + /// when the server knows it. + pub(crate) active_permission_profile: Option, + pub(crate) cwd: AbsolutePathBuf, + pub(crate) instruction_source_paths: Vec, + pub(crate) reasoning_effort: Option, + pub(crate) history_log_id: u64, + pub(crate) history_entry_count: u64, + pub(crate) network_proxy: Option, + pub(crate) rollout_path: Option, +} diff --git a/codex-rs/tui/src/status/tests.rs b/codex-rs/tui/src/status/tests.rs index bbc1c0ead8..a44e7da686 100644 --- a/codex-rs/tui/src/status/tests.rs +++ b/codex-rs/tui/src/status/tests.rs @@ -37,6 +37,7 @@ async fn test_config(temp_home: &TempDir) -> Config { .build() .await .expect("load config"); + config.approvals_reviewer = ApprovalsReviewer::User; config .permissions .set_permission_profile(PermissionProfile::workspace_write_with( diff --git a/codex-rs/tui/src/token_usage.rs b/codex-rs/tui/src/token_usage.rs new file mode 100644 index 0000000000..14d3d78c6c --- /dev/null +++ b/codex-rs/tui/src/token_usage.rs @@ -0,0 +1,87 @@ +//! TUI token usage models and display formatting. + +use std::fmt; + +use codex_protocol::num_format::format_with_separators; +use serde::Deserialize; +use serde::Serialize; + +const BASELINE_TOKENS: i64 = 12000; + +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +pub struct TokenUsage { + pub input_tokens: i64, + pub cached_input_tokens: i64, + pub output_tokens: i64, + pub reasoning_output_tokens: i64, + pub total_tokens: i64, +} + +impl TokenUsage { + pub fn is_zero(&self) -> bool { + self.total_tokens == 0 + } + + pub(crate) fn cached_input(&self) -> i64 { + self.cached_input_tokens.max(0) + } + + pub(crate) fn non_cached_input(&self) -> i64 { + (self.input_tokens - self.cached_input()).max(0) + } + + pub(crate) fn blended_total(&self) -> i64 { + (self.non_cached_input() + self.output_tokens.max(0)).max(0) + } + + pub(crate) fn tokens_in_context_window(&self) -> i64 { + self.total_tokens + } + + pub(crate) fn percent_of_context_window_remaining(&self, context_window: i64) -> i64 { + if context_window <= BASELINE_TOKENS { + return 0; + } + let effective_window = context_window - BASELINE_TOKENS; + let used = (self.tokens_in_context_window() - BASELINE_TOKENS).max(0); + let remaining = (effective_window - used).max(0); + ((remaining as f64 / effective_window as f64) * 100.0) + .clamp(0.0, 100.0) + .round() as i64 + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub(crate) struct TokenUsageInfo { + pub(crate) total_token_usage: TokenUsage, + pub(crate) last_token_usage: TokenUsage, + pub(crate) model_context_window: Option, +} + +impl fmt::Display for TokenUsage { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "Token usage: total={} input={}{} output={}{}", + format_with_separators(self.blended_total()), + format_with_separators(self.non_cached_input()), + if self.cached_input() > 0 { + format!( + " (+ {} cached)", + format_with_separators(self.cached_input()) + ) + } else { + String::new() + }, + format_with_separators(self.output_tokens), + if self.reasoning_output_tokens > 0 { + format!( + " (reasoning {})", + format_with_separators(self.reasoning_output_tokens) + ) + } else { + String::new() + } + ) + } +}