diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 9f0db04fb8..d426c4299c 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -378,7 +378,7 @@ impl MessageProcessor { Arc::clone(&thread_manager), Arc::clone(&config), feedback, - log_db, + log_db.clone(), state_db.clone(), ); let git_processor = GitRequestProcessor::new(); @@ -431,6 +431,7 @@ impl MessageProcessor { Arc::clone(&thread_list_state_permit), thread_goal_processor.clone(), state_db, + log_db, Arc::clone(&skills_watcher), ); let turn_processor = TurnRequestProcessor::new( diff --git a/codex-rs/app-server/src/request_processors/thread_delete.rs b/codex-rs/app-server/src/request_processors/thread_delete.rs index f8f8cc0dcc..7428a6ee35 100644 --- a/codex-rs/app-server/src/request_processors/thread_delete.rs +++ b/codex-rs/app-server/src/request_processors/thread_delete.rs @@ -10,7 +10,11 @@ impl ThreadRequestProcessor { request_id: ConnectionRequestId, params: ThreadDeleteParams, ) -> Result, JSONRPCErrorError> { - match self.thread_delete_inner(params).await { + let result = { + let _thread_list_state_permit = self.acquire_thread_list_state_permit().await?; + self.thread_delete_response(params).await + }; + match result { Ok((response, deleted_thread_ids)) => { self.outgoing .send_response(request_id.clone(), response) @@ -28,14 +32,6 @@ impl ThreadRequestProcessor { } } - async fn thread_delete_inner( - &self, - params: ThreadDeleteParams, - ) -> Result<(ThreadDeleteResponse, Vec), JSONRPCErrorError> { - let _thread_list_state_permit = self.acquire_thread_list_state_permit().await?; - self.thread_delete_response(params).await - } - async fn thread_delete_response( &self, params: ThreadDeleteParams, @@ -58,17 +54,12 @@ impl ThreadRequestProcessor { } } } - Err(CodexErr::ThreadNotFound(_)) if self.state_db.is_some() => {} - Err(CodexErr::ThreadNotFound(_)) => { - return Err(internal_error(format!( - "cannot delete thread {thread_id}: sqlite state db is unavailable and the thread is not loaded" - ))); - } + Err(CodexErr::ThreadNotFound(_)) => {} Err(err) => return Err(core_thread_write_error("delete thread", err)), } self.validate_root_thread_delete(thread_id).await?; - self.prepare_thread_for_removal(thread_id, "delete").await; + self.prepare_thread_for_delete(thread_id).await; match self .thread_store .delete_thread(StoreDeleteThreadParams { thread_id }) @@ -80,8 +71,7 @@ impl ThreadRequestProcessor { let mut deleted_thread_ids = vec![thread_id.to_string()]; for descendant_thread_id in thread_ids.iter().skip(1).rev().copied() { - self.prepare_thread_for_removal(descendant_thread_id, "delete") - .await; + self.prepare_thread_for_delete(descendant_thread_id).await; match self .thread_store .delete_thread(StoreDeleteThreadParams { @@ -108,7 +98,7 @@ impl ThreadRequestProcessor { thread_id: ThreadId, ) -> Result<(), JSONRPCErrorError> { if let Ok(thread) = self.thread_manager.get_thread(thread_id).await - && thread.rollout_path().is_none() + && thread.config_snapshot().await.ephemeral { return Err(invalid_request(format!( "thread is not persisted and cannot be deleted: {thread_id}" @@ -124,6 +114,13 @@ impl ThreadRequestProcessor { .map(|_| ()) .map_err(thread_store_delete_error) } + + async fn prepare_thread_for_delete(&self, thread_id: ThreadId) { + self.prepare_thread_for_removal(thread_id, "delete").await; + if let Some(log_db) = self.log_db.as_ref() { + log_db.flush().await; + } + } } fn thread_store_delete_error(err: ThreadStoreError) -> JSONRPCErrorError { diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index 097f802a26..d4efec78b7 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -337,6 +337,7 @@ pub(crate) struct ThreadRequestProcessor { pub(super) thread_list_state_permit: Arc, pub(super) thread_goal_processor: ThreadGoalRequestProcessor, pub(super) state_db: Option, + pub(super) log_db: Option, pub(super) background_tasks: TaskTracker, pub(super) skills_watcher: Arc, } @@ -357,6 +358,7 @@ impl ThreadRequestProcessor { thread_list_state_permit: Arc, thread_goal_processor: ThreadGoalRequestProcessor, state_db: Option, + log_db: Option, skills_watcher: Arc, ) -> Self { Self { @@ -373,6 +375,7 @@ impl ThreadRequestProcessor { thread_list_state_permit, thread_goal_processor, state_db, + log_db, background_tasks: TaskTracker::new(), skills_watcher, } diff --git a/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs b/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs index 158ebb25d1..e2d7d139c1 100644 --- a/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs +++ b/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use anyhow::Result; use app_test_support::create_mock_responses_server_repeating_assistant; use codex_app_server::in_process; +use codex_app_server::in_process::InProcessClientHandle; use codex_app_server::in_process::InProcessServerEvent; use codex_app_server::in_process::InProcessStartArgs; use codex_app_server_protocol::ClientInfo; @@ -27,6 +28,8 @@ use codex_app_server_protocol::ClientRequest; use codex_app_server_protocol::InitializeParams; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ThreadDeleteParams; +use codex_app_server_protocol::ThreadDeleteResponse; use codex_app_server_protocol::ThreadListParams; use codex_app_server_protocol::ThreadListResponse; use codex_app_server_protocol::ThreadStartParams; @@ -40,8 +43,15 @@ use codex_config::NoopThreadConfigLoader; use codex_core::config::ConfigBuilder; use codex_exec_server::EnvironmentManager; use codex_feedback::CodexFeedback; +use codex_protocol::ThreadId; +use codex_protocol::models::BaseInstructions; use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::ThreadMemoryMode; +use codex_thread_store::CreateThreadParams as StoreCreateThreadParams; use codex_thread_store::InMemoryThreadStore; +use codex_thread_store::ThreadEventPersistenceMode; +use codex_thread_store::ThreadPersistenceMetadata; +use codex_thread_store::ThreadStore; use pretty_assertions::assert_eq; use tempfile::TempDir; use tokio::time::timeout; @@ -59,43 +69,10 @@ async fn thread_start_with_non_local_thread_store_does_not_create_local_persiste // here so this regression stays focused on thread persistence artifacts. create_config_toml_with_thread_store(codex_home.path(), &server.uri(), &store_id)?; - let loader_overrides = LoaderOverrides::without_managed_config_for_tests(); - let config = ConfigBuilder::default() - .codex_home(codex_home.path().to_path_buf()) - .fallback_cwd(Some(codex_home.path().to_path_buf())) - .loader_overrides(loader_overrides.clone()) - .build() - .await?; - let thread_store = InMemoryThreadStore::for_id(store_id.clone()); let _in_memory_store = InMemoryThreadStoreId { store_id }; - let mut client = in_process::start(InProcessStartArgs { - arg0_paths: Arg0DispatchPaths::default(), - config: Arc::new(config), - cli_overrides: Vec::new(), - loader_overrides, - strict_config: false, - cloud_requirements: CloudRequirementsLoader::default(), - thread_config_loader: Arc::new(NoopThreadConfigLoader), - feedback: CodexFeedback::new(), - log_db: None, - state_db: None, - environment_manager: Arc::new(EnvironmentManager::default_for_tests()), - config_warnings: Vec::new(), - session_source: SessionSource::Cli, - enable_codex_api_key_env: false, - initialize: InitializeParams { - client_info: ClientInfo { - name: "codex-app-server-tests".to_string(), - title: None, - version: "0.1.0".to_string(), - }, - capabilities: None, - }, - channel_capacity: in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY, - }) - .await?; + let mut client = start_in_process_server(codex_home.path()).await?; let response = client .request(ClientRequest::ThreadStart { @@ -164,11 +141,14 @@ async fn thread_start_with_non_local_thread_store_does_not_create_local_persiste assert_eq!(data[0].id, thread.id); assert_eq!(data[0].path, None); + delete_thread(&client, 4, thread.id.clone()).await?; + client.shutdown().await?; let calls = thread_store.calls().await; assert_eq!(calls.create_thread, 1); assert_eq!(calls.list_threads, 1); + assert_eq!(calls.delete_thread, 1); assert!( calls.append_items > 0, "turn/start should append rollout items through the injected store" @@ -183,6 +163,100 @@ async fn thread_start_with_non_local_thread_store_does_not_create_local_persiste Ok(()) } +#[tokio::test] +async fn thread_delete_with_non_local_thread_store_deletes_unloaded_thread_without_sqlite() +-> Result<()> { + let codex_home = TempDir::new()?; + let store_id = Uuid::new_v4().to_string(); + create_config_toml_with_thread_store(codex_home.path(), "http://127.0.0.1:1", &store_id)?; + + let thread_store = InMemoryThreadStore::for_id(store_id.clone()); + let _in_memory_store = InMemoryThreadStoreId { store_id }; + let thread_id = ThreadId::from_string(&Uuid::new_v4().to_string())?; + thread_store + .create_thread(StoreCreateThreadParams { + thread_id, + forked_from_id: None, + source: SessionSource::Cli, + thread_source: None, + base_instructions: BaseInstructions::default(), + dynamic_tools: Vec::new(), + metadata: ThreadPersistenceMetadata { + cwd: Some(codex_home.path().to_path_buf()), + model_provider: "mock_provider".to_string(), + memory_mode: ThreadMemoryMode::Enabled, + }, + event_persistence_mode: ThreadEventPersistenceMode::Limited, + }) + .await?; + + let client = start_in_process_server(codex_home.path()).await?; + + delete_thread(&client, 1, thread_id.to_string()).await?; + + client.shutdown().await?; + + let calls = thread_store.calls().await; + assert_eq!(calls.read_thread, 1); + assert_eq!(calls.delete_thread, 1); + assert_no_local_persistence_artifacts(codex_home.path())?; + + Ok(()) +} + +async fn start_in_process_server(codex_home: &Path) -> Result { + let loader_overrides = LoaderOverrides::without_managed_config_for_tests(); + let config = ConfigBuilder::default() + .codex_home(codex_home.to_path_buf()) + .fallback_cwd(Some(codex_home.to_path_buf())) + .loader_overrides(loader_overrides.clone()) + .build() + .await?; + + Ok(in_process::start(InProcessStartArgs { + arg0_paths: Arg0DispatchPaths::default(), + config: Arc::new(config), + cli_overrides: Vec::new(), + loader_overrides, + strict_config: false, + cloud_requirements: CloudRequirementsLoader::default(), + thread_config_loader: Arc::new(NoopThreadConfigLoader), + feedback: CodexFeedback::new(), + log_db: None, + state_db: None, + environment_manager: Arc::new(EnvironmentManager::default_for_tests()), + config_warnings: Vec::new(), + session_source: SessionSource::Cli, + enable_codex_api_key_env: false, + initialize: InitializeParams { + client_info: ClientInfo { + name: "codex-app-server-tests".to_string(), + title: None, + version: "0.1.0".to_string(), + }, + capabilities: None, + }, + channel_capacity: in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY, + }) + .await?) +} + +async fn delete_thread( + client: &InProcessClientHandle, + request_id: i64, + thread_id: String, +) -> Result<()> { + let response = client + .request(ClientRequest::ThreadDelete { + request_id: RequestId::Integer(request_id), + params: ThreadDeleteParams { thread_id }, + }) + .await? + .map_err(|error| anyhow::anyhow!("thread/delete failed: {}", error.message))?; + let _: ThreadDeleteResponse = serde_json::from_value(response)?; + Ok(()) +} + fn assert_no_local_persistence_artifacts(codex_home: &Path) -> Result<()> { // These are the observable tripwires for accidental local persistence. If a // future code path constructs a local rollout/session store or opens the diff --git a/codex-rs/app-server/tests/suite/v2/thread_delete.rs b/codex-rs/app-server/tests/suite/v2/thread_delete.rs index 15ef8eaf4d..440ce09935 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_delete.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_delete.rs @@ -3,11 +3,7 @@ use anyhow::Result; use app_test_support::McpProcess; use app_test_support::create_fake_rollout; -use app_test_support::create_mock_responses_server_repeating_assistant; -use app_test_support::rollout_path; use app_test_support::to_response; -use chrono::DateTime; -use chrono::Utc; use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; @@ -18,16 +14,12 @@ use codex_app_server_protocol::ThreadLoadedListParams; use codex_app_server_protocol::ThreadLoadedListResponse; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; -use codex_core::find_archived_thread_path_by_id_str; use codex_core::find_thread_path_by_id_str; use codex_protocol::ThreadId; -use codex_protocol::protocol::SessionSource; use codex_state::DirectionalThreadSpawnEdgeStatus; use codex_state::StateRuntime; use codex_state::ThreadGoalStatus; -use codex_state::ThreadMetadataBuilder; use pretty_assertions::assert_eq; -use std::path::Path; use tempfile::TempDir; use tokio::time::timeout; @@ -64,30 +56,9 @@ async fn thread_delete_deletes_spawned_descendants_and_metadata() -> Result<()> let state_db = StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?; - let parent_thread_id = seed_thread_metadata( - &state_db, - codex_home.path(), - &parent_id, - "2025-01-01T00-00-00", - "2025-01-01T00:00:00Z", - ) - .await?; - let child_thread_id = seed_thread_metadata( - &state_db, - codex_home.path(), - &child_id, - "2025-01-01T00-01-00", - "2025-01-01T00:01:00Z", - ) - .await?; - let grandchild_thread_id = seed_thread_metadata( - &state_db, - codex_home.path(), - &grandchild_id, - "2025-01-01T00-02-00", - "2025-01-01T00:02:00Z", - ) - .await?; + let parent_thread_id = ThreadId::from_string(&parent_id)?; + let child_thread_id = ThreadId::from_string(&child_id)?; + let grandchild_thread_id = ThreadId::from_string(&grandchild_id)?; state_db .upsert_thread_spawn_edge( @@ -154,68 +125,51 @@ async fn thread_delete_deletes_spawned_descendants_and_metadata() -> Result<()> assert_eq!(deleted_ids, vec![parent_id, grandchild_id, child_id]); for thread_id in [parent_thread_id, child_thread_id, grandchild_thread_id] { + let rollout_path = find_thread_path_by_id_str( + codex_home.path(), + &thread_id.to_string(), + /*state_db_ctx*/ None, + ) + .await?; assert!( - find_thread_path_by_id_str( - codex_home.path(), - &thread_id.to_string(), - /*state_db_ctx*/ None, - ) - .await? - .is_none(), + rollout_path.is_none(), "expected active rollout for {thread_id} to be deleted" ); - assert!( - find_archived_thread_path_by_id_str( - codex_home.path(), - &thread_id.to_string(), - /*state_db_ctx*/ None, - ) - .await? - .is_none(), - "expected archived rollout for {thread_id} to be absent" - ); - assert!( - state_db.get_thread(thread_id).await?.is_none(), - "expected sqlite metadata for {thread_id} to be deleted" - ); } - assert!( - state_db - .list_thread_spawn_descendants(parent_thread_id) - .await? - .is_empty() - ); - assert!( - state_db - .thread_goals() - .get_thread_goal(parent_thread_id) - .await? - .is_none() - ); - assert!( - state_db - .thread_goals() - .get_thread_goal(child_thread_id) - .await? - .is_none() - ); + let descendants = state_db + .list_thread_spawn_descendants(parent_thread_id) + .await?; + assert!(descendants.is_empty()); + for thread_id in [parent_thread_id, child_thread_id] { + let goal = state_db.thread_goals().get_thread_goal(thread_id).await?; + assert!(goal.is_none()); + } Ok(()) } #[tokio::test] async fn thread_delete_rejects_live_ephemeral_thread_without_unloading() -> Result<()> { - let server = create_mock_responses_server_repeating_assistant("Done").await; let codex_home = TempDir::new()?; - create_config_toml(codex_home.path(), &server.uri())?; let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; - let thread_id = start_ephemeral_thread(&mut mcp).await?; + let start_id = mcp + .send_thread_start_request(ThreadStartParams { + ephemeral: Some(true), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; let delete_id = mcp .send_thread_delete_request(ThreadDeleteParams { - thread_id: thread_id.clone(), + thread_id: thread.id.clone(), }) .await?; let delete_err: JSONRPCError = timeout( @@ -223,89 +177,24 @@ async fn thread_delete_rejects_live_ephemeral_thread_without_unloading() -> Resu mcp.read_stream_until_error_message(RequestId::Integer(delete_id)), ) .await??; - assert_eq!( - delete_err.error.message, - format!("thread is not persisted and cannot be deleted: {thread_id}") + let expected_message = format!( + "thread is not persisted and cannot be deleted: {}", + thread.id ); + assert_eq!(delete_err.error.message, expected_message); - let loaded_thread_ids = loaded_thread_ids(&mut mcp).await?; - assert_eq!(loaded_thread_ids, vec![thread_id]); - - Ok(()) -} - -async fn seed_thread_metadata( - state_db: &StateRuntime, - codex_home: &Path, - thread_id: &str, - filename_ts: &str, - meta_rfc3339: &str, -) -> Result { - let id = ThreadId::from_string(thread_id)?; - let created_at = DateTime::parse_from_rfc3339(meta_rfc3339)?.with_timezone(&Utc); - let mut builder = ThreadMetadataBuilder::new( - id, - rollout_path(codex_home, filename_ts, thread_id), - created_at, - SessionSource::Cli, - ); - builder.updated_at = Some(created_at); - builder.cwd = codex_home.to_path_buf(); - let metadata = builder.build("mock_provider"); - state_db.upsert_thread(&metadata).await?; - Ok(id) -} - -async fn start_ephemeral_thread(mcp: &mut McpProcess) -> Result { - let request_id = mcp - .send_thread_start_request(ThreadStartParams { - model: Some("gpt-5.2".to_string()), - ephemeral: Some(true), - ..Default::default() - }) - .await?; - let response: JSONRPCResponse = timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_response_message(RequestId::Integer(request_id)), - ) - .await??; - let ThreadStartResponse { thread, .. } = to_response::(response)?; - Ok(thread.id) -} - -async fn loaded_thread_ids(mcp: &mut McpProcess) -> Result> { - let request_id = mcp + let list_id = mcp .send_thread_loaded_list_request(ThreadLoadedListParams::default()) .await?; - let response: JSONRPCResponse = timeout( + let list_resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + mcp.read_stream_until_response_message(RequestId::Integer(list_id)), ) .await??; let ThreadLoadedListResponse { mut data, .. } = - to_response::(response)?; + to_response::(list_resp)?; data.sort(); - Ok(data) -} - -fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> { - std::fs::write( - codex_home.join("config.toml"), - format!( - r#" -model = "mock-model" -approval_policy = "never" -sandbox_mode = "read-only" - -model_provider = "mock_provider" - -[model_providers.mock_provider] -name = "Mock provider for test" -base_url = "{server_uri}/v1" -wire_api = "responses" -request_max_retries = 0 -stream_max_retries = 0 -"# - ), - ) + assert_eq!(data, vec![thread.id]); + + Ok(()) } diff --git a/codex-rs/state/src/runtime/threads.rs b/codex-rs/state/src/runtime/threads.rs index 765f85ae47..8712da90eb 100644 --- a/codex-rs/state/src/runtime/threads.rs +++ b/codex-rs/state/src/runtime/threads.rs @@ -888,9 +888,6 @@ ON CONFLICT(id) DO UPDATE SET .execute(self.pool.as_ref()) .await?; let rows_affected = result.rows_affected(); - if rows_affected == 0 { - return Ok(0); - } if let Err(err) = sqlx::query("DELETE FROM thread_dynamic_tools WHERE thread_id = ?") .bind(thread_id_string.as_str()) @@ -1174,6 +1171,7 @@ mod tests { use crate::DirectionalThreadSpawnEdgeStatus; use crate::runtime::test_support::test_thread_metadata; use crate::runtime::test_support::unique_temp_dir; + use anyhow::Result; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::GitInfo; use codex_protocol::protocol::SessionMeta; @@ -1222,31 +1220,19 @@ mod tests { } #[tokio::test] - async fn delete_thread_cleans_associated_state() { + async fn delete_thread_cleans_associated_state() -> Result<()> { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) - .await - .expect("state db should initialize"); - let thread_id = - ThreadId::from_string("00000000-0000-0000-0000-000000000401").expect("valid thread id"); - let child_thread_id = - ThreadId::from_string("00000000-0000-0000-0000-000000000402").expect("valid thread id"); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()).await?; + let thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000401")?; + let child_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000402")?; runtime .upsert_thread(&test_thread_metadata( &codex_home, thread_id, codex_home.clone(), )) - .await - .expect("thread insert should succeed"); - runtime - .upsert_thread_spawn_edge( - thread_id, - child_thread_id, - DirectionalThreadSpawnEdgeStatus::Closed, - ) - .await - .expect("spawn edge insert should succeed"); + .await?; + seed_thread_cleanup_state(&runtime, thread_id, child_thread_id).await?; sqlx::query( r#" INSERT INTO thread_dynamic_tools (thread_id, position, name, description, input_schema) @@ -1259,34 +1245,7 @@ VALUES (?, ?, ?, ?, ?) .bind("test dynamic tool") .bind("{}") .execute(runtime.pool.as_ref()) - .await - .expect("dynamic tool insert should succeed"); - runtime - .thread_goals() - .replace_thread_goal( - thread_id, - "test goal", - crate::ThreadGoalStatus::Active, - /*token_budget*/ None, - ) - .await - .expect("goal insert should succeed"); - runtime - .insert_log(&LogEntry { - ts: 1, - ts_nanos: 0, - level: "INFO".to_string(), - target: "test".to_string(), - message: Some("legacy log".to_string()), - feedback_log_body: Some("feedback log".to_string()), - thread_id: Some(thread_id.to_string()), - process_uuid: Some("process-1".to_string()), - module_path: None, - file: None, - line: None, - }) - .await - .expect("log insert should succeed"); + .await?; runtime .create_agent_job( &AgentJobCreateParams { @@ -1307,66 +1266,26 @@ VALUES (?, ?, ?, ?, ?) row_json: json!({"path": "file-1"}), }], ) - .await - .expect("agent job insert should succeed"); - runtime - .mark_agent_job_running("job-1") - .await - .expect("agent job should mark running"); + .await?; + runtime.mark_agent_job_running("job-1").await?; runtime .mark_agent_job_item_running_with_thread("job-1", "item-1", &thread_id.to_string()) - .await - .expect("agent job item should mark running"); + .await?; - let rows = runtime - .delete_thread(thread_id) - .await - .expect("thread delete should succeed"); + let rows = runtime.delete_thread(thread_id).await?; assert_eq!(rows, 1); - assert!( - runtime - .get_thread(thread_id) - .await - .expect("thread lookup should succeed") - .is_none() - ); + assert!(runtime.get_thread(thread_id).await?.is_none()); let dynamic_tool_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM thread_dynamic_tools WHERE thread_id = ?") .bind(thread_id.to_string()) .fetch_one(runtime.pool.as_ref()) - .await - .expect("dynamic tool count should be readable"); + .await?; assert_eq!(dynamic_tool_count, 0); - let spawn_edge_count: i64 = sqlx::query_scalar( - "SELECT COUNT(*) FROM thread_spawn_edges WHERE parent_thread_id = ? OR child_thread_id = ?", - ) - .bind(thread_id.to_string()) - .bind(thread_id.to_string()) - .fetch_one(runtime.pool.as_ref()) - .await - .expect("spawn edge count should be readable"); - assert_eq!(spawn_edge_count, 0); - assert!( - runtime - .thread_goals() - .get_thread_goal(thread_id) - .await - .expect("goal lookup should succeed") - .is_none() - ); - let logs = runtime - .query_logs(&LogQuery { - thread_ids: vec![thread_id.to_string()], - ..Default::default() - }) - .await - .expect("log query should succeed"); - assert_eq!(logs.len(), 0); + assert_thread_cleanup_state(&runtime, thread_id).await?; let job_item = runtime .get_agent_job_item("job-1", "item-1") - .await - .expect("job item lookup should succeed") + .await? .expect("job item should exist"); assert_eq!(job_item.status, AgentJobItemStatus::Pending); assert_eq!(job_item.assigned_thread_id, None); @@ -1374,6 +1293,85 @@ VALUES (?, ?, ?, ?, ?) job_item.last_error, Some("assigned thread was deleted".to_string()) ); + Ok(()) + } + + #[tokio::test] + async fn delete_thread_cleans_associated_state_when_thread_row_is_missing() -> Result<()> { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()).await?; + let thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000403")?; + let child_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000404")?; + seed_thread_cleanup_state(&runtime, thread_id, child_thread_id).await?; + + let rows = runtime.delete_thread(thread_id).await?; + + assert_eq!(rows, 0); + assert_thread_cleanup_state(&runtime, thread_id).await?; + Ok(()) + } + + async fn seed_thread_cleanup_state( + runtime: &StateRuntime, + thread_id: ThreadId, + child_thread_id: ThreadId, + ) -> Result<()> { + runtime + .upsert_thread_spawn_edge( + thread_id, + child_thread_id, + DirectionalThreadSpawnEdgeStatus::Closed, + ) + .await?; + runtime + .thread_goals() + .replace_thread_goal( + thread_id, + "test goal", + crate::ThreadGoalStatus::Active, + /*token_budget*/ None, + ) + .await?; + runtime + .insert_log(&LogEntry { + ts: 1, + ts_nanos: 0, + level: "INFO".to_string(), + target: "test".to_string(), + message: Some("legacy log".to_string()), + feedback_log_body: Some("feedback log".to_string()), + thread_id: Some(thread_id.to_string()), + process_uuid: Some("process-1".to_string()), + module_path: None, + file: None, + line: None, + }) + .await?; + Ok(()) + } + + async fn assert_thread_cleanup_state( + runtime: &StateRuntime, + thread_id: ThreadId, + ) -> Result<()> { + let spawn_edge_count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM thread_spawn_edges WHERE parent_thread_id = ? OR child_thread_id = ?", + ) + .bind(thread_id.to_string()) + .bind(thread_id.to_string()) + .fetch_one(runtime.pool.as_ref()) + .await?; + assert_eq!(spawn_edge_count, 0); + let goal = runtime.thread_goals().get_thread_goal(thread_id).await?; + assert!(goal.is_none()); + let logs = runtime + .query_logs(&LogQuery { + thread_ids: vec![thread_id.to_string()], + ..Default::default() + }) + .await?; + assert_eq!(logs.len(), 0); + Ok(()) } #[tokio::test] diff --git a/codex-rs/thread-store/src/local/delete_thread.rs b/codex-rs/thread-store/src/local/delete_thread.rs index 5471b1782d..daf96c5265 100644 --- a/codex-rs/thread-store/src/local/delete_thread.rs +++ b/codex-rs/thread-store/src/local/delete_thread.rs @@ -83,6 +83,9 @@ pub(super) async fn delete_thread( match delete_rollout_file(store, rollout_path.as_path(), thread_id) { Ok(deleted) => deleted_rollout_file |= deleted, Err(err) if deleted_state_rows > 0 => { + // Once SQLite deletion commits, rollout cleanup is best effort. If this JSONL + // survives, compatibility read/list paths may rediscover it and repair metadata; + // that is preferable to failing a delete whose state-row commit already succeeded. warn!("failed to delete rollout file for thread {thread_id}: {err}"); } Err(err) => return Err(err), @@ -139,7 +142,6 @@ mod tests { use uuid::Uuid; use super::*; - use crate::ReadThreadParams; use crate::ThreadStore; use crate::local::LocalThreadStore; use crate::local::test_support::test_config; @@ -161,16 +163,6 @@ mod tests { .expect("delete thread"); assert!(!active_path.exists()); - assert!( - store - .read_thread(ReadThreadParams { - thread_id, - include_archived: true, - include_history: false, - }) - .await - .is_err() - ); } #[tokio::test]