From 88a9c2b339ae8a5037b0313ee776c9714eb6ce4f Mon Sep 17 00:00:00 2001 From: Eric Traut Date: Sun, 31 May 2026 10:55:08 -0700 Subject: [PATCH] Simplify thread delete tests --- codex-rs/app-server/README.md | 4 +- .../tests/suite/v2/remote_thread_store.rs | 60 +++++-------- .../tests/suite/v2/thread_delete.rs | 87 ++++++------------- codex-rs/state/src/runtime/threads.rs | 51 ++++------- .../thread-store/src/local/delete_thread.rs | 60 ++++++------- 5 files changed, 92 insertions(+), 170 deletions(-) diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 025b840d9a..79983b45ba 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -69,7 +69,7 @@ The API exposes three top level primitives representing an interaction between a - **Turn**: One turn of the conversation, typically starting with a user message and finishing with an agent message. Each turn contains multiple items. - **Item**: Represents user inputs and agent outputs as part of the turn, persisted and used as the context for future conversations. Example items include user message, agent reasoning, agent message, shell command, file edit, etc. -Use the thread APIs to create, list, archive, or delete conversations. Drive a conversation with turn APIs and stream progress via turn notifications. +Use the thread APIs to create, list, or archive conversations. Drive a conversation with turn APIs and stream progress via turn notifications. ## Lifecycle Overview @@ -1191,7 +1191,7 @@ All filesystem paths in this section must be absolute. ## Events -Event notifications are the server-initiated event stream for thread lifecycles, turn lifecycles, and the items within them. After you start or resume a thread, keep reading stdout for `thread/started`, `thread/archived`, `thread/deleted`, `thread/unarchived`, `thread/closed`, `turn/*`, and `item/*` notifications. +Event notifications are the server-initiated event stream for thread lifecycles, turn lifecycles, and the items within them. After you start or resume a thread, keep reading stdout for `thread/started`, `thread/archived`, `thread/unarchived`, `thread/closed`, `turn/*`, and `item/*` notifications. Thread realtime uses a separate thread-scoped notification surface. `thread/realtime/*` notifications are ephemeral transport events, not `ThreadItem`s, and are not returned by `thread/read`, `thread/resume`, or `thread/fork`. diff --git a/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs b/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs index bfbd56b963..9930e3d457 100644 --- a/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs +++ b/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs @@ -60,7 +60,7 @@ use uuid::Uuid; const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); #[tokio::test] -async fn thread_start_with_non_local_thread_store_does_not_create_local_persistence() -> Result<()> +async fn thread_delete_with_non_local_thread_store_does_not_create_local_persistence() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; let codex_home = TempDir::new()?; @@ -142,40 +142,10 @@ async fn thread_start_with_non_local_thread_store_does_not_create_local_persiste assert_eq!(data[0].path, None); delete_thread(&client, /*request_id*/ 4, thread.id.clone()).await?; - - client.shutdown().await?; - - let calls = thread_store.calls().await; - assert_eq!(calls.create_thread, 1); - assert_eq!(calls.list_threads, 1); - assert_eq!(calls.delete_thread, 1); - assert!( - calls.append_items > 0, - "turn/start should append rollout items through the injected store" - ); - assert!( - calls.flush_thread > 0, - "turn completion should flush through the injected store" - ); - - assert_no_local_persistence_artifacts(codex_home.path())?; - - Ok(()) -} - -#[tokio::test] -async fn thread_delete_with_non_local_thread_store_deletes_unloaded_thread_without_sqlite() --> Result<()> { - let codex_home = TempDir::new()?; - let store_id = Uuid::new_v4().to_string(); - create_config_toml_with_thread_store(codex_home.path(), "http://127.0.0.1:1", &store_id)?; - - let thread_store = InMemoryThreadStore::for_id(store_id.clone()); - let _in_memory_store = InMemoryThreadStoreId { store_id }; - let thread_id = ThreadId::from_string(&Uuid::new_v4().to_string())?; + let unloaded_thread_id = ThreadId::from_string(&Uuid::new_v4().to_string())?; thread_store .create_thread(StoreCreateThreadParams { - thread_id, + thread_id: unloaded_thread_id, forked_from_id: None, source: SessionSource::Cli, thread_source: None, @@ -189,16 +159,28 @@ async fn thread_delete_with_non_local_thread_store_deletes_unloaded_thread_witho event_persistence_mode: ThreadEventPersistenceMode::Limited, }) .await?; - - let client = start_in_process_server(codex_home.path()).await?; - - delete_thread(&client, /*request_id*/ 1, thread_id.to_string()).await?; + delete_thread( + &client, + /*request_id*/ 5, + unloaded_thread_id.to_string(), + ) + .await?; client.shutdown().await?; let calls = thread_store.calls().await; - assert_eq!(calls.read_thread, 1); - assert_eq!(calls.delete_thread, 1); + assert_eq!(calls.create_thread, 2); + assert_eq!(calls.list_threads, 1); + assert_eq!(calls.delete_thread, 2); + assert!( + calls.append_items > 0, + "turn/start should append rollout items through the injected store" + ); + assert!( + calls.flush_thread > 0, + "turn completion should flush through the injected store" + ); + assert_no_local_persistence_artifacts(codex_home.path())?; Ok(()) diff --git a/codex-rs/app-server/tests/suite/v2/thread_delete.rs b/codex-rs/app-server/tests/suite/v2/thread_delete.rs index 2df6d77bfe..9c83023b42 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_delete.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_delete.rs @@ -18,41 +18,20 @@ use codex_core::find_thread_path_by_id_str; use codex_protocol::ThreadId; use codex_state::DirectionalThreadSpawnEdgeStatus; use codex_state::StateRuntime; -use codex_state::ThreadGoalStatus; use pretty_assertions::assert_eq; +use std::path::Path; use tempfile::TempDir; use tokio::time::timeout; const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); #[tokio::test] -async fn thread_delete_deletes_spawned_descendants_and_metadata() -> Result<()> { +async fn thread_delete_deletes_spawned_descendants() -> Result<()> { let codex_home = TempDir::new()?; - let parent_id = create_fake_rollout( - codex_home.path(), - "2025-01-01T00-00-00", - "2025-01-01T00:00:00Z", - "parent", - Some("mock_provider"), - /*git_info*/ None, - )?; - let child_id = create_fake_rollout( - codex_home.path(), - "2025-01-01T00-01-00", - "2025-01-01T00:01:00Z", - "child", - Some("mock_provider"), - /*git_info*/ None, - )?; - let grandchild_id = create_fake_rollout( - codex_home.path(), - "2025-01-01T00-02-00", - "2025-01-01T00:02:00Z", - "grandchild", - Some("mock_provider"), - /*git_info*/ None, - )?; + let parent_id = create_delete_test_rollout(codex_home.path(), 0, "parent")?; + let child_id = create_delete_test_rollout(codex_home.path(), 1, "child")?; + let grandchild_id = create_delete_test_rollout(codex_home.path(), 2, "grandchild")?; let state_db = StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?; @@ -60,38 +39,22 @@ async fn thread_delete_deletes_spawned_descendants_and_metadata() -> Result<()> let child_thread_id = ThreadId::from_string(&child_id)?; let grandchild_thread_id = ThreadId::from_string(&grandchild_id)?; - state_db - .upsert_thread_spawn_edge( + for (parent, child, status) in [ + ( parent_thread_id, child_thread_id, DirectionalThreadSpawnEdgeStatus::Closed, - ) - .await?; - state_db - .upsert_thread_spawn_edge( + ), + ( child_thread_id, grandchild_thread_id, DirectionalThreadSpawnEdgeStatus::Open, - ) - .await?; - state_db - .thread_goals() - .replace_thread_goal( - parent_thread_id, - "parent goal", - ThreadGoalStatus::Active, - /*token_budget*/ None, - ) - .await?; - state_db - .thread_goals() - .replace_thread_goal( - child_thread_id, - "child goal", - ThreadGoalStatus::Active, - /*token_budget*/ None, - ) - .await?; + ), + ] { + state_db + .upsert_thread_spawn_edge(parent, child, status) + .await?; + } let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; @@ -136,18 +99,20 @@ async fn thread_delete_deletes_spawned_descendants_and_metadata() -> Result<()> "expected active rollout for {thread_id} to be deleted" ); } - let descendants = state_db - .list_thread_spawn_descendants(parent_thread_id) - .await?; - assert!(descendants.is_empty()); - for thread_id in [parent_thread_id, child_thread_id] { - let goal = state_db.thread_goals().get_thread_goal(thread_id).await?; - assert!(goal.is_none()); - } - Ok(()) } +fn create_delete_test_rollout(codex_home: &Path, minute: u8, preview: &str) -> Result { + create_fake_rollout( + codex_home, + &format!("2025-01-01T00-{minute:02}-00"), + &format!("2025-01-01T00:{minute:02}:00Z"), + preview, + Some("mock_provider"), + /*git_info*/ None, + ) +} + #[tokio::test] async fn thread_delete_rejects_live_ephemeral_thread_without_unloading() -> Result<()> { let codex_home = TempDir::new()?; diff --git a/codex-rs/state/src/runtime/threads.rs b/codex-rs/state/src/runtime/threads.rs index 671a82ce57..5e32ed7447 100644 --- a/codex-rs/state/src/runtime/threads.rs +++ b/codex-rs/state/src/runtime/threads.rs @@ -1227,12 +1227,7 @@ mod tests { )) .await?; seed_thread_cleanup_state(&runtime, thread_id, child_thread_id).await?; - sqlx::query( - r#" -INSERT INTO thread_dynamic_tools (thread_id, position, name, description, input_schema) -VALUES (?, ?, ?, ?, ?) - "#, - ) + sqlx::query("INSERT INTO thread_dynamic_tools (thread_id, position, name, description, input_schema) VALUES (?, ?, ?, ?, ?)") .bind(thread_id.to_string()) .bind(0_i64) .bind("test_tool") @@ -1287,21 +1282,14 @@ VALUES (?, ?, ?, ?, ?) job_item.last_error, Some("assigned thread was deleted".to_string()) ); - Ok(()) - } - #[tokio::test] - async fn delete_thread_cleans_associated_state_when_thread_row_is_missing() -> Result<()> { - let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()).await?; - let thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000403")?; - let child_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000404")?; - seed_thread_cleanup_state(&runtime, thread_id, child_thread_id).await?; + let missing_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000403")?; + let missing_child_thread_id = + ThreadId::from_string("00000000-0000-0000-0000-000000000404")?; + seed_thread_cleanup_state(&runtime, missing_thread_id, missing_child_thread_id).await?; - let rows = runtime.delete_thread(thread_id).await?; - - assert_eq!(rows, 0); - assert_thread_cleanup_state(&runtime, thread_id).await?; + assert_eq!(runtime.delete_thread(missing_thread_id).await?, 0); + assert_thread_cleanup_state(&runtime, missing_thread_id).await?; Ok(()) } @@ -1326,20 +1314,9 @@ VALUES (?, ?, ?, ?, ?) /*token_budget*/ None, ) .await?; - runtime - .insert_log(&LogEntry { - ts: 1, - ts_nanos: 0, - level: "INFO".to_string(), - target: "test".to_string(), - message: Some("legacy log".to_string()), - feedback_log_body: Some("feedback log".to_string()), - thread_id: Some(thread_id.to_string()), - process_uuid: Some("process-1".to_string()), - module_path: None, - file: None, - line: None, - }) + sqlx::query("INSERT INTO logs (ts, ts_nanos, level, target, feedback_log_body, thread_id) VALUES (1, 0, 'INFO', 'test', 'feedback log', ?)") + .bind(thread_id.to_string()) + .execute(runtime.logs_pool.as_ref()) .await?; Ok(()) } @@ -1356,15 +1333,17 @@ VALUES (?, ?, ?, ?, ?) .fetch_one(runtime.pool.as_ref()) .await?; assert_eq!(spawn_edge_count, 0); - let goal = runtime.thread_goals().get_thread_goal(thread_id).await?; - assert!(goal.is_none()); + assert_eq!( + runtime.thread_goals().get_thread_goal(thread_id).await?, + None + ); let logs = runtime .query_logs(&LogQuery { thread_ids: vec![thread_id.to_string()], ..Default::default() }) .await?; - assert_eq!(logs.len(), 0); + assert!(logs.is_empty()); Ok(()) } diff --git a/codex-rs/thread-store/src/local/delete_thread.rs b/codex-rs/thread-store/src/local/delete_thread.rs index cdd2dd2565..7d955067de 100644 --- a/codex-rs/thread-store/src/local/delete_thread.rs +++ b/codex-rs/thread-store/src/local/delete_thread.rs @@ -150,37 +150,35 @@ mod tests { use crate::local::test_support::write_session_file; #[tokio::test] - async fn delete_thread_removes_active_rollout() { + async fn delete_thread_removes_active_and_archived_rollouts() { let home = TempDir::new().expect("temp dir"); let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); - let uuid = Uuid::from_u128(301); - let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); - let active_path = - write_session_file(home.path(), "2025-01-03T12-00-00", uuid).expect("session file"); + let cases = [ + ( + Uuid::from_u128(301), + write_session_file(home.path(), "2025-01-03T12-00-00", Uuid::from_u128(301)) + .expect("session file"), + ), + ( + Uuid::from_u128(302), + write_archived_session_file( + home.path(), + "2025-01-03T12-00-00", + Uuid::from_u128(302), + ) + .expect("archived session file"), + ), + ]; - store - .delete_thread(DeleteThreadParams { thread_id }) - .await - .expect("delete thread"); + for (uuid, path) in cases { + let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); + store + .delete_thread(DeleteThreadParams { thread_id }) + .await + .expect("delete thread"); - assert!(!active_path.exists()); - } - - #[tokio::test] - async fn delete_thread_removes_archived_rollout() { - let home = TempDir::new().expect("temp dir"); - let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); - let uuid = Uuid::from_u128(302); - let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); - let archived_path = write_archived_session_file(home.path(), "2025-01-03T12-00-00", uuid) - .expect("archived session file"); - - store - .delete_thread(DeleteThreadParams { thread_id }) - .await - .expect("delete thread"); - - assert!(!archived_path.exists()); + assert!(!path.exists()); + } } #[tokio::test] @@ -194,13 +192,11 @@ mod tests { .await .expect("state db should initialize"); let store = LocalThreadStore::new(config.clone(), Some(runtime.clone())); - let uuid = Uuid::from_u128(303); - let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); + let thread_id = + ThreadId::from_string("00000000-0000-0000-0000-000000000303").expect("valid thread id"); let mut builder = codex_state::ThreadMetadataBuilder::new( thread_id, - home.path() - .join("sessions/2025/01/03") - .join(format!("rollout-2025-01-03T12-00-00-{uuid}.jsonl")), + home.path().join("sessions/missing-rollout.jsonl"), Utc::now(), SessionSource::Cli, );