Compare commits

...

1 Commits

Author SHA1 Message Date
Brent Traut
a9a750d655 app-server: add cascade archive option 2026-06-01 10:13:31 -07:00
14 changed files with 153 additions and 5 deletions

View File

@@ -1662,6 +1662,7 @@ async fn unrelated_client_requests_are_ignored_by_reducer() {
request_id: RequestId::Integer(3),
params: ThreadArchiveParams {
thread_id: "thread-2".to_string(),
include_descendants: None,
},
}),
},

View File

@@ -115,6 +115,7 @@ fn sample_thread_archive_request() -> ClientRequest {
request_id: RequestId::Integer(3),
params: ThreadArchiveParams {
thread_id: "thread-1".to_string(),
include_descendants: None,
},
}
}

View File

@@ -3076,6 +3076,13 @@
},
"ThreadArchiveParams": {
"properties": {
"includeDescendants": {
"description": "Whether to archive spawned subagent descendants. Defaults to true for compatibility with the legacy `thread/archive` behavior.",
"type": [
"boolean",
"null"
]
},
"threadId": {
"type": "string"
}

View File

@@ -15588,6 +15588,13 @@
"ThreadArchiveParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"includeDescendants": {
"description": "Whether to archive spawned subagent descendants. Defaults to true for compatibility with the legacy `thread/archive` behavior.",
"type": [
"boolean",
"null"
]
},
"threadId": {
"type": "string"
}

View File

@@ -13412,6 +13412,13 @@
"ThreadArchiveParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"includeDescendants": {
"description": "Whether to archive spawned subagent descendants. Defaults to true for compatibility with the legacy `thread/archive` behavior.",
"type": [
"boolean",
"null"
]
},
"threadId": {
"type": "string"
}

View File

@@ -1,6 +1,13 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"includeDescendants": {
"description": "Whether to archive spawned subagent descendants. Defaults to true for compatibility with the legacy `thread/archive` behavior.",
"type": [
"boolean",
"null"
]
},
"threadId": {
"type": "string"
}

View File

@@ -2,4 +2,9 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type ThreadArchiveParams = { threadId: string, };
export type ThreadArchiveParams = { threadId: string,
/**
* Whether to archive spawned subagent descendants. Defaults to true for
* compatibility with the legacy `thread/archive` behavior.
*/
includeDescendants?: boolean | null, };

View File

