use super::*; /* Delay that ensure_listener_task_running hands to UnloadingState (30 * 60 seconds): unloading only becomes due once the thread has had no subscribers and been inactive for at least this long. */ pub(super) const THREAD_UNLOADING_DELAY: Duration = Duration::from_secs(30 * 60); /* Cloneable bundle of the handles a listener task works with. NOTE(review): generic arguments look stripped throughout this listing (e.g. `Arc,`) - confirm against the checked-in file before relying on the field types shown here. */ #[derive(Clone)] pub(super) struct ListenerTaskContext { pub(super) thread_manager: Arc, pub(super) thread_state_manager: ThreadStateManager, pub(super) outgoing: Arc, pub(super) pending_thread_unloads: Arc>>, pub(super) thread_watch_manager: ThreadWatchManager, pub(super) thread_list_state_permit: Arc, pub(super) fallback_model_provider: String, pub(super) codex_home: PathBuf, pub(super) skills_watcher: Arc, } /* Tracks the two conditions that gate unloading: whether the thread has subscribers and whether its status is Active. Each flag is stored together with the Instant at which this tracker last recorded a change of that flag. */ struct UnloadingState { delay: Duration, has_subscribers_rx: watch::Receiver, has_subscribers: (bool, Instant), thread_status_rx: watch::Receiver, is_active: (bool, Instant), } impl UnloadingState { /* Subscribes to both watch channels and seeds the flags from their current values, stamped with the current time. Returns None when either subscription yields None. */ async fn new( listener_task_context: &ListenerTaskContext, thread_id: ThreadId, delay: Duration, ) -> Option { let has_subscribers_rx = listener_task_context .thread_state_manager .subscribe_to_has_connections(thread_id) .await?; let thread_status_rx = listener_task_context .thread_watch_manager .subscribe(thread_id) .await?; let has_subscribers = (*has_subscribers_rx.borrow(), Instant::now()); let is_active = ( matches!(*thread_status_rx.borrow(), ThreadStatus::Active { .. }), Instant::now(), ); Some(Self { delay, has_subscribers_rx, has_subscribers, thread_status_rx, is_active, }) } /* Unloading deadline: defined only while there are no subscribers AND the thread is inactive; it is the later of the two "since" instants plus `delay`. */ fn unloading_target(&self) -> Option { match (self.has_subscribers, self.is_active) { ((false, has_no_subscribers_since), (false, is_inactive_since)) => { Some(std::cmp::max(has_no_subscribers_since, is_inactive_since) + self.delay) } _ => None, } } /* Re-reads both receivers. A flag's timestamp is reset only when its boolean value differs from the stored one, so repeated reads of an unchanged value do not push the deadline out. */ fn sync_receiver_values(&mut self) { let has_subscribers = *self.has_subscribers_rx.borrow(); if self.has_subscribers.0 != has_subscribers { self.has_subscribers = (has_subscribers, Instant::now()); } let is_active = matches!(*self.thread_status_rx.borrow(), ThreadStatus::Active { .. 
}); if self.is_active.0 != is_active { self.is_active = (is_active, Instant::now()); } } /* Refreshes the flags, then reports whether a deadline exists and is not in the future. */ fn should_unload_now(&mut self) -> bool { self.sync_receiver_values(); self.unloading_target() .is_some_and(|target| target <= Instant::now()) } /* Restarts the inactivity clock while leaving the flag inactive; does nothing when the thread is already recorded as active. */ fn note_thread_activity_observed(&mut self) { if !self.is_active.0 { self.is_active = (false, Instant::now()); } } /* Resolves to true once the unloading deadline has been reached, or false when changed() on either receiver returns an error. While no deadline is defined the sleep arm never completes, so only receiver changes wake the loop. */ async fn wait_for_unloading_trigger(&mut self) -> bool { loop { self.sync_receiver_values(); let unloading_target = self.unloading_target(); if let Some(target) = unloading_target && target <= Instant::now() { return true; } let unloading_sleep = async { if let Some(target) = unloading_target { tokio::time::sleep_until(target.into()).await; } else { futures::future::pending::<()>().await; } }; tokio::select! { _ = unloading_sleep => return true, changed = self.has_subscribers_rx.changed() => { if changed.is_err() { return false; } self.sync_receiver_values(); }, changed = self.thread_status_rx.changed() => { if changed.is_err() { return false; } self.sync_receiver_values(); }, } } } } /* Outcome reported by wait_for_thread_shutdown. */ pub(super) enum ThreadShutdownResult { Complete, SubmitFailed, TimedOut, } /* Non-error outcomes of ensure_conversation_listener. */ pub(super) enum EnsureConversationListenerResult { Attached, ConnectionClosed, } /* Subscribes `connection_id` to the thread and makes sure a listener task is running for it. Returns an invalid-request error when the thread cannot be fetched or is listed in pending_thread_unloads, and Ok(ConnectionClosed) when the subscription attempt yields None. The pending_thread_unloads lock is held across the subscription call. If starting the listener fails, the connection is unsubscribed again before the error is returned. */ #[expect( clippy::await_holding_invalid_type, reason = "listener subscription must be serialized against pending unloads" )] pub(super) async fn ensure_conversation_listener( listener_task_context: ListenerTaskContext, conversation_id: ThreadId, connection_id: ConnectionId, raw_events_enabled: bool, ) -> Result { let conversation = match listener_task_context .thread_manager .get_thread(conversation_id) .await { Ok(conv) => conv, Err(_) => { return Err(invalid_request(format!( "thread not found: {conversation_id}" ))); } }; let thread_state = { let pending_thread_unloads = listener_task_context.pending_thread_unloads.lock().await; if pending_thread_unloads.contains(&conversation_id) { return Err(invalid_request(format!( "thread {conversation_id} is closing; retry after the thread is 
closed" ))); } let Some(thread_state) = listener_task_context .thread_state_manager .try_ensure_connection_subscribed(conversation_id, connection_id, raw_events_enabled) .await else { return Ok(EnsureConversationListenerResult::ConnectionClosed); }; thread_state }; if let Err(error) = ensure_listener_task_running( listener_task_context.clone(), conversation_id, conversation, thread_state, ) .await { let _ = listener_task_context .thread_state_manager .unsubscribe_connection_from_thread(conversation_id, connection_id) .await; return Err(error); } Ok(EnsureConversationListenerResult::Attached) } /* Logs the result of an attach attempt: nothing for Attached, a debug line for a closed connection, and a warning carrying the error message otherwise. */ pub(super) fn log_listener_attach_result( result: Result, thread_id: ThreadId, connection_id: ConnectionId, thread_kind: &'static str, ) { match result { Ok(EnsureConversationListenerResult::Attached) => {} Ok(EnsureConversationListenerResult::ConnectionClosed) => { tracing::debug!( thread_id = %thread_id, connection_id = ?connection_id, "skipping auto-attach for closed connection" ); } Err(err) => { tracing::warn!( "failed to attach listener for {thread_kind} {thread_id}: {message}", message = err.message ); } } } /* Spawns the listener task for a thread unless thread_state reports that its listener already matches this conversation. Fails with an invalid-request error when UnloadingState::new returns None. The spawned loop selects, in declaration order because of `biased`, over: cancellation, listener commands, conversation events, and the unloading trigger. When the loop ends, the listener registration is cleared only if listener_generation is still the one this call installed. */ pub(super) async fn ensure_listener_task_running( listener_task_context: ListenerTaskContext, conversation_id: ThreadId, conversation: Arc, thread_state: Arc>, ) -> Result<(), JSONRPCErrorError> { let (cancel_tx, mut cancel_rx) = oneshot::channel(); let Some(mut unloading_state) = UnloadingState::new( &listener_task_context, conversation_id, THREAD_UNLOADING_DELAY, ) .await else { return Err(invalid_request(format!( "thread {conversation_id} is closing; retry after the thread is closed" ))); }; let config = conversation.config().await; let environments = conversation.environment_selections().await; let watch_registration = listener_task_context .skills_watcher .register_thread_config( config.as_ref(), listener_task_context.thread_manager.as_ref(), &environments, ) .await; let (mut listener_command_rx, listener_generation) = { let mut thread_state = 
thread_state.lock().await; /* Early return: thread_state reports that its listener already matches this conversation. */ if thread_state.listener_matches(&conversation) { return Ok(()); } thread_state.set_listener(cancel_tx, &conversation, watch_registration) }; let ListenerTaskContext { outgoing, thread_manager, thread_state_manager, pending_thread_unloads, thread_watch_manager, thread_list_state_permit, fallback_model_provider, codex_home, .. } = listener_task_context; let outgoing_for_task = Arc::clone(&outgoing); tokio::spawn(async move { loop { tokio::select! { biased; _ = &mut cancel_rx => { // Listener was superseded or the thread is being torn down. break; } listener_command = listener_command_rx.recv() => { let Some(listener_command) = listener_command else { break; }; handle_thread_listener_command( conversation_id, &conversation, codex_home.as_path(), &thread_state_manager, &thread_state, &thread_watch_manager, &outgoing_for_task, &pending_thread_unloads, listener_command, ) .await; } event = conversation.next_event() => { let event = match event { Ok(event) => event, Err(err) => { tracing::warn!("thread.next_event() failed with: {err}"); break; } }; // Track the event before emitting any typed translations // so thread-local state such as raw event opt-in stays // synchronized with the conversation. 
let raw_events_enabled = { let mut thread_state = thread_state.lock().await; thread_state.track_current_turn_event(&event.id, &event.msg); thread_state.experimental_raw_events }; let subscribed_connection_ids = thread_state_manager .subscribed_connection_ids(conversation_id) .await; let thread_outgoing = ThreadScopedOutgoingMessageSender::new( outgoing_for_task.clone(), subscribed_connection_ids, conversation_id, ); if let EventMsg::RawResponseItem(raw_response_item_event) = &event.msg && !raw_events_enabled { maybe_emit_hook_prompt_item_completed( conversation_id, &event.id, &raw_response_item_event.item, &thread_outgoing, ) .await; continue; } apply_bespoke_event_handling( event.clone(), conversation_id, conversation.clone(), thread_manager.clone(), thread_outgoing, thread_state.clone(), thread_watch_manager.clone(), thread_list_state_permit.clone(), fallback_model_provider.clone(), ) .await; } /* Unloading arm: a false result ends the task. Otherwise the deadline is re-checked, a Running agent restarts the inactivity clock, and the deadline is checked once more under the pending_thread_unloads lock before the thread id is inserted and the unload starts. */ unloading_watchers_open = unloading_state.wait_for_unloading_trigger() => { if !unloading_watchers_open { break; } if !unloading_state.should_unload_now() { continue; } if matches!(conversation.agent_status().await, AgentStatus::Running) { unloading_state.note_thread_activity_observed(); continue; } { let mut pending_thread_unloads = pending_thread_unloads.lock().await; if pending_thread_unloads.contains(&conversation_id) { continue; } if !unloading_state.should_unload_now() { continue; } pending_thread_unloads.insert(conversation_id); } unload_thread_without_subscribers( thread_manager.clone(), outgoing_for_task.clone(), pending_thread_unloads.clone(), thread_state_manager.clone(), thread_watch_manager.clone(), conversation_id, conversation.clone(), ) .await; break; } } } let mut thread_state = thread_state.lock().await; if thread_state.listener_generation == listener_generation { thread_state.clear_listener(); } }); Ok(()) } /* Calls shutdown_and_wait on the thread under a 10 second timeout and maps the three possible outcomes onto ThreadShutdownResult. */ pub(super) async fn wait_for_thread_shutdown(thread: &Arc) -> ThreadShutdownResult { match tokio::time::timeout(Duration::from_secs(10), 
thread.shutdown_and_wait()).await { Ok(Ok(())) => ThreadShutdownResult::Complete, Ok(Err(_)) => ThreadShutdownResult::SubmitFailed, Err(_) => ThreadShutdownResult::TimedOut, } } /* Cancels the thread's pending outgoing requests and removes its thread state, then finishes teardown in a spawned task. Every outcome removes the id from pending_thread_unloads. ThreadClosed is sent only when shutdown completed and remove_thread on the manager returned a value; on SubmitFailed or TimedOut the thread manager and watch manager are left untouched. */ pub(super) async fn unload_thread_without_subscribers( thread_manager: Arc, outgoing: Arc, pending_thread_unloads: Arc>>, thread_state_manager: ThreadStateManager, thread_watch_manager: ThreadWatchManager, thread_id: ThreadId, thread: Arc, ) { info!("thread {thread_id} has no subscribers and is idle; shutting down"); // Any pending app-server -> client requests for this thread can no longer be // answered; cancel their callbacks before shutdown/unload. outgoing .cancel_requests_for_thread(thread_id, /*error*/ None) .await; thread_state_manager.remove_thread_state(thread_id).await; tokio::spawn(async move { match wait_for_thread_shutdown(&thread).await { ThreadShutdownResult::Complete => { if thread_manager.remove_thread(&thread_id).await.is_none() { info!("thread {thread_id} was already removed before teardown finalized"); thread_watch_manager .remove_thread(&thread_id.to_string()) .await; pending_thread_unloads.lock().await.remove(&thread_id); return; } thread_watch_manager .remove_thread(&thread_id.to_string()) .await; let notification = ThreadClosedNotification { thread_id: thread_id.to_string(), }; outgoing .send_server_notification(ServerNotification::ThreadClosed(notification)) .await; pending_thread_unloads.lock().await.remove(&thread_id); } ThreadShutdownResult::SubmitFailed => { pending_thread_unloads.lock().await.remove(&thread_id); warn!("failed to submit Shutdown to thread {thread_id}"); } ThreadShutdownResult::TimedOut => { pending_thread_unloads.lock().await.remove(&thread_id); warn!("thread {thread_id} shutdown timed out; leaving thread loaded"); } } }); } /* Dispatches a single ThreadListenerCommand. A goal update whose status is not Active, and every goal clear, is followed by a flush of pending terminal plan cleanup to subscribers. ResolveServerRequest signals completion_tx after the notification is sent and ignores a failed send on that channel. */ #[allow(clippy::too_many_arguments)] pub(super) async fn handle_thread_listener_command( conversation_id: ThreadId, conversation: &Arc, codex_home: &Path, thread_state_manager: &ThreadStateManager, thread_state: &Arc>, 
thread_watch_manager: &ThreadWatchManager, outgoing: &Arc, pending_thread_unloads: &Arc>>, listener_command: ThreadListenerCommand, ) { match listener_command { ThreadListenerCommand::SendThreadResumeResponse(resume_request) => { handle_pending_thread_resume_request( conversation_id, conversation, codex_home, thread_state_manager, thread_state, thread_watch_manager, outgoing, pending_thread_unloads, *resume_request, ) .await; } ThreadListenerCommand::EmitThreadGoalUpdated { goal } => { let goal_is_active = goal.status == ThreadGoalStatus::Active; outgoing .send_server_notification(ServerNotification::ThreadGoalUpdated( ThreadGoalUpdatedNotification { thread_id: conversation_id.to_string(), turn_id: None, goal, }, )) .await; if !goal_is_active { flush_pending_terminal_plan_cleanup_for_subscribers( conversation_id, thread_state_manager, thread_state, outgoing, ) .await; } } ThreadListenerCommand::EmitThreadGoalCleared => { outgoing .send_server_notification(ServerNotification::ThreadGoalCleared( ThreadGoalClearedNotification { thread_id: conversation_id.to_string(), }, )) .await; flush_pending_terminal_plan_cleanup_for_subscribers( conversation_id, thread_state_manager, thread_state, outgoing, ) .await; } ThreadListenerCommand::EmitThreadGoalSnapshot { state_db } => { send_thread_goal_snapshot_notification(outgoing, conversation_id, &state_db).await; } ThreadListenerCommand::ResolveServerRequest { request_id, completion_tx, } => { resolve_pending_server_request( conversation_id, thread_state_manager, outgoing, request_id, ) .await; let _ = completion_tx.send(()); } } } /* Builds a sender scoped to the connections currently subscribed to the thread and delegates to bespoke_event_handling::flush_pending_terminal_plan_cleanup with it. */ async fn flush_pending_terminal_plan_cleanup_for_subscribers( conversation_id: ThreadId, thread_state_manager: &ThreadStateManager, thread_state: &Arc>, outgoing: &Arc, ) { let subscribed_connection_ids = thread_state_manager .subscribed_connection_ids(conversation_id) .await; let thread_outgoing = ThreadScopedOutgoingMessageSender::new( outgoing.clone(), subscribed_connection_ids, conversation_id, ); 
crate::bespoke_event_handling::flush_pending_terminal_plan_cleanup( conversation_id, thread_state, &thread_outgoing, ) .await; } /* Answers a resume request for a thread that already has a running listener. Steps, in order: snapshot the active turn; optionally populate turns from history; set the thread status (interrupting stale in-progress turns); optionally redact; then, under the pending_thread_unloads lock, reject with an error if the thread is closing or return silently if the connection cannot be added. After that: optional goal runtime effects, the ThreadResumeResponse, token usage (only when turns were included), the goal snapshot, replay of pending requests, and finally goal continuation. `_codex_home` is not read. */ #[allow(clippy::too_many_arguments)] #[expect( clippy::await_holding_invalid_type, reason = "running-thread resume subscription must be serialized against pending unloads" )] pub(super) async fn handle_pending_thread_resume_request( conversation_id: ThreadId, conversation: &Arc, _codex_home: &Path, thread_state_manager: &ThreadStateManager, thread_state: &Arc>, thread_watch_manager: &ThreadWatchManager, outgoing: &Arc, pending_thread_unloads: &Arc>>, pending: crate::thread_state::PendingThreadResumeRequest, ) { let active_turn = { let state = thread_state.lock().await; state.active_turn_snapshot() }; tracing::debug!( thread_id = %conversation_id, request_id = ?pending.request_id, active_turn_present = active_turn.is_some(), active_turn_id = ?active_turn.as_ref().map(|turn| turn.id.as_str()), active_turn_status = ?active_turn.as_ref().map(|turn| &turn.status), "composing running thread resume response" ); let has_live_in_progress_turn = matches!(conversation.agent_status().await, AgentStatus::Running) || active_turn .as_ref() .is_some_and(|turn| matches!(turn.status, TurnStatus::InProgress)); let request_id = pending.request_id; let connection_id = request_id.connection_id; let mut thread = pending.thread_summary; if pending.include_turns { populate_thread_turns_from_history( &mut thread, &pending.history_items, active_turn.as_ref(), ); } let thread_status = thread_watch_manager .loaded_status_for_thread(&thread.id) .await; set_thread_status_and_interrupt_stale_turns( &mut thread, thread_status, has_live_in_progress_turn, ); let token_usage_thread = pending.include_turns.then(|| thread.clone()); if pending.redact_resume_payloads { redact_thread_resume_payloads(&mut thread); } { let pending_thread_unloads = pending_thread_unloads.lock().await; if pending_thread_unloads.contains(&conversation_id) { drop(pending_thread_unloads); outgoing 
.send_error( request_id, invalid_request(format!( "thread {conversation_id} is closing; retry thread/resume after the thread is closed" )), ) .await; return; } if !thread_state_manager .try_add_connection_to_thread(conversation_id, connection_id) .await { tracing::debug!( thread_id = %conversation_id, connection_id = ?connection_id, "skipping running thread resume for closed connection" ); return; } } if pending.emit_thread_goal_update && let Err(err) = conversation.apply_goal_resume_runtime_effects().await { tracing::warn!("failed to apply goal resume runtime effects: {err}"); } let ThreadConfigSnapshot { model, model_provider_id, service_tier, approval_policy, approvals_reviewer, permission_profile, active_permission_profile, cwd, workspace_roots, reasoning_effort, .. } = pending.config_snapshot; let instruction_sources = pending.instruction_sources; let sandbox = thread_response_sandbox_policy(&permission_profile, cwd.as_path()); let active_permission_profile = thread_response_active_permission_profile(active_permission_profile); let session_id = conversation.session_configured().session_id.to_string(); thread.session_id = session_id; let response = ThreadResumeResponse { thread, model, model_provider: model_provider_id, service_tier, cwd, runtime_workspace_roots: workspace_roots, instruction_sources, approval_policy: approval_policy.into(), approvals_reviewer: approvals_reviewer.into(), sandbox, active_permission_profile, reasoning_effort, }; outgoing.send_response(request_id, response).await; // Match cold resume: metadata-only resume should attach the listener without // paying the cost of turn reconstruction for historical usage replay. 
if let Some(token_usage_thread) = token_usage_thread { let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_items( &pending.history_items, token_usage_thread.turns.as_slice(), ); // Rejoining a loaded thread has the same UI contract as a cold resume, but // uses the live conversation state instead of reconstructing a new session. send_thread_token_usage_update_to_connection( outgoing, connection_id, conversation_id, &token_usage_thread, conversation.as_ref(), token_usage_turn_id, ) .await; } if pending.emit_thread_goal_update { if let Some(state_db) = pending.thread_goal_state_db { send_thread_goal_snapshot_notification(outgoing, conversation_id, &state_db).await; } else { tracing::warn!( thread_id = %conversation_id, "state db unavailable when reading thread goal for running thread resume" ); } } outgoing .replay_requests_to_connection_for_thread(connection_id, conversation_id) .await; // App-server owns resume response and snapshot ordering, so wait until // replay completes before letting core start goal continuation. 
if pending.emit_thread_goal_update && let Err(err) = conversation.continue_active_goal_if_idle().await { tracing::warn!("failed to continue active goal after running-thread resume: {err}"); } } /* Reads the thread goal from the state db and emits ThreadGoalUpdated when a goal exists or ThreadGoalCleared when there is none; a read error is logged as a warning and nothing is sent. */ pub(super) async fn send_thread_goal_snapshot_notification( outgoing: &Arc, thread_id: ThreadId, state_db: &StateDbHandle, ) { match state_db.thread_goals().get_thread_goal(thread_id).await { Ok(Some(goal)) => { outgoing .send_server_notification(ServerNotification::ThreadGoalUpdated( ThreadGoalUpdatedNotification { thread_id: thread_id.to_string(), turn_id: None, goal: api_thread_goal_from_state(goal), }, )) .await; } Ok(None) => { outgoing .send_server_notification(ServerNotification::ThreadGoalCleared( ThreadGoalClearedNotification { thread_id: thread_id.to_string(), }, )) .await; } Err(err) => { tracing::warn!( thread_id = %thread_id, "failed to read thread goal for resume snapshot: {err}" ); } } } /* Replaces thread.turns with the turns built from the rollout items, overlaying a clone of the active turn when one is supplied. */ pub(crate) fn populate_thread_turns_from_history( thread: &mut Thread, items: &[RolloutItem], active_turn: Option<&Turn>, ) { let mut turns = build_api_turns_from_rollout_items(items); if let Some(active_turn) = active_turn { merge_turn_history_with_active_turn(&mut turns, active_turn.clone()); } thread.turns = turns; } /* Sends ServerRequestResolved for `request_id` through a sender scoped to the connections currently subscribed to the thread. */ pub(super) async fn resolve_pending_server_request( conversation_id: ThreadId, thread_state_manager: &ThreadStateManager, outgoing: &Arc, request_id: RequestId, ) { let thread_id = conversation_id.to_string(); let subscribed_connection_ids = thread_state_manager .subscribed_connection_ids(conversation_id) .await; let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing.clone(), subscribed_connection_ids, conversation_id, ); outgoing .send_server_notification(ServerNotification::ServerRequestResolved( ServerRequestResolvedNotification { thread_id, request_id, }, )) .await; } /* Removes every turn sharing the active turn's id and appends the active turn as the last element. */ pub(super) fn merge_turn_history_with_active_turn(turns: &mut Vec, active_turn: Turn) { turns.retain(|turn| turn.id != active_turn.id); turns.push(active_turn); } /* Sets thread.status from resolve_thread_status; when the resolved status is not Active, every turn still marked InProgress is rewritten to Interrupted. */ pub(super) fn 
set_thread_status_and_interrupt_stale_turns( thread: &mut Thread, loaded_status: ThreadStatus, has_live_in_progress_turn: bool, ) { let status = resolve_thread_status(loaded_status, has_live_in_progress_turn); if !matches!(status, ThreadStatus::Active { .. }) { for turn in &mut thread.turns { if matches!(turn.status, TurnStatus::InProgress) { turn.status = TurnStatus::Interrupted; } } } thread.status = status; } /* Unit tests for the listener command handling in this module. */ #[cfg(test)] mod tests { use super::*; use crate::outgoing_message::ConnectionId; use crate::outgoing_message::OutgoingEnvelope; use crate::outgoing_message::OutgoingMessage; use crate::thread_state::ConnectionCapabilities; use crate::thread_state::PendingTerminalPlanCleanup; use codex_protocol::plan_tool::PlanItemArg; use codex_protocol::plan_tool::StepStatus; use codex_protocol::plan_tool::UpdatePlanArgs; use core_test_support::load_default_config_for_test; use tempfile::TempDir; use tokio::sync::mpsc; /* Receives the next envelope and unwraps its message for either envelope variant; panics when recv() returns None. */ async fn recv_message(rx: &mut mpsc::Receiver) -> OutgoingMessage { match rx.recv().await.expect("expected outgoing message") { OutgoingEnvelope::Broadcast { message } | OutgoingEnvelope::ToConnection { message, .. } => message, } } /* With one subscribed connection and one pending terminal plan cleanup queued, EmitThreadGoalCleared must produce ThreadGoalCleared first and TurnPlanUpdated second. */ #[tokio::test] async fn listener_goal_clear_flushes_pending_terminal_plan_cleanup() -> anyhow::Result<()> { let codex_home = TempDir::new()?; let config = load_default_config_for_test(&codex_home).await; let thread_manager = Arc::new( codex_core::test_support::thread_manager_with_models_provider_and_home( CodexAuth::create_dummy_chatgpt_auth_for_testing(), config.model_provider.clone(), config.codex_home.to_path_buf(), Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), ), ); let codex_core::NewThread { thread_id, thread: conversation, .. 
} = thread_manager.start_thread(config).await?; let thread_state_manager = ThreadStateManager::new(); let connection_id = ConnectionId(1); thread_state_manager .connection_initialized(connection_id, ConnectionCapabilities::default()) .await; thread_state_manager .try_ensure_connection_subscribed( thread_id, connection_id, /*experimental_raw_events*/ false, ) .await .expect("connection should subscribe"); let thread_state = thread_state_manager.thread_state(thread_id).await; thread_state.lock().await.pending_terminal_plan_cleanups = vec![PendingTerminalPlanCleanup { turn_id: "terminal-turn".to_string(), plan_update: UpdatePlanArgs { explanation: Some("still working".to_string()), plan: vec![PlanItemArg { step: "first".to_string(), status: StepStatus::InProgress, }], }, }]; let (tx, mut rx) = mpsc::channel(4); let outgoing = Arc::new(OutgoingMessageSender::new( tx, codex_analytics::AnalyticsEventsClient::disabled(), )); handle_thread_listener_command( thread_id, &conversation, codex_home.path(), &thread_state_manager, &thread_state, &ThreadWatchManager::new(), &outgoing, &Arc::new(Mutex::new(HashSet::new())), ThreadListenerCommand::EmitThreadGoalCleared, ) .await; assert!(matches!( recv_message(&mut rx).await, OutgoingMessage::AppServerNotification(ServerNotification::ThreadGoalCleared(_)) )); assert!(matches!( recv_message(&mut rx).await, OutgoingMessage::AppServerNotification(ServerNotification::TurnPlanUpdated(_)) )); Ok(()) } }