Simplify thread delete handling

This commit is contained in:
Eric Traut
2026-05-28 22:32:30 -07:00
parent 2df653fcc2
commit 642ae9c6a2
4 changed files with 133 additions and 141 deletions

View File

@@ -515,6 +515,7 @@ use token_usage_replay::send_thread_token_usage_update_to_connection;
mod config_errors;
mod request_errors;
mod thread_delete;
mod thread_goal_processor;
mod thread_lifecycle;
mod thread_resume_redaction;

View File

@@ -0,0 +1,117 @@
//! `thread/delete` request handling.
use super::thread_processor::core_thread_write_error;
use super::thread_processor::unsupported_thread_store_operation;
use super::*;
impl ThreadRequestProcessor {
pub(crate) async fn thread_delete(
&self,
request_id: ConnectionRequestId,
params: ThreadDeleteParams,
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
match self.thread_delete_inner(params).await {
Ok((response, deleted_thread_ids)) => {
self.outgoing
.send_response(request_id.clone(), response)
.await;
for thread_id in deleted_thread_ids {
self.outgoing
.send_server_notification(ServerNotification::ThreadDeleted(
ThreadDeletedNotification { thread_id },
))
.await;
}
Ok(None)
}
Err(error) => Err(error),
}
}
async fn thread_delete_inner(
&self,
params: ThreadDeleteParams,
) -> Result<(ThreadDeleteResponse, Vec<String>), JSONRPCErrorError> {
let _thread_list_state_permit = self.acquire_thread_list_state_permit().await?;
self.thread_delete_response(params).await
}
async fn thread_delete_response(
&self,
params: ThreadDeleteParams,
) -> Result<(ThreadDeleteResponse, Vec<String>), JSONRPCErrorError> {
let thread_id = ThreadId::from_string(&params.thread_id)
.map_err(|err| invalid_request(format!("invalid thread id: {err}")))?;
let mut thread_ids = self.state_db_spawn_subtree_thread_ids(thread_id).await?;
let mut seen = thread_ids.iter().copied().collect::<HashSet<_>>();
match self
.thread_manager
.list_agent_subtree_thread_ids(thread_id)
.await
{
Ok(live_thread_ids) => {
for live_thread_id in live_thread_ids {
if seen.insert(live_thread_id) {
thread_ids.push(live_thread_id);
}
}
}
Err(CodexErr::ThreadNotFound(_)) if self.state_db.is_some() => {}
Err(CodexErr::ThreadNotFound(_)) => {
return Err(internal_error(format!(
"cannot delete thread {thread_id}: sqlite state db is unavailable and the thread is not loaded"
)));
}
Err(err) => return Err(core_thread_write_error("delete thread", err)),
}
self.prepare_thread_for_removal(thread_id, "delete").await;
match self
.thread_store
.delete_thread(StoreDeleteThreadParams { thread_id })
.await
{
Ok(()) => {}
Err(err) => return Err(thread_store_delete_error(err)),
}
let mut deleted_thread_ids = vec![thread_id.to_string()];
for descendant_thread_id in thread_ids.iter().skip(1).rev().copied() {
self.prepare_thread_for_removal(descendant_thread_id, "delete")
.await;
match self
.thread_store
.delete_thread(StoreDeleteThreadParams {
thread_id: descendant_thread_id,
})
.await
{
Ok(()) => {
deleted_thread_ids.push(descendant_thread_id.to_string());
}
Err(err) => {
warn!(
"failed to delete spawned descendant thread {descendant_thread_id} while deleting {thread_id}: {err}"
);
}
}
}
Ok((ThreadDeleteResponse {}, deleted_thread_ids))
}
}
fn thread_store_delete_error(err: ThreadStoreError) -> JSONRPCErrorError {
match err {
ThreadStoreError::ThreadNotFound { thread_id } => {
invalid_request(format!("thread not found: {thread_id}"))
}
ThreadStoreError::InvalidRequest { message } => invalid_request(message),
ThreadStoreError::Unsupported { operation } => {
unsupported_thread_store_operation(operation)
}
err => internal_error(format!("failed to delete thread: {err}")),
}
}

View File

