diff --git a/codex-rs/tui/src/app/session_lifecycle.rs b/codex-rs/tui/src/app/session_lifecycle.rs index d1a8679f45..fcb21c3ed8 100644 --- a/codex-rs/tui/src/app/session_lifecycle.rs +++ b/codex-rs/tui/src/app/session_lifecycle.rs @@ -430,15 +430,15 @@ impl App { result: Result, ) -> Result<()> { if self.pending_startup_thread_start_request_id.as_deref() != Some(request_id.as_str()) { - if let Ok(started) = result - && let Err(err) = app_server - .thread_unsubscribe(started.session.thread_id) - .await - { - tracing::warn!( - thread_id = %started.session.thread_id, - "failed to unsubscribe stale startup thread: {err}" - ); + if let Ok(started) = result { + let thread_id = started.session.thread_id; + if let Err(err) = app_server.thread_unsubscribe(thread_id).await { + tracing::warn!( + thread_id = %thread_id, + "failed to unsubscribe stale startup thread: {err}" + ); + } + self.discard_thread_local_state(thread_id).await; } return Ok(()); } @@ -453,9 +453,14 @@ impl App { self.chat_widget.maybe_send_next_queued_input(); } Err(err) => { - self.chat_widget.add_error_message(format!( - "Failed to start a fresh session through the app server: {err}" - )); + let message = + format!("Failed to start a fresh session through the app server: {err}"); + tracing::warn!( + error = %err, + "startup thread/start failed" + ); + self.chat_widget.add_error_message(message.clone()); + return Err(color_eyre::eyre::eyre!(message)); } } Ok(()) diff --git a/codex-rs/tui/src/app/side.rs b/codex-rs/tui/src/app/side.rs index 044cb9b391..492639f139 100644 --- a/codex-rs/tui/src/app/side.rs +++ b/codex-rs/tui/src/app/side.rs @@ -369,15 +369,15 @@ impl App { self.chat_widget.add_error_message(message); return false; } - self.discard_side_thread_local(thread_id).await; + self.discard_thread_local_state(thread_id).await; true } pub(super) async fn discard_closed_side_thread(&mut self, thread_id: ThreadId) { - self.discard_side_thread_local(thread_id).await; + self.discard_thread_local_state(thread_id).await; } - async fn discard_side_thread_local(&mut self, thread_id: ThreadId) { + pub(super) async fn discard_thread_local_state(&mut self, thread_id: ThreadId) { self.abort_thread_event_listener(thread_id); self.thread_event_channels.remove(&thread_id); self.side_threads.remove(&thread_id); diff --git a/codex-rs/tui/src/app/tests/startup.rs b/codex-rs/tui/src/app/tests/startup.rs index 6760594e63..305bb8caa4 100644 --- a/codex-rs/tui/src/app/tests/startup.rs +++ b/codex-rs/tui/src/app/tests/startup.rs @@ -181,6 +181,86 @@ async fn startup_thread_started_submits_queued_startup_input() { } } +#[tokio::test] +async fn startup_thread_start_failure_returns_error() { + let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await; + let request_id = "startup-thread-start-failed".to_string(); + app.pending_startup_thread_start_request_id = Some(request_id.clone()); + app.chat_widget + .set_queue_submissions_until_session_configured(/*queue*/ true); + + let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker( + app.chat_widget.config_ref(), + )) + .await + .expect("embedded app server"); + let err = app + .handle_startup_thread_started(&mut app_server, request_id, Err("boom".to_string())) + .await + .expect_err("startup thread failure should exit instead of leaving chat unconfigured"); + + assert!( + err.to_string() + .contains("Failed to start a fresh session through the app server: boom") + ); + assert_eq!(app.pending_startup_thread_start_request_id, None); + assert_eq!(app.primary_thread_id, None); +} + +#[test] +fn stale_startup_thread_started_removes_local_routing_state() -> Result<()> { + const WORKER_THREADS: usize = 1; + const TEST_STACK_SIZE_BYTES: usize = 8 * 1024 * 1024; + + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(WORKER_THREADS) + .thread_stack_size(TEST_STACK_SIZE_BYTES) + .enable_all() + .build()?; + + runtime.block_on(async { + let mut app = make_test_app().await; + let mut app_server = + crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref()).await?; + let stale_started = app_server + .start_thread(app.chat_widget.config_ref()) + .await?; + let primary_thread_id = ThreadId::new(); + let stale_thread_id = stale_started.session.thread_id; + app.primary_thread_id = Some(primary_thread_id); + app.thread_event_channels.insert( + primary_thread_id, + ThreadEventChannel::new(THREAD_EVENT_CHANNEL_CAPACITY), + ); + app.activate_thread_channel(primary_thread_id).await; + app.thread_event_channels.insert( + stale_thread_id, + ThreadEventChannel::new(THREAD_EVENT_CHANNEL_CAPACITY), + ); + app.agent_navigation.upsert( + stale_thread_id, + Some("Stale".to_string()), + /*agent_role*/ None, + /*is_closed*/ false, + ); + app.pending_startup_thread_start_request_id = Some("newer-startup-request".to_string()); + assert!(app.thread_event_channels.contains_key(&stale_thread_id)); + assert!(app.agent_navigation.get(&stale_thread_id).is_some()); + + app.handle_startup_thread_started( + &mut app_server, + "old-startup-request".to_string(), + Ok(stale_started), + ) + .await?; + + assert!(!app.thread_event_channels.contains_key(&stale_thread_id)); + assert_eq!(app.agent_navigation.get(&stale_thread_id), None); + assert_eq!(app.active_thread_id, Some(primary_thread_id)); + Ok(()) + }) +} + #[tokio::test] async fn ignore_same_thread_resume_reports_noop_for_current_thread() { let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await;