Compare commits

...

6 Commits

Author SHA1 Message Date
jif-oai
7d8b13ae9c make parents authoritative 2026-04-17 14:13:21 +01:00
jif-oai
41fa376cc5 Merge branch 'main' into jif/cascade-thread-archive 2026-04-16 14:52:02 +01:00
jif-oai
bf01c21710 Merge branch 'main' into jif/cascade-thread-archive 2026-04-16 13:57:43 +01:00
jif-oai
7fbef76cf3 Merge branch 'main' into jif/cascade-thread-archive 2026-04-16 10:38:08 +01:00
jif-oai
88faf875d2 fix: notify partial cascade archives
Co-authored-by: Codex <noreply@openai.com>
2026-04-16 10:23:43 +01:00
jif-oai
1435addd61 feat: cascade thread archive 2026-04-16 10:01:32 +01:00
4 changed files with 444 additions and 25 deletions

View File

@@ -146,7 +146,7 @@ Example with notification opt-out:
- `thread/memoryMode/set` — experimental; set a thread's persisted memory eligibility to `"enabled"` or `"disabled"` for either a loaded thread or a stored rollout; returns `{}` on success.
- `memory/reset` — experimental; clear the current `CODEX_HOME/memories` directory and reset persisted memory stage data in sqlite while preserving existing thread memory modes; returns `{}` on success.
- `thread/status/changed` — notification emitted when a loaded thread's status changes (`threadId` + new `status`).
- `thread/archive` — move a thread's rollout file into the archived directory; returns `{}` on success and emits `thread/archived`.
- `thread/archive` — move a thread's rollout file into the archived directory and attempt to move any spawned descendant thread rollout files; returns `{}` on success and emits `thread/archived` for each archived thread.
- `thread/unsubscribe` — unsubscribe this connection from thread turn/item events. If this was the last subscriber, the server keeps the thread loaded and unloads it only after it has had no subscribers and no thread activity for 30 minutes, then emits `thread/closed`.
- `thread/name/set` — set or update a thread's user-facing name for either a loaded thread or a persisted rollout; returns `{}` on success and emits `thread/name/updated` to initialized, opted-in clients. Thread names are not required to be unique; name lookups resolve to the most recently updated thread.
- `thread/unarchive` — move an archived rollout file back into the sessions directory; returns the restored `thread` on success and emits `thread/unarchived`.
@@ -426,7 +426,7 @@ Experimental: use `memory/reset` to clear local memory artifacts and sqlite-back
### Example: Archive a thread
Use `thread/archive` to move the persisted rollout (stored as a JSONL file on disk) into the archived sessions directory.
Use `thread/archive` to move the persisted rollout (stored as a JSONL file on disk) into the archived sessions directory and attempt to move any spawned descendant thread rollouts.
```json
{ "method": "thread/archive", "id": 21, "params": { "threadId": "thr_b" } }

View File

@@ -2732,8 +2732,36 @@ impl CodexMessageProcessor {
}
};
let thread_id_str = thread_id.to_string();
if let Err(err) = self
let mut thread_ids = vec![thread_id];
if let Some(state_db_ctx) = get_state_db(&self.config).await {
let descendants = match state_db_ctx.list_thread_spawn_descendants(thread_id).await {
Ok(descendants) => descendants,
Err(err) => {
self.outgoing
.send_error(
request_id,
JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!(
"failed to list spawned descendants for thread id {thread_id}: {err}"
),
data: None,
},
)
.await;
return;
}
};
let mut seen = HashSet::from([thread_id]);
for descendant_id in descendants {
if seen.insert(descendant_id) {
thread_ids.push(descendant_id);
}
}
}
let mut archive_thread_ids = Vec::new();
match self
.thread_store
.read_thread(StoreReadThreadParams {
thread_id,
@@ -2742,34 +2770,98 @@ impl CodexMessageProcessor {
})
.await
{
self.outgoing
.send_error(request_id, thread_store_archive_error("archive", err))
.await;
return;
}
self.prepare_thread_for_archive(thread_id).await;
match self
.thread_store
.archive_thread(StoreArchiveThreadParams { thread_id })
.await
{
Ok(()) => {
let response = ThreadArchiveResponse {};
self.outgoing.send_response(request_id, response).await;
let notification = ThreadArchivedNotification {
thread_id: thread_id_str,
};
self.outgoing
.send_server_notification(ServerNotification::ThreadArchived(notification))
.await;
Ok(thread) => {
if thread.archived_at.is_none() {
archive_thread_ids.push(thread_id);
}
}
Err(err) => {
self.outgoing
.send_error(request_id, thread_store_archive_error("archive", err))
.await;
return;
}
}
for descendant_thread_id in thread_ids.into_iter().skip(1) {
match self
.thread_store
.read_thread(StoreReadThreadParams {
thread_id: descendant_thread_id,
include_archived: true,
include_history: false,
})
.await
{
Ok(thread) => {
if thread.archived_at.is_none() {
archive_thread_ids.push(descendant_thread_id);
}
}
Err(err) => {
warn!(
"failed to read spawned descendant thread {descendant_thread_id} while archiving {thread_id}: {err}"
);
}
}
}
let mut archived_thread_ids = Vec::new();
let Some((parent_thread_id, descendant_thread_ids)) = archive_thread_ids.split_first()
else {
self.outgoing
.send_response(request_id, ThreadArchiveResponse {})
.await;
return;
};
self.prepare_thread_for_archive(*parent_thread_id).await;
match self
.thread_store
.archive_thread(StoreArchiveThreadParams {
thread_id: *parent_thread_id,
})
.await
{
Ok(()) => {
archived_thread_ids.push(parent_thread_id.to_string());
}
Err(err) => {
self.outgoing
.send_error(request_id, thread_store_archive_error("archive", err))
.await;
return;
}
}
for descendant_thread_id in descendant_thread_ids.iter().rev().copied() {
self.prepare_thread_for_archive(descendant_thread_id).await;
match self
.thread_store
.archive_thread(StoreArchiveThreadParams {
thread_id: descendant_thread_id,
})
.await
{
Ok(()) => {
archived_thread_ids.push(descendant_thread_id.to_string());
}
Err(err) => {
warn!(
"failed to archive spawned descendant thread {descendant_thread_id} while archiving {thread_id}: {err}"
);
}
}
}
self.outgoing
.send_response(request_id, ThreadArchiveResponse {})
.await;
for thread_id in archived_thread_ids {
let notification = ThreadArchivedNotification { thread_id };
self.outgoing
.send_server_notification(ServerNotification::ThreadArchived(notification))
.await;
}
}
async fn thread_increment_elicitation(

View File

@@ -1,5 +1,6 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_fake_rollout;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCError;
@@ -19,7 +20,11 @@ use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::UserInput;
use codex_core::ARCHIVED_SESSIONS_SUBDIR;
use codex_core::find_archived_thread_path_by_id_str;
use codex_core::find_thread_path_by_id_str;
use codex_protocol::ThreadId;
use codex_state::DirectionalThreadSpawnEdgeStatus;
use codex_state::StateRuntime;
use pretty_assertions::assert_eq;
use std::path::Path;
use tempfile::TempDir;
@@ -160,6 +165,311 @@ async fn thread_archive_requires_materialized_rollout() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn thread_archive_archives_spawned_descendants() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let parent_id = create_fake_rollout(
codex_home.path(),
"2025-01-01T00-00-00",
"2025-01-01T00:00:00Z",
"parent",
Some("mock_provider"),
/*git_info*/ None,
)?;
let child_id = create_fake_rollout(
codex_home.path(),
"2025-01-01T00-01-00",
"2025-01-01T00:01:00Z",
"child",
Some("mock_provider"),
/*git_info*/ None,
)?;
let grandchild_id = create_fake_rollout(
codex_home.path(),
"2025-01-01T00-02-00",
"2025-01-01T00:02:00Z",
"grandchild",
Some("mock_provider"),
/*git_info*/ None,
)?;
let parent_thread_id = ThreadId::from_string(&parent_id)?;
let child_thread_id = ThreadId::from_string(&child_id)?;
let grandchild_thread_id = ThreadId::from_string(&grandchild_id)?;
let state_db =
StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
state_db
.mark_backfill_complete(/*last_watermark*/ None)
.await?;
state_db
.upsert_thread_spawn_edge(
parent_thread_id,
child_thread_id,
DirectionalThreadSpawnEdgeStatus::Closed,
)
.await?;
state_db
.upsert_thread_spawn_edge(
child_thread_id,
grandchild_thread_id,
DirectionalThreadSpawnEdgeStatus::Open,
)
.await?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let archive_id = mcp
.send_thread_archive_request(ThreadArchiveParams {
thread_id: parent_id.clone(),
})
.await?;
let archive_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
)
.await??;
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
let mut archived_ids = Vec::new();
for _ in 0..3 {
let notification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/archived"),
)
.await??;
let archived_notification: ThreadArchivedNotification = serde_json::from_value(
notification
.params
.expect("thread/archived notification params"),
)?;
archived_ids.push(archived_notification.thread_id);
}
assert_eq!(archived_ids, vec![parent_id, grandchild_id, child_id]);
for thread_id in [parent_thread_id, child_thread_id, grandchild_thread_id] {
assert!(
find_thread_path_by_id_str(codex_home.path(), &thread_id.to_string())
.await?
.is_none(),
"expected active rollout for {thread_id} to be archived"
);
assert!(
find_archived_thread_path_by_id_str(codex_home.path(), &thread_id.to_string())
.await?
.is_some(),
"expected archived rollout for {thread_id} to exist"
);
}
Ok(())
}
#[tokio::test]
async fn thread_archive_succeeds_when_descendant_archive_fails() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let parent_id = create_fake_rollout(
codex_home.path(),
"2025-01-01T00-00-00",
"2025-01-01T00:00:00Z",
"parent",
Some("mock_provider"),
/*git_info*/ None,
)?;
let child_id = create_fake_rollout(
codex_home.path(),
"2025-01-01T00-01-00",
"2025-01-01T00:01:00Z",
"child",
Some("mock_provider"),
/*git_info*/ None,
)?;
let grandchild_id = create_fake_rollout(
codex_home.path(),
"2025-01-01T00-02-00",
"2025-01-01T00:02:00Z",
"grandchild",
Some("mock_provider"),
/*git_info*/ None,
)?;
let parent_thread_id = ThreadId::from_string(&parent_id)?;
let child_thread_id = ThreadId::from_string(&child_id)?;
let grandchild_thread_id = ThreadId::from_string(&grandchild_id)?;
let state_db =
StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
state_db
.mark_backfill_complete(/*last_watermark*/ None)
.await?;
state_db
.upsert_thread_spawn_edge(
parent_thread_id,
child_thread_id,
DirectionalThreadSpawnEdgeStatus::Closed,
)
.await?;
state_db
.upsert_thread_spawn_edge(
child_thread_id,
grandchild_thread_id,
DirectionalThreadSpawnEdgeStatus::Open,
)
.await?;
let child_rollout_path = find_thread_path_by_id_str(codex_home.path(), &child_id)
.await?
.expect("child rollout path");
let archived_child_path = codex_home
.path()
.join(ARCHIVED_SESSIONS_SUBDIR)
.join(child_rollout_path.file_name().expect("rollout file name"));
std::fs::create_dir_all(&archived_child_path)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let archive_id = mcp
.send_thread_archive_request(ThreadArchiveParams {
thread_id: parent_id.clone(),
})
.await?;
let archive_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
)
.await??;
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
let mut archived_ids = Vec::new();
for _ in 0..2 {
let notification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/archived"),
)
.await??;
let archived_notification: ThreadArchivedNotification = serde_json::from_value(
notification
.params
.expect("thread/archived notification params"),
)?;
archived_ids.push(archived_notification.thread_id);
}
assert_eq!(archived_ids, vec![parent_id, grandchild_id]);
assert!(
timeout(
std::time::Duration::from_millis(250),
mcp.read_stream_until_notification_message("thread/archived"),
)
.await
.is_err()
);
assert!(
child_rollout_path.exists(),
"child should stay active after descendant archive failure"
);
assert!(
archived_child_path.is_dir(),
"test conflict should remain in archived sessions"
);
for thread_id in [parent_thread_id, grandchild_thread_id] {
assert!(
find_thread_path_by_id_str(codex_home.path(), &thread_id.to_string())
.await?
.is_none(),
"expected active rollout for {thread_id} to be archived"
);
assert!(
find_archived_thread_path_by_id_str(codex_home.path(), &thread_id.to_string())
.await?
.is_some(),
"expected archived rollout for {thread_id} to exist"
);
}
Ok(())
}
#[tokio::test]
async fn thread_archive_succeeds_when_spawned_descendant_is_missing() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let parent_id = create_fake_rollout(
codex_home.path(),
"2025-01-01T00-00-00",
"2025-01-01T00:00:00Z",
"parent",
Some("mock_provider"),
/*git_info*/ None,
)?;
let parent_thread_id = ThreadId::from_string(&parent_id)?;
let missing_child_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000901")?;
let state_db =
StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
state_db
.mark_backfill_complete(/*last_watermark*/ None)
.await?;
state_db
.upsert_thread_spawn_edge(
parent_thread_id,
missing_child_thread_id,
DirectionalThreadSpawnEdgeStatus::Closed,
)
.await?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let archive_id = mcp
.send_thread_archive_request(ThreadArchiveParams {
thread_id: parent_id.clone(),
})
.await?;
let archive_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
)
.await??;
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
let notification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/archived"),
)
.await??;
let archived_notification: ThreadArchivedNotification = serde_json::from_value(
notification
.params
.expect("thread/archived notification params"),
)?;
assert_eq!(archived_notification.thread_id, parent_id);
assert!(
find_thread_path_by_id_str(codex_home.path(), &parent_id)
.await?
.is_none(),
"parent should be archived even when a descendant is missing"
);
assert!(
find_archived_thread_path_by_id_str(codex_home.path(), &parent_id)
.await?
.is_some(),
"parent should be moved into archived sessions"
);
Ok(())
}
#[tokio::test]
async fn thread_archive_clears_stale_subscriptions_before_resume() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;

View File

@@ -144,6 +144,17 @@ ON CONFLICT(child_thread_id) DO UPDATE SET
.await
}
/// List all spawned descendants of `root_thread_id`.
///
/// Descendants are returned breadth-first by depth, then by thread id for stable ordering.
pub async fn list_thread_spawn_descendants(
&self,
root_thread_id: ThreadId,
) -> anyhow::Result<Vec<ThreadId>> {
self.list_thread_spawn_descendants_matching(root_thread_id, /*status*/ None)
.await
}
/// Find a direct spawned child of `parent_thread_id` by canonical agent path.
pub async fn find_thread_spawn_child_by_path(
&self,
@@ -1655,5 +1666,11 @@ mod tests {
.await
.expect("open descendants from child should load");
assert_eq!(open_descendants_from_child, vec![grandchild_thread_id]);
let all_descendants = runtime
.list_thread_spawn_descendants(parent_thread_id)
.await
.expect("all descendants should load");
assert_eq!(all_descendants, vec![child_thread_id, grandchild_thread_id]);
}
}