Handle startup thread edge cases

This commit is contained in:
Eric Traut
2026-05-19 21:38:24 -07:00
parent 62c88b87d7
commit 0db29bec43
3 changed files with 100 additions and 15 deletions

View File

@@ -430,15 +430,15 @@ impl App {
result: Result<AppServerStartedThread, String>,
) -> Result<()> {
if self.pending_startup_thread_start_request_id.as_deref() != Some(request_id.as_str()) {
if let Ok(started) = result
&& let Err(err) = app_server
.thread_unsubscribe(started.session.thread_id)
.await
{
tracing::warn!(
thread_id = %started.session.thread_id,
"failed to unsubscribe stale startup thread: {err}"
);
if let Ok(started) = result {
let thread_id = started.session.thread_id;
if let Err(err) = app_server.thread_unsubscribe(thread_id).await {
tracing::warn!(
thread_id = %thread_id,
"failed to unsubscribe stale startup thread: {err}"
);
}
self.discard_thread_local_state(thread_id).await;
}
return Ok(());
}
@@ -453,9 +453,14 @@ impl App {
self.chat_widget.maybe_send_next_queued_input();
}
Err(err) => {
self.chat_widget.add_error_message(format!(
"Failed to start a fresh session through the app server: {err}"
));
let message =
format!("Failed to start a fresh session through the app server: {err}");
tracing::warn!(
error = %err,
"startup thread/start failed"
);
self.chat_widget.add_error_message(message.clone());
return Err(color_eyre::eyre::eyre!(message));
}
}
Ok(())

View File

@@ -369,15 +369,15 @@ impl App {
self.chat_widget.add_error_message(message);
return false;
}
self.discard_side_thread_local(thread_id).await;
self.discard_thread_local_state(thread_id).await;
true
}
pub(super) async fn discard_closed_side_thread(&mut self, thread_id: ThreadId) {
self.discard_side_thread_local(thread_id).await;
self.discard_thread_local_state(thread_id).await;
}
async fn discard_side_thread_local(&mut self, thread_id: ThreadId) {
pub(super) async fn discard_thread_local_state(&mut self, thread_id: ThreadId) {
self.abort_thread_event_listener(thread_id);
self.thread_event_channels.remove(&thread_id);
self.side_threads.remove(&thread_id);

View File

@@ -181,6 +181,86 @@ async fn startup_thread_started_submits_queued_startup_input() {
}
}
#[tokio::test]
async fn startup_thread_start_failure_returns_error() {
let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await;
let request_id = "startup-thread-start-failed".to_string();
app.pending_startup_thread_start_request_id = Some(request_id.clone());
app.chat_widget
.set_queue_submissions_until_session_configured(/*queue*/ true);
let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker(
app.chat_widget.config_ref(),
))
.await
.expect("embedded app server");
let err = app
.handle_startup_thread_started(&mut app_server, request_id, Err("boom".to_string()))
.await
.expect_err("startup thread failure should exit instead of leaving chat unconfigured");
assert!(
err.to_string()
.contains("Failed to start a fresh session through the app server: boom")
);
assert_eq!(app.pending_startup_thread_start_request_id, None);
assert_eq!(app.primary_thread_id, None);
}
#[test]
fn stale_startup_thread_started_removes_local_routing_state() -> Result<()> {
const WORKER_THREADS: usize = 1;
const TEST_STACK_SIZE_BYTES: usize = 8 * 1024 * 1024;
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(WORKER_THREADS)
.thread_stack_size(TEST_STACK_SIZE_BYTES)
.enable_all()
.build()?;
runtime.block_on(async {
let mut app = make_test_app().await;
let mut app_server =
crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref()).await?;
let stale_started = app_server
.start_thread(app.chat_widget.config_ref())
.await?;
let primary_thread_id = ThreadId::new();
let stale_thread_id = stale_started.session.thread_id;
app.primary_thread_id = Some(primary_thread_id);
app.thread_event_channels.insert(
primary_thread_id,
ThreadEventChannel::new(THREAD_EVENT_CHANNEL_CAPACITY),
);
app.activate_thread_channel(primary_thread_id).await;
app.thread_event_channels.insert(
stale_thread_id,
ThreadEventChannel::new(THREAD_EVENT_CHANNEL_CAPACITY),
);
app.agent_navigation.upsert(
stale_thread_id,
Some("Stale".to_string()),
/*agent_role*/ None,
/*is_closed*/ false,
);
app.pending_startup_thread_start_request_id = Some("newer-startup-request".to_string());
assert!(app.thread_event_channels.contains_key(&stale_thread_id));
assert!(app.agent_navigation.get(&stale_thread_id).is_some());
app.handle_startup_thread_started(
&mut app_server,
"old-startup-request".to_string(),
Ok(stale_started),
)
.await?;
assert!(!app.thread_event_channels.contains_key(&stale_thread_id));
assert_eq!(app.agent_navigation.get(&stale_thread_id), None);
assert_eq!(app.active_thread_id, Some(primary_thread_id));
Ok(())
})
}
#[tokio::test]
async fn ignore_same_thread_resume_reports_noop_for_current_thread() {
let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await;