mirror of
https://github.com/openai/codex.git
synced 2026-05-02 02:17:22 +00:00
fix: MCP leaks in app-server (#17223)
The disconnect path now reuses the same teardown flow as explicit unsubscribe, and the thread-state bookkeeping consistently reports only threads that lost their last subscriber https://github.com/openai/codex/issues/16895
This commit is contained in:
@@ -3883,9 +3883,18 @@ impl CodexMessageProcessor {
|
||||
self.command_exec_manager
|
||||
.connection_closed(connection_id)
|
||||
.await;
|
||||
self.thread_state_manager
|
||||
let thread_ids_with_no_subscribers = self
|
||||
.thread_state_manager
|
||||
.remove_connection(connection_id)
|
||||
.await;
|
||||
for thread_id in thread_ids_with_no_subscribers {
|
||||
let Ok(thread) = self.thread_manager.get_thread(thread_id).await else {
|
||||
self.finalize_thread_teardown(thread_id).await;
|
||||
continue;
|
||||
};
|
||||
self.unload_thread_without_subscribers(thread_id, thread)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn subscribe_running_assistant_turn_count(&self) -> watch::Receiver<usize> {
|
||||
@@ -5591,6 +5600,66 @@ impl CodexMessageProcessor {
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn unload_thread_without_subscribers(
|
||||
&mut self,
|
||||
thread_id: ThreadId,
|
||||
thread: Arc<CodexThread>,
|
||||
) {
|
||||
// This connection was the last subscriber. Only now do we unload the thread.
|
||||
info!("thread {thread_id} has no subscribers; shutting down");
|
||||
let should_start_unload_task = self.pending_thread_unloads.lock().await.insert(thread_id);
|
||||
|
||||
// Any pending app-server -> client requests for this thread can no longer be
|
||||
// answered; cancel their callbacks before shutdown/unload.
|
||||
self.outgoing
|
||||
.cancel_requests_for_thread(thread_id, /*error*/ None)
|
||||
.await;
|
||||
self.thread_state_manager
|
||||
.remove_thread_state(thread_id)
|
||||
.await;
|
||||
|
||||
if !should_start_unload_task {
|
||||
return;
|
||||
}
|
||||
|
||||
let outgoing = self.outgoing.clone();
|
||||
let pending_thread_unloads = self.pending_thread_unloads.clone();
|
||||
let thread_manager = self.thread_manager.clone();
|
||||
let thread_watch_manager = self.thread_watch_manager.clone();
|
||||
tokio::spawn(async move {
|
||||
match Self::wait_for_thread_shutdown(&thread).await {
|
||||
ThreadShutdownResult::Complete => {
|
||||
if thread_manager.remove_thread(&thread_id).await.is_none() {
|
||||
info!("thread {thread_id} was already removed before teardown finalized");
|
||||
thread_watch_manager
|
||||
.remove_thread(&thread_id.to_string())
|
||||
.await;
|
||||
pending_thread_unloads.lock().await.remove(&thread_id);
|
||||
return;
|
||||
}
|
||||
thread_watch_manager
|
||||
.remove_thread(&thread_id.to_string())
|
||||
.await;
|
||||
let notification = ThreadClosedNotification {
|
||||
thread_id: thread_id.to_string(),
|
||||
};
|
||||
outgoing
|
||||
.send_server_notification(ServerNotification::ThreadClosed(notification))
|
||||
.await;
|
||||
pending_thread_unloads.lock().await.remove(&thread_id);
|
||||
}
|
||||
ThreadShutdownResult::SubmitFailed => {
|
||||
pending_thread_unloads.lock().await.remove(&thread_id);
|
||||
warn!("failed to submit Shutdown to thread {thread_id}");
|
||||
}
|
||||
ThreadShutdownResult::TimedOut => {
|
||||
pending_thread_unloads.lock().await.remove(&thread_id);
|
||||
warn!("thread {thread_id} shutdown timed out; leaving thread loaded");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn thread_unsubscribe(
|
||||
&self,
|
||||
request_id: ConnectionRequestId,
|
||||
@@ -5638,58 +5707,8 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
|
||||
if !self.thread_state_manager.has_subscribers(thread_id).await {
|
||||
// This connection was the last subscriber. Only now do we unload the thread.
|
||||
info!("thread {thread_id} has no subscribers; shutting down");
|
||||
self.pending_thread_unloads.lock().await.insert(thread_id);
|
||||
// Any pending app-server -> client requests for this thread can no longer be
|
||||
// answered; cancel their callbacks before shutdown/unload.
|
||||
self.outgoing
|
||||
.cancel_requests_for_thread(thread_id, /*error*/ None)
|
||||
self.unload_thread_without_subscribers(thread_id, thread)
|
||||
.await;
|
||||
self.thread_state_manager
|
||||
.remove_thread_state(thread_id)
|
||||
.await;
|
||||
|
||||
let outgoing = self.outgoing.clone();
|
||||
let pending_thread_unloads = self.pending_thread_unloads.clone();
|
||||
let thread_manager = self.thread_manager.clone();
|
||||
let thread_watch_manager = self.thread_watch_manager.clone();
|
||||
tokio::spawn(async move {
|
||||
match Self::wait_for_thread_shutdown(&thread).await {
|
||||
ThreadShutdownResult::Complete => {
|
||||
if thread_manager.remove_thread(&thread_id).await.is_none() {
|
||||
info!(
|
||||
"thread {thread_id} was already removed before unsubscribe finalized"
|
||||
);
|
||||
thread_watch_manager
|
||||
.remove_thread(&thread_id.to_string())
|
||||
.await;
|
||||
pending_thread_unloads.lock().await.remove(&thread_id);
|
||||
return;
|
||||
}
|
||||
thread_watch_manager
|
||||
.remove_thread(&thread_id.to_string())
|
||||
.await;
|
||||
let notification = ThreadClosedNotification {
|
||||
thread_id: thread_id.to_string(),
|
||||
};
|
||||
outgoing
|
||||
.send_server_notification(ServerNotification::ThreadClosed(
|
||||
notification,
|
||||
))
|
||||
.await;
|
||||
pending_thread_unloads.lock().await.remove(&thread_id);
|
||||
}
|
||||
ThreadShutdownResult::SubmitFailed => {
|
||||
pending_thread_unloads.lock().await.remove(&thread_id);
|
||||
warn!("failed to submit Shutdown to thread {thread_id}");
|
||||
}
|
||||
ThreadShutdownResult::TimedOut => {
|
||||
pending_thread_unloads.lock().await.remove(&thread_id);
|
||||
warn!("thread {thread_id} shutdown timed out; leaving thread loaded");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
self.outgoing
|
||||
@@ -10047,7 +10066,8 @@ mod tests {
|
||||
state.lock().await.cancel_tx = Some(cancel_tx);
|
||||
}
|
||||
|
||||
manager.remove_connection(connection_a).await;
|
||||
let threads_to_unload = manager.remove_connection(connection_a).await;
|
||||
assert_eq!(threads_to_unload, Vec::<ThreadId>::new());
|
||||
assert!(
|
||||
tokio::time::timeout(Duration::from_millis(20), &mut cancel_rx)
|
||||
.await
|
||||
@@ -10068,7 +10088,8 @@ mod tests {
|
||||
let connection = ConnectionId(1);
|
||||
|
||||
manager.connection_initialized(connection).await;
|
||||
manager.remove_connection(connection).await;
|
||||
let threads_to_unload = manager.remove_connection(connection).await;
|
||||
assert_eq!(threads_to_unload, Vec::<ThreadId>::new());
|
||||
|
||||
assert!(
|
||||
manager
|
||||
|
||||
Reference in New Issue
Block a user