Compare commits

...

3 Commits

Author SHA1 Message Date
jif-oai
2379b04d6c Merge branch 'main' into jif/app-server-close 2026-04-14 20:25:03 +01:00
jif-oai
2b0761abd2 Address comment 2026-04-14 20:18:08 +01:00
jif-oai
4c26d7af08 feat: app-server close thread 2026-04-14 18:28:09 +01:00
16 changed files with 323 additions and 12 deletions

View File

@@ -2636,6 +2636,17 @@
],
"type": "object"
},
"ThreadCloseParams": {
"properties": {
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"type": "object"
},
"ThreadCompactStartParams": {
"properties": {
"threadId": {
@@ -3769,6 +3780,30 @@
"title": "Thread/archiveRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"thread/close"
],
"title": "Thread/closeRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/ThreadCloseParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Thread/closeRequest",
"type": "object"
},
{
"properties": {
"id": {

View File

@@ -338,6 +338,30 @@
"title": "Thread/archiveRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
"thread/close"
],
"title": "Thread/closeRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/v2/ThreadCloseParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Thread/closeRequest",
"type": "object"
},
{
"properties": {
"id": {
@@ -12779,6 +12803,24 @@
"title": "ThreadArchivedNotification",
"type": "object"
},
"ThreadCloseParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"title": "ThreadCloseParams",
"type": "object"
},
"ThreadCloseResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ThreadCloseResponse",
"type": "object"
},
"ThreadClosedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {

View File

@@ -920,6 +920,30 @@
"title": "Thread/archiveRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"thread/close"
],
"title": "Thread/closeRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/ThreadCloseParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Thread/closeRequest",
"type": "object"
},
{
"properties": {
"id": {
@@ -10627,6 +10651,24 @@
"title": "ThreadArchivedNotification",
"type": "object"
},
"ThreadCloseParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"title": "ThreadCloseParams",
"type": "object"
},
"ThreadCloseResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ThreadCloseResponse",
"type": "object"
},
"ThreadClosedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {

View File

@@ -0,0 +1,13 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"title": "ThreadCloseParams",
"type": "object"
}

View File

@@ -0,0 +1,5 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ThreadCloseResponse",
"type": "object"
}

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type ThreadCloseParams = { threadId: string, };

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type ThreadCloseResponse = Record<string, never>;

View File

@@ -279,6 +279,8 @@ export type { ThreadActiveFlag } from "./ThreadActiveFlag";
export type { ThreadArchiveParams } from "./ThreadArchiveParams";
export type { ThreadArchiveResponse } from "./ThreadArchiveResponse";
export type { ThreadArchivedNotification } from "./ThreadArchivedNotification";
export type { ThreadCloseParams } from "./ThreadCloseParams";
export type { ThreadCloseResponse } from "./ThreadCloseResponse";
export type { ThreadClosedNotification } from "./ThreadClosedNotification";
export type { ThreadCompactStartParams } from "./ThreadCompactStartParams";
export type { ThreadCompactStartResponse } from "./ThreadCompactStartResponse";

View File