@@ -464,29 +464,6 @@ impl ThreadRequestProcessor {
}
}
pub(crate) async fn thread_delete(
&self,
request_id: ConnectionRequestId,
params: ThreadDeleteParams,
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
match self.thread_delete_inner(params).await {
Ok((response, deleted_thread_ids)) => {
self.outgoing
.send_response(request_id.clone(), response)
.await;
for thread_id in deleted_thread_ids {
self.outgoing
.send_server_notification(ServerNotification::ThreadDeleted(
ThreadDeletedNotification { thread_id },
))
.await;
}
Ok(None)
}
Err(error) => Err(error),
}
}
pub(crate) async fn thread_increment_elicitation(
&self,
params: ThreadIncrementElicitationParams,
@@ -708,7 +685,7 @@ impl ThreadRequestProcessor {
Ok((thread_id, thread))
}
async fn acquire_thread_list_state_permit(
pub(super) async fn acquire_thread_list_state_permit(
&self,
) -> Result<SemaphorePermit<'_>, JSONRPCErrorError> {
self.thread_list_state_permit
@@ -783,11 +760,7 @@ impl ThreadRequestProcessor {
self.prepare_thread_for_removal(thread_id, "archive").await;
}
async fn prepare_thread_for_delete(&self, thread_id: ThreadId) {
self.prepare_thread_for_removal(thread_id, "delete").await;
}
async fn prepare_thread_for_removal(&self, thread_id: ThreadId, operation: &str) {
pub(super) async fn prepare_thread_for_removal(&self, thread_id: ThreadId, operation: &str) {
let removed_conversation = self.thread_manager.remove_thread(&thread_id).await;
if let Some(conversation) = removed_conversation {
info!("thread {thread_id} was active; shutting down");
@@ -1341,10 +1314,7 @@ impl ThreadRequestProcessor {
let thread_id = ThreadId::from_string(&params.thread_id)
.map_err(|err| invalid_request(format!("invalid thread id: {err}")))?;
let mut thread_ids = vec![thread_id];
let mut seen = HashSet::from([thread_id]);
self.append_state_db_spawn_descendants(thread_id, &mut thread_ids, &mut seen)
.await?;
let thread_ids = self.state_db_spawn_subtree_thread_ids(thread_id).await?;
let mut archive_thread_ids = Vec::new();
match self
@@ -1429,15 +1399,15 @@ impl ThreadRequestProcessor {
Ok((ThreadArchiveResponse {}, archived_thread_ids))
}
async fn append_state_db_spawn_descendants(
pub(super) async fn state_db_spawn_subtree_thread_ids(
&self,
thread_id: ThreadId,
thread_ids: &mut Vec<ThreadId>,
seen: &mut HashSet<ThreadId>,
) -> Result<(), JSONRPCErrorError> {
) -> Result<Vec<ThreadId>, JSONRPCErrorError> {
let mut thread_ids = vec![thread_id];
let Some(state_db_ctx) = self.state_db.as_ref() else {
return Ok(());
return Ok(thread_ids);
};
let mut seen = HashSet::from([thread_id]);
let descendants = state_db_ctx
.list_thread_spawn_descendants(thread_id)
.await
@@ -1451,91 +1421,7 @@ impl ThreadRequestProcessor {
thread_ids.push(descendant_id);
}
}
Ok(())
}
async fn thread_delete_inner(
&self,
params: ThreadDeleteParams,
) -> Result<(ThreadDeleteResponse, Vec<String>), JSONRPCErrorError> {
let _thread_list_state_permit = self.acquire_thread_list_state_permit().await?;
self.thread_delete_response(params).await
}
async fn thread_delete_response(
&self,
params: ThreadDeleteParams,
) -> Result<(ThreadDeleteResponse, Vec<String>), JSONRPCErrorError> {
let thread_id = ThreadId::from_string(&params.thread_id)
.map_err(|err| invalid_request(format!("invalid thread id: {err}")))?;
let mut thread_ids = vec![thread_id];
let mut seen = HashSet::from([thread_id]);
let has_state_db = self.state_db.is_some();
self.append_state_db_spawn_descendants(thread_id, &mut thread_ids, &mut seen)
.await?;
match self
.thread_manager
.list_agent_subtree_thread_ids(thread_id)
.await
{
Ok(live_thread_ids) => {
for live_thread_id in live_thread_ids {
if seen.insert(live_thread_id) {
thread_ids.push(live_thread_id);
}
}
}
Err(CodexErr::ThreadNotFound(_)) if has_state_db => {}
Err(CodexErr::ThreadNotFound(_)) => {
return Err(internal_error(format!(
"cannot delete thread {thread_id}: sqlite state db is unavailable and the thread is not loaded"
)));
}
Err(err) => return Err(core_thread_write_error("delete thread", err)),
}
let mut deleted_thread_ids = Vec::new();
let Some((parent_thread_id, descendant_thread_ids)) = thread_ids.split_first() else {
return Ok((ThreadDeleteResponse {}, deleted_thread_ids));
};
self.prepare_thread_for_delete(*parent_thread_id).await;
match self
.thread_store
.delete_thread(StoreDeleteThreadParams {
thread_id: *parent_thread_id,
})
.await
{
Ok(()) => {
deleted_thread_ids.push(parent_thread_id.to_string());
}
Err(err) => return Err(thread_store_delete_error(err)),
}
for descendant_thread_id in descendant_thread_ids.iter().rev().copied() {
self.prepare_thread_for_delete(descendant_thread_id).await;
match self
.thread_store
.delete_thread(StoreDeleteThreadParams {
thread_id: descendant_thread_id,
})
.await
{
Ok(()) => {
deleted_thread_ids.push(descendant_thread_id.to_string());
}
Err(err) => {
warn!(
"failed to delete spawned descendant thread {descendant_thread_id} while deleting {thread_id}: {err}"
);
}
}
}
Ok((ThreadDeleteResponse {}, deleted_thread_ids))
Ok(thread_ids)
}
async fn thread_increment_elicitation_inner(
@@ -3953,7 +3839,7 @@ fn thread_read_view_error(err: ThreadReadViewError) -> JSONRPCErrorError {
}
}
fn unsupported_thread_store_operation(operation: &'static str) -> JSONRPCErrorError {
pub(super) fn unsupported_thread_store_operation(operation: &'static str) -> JSONRPCErrorError {
method_not_found(format!("{operation} is not supported yet"))
}
@@ -4074,7 +3960,7 @@ fn conversation_summary_rollout_path_read_error(
}
}
fn core_thread_write_error(operation: &str, err: CodexErr) -> JSONRPCErrorError {
pub(super) fn core_thread_write_error(operation: &str, err: CodexErr) -> JSONRPCErrorError {
match err {
CodexErr::ThreadNotFound(thread_id) => {
invalid_request(format!("thread not found: {thread_id}"))
@@ -4095,19 +3981,6 @@ fn thread_store_archive_error(operation: &str, err: ThreadStoreError) -> JSONRPC
}
}
fn thread_store_delete_error(err: ThreadStoreError) -> JSONRPCErrorError {
match err {
ThreadStoreError::ThreadNotFound { thread_id } => {
invalid_request(format!("thread not found: {thread_id}"))
}
ThreadStoreError::InvalidRequest { message } => invalid_request(message),
ThreadStoreError::Unsupported { operation } => {
unsupported_thread_store_operation(operation)
}
err => internal_error(format!("failed to delete thread: {err}")),
}
}
fn set_thread_name_from_title(thread: &mut Thread, title: String) {
if title.trim().is_empty() || thread.preview.trim() == title.trim() {
return;

View File

@@ -24,13 +24,14 @@ pub(super) async fn delete_thread(
params: DeleteThreadParams,
) -> ThreadStoreResult<()> {
let thread_id = params.thread_id;
let thread_id_str = thread_id.to_string();
let state_db_ctx = store.state_db().await;
let mut rollout_paths = Vec::new();
let mut path_lookup_errors = Vec::new();
match find_thread_path_by_id_str(
store.config.codex_home.as_path(),
&thread_id.to_string(),
thread_id_str.as_str(),
state_db_ctx.as_deref(),
)
.await
@@ -44,13 +45,13 @@ pub(super) async fn delete_thread(
match find_archived_thread_path_by_id_str(
store.config.codex_home.as_path(),
&thread_id.to_string(),
thread_id_str.as_str(),
state_db_ctx.as_deref(),
)
.await
{
Ok(Some(path)) => {
if !rollout_paths.iter().any(|existing| existing == &path) {
if !rollout_paths.contains(&path) {
rollout_paths.push(path);
}
}