app-server: retain thread listener across disconnects (#12373)

- keep the per-thread app-server listener alive when the last client
unsubscribes or disconnects
- preserve listener-side active turn history so running `thread/resume`
can merge an in-progress turn snapshot after reconnect
- add `ThreadStateManager` regressions for disconnect/unsubscribe
retention and explicit thread teardown cleanup

Added unit tests, and I manually tested to confirm the fix

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Max Johnson
2026-02-21 21:33:33 -08:00
committed by GitHub
parent c4f1af7a86
commit 37610240ec
3 changed files with 109 additions and 14 deletions

View File

@@ -5876,7 +5876,7 @@ impl CodexMessageProcessor {
loop {
tokio::select! {
_ = &mut cancel_rx => {
// User has unsubscribed, so exit this task.
// Listener was superseded or the thread is being torn down.
break;
}
event = conversation.next_event() => {
@@ -6296,6 +6296,14 @@ async fn handle_pending_thread_resume_request(
let state = thread_state.lock().await;
state.active_turn_snapshot()
};
tracing::debug!(
thread_id = %conversation_id,
request_id = ?pending.request_id,
active_turn_present = active_turn.is_some(),
active_turn_id = ?active_turn.as_ref().map(|turn| turn.id.as_str()),
active_turn_status = ?active_turn.as_ref().map(|turn| &turn.status),
"composing running thread resume response"
);
let mut has_in_progress_turn = active_turn
.as_ref()
.is_some_and(|turn| matches!(turn.status, TurnStatus::InProgress));
@@ -7344,8 +7352,8 @@ mod tests {
}
#[tokio::test]
async fn removing_one_listener_does_not_cancel_other_subscriptions_for_same_thread()
-> Result<()> {
async fn removing_listeners_retains_thread_listener_when_last_subscriber_leaves() -> Result<()>
{
let mut manager = ThreadStateManager::new();
let thread_id = ThreadId::from_string("ad7f0408-99b8-4f6e-a46f-bd0eec433370")?;
let listener_a = Uuid::new_v4();
@@ -7372,7 +7380,13 @@ mod tests {
.is_err()
);
assert_eq!(manager.remove_listener(listener_b).await, Some(thread_id));
assert_eq!(cancel_rx.await, Ok(()));
assert!(
tokio::time::timeout(Duration::from_millis(20), &mut cancel_rx)
.await
.is_err()
);
let state = manager.thread_state(thread_id);
assert!(state.lock().await.subscribed_connection_ids().is_empty());
Ok(())
}
@@ -7424,28 +7438,79 @@ mod tests {
}
#[tokio::test]
async fn removing_connection_clears_subscription_and_listener_when_last_subscriber()
async fn removing_connection_retains_listener_and_active_turn_when_last_subscriber_disconnects()
-> Result<()> {
let mut manager = ThreadStateManager::new();
let thread_id = ThreadId::from_string("ad7f0408-99b8-4f6e-a46f-bd0eec433370")?;
let listener = Uuid::new_v4();
let connection = ConnectionId(1);
let (cancel_tx, cancel_rx) = oneshot::channel();
let (cancel_tx, mut cancel_rx) = oneshot::channel();
manager
.set_listener(listener, thread_id, connection, false)
.await;
{
let state = manager.thread_state(thread_id);
state.lock().await.cancel_tx = Some(cancel_tx);
let mut state = state.lock().await;
state.cancel_tx = Some(cancel_tx);
state.track_current_turn_event(&EventMsg::TurnStarted(
codex_protocol::protocol::TurnStartedEvent {
turn_id: "turn-1".to_string(),
model_context_window: None,
collaboration_mode_kind: Default::default(),
},
));
}
manager.remove_connection(connection).await;
assert_eq!(cancel_rx.await, Ok(()));
assert!(
tokio::time::timeout(Duration::from_millis(20), &mut cancel_rx)
.await
.is_err()
);
assert_eq!(manager.remove_listener(listener).await, None);
let state = manager.thread_state(thread_id);
assert!(state.lock().await.subscribed_connection_ids().is_empty());
let state = state.lock().await;
assert!(state.subscribed_connection_ids().is_empty());
assert!(state.cancel_tx.is_some());
let active_turn = state.active_turn_snapshot().expect("active turn snapshot");
assert_eq!(active_turn.id, "turn-1");
assert_eq!(active_turn.status, TurnStatus::InProgress);
Ok(())
}
#[tokio::test]
async fn removing_thread_state_clears_listener_and_active_turn_history() -> Result<()> {
let mut manager = ThreadStateManager::new();
let thread_id = ThreadId::from_string("ad7f0408-99b8-4f6e-a46f-bd0eec433370")?;
let connection = ConnectionId(1);
let (cancel_tx, cancel_rx) = oneshot::channel();
manager
.ensure_connection_subscribed(thread_id, connection, false)
.await;
{
let state = manager.thread_state(thread_id);
let mut state = state.lock().await;
state.cancel_tx = Some(cancel_tx);
state.track_current_turn_event(&EventMsg::TurnStarted(
codex_protocol::protocol::TurnStartedEvent {
turn_id: "turn-1".to_string(),
model_context_window: None,
collaboration_mode_kind: Default::default(),
},
));
}
manager.remove_thread_state(thread_id).await;
assert_eq!(cancel_rx.await, Ok(()));
let state = manager.thread_state(thread_id);
let state = state.lock().await;
assert!(state.subscribed_connection_ids().is_empty());
assert!(state.cancel_tx.is_none());
assert!(state.active_turn_snapshot().is_none());
Ok(())
}