mirror of
https://github.com/openai/codex.git
synced 2026-04-30 09:26:44 +00:00
feat: cascade thread archive (#18112)
Cascade the thread archive endpoint to all the sub-agents in the agent tree. Fixes: https://github.com/openai/codex/issues/17867 --------- Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_fake_rollout;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
@@ -19,7 +20,11 @@ use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
use codex_core::ARCHIVED_SESSIONS_SUBDIR;
|
||||
use codex_core::find_archived_thread_path_by_id_str;
|
||||
use codex_core::find_thread_path_by_id_str;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_state::DirectionalThreadSpawnEdgeStatus;
|
||||
use codex_state::StateRuntime;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::Path;
|
||||
use tempfile::TempDir;
|
||||
@@ -160,6 +165,311 @@ async fn thread_archive_requires_materialized_rollout() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
/// Archiving a parent thread must cascade to every spawned descendant in the
/// agent tree: here parent -> child -> grandchild, linked via spawn edges in
/// the state DB. Expects one `thread/archived` notification per thread and
/// that each rollout moves from the active to the archived sessions dir.
async fn thread_archive_archives_spawned_descendants() -> Result<()> {
    let server = create_mock_responses_server_repeating_assistant("Done").await;
    let codex_home = TempDir::new()?;
    create_config_toml(codex_home.path(), &server.uri())?;

    // Three on-disk rollouts with distinct timestamps so each gets a unique
    // session file; the spawn relationship is established separately below.
    let parent_id = create_fake_rollout(
        codex_home.path(),
        "2025-01-01T00-00-00",
        "2025-01-01T00:00:00Z",
        "parent",
        Some("mock_provider"),
        /*git_info*/ None,
    )?;
    let child_id = create_fake_rollout(
        codex_home.path(),
        "2025-01-01T00-01-00",
        "2025-01-01T00:01:00Z",
        "child",
        Some("mock_provider"),
        /*git_info*/ None,
    )?;
    let grandchild_id = create_fake_rollout(
        codex_home.path(),
        "2025-01-01T00-02-00",
        "2025-01-01T00:02:00Z",
        "grandchild",
        Some("mock_provider"),
        /*git_info*/ None,
    )?;

    let parent_thread_id = ThreadId::from_string(&parent_id)?;
    let child_thread_id = ThreadId::from_string(&child_id)?;
    let grandchild_thread_id = ThreadId::from_string(&grandchild_id)?;
    // Record parent->child (Closed) and child->grandchild (Open) spawn edges;
    // the cascade should follow edges regardless of their open/closed status.
    let state_db =
        StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
    state_db
        .mark_backfill_complete(/*last_watermark*/ None)
        .await?;
    state_db
        .upsert_thread_spawn_edge(
            parent_thread_id,
            child_thread_id,
            DirectionalThreadSpawnEdgeStatus::Closed,
        )
        .await?;
    state_db
        .upsert_thread_spawn_edge(
            child_thread_id,
            grandchild_thread_id,
            DirectionalThreadSpawnEdgeStatus::Open,
        )
        .await?;

    // Start the app server only after the state DB is fully populated, so it
    // sees the complete agent tree when handling the archive request.
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    // Archive only the root of the tree; descendants are never named directly.
    let archive_id = mcp
        .send_thread_archive_request(ThreadArchiveParams {
            thread_id: parent_id.clone(),
        })
        .await?;
    let archive_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
    )
    .await??;
    let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;

    // Collect exactly three `thread/archived` notifications — one per thread.
    let mut archived_ids = Vec::new();
    for _ in 0..3 {
        let notification = timeout(
            DEFAULT_READ_TIMEOUT,
            mcp.read_stream_until_notification_message("thread/archived"),
        )
        .await??;
        let archived_notification: ThreadArchivedNotification = serde_json::from_value(
            notification
                .params
                .expect("thread/archived notification params"),
        )?;
        archived_ids.push(archived_notification.thread_id);
    }
    // NOTE(review): the asserted order is parent, then grandchild, then child
    // — presumably parent first, then deepest-descendant-first for the
    // cascade; confirm against the server's traversal order.
    assert_eq!(archived_ids, vec![parent_id, grandchild_id, child_id]);

    // Every thread in the tree must have left the active sessions dir and
    // appeared in the archived sessions dir.
    for thread_id in [parent_thread_id, child_thread_id, grandchild_thread_id] {
        assert!(
            find_thread_path_by_id_str(codex_home.path(), &thread_id.to_string())
                .await?
                .is_none(),
            "expected active rollout for {thread_id} to be archived"
        );
        assert!(
            find_archived_thread_path_by_id_str(codex_home.path(), &thread_id.to_string())
                .await?
                .is_some(),
            "expected archived rollout for {thread_id} to exist"
        );
    }

    Ok(())
}
|
||||
|
||||
#[tokio::test]
/// A failure to archive one descendant must not abort the cascade: the
/// parent's archive request still succeeds, the other descendants are still
/// archived, and only the failing thread is left active. The failure is forced
/// by pre-creating a *directory* at the child's archived-file destination so
/// the rollout move cannot complete.
async fn thread_archive_succeeds_when_descendant_archive_fails() -> Result<()> {
    let server = create_mock_responses_server_repeating_assistant("Done").await;
    let codex_home = TempDir::new()?;
    create_config_toml(codex_home.path(), &server.uri())?;

    // Same parent -> child -> grandchild tree as the happy-path cascade test.
    let parent_id = create_fake_rollout(
        codex_home.path(),
        "2025-01-01T00-00-00",
        "2025-01-01T00:00:00Z",
        "parent",
        Some("mock_provider"),
        /*git_info*/ None,
    )?;
    let child_id = create_fake_rollout(
        codex_home.path(),
        "2025-01-01T00-01-00",
        "2025-01-01T00:01:00Z",
        "child",
        Some("mock_provider"),
        /*git_info*/ None,
    )?;
    let grandchild_id = create_fake_rollout(
        codex_home.path(),
        "2025-01-01T00-02-00",
        "2025-01-01T00:02:00Z",
        "grandchild",
        Some("mock_provider"),
        /*git_info*/ None,
    )?;

    let parent_thread_id = ThreadId::from_string(&parent_id)?;
    let child_thread_id = ThreadId::from_string(&child_id)?;
    let grandchild_thread_id = ThreadId::from_string(&grandchild_id)?;
    let state_db =
        StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
    state_db
        .mark_backfill_complete(/*last_watermark*/ None)
        .await?;
    state_db
        .upsert_thread_spawn_edge(
            parent_thread_id,
            child_thread_id,
            DirectionalThreadSpawnEdgeStatus::Closed,
        )
        .await?;
    state_db
        .upsert_thread_spawn_edge(
            child_thread_id,
            grandchild_thread_id,
            DirectionalThreadSpawnEdgeStatus::Open,
        )
        .await?;

    // Sabotage the child's archive: create a directory where the archived
    // rollout *file* would land, so moving the child's rollout there fails.
    let child_rollout_path = find_thread_path_by_id_str(codex_home.path(), &child_id)
        .await?
        .expect("child rollout path");
    let archived_child_path = codex_home
        .path()
        .join(ARCHIVED_SESSIONS_SUBDIR)
        .join(child_rollout_path.file_name().expect("rollout file name"));
    std::fs::create_dir_all(&archived_child_path)?;

    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    // The parent archive request must still return a success response even
    // though one descendant will fail to archive.
    let archive_id = mcp
        .send_thread_archive_request(ThreadArchiveParams {
            thread_id: parent_id.clone(),
        })
        .await?;
    let archive_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
    )
    .await??;
    let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;

    // Only two notifications are expected: parent and grandchild. The child's
    // archive failed, so it must not be announced.
    let mut archived_ids = Vec::new();
    for _ in 0..2 {
        let notification = timeout(
            DEFAULT_READ_TIMEOUT,
            mcp.read_stream_until_notification_message("thread/archived"),
        )
        .await??;
        let archived_notification: ThreadArchivedNotification = serde_json::from_value(
            notification
                .params
                .expect("thread/archived notification params"),
        )?;
        archived_ids.push(archived_notification.thread_id);
    }
    assert_eq!(archived_ids, vec![parent_id, grandchild_id]);

    // Prove no third `thread/archived` notification arrives: reading one more
    // must time out (250 ms grace window).
    assert!(
        timeout(
            std::time::Duration::from_millis(250),
            mcp.read_stream_until_notification_message("thread/archived"),
        )
        .await
        .is_err()
    );

    // The failing child stays active, and our conflicting directory is intact.
    assert!(
        child_rollout_path.exists(),
        "child should stay active after descendant archive failure"
    );
    assert!(
        archived_child_path.is_dir(),
        "test conflict should remain in archived sessions"
    );
    // The threads that did not hit the conflict are archived normally.
    for thread_id in [parent_thread_id, grandchild_thread_id] {
        assert!(
            find_thread_path_by_id_str(codex_home.path(), &thread_id.to_string())
                .await?
                .is_none(),
            "expected active rollout for {thread_id} to be archived"
        );
        assert!(
            find_archived_thread_path_by_id_str(codex_home.path(), &thread_id.to_string())
                .await?
                .is_some(),
            "expected archived rollout for {thread_id} to exist"
        );
    }

    Ok(())
}
|
||||
|
||||
#[tokio::test]
/// A spawn edge pointing at a thread with no rollout on disk (e.g. already
/// deleted) must not block archiving the parent: the parent is archived and
/// announced normally, and the dangling descendant is skipped silently.
async fn thread_archive_succeeds_when_spawned_descendant_is_missing() -> Result<()> {
    let server = create_mock_responses_server_repeating_assistant("Done").await;
    let codex_home = TempDir::new()?;
    create_config_toml(codex_home.path(), &server.uri())?;

    // Only the parent gets a real rollout; the child exists solely as an edge.
    let parent_id = create_fake_rollout(
        codex_home.path(),
        "2025-01-01T00-00-00",
        "2025-01-01T00:00:00Z",
        "parent",
        Some("mock_provider"),
        /*git_info*/ None,
    )?;
    let parent_thread_id = ThreadId::from_string(&parent_id)?;
    // A syntactically valid thread id that matches no rollout file on disk.
    let missing_child_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000901")?;

    let state_db =
        StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
    state_db
        .mark_backfill_complete(/*last_watermark*/ None)
        .await?;
    // Dangling edge: parent claims a spawned child that cannot be found.
    state_db
        .upsert_thread_spawn_edge(
            parent_thread_id,
            missing_child_thread_id,
            DirectionalThreadSpawnEdgeStatus::Closed,
        )
        .await?;

    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    let archive_id = mcp
        .send_thread_archive_request(ThreadArchiveParams {
            thread_id: parent_id.clone(),
        })
        .await?;
    let archive_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
    )
    .await??;
    let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;

    // Exactly one notification is expected — for the parent itself; the
    // missing child produces neither a notification nor an error.
    let notification = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_notification_message("thread/archived"),
    )
    .await??;
    let archived_notification: ThreadArchivedNotification = serde_json::from_value(
        notification
            .params
            .expect("thread/archived notification params"),
    )?;
    assert_eq!(archived_notification.thread_id, parent_id);

    // The parent rollout moved from the active dir to the archived dir.
    assert!(
        find_thread_path_by_id_str(codex_home.path(), &parent_id)
            .await?
            .is_none(),
        "parent should be archived even when a descendant is missing"
    );
    assert!(
        find_archived_thread_path_by_id_str(codex_home.path(), &parent_id)
            .await?
            .is_some(),
        "parent should be moved into archived sessions"
    );

    Ok(())
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_archive_clears_stale_subscriptions_before_resume() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
|
||||
Reference in New Issue
Block a user