Compare commits

...

17 Commits

Author SHA1 Message Date
Eric Traut
83f498d473 Tighten thread delete cleanup 2026-05-31 12:21:53 -07:00
Eric Traut
715a959076 Fix thread delete argument comments 2026-05-31 11:36:46 -07:00
Eric Traut
0e3ddd9e3e Handle unloaded roots in thread delete 2026-05-31 11:33:46 -07:00
Eric Traut
6e19cfeb55 Add live subagent delete coverage 2026-05-31 11:09:41 -07:00
Eric Traut
be00f91f20 Merge remote-tracking branch 'origin/main' into etraut/thread-delete-api 2026-05-31 10:55:22 -07:00
Eric Traut
88a9c2b339 Simplify thread delete tests 2026-05-31 10:55:08 -07:00
Eric Traut
1ddb4d9f9c Notify clients for partial thread deletes 2026-05-31 10:30:00 -07:00
Eric Traut
60c0f05bf0 Require rollout deletion before thread delete succeeds 2026-05-31 10:15:18 -07:00
Eric Traut
d49dccc8ea Merge remote-tracking branch 'origin/main' into etraut/thread-delete-api
# Conflicts:
#	codex-rs/app-server/src/request_processors/thread_processor.rs
2026-05-30 23:38:21 -07:00
Eric Traut
98e3548d9d Fix thread delete argument comments 2026-05-29 00:28:30 -07:00
Eric Traut
bc2a0d4e46 Simplify thread delete changes 2026-05-28 23:54:06 -07:00
Eric Traut
0b169881ec Merge branch 'main' into etraut/thread-delete-api 2026-05-28 23:34:22 -07:00
Eric Traut
d86f1d76f5 Handle thread deleted notifications in TUI 2026-05-28 23:02:49 -07:00
Eric Traut
b053bea788 Validate thread delete before teardown 2026-05-28 22:51:03 -07:00
Eric Traut
9d81a26e2c Merge remote-tracking branch 'origin/main' into etraut/thread-delete-api
# Conflicts:
#	codex-rs/app-server-protocol/schema/typescript/ClientRequest.ts
2026-05-28 22:46:20 -07:00
Eric Traut
642ae9c6a2 Simplify thread delete handling 2026-05-28 22:32:30 -07:00
Eric Traut
2df653fcc2 Add app-server thread/delete API
Add a v2 thread/delete method that hard-deletes a requested thread, discovers spawned descendants before the root delete, emits thread/deleted notifications for committed deletes, and cleans up associated state best effort after the root state row is deleted.
2026-05-28 22:21:11 -07:00
36 changed files with 1337 additions and 74 deletions

View File

@@ -3096,6 +3096,17 @@
],
"type": "object"
},
"ThreadDeleteParams": {
"properties": {
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"type": "object"
},
"ThreadForkParams": {
"description": "There are two ways to fork a thread: 1. By thread_id: load the thread from disk by thread_id and fork it into a new thread. 2. By path: load the thread from disk by path and fork it into a new thread.\n\nIf using a non-empty path, the thread_id param will be ignored. Empty string path values are treated as absent.\n\nPrefer using thread_id whenever possible.",
"properties": {
@@ -4412,6 +4423,30 @@
"title": "Thread/archiveRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"thread/delete"
],
"title": "Thread/deleteRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/ThreadDeleteParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Thread/deleteRequest",
"type": "object"
},
{
"properties": {
"id": {

View File

@@ -3465,6 +3465,17 @@
],
"type": "object"
},
"ThreadDeletedNotification": {
"properties": {
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"type": "object"
},
"ThreadGoal": {
"properties": {
"createdAt": {
@@ -5345,6 +5356,26 @@
"title": "Thread/archivedNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"thread/deleted"
],
"title": "Thread/deletedNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/ThreadDeletedNotification"
}
},
"required": [
"method",
"params"
],
"title": "Thread/deletedNotification",
"type": "object"
},
{
"properties": {
"method": {

View File

@@ -324,6 +324,30 @@
"title": "Thread/archiveRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
"thread/delete"
],
"title": "Thread/deleteRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/v2/ThreadDeleteParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Thread/deleteRequest",
"type": "object"
},
{
"properties": {
"id": {
@@ -4031,6 +4055,26 @@
"title": "Thread/archivedNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"thread/deleted"
],
"title": "Thread/deletedNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/v2/ThreadDeletedNotification"
}
},
"required": [
"method",
"params"
],
"title": "Thread/deletedNotification",
"type": "object"
},
{
"properties": {
"method": {
@@ -15647,6 +15691,37 @@
"title": "ThreadCompactStartResponse",
"type": "object"
},
"ThreadDeleteParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"title": "ThreadDeleteParams",
"type": "object"
},
"ThreadDeleteResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ThreadDeleteResponse",
"type": "object"
},
"ThreadDeletedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"title": "ThreadDeletedNotification",
"type": "object"
},
"ThreadForkParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "There are two ways to fork a thread: 1. By thread_id: load the thread from disk by thread_id and fork it into a new thread. 2. By path: load the thread from disk by path and fork it into a new thread.\n\nIf using a non-empty path, the thread_id param will be ignored. Empty string path values are treated as absent.\n\nPrefer using thread_id whenever possible.",

View File

@@ -1072,6 +1072,30 @@
"title": "Thread/archiveRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"thread/delete"
],
"title": "Thread/deleteRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/ThreadDeleteParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Thread/deleteRequest",
"type": "object"
},
{
"properties": {
"id": {
@@ -11387,6 +11411,26 @@
"title": "Thread/archivedNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"thread/deleted"
],
"title": "Thread/deletedNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/ThreadDeletedNotification"
}
},
"required": [
"method",
"params"
],
"title": "Thread/deletedNotification",
"type": "object"
},
{
"properties": {
"method": {
@@ -13471,6 +13515,37 @@
"title": "ThreadCompactStartResponse",
"type": "object"
},
"ThreadDeleteParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"title": "ThreadDeleteParams",
"type": "object"
},
"ThreadDeleteResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ThreadDeleteResponse",
"type": "object"
},
"ThreadDeletedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"title": "ThreadDeletedNotification",
"type": "object"
},
"ThreadForkParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "There are two ways to fork a thread: 1. By thread_id: load the thread from disk by thread_id and fork it into a new thread. 2. By path: load the thread from disk by path and fork it into a new thread.\n\nIf using a non-empty path, the thread_id param will be ignored. Empty string path values are treated as absent.\n\nPrefer using thread_id whenever possible.",

View File

@@ -0,0 +1,13 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"title": "ThreadDeleteParams",
"type": "object"
}

View File

@@ -0,0 +1,5 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ThreadDeleteResponse",
"type": "object"
}

View File

@@ -0,0 +1,13 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"title": "ThreadDeletedNotification",
"type": "object"
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type ThreadDeleteParams = { threadId: string, };

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type ThreadDeleteResponse = Record<string, never>;

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type ThreadDeletedNotification = { threadId: string, };

View File

