Simplify thread delete changes

This commit is contained in:
Eric Traut
2026-05-28 23:53:35 -07:00
parent 0b169881ec
commit bc2a0d4e46
7 changed files with 269 additions and 315 deletions

View File

@@ -378,7 +378,7 @@ impl MessageProcessor {
Arc::clone(&thread_manager),
Arc::clone(&config),
feedback,
log_db,
log_db.clone(),
state_db.clone(),
);
let git_processor = GitRequestProcessor::new();
@@ -431,6 +431,7 @@ impl MessageProcessor {
Arc::clone(&thread_list_state_permit),
thread_goal_processor.clone(),
state_db,
log_db,
Arc::clone(&skills_watcher),
);
let turn_processor = TurnRequestProcessor::new(

View File

@@ -10,7 +10,11 @@ impl ThreadRequestProcessor {
request_id: ConnectionRequestId,
params: ThreadDeleteParams,
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
match self.thread_delete_inner(params).await {
let result = {
let _thread_list_state_permit = self.acquire_thread_list_state_permit().await?;
self.thread_delete_response(params).await
};
match result {
Ok((response, deleted_thread_ids)) => {
self.outgoing
.send_response(request_id.clone(), response)
@@ -28,14 +32,6 @@ impl ThreadRequestProcessor {
}
}
async fn thread_delete_inner(
&self,
params: ThreadDeleteParams,
) -> Result<(ThreadDeleteResponse, Vec<String>), JSONRPCErrorError> {
let _thread_list_state_permit = self.acquire_thread_list_state_permit().await?;
self.thread_delete_response(params).await
}
async fn thread_delete_response(
&self,
params: ThreadDeleteParams,
@@ -58,17 +54,12 @@ impl ThreadRequestProcessor {
}
}
}
Err(CodexErr::ThreadNotFound(_)) if self.state_db.is_some() => {}
Err(CodexErr::ThreadNotFound(_)) => {
return Err(internal_error(format!(
"cannot delete thread {thread_id}: sqlite state db is unavailable and the thread is not loaded"
)));
}
Err(CodexErr::ThreadNotFound(_)) => {}
Err(err) => return Err(core_thread_write_error("delete thread", err)),
}
self.validate_root_thread_delete(thread_id).await?;
self.prepare_thread_for_removal(thread_id, "delete").await;
self.prepare_thread_for_delete(thread_id).await;
match self
.thread_store
.delete_thread(StoreDeleteThreadParams { thread_id })
@@ -80,8 +71,7 @@ impl ThreadRequestProcessor {
let mut deleted_thread_ids = vec![thread_id.to_string()];
for descendant_thread_id in thread_ids.iter().skip(1).rev().copied() {
self.prepare_thread_for_removal(descendant_thread_id, "delete")
.await;
self.prepare_thread_for_delete(descendant_thread_id).await;
match self
.thread_store
.delete_thread(StoreDeleteThreadParams {
@@ -108,7 +98,7 @@ impl ThreadRequestProcessor {
thread_id: ThreadId,
) -> Result<(), JSONRPCErrorError> {
if let Ok(thread) = self.thread_manager.get_thread(thread_id).await
&& thread.rollout_path().is_none()
&& thread.config_snapshot().await.ephemeral
{
return Err(invalid_request(format!(
"thread is not persisted and cannot be deleted: {thread_id}"
@@ -124,6 +114,13 @@ impl ThreadRequestProcessor {
.map(|_| ())
.map_err(thread_store_delete_error)
}
async fn prepare_thread_for_delete(&self, thread_id: ThreadId) {
self.prepare_thread_for_removal(thread_id, "delete").await;
if let Some(log_db) = self.log_db.as_ref() {
log_db.flush().await;
}
}
}
fn thread_store_delete_error(err: ThreadStoreError) -> JSONRPCErrorError {

View File

@@ -337,6 +337,7 @@ pub(crate) struct ThreadRequestProcessor {
pub(super) thread_list_state_permit: Arc<Semaphore>,
pub(super) thread_goal_processor: ThreadGoalRequestProcessor,
pub(super) state_db: Option<StateDbHandle>,
pub(super) log_db: Option<LogDbLayer>,
pub(super) background_tasks: TaskTracker,
pub(super) skills_watcher: Arc<SkillsWatcher>,
}
@@ -357,6 +358,7 @@ impl ThreadRequestProcessor {
thread_list_state_permit: Arc<Semaphore>,
thread_goal_processor: ThreadGoalRequestProcessor,
state_db: Option<StateDbHandle>,
log_db: Option<LogDbLayer>,
skills_watcher: Arc<SkillsWatcher>,
) -> Self {
Self {
@@ -373,6 +375,7 @@ impl ThreadRequestProcessor {
thread_list_state_permit,
thread_goal_processor,
state_db,
log_db,
background_tasks: TaskTracker::new(),
skills_watcher,
}

View File

@@ -20,6 +20,7 @@ use std::sync::Arc;
use anyhow::Result;
use app_test_support::create_mock_responses_server_repeating_assistant;
use codex_app_server::in_process;
use codex_app_server::in_process::InProcessClientHandle;
use codex_app_server::in_process::InProcessServerEvent;
use codex_app_server::in_process::InProcessStartArgs;
use codex_app_server_protocol::ClientInfo;
@@ -27,6 +28,8 @@ use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ThreadDeleteParams;
use codex_app_server_protocol::ThreadDeleteResponse;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
use codex_app_server_protocol::ThreadStartParams;
@@ -40,8 +43,15 @@ use codex_config::NoopThreadConfigLoader;
use codex_core::config::ConfigBuilder;
use codex_exec_server::EnvironmentManager;
use codex_feedback::CodexFeedback;
use codex_protocol::ThreadId;
use codex_protocol::models::BaseInstructions;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::ThreadMemoryMode;
use codex_thread_store::CreateThreadParams as StoreCreateThreadParams;
use codex_thread_store::InMemoryThreadStore;
use codex_thread_store::ThreadEventPersistenceMode;
use codex_thread_store::ThreadPersistenceMetadata;
use codex_thread_store::ThreadStore;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use tokio::time::timeout;
@@ -59,43 +69,10 @@ async fn thread_start_with_non_local_thread_store_does_not_create_local_persiste
// here so this regression stays focused on thread persistence artifacts.
create_config_toml_with_thread_store(codex_home.path(), &server.uri(), &store_id)?;
let loader_overrides = LoaderOverrides::without_managed_config_for_tests();
let config = ConfigBuilder::default()
.codex_home(codex_home.path().to_path_buf())
.fallback_cwd(Some(codex_home.path().to_path_buf()))
.loader_overrides(loader_overrides.clone())
.build()
.await?;
let thread_store = InMemoryThreadStore::for_id(store_id.clone());
let _in_memory_store = InMemoryThreadStoreId { store_id };
let mut client = in_process::start(InProcessStartArgs {
arg0_paths: Arg0DispatchPaths::default(),
config: Arc::new(config),
cli_overrides: Vec::new(),
loader_overrides,
strict_config: false,
cloud_requirements: CloudRequirementsLoader::default(),
thread_config_loader: Arc::new(NoopThreadConfigLoader),
feedback: CodexFeedback::new(),
log_db: None,
state_db: None,
environment_manager: Arc::new(EnvironmentManager::default_for_tests()),
config_warnings: Vec::new(),
session_source: SessionSource::Cli,
enable_codex_api_key_env: false,
initialize: InitializeParams {
client_info: ClientInfo {
name: "codex-app-server-tests".to_string(),
title: None,
version: "0.1.0".to_string(),
},
capabilities: None,
},
channel_capacity: in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
})
.await?;
let mut client = start_in_process_server(codex_home.path()).await?;
let response = client
.request(ClientRequest::ThreadStart {
@@ -164,11 +141,14 @@ async fn thread_start_with_non_local_thread_store_does_not_create_local_persiste
assert_eq!(data[0].id, thread.id);
assert_eq!(data[0].path, None);
delete_thread(&client, 4, thread.id.clone()).await?;
client.shutdown().await?;
let calls = thread_store.calls().await;
assert_eq!(calls.create_thread, 1);
assert_eq!(calls.list_threads, 1);
assert_eq!(calls.delete_thread, 1);
assert!(
calls.append_items > 0,
"turn/start should append rollout items through the injected store"
@@ -183,6 +163,100 @@ async fn thread_start_with_non_local_thread_store_does_not_create_local_persiste
Ok(())
}
#[tokio::test]
async fn thread_delete_with_non_local_thread_store_deletes_unloaded_thread_without_sqlite()
-> Result<()> {
let codex_home = TempDir::new()?;
let store_id = Uuid::new_v4().to_string();
create_config_toml_with_thread_store(codex_home.path(), "http://127.0.0.1:1", &store_id)?;
let thread_store = InMemoryThreadStore::for_id(store_id.clone());
let _in_memory_store = InMemoryThreadStoreId { store_id };
let thread_id = ThreadId::from_string(&Uuid::new_v4().to_string())?;
thread_store
.create_thread(StoreCreateThreadParams {
thread_id,
forked_from_id: None,
source: SessionSource::Cli,
thread_source: None,
base_instructions: BaseInstructions::default(),
dynamic_tools: Vec::new(),
metadata: ThreadPersistenceMetadata {
cwd: Some(codex_home.path().to_path_buf()),
model_provider: "mock_provider".to_string(),
memory_mode: ThreadMemoryMode::Enabled,
},
event_persistence_mode: ThreadEventPersistenceMode::Limited,
})
.await?;
let client = start_in_process_server(codex_home.path()).await?;
delete_thread(&client, 1, thread_id.to_string()).await?;
client.shutdown().await?;
let calls = thread_store.calls().await;
assert_eq!(calls.read_thread, 1);
assert_eq!(calls.delete_thread, 1);
assert_no_local_persistence_artifacts(codex_home.path())?;
Ok(())
}
async fn start_in_process_server(codex_home: &Path) -> Result<InProcessClientHandle> {
let loader_overrides = LoaderOverrides::without_managed_config_for_tests();
let config = ConfigBuilder::default()
.codex_home(codex_home.to_path_buf())
.fallback_cwd(Some(codex_home.to_path_buf()))
.loader_overrides(loader_overrides.clone())
.build()
.await?;
Ok(in_process::start(InProcessStartArgs {
arg0_paths: Arg0DispatchPaths::default(),
config: Arc::new(config),
cli_overrides: Vec::new(),
loader_overrides,
strict_config: false,
cloud_requirements: CloudRequirementsLoader::default(),
thread_config_loader: Arc::new(NoopThreadConfigLoader),
feedback: CodexFeedback::new(),
log_db: None,
state_db: None,
environment_manager: Arc::new(EnvironmentManager::default_for_tests()),
config_warnings: Vec::new(),
session_source: SessionSource::Cli,
enable_codex_api_key_env: false,
initialize: InitializeParams {
client_info: ClientInfo {
name: "codex-app-server-tests".to_string(),
title: None,
version: "0.1.0".to_string(),
},
capabilities: None,
},
channel_capacity: in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
})
.await?)
}
async fn delete_thread(
client: &InProcessClientHandle,
request_id: i64,
thread_id: String,
) -> Result<()> {
let response = client
.request(ClientRequest::ThreadDelete {
request_id: RequestId::Integer(request_id),
params: ThreadDeleteParams { thread_id },
})
.await?
.map_err(|error| anyhow::anyhow!("thread/delete failed: {}", error.message))?;
let _: ThreadDeleteResponse = serde_json::from_value(response)?;
Ok(())
}
fn assert_no_local_persistence_artifacts(codex_home: &Path) -> Result<()> {
// These are the observable tripwires for accidental local persistence. If a
// future code path constructs a local rollout/session store or opens the

View File

@@ -3,11 +3,7 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_fake_rollout;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::rollout_path;
use app_test_support::to_response;
use chrono::DateTime;
use chrono::Utc;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
@@ -18,16 +14,12 @@ use codex_app_server_protocol::ThreadLoadedListParams;
use codex_app_server_protocol::ThreadLoadedListResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_core::find_archived_thread_path_by_id_str;
use codex_core::find_thread_path_by_id_str;
use codex_protocol::ThreadId;
use codex_protocol::protocol::SessionSource;
use codex_state::DirectionalThreadSpawnEdgeStatus;
use codex_state::StateRuntime;
use codex_state::ThreadGoalStatus;
use codex_state::ThreadMetadataBuilder;
use pretty_assertions::assert_eq;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
@@ -64,30 +56,9 @@ async fn thread_delete_deletes_spawned_descendants_and_metadata() -> Result<()>
let state_db =
StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
let parent_thread_id = seed_thread_metadata(
&state_db,
codex_home.path(),
&parent_id,
"2025-01-01T00-00-00",
"2025-01-01T00:00:00Z",
)
.await?;
let child_thread_id = seed_thread_metadata(
&state_db,
codex_home.path(),
&child_id,
"2025-01-01T00-01-00",
"2025-01-01T00:01:00Z",
)
.await?;
let grandchild_thread_id = seed_thread_metadata(
&state_db,
codex_home.path(),
&grandchild_id,
"2025-01-01T00-02-00",
"2025-01-01T00:02:00Z",
)
.await?;
let parent_thread_id = ThreadId::from_string(&parent_id)?;
let child_thread_id = ThreadId::from_string(&child_id)?;
let grandchild_thread_id = ThreadId::from_string(&grandchild_id)?;
state_db
.upsert_thread_spawn_edge(
@@ -154,68 +125,51 @@ async fn thread_delete_deletes_spawned_descendants_and_metadata() -> Result<()>
assert_eq!(deleted_ids, vec![parent_id, grandchild_id, child_id]);
for thread_id in [parent_thread_id, child_thread_id, grandchild_thread_id] {
let rollout_path = find_thread_path_by_id_str(
codex_home.path(),
&thread_id.to_string(),
/*state_db_ctx*/ None,
)
.await?;
assert!(
find_thread_path_by_id_str(
codex_home.path(),
&thread_id.to_string(),
/*state_db_ctx*/ None,
)
.await?
.is_none(),
rollout_path.is_none(),
"expected active rollout for {thread_id} to be deleted"
);
assert!(
find_archived_thread_path_by_id_str(
codex_home.path(),
&thread_id.to_string(),
/*state_db_ctx*/ None,
)
.await?
.is_none(),
"expected archived rollout for {thread_id} to be absent"
);
assert!(
state_db.get_thread(thread_id).await?.is_none(),
"expected sqlite metadata for {thread_id} to be deleted"
);
}
assert!(
state_db
.list_thread_spawn_descendants(parent_thread_id)
.await?
.is_empty()
);
assert!(
state_db
.thread_goals()
.get_thread_goal(parent_thread_id)
.await?
.is_none()
);
assert!(
state_db
.thread_goals()
.get_thread_goal(child_thread_id)
.await?
.is_none()
);
let descendants = state_db
.list_thread_spawn_descendants(parent_thread_id)
.await?;
assert!(descendants.is_empty());
for thread_id in [parent_thread_id, child_thread_id] {
let goal = state_db.thread_goals().get_thread_goal(thread_id).await?;
assert!(goal.is_none());
}
Ok(())
}
#[tokio::test]
async fn thread_delete_rejects_live_ephemeral_thread_without_unloading() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_id = start_ephemeral_thread(&mut mcp).await?;
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
ephemeral: Some(true),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let delete_id = mcp
.send_thread_delete_request(ThreadDeleteParams {
thread_id: thread_id.clone(),
thread_id: thread.id.clone(),
})
.await?;
let delete_err: JSONRPCError = timeout(
@@ -223,89 +177,24 @@ async fn thread_delete_rejects_live_ephemeral_thread_without_unloading() -> Resu
mcp.read_stream_until_error_message(RequestId::Integer(delete_id)),
)
.await??;
assert_eq!(
delete_err.error.message,
format!("thread is not persisted and cannot be deleted: {thread_id}")
let expected_message = format!(
"thread is not persisted and cannot be deleted: {}",
thread.id
);
assert_eq!(delete_err.error.message, expected_message);
let loaded_thread_ids = loaded_thread_ids(&mut mcp).await?;
assert_eq!(loaded_thread_ids, vec![thread_id]);
Ok(())
}
async fn seed_thread_metadata(
state_db: &StateRuntime,
codex_home: &Path,
thread_id: &str,
filename_ts: &str,
meta_rfc3339: &str,
) -> Result<ThreadId> {
let id = ThreadId::from_string(thread_id)?;
let created_at = DateTime::parse_from_rfc3339(meta_rfc3339)?.with_timezone(&Utc);
let mut builder = ThreadMetadataBuilder::new(
id,
rollout_path(codex_home, filename_ts, thread_id),
created_at,
SessionSource::Cli,
);
builder.updated_at = Some(created_at);
builder.cwd = codex_home.to_path_buf();
let metadata = builder.build("mock_provider");
state_db.upsert_thread(&metadata).await?;
Ok(id)
}
async fn start_ephemeral_thread(mcp: &mut McpProcess) -> Result<String> {
let request_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("gpt-5.2".to_string()),
ephemeral: Some(true),
..Default::default()
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(response)?;
Ok(thread.id)
}
async fn loaded_thread_ids(mcp: &mut McpProcess) -> Result<Vec<String>> {
let request_id = mcp
let list_id = mcp
.send_thread_loaded_list_request(ThreadLoadedListParams::default())
.await?;
let response: JSONRPCResponse = timeout(
let list_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
)
.await??;
let ThreadLoadedListResponse { mut data, .. } =
to_response::<ThreadLoadedListResponse>(response)?;
to_response::<ThreadLoadedListResponse>(list_resp)?;
data.sort();
Ok(data)
}
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
std::fs::write(
codex_home.join("config.toml"),
format!(
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
),
)
assert_eq!(data, vec![thread.id]);
Ok(())
}

View File

@@ -888,9 +888,6 @@ ON CONFLICT(id) DO UPDATE SET
.execute(self.pool.as_ref())
.await?;
let rows_affected = result.rows_affected();
if rows_affected == 0 {
return Ok(0);
}
if let Err(err) = sqlx::query("DELETE FROM thread_dynamic_tools WHERE thread_id = ?")
.bind(thread_id_string.as_str())
@@ -1174,6 +1171,7 @@ mod tests {
use crate::DirectionalThreadSpawnEdgeStatus;
use crate::runtime::test_support::test_thread_metadata;
use crate::runtime::test_support::unique_temp_dir;
use anyhow::Result;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::GitInfo;
use codex_protocol::protocol::SessionMeta;
@@ -1222,31 +1220,19 @@ mod tests {
}
#[tokio::test]
async fn delete_thread_cleans_associated_state() {
async fn delete_thread_cleans_associated_state() -> Result<()> {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
.await
.expect("state db should initialize");
let thread_id =
ThreadId::from_string("00000000-0000-0000-0000-000000000401").expect("valid thread id");
let child_thread_id =
ThreadId::from_string("00000000-0000-0000-0000-000000000402").expect("valid thread id");
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()).await?;
let thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000401")?;
let child_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000402")?;
runtime
.upsert_thread(&test_thread_metadata(
&codex_home,
thread_id,
codex_home.clone(),
))
.await
.expect("thread insert should succeed");
runtime
.upsert_thread_spawn_edge(
thread_id,
child_thread_id,
DirectionalThreadSpawnEdgeStatus::Closed,
)
.await
.expect("spawn edge insert should succeed");
.await?;
seed_thread_cleanup_state(&runtime, thread_id, child_thread_id).await?;
sqlx::query(
r#"
INSERT INTO thread_dynamic_tools (thread_id, position, name, description, input_schema)
@@ -1259,34 +1245,7 @@ VALUES (?, ?, ?, ?, ?)
.bind("test dynamic tool")
.bind("{}")
.execute(runtime.pool.as_ref())
.await
.expect("dynamic tool insert should succeed");
runtime
.thread_goals()
.replace_thread_goal(
thread_id,
"test goal",
crate::ThreadGoalStatus::Active,
/*token_budget*/ None,
)
.await
.expect("goal insert should succeed");
runtime
.insert_log(&LogEntry {
ts: 1,
ts_nanos: 0,
level: "INFO".to_string(),
target: "test".to_string(),
message: Some("legacy log".to_string()),
feedback_log_body: Some("feedback log".to_string()),
thread_id: Some(thread_id.to_string()),
process_uuid: Some("process-1".to_string()),
module_path: None,
file: None,
line: None,
})
.await
.expect("log insert should succeed");
.await?;
runtime
.create_agent_job(
&AgentJobCreateParams {
@@ -1307,66 +1266,26 @@ VALUES (?, ?, ?, ?, ?)
row_json: json!({"path": "file-1"}),
}],
)
.await
.expect("agent job insert should succeed");
runtime
.mark_agent_job_running("job-1")
.await
.expect("agent job should mark running");
.await?;
runtime.mark_agent_job_running("job-1").await?;
runtime
.mark_agent_job_item_running_with_thread("job-1", "item-1", &thread_id.to_string())
.await
.expect("agent job item should mark running");
.await?;
let rows = runtime
.delete_thread(thread_id)
.await
.expect("thread delete should succeed");
let rows = runtime.delete_thread(thread_id).await?;
assert_eq!(rows, 1);
assert!(
runtime
.get_thread(thread_id)
.await
.expect("thread lookup should succeed")
.is_none()
);
assert!(runtime.get_thread(thread_id).await?.is_none());
let dynamic_tool_count: i64 =
sqlx::query_scalar("SELECT COUNT(*) FROM thread_dynamic_tools WHERE thread_id = ?")
.bind(thread_id.to_string())
.fetch_one(runtime.pool.as_ref())
.await
.expect("dynamic tool count should be readable");
.await?;
assert_eq!(dynamic_tool_count, 0);
let spawn_edge_count: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM thread_spawn_edges WHERE parent_thread_id = ? OR child_thread_id = ?",
)
.bind(thread_id.to_string())
.bind(thread_id.to_string())
.fetch_one(runtime.pool.as_ref())
.await
.expect("spawn edge count should be readable");
assert_eq!(spawn_edge_count, 0);
assert!(
runtime
.thread_goals()
.get_thread_goal(thread_id)
.await
.expect("goal lookup should succeed")
.is_none()
);
let logs = runtime
.query_logs(&LogQuery {
thread_ids: vec![thread_id.to_string()],
..Default::default()
})
.await
.expect("log query should succeed");
assert_eq!(logs.len(), 0);
assert_thread_cleanup_state(&runtime, thread_id).await?;
let job_item = runtime
.get_agent_job_item("job-1", "item-1")
.await
.expect("job item lookup should succeed")
.await?
.expect("job item should exist");
assert_eq!(job_item.status, AgentJobItemStatus::Pending);
assert_eq!(job_item.assigned_thread_id, None);
@@ -1374,6 +1293,85 @@ VALUES (?, ?, ?, ?, ?)
job_item.last_error,
Some("assigned thread was deleted".to_string())
);
Ok(())
}
#[tokio::test]
async fn delete_thread_cleans_associated_state_when_thread_row_is_missing() -> Result<()> {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()).await?;
let thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000403")?;
let child_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000404")?;
seed_thread_cleanup_state(&runtime, thread_id, child_thread_id).await?;
let rows = runtime.delete_thread(thread_id).await?;
assert_eq!(rows, 0);
assert_thread_cleanup_state(&runtime, thread_id).await?;
Ok(())
}
async fn seed_thread_cleanup_state(
runtime: &StateRuntime,
thread_id: ThreadId,
child_thread_id: ThreadId,
) -> Result<()> {
runtime
.upsert_thread_spawn_edge(
thread_id,
child_thread_id,
DirectionalThreadSpawnEdgeStatus::Closed,
)
.await?;
runtime
.thread_goals()
.replace_thread_goal(
thread_id,
"test goal",
crate::ThreadGoalStatus::Active,
/*token_budget*/ None,
)
.await?;
runtime
.insert_log(&LogEntry {
ts: 1,
ts_nanos: 0,
level: "INFO".to_string(),
target: "test".to_string(),
message: Some("legacy log".to_string()),
feedback_log_body: Some("feedback log".to_string()),
thread_id: Some(thread_id.to_string()),
process_uuid: Some("process-1".to_string()),
module_path: None,
file: None,
line: None,
})
.await?;
Ok(())
}
async fn assert_thread_cleanup_state(
runtime: &StateRuntime,
thread_id: ThreadId,
) -> Result<()> {
let spawn_edge_count: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM thread_spawn_edges WHERE parent_thread_id = ? OR child_thread_id = ?",
)
.bind(thread_id.to_string())
.bind(thread_id.to_string())
.fetch_one(runtime.pool.as_ref())
.await?;
assert_eq!(spawn_edge_count, 0);
let goal = runtime.thread_goals().get_thread_goal(thread_id).await?;
assert!(goal.is_none());
let logs = runtime
.query_logs(&LogQuery {
thread_ids: vec![thread_id.to_string()],
..Default::default()
})
.await?;
assert_eq!(logs.len(), 0);
Ok(())
}
#[tokio::test]

View File

@@ -83,6 +83,9 @@ pub(super) async fn delete_thread(
match delete_rollout_file(store, rollout_path.as_path(), thread_id) {
Ok(deleted) => deleted_rollout_file |= deleted,
Err(err) if deleted_state_rows > 0 => {
// Once SQLite deletion commits, rollout cleanup is best effort. If this JSONL
// survives, compatibility read/list paths may rediscover it and repair metadata;
// that is preferable to failing a delete whose state-row commit already succeeded.
warn!("failed to delete rollout file for thread {thread_id}: {err}");
}
Err(err) => return Err(err),
@@ -139,7 +142,6 @@ mod tests {
use uuid::Uuid;
use super::*;
use crate::ReadThreadParams;
use crate::ThreadStore;
use crate::local::LocalThreadStore;
use crate::local::test_support::test_config;
@@ -161,16 +163,6 @@ mod tests {
.expect("delete thread");
assert!(!active_path.exists());
assert!(
store
.read_thread(ReadThreadParams {
thread_id,
include_archived: true,
include_history: false,
})
.await
.is_err()
);
}
#[tokio::test]