Compare commits

...

1 Commits

Author SHA1 Message Date
Brent Traut
ec6055bbd9 feat(app-server): expose persisted subagent lifecycle state 2026-05-30 11:12:42 -07:00
20 changed files with 785 additions and 0 deletions

View File

@@ -3898,6 +3898,13 @@
],
"type": "string"
},
"ThreadSubagentLifecycleStatus": {
"enum": [
"open",
"closed"
],
"type": "string"
},
"ThreadUnarchiveParams": {
"properties": {
"threadId": {

View File

@@ -18000,6 +18000,32 @@
"title": "ThreadStatusChangedNotification",
"type": "object"
},
"ThreadSubagent": {
"properties": {
"childThreadId": {
"type": "string"
},
"lifecycleStatus": {
"$ref": "#/definitions/v2/ThreadSubagentLifecycleStatus"
},
"parentThreadId": {
"type": "string"
}
},
"required": [
"childThreadId",
"lifecycleStatus",
"parentThreadId"
],
"type": "object"
},
"ThreadSubagentLifecycleStatus": {
"enum": [
"open",
"closed"
],
"type": "string"
},
"ThreadTokenUsage": {
"properties": {
"last": {

View File

@@ -15824,6 +15824,32 @@
"title": "ThreadStatusChangedNotification",
"type": "object"
},
"ThreadSubagent": {
"properties": {
"childThreadId": {
"type": "string"
},
"lifecycleStatus": {
"$ref": "#/definitions/ThreadSubagentLifecycleStatus"
},
"parentThreadId": {
"type": "string"
}
},
"required": [
"childThreadId",
"lifecycleStatus",
"parentThreadId"
],
"type": "object"
},
"ThreadSubagentLifecycleStatus": {
"enum": [
"open",
"closed"
],
"type": "string"
},
"ThreadTokenUsage": {
"properties": {
"last": {

View File

@@ -0,0 +1,6 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { ThreadSubagentLifecycleStatus } from "./ThreadSubagentLifecycleStatus";
export type ThreadSubagent = { childThreadId: string, parentThreadId: string, lifecycleStatus: ThreadSubagentLifecycleStatus, };

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type ThreadSubagentLifecycleStatus = "open" | "closed";

View File

@@ -421,6 +421,8 @@ export type { ThreadStartSource } from "./ThreadStartSource";
export type { ThreadStartedNotification } from "./ThreadStartedNotification";
export type { ThreadStatus } from "./ThreadStatus";
export type { ThreadStatusChangedNotification } from "./ThreadStatusChangedNotification";
export type { ThreadSubagent } from "./ThreadSubagent";
export type { ThreadSubagentLifecycleStatus } from "./ThreadSubagentLifecycleStatus";
export type { ThreadTokenUsage } from "./ThreadTokenUsage";
export type { ThreadTokenUsageUpdatedNotification } from "./ThreadTokenUsageUpdatedNotification";
export type { ThreadUnarchiveParams } from "./ThreadUnarchiveParams";

View File

@@ -580,6 +580,18 @@ client_request_definitions! {
serialization: None,
response: v2::ThreadLoadedListResponse,
},
#[experimental("thread/subagents/list")]
ThreadSubagentsList => "thread/subagents/list" {
params: v2::ThreadSubagentsListParams,
serialization: None,
response: v2::ThreadSubagentsListResponse,
},
#[experimental("thread/subagents/read")]
ThreadSubagentsRead => "thread/subagents/read" {
params: v2::ThreadSubagentsReadParams,
serialization: None,
response: v2::ThreadSubagentsReadResponse,
},
ThreadRead => "thread/read" {
params: v2::ThreadReadParams,
serialization: thread_id(params.thread_id),

View File

@@ -1146,6 +1146,63 @@ pub struct ThreadLoadedListResponse {
pub next_cursor: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase", export_to = "v2/")]
pub enum ThreadSubagentLifecycleStatus {
Open,
Closed,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadSubagent {
pub child_thread_id: String,
pub parent_thread_id: String,
pub lifecycle_status: ThreadSubagentLifecycleStatus,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadSubagentsListParams {
pub parent_thread_id: String,
/// Opaque pagination cursor returned by a previous call.
#[ts(optional = nullable)]
pub cursor: Option<String>,
/// Optional page size; defaults to a reasonable server-side value.
#[ts(optional = nullable)]
pub limit: Option<u32>,
/// Optional durable spawn-edge lifecycle filter. Omit to include every status.
#[ts(optional = nullable)]
pub lifecycle_statuses: Option<Vec<ThreadSubagentLifecycleStatus>>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadSubagentsListResponse {
pub data: Vec<ThreadSubagent>,
/// Opaque cursor to pass to the next call to continue after the last item.
/// If None, there are no more items to return.
pub next_cursor: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadSubagentsReadParams {
pub child_thread_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadSubagentsReadResponse {
pub subagent: Option<ThreadSubagent>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(tag = "type", rename_all = "camelCase")]
#[ts(tag = "type")]

View File

@@ -136,6 +136,8 @@ Example with notification opt-out:
- `thread/start`, `thread/resume`, and `thread/fork` responses include the legacy `sandbox` compatibility projection. Experimental clients can read `runtimeWorkspaceRoots` for the thread-scoped runtime roots and `activePermissionProfile` for the named or implicit built-in profile identity/provenance when known.
- `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, `cwd`, and `searchTerm` filters. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
- `thread/loaded/list` — list the thread ids currently loaded in memory.
- `thread/subagents/list` — experimental; page through a thread's direct persisted subagent children without reading rollout transcripts or resuming threads. Supports an optional durable spawn-edge `lifecycleStatuses` filter.
- `thread/subagents/read` — experimental; read one persisted subagent spawn edge by child thread id without reading rollout transcripts or resuming threads.
- `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`. The returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
- `thread/turns/list` — experimental; page through a stored threads turn history without resuming it; supports cursor-based pagination with `sortDirection`, `itemsView`, `nextCursor`, and `backwardsCursor`.
- `thread/turns/items/list` — experimental; reserved for paging full items for one turn. The API shape is present, but app-server currently returns an unsupported-method JSON-RPC error.
@@ -368,6 +370,35 @@ Example:
When `nextCursor` is `null`, youve reached the final page.
### Example: List and read thread subagents (experimental)
Use `thread/subagents/list` with `capabilities.experimentalApi = true` to page through direct persisted subagent children without resuming threads or reading JSONL rollout transcripts. Results are ordered by `childThreadId`. Pass the opaque `nextCursor` from one response into the next request. Omit `lifecycleStatuses` to include every durable spawn-edge lifecycle status, or filter with `"open"` and `"closed"`.
`ThreadSubagent.lifecycleStatus` is distinct from `ThreadStatus`: subagent lifecycle is durable spawn-edge state, while thread status describes runtime activity such as `active`, `idle`, `systemError`, or `notLoaded`. Descendants are not returned recursively.
```json
{ "method": "thread/subagents/list", "id": 21, "params": {
"parentThreadId": "00000000-0000-0000-0000-000000000100",
"limit": 25,
"lifecycleStatuses": ["open", "closed"]
} }
{ "id": 21, "result": {
"data": [
{ "childThreadId": "00000000-0000-0000-0000-000000000101", "parentThreadId": "00000000-0000-0000-0000-000000000100", "lifecycleStatus": "open" }
],
"nextCursor": null
} }
```
Use `thread/subagents/read` for an O(1) lookup by child edge primary key. A child without a persisted spawn edge returns `{ "subagent": null }`.
```json
{ "method": "thread/subagents/read", "id": 22, "params": { "childThreadId": "00000000-0000-0000-0000-000000000101" } }
{ "id": 22, "result": {
"subagent": { "childThreadId": "00000000-0000-0000-0000-000000000101", "parentThreadId": "00000000-0000-0000-0000-000000000100", "lifecycleStatus": "open" }
} }
```
### Example: List loaded threads
`thread/loaded/list` returns thread ids currently loaded in memory. This is useful when you want to check which sessions are active without scanning rollouts on disk.

View File

@@ -1085,6 +1085,12 @@ impl MessageProcessor {
ClientRequest::ThreadLoadedList { params, .. } => {
self.thread_processor.thread_loaded_list(params).await
}
ClientRequest::ThreadSubagentsList { params, .. } => {
self.thread_processor.thread_subagents_list(params).await
}
ClientRequest::ThreadSubagentsRead { params, .. } => {
self.thread_processor.thread_subagents_read(params).await
}
ClientRequest::ThreadRead { params, .. } => {
self.thread_processor.thread_read(params).await
}

View File

@@ -235,6 +235,12 @@ use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStartedNotification;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ThreadSubagent;
use codex_app_server_protocol::ThreadSubagentLifecycleStatus;
use codex_app_server_protocol::ThreadSubagentsListParams;
use codex_app_server_protocol::ThreadSubagentsListResponse;
use codex_app_server_protocol::ThreadSubagentsReadParams;
use codex_app_server_protocol::ThreadSubagentsReadResponse;
use codex_app_server_protocol::ThreadTurnsItemsListParams;
use codex_app_server_protocol::ThreadTurnsListParams;
use codex_app_server_protocol::ThreadTurnsListResponse;

View File

@@ -1,5 +1,7 @@
use super::*;
use crate::error_code::method_not_found;
use base64::Engine;
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
use codex_protocol::models::BUILT_IN_PERMISSION_PROFILE_DANGER_FULL_ACCESS;
use codex_protocol::models::BUILT_IN_PERMISSION_PROFILE_WORKSPACE;
@@ -607,6 +609,24 @@ impl ThreadRequestProcessor {
.map(|response| Some(response.into()))
}
pub(crate) async fn thread_subagents_list(
&self,
params: ThreadSubagentsListParams,
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
self.thread_subagents_list_response_inner(params)
.await
.map(|response| Some(response.into()))
}
pub(crate) async fn thread_subagents_read(
&self,
params: ThreadSubagentsReadParams,
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
self.thread_subagents_read_response_inner(params)
.await
.map(|response| Some(response.into()))
}
pub(crate) async fn thread_read(
&self,
params: ThreadReadParams,
@@ -2065,6 +2085,89 @@ impl ThreadRequestProcessor {
Ok(ThreadReadResponse { thread })
}
async fn thread_subagents_list_response_inner(
&self,
params: ThreadSubagentsListParams,
) -> Result<ThreadSubagentsListResponse, JSONRPCErrorError> {
let ThreadSubagentsListParams {
parent_thread_id,
cursor,
limit,
lifecycle_statuses,
} = params;
let parent_thread_id = ThreadId::from_string(&parent_thread_id)
.map_err(|err| invalid_request(format!("invalid parent thread id: {err}")))?;
let cursor = cursor
.as_deref()
.map(|cursor| {
URL_SAFE_NO_PAD
.decode(cursor)
.ok()
.and_then(|bytes| String::from_utf8(bytes).ok())
.and_then(|child_thread_id| ThreadId::from_string(&child_thread_id).ok())
.ok_or_else(|| invalid_request(format!("invalid cursor: {cursor}")))
})
.transpose()?;
let page_size = limit.unwrap_or(THREAD_LIST_DEFAULT_LIMIT as u32);
if !(1..=THREAD_LIST_MAX_LIMIT as u32).contains(&page_size) {
return Err(invalid_request(format!(
"limit must be between 1 and {THREAD_LIST_MAX_LIMIT}"
)));
}
let lifecycle_statuses = lifecycle_statuses.map(|statuses| {
statuses
.into_iter()
.map(|status| match status {
ThreadSubagentLifecycleStatus::Open => {
codex_state::DirectionalThreadSpawnEdgeStatus::Open
}
ThreadSubagentLifecycleStatus::Closed => {
codex_state::DirectionalThreadSpawnEdgeStatus::Closed
}
})
.collect::<Vec<_>>()
});
let state_db = self.state_db.as_ref().ok_or_else(|| {
internal_error("sqlite state db unavailable for thread/subagents/list")
})?;
let page = state_db
.list_thread_spawn_children_page(
parent_thread_id,
cursor,
page_size as usize,
lifecycle_statuses.as_deref(),
)
.await
.map_err(|err| internal_error(format!("failed to list thread subagents: {err}")))?;
Ok(ThreadSubagentsListResponse {
data: page
.items
.into_iter()
.map(thread_subagent_from_edge)
.collect(),
next_cursor: page
.next_cursor
.map(|child_thread_id| URL_SAFE_NO_PAD.encode(child_thread_id.to_string())),
})
}
async fn thread_subagents_read_response_inner(
&self,
params: ThreadSubagentsReadParams,
) -> Result<ThreadSubagentsReadResponse, JSONRPCErrorError> {
let child_thread_id = ThreadId::from_string(&params.child_thread_id)
.map_err(|err| invalid_request(format!("invalid child thread id: {err}")))?;
let state_db = self.state_db.as_ref().ok_or_else(|| {
internal_error("sqlite state db unavailable for thread/subagents/read")
})?;
let subagent = state_db
.get_thread_spawn_edge(child_thread_id)
.await
.map_err(|err| internal_error(format!("failed to read thread subagent: {err}")))?
.map(thread_subagent_from_edge);
Ok(ThreadSubagentsReadResponse { subagent })
}
/// Builds the API view for `thread/read` from persisted metadata plus optional live state.
async fn read_thread_view(
&self,
@@ -3818,6 +3921,21 @@ fn normalize_thread_turns_status(
}
}
fn thread_subagent_from_edge(edge: codex_state::ThreadSpawnEdge) -> ThreadSubagent {
ThreadSubagent {
child_thread_id: edge.child_thread_id.to_string(),
parent_thread_id: edge.parent_thread_id.to_string(),
lifecycle_status: match edge.status {
codex_state::DirectionalThreadSpawnEdgeStatus::Open => {
ThreadSubagentLifecycleStatus::Open
}
codex_state::DirectionalThreadSpawnEdgeStatus::Closed => {
ThreadSubagentLifecycleStatus::Closed
}
},
}
}
enum ThreadReadViewError {
InvalidRequest(String),
Unsupported(&'static str),

View File

@@ -94,6 +94,8 @@ use codex_app_server_protocol::ThreadSetNameParams;
use codex_app_server_protocol::ThreadSettingsUpdateParams;
use codex_app_server_protocol::ThreadShellCommandParams;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadSubagentsListParams;
use codex_app_server_protocol::ThreadSubagentsReadParams;
use codex_app_server_protocol::ThreadTurnsItemsListParams;
use codex_app_server_protocol::ThreadTurnsListParams;
use codex_app_server_protocol::ThreadUnarchiveParams;
@@ -551,6 +553,24 @@ impl McpProcess {
self.send_request("thread/loaded/list", params).await
}
/// Send a `thread/subagents/list` JSON-RPC request.
pub async fn send_thread_subagents_list_request(
&mut self,
params: ThreadSubagentsListParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("thread/subagents/list", params).await
}
/// Send a `thread/subagents/read` JSON-RPC request.
pub async fn send_thread_subagents_read_request(
&mut self,
params: ThreadSubagentsReadParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("thread/subagents/read", params).await
}
/// Send a `thread/read` JSON-RPC request.
pub async fn send_thread_read_request(
&mut self,

View File

@@ -62,6 +62,7 @@ mod thread_settings_update;
mod thread_shell_command;
mod thread_start;
mod thread_status;
mod thread_subagents;
mod thread_unarchive;
mod thread_unsubscribe;
mod turn_interrupt;

View File

@@ -0,0 +1,270 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadSubagent;
use codex_app_server_protocol::ThreadSubagentLifecycleStatus;
use codex_app_server_protocol::ThreadSubagentsListParams;
use codex_app_server_protocol::ThreadSubagentsListResponse;
use codex_app_server_protocol::ThreadSubagentsReadParams;
use codex_app_server_protocol::ThreadSubagentsReadResponse;
use codex_protocol::ThreadId;
use codex_state::DirectionalThreadSpawnEdgeStatus;
use codex_state::StateRuntime;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
async fn thread_subagents_list_and_read_use_persisted_edges_without_rollouts() -> Result<()> {
let codex_home = TempDir::new()?;
let parent_thread_id = thread_id(/*value*/ 100)?;
let open_child_thread_id = thread_id(/*value*/ 101)?;
let closed_child_thread_id = thread_id(/*value*/ 102)?;
let grandchild_thread_id = thread_id(/*value*/ 103)?;
let missing_child_thread_id = thread_id(/*value*/ 104)?;
let state_db =
StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
state_db
.mark_backfill_complete(/*last_watermark*/ None)
.await?;
state_db
.upsert_thread_spawn_edge(
parent_thread_id,
open_child_thread_id,
DirectionalThreadSpawnEdgeStatus::Open,
)
.await?;
state_db
.upsert_thread_spawn_edge(
parent_thread_id,
closed_child_thread_id,
DirectionalThreadSpawnEdgeStatus::Closed,
)
.await?;
state_db
.upsert_thread_spawn_edge(
open_child_thread_id,
grandchild_thread_id,
DirectionalThreadSpawnEdgeStatus::Open,
)
.await?;
assert!(!codex_home.path().join("sessions").exists());
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let first_page = list_subagents(
&mut mcp,
ThreadSubagentsListParams {
limit: Some(1),
..list_params(parent_thread_id)
},
)
.await?;
let next_cursor = first_page.next_cursor.expect("first page cursor");
assert_ne!(next_cursor, open_child_thread_id.to_string());
assert_eq!(
first_page.data,
vec![subagent(
parent_thread_id,
open_child_thread_id,
ThreadSubagentLifecycleStatus::Open,
)]
);
let second_page = list_subagents(
&mut mcp,
ThreadSubagentsListParams {
cursor: Some(next_cursor),
limit: Some(1),
..list_params(parent_thread_id)
},
)
.await?;
assert_eq!(
second_page,
ThreadSubagentsListResponse {
data: vec![subagent(
parent_thread_id,
closed_child_thread_id,
ThreadSubagentLifecycleStatus::Closed,
)],
next_cursor: None,
}
);
assert_eq!(
list_subagents(
&mut mcp,
ThreadSubagentsListParams {
lifecycle_statuses: Some(vec![ThreadSubagentLifecycleStatus::Open]),
..list_params(parent_thread_id)
},
)
.await?,
ThreadSubagentsListResponse {
data: vec![subagent(
parent_thread_id,
open_child_thread_id,
ThreadSubagentLifecycleStatus::Open,
)],
next_cursor: None,
}
);
assert_eq!(
list_subagents(
&mut mcp,
ThreadSubagentsListParams {
lifecycle_statuses: Some(vec![ThreadSubagentLifecycleStatus::Closed]),
..list_params(parent_thread_id)
},
)
.await?,
ThreadSubagentsListResponse {
data: vec![subagent(
parent_thread_id,
closed_child_thread_id,
ThreadSubagentLifecycleStatus::Closed,
)],
next_cursor: None,
}
);
for (child_thread_id, expected) in [
(
open_child_thread_id,
Some(subagent(
parent_thread_id,
open_child_thread_id,
ThreadSubagentLifecycleStatus::Open,
)),
),
(
closed_child_thread_id,
Some(subagent(
parent_thread_id,
closed_child_thread_id,
ThreadSubagentLifecycleStatus::Closed,
)),
),
(missing_child_thread_id, None),
] {
assert_eq!(read_subagent(&mut mcp, child_thread_id).await?, expected);
}
assert!(!codex_home.path().join("sessions").exists());
Ok(())
}
#[tokio::test]
async fn thread_subagents_reject_malformed_ids_cursor_and_limits() -> Result<()> {
let codex_home = TempDir::new()?;
let state_db =
StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
state_db
.mark_backfill_complete(/*last_watermark*/ None)
.await?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
for params in [
ThreadSubagentsListParams {
parent_thread_id: "not-a-thread-id".to_string(),
cursor: None,
limit: None,
lifecycle_statuses: None,
},
ThreadSubagentsListParams {
cursor: Some("not-a-cursor".to_string()),
..list_params(thread_id(/*value*/ 200)?)
},
ThreadSubagentsListParams {
limit: Some(0),
..list_params(thread_id(/*value*/ 200)?)
},
ThreadSubagentsListParams {
limit: Some(101),
..list_params(thread_id(/*value*/ 200)?)
},
] {
let request_id = mcp.send_thread_subagents_list_request(params).await?;
assert_invalid_request(&mut mcp, request_id).await?;
}
let request_id = mcp
.send_thread_subagents_read_request(ThreadSubagentsReadParams {
child_thread_id: "not-a-thread-id".to_string(),
})
.await?;
assert_invalid_request(&mut mcp, request_id).await?;
Ok(())
}
async fn list_subagents(
mcp: &mut McpProcess,
params: ThreadSubagentsListParams,
) -> Result<ThreadSubagentsListResponse> {
let request_id = mcp.send_thread_subagents_list_request(params).await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
to_response(response)
}
async fn read_subagent(
mcp: &mut McpProcess,
child_thread_id: ThreadId,
) -> Result<Option<ThreadSubagent>> {
let request_id = mcp
.send_thread_subagents_read_request(ThreadSubagentsReadParams {
child_thread_id: child_thread_id.to_string(),
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
Ok(to_response::<ThreadSubagentsReadResponse>(response)?.subagent)
}
async fn assert_invalid_request(mcp: &mut McpProcess, request_id: i64) -> Result<()> {
let error: JSONRPCError = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(error.error.code, -32600);
Ok(())
}
fn subagent(
parent_thread_id: ThreadId,
child_thread_id: ThreadId,
lifecycle_status: ThreadSubagentLifecycleStatus,
) -> ThreadSubagent {
ThreadSubagent {
child_thread_id: child_thread_id.to_string(),
parent_thread_id: parent_thread_id.to_string(),
lifecycle_status,
}
}
fn list_params(parent_thread_id: ThreadId) -> ThreadSubagentsListParams {
ThreadSubagentsListParams {
parent_thread_id: parent_thread_id.to_string(),
cursor: None,
limit: None,
lifecycle_statuses: None,
}
}
fn thread_id(value: u128) -> Result<ThreadId> {
ThreadId::from_string(&uuid::Uuid::from_u128(value).to_string()).map_err(Into::into)
}

View File

@@ -0,0 +1,2 @@
CREATE INDEX idx_thread_spawn_edges_parent_child_status
ON thread_spawn_edges(parent_thread_id, child_thread_id, status);

View File

@@ -50,6 +50,8 @@ pub use model::ThreadGoal;
pub use model::ThreadGoalStatus;
pub use model::ThreadMetadata;
pub use model::ThreadMetadataBuilder;
pub use model::ThreadSpawnEdge;
pub use model::ThreadSpawnEdgesPage;
pub use model::ThreadsPage;
pub use runtime::GoalAccountingMode;
pub use runtime::GoalAccountingOutcome;

View File

@@ -1,3 +1,4 @@
use codex_protocol::ThreadId;
use strum::AsRefStr;
use strum::Display;
use strum::EnumString;
@@ -9,3 +10,18 @@ pub enum DirectionalThreadSpawnEdgeStatus {
Open,
Closed,
}
/// Persisted directional parent-child edge for a spawned thread.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ThreadSpawnEdge {
pub parent_thread_id: ThreadId,
pub child_thread_id: ThreadId,
pub status: DirectionalThreadSpawnEdgeStatus,
}
/// Keyset-paginated direct children from the persisted thread spawn graph.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ThreadSpawnEdgesPage {
pub items: Vec<ThreadSpawnEdge>,
pub next_cursor: Option<ThreadId>,
}

View File

@@ -16,6 +16,8 @@ pub use agent_job::AgentJobStatus;
pub use backfill_state::BackfillState;
pub use backfill_state::BackfillStatus;
pub use graph::DirectionalThreadSpawnEdgeStatus;
pub use graph::ThreadSpawnEdge;
pub use graph::ThreadSpawnEdgesPage;
pub use log::LogEntry;
pub use log::LogQuery;
pub use log::LogRow;

View File

@@ -115,6 +115,78 @@ ON CONFLICT(child_thread_id) DO UPDATE SET
Ok(())
}
/// Look up a spawned thread's incoming edge by child id.
pub async fn get_thread_spawn_edge(
&self,
child_thread_id: ThreadId,
) -> anyhow::Result<Option<crate::ThreadSpawnEdge>> {
let row = sqlx::query(
r#"
SELECT parent_thread_id, child_thread_id, status
FROM thread_spawn_edges
WHERE child_thread_id = ?
"#,
)
.bind(child_thread_id.to_string())
.fetch_optional(self.pool.as_ref())
.await?;
row.map(thread_spawn_edge_from_row).transpose()
}
/// List a page of direct spawned children ordered by child id.
///
/// `statuses = None` includes every persisted lifecycle status. An empty
/// status slice intentionally matches no edges.
pub async fn list_thread_spawn_children_page(
&self,
parent_thread_id: ThreadId,
cursor: Option<ThreadId>,
page_size: usize,
statuses: Option<&[crate::DirectionalThreadSpawnEdgeStatus]>,
) -> anyhow::Result<crate::ThreadSpawnEdgesPage> {
if statuses.is_some_and(<[crate::DirectionalThreadSpawnEdgeStatus]>::is_empty) {
return Ok(crate::ThreadSpawnEdgesPage {
items: Vec::new(),
next_cursor: None,
});
}
let mut builder = QueryBuilder::<Sqlite>::new(
"SELECT parent_thread_id, child_thread_id, status FROM thread_spawn_edges WHERE parent_thread_id = ",
);
builder.push_bind(parent_thread_id.to_string());
if let Some(cursor) = cursor {
builder
.push(" AND child_thread_id > ")
.push_bind(cursor.to_string());
}
if let Some(statuses) = statuses {
builder.push(" AND status IN (");
let mut separated = builder.separated(", ");
for status in statuses {
separated.push_bind(status.as_ref());
}
separated.push_unseparated(")");
}
builder
.push(" ORDER BY child_thread_id LIMIT ")
.push_bind(page_size.saturating_add(1) as i64);
let rows = builder.build().fetch_all(self.pool.as_ref()).await?;
let mut items = rows
.into_iter()
.map(thread_spawn_edge_from_row)
.collect::<anyhow::Result<Vec<_>>>()?;
let has_more = items.len() > page_size;
items.truncate(page_size);
let next_cursor = if has_more {
items.last().map(|edge| edge.child_thread_id)
} else {
None
};
Ok(crate::ThreadSpawnEdgesPage { items, next_cursor })
}
/// List direct spawned children of `parent_thread_id` whose edge matches `status`.
pub async fn list_thread_spawn_children_with_status(
&self,
@@ -1085,6 +1157,18 @@ fn metadata_preview(metadata: &crate::ThreadMetadata) -> &str {
.unwrap_or_default()
}
fn thread_spawn_edge_from_row(
row: sqlx::sqlite::SqliteRow,
) -> anyhow::Result<crate::ThreadSpawnEdge> {
Ok(crate::ThreadSpawnEdge {
parent_thread_id: ThreadId::try_from(row.try_get::<String, _>("parent_thread_id")?)?,
child_thread_id: ThreadId::try_from(row.try_get::<String, _>("child_thread_id")?)?,
status: row
.try_get::<String, _>("status")?
.parse::<crate::DirectionalThreadSpawnEdgeStatus>()?,
})
}
#[cfg(test)]
mod tests {
use super::*;
@@ -2030,4 +2114,90 @@ INSERT INTO thread_spawn_edges (
]
);
}
#[tokio::test]
async fn thread_spawn_children_page_uses_child_id_keyset() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home, "test-provider".to_string())
.await
.expect("state db should initialize");
let parent_thread_id =
ThreadId::from_string("00000000-0000-0000-0000-000000000920").expect("valid thread id");
let child_a_thread_id =
ThreadId::from_string("00000000-0000-0000-0000-000000000921").expect("valid thread id");
let child_b_thread_id =
ThreadId::from_string("00000000-0000-0000-0000-000000000922").expect("valid thread id");
let child_c_thread_id =
ThreadId::from_string("00000000-0000-0000-0000-000000000923").expect("valid thread id");
for (child_thread_id, status) in [
(child_a_thread_id, DirectionalThreadSpawnEdgeStatus::Open),
(child_b_thread_id, DirectionalThreadSpawnEdgeStatus::Closed),
(child_c_thread_id, DirectionalThreadSpawnEdgeStatus::Open),
] {
runtime
.upsert_thread_spawn_edge(parent_thread_id, child_thread_id, status)
.await
.expect("child edge insert should succeed");
}
let first_page = runtime
.list_thread_spawn_children_page(
parent_thread_id,
/*cursor*/ None,
/*page_size*/ 2,
/*statuses*/ None,
)
.await
.expect("first page should load");
assert_eq!(
first_page,
crate::ThreadSpawnEdgesPage {
items: vec![
crate::ThreadSpawnEdge {
parent_thread_id,
child_thread_id: child_a_thread_id,
status: DirectionalThreadSpawnEdgeStatus::Open,
},
crate::ThreadSpawnEdge {
parent_thread_id,
child_thread_id: child_b_thread_id,
status: DirectionalThreadSpawnEdgeStatus::Closed,
},
],
next_cursor: Some(child_b_thread_id),
}
);
let second_page = runtime
.list_thread_spawn_children_page(
parent_thread_id,
first_page.next_cursor,
/*page_size*/ 2,
/*statuses*/ None,
)
.await
.expect("second page should load");
assert_eq!(
second_page,
crate::ThreadSpawnEdgesPage {
items: vec![crate::ThreadSpawnEdge {
parent_thread_id,
child_thread_id: child_c_thread_id,
status: DirectionalThreadSpawnEdgeStatus::Open,
}],
next_cursor: None,
}
);
assert_eq!(
runtime
.get_thread_spawn_edge(child_b_thread_id)
.await
.expect("edge lookup should succeed"),
Some(crate::ThreadSpawnEdge {
parent_thread_id,
child_thread_id: child_b_thread_id,
status: DirectionalThreadSpawnEdgeStatus::Closed,
})
);
}
}