@@ -255,6 +255,10 @@ client_request_definitions! {
params: v2::ThreadArchiveParams,
response: v2::ThreadArchiveResponse,
},
ThreadClose => "thread/close" {
params: v2::ThreadCloseParams,
response: v2::ThreadCloseResponse,
},
ThreadUnsubscribe => "thread/unsubscribe" {
params: v2::ThreadUnsubscribeParams,
response: v2::ThreadUnsubscribeResponse,

View File

@@ -2913,6 +2913,18 @@ pub struct ThreadArchiveParams {
#[ts(export_to = "v2/")]
pub struct ThreadArchiveResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadCloseParams {
pub thread_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadCloseResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]

View File

@@ -128,6 +128,8 @@ use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadArchivedNotification;
use codex_app_server_protocol::ThreadBackgroundTerminalsCleanParams;
use codex_app_server_protocol::ThreadBackgroundTerminalsCleanResponse;
use codex_app_server_protocol::ThreadCloseParams;
use codex_app_server_protocol::ThreadCloseResponse;
use codex_app_server_protocol::ThreadClosedNotification;
use codex_app_server_protocol::ThreadCompactStartParams;
use codex_app_server_protocol::ThreadCompactStartResponse;
@@ -891,6 +893,10 @@ impl CodexMessageProcessor {
self.thread_archive(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::ThreadClose { request_id, params } => {
self.thread_close(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::ThreadIncrementElicitation { request_id, params } => {
self.thread_increment_elicitation(to_connection_request_id(request_id), params)
.await;
@@ -2768,6 +2774,41 @@ impl CodexMessageProcessor {
}
}
async fn thread_close(&self, request_id: ConnectionRequestId, params: ThreadCloseParams) {
let thread_id = match ThreadId::from_string(&params.thread_id) {
Ok(id) => id,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("invalid thread id: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
match self.close_thread_common(thread_id).await {
Ok(did_close) => {
self.outgoing
.send_response(request_id, ThreadCloseResponse {})
.await;
if did_close {
let notification = ThreadClosedNotification {
thread_id: thread_id.to_string(),
};
self.outgoing
.send_server_notification(ServerNotification::ThreadClosed(notification))
.await;
}
}
Err(err) => {
self.outgoing.send_error(request_id, err).await;
}
}
}
async fn thread_increment_elicitation(
&self,
request_id: ConnectionRequestId,
@@ -5805,6 +5846,53 @@ impl CodexMessageProcessor {
.await;
}
async fn close_thread_common(&self, thread_id: ThreadId) -> Result<bool, JSONRPCErrorError> {
{
let mut pending_thread_unloads = self.pending_thread_unloads.lock().await;
if !pending_thread_unloads.insert(thread_id) {
return Ok(false);
}
}
let thread = match self.thread_manager.get_thread(thread_id).await {
Ok(thread) => thread,
Err(_) => {
self.finalize_thread_teardown(thread_id).await;
return Ok(false);
}
};
match Self::wait_for_thread_shutdown(&thread).await {
ThreadShutdownResult::Complete => {
let did_close = self
.thread_manager
.remove_thread(&thread_id)
.await
.is_some();
self.finalize_thread_teardown(thread_id).await;
Ok(did_close)
}
ThreadShutdownResult::SubmitFailed => {
self.pending_thread_unloads.lock().await.remove(&thread_id);
Err(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!(
"failed to close thread {thread_id}: failed to submit shutdown"
),
data: None,
})
}
ThreadShutdownResult::TimedOut => {
self.pending_thread_unloads.lock().await.remove(&thread_id);
Err(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to close thread {thread_id}: shutdown timed out"),
data: None,
})
}
}
}
async fn unload_thread_without_subscribers(
thread_manager: Arc<ThreadManager>,
outgoing: Arc<OutgoingMessageSender>,

View File

@@ -61,6 +61,7 @@ use codex_app_server_protocol::ReviewStartParams;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::SkillsListParams;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadCloseParams;
use codex_app_server_protocol::ThreadCompactStartParams;
use codex_app_server_protocol::ThreadForkParams;
use codex_app_server_protocol::ThreadInjectItemsParams;
@@ -369,6 +370,15 @@ impl McpProcess {
self.send_request("thread/archive", params).await
}
/// Send a `thread/close` JSON-RPC request.
pub async fn send_thread_close_request(
&mut self,
params: ThreadCloseParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("thread/close", params).await
}
/// Send a `thread/name/set` JSON-RPC request.
pub async fn send_thread_set_name_request(
&mut self,

View File

@@ -9,6 +9,8 @@ use app_test_support::to_response;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadCloseParams;
use codex_app_server_protocol::ThreadCloseResponse;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadLoadedListParams;
use codex_app_server_protocol::ThreadLoadedListResponse;
@@ -126,6 +128,51 @@ async fn thread_unsubscribe_keeps_thread_loaded_until_idle_timeout() -> Result<(
Ok(())
}
#[tokio::test]
async fn thread_close_unloads_thread_immediately() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_id = start_thread(&mut mcp).await?;
let close_id = mcp
.send_thread_close_request(ThreadCloseParams {
thread_id: thread_id.clone(),
})
.await?;
let close_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(close_id)),
)
.await??;
let _: ThreadCloseResponse = to_response::<ThreadCloseResponse>(close_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/closed"),
)
.await??;
let list_id = mcp
.send_thread_loaded_list_request(ThreadLoadedListParams::default())
.await?;
let list_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
)
.await??;
let ThreadLoadedListResponse { data, next_cursor } =
to_response::<ThreadLoadedListResponse>(list_resp)?;
assert_eq!(data, Vec::<String>::new());
assert_eq!(next_cursor, None);
Ok(())
}
#[tokio::test]
async fn thread_unsubscribe_during_turn_keeps_turn_running() -> Result<()> {
#[cfg(target_os = "windows")]

View File

@@ -1553,8 +1553,8 @@ impl App {
if let Some(thread_id) = self.chat_widget.thread_id() {
// Clear any in-flight rollback guard when switching threads.
self.backtrack.pending_rollback = None;
if let Err(err) = app_server.thread_unsubscribe(thread_id).await {
tracing::warn!("failed to unsubscribe thread {thread_id}: {err}");
if let Err(err) = app_server.thread_close(thread_id).await {
tracing::warn!("failed to close thread {thread_id}: {err}");
}
self.abort_thread_event_listener(thread_id);
}
@@ -3347,8 +3347,8 @@ impl App {
let tracked_thread_ids: Vec<ThreadId> =
self.thread_event_channels.keys().copied().collect();
for thread_id in tracked_thread_ids {
if let Err(err) = app_server.thread_unsubscribe(thread_id).await {
tracing::warn!("failed to unsubscribe tracked thread {thread_id}: {err}");
if let Err(err) = app_server.thread_close(thread_id).await {
tracing::warn!("failed to close tracked thread {thread_id}: {err}");
}
}
self.config = config.clone();

View File

@@ -30,6 +30,8 @@ use codex_app_server_protocol::SkillsListResponse;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadBackgroundTerminalsCleanParams;
use codex_app_server_protocol::ThreadBackgroundTerminalsCleanResponse;
use codex_app_server_protocol::ThreadCloseParams;
use codex_app_server_protocol::ThreadCloseResponse;
use codex_app_server_protocol::ThreadCompactStartParams;
use codex_app_server_protocol::ThreadCompactStartResponse;
use codex_app_server_protocol::ThreadForkParams;
@@ -60,8 +62,6 @@ use codex_app_server_protocol::ThreadShellCommandResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStartSource;
use codex_app_server_protocol::ThreadUnsubscribeParams;
use codex_app_server_protocol::ThreadUnsubscribeResponse;
use codex_app_server_protocol::Turn;
use codex_app_server_protocol::TurnInterruptParams;
use codex_app_server_protocol::TurnInterruptResponse;
@@ -511,18 +511,18 @@ impl AppServerSession {
Ok(())
}
pub(crate) async fn thread_unsubscribe(&mut self, thread_id: ThreadId) -> Result<()> {
pub(crate) async fn thread_close(&mut self, thread_id: ThreadId) -> Result<()> {
let request_id = self.next_request_id();
let _: ThreadUnsubscribeResponse = self
let _: ThreadCloseResponse = self
.client
.request_typed(ClientRequest::ThreadUnsubscribe {
.request_typed(ClientRequest::ThreadClose {
request_id,
params: ThreadUnsubscribeParams {
params: ThreadCloseParams {
thread_id: thread_id.to_string(),
},
})
.await
.wrap_err("thread/unsubscribe failed in TUI")?;
.wrap_err("thread/close failed in TUI")?;
Ok(())
}