@@ -112,6 +112,22 @@ fn thread_turns_list_params_accepts_items_view() {
assert_eq!(params.items_view, Some(TurnItemsView::NotLoaded));
}
#[test]
fn thread_archive_params_accept_omitted_include_descendants() {
let params = serde_json::from_value::<ThreadArchiveParams>(json!({
"threadId": "thr_123",
}))
.expect("legacy thread archive params should deserialize");
assert_eq!(
params,
ThreadArchiveParams {
thread_id: "thr_123".to_string(),
include_descendants: None,
}
);
}
#[test]
fn thread_resume_params_accept_turns_page_bootstrap() {
let params = serde_json::from_value::<ThreadResumeParams>(json!({

View File

@@ -602,6 +602,10 @@ pub struct ThreadForkResponse {
#[ts(export_to = "v2/")]
pub struct ThreadArchiveParams {
pub thread_id: String,
/// Whether to archive spawned subagent descendants. Defaults to true for
/// compatibility with the legacy `thread/archive` behavior.
#[ts(optional = nullable)]
pub include_descendants: Option<bool>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]

View File

@@ -150,7 +150,7 @@ Example with notification opt-out:
- `thread/goal/cleared` — notification emitted whenever a thread goal is removed.
- `thread/settings/updated` — experimental notification emitted to subscribed clients when a loaded threads effective next-turn settings change; includes `threadId` and the full `threadSettings`.
- `thread/status/changed` — notification emitted when a loaded threads status changes (`threadId` + new `status`).
- `thread/archive` — move a threads rollout file into the archived directory and attempt to move any spawned descendant thread rollout files; returns `{}` on success and emits `thread/archived` for each archived thread.
- `thread/archive` — move a threads rollout file into the archived directory. The optional `includeDescendants` parameter controls whether the server attempts to archive spawned subagent descendants using the server-owned thread graph. It defaults to `true` for compatibility with older clients. Descendant moves are best-effort because thread stores expose single-thread archive operations. Returns `{}` on success and emits `thread/archived` for each archived thread.
- `thread/unsubscribe` — unsubscribe this connection from thread turn/item events. If this was the last subscriber, the server keeps the thread loaded and unloads it only after it has had no subscribers and no thread activity for 30 minutes, then emits `thread/closed`.
- `thread/name/set` — set or update a threads user-facing name for either a loaded thread or a persisted rollout; returns `{}` on success and emits `thread/name/updated` to initialized, opted-in clients. Thread names are not required to be unique; name lookups resolve to the most recently updated thread.
- `thread/unarchive` — move an archived rollout file back into the sessions directory; returns the restored `thread` on success and emits `thread/unarchived`.
@@ -584,10 +584,10 @@ Use `thread/goal/clear` to remove the current goal.
### Example: Archive a thread
Use `thread/archive` to move the persisted rollout (stored as a JSONL file on disk) into the archived sessions directory and attempt to move any spawned descendant thread rollouts.
Use `thread/archive` to move the persisted rollout (stored as a JSONL file on disk) into the archived sessions directory. Set `includeDescendants` to `true` to attempt to archive all spawned subagent descendants using the server-owned thread graph. The field defaults to `true` for compatibility with older clients; set it to `false` to archive only the requested thread.
```json
{ "method": "thread/archive", "id": 21, "params": { "threadId": "thr_b" } }
{ "method": "thread/archive", "id": 21, "params": { "threadId": "thr_b", "includeDescendants": true } }
{ "id": 21, "result": {} }
{ "method": "thread/archived", "params": { "threadId": "thr_b" } }
```

View File

@@ -1311,7 +1311,9 @@ impl ThreadRequestProcessor {
.map_err(|err| invalid_request(format!("invalid session id: {err}")))?;
let mut thread_ids = vec![thread_id];
if let Some(state_db_ctx) = self.state_db.as_ref() {
if params.include_descendants.unwrap_or(true)
&& let Some(state_db_ctx) = self.state_db.as_ref()
{
let descendants = state_db_ctx
.list_thread_spawn_descendants(thread_id)
.await

View File

@@ -73,6 +73,7 @@ async fn thread_archive_requires_materialized_rollout() -> Result<()> {
let archive_id = mcp
.send_thread_archive_request(ThreadArchiveParams {
thread_id: thread.id.clone(),
include_descendants: None,
})
.await?;
let archive_err: JSONRPCError = timeout(
@@ -128,6 +129,7 @@ async fn thread_archive_requires_materialized_rollout() -> Result<()> {
let archive_id = mcp
.send_thread_archive_request(ThreadArchiveParams {
thread_id: thread.id.clone(),
include_descendants: None,
})
.await?;
let archive_resp: JSONRPCResponse = timeout(
@@ -227,6 +229,7 @@ async fn thread_archive_archives_spawned_descendants() -> Result<()> {
let archive_id = mcp
.send_thread_archive_request(ThreadArchiveParams {
thread_id: parent_id.clone(),
include_descendants: None,
})
.await?;
let archive_resp: JSONRPCResponse = timeout(
@@ -278,6 +281,89 @@ async fn thread_archive_archives_spawned_descendants() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn thread_archive_without_descendants_archives_only_requested_thread() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let parent_id = create_fake_rollout(
codex_home.path(),
"2025-01-01T00-00-00",
"2025-01-01T00:00:00Z",
"parent",
Some("mock_provider"),
/*git_info*/ None,
)?;
let child_id = create_fake_rollout(
codex_home.path(),
"2025-01-01T00-01-00",
"2025-01-01T00:01:00Z",
"child",
Some("mock_provider"),
/*git_info*/ None,
)?;
let state_db =
StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
state_db
.mark_backfill_complete(/*last_watermark*/ None)
.await?;
state_db
.upsert_thread_spawn_edge(
ThreadId::from_string(&parent_id)?,
ThreadId::from_string(&child_id)?,
DirectionalThreadSpawnEdgeStatus::Open,
)
.await?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let archive_id = mcp
.send_thread_archive_request(ThreadArchiveParams {
thread_id: parent_id.clone(),
include_descendants: Some(false),
})
.await?;
let archive_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
)
.await??;
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
let notification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/archived"),
)
.await??;
let archived_notification: ThreadArchivedNotification = serde_json::from_value(
notification
.params
.expect("thread/archived notification params"),
)?;
assert_eq!(archived_notification.thread_id, parent_id);
assert!(
find_archived_thread_path_by_id_str(
codex_home.path(),
&parent_id,
/*state_db_ctx*/ None,
)
.await?
.is_some(),
"parent should be archived"
);
assert!(
find_thread_path_by_id_str(codex_home.path(), &child_id, /*state_db_ctx*/ None)
.await?
.is_some(),
"child should stay active when includeDescendants is false"
);
Ok(())
}
#[tokio::test]
async fn thread_archive_succeeds_when_descendant_archive_fails() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
@@ -348,6 +434,7 @@ async fn thread_archive_succeeds_when_descendant_archive_fails() -> Result<()> {
let archive_id = mcp
.send_thread_archive_request(ThreadArchiveParams {
thread_id: parent_id.clone(),
include_descendants: Some(true),
})
.await?;
let archive_resp: JSONRPCResponse = timeout(
@@ -452,6 +539,7 @@ async fn thread_archive_succeeds_when_spawned_descendant_is_missing() -> Result<
let archive_id = mcp
.send_thread_archive_request(ThreadArchiveParams {
thread_id: parent_id.clone(),
include_descendants: Some(true),
})
.await?;
let archive_resp: JSONRPCResponse = timeout(
@@ -545,6 +633,7 @@ async fn thread_archive_clears_stale_subscriptions_before_resume() -> Result<()>
let archive_id = primary
.send_thread_archive_request(ThreadArchiveParams {
thread_id: thread.id.clone(),
include_descendants: None,
})
.await?;
let archive_resp: JSONRPCResponse = timeout(

View File

@@ -110,6 +110,7 @@ async fn thread_unarchive_moves_rollout_back_into_sessions_directory() -> Result
let archive_id = mcp
.send_thread_archive_request(ThreadArchiveParams {
thread_id: thread.id.clone(),
include_descendants: None,
})
.await?;
let archive_resp: JSONRPCResponse = timeout(

View File

@@ -576,6 +576,7 @@ impl AppServerSession {
request_id,
params: ThreadArchiveParams {
thread_id: thread_id.to_string(),
include_descendants: None,
},
})
.await