feat(app-server): thread/unsubscribe API (#10954)

Adds a new v2 app-server API that lets a client unsubscribe from a
thread (rough protocol shapes are sketched below):
- New RPC method: `thread/unsubscribe`
- New server notification: `thread/closed`
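
Roughly, the new protocol shapes look like the following. This is only a
sketch inferred from how the types are used in the diff below; derive/serde
attributes and exact wire casing are omitted.

```rust
// Sketch of the new codex_app_server_protocol types, inferred from their
// usage in this commit; attributes and serialization details omitted.
pub struct ThreadUnsubscribeParams {
    pub thread_id: String,
}

pub enum ThreadUnsubscribeStatus {
    /// This connection was subscribed and is now detached.
    Unsubscribed,
    /// The thread is loaded, but this connection was not subscribed to it.
    NotSubscribed,
    /// The thread is not loaded in this app-server process.
    NotLoaded,
}

pub struct ThreadUnsubscribeResponse {
    pub status: ThreadUnsubscribeStatus,
}

pub struct ThreadClosedNotification {
    pub thread_id: String,
}
```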

Today clients can start/resume/archive threads, but there was no way to
explicitly unload a live thread from memory without archiving it. With
`thread/unsubscribe`, a client indicates that it is no longer actively
working with a live thread. If it was the only client subscribed to that
thread, app-server automatically closes the thread and then sends a
`thread/closed` notification plus a `thread/status/changed` notification
with `status: notLoaded`.
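
For illustration, a client might track live threads and react to the
unsubscribe response and the follow-up `thread/closed` notification roughly
as below. The `LocalThreads` bookkeeping is a hypothetical stand-in for
whatever state a real client keeps, not something added by this PR; only the
status/notification handling reflects the behavior described above, and it
reuses the shapes sketched earlier.

```rust
use std::collections::HashSet;

// Hypothetical client-side bookkeeping for threads believed to be live
// in app-server; illustrative only.
struct LocalThreads {
    live: HashSet<String>,
}

impl LocalThreads {
    // Handle the response to a `thread/unsubscribe` request.
    fn on_unsubscribe_response(&mut self, thread_id: &str, status: ThreadUnsubscribeStatus) {
        match status {
            // Detached, but the thread may stay loaded if other clients are
            // still subscribed; wait for `thread/closed` before forgetting it.
            ThreadUnsubscribeStatus::Unsubscribed => {}
            // This connection wasn't subscribed, or the thread isn't loaded
            // here at all; safe to stop treating it as live immediately.
            ThreadUnsubscribeStatus::NotSubscribed | ThreadUnsubscribeStatus::NotLoaded => {
                self.live.remove(thread_id);
            }
        }
    }

    // Handle the server-pushed `thread/closed` notification (app-server also
    // sends `thread/status/changed` with `status: notLoaded` at this point).
    fn on_thread_closed(&mut self, note: &ThreadClosedNotification) {
        self.live.remove(&note.thread_id);
    }
}
```

Because a thread only closes when its last subscriber goes away, multiple
clients can share a loaded thread without coordinating; each one simply
unsubscribes when it is done.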

This gives clients a way to prevent long-running app-server processes
from accumulating too many thread (and related) objects in memory.

Closed threads will also be removed from `thread/loaded/list`.
Owen Lin (committed by GitHub)
2026-02-25 13:14:30 -08:00
commit 21f7032dbb, parent d45ffd5830
26 changed files with 1191 additions and 74 deletions

@@ -127,6 +127,7 @@ use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadArchivedNotification;
use codex_app_server_protocol::ThreadBackgroundTerminalsCleanParams;
use codex_app_server_protocol::ThreadBackgroundTerminalsCleanResponse;
use codex_app_server_protocol::ThreadClosedNotification;
use codex_app_server_protocol::ThreadCompactStartParams;
use codex_app_server_protocol::ThreadCompactStartResponse;
use codex_app_server_protocol::ThreadForkParams;
@@ -160,6 +161,9 @@ use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ThreadUnarchiveParams;
use codex_app_server_protocol::ThreadUnarchiveResponse;
use codex_app_server_protocol::ThreadUnarchivedNotification;
use codex_app_server_protocol::ThreadUnsubscribeParams;
use codex_app_server_protocol::ThreadUnsubscribeResponse;
use codex_app_server_protocol::ThreadUnsubscribeStatus;
use codex_app_server_protocol::Turn;
use codex_app_server_protocol::TurnInterruptParams;
use codex_app_server_protocol::TurnStartParams;
@@ -323,6 +327,12 @@ enum AppListLoadResult {
    Directory(Result<Vec<AppInfo>, String>),
}

enum ThreadShutdownResult {
    Complete,
    SubmitFailed,
    TimedOut,
}

fn convert_remote_scope(scope: ApiHazelnutScope) -> RemoteSkillHazelnutScope {
    match scope {
        ApiHazelnutScope::WorkspaceShared => RemoteSkillHazelnutScope::WorkspaceShared,
@@ -358,6 +368,7 @@ pub(crate) struct CodexMessageProcessor {
    cli_overrides: Vec<(String, TomlValue)>,
    cloud_requirements: Arc<RwLock<CloudRequirementsLoader>>,
    active_login: Arc<Mutex<Option<ActiveLogin>>>,
    pending_thread_unloads: Arc<Mutex<HashSet<ThreadId>>>,
    thread_state_manager: ThreadStateManager,
    thread_watch_manager: ThreadWatchManager,
    pending_fuzzy_searches: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
@@ -430,6 +441,7 @@ impl CodexMessageProcessor {
            cli_overrides,
            cloud_requirements,
            active_login: Arc::new(Mutex::new(None)),
            pending_thread_unloads: Arc::new(Mutex::new(HashSet::new())),
            thread_state_manager: ThreadStateManager::new(),
            thread_watch_manager: ThreadWatchManager::new_with_outgoing(outgoing),
            pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
@@ -557,6 +569,10 @@ impl CodexMessageProcessor {
                self.thread_start(to_connection_request_id(request_id), params)
                    .await;
            }
            ClientRequest::ThreadUnsubscribe { request_id, params } => {
                self.thread_unsubscribe(to_connection_request_id(request_id), params)
                    .await;
            }
            ClientRequest::ThreadResume { request_id, params } => {
                self.thread_resume(to_connection_request_id(request_id), params)
                    .await;
@@ -2876,6 +2892,23 @@ impl CodexMessageProcessor {
    }

    async fn thread_resume(&mut self, request_id: ConnectionRequestId, params: ThreadResumeParams) {
        if let Ok(thread_id) = ThreadId::from_string(&params.thread_id)
            && self
                .pending_thread_unloads
                .lock()
                .await
                .contains(&thread_id)
        {
            self.send_invalid_request_error(
                request_id,
                format!(
                    "thread {thread_id} is closing; retry thread/resume after the thread is closed"
                ),
            )
            .await;
            return;
        }

        if self
            .resume_running_thread(request_id.clone(), &params)
            .await
@@ -4729,6 +4762,150 @@ impl CodexMessageProcessor {
}
}
    async fn wait_for_thread_shutdown(thread: &Arc<CodexThread>) -> ThreadShutdownResult {
        match thread.submit(Op::Shutdown).await {
            Ok(_) => {
                let wait_for_shutdown = async {
                    loop {
                        if matches!(thread.agent_status().await, AgentStatus::Shutdown) {
                            break;
                        }
                        tokio::time::sleep(Duration::from_millis(50)).await;
                    }
                };
                if tokio::time::timeout(Duration::from_secs(10), wait_for_shutdown)
                    .await
                    .is_err()
                {
                    ThreadShutdownResult::TimedOut
                } else {
                    ThreadShutdownResult::Complete
                }
            }
            Err(_) => ThreadShutdownResult::SubmitFailed,
        }
    }
    async fn finalize_thread_teardown(&mut self, thread_id: ThreadId) {
        self.pending_thread_unloads.lock().await.remove(&thread_id);
        self.outgoing.cancel_requests_for_thread(thread_id).await;
        self.thread_state_manager
            .remove_thread_state(thread_id)
            .await;
        self.thread_watch_manager
            .remove_thread(&thread_id.to_string())
            .await;
    }
    async fn thread_unsubscribe(
        &mut self,
        request_id: ConnectionRequestId,
        params: ThreadUnsubscribeParams,
    ) {
        let thread_id = match ThreadId::from_string(&params.thread_id) {
            Ok(id) => id,
            Err(err) => {
                self.send_invalid_request_error(request_id, format!("invalid thread id: {err}"))
                    .await;
                return;
            }
        };

        let Ok(thread) = self.thread_manager.get_thread(thread_id).await else {
            // Reconcile stale app-server bookkeeping when the thread has already been
            // removed from the core manager. This keeps loaded-status/subscription state
            // consistent with the source of truth before reporting NotLoaded.
            self.finalize_thread_teardown(thread_id).await;
            self.outgoing
                .send_response(
                    request_id,
                    ThreadUnsubscribeResponse {
                        status: ThreadUnsubscribeStatus::NotLoaded,
                    },
                )
                .await;
            return;
        };

        let was_subscribed = self
            .thread_state_manager
            .unsubscribe_connection_from_thread(thread_id, request_id.connection_id)
            .await;
        if !was_subscribed {
            self.outgoing
                .send_response(
                    request_id,
                    ThreadUnsubscribeResponse {
                        status: ThreadUnsubscribeStatus::NotSubscribed,
                    },
                )
                .await;
            return;
        }

        if !self.thread_state_manager.has_subscribers(thread_id).await {
            // This connection was the last subscriber. Only now do we unload the thread.
            info!("thread {thread_id} has no subscribers; shutting down");
            self.pending_thread_unloads.lock().await.insert(thread_id);
            // Any pending app-server -> client requests for this thread can no longer be
            // answered; cancel their callbacks before shutdown/unload.
            self.outgoing.cancel_requests_for_thread(thread_id).await;
            self.thread_state_manager
                .remove_thread_state(thread_id)
                .await;
            let outgoing = self.outgoing.clone();
            let pending_thread_unloads = self.pending_thread_unloads.clone();
            let thread_manager = self.thread_manager.clone();
            let thread_watch_manager = self.thread_watch_manager.clone();
            tokio::spawn(async move {
                match Self::wait_for_thread_shutdown(&thread).await {
                    ThreadShutdownResult::Complete => {
                        if thread_manager.remove_thread(&thread_id).await.is_none() {
                            info!(
                                "thread {thread_id} was already removed before unsubscribe finalized"
                            );
                            thread_watch_manager
                                .remove_thread(&thread_id.to_string())
                                .await;
                            pending_thread_unloads.lock().await.remove(&thread_id);
                            return;
                        }
                        thread_watch_manager
                            .remove_thread(&thread_id.to_string())
                            .await;
                        let notification = ThreadClosedNotification {
                            thread_id: thread_id.to_string(),
                        };
                        outgoing
                            .send_server_notification(ServerNotification::ThreadClosed(
                                notification,
                            ))
                            .await;
                        pending_thread_unloads.lock().await.remove(&thread_id);
                    }
                    ThreadShutdownResult::SubmitFailed => {
                        pending_thread_unloads.lock().await.remove(&thread_id);
                        warn!("failed to submit Shutdown to thread {thread_id}");
                    }
                    ThreadShutdownResult::TimedOut => {
                        pending_thread_unloads.lock().await.remove(&thread_id);
                        warn!("thread {thread_id} shutdown timed out; leaving thread loaded");
                    }
                }
            });
        }

        self.outgoing
            .send_response(
                request_id,
                ThreadUnsubscribeResponse {
                    status: ThreadUnsubscribeStatus::Unsubscribed,
                },
            )
            .await;
    }
async fn archive_thread_common(
&mut self,
thread_id: ThreadId,
@@ -4800,37 +4977,19 @@ impl CodexMessageProcessor {
                state_db_ctx = Some(ctx);
            }
            info!("thread {thread_id} was active; shutting down");
-            // Request shutdown.
-            match conversation.submit(Op::Shutdown).await {
-                Ok(_) => {
-                    // Poll agent status rather than consuming events so attached listeners do not block shutdown.
-                    let wait_for_shutdown = async {
-                        loop {
-                            if matches!(conversation.agent_status().await, AgentStatus::Shutdown) {
-                                break;
-                            }
-                            tokio::time::sleep(Duration::from_millis(50)).await;
-                        }
-                    };
-                    if tokio::time::timeout(Duration::from_secs(10), wait_for_shutdown)
-                        .await
-                        .is_err()
-                    {
-                        warn!("thread {thread_id} shutdown timed out; proceeding with archive");
-                    }
-                }
-                Err(err) => {
-                    error!("failed to submit Shutdown to thread {thread_id}: {err}");
-                }
-            }
+            match Self::wait_for_thread_shutdown(&conversation).await {
+                ThreadShutdownResult::Complete => {}
+                ThreadShutdownResult::SubmitFailed => {
+                    error!(
+                        "failed to submit Shutdown to thread {thread_id}; proceeding with archive"
+                    );
+                }
+                ThreadShutdownResult::TimedOut => {
+                    warn!("thread {thread_id} shutdown timed out; proceeding with archive");
+                }
+            }
-            self.thread_state_manager
-                .remove_thread_state(thread_id)
-                .await;
        }
-        self.thread_watch_manager
-            .remove_thread(&thread_id.to_string())
-            .await;
+        self.finalize_thread_teardown(thread_id).await;

        if state_db_ctx.is_none() {
            state_db_ctx = get_state_db(&self.config, None).await;
@@ -6180,6 +6339,7 @@ impl CodexMessageProcessor {
                let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
                    outgoing_for_task.clone(),
                    subscribed_connection_ids,
                    conversation_id,
                );
                apply_bespoke_event_handling(
                    event.clone(),
@@ -6482,10 +6642,8 @@ impl CodexMessageProcessor {
            WindowsSandboxSetupMode::Unelevated => CoreWindowsSandboxSetupMode::Unelevated,
        };
        let config = Arc::clone(&self.config);
-        let outgoing = ThreadScopedOutgoingMessageSender::new(
-            Arc::clone(&self.outgoing),
-            vec![request_id.connection_id],
-        );
+        let outgoing = Arc::clone(&self.outgoing);
+        let connection_id = request_id.connection_id;
        tokio::spawn(async move {
            let setup_request = WindowsSandboxSetupRequest {
@@ -6508,9 +6666,10 @@ impl CodexMessageProcessor {
                error: setup_result.err().map(|err| err.to_string()),
            };
            outgoing
-                .send_server_notification(ServerNotification::WindowsSandboxSetupCompleted(
-                    notification,
-                ))
+                .send_server_notification_to_connections(
+                    &[connection_id],
+                    ServerNotification::WindowsSandboxSetupCompleted(notification),
+                )
                .await;
        });
    }