@@ -365,6 +365,9 @@ export type { ThreadArchivedNotification } from "./ThreadArchivedNotification";
export type { ThreadClosedNotification } from "./ThreadClosedNotification";
export type { ThreadCompactStartParams } from "./ThreadCompactStartParams";
export type { ThreadCompactStartResponse } from "./ThreadCompactStartResponse";
export type { ThreadDeleteParams } from "./ThreadDeleteParams";
export type { ThreadDeleteResponse } from "./ThreadDeleteResponse";
export type { ThreadDeletedNotification } from "./ThreadDeletedNotification";
export type { ThreadForkParams } from "./ThreadForkParams";
export type { ThreadForkResponse } from "./ThreadForkResponse";
export type { ThreadGoal } from "./ThreadGoal";

View File

@@ -465,6 +465,11 @@ client_request_definitions! {
serialization: thread_id(params.thread_id),
response: v2::ThreadArchiveResponse,
},
ThreadDelete => "thread/delete" {
params: v2::ThreadDeleteParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadDeleteResponse,
},
ThreadUnsubscribe => "thread/unsubscribe" {
params: v2::ThreadUnsubscribeParams,
serialization: thread_id(params.thread_id),
@@ -1477,6 +1482,7 @@ server_notification_definitions! {
ThreadStarted => "thread/started" (v2::ThreadStartedNotification),
ThreadStatusChanged => "thread/status/changed" (v2::ThreadStatusChangedNotification),
ThreadArchived => "thread/archived" (v2::ThreadArchivedNotification),
ThreadDeleted => "thread/deleted" (v2::ThreadDeletedNotification),
ThreadUnarchived => "thread/unarchived" (v2::ThreadUnarchivedNotification),
ThreadClosed => "thread/closed" (v2::ThreadClosedNotification),
SkillsChanged => "skills/changed" (v2::SkillsChangedNotification),

View File

@@ -609,6 +609,18 @@ pub struct ThreadArchiveParams {
#[ts(export_to = "v2/")]
pub struct ThreadArchiveResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadDeleteParams {
pub thread_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadDeleteResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
@@ -1345,6 +1357,13 @@ pub struct ThreadArchivedNotification {
pub thread_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadDeletedNotification {
pub thread_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]

View File

@@ -151,6 +151,7 @@ Example with notification opt-out:
- `thread/settings/updated` — experimental notification emitted to subscribed clients when a loaded threads effective next-turn settings change; includes `threadId` and the full `threadSettings`.
- `thread/status/changed` — notification emitted when a loaded threads status changes (`threadId` + new `status`).
- `thread/archive` — move a threads rollout file into the archived directory and attempt to move any spawned descendant thread rollout files; returns `{}` on success and emits `thread/archived` for each archived thread.
- `thread/delete` — hard-delete an active or archived thread and any spawned descendant threads; returns `{}` on success and emits `thread/deleted` for each deleted thread.
- `thread/unsubscribe` — unsubscribe this connection from thread turn/item events. If this was the last subscriber, the server keeps the thread loaded and unloads it only after it has had no subscribers and no thread activity for 30 minutes, then emits `thread/closed`.
- `thread/name/set` — set or update a threads user-facing name for either a loaded thread or a persisted rollout; returns `{}` on success and emits `thread/name/updated` to initialized, opted-in clients. Thread names are not required to be unique; name lookups resolve to the most recently updated thread.
- `thread/unarchive` — move an archived rollout file back into the sessions directory; returns the restored `thread` on success and emits `thread/unarchived`.
@@ -594,6 +595,16 @@ Use `thread/archive` to move the persisted rollout (stored as a JSONL file on di
An archived thread will not appear in `thread/list` unless `archived` is set to `true`.
### Example: Delete a thread
Use `thread/delete` to hard-delete a thread and its spawned descendant threads. Existing rollout files must be removed before the request succeeds; missing rollout files are treated as already deleted, and metadata cleanup is best effort afterward.
```json
{ "method": "thread/delete", "id": 23, "params": { "threadId": "thr_b" } }
{ "id": 23, "result": {} }
{ "method": "thread/deleted", "params": { "threadId": "thr_b" } }
```
### Example: Unarchive a thread
Use `thread/unarchive` to move an archived rollout back into the sessions directory.

View File

@@ -378,7 +378,7 @@ impl MessageProcessor {
Arc::clone(&thread_manager),
Arc::clone(&config),
feedback,
log_db,
log_db.clone(),
state_db.clone(),
);
let git_processor = GitRequestProcessor::new();
@@ -431,6 +431,7 @@ impl MessageProcessor {
Arc::clone(&thread_list_state_permit),
thread_goal_processor.clone(),
state_db,
log_db,
Arc::clone(&skills_watcher),
);
let turn_processor = TurnRequestProcessor::new(
@@ -1016,6 +1017,11 @@ impl MessageProcessor {
.thread_archive(request_id.clone(), params)
.await
}
ClientRequest::ThreadDelete { params, .. } => {
self.thread_processor
.thread_delete(request_id.clone(), params)
.await
}
ClientRequest::ThreadIncrementElicitation { params, .. } => {
self.thread_processor
.thread_increment_elicitation(params)

View File

@@ -174,6 +174,9 @@ use codex_app_server_protocol::ThreadCompactStartParams;
use codex_app_server_protocol::ThreadCompactStartResponse;
use codex_app_server_protocol::ThreadDecrementElicitationParams;
use codex_app_server_protocol::ThreadDecrementElicitationResponse;
use codex_app_server_protocol::ThreadDeleteParams;
use codex_app_server_protocol::ThreadDeleteResponse;
use codex_app_server_protocol::ThreadDeletedNotification;
use codex_app_server_protocol::ThreadForkParams;
use codex_app_server_protocol::ThreadForkResponse;
use codex_app_server_protocol::ThreadGoal;
@@ -412,6 +415,7 @@ use codex_rollout::state_db::reconcile_rollout;
use codex_state::ThreadMetadata;
use codex_state::log_db::LogDbLayer;
use codex_thread_store::ArchiveThreadParams as StoreArchiveThreadParams;
use codex_thread_store::DeleteThreadParams as StoreDeleteThreadParams;
use codex_thread_store::GitInfoPatch as StoreGitInfoPatch;
use codex_thread_store::ListThreadsParams as StoreListThreadsParams;
use codex_thread_store::LocalThreadStore;
@@ -513,6 +517,7 @@ use token_usage_replay::send_thread_token_usage_update_to_connection;
mod config_errors;
mod request_errors;
mod thread_delete;
mod thread_goal_processor;
mod thread_lifecycle;
mod thread_resume_redaction;

View File

@@ -0,0 +1,160 @@
//! `thread/delete` request handling.
use super::thread_processor::core_thread_write_error;
use super::thread_processor::unsupported_thread_store_operation;
use super::*;
impl ThreadRequestProcessor {
pub(crate) async fn thread_delete(
&self,
request_id: ConnectionRequestId,
params: ThreadDeleteParams,
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
let mut deleted_thread_ids = Vec::new();
let result = {
let _thread_list_state_permit = self.acquire_thread_list_state_permit().await?;
self.thread_delete_response(params, &mut deleted_thread_ids)
.await
};
match result {
Ok(response) => {
self.outgoing
.send_response(request_id.clone(), response)
.await;
self.send_thread_deleted_notifications(deleted_thread_ids)
.await;
Ok(None)
}
Err(error) => {
self.send_thread_deleted_notifications(deleted_thread_ids)
.await;
Err(error)
}
}
}
async fn thread_delete_response(
&self,
params: ThreadDeleteParams,
deleted_thread_ids: &mut Vec<String>,
) -> Result<ThreadDeleteResponse, JSONRPCErrorError> {
let thread_id = ThreadId::from_string(&params.thread_id)
.map_err(|err| invalid_request(format!("invalid thread id: {err}")))?;
let mut thread_ids = self.state_db_spawn_subtree_thread_ids(thread_id).await?;
let mut seen = thread_ids.iter().copied().collect::<HashSet<_>>();
match self
.thread_manager
.list_agent_subtree_thread_ids(thread_id)
.await
{
Ok(live_thread_ids) => {
for live_thread_id in live_thread_ids {
if seen.insert(live_thread_id) {
thread_ids.push(live_thread_id);
}
}
}
Err(err) => return Err(core_thread_write_error("delete thread", err)),
}
self.validate_root_thread_delete(thread_id).await?;
for thread_id_to_delete in thread_ids.iter().copied() {
self.prepare_thread_for_delete(thread_id_to_delete).await;
}
for descendant_thread_id in thread_ids.iter().skip(1).rev().copied() {
match self
.thread_store
.delete_thread(StoreDeleteThreadParams {
thread_id: descendant_thread_id,
})
.await
{
Ok(()) => {
self.cleanup_deleted_thread_state(descendant_thread_id)
.await;
deleted_thread_ids.push(descendant_thread_id.to_string());
}
Err(ThreadStoreError::ThreadNotFound { .. }) => {
warn!(
"spawned descendant thread {descendant_thread_id} was already missing while deleting {thread_id}"
);
}
Err(err) => {
return Err(thread_store_delete_error(err));
}
}
}
self.thread_store
.delete_thread(StoreDeleteThreadParams { thread_id })
.await
.map_err(thread_store_delete_error)?;
self.cleanup_deleted_thread_state(thread_id).await;
deleted_thread_ids.push(thread_id.to_string());
Ok(ThreadDeleteResponse {})
}
async fn send_thread_deleted_notifications(&self, deleted_thread_ids: Vec<String>) {
for thread_id in deleted_thread_ids {
self.outgoing
.send_server_notification(ServerNotification::ThreadDeleted(
ThreadDeletedNotification { thread_id },
))
.await;
}
}
async fn validate_root_thread_delete(
&self,
thread_id: ThreadId,
) -> Result<(), JSONRPCErrorError> {
if let Ok(thread) = self.thread_manager.get_thread(thread_id).await
&& thread.config_snapshot().await.ephemeral
{
return Err(invalid_request(format!(
"thread is not persisted and cannot be deleted: {thread_id}"
)));
}
self.thread_store
.read_thread(StoreReadThreadParams {
thread_id,
include_archived: true,
include_history: false,
})
.await
.map(|_| ())
.map_err(thread_store_delete_error)
}
async fn prepare_thread_for_delete(&self, thread_id: ThreadId) {
self.prepare_thread_for_removal(thread_id, "delete").await;
if let Some(log_db) = self.log_db.as_ref() {
log_db.flush().await;
}
}
async fn cleanup_deleted_thread_state(&self, thread_id: ThreadId) {
if let Some(state_db) = self.state_db.as_ref()
&& let Err(err) = state_db.delete_thread(thread_id).await
{
warn!("failed to delete app-server state for deleted thread {thread_id}: {err}");
}
}
}
fn thread_store_delete_error(err: ThreadStoreError) -> JSONRPCErrorError {
match err {
ThreadStoreError::ThreadNotFound { thread_id } => {
invalid_request(format!("thread not found: {thread_id}"))
}
ThreadStoreError::InvalidRequest { message } => invalid_request(message),
ThreadStoreError::Unsupported { operation } => {
unsupported_thread_store_operation(operation)
}
err => internal_error(format!("failed to delete thread: {err}")),
}
}

View File

@@ -337,6 +337,7 @@ pub(crate) struct ThreadRequestProcessor {
pub(super) thread_list_state_permit: Arc<Semaphore>,
pub(super) thread_goal_processor: ThreadGoalRequestProcessor,
pub(super) state_db: Option<StateDbHandle>,
pub(super) log_db: Option<LogDbLayer>,
pub(super) background_tasks: TaskTracker,
pub(super) skills_watcher: Arc<SkillsWatcher>,
}
@@ -357,6 +358,7 @@ impl ThreadRequestProcessor {
thread_list_state_permit: Arc<Semaphore>,
thread_goal_processor: ThreadGoalRequestProcessor,
state_db: Option<StateDbHandle>,
log_db: Option<LogDbLayer>,
skills_watcher: Arc<SkillsWatcher>,
) -> Self {
Self {
@@ -373,6 +375,7 @@ impl ThreadRequestProcessor {
thread_list_state_permit,
thread_goal_processor,
state_db,
log_db,
background_tasks: TaskTracker::new(),
skills_watcher,
}
@@ -685,7 +688,7 @@ impl ThreadRequestProcessor {
Ok((thread_id, thread))
}
async fn acquire_thread_list_state_permit(
pub(super) async fn acquire_thread_list_state_permit(
&self,
) -> Result<SemaphorePermit<'_>, JSONRPCErrorError> {
self.thread_list_state_permit
@@ -757,6 +760,10 @@ impl ThreadRequestProcessor {
}
async fn prepare_thread_for_archive(&self, thread_id: ThreadId) {
self.prepare_thread_for_removal(thread_id, "archive").await;
}
pub(super) async fn prepare_thread_for_removal(&self, thread_id: ThreadId, operation: &str) {
let removed_conversation = self.thread_manager.remove_thread(&thread_id).await;
if let Some(conversation) = removed_conversation {
info!("thread {thread_id} was active; shutting down");
@@ -764,11 +771,11 @@ impl ThreadRequestProcessor {
ThreadShutdownResult::Complete => {}
ThreadShutdownResult::SubmitFailed => {
error!(
"failed to submit Shutdown to thread {thread_id}; proceeding with archive"
"failed to submit Shutdown to thread {thread_id}; proceeding with {operation}"
);
}
ThreadShutdownResult::TimedOut => {
warn!("thread {thread_id} shutdown timed out; proceeding with archive");
warn!("thread {thread_id} shutdown timed out; proceeding with {operation}");
}
}
}
@@ -1310,23 +1317,7 @@ impl ThreadRequestProcessor {
let thread_id = ThreadId::from_string(&params.thread_id)
.map_err(|err| invalid_request(format!("invalid session id: {err}")))?;
let mut thread_ids = vec![thread_id];
if let Some(state_db_ctx) = self.state_db.as_ref() {
let descendants = state_db_ctx
.list_thread_spawn_descendants(thread_id)
.await
.map_err(|err| {
internal_error(format!(
"failed to list spawned descendants for session {thread_id}: {err}"
))
})?;
let mut seen = HashSet::from([thread_id]);
for descendant_id in descendants {
if seen.insert(descendant_id) {
thread_ids.push(descendant_id);
}
}
}
let thread_ids = self.state_db_spawn_subtree_thread_ids(thread_id).await?;
let mut archive_thread_ids = Vec::new();
match self
@@ -1411,6 +1402,31 @@ impl ThreadRequestProcessor {
Ok((ThreadArchiveResponse {}, archived_thread_ids))
}
pub(super) async fn state_db_spawn_subtree_thread_ids(
&self,
thread_id: ThreadId,
) -> Result<Vec<ThreadId>, JSONRPCErrorError> {
let mut thread_ids = vec![thread_id];
let Some(state_db_ctx) = self.state_db.as_ref() else {
return Ok(thread_ids);
};
let mut seen = HashSet::from([thread_id]);
let descendants = state_db_ctx
.list_thread_spawn_descendants(thread_id)
.await
.map_err(|err| {
internal_error(format!(
"failed to list spawned descendants for thread id {thread_id}: {err}"
))
})?;
for descendant_id in descendants {
if seen.insert(descendant_id) {
thread_ids.push(descendant_id);
}
}
Ok(thread_ids)
}
async fn thread_increment_elicitation_inner(
&self,
params: ThreadIncrementElicitationParams,
@@ -3834,7 +3850,7 @@ fn thread_read_view_error(err: ThreadReadViewError) -> JSONRPCErrorError {
}
}
fn unsupported_thread_store_operation(operation: &'static str) -> JSONRPCErrorError {
pub(super) fn unsupported_thread_store_operation(operation: &'static str) -> JSONRPCErrorError {
method_not_found(format!("{operation} is not supported yet"))
}
@@ -3955,7 +3971,7 @@ fn conversation_summary_rollout_path_read_error(
}
}
fn core_thread_write_error(operation: &str, err: CodexErr) -> JSONRPCErrorError {
pub(super) fn core_thread_write_error(operation: &str, err: CodexErr) -> JSONRPCErrorError {
match err {
CodexErr::ThreadNotFound(thread_id) => {
invalid_request(format!("thread not found: {thread_id}"))

View File

@@ -75,6 +75,7 @@ use codex_app_server_protocol::SkillsExtraRootsSetParams;
use codex_app_server_protocol::SkillsListParams;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadCompactStartParams;
use codex_app_server_protocol::ThreadDeleteParams;
use codex_app_server_protocol::ThreadForkParams;
use codex_app_server_protocol::ThreadInjectItemsParams;
use codex_app_server_protocol::ThreadListParams;
@@ -452,6 +453,15 @@ impl McpProcess {
self.send_request("thread/archive", params).await
}
/// Send a `thread/delete` JSON-RPC request.
pub async fn send_thread_delete_request(
&mut self,
params: ThreadDeleteParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("thread/delete", params).await
}
/// Send a `thread/name/set` JSON-RPC request.
pub async fn send_thread_set_name_request(
&mut self,

View File

@@ -48,6 +48,7 @@ mod review;
mod safety_check_downgrade;
mod skills_list;
mod thread_archive;
mod thread_delete;
mod thread_fork;
mod thread_inject_items;
mod thread_list;

View File

@@ -20,6 +20,7 @@ use std::sync::Arc;
use anyhow::Result;
use app_test_support::create_mock_responses_server_repeating_assistant;
use codex_app_server::in_process;
use codex_app_server::in_process::InProcessClientHandle;
use codex_app_server::in_process::InProcessServerEvent;
use codex_app_server::in_process::InProcessStartArgs;
use codex_app_server_protocol::ClientInfo;
@@ -27,6 +28,8 @@ use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ThreadDeleteParams;
use codex_app_server_protocol::ThreadDeleteResponse;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
use codex_app_server_protocol::ThreadStartParams;
@@ -40,8 +43,15 @@ use codex_config::NoopThreadConfigLoader;
use codex_core::config::ConfigBuilder;
use codex_exec_server::EnvironmentManager;
use codex_feedback::CodexFeedback;
use codex_protocol::ThreadId;
use codex_protocol::models::BaseInstructions;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::ThreadMemoryMode;
use codex_thread_store::CreateThreadParams as StoreCreateThreadParams;
use codex_thread_store::InMemoryThreadStore;
use codex_thread_store::ThreadEventPersistenceMode;
use codex_thread_store::ThreadPersistenceMetadata;
use codex_thread_store::ThreadStore;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use tokio::time::timeout;
@@ -50,7 +60,7 @@ use uuid::Uuid;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
async fn thread_start_with_non_local_thread_store_does_not_create_local_persistence() -> Result<()>
async fn thread_delete_with_non_local_thread_store_does_not_create_local_persistence() -> Result<()>
{
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
@@ -59,43 +69,10 @@ async fn thread_start_with_non_local_thread_store_does_not_create_local_persiste
// here so this regression stays focused on thread persistence artifacts.
create_config_toml_with_thread_store(codex_home.path(), &server.uri(), &store_id)?;
let loader_overrides = LoaderOverrides::without_managed_config_for_tests();
let config = ConfigBuilder::default()
.codex_home(codex_home.path().to_path_buf())
.fallback_cwd(Some(codex_home.path().to_path_buf()))
.loader_overrides(loader_overrides.clone())
.build()
.await?;
let thread_store = InMemoryThreadStore::for_id(store_id.clone());
let _in_memory_store = InMemoryThreadStoreId { store_id };
let mut client = in_process::start(InProcessStartArgs {
arg0_paths: Arg0DispatchPaths::default(),
config: Arc::new(config),
cli_overrides: Vec::new(),
loader_overrides,
strict_config: false,
cloud_requirements: CloudRequirementsLoader::default(),
thread_config_loader: Arc::new(NoopThreadConfigLoader),
feedback: CodexFeedback::new(),
log_db: None,
state_db: None,
environment_manager: Arc::new(EnvironmentManager::default_for_tests()),
config_warnings: Vec::new(),
session_source: SessionSource::Cli,
enable_codex_api_key_env: false,
initialize: InitializeParams {
client_info: ClientInfo {
name: "codex-app-server-tests".to_string(),
title: None,
version: "0.1.0".to_string(),
},
capabilities: None,
},
channel_capacity: in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
})
.await?;
let mut client = start_in_process_server(codex_home.path()).await?;
let response = client
.request(ClientRequest::ThreadStart {
@@ -164,11 +141,37 @@ async fn thread_start_with_non_local_thread_store_does_not_create_local_persiste
assert_eq!(data[0].id, thread.id);
assert_eq!(data[0].path, None);
delete_thread(&client, /*request_id*/ 4, thread.id.clone()).await?;
let unloaded_thread_id = ThreadId::from_string(&Uuid::new_v4().to_string())?;
thread_store
.create_thread(StoreCreateThreadParams {
thread_id: unloaded_thread_id,
forked_from_id: None,
source: SessionSource::Cli,
thread_source: None,
base_instructions: BaseInstructions::default(),
dynamic_tools: Vec::new(),
metadata: ThreadPersistenceMetadata {
cwd: Some(codex_home.path().to_path_buf()),
model_provider: "mock_provider".to_string(),
memory_mode: ThreadMemoryMode::Enabled,
},
event_persistence_mode: ThreadEventPersistenceMode::Limited,
})
.await?;
delete_thread(
&client,
/*request_id*/ 5,
unloaded_thread_id.to_string(),
)
.await?;
client.shutdown().await?;
let calls = thread_store.calls().await;
assert_eq!(calls.create_thread, 1);
assert_eq!(calls.create_thread, 2);
assert_eq!(calls.list_threads, 1);
assert_eq!(calls.delete_thread, 2);
assert!(
calls.append_items > 0,
"turn/start should append rollout items through the injected store"
@@ -183,6 +186,59 @@ async fn thread_start_with_non_local_thread_store_does_not_create_local_persiste
Ok(())
}
async fn start_in_process_server(codex_home: &Path) -> Result<InProcessClientHandle> {
let loader_overrides = LoaderOverrides::without_managed_config_for_tests();
let config = ConfigBuilder::default()
.codex_home(codex_home.to_path_buf())
.fallback_cwd(Some(codex_home.to_path_buf()))
.loader_overrides(loader_overrides.clone())
.build()
.await?;
Ok(in_process::start(InProcessStartArgs {
arg0_paths: Arg0DispatchPaths::default(),
config: Arc::new(config),
cli_overrides: Vec::new(),
loader_overrides,
strict_config: false,
cloud_requirements: CloudRequirementsLoader::default(),
thread_config_loader: Arc::new(NoopThreadConfigLoader),
feedback: CodexFeedback::new(),
log_db: None,
state_db: None,
environment_manager: Arc::new(EnvironmentManager::default_for_tests()),
config_warnings: Vec::new(),
session_source: SessionSource::Cli,
enable_codex_api_key_env: false,
initialize: InitializeParams {
client_info: ClientInfo {
name: "codex-app-server-tests".to_string(),
title: None,
version: "0.1.0".to_string(),
},
capabilities: None,
},
channel_capacity: in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
})
.await?)
}
async fn delete_thread(
client: &InProcessClientHandle,
request_id: i64,
thread_id: String,
) -> Result<()> {
let response = client
.request(ClientRequest::ThreadDelete {
request_id: RequestId::Integer(request_id),
params: ThreadDeleteParams { thread_id },
})
.await?
.map_err(|error| anyhow::anyhow!("thread/delete failed: {}", error.message))?;
let _: ThreadDeleteResponse = serde_json::from_value(response)?;
Ok(())
}
fn assert_no_local_persistence_artifacts(codex_home: &Path) -> Result<()> {
// These are the observable tripwires for accidental local persistence. If a
// future code path constructs a local rollout/session store or opens the

View File

@@ -0,0 +1,166 @@
//! Integration coverage for the v2 `thread/delete` API.
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_fake_rollout;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadDeleteParams;
use codex_app_server_protocol::ThreadDeleteResponse;
use codex_app_server_protocol::ThreadDeletedNotification;
use codex_app_server_protocol::ThreadLoadedListParams;
use codex_app_server_protocol::ThreadLoadedListResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_core::find_thread_path_by_id_str;
use codex_protocol::ThreadId;
use codex_state::DirectionalThreadSpawnEdgeStatus;
use codex_state::StateRuntime;
use pretty_assertions::assert_eq;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
async fn thread_delete_deletes_spawned_descendants() -> Result<()> {
let codex_home = TempDir::new()?;
let parent_id = create_delete_test_rollout(codex_home.path(), /*minute*/ 0, "parent")?;
let child_id = create_delete_test_rollout(codex_home.path(), /*minute*/ 1, "child")?;
let grandchild_id =
create_delete_test_rollout(codex_home.path(), /*minute*/ 2, "grandchild")?;
let state_db =
StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
let parent_thread_id = ThreadId::from_string(&parent_id)?;
let child_thread_id = ThreadId::from_string(&child_id)?;
let grandchild_thread_id = ThreadId::from_string(&grandchild_id)?;
for (parent, child, status) in [
(
parent_thread_id,
child_thread_id,
DirectionalThreadSpawnEdgeStatus::Closed,
),
(
child_thread_id,
grandchild_thread_id,
DirectionalThreadSpawnEdgeStatus::Open,
),
] {
state_db
.upsert_thread_spawn_edge(parent, child, status)
.await?;
}
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let delete_id = mcp
.send_thread_delete_request(ThreadDeleteParams {
thread_id: parent_id.clone(),
})
.await?;
let delete_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(delete_id)),
)
.await??;
let _: ThreadDeleteResponse = to_response::<ThreadDeleteResponse>(delete_resp)?;
let mut deleted_ids = Vec::new();
for _ in 0..3 {
let notification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/deleted"),
)
.await??;
let deleted_notification: ThreadDeletedNotification = serde_json::from_value(
notification
.params
.expect("thread/deleted notification params"),
)?;
deleted_ids.push(deleted_notification.thread_id);
}
assert_eq!(deleted_ids, vec![grandchild_id, child_id, parent_id]);
for thread_id in [parent_thread_id, child_thread_id, grandchild_thread_id] {
let rollout_path = find_thread_path_by_id_str(
codex_home.path(),
&thread_id.to_string(),
/*state_db_ctx*/ None,
)
.await?;
assert!(
rollout_path.is_none(),
"expected active rollout for {thread_id} to be deleted"
);
}
Ok(())
}
fn create_delete_test_rollout(codex_home: &Path, minute: u8, preview: &str) -> Result<String> {
create_fake_rollout(
codex_home,
&format!("2025-01-01T00-{minute:02}-00"),
&format!("2025-01-01T00:{minute:02}:00Z"),
preview,
Some("mock_provider"),
/*git_info*/ None,
)
}
#[tokio::test]
async fn thread_delete_rejects_live_ephemeral_thread_without_unloading() -> Result<()> {
let codex_home = TempDir::new()?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
ephemeral: Some(true),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let delete_id = mcp
.send_thread_delete_request(ThreadDeleteParams {
thread_id: thread.id.clone(),
})
.await?;
let delete_err: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(delete_id)),
)
.await??;
let expected_message = format!(
"thread is not persisted and cannot be deleted: {}",
thread.id
);
assert_eq!(delete_err.error.message, expected_message);
let list_id = mcp
.send_thread_loaded_list_request(ThreadLoadedListParams::default())
.await?;
let list_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
)
.await??;
let ThreadLoadedListResponse { mut data, .. } =
to_response::<ThreadLoadedListResponse>(list_resp)?;
data.sort();
assert_eq!(data, vec![thread.id]);
Ok(())
}

View File

@@ -41,7 +41,12 @@ use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ServerRequestResolvedNotification;
use codex_app_server_protocol::TextElement;
use codex_app_server_protocol::ThreadDeleteParams;
use codex_app_server_protocol::ThreadDeleteResponse;
use codex_app_server_protocol::ThreadDeletedNotification;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadLoadedListParams;
use codex_app_server_protocol::ThreadLoadedListResponse;
use codex_app_server_protocol::ThreadSource;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
@@ -3205,6 +3210,49 @@ async fn turn_start_emits_spawn_agent_item_with_model_metadata_v2() -> Result<()
assert_eq!(turn_completed.thread_id, thread.id);
assert_eq!(turn_completed.turn.id, turn.turn.id);
// Reuse this live spawn setup to cover thread/delete's ThreadManager descendant path.
let delete_req = mcp
.send_thread_delete_request(ThreadDeleteParams {
thread_id: thread.id.clone(),
})
.await?;
let delete_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(delete_req)),
)
.await??;
let _: ThreadDeleteResponse = to_response::<ThreadDeleteResponse>(delete_resp)?;
let mut deleted_thread_ids = Vec::new();
for _ in 0..2 {
let deleted_notif = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/deleted"),
)
.await??;
let deleted: ThreadDeletedNotification = serde_json::from_value(
deleted_notif
.params
.expect("thread/deleted notification params"),
)?;
deleted_thread_ids.push(deleted.thread_id);
}
assert_eq!(
deleted_thread_ids,
vec![receiver_thread_id, thread.id.clone()]
);
let list_req = mcp
.send_thread_loaded_list_request(ThreadLoadedListParams::default())
.await?;
let list_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(list_req)),
)
.await??;
let ThreadLoadedListResponse { data, .. } = to_response::<ThreadLoadedListResponse>(list_resp)?;
assert_eq!(data, Vec::<String>::new());
Ok(())
}

View File

@@ -2176,7 +2176,7 @@ async fn list_agent_subtree_thread_ids_includes_anonymous_and_closed_descendants
}
#[tokio::test]
async fn list_agent_subtree_thread_ids_includes_live_descendants_without_state_db() {
async fn list_agent_subtree_thread_ids_finds_live_descendants_of_unloaded_root() {
let (_home, config) = test_config().await;
let manager = ThreadManager::with_models_provider_home_and_state_for_tests(
CodexAuth::from_api_key("dummy"),
@@ -2221,6 +2221,8 @@ async fn list_agent_subtree_thread_ids_includes_live_descendants_without_state_d
.await
.expect("grandchild spawn should succeed");
manager.remove_thread(&parent_thread_id).await;
let mut subtree_thread_ids = manager
.list_agent_subtree_thread_ids(parent_thread_id)
.await

View File

@@ -499,14 +499,12 @@ impl ThreadManager {
&self,
thread_id: ThreadId,
) -> CodexResult<Vec<ThreadId>> {
let thread = self.state.get_thread(thread_id).await?;
let mut subtree_thread_ids = Vec::new();
let mut seen_thread_ids = HashSet::new();
subtree_thread_ids.push(thread_id);
seen_thread_ids.insert(thread_id);
if let Some(state_db_ctx) = thread.state_db() {
if let Some(state_db_ctx) = self.state.state_db() {
for status in [
DirectionalThreadSpawnEdgeStatus::Open,
DirectionalThreadSpawnEdgeStatus::Closed,
@@ -525,11 +523,8 @@ impl ThreadManager {
}
}
for descendant_id in thread
.codex
.session
.services
.agent_control
for descendant_id in self
.agent_control()
.list_live_agent_subtree_thread_ids(thread_id)
.await?
{

View File

@@ -882,15 +882,88 @@ ON CONFLICT(id) DO UPDATE SET
/// Delete a thread metadata row by id.
pub async fn delete_thread(&self, thread_id: ThreadId) -> anyhow::Result<u64> {
let thread_id_string = thread_id.to_string();
let result = sqlx::query("DELETE FROM threads WHERE id = ?")
.bind(thread_id.to_string())
.bind(thread_id_string.as_str())
.execute(self.pool.as_ref())
.await?;
let rows_affected = result.rows_affected();
self.memories.delete_thread_memory(thread_id).await?;
if rows_affected > 0 {
self.thread_goals.delete_thread_goal(thread_id).await?;
if let Err(err) = sqlx::query("DELETE FROM thread_dynamic_tools WHERE thread_id = ?")
.bind(thread_id_string.as_str())
.execute(self.pool.as_ref())
.await
{
warn!("failed to delete dynamic tools for thread {thread_id}: {err}");
}
if let Err(err) = sqlx::query(
"DELETE FROM thread_spawn_edges WHERE parent_thread_id = ? OR child_thread_id = ?",
)
.bind(thread_id_string.as_str())
.bind(thread_id_string.as_str())
.execute(self.pool.as_ref())
.await
{
warn!("failed to delete spawn edges for thread {thread_id}: {err}");
}
let now = Utc::now().timestamp();
if let Err(err) = sqlx::query(
r#"
UPDATE agent_job_items
SET
status = ?,
assigned_thread_id = NULL,
updated_at = ?,
last_error = ?
WHERE assigned_thread_id = ? AND status = ?
"#,
)
.bind(AgentJobItemStatus::Pending.as_str())
.bind(now)
.bind("assigned thread was deleted")
.bind(thread_id_string.as_str())
.bind(AgentJobItemStatus::Running.as_str())
.execute(self.pool.as_ref())
.await
{
warn!("failed to requeue agent job items for deleted thread {thread_id}: {err}");
}
if let Err(err) = sqlx::query(
r#"
UPDATE agent_job_items
SET assigned_thread_id = NULL, updated_at = ?
WHERE assigned_thread_id = ?
"#,
)
.bind(now)
.bind(thread_id_string.as_str())
.execute(self.pool.as_ref())
.await
{
warn!(
"failed to clear agent job item assignments for deleted thread {thread_id}: {err}"
);
}
if let Err(err) = sqlx::query("DELETE FROM logs WHERE thread_id = ?")
.bind(thread_id_string.as_str())
.execute(self.logs_pool.as_ref())
.await
{
warn!("failed to delete logs for thread {thread_id}: {err}");
}
if let Err(err) = self.memories.delete_thread_memory(thread_id).await {
warn!("failed to delete memory metadata for thread {thread_id}: {err}");
}
if let Err(err) = self.thread_goals.delete_thread_goal(thread_id).await {
warn!("failed to delete goal for thread {thread_id}: {err}");
}
Ok(rows_affected)
}
}
@@ -1092,12 +1165,14 @@ mod tests {
use crate::DirectionalThreadSpawnEdgeStatus;
use crate::runtime::test_support::test_thread_metadata;
use crate::runtime::test_support::unique_temp_dir;
use anyhow::Result;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::GitInfo;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::path::PathBuf;
#[tokio::test]
@@ -1138,6 +1213,140 @@ mod tests {
assert_eq!(memory_mode, "disabled");
}
#[tokio::test]
async fn delete_thread_cleans_associated_state() -> Result<()> {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()).await?;
let thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000401")?;
let child_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000402")?;
runtime
.upsert_thread(&test_thread_metadata(
&codex_home,
thread_id,
codex_home.clone(),
))
.await?;
seed_thread_cleanup_state(&runtime, thread_id, child_thread_id).await?;
sqlx::query("INSERT INTO thread_dynamic_tools (thread_id, position, name, description, input_schema) VALUES (?, ?, ?, ?, ?)")
.bind(thread_id.to_string())
.bind(0_i64)
.bind("test_tool")
.bind("test dynamic tool")
.bind("{}")
.execute(runtime.pool.as_ref())
.await?;
runtime
.create_agent_job(
&AgentJobCreateParams {
id: "job-1".to_string(),
name: "test-job".to_string(),
instruction: "Return a result".to_string(),
auto_export: true,
max_runtime_seconds: None,
output_schema_json: None,
input_headers: vec!["path".to_string()],
input_csv_path: "/tmp/in.csv".to_string(),
output_csv_path: "/tmp/out.csv".to_string(),
},
&[AgentJobItemCreateParams {
item_id: "item-1".to_string(),
row_index: 0,
source_id: None,
row_json: json!({"path": "file-1"}),
}],
)
.await?;
runtime.mark_agent_job_running("job-1").await?;
runtime
.mark_agent_job_item_running_with_thread("job-1", "item-1", &thread_id.to_string())
.await?;
let rows = runtime.delete_thread(thread_id).await?;
assert_eq!(rows, 1);
assert!(runtime.get_thread(thread_id).await?.is_none());
let dynamic_tool_count: i64 =
sqlx::query_scalar("SELECT COUNT(*) FROM thread_dynamic_tools WHERE thread_id = ?")
.bind(thread_id.to_string())
.fetch_one(runtime.pool.as_ref())
.await?;
assert_eq!(dynamic_tool_count, 0);
assert_thread_cleanup_state(&runtime, thread_id).await?;
let job_item = runtime
.get_agent_job_item("job-1", "item-1")
.await?
.expect("job item should exist");
assert_eq!(job_item.status, AgentJobItemStatus::Pending);
assert_eq!(job_item.assigned_thread_id, None);
assert_eq!(
job_item.last_error,
Some("assigned thread was deleted".to_string())
);
let missing_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000403")?;
let missing_child_thread_id =
ThreadId::from_string("00000000-0000-0000-0000-000000000404")?;
seed_thread_cleanup_state(&runtime, missing_thread_id, missing_child_thread_id).await?;
assert_eq!(runtime.delete_thread(missing_thread_id).await?, 0);
assert_thread_cleanup_state(&runtime, missing_thread_id).await?;
Ok(())
}
async fn seed_thread_cleanup_state(
runtime: &StateRuntime,
thread_id: ThreadId,
child_thread_id: ThreadId,
) -> Result<()> {
runtime
.upsert_thread_spawn_edge(
thread_id,
child_thread_id,
DirectionalThreadSpawnEdgeStatus::Closed,
)
.await?;
runtime
.thread_goals()
.replace_thread_goal(
thread_id,
"test goal",
crate::ThreadGoalStatus::Active,
/*token_budget*/ None,
)
.await?;
sqlx::query("INSERT INTO logs (ts, ts_nanos, level, target, feedback_log_body, thread_id) VALUES (1, 0, 'INFO', 'test', 'feedback log', ?)")
.bind(thread_id.to_string())
.execute(runtime.logs_pool.as_ref())
.await?;
Ok(())
}
async fn assert_thread_cleanup_state(
runtime: &StateRuntime,
thread_id: ThreadId,
) -> Result<()> {
let spawn_edge_count: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM thread_spawn_edges WHERE parent_thread_id = ? OR child_thread_id = ?",
)
.bind(thread_id.to_string())
.bind(thread_id.to_string())
.fetch_one(runtime.pool.as_ref())
.await?;
assert_eq!(spawn_edge_count, 0);
assert_eq!(
runtime.thread_goals().get_thread_goal(thread_id).await?,
None
);
let logs = runtime
.query_logs(&LogQuery {
thread_ids: vec![thread_id.to_string()],
..Default::default()
})
.await?;
assert!(logs.is_empty());
Ok(())
}
#[tokio::test]
async fn list_threads_updated_after_returns_oldest_changes_first() {
let codex_home = unique_temp_dir();

View File

@@ -15,6 +15,7 @@ use codex_protocol::protocol::RolloutItem;
use crate::AppendThreadItemsParams;
use crate::ArchiveThreadParams;
use crate::CreateThreadParams;
use crate::DeleteThreadParams;
use crate::ListThreadsParams;
use crate::LoadThreadHistoryParams;
use crate::ReadThreadByRolloutPathParams;
@@ -111,6 +112,7 @@ pub struct InMemoryThreadStoreCalls {
pub update_thread_metadata: usize,
pub archive_thread: usize,
pub unarchive_thread: usize,
pub delete_thread: usize,
}
/// In-memory [`ThreadStore`] implementation for tests and debug configs.
@@ -297,6 +299,25 @@ impl ThreadStore for InMemoryThreadStore {
state.calls.unarchive_thread += 1;
stored_thread_from_state(&state, params.thread_id, /*include_history*/ false)
}
async fn delete_thread(&self, params: DeleteThreadParams) -> ThreadStoreResult<()> {
let mut state = self.state.lock().await;
state.calls.delete_thread += 1;
let existed = state.histories.remove(&params.thread_id).is_some();
state.created_threads.remove(&params.thread_id);
state.names.remove(&params.thread_id);
state.metadata_updates.remove(&params.thread_id);
state
.rollout_paths
.retain(|_, thread_id| *thread_id != params.thread_id);
if existed {
Ok(())
} else {
Err(ThreadStoreError::ThreadNotFound {
thread_id: params.thread_id,
})
}
}
}
fn stored_thread_from_state(

View File

@@ -25,6 +25,7 @@ pub use types::AppendThreadItemsParams;
pub use types::ArchiveThreadParams;
pub use types::ClearableField;
pub use types::CreateThreadParams;
pub use types::DeleteThreadParams;
pub use types::GitInfoPatch;
pub use types::ItemPage;
pub use types::ListItemsParams;

View File

@@ -0,0 +1,245 @@
//! Local hard-delete support for persisted threads.
//!
//! Existing rollout files are deleted before this operation reports success. Missing rollout files
//! count as already deleted; SQLite and compatibility metadata cleanup is best effort after rollout
//! deletion succeeds.
use std::io::ErrorKind;
use std::path::Path;
use codex_rollout::ARCHIVED_SESSIONS_SUBDIR;
use codex_rollout::SESSIONS_SUBDIR;
use codex_rollout::find_archived_thread_path_by_id_str;
use codex_rollout::find_thread_path_by_id_str;
use tracing::warn;
use super::LocalThreadStore;
use super::helpers::matching_rollout_file_name;
use super::helpers::scoped_rollout_path;
use crate::DeleteThreadParams;
use crate::ThreadStoreError;
use crate::ThreadStoreResult;
pub(super) async fn delete_thread(
store: &LocalThreadStore,
params: DeleteThreadParams,
) -> ThreadStoreResult<()> {
let thread_id = params.thread_id;
let thread_id_str = thread_id.to_string();
let state_db_ctx = store.state_db().await;
let mut rollout_paths = Vec::new();
let state_thread_exists = if let Some(ctx) = state_db_ctx.as_ref() {
match ctx.get_thread(thread_id).await {
Ok(Some(_)) => true,
Ok(None) => false,
Err(err) => {
warn!("failed to check thread metadata for {thread_id}: {err}");
false
}
}
} else {
false
};
match find_thread_path_by_id_str(
store.config.codex_home.as_path(),
thread_id_str.as_str(),
state_db_ctx.as_deref(),
)
.await
{
Ok(Some(path)) => rollout_paths.push(path),
Ok(None) => {}
Err(err) => {
return Err(ThreadStoreError::InvalidRequest {
message: format!("failed to locate thread id {thread_id}: {err}"),
});
}
}
match find_archived_thread_path_by_id_str(
store.config.codex_home.as_path(),
thread_id_str.as_str(),
state_db_ctx.as_deref(),
)
.await
{
Ok(Some(path)) => {
if !rollout_paths.contains(&path) {
rollout_paths.push(path);
}
}
Ok(None) => {}
Err(err) => {
return Err(ThreadStoreError::InvalidRequest {
message: format!("failed to locate archived thread id {thread_id}: {err}"),
});
}
}
store.live_recorders.lock().await.remove(&thread_id);
let mut deleted_rollout_file = false;
for rollout_path in rollout_paths {
deleted_rollout_file |= delete_rollout_file(store, rollout_path.as_path(), thread_id)?;
}
let deleted_state_rows = if let Some(ctx) = state_db_ctx.as_ref() {
match ctx.delete_thread(thread_id).await {
Ok(rows) => rows,
Err(err) if state_thread_exists && !deleted_rollout_file => {
return Err(ThreadStoreError::Internal {
message: format!("failed to delete thread metadata for {thread_id}: {err}"),
});
}
Err(err) => {
warn!("failed to delete thread metadata for {thread_id}: {err}");
0
}
}
} else {
0
};
if !deleted_rollout_file && !state_thread_exists && deleted_state_rows == 0 {
return Err(ThreadStoreError::ThreadNotFound { thread_id });
}
Ok(())
}
fn delete_rollout_file(
store: &LocalThreadStore,
rollout_path: &Path,
thread_id: codex_protocol::ThreadId,
) -> ThreadStoreResult<bool> {
let canonical_rollout_path = scoped_rollout_path(
store.config.codex_home.join(SESSIONS_SUBDIR),
rollout_path,
"sessions",
)
.or_else(|_| {
scoped_rollout_path(
store.config.codex_home.join(ARCHIVED_SESSIONS_SUBDIR),
rollout_path,
"archived sessions",
)
})?;
matching_rollout_file_name(&canonical_rollout_path, thread_id, rollout_path)?;
match std::fs::remove_file(&canonical_rollout_path) {
Ok(()) => Ok(true),
Err(err) if err.kind() == ErrorKind::NotFound => Ok(false),
Err(err) => Err(ThreadStoreError::Internal {
message: format!(
"failed to delete rollout file `{}`: {err}",
canonical_rollout_path.display()
),
}),
}
}
#[cfg(test)]
mod tests {
use chrono::Utc;
use codex_protocol::ThreadId;
use codex_protocol::protocol::SessionSource;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use uuid::Uuid;
use super::*;
use crate::ThreadStore;
use crate::local::LocalThreadStore;
use crate::local::test_support::test_config;
use crate::local::test_support::write_archived_session_file;
use crate::local::test_support::write_session_file;
#[tokio::test]
async fn delete_thread_removes_active_and_archived_rollouts() {
let home = TempDir::new().expect("temp dir");
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let cases = [
(
Uuid::from_u128(301),
write_session_file(home.path(), "2025-01-03T12-00-00", Uuid::from_u128(301))
.expect("session file"),
),
(
Uuid::from_u128(302),
write_archived_session_file(
home.path(),
"2025-01-03T12-00-00",
Uuid::from_u128(302),
)
.expect("archived session file"),
),
];
for (uuid, path) in cases {
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
store
.delete_thread(DeleteThreadParams { thread_id })
.await
.expect("delete thread");
assert!(!path.exists());
}
}
#[tokio::test]
async fn delete_thread_treats_missing_rollout_as_already_deleted_when_sqlite_row_exists() {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let runtime = codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let store = LocalThreadStore::new(config.clone(), Some(runtime.clone()));
let thread_id =
ThreadId::from_string("00000000-0000-0000-0000-000000000303").expect("valid thread id");
let mut builder = codex_state::ThreadMetadataBuilder::new(
thread_id,
home.path().join("sessions/missing-rollout.jsonl"),
Utc::now(),
SessionSource::Cli,
);
builder.cwd = home.path().to_path_buf();
let metadata = builder.build(config.default_model_provider_id.as_str());
runtime
.upsert_thread(&metadata)
.await
.expect("state db upsert should succeed");
store
.delete_thread(DeleteThreadParams { thread_id })
.await
.expect("delete thread");
assert_eq!(
runtime
.get_thread(thread_id)
.await
.expect("sqlite metadata read"),
None
);
}
#[tokio::test]
async fn delete_thread_reports_missing_thread() {
let home = TempDir::new().expect("temp dir");
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let thread_id =
ThreadId::from_string("00000000-0000-0000-0000-000000000304").expect("valid thread id");
let err = store
.delete_thread(DeleteThreadParams { thread_id })
.await
.expect_err("missing thread should fail");
assert_eq!(
err.to_string(),
"thread 00000000-0000-0000-0000-000000000304 not found"
);
}
}

View File

@@ -1,5 +1,6 @@
mod archive_thread;
mod create_thread;
mod delete_thread;
mod helpers;
mod list_threads;
mod live_writer;
@@ -24,6 +25,7 @@ use tokio::sync::Mutex;
use crate::AppendThreadItemsParams;
use crate::ArchiveThreadParams;
use crate::CreateThreadParams;
use crate::DeleteThreadParams;
use crate::ListThreadsParams;
use crate::LoadThreadHistoryParams;
use crate::ReadThreadByRolloutPathParams;
@@ -288,6 +290,10 @@ impl ThreadStore for LocalThreadStore {
) -> ThreadStoreResult<StoredThread> {
unarchive_thread::unarchive_thread(self, params).await
}
async fn delete_thread(&self, params: DeleteThreadParams) -> ThreadStoreResult<()> {
delete_thread::delete_thread(self, params).await
}
}
#[cfg(test)]

View File

@@ -5,6 +5,7 @@ use std::any::Any;
use crate::AppendThreadItemsParams;
use crate::ArchiveThreadParams;
use crate::CreateThreadParams;
use crate::DeleteThreadParams;
use crate::ItemPage;
use crate::ListItemsParams;
use crate::ListThreadsParams;
@@ -119,4 +120,7 @@ pub trait ThreadStore: Any + Send + Sync {
&self,
params: ArchiveThreadParams,
) -> ThreadStoreResult<StoredThread>;
/// Deletes a thread's persisted rollout data and associated metadata.
async fn delete_thread(&self, params: DeleteThreadParams) -> ThreadStoreResult<()>;
}

View File

@@ -652,6 +652,13 @@ pub struct ArchiveThreadParams {
pub thread_id: ThreadId,
}
/// Parameters for deleting a thread.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct DeleteThreadParams {
/// Thread id to delete.
pub thread_id: ThreadId,
}
#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;

View File

@@ -48,6 +48,7 @@ pub(super) fn server_notification_thread_target(
Some(notification.thread_id.as_str())
}
ServerNotification::ThreadArchived(notification) => Some(notification.thread_id.as_str()),
ServerNotification::ThreadDeleted(notification) => Some(notification.thread_id.as_str()),
ServerNotification::ThreadUnarchived(notification) => Some(notification.thread_id.as_str()),
ServerNotification::ThreadClosed(notification) => Some(notification.thread_id.as_str()),
ServerNotification::ThreadNameUpdated(notification) => {

View File

@@ -221,6 +221,7 @@ impl ChatWidget {
| ServerNotification::ThreadStarted(_)
| ServerNotification::ThreadStatusChanged(_)
| ServerNotification::ThreadArchived(_)
| ServerNotification::ThreadDeleted(_)
| ServerNotification::ThreadUnarchived(_)
| ServerNotification::RawResponseItemCompleted(_)
| ServerNotification::CommandExecOutputDelta(_)