mirror of
https://github.com/openai/codex.git
synced 2026-06-04 04:12:03 +00:00
Compare commits
1 Commits
cconger/co
...
btraut/thr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ec6055bbd9 |
@@ -3898,6 +3898,13 @@
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"ThreadSubagentLifecycleStatus": {
|
||||
"enum": [
|
||||
"open",
|
||||
"closed"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"ThreadUnarchiveParams": {
|
||||
"properties": {
|
||||
"threadId": {
|
||||
|
||||
@@ -18000,6 +18000,32 @@
|
||||
"title": "ThreadStatusChangedNotification",
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadSubagent": {
|
||||
"properties": {
|
||||
"childThreadId": {
|
||||
"type": "string"
|
||||
},
|
||||
"lifecycleStatus": {
|
||||
"$ref": "#/definitions/v2/ThreadSubagentLifecycleStatus"
|
||||
},
|
||||
"parentThreadId": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"childThreadId",
|
||||
"lifecycleStatus",
|
||||
"parentThreadId"
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadSubagentLifecycleStatus": {
|
||||
"enum": [
|
||||
"open",
|
||||
"closed"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"ThreadTokenUsage": {
|
||||
"properties": {
|
||||
"last": {
|
||||
|
||||
@@ -15824,6 +15824,32 @@
|
||||
"title": "ThreadStatusChangedNotification",
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadSubagent": {
|
||||
"properties": {
|
||||
"childThreadId": {
|
||||
"type": "string"
|
||||
},
|
||||
"lifecycleStatus": {
|
||||
"$ref": "#/definitions/ThreadSubagentLifecycleStatus"
|
||||
},
|
||||
"parentThreadId": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"childThreadId",
|
||||
"lifecycleStatus",
|
||||
"parentThreadId"
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadSubagentLifecycleStatus": {
|
||||
"enum": [
|
||||
"open",
|
||||
"closed"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"ThreadTokenUsage": {
|
||||
"properties": {
|
||||
"last": {
|
||||
|
||||
6
codex-rs/app-server-protocol/schema/typescript/v2/ThreadSubagent.ts
generated
Normal file
6
codex-rs/app-server-protocol/schema/typescript/v2/ThreadSubagent.ts
generated
Normal file
@@ -0,0 +1,6 @@
|
||||
// GENERATED CODE! DO NOT MODIFY BY HAND!
|
||||
|
||||
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||
import type { ThreadSubagentLifecycleStatus } from "./ThreadSubagentLifecycleStatus";
|
||||
|
||||
export type ThreadSubagent = { childThreadId: string, parentThreadId: string, lifecycleStatus: ThreadSubagentLifecycleStatus, };
|
||||
5
codex-rs/app-server-protocol/schema/typescript/v2/ThreadSubagentLifecycleStatus.ts
generated
Normal file
5
codex-rs/app-server-protocol/schema/typescript/v2/ThreadSubagentLifecycleStatus.ts
generated
Normal file
@@ -0,0 +1,5 @@
|
||||
// GENERATED CODE! DO NOT MODIFY BY HAND!
|
||||
|
||||
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||
|
||||
export type ThreadSubagentLifecycleStatus = "open" | "closed";
|
||||
@@ -421,6 +421,8 @@ export type { ThreadStartSource } from "./ThreadStartSource";
|
||||
export type { ThreadStartedNotification } from "./ThreadStartedNotification";
|
||||
export type { ThreadStatus } from "./ThreadStatus";
|
||||
export type { ThreadStatusChangedNotification } from "./ThreadStatusChangedNotification";
|
||||
export type { ThreadSubagent } from "./ThreadSubagent";
|
||||
export type { ThreadSubagentLifecycleStatus } from "./ThreadSubagentLifecycleStatus";
|
||||
export type { ThreadTokenUsage } from "./ThreadTokenUsage";
|
||||
export type { ThreadTokenUsageUpdatedNotification } from "./ThreadTokenUsageUpdatedNotification";
|
||||
export type { ThreadUnarchiveParams } from "./ThreadUnarchiveParams";
|
||||
|
||||
@@ -580,6 +580,18 @@ client_request_definitions! {
|
||||
serialization: None,
|
||||
response: v2::ThreadLoadedListResponse,
|
||||
},
|
||||
#[experimental("thread/subagents/list")]
|
||||
ThreadSubagentsList => "thread/subagents/list" {
|
||||
params: v2::ThreadSubagentsListParams,
|
||||
serialization: None,
|
||||
response: v2::ThreadSubagentsListResponse,
|
||||
},
|
||||
#[experimental("thread/subagents/read")]
|
||||
ThreadSubagentsRead => "thread/subagents/read" {
|
||||
params: v2::ThreadSubagentsReadParams,
|
||||
serialization: None,
|
||||
response: v2::ThreadSubagentsReadResponse,
|
||||
},
|
||||
ThreadRead => "thread/read" {
|
||||
params: v2::ThreadReadParams,
|
||||
serialization: thread_id(params.thread_id),
|
||||
|
||||
@@ -1146,6 +1146,63 @@ pub struct ThreadLoadedListResponse {
|
||||
pub next_cursor: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(rename_all = "camelCase", export_to = "v2/")]
|
||||
pub enum ThreadSubagentLifecycleStatus {
|
||||
Open,
|
||||
Closed,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadSubagent {
|
||||
pub child_thread_id: String,
|
||||
pub parent_thread_id: String,
|
||||
pub lifecycle_status: ThreadSubagentLifecycleStatus,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadSubagentsListParams {
|
||||
pub parent_thread_id: String,
|
||||
/// Opaque pagination cursor returned by a previous call.
|
||||
#[ts(optional = nullable)]
|
||||
pub cursor: Option<String>,
|
||||
/// Optional page size; defaults to a reasonable server-side value.
|
||||
#[ts(optional = nullable)]
|
||||
pub limit: Option<u32>,
|
||||
/// Optional durable spawn-edge lifecycle filter. Omit to include every status.
|
||||
#[ts(optional = nullable)]
|
||||
pub lifecycle_statuses: Option<Vec<ThreadSubagentLifecycleStatus>>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadSubagentsListResponse {
|
||||
pub data: Vec<ThreadSubagent>,
|
||||
/// Opaque cursor to pass to the next call to continue after the last item.
|
||||
/// If None, there are no more items to return.
|
||||
pub next_cursor: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadSubagentsReadParams {
|
||||
pub child_thread_id: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadSubagentsReadResponse {
|
||||
pub subagent: Option<ThreadSubagent>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(tag = "type", rename_all = "camelCase")]
|
||||
#[ts(tag = "type")]
|
||||
|
||||
@@ -136,6 +136,8 @@ Example with notification opt-out:
|
||||
- `thread/start`, `thread/resume`, and `thread/fork` responses include the legacy `sandbox` compatibility projection. Experimental clients can read `runtimeWorkspaceRoots` for the thread-scoped runtime roots and `activePermissionProfile` for the named or implicit built-in profile identity/provenance when known.
|
||||
- `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, `cwd`, and `searchTerm` filters. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
|
||||
- `thread/loaded/list` — list the thread ids currently loaded in memory.
|
||||
- `thread/subagents/list` — experimental; page through a thread's direct persisted subagent children without reading rollout transcripts or resuming threads. Supports an optional durable spawn-edge `lifecycleStatuses` filter.
|
||||
- `thread/subagents/read` — experimental; read one persisted subagent spawn edge by child thread id without reading rollout transcripts or resuming threads.
|
||||
- `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`. The returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
|
||||
- `thread/turns/list` — experimental; page through a stored thread’s turn history without resuming it; supports cursor-based pagination with `sortDirection`, `itemsView`, `nextCursor`, and `backwardsCursor`.
|
||||
- `thread/turns/items/list` — experimental; reserved for paging full items for one turn. The API shape is present, but app-server currently returns an unsupported-method JSON-RPC error.
|
||||
@@ -368,6 +370,35 @@ Example:
|
||||
|
||||
When `nextCursor` is `null`, you’ve reached the final page.
|
||||
|
||||
### Example: List and read thread subagents (experimental)
|
||||
|
||||
Use `thread/subagents/list` with `capabilities.experimentalApi = true` to page through direct persisted subagent children without resuming threads or reading JSONL rollout transcripts. Results are ordered by `childThreadId`. Pass the opaque `nextCursor` from one response into the next request. Omit `lifecycleStatuses` to include every durable spawn-edge lifecycle status, or filter with `"open"` and `"closed"`.
|
||||
|
||||
`ThreadSubagent.lifecycleStatus` is distinct from `ThreadStatus`: subagent lifecycle is durable spawn-edge state, while thread status describes runtime activity such as `active`, `idle`, `systemError`, or `notLoaded`. Descendants are not returned recursively.
|
||||
|
||||
```json
|
||||
{ "method": "thread/subagents/list", "id": 21, "params": {
|
||||
"parentThreadId": "00000000-0000-0000-0000-000000000100",
|
||||
"limit": 25,
|
||||
"lifecycleStatuses": ["open", "closed"]
|
||||
} }
|
||||
{ "id": 21, "result": {
|
||||
"data": [
|
||||
{ "childThreadId": "00000000-0000-0000-0000-000000000101", "parentThreadId": "00000000-0000-0000-0000-000000000100", "lifecycleStatus": "open" }
|
||||
],
|
||||
"nextCursor": null
|
||||
} }
|
||||
```
|
||||
|
||||
Use `thread/subagents/read` for an O(1) lookup by child edge primary key. A child without a persisted spawn edge returns `{ "subagent": null }`.
|
||||
|
||||
```json
|
||||
{ "method": "thread/subagents/read", "id": 22, "params": { "childThreadId": "00000000-0000-0000-0000-000000000101" } }
|
||||
{ "id": 22, "result": {
|
||||
"subagent": { "childThreadId": "00000000-0000-0000-0000-000000000101", "parentThreadId": "00000000-0000-0000-0000-000000000100", "lifecycleStatus": "open" }
|
||||
} }
|
||||
```
|
||||
|
||||
### Example: List loaded threads
|
||||
|
||||
`thread/loaded/list` returns thread ids currently loaded in memory. This is useful when you want to check which sessions are active without scanning rollouts on disk.
|
||||
|
||||
@@ -1085,6 +1085,12 @@ impl MessageProcessor {
|
||||
ClientRequest::ThreadLoadedList { params, .. } => {
|
||||
self.thread_processor.thread_loaded_list(params).await
|
||||
}
|
||||
ClientRequest::ThreadSubagentsList { params, .. } => {
|
||||
self.thread_processor.thread_subagents_list(params).await
|
||||
}
|
||||
ClientRequest::ThreadSubagentsRead { params, .. } => {
|
||||
self.thread_processor.thread_subagents_read(params).await
|
||||
}
|
||||
ClientRequest::ThreadRead { params, .. } => {
|
||||
self.thread_processor.thread_read(params).await
|
||||
}
|
||||
|
||||
@@ -235,6 +235,12 @@ use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStartedNotification;
|
||||
use codex_app_server_protocol::ThreadStatus;
|
||||
use codex_app_server_protocol::ThreadSubagent;
|
||||
use codex_app_server_protocol::ThreadSubagentLifecycleStatus;
|
||||
use codex_app_server_protocol::ThreadSubagentsListParams;
|
||||
use codex_app_server_protocol::ThreadSubagentsListResponse;
|
||||
use codex_app_server_protocol::ThreadSubagentsReadParams;
|
||||
use codex_app_server_protocol::ThreadSubagentsReadResponse;
|
||||
use codex_app_server_protocol::ThreadTurnsItemsListParams;
|
||||
use codex_app_server_protocol::ThreadTurnsListParams;
|
||||
use codex_app_server_protocol::ThreadTurnsListResponse;
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use super::*;
|
||||
use crate::error_code::method_not_found;
|
||||
use base64::Engine;
|
||||
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
|
||||
use codex_protocol::models::BUILT_IN_PERMISSION_PROFILE_DANGER_FULL_ACCESS;
|
||||
use codex_protocol::models::BUILT_IN_PERMISSION_PROFILE_WORKSPACE;
|
||||
|
||||
@@ -607,6 +609,24 @@ impl ThreadRequestProcessor {
|
||||
.map(|response| Some(response.into()))
|
||||
}
|
||||
|
||||
pub(crate) async fn thread_subagents_list(
|
||||
&self,
|
||||
params: ThreadSubagentsListParams,
|
||||
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
|
||||
self.thread_subagents_list_response_inner(params)
|
||||
.await
|
||||
.map(|response| Some(response.into()))
|
||||
}
|
||||
|
||||
pub(crate) async fn thread_subagents_read(
|
||||
&self,
|
||||
params: ThreadSubagentsReadParams,
|
||||
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
|
||||
self.thread_subagents_read_response_inner(params)
|
||||
.await
|
||||
.map(|response| Some(response.into()))
|
||||
}
|
||||
|
||||
pub(crate) async fn thread_read(
|
||||
&self,
|
||||
params: ThreadReadParams,
|
||||
@@ -2065,6 +2085,89 @@ impl ThreadRequestProcessor {
|
||||
Ok(ThreadReadResponse { thread })
|
||||
}
|
||||
|
||||
async fn thread_subagents_list_response_inner(
|
||||
&self,
|
||||
params: ThreadSubagentsListParams,
|
||||
) -> Result<ThreadSubagentsListResponse, JSONRPCErrorError> {
|
||||
let ThreadSubagentsListParams {
|
||||
parent_thread_id,
|
||||
cursor,
|
||||
limit,
|
||||
lifecycle_statuses,
|
||||
} = params;
|
||||
let parent_thread_id = ThreadId::from_string(&parent_thread_id)
|
||||
.map_err(|err| invalid_request(format!("invalid parent thread id: {err}")))?;
|
||||
let cursor = cursor
|
||||
.as_deref()
|
||||
.map(|cursor| {
|
||||
URL_SAFE_NO_PAD
|
||||
.decode(cursor)
|
||||
.ok()
|
||||
.and_then(|bytes| String::from_utf8(bytes).ok())
|
||||
.and_then(|child_thread_id| ThreadId::from_string(&child_thread_id).ok())
|
||||
.ok_or_else(|| invalid_request(format!("invalid cursor: {cursor}")))
|
||||
})
|
||||
.transpose()?;
|
||||
let page_size = limit.unwrap_or(THREAD_LIST_DEFAULT_LIMIT as u32);
|
||||
if !(1..=THREAD_LIST_MAX_LIMIT as u32).contains(&page_size) {
|
||||
return Err(invalid_request(format!(
|
||||
"limit must be between 1 and {THREAD_LIST_MAX_LIMIT}"
|
||||
)));
|
||||
}
|
||||
let lifecycle_statuses = lifecycle_statuses.map(|statuses| {
|
||||
statuses
|
||||
.into_iter()
|
||||
.map(|status| match status {
|
||||
ThreadSubagentLifecycleStatus::Open => {
|
||||
codex_state::DirectionalThreadSpawnEdgeStatus::Open
|
||||
}
|
||||
ThreadSubagentLifecycleStatus::Closed => {
|
||||
codex_state::DirectionalThreadSpawnEdgeStatus::Closed
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
});
|
||||
let state_db = self.state_db.as_ref().ok_or_else(|| {
|
||||
internal_error("sqlite state db unavailable for thread/subagents/list")
|
||||
})?;
|
||||
let page = state_db
|
||||
.list_thread_spawn_children_page(
|
||||
parent_thread_id,
|
||||
cursor,
|
||||
page_size as usize,
|
||||
lifecycle_statuses.as_deref(),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| internal_error(format!("failed to list thread subagents: {err}")))?;
|
||||
Ok(ThreadSubagentsListResponse {
|
||||
data: page
|
||||
.items
|
||||
.into_iter()
|
||||
.map(thread_subagent_from_edge)
|
||||
.collect(),
|
||||
next_cursor: page
|
||||
.next_cursor
|
||||
.map(|child_thread_id| URL_SAFE_NO_PAD.encode(child_thread_id.to_string())),
|
||||
})
|
||||
}
|
||||
|
||||
async fn thread_subagents_read_response_inner(
|
||||
&self,
|
||||
params: ThreadSubagentsReadParams,
|
||||
) -> Result<ThreadSubagentsReadResponse, JSONRPCErrorError> {
|
||||
let child_thread_id = ThreadId::from_string(¶ms.child_thread_id)
|
||||
.map_err(|err| invalid_request(format!("invalid child thread id: {err}")))?;
|
||||
let state_db = self.state_db.as_ref().ok_or_else(|| {
|
||||
internal_error("sqlite state db unavailable for thread/subagents/read")
|
||||
})?;
|
||||
let subagent = state_db
|
||||
.get_thread_spawn_edge(child_thread_id)
|
||||
.await
|
||||
.map_err(|err| internal_error(format!("failed to read thread subagent: {err}")))?
|
||||
.map(thread_subagent_from_edge);
|
||||
Ok(ThreadSubagentsReadResponse { subagent })
|
||||
}
|
||||
|
||||
/// Builds the API view for `thread/read` from persisted metadata plus optional live state.
|
||||
async fn read_thread_view(
|
||||
&self,
|
||||
@@ -3818,6 +3921,21 @@ fn normalize_thread_turns_status(
|
||||
}
|
||||
}
|
||||
|
||||
fn thread_subagent_from_edge(edge: codex_state::ThreadSpawnEdge) -> ThreadSubagent {
|
||||
ThreadSubagent {
|
||||
child_thread_id: edge.child_thread_id.to_string(),
|
||||
parent_thread_id: edge.parent_thread_id.to_string(),
|
||||
lifecycle_status: match edge.status {
|
||||
codex_state::DirectionalThreadSpawnEdgeStatus::Open => {
|
||||
ThreadSubagentLifecycleStatus::Open
|
||||
}
|
||||
codex_state::DirectionalThreadSpawnEdgeStatus::Closed => {
|
||||
ThreadSubagentLifecycleStatus::Closed
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
enum ThreadReadViewError {
|
||||
InvalidRequest(String),
|
||||
Unsupported(&'static str),
|
||||
|
||||
@@ -94,6 +94,8 @@ use codex_app_server_protocol::ThreadSetNameParams;
|
||||
use codex_app_server_protocol::ThreadSettingsUpdateParams;
|
||||
use codex_app_server_protocol::ThreadShellCommandParams;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadSubagentsListParams;
|
||||
use codex_app_server_protocol::ThreadSubagentsReadParams;
|
||||
use codex_app_server_protocol::ThreadTurnsItemsListParams;
|
||||
use codex_app_server_protocol::ThreadTurnsListParams;
|
||||
use codex_app_server_protocol::ThreadUnarchiveParams;
|
||||
@@ -551,6 +553,24 @@ impl McpProcess {
|
||||
self.send_request("thread/loaded/list", params).await
|
||||
}
|
||||
|
||||
/// Send a `thread/subagents/list` JSON-RPC request.
|
||||
pub async fn send_thread_subagents_list_request(
|
||||
&mut self,
|
||||
params: ThreadSubagentsListParams,
|
||||
) -> anyhow::Result<i64> {
|
||||
let params = Some(serde_json::to_value(params)?);
|
||||
self.send_request("thread/subagents/list", params).await
|
||||
}
|
||||
|
||||
/// Send a `thread/subagents/read` JSON-RPC request.
|
||||
pub async fn send_thread_subagents_read_request(
|
||||
&mut self,
|
||||
params: ThreadSubagentsReadParams,
|
||||
) -> anyhow::Result<i64> {
|
||||
let params = Some(serde_json::to_value(params)?);
|
||||
self.send_request("thread/subagents/read", params).await
|
||||
}
|
||||
|
||||
/// Send a `thread/read` JSON-RPC request.
|
||||
pub async fn send_thread_read_request(
|
||||
&mut self,
|
||||
|
||||
@@ -62,6 +62,7 @@ mod thread_settings_update;
|
||||
mod thread_shell_command;
|
||||
mod thread_start;
|
||||
mod thread_status;
|
||||
mod thread_subagents;
|
||||
mod thread_unarchive;
|
||||
mod thread_unsubscribe;
|
||||
mod turn_interrupt;
|
||||
|
||||
270
codex-rs/app-server/tests/suite/v2/thread_subagents.rs
Normal file
270
codex-rs/app-server/tests/suite/v2/thread_subagents.rs
Normal file
@@ -0,0 +1,270 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ThreadSubagent;
|
||||
use codex_app_server_protocol::ThreadSubagentLifecycleStatus;
|
||||
use codex_app_server_protocol::ThreadSubagentsListParams;
|
||||
use codex_app_server_protocol::ThreadSubagentsListResponse;
|
||||
use codex_app_server_protocol::ThreadSubagentsReadParams;
|
||||
use codex_app_server_protocol::ThreadSubagentsReadResponse;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_state::DirectionalThreadSpawnEdgeStatus;
|
||||
use codex_state::StateRuntime;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
|
||||
const DEFAULT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_subagents_list_and_read_use_persisted_edges_without_rollouts() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let parent_thread_id = thread_id(/*value*/ 100)?;
|
||||
let open_child_thread_id = thread_id(/*value*/ 101)?;
|
||||
let closed_child_thread_id = thread_id(/*value*/ 102)?;
|
||||
let grandchild_thread_id = thread_id(/*value*/ 103)?;
|
||||
let missing_child_thread_id = thread_id(/*value*/ 104)?;
|
||||
let state_db =
|
||||
StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
|
||||
state_db
|
||||
.mark_backfill_complete(/*last_watermark*/ None)
|
||||
.await?;
|
||||
state_db
|
||||
.upsert_thread_spawn_edge(
|
||||
parent_thread_id,
|
||||
open_child_thread_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Open,
|
||||
)
|
||||
.await?;
|
||||
state_db
|
||||
.upsert_thread_spawn_edge(
|
||||
parent_thread_id,
|
||||
closed_child_thread_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Closed,
|
||||
)
|
||||
.await?;
|
||||
state_db
|
||||
.upsert_thread_spawn_edge(
|
||||
open_child_thread_id,
|
||||
grandchild_thread_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Open,
|
||||
)
|
||||
.await?;
|
||||
assert!(!codex_home.path().join("sessions").exists());
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let first_page = list_subagents(
|
||||
&mut mcp,
|
||||
ThreadSubagentsListParams {
|
||||
limit: Some(1),
|
||||
..list_params(parent_thread_id)
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
let next_cursor = first_page.next_cursor.expect("first page cursor");
|
||||
assert_ne!(next_cursor, open_child_thread_id.to_string());
|
||||
assert_eq!(
|
||||
first_page.data,
|
||||
vec![subagent(
|
||||
parent_thread_id,
|
||||
open_child_thread_id,
|
||||
ThreadSubagentLifecycleStatus::Open,
|
||||
)]
|
||||
);
|
||||
|
||||
let second_page = list_subagents(
|
||||
&mut mcp,
|
||||
ThreadSubagentsListParams {
|
||||
cursor: Some(next_cursor),
|
||||
limit: Some(1),
|
||||
..list_params(parent_thread_id)
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
assert_eq!(
|
||||
second_page,
|
||||
ThreadSubagentsListResponse {
|
||||
data: vec![subagent(
|
||||
parent_thread_id,
|
||||
closed_child_thread_id,
|
||||
ThreadSubagentLifecycleStatus::Closed,
|
||||
)],
|
||||
next_cursor: None,
|
||||
}
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
list_subagents(
|
||||
&mut mcp,
|
||||
ThreadSubagentsListParams {
|
||||
lifecycle_statuses: Some(vec![ThreadSubagentLifecycleStatus::Open]),
|
||||
..list_params(parent_thread_id)
|
||||
},
|
||||
)
|
||||
.await?,
|
||||
ThreadSubagentsListResponse {
|
||||
data: vec![subagent(
|
||||
parent_thread_id,
|
||||
open_child_thread_id,
|
||||
ThreadSubagentLifecycleStatus::Open,
|
||||
)],
|
||||
next_cursor: None,
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
list_subagents(
|
||||
&mut mcp,
|
||||
ThreadSubagentsListParams {
|
||||
lifecycle_statuses: Some(vec![ThreadSubagentLifecycleStatus::Closed]),
|
||||
..list_params(parent_thread_id)
|
||||
},
|
||||
)
|
||||
.await?,
|
||||
ThreadSubagentsListResponse {
|
||||
data: vec![subagent(
|
||||
parent_thread_id,
|
||||
closed_child_thread_id,
|
||||
ThreadSubagentLifecycleStatus::Closed,
|
||||
)],
|
||||
next_cursor: None,
|
||||
}
|
||||
);
|
||||
|
||||
for (child_thread_id, expected) in [
|
||||
(
|
||||
open_child_thread_id,
|
||||
Some(subagent(
|
||||
parent_thread_id,
|
||||
open_child_thread_id,
|
||||
ThreadSubagentLifecycleStatus::Open,
|
||||
)),
|
||||
),
|
||||
(
|
||||
closed_child_thread_id,
|
||||
Some(subagent(
|
||||
parent_thread_id,
|
||||
closed_child_thread_id,
|
||||
ThreadSubagentLifecycleStatus::Closed,
|
||||
)),
|
||||
),
|
||||
(missing_child_thread_id, None),
|
||||
] {
|
||||
assert_eq!(read_subagent(&mut mcp, child_thread_id).await?, expected);
|
||||
}
|
||||
assert!(!codex_home.path().join("sessions").exists());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_subagents_reject_malformed_ids_cursor_and_limits() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let state_db =
|
||||
StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
|
||||
state_db
|
||||
.mark_backfill_complete(/*last_watermark*/ None)
|
||||
.await?;
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
for params in [
|
||||
ThreadSubagentsListParams {
|
||||
parent_thread_id: "not-a-thread-id".to_string(),
|
||||
cursor: None,
|
||||
limit: None,
|
||||
lifecycle_statuses: None,
|
||||
},
|
||||
ThreadSubagentsListParams {
|
||||
cursor: Some("not-a-cursor".to_string()),
|
||||
..list_params(thread_id(/*value*/ 200)?)
|
||||
},
|
||||
ThreadSubagentsListParams {
|
||||
limit: Some(0),
|
||||
..list_params(thread_id(/*value*/ 200)?)
|
||||
},
|
||||
ThreadSubagentsListParams {
|
||||
limit: Some(101),
|
||||
..list_params(thread_id(/*value*/ 200)?)
|
||||
},
|
||||
] {
|
||||
let request_id = mcp.send_thread_subagents_list_request(params).await?;
|
||||
assert_invalid_request(&mut mcp, request_id).await?;
|
||||
}
|
||||
|
||||
let request_id = mcp
|
||||
.send_thread_subagents_read_request(ThreadSubagentsReadParams {
|
||||
child_thread_id: "not-a-thread-id".to_string(),
|
||||
})
|
||||
.await?;
|
||||
assert_invalid_request(&mut mcp, request_id).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn list_subagents(
|
||||
mcp: &mut McpProcess,
|
||||
params: ThreadSubagentsListParams,
|
||||
) -> Result<ThreadSubagentsListResponse> {
|
||||
let request_id = mcp.send_thread_subagents_list_request(params).await?;
|
||||
let response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
|
||||
)
|
||||
.await??;
|
||||
to_response(response)
|
||||
}
|
||||
|
||||
async fn read_subagent(
|
||||
mcp: &mut McpProcess,
|
||||
child_thread_id: ThreadId,
|
||||
) -> Result<Option<ThreadSubagent>> {
|
||||
let request_id = mcp
|
||||
.send_thread_subagents_read_request(ThreadSubagentsReadParams {
|
||||
child_thread_id: child_thread_id.to_string(),
|
||||
})
|
||||
.await?;
|
||||
let response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
|
||||
)
|
||||
.await??;
|
||||
Ok(to_response::<ThreadSubagentsReadResponse>(response)?.subagent)
|
||||
}
|
||||
|
||||
async fn assert_invalid_request(mcp: &mut McpProcess, request_id: i64) -> Result<()> {
|
||||
let error: JSONRPCError = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
|
||||
)
|
||||
.await??;
|
||||
assert_eq!(error.error.code, -32600);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn subagent(
|
||||
parent_thread_id: ThreadId,
|
||||
child_thread_id: ThreadId,
|
||||
lifecycle_status: ThreadSubagentLifecycleStatus,
|
||||
) -> ThreadSubagent {
|
||||
ThreadSubagent {
|
||||
child_thread_id: child_thread_id.to_string(),
|
||||
parent_thread_id: parent_thread_id.to_string(),
|
||||
lifecycle_status,
|
||||
}
|
||||
}
|
||||
|
||||
fn list_params(parent_thread_id: ThreadId) -> ThreadSubagentsListParams {
|
||||
ThreadSubagentsListParams {
|
||||
parent_thread_id: parent_thread_id.to_string(),
|
||||
cursor: None,
|
||||
limit: None,
|
||||
lifecycle_statuses: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn thread_id(value: u128) -> Result<ThreadId> {
|
||||
ThreadId::from_string(&uuid::Uuid::from_u128(value).to_string()).map_err(Into::into)
|
||||
}
|
||||
@@ -0,0 +1,2 @@
|
||||
CREATE INDEX idx_thread_spawn_edges_parent_child_status
|
||||
ON thread_spawn_edges(parent_thread_id, child_thread_id, status);
|
||||
@@ -50,6 +50,8 @@ pub use model::ThreadGoal;
|
||||
pub use model::ThreadGoalStatus;
|
||||
pub use model::ThreadMetadata;
|
||||
pub use model::ThreadMetadataBuilder;
|
||||
pub use model::ThreadSpawnEdge;
|
||||
pub use model::ThreadSpawnEdgesPage;
|
||||
pub use model::ThreadsPage;
|
||||
pub use runtime::GoalAccountingMode;
|
||||
pub use runtime::GoalAccountingOutcome;
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use codex_protocol::ThreadId;
|
||||
use strum::AsRefStr;
|
||||
use strum::Display;
|
||||
use strum::EnumString;
|
||||
@@ -9,3 +10,18 @@ pub enum DirectionalThreadSpawnEdgeStatus {
|
||||
Open,
|
||||
Closed,
|
||||
}
|
||||
|
||||
/// Persisted directional parent-child edge for a spawned thread.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct ThreadSpawnEdge {
|
||||
pub parent_thread_id: ThreadId,
|
||||
pub child_thread_id: ThreadId,
|
||||
pub status: DirectionalThreadSpawnEdgeStatus,
|
||||
}
|
||||
|
||||
/// Keyset-paginated direct children from the persisted thread spawn graph.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct ThreadSpawnEdgesPage {
|
||||
pub items: Vec<ThreadSpawnEdge>,
|
||||
pub next_cursor: Option<ThreadId>,
|
||||
}
|
||||
|
||||
@@ -16,6 +16,8 @@ pub use agent_job::AgentJobStatus;
|
||||
pub use backfill_state::BackfillState;
|
||||
pub use backfill_state::BackfillStatus;
|
||||
pub use graph::DirectionalThreadSpawnEdgeStatus;
|
||||
pub use graph::ThreadSpawnEdge;
|
||||
pub use graph::ThreadSpawnEdgesPage;
|
||||
pub use log::LogEntry;
|
||||
pub use log::LogQuery;
|
||||
pub use log::LogRow;
|
||||
|
||||
@@ -115,6 +115,78 @@ ON CONFLICT(child_thread_id) DO UPDATE SET
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Look up a spawned thread's incoming edge by child id.
|
||||
pub async fn get_thread_spawn_edge(
|
||||
&self,
|
||||
child_thread_id: ThreadId,
|
||||
) -> anyhow::Result<Option<crate::ThreadSpawnEdge>> {
|
||||
let row = sqlx::query(
|
||||
r#"
|
||||
SELECT parent_thread_id, child_thread_id, status
|
||||
FROM thread_spawn_edges
|
||||
WHERE child_thread_id = ?
|
||||
"#,
|
||||
)
|
||||
.bind(child_thread_id.to_string())
|
||||
.fetch_optional(self.pool.as_ref())
|
||||
.await?;
|
||||
row.map(thread_spawn_edge_from_row).transpose()
|
||||
}
|
||||
|
||||
/// List a page of direct spawned children ordered by child id.
|
||||
///
|
||||
/// `statuses = None` includes every persisted lifecycle status. An empty
|
||||
/// status slice intentionally matches no edges.
|
||||
pub async fn list_thread_spawn_children_page(
|
||||
&self,
|
||||
parent_thread_id: ThreadId,
|
||||
cursor: Option<ThreadId>,
|
||||
page_size: usize,
|
||||
statuses: Option<&[crate::DirectionalThreadSpawnEdgeStatus]>,
|
||||
) -> anyhow::Result<crate::ThreadSpawnEdgesPage> {
|
||||
if statuses.is_some_and(<[crate::DirectionalThreadSpawnEdgeStatus]>::is_empty) {
|
||||
return Ok(crate::ThreadSpawnEdgesPage {
|
||||
items: Vec::new(),
|
||||
next_cursor: None,
|
||||
});
|
||||
}
|
||||
|
||||
let mut builder = QueryBuilder::<Sqlite>::new(
|
||||
"SELECT parent_thread_id, child_thread_id, status FROM thread_spawn_edges WHERE parent_thread_id = ",
|
||||
);
|
||||
builder.push_bind(parent_thread_id.to_string());
|
||||
if let Some(cursor) = cursor {
|
||||
builder
|
||||
.push(" AND child_thread_id > ")
|
||||
.push_bind(cursor.to_string());
|
||||
}
|
||||
if let Some(statuses) = statuses {
|
||||
builder.push(" AND status IN (");
|
||||
let mut separated = builder.separated(", ");
|
||||
for status in statuses {
|
||||
separated.push_bind(status.as_ref());
|
||||
}
|
||||
separated.push_unseparated(")");
|
||||
}
|
||||
builder
|
||||
.push(" ORDER BY child_thread_id LIMIT ")
|
||||
.push_bind(page_size.saturating_add(1) as i64);
|
||||
|
||||
let rows = builder.build().fetch_all(self.pool.as_ref()).await?;
|
||||
let mut items = rows
|
||||
.into_iter()
|
||||
.map(thread_spawn_edge_from_row)
|
||||
.collect::<anyhow::Result<Vec<_>>>()?;
|
||||
let has_more = items.len() > page_size;
|
||||
items.truncate(page_size);
|
||||
let next_cursor = if has_more {
|
||||
items.last().map(|edge| edge.child_thread_id)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Ok(crate::ThreadSpawnEdgesPage { items, next_cursor })
|
||||
}
|
||||
|
||||
/// List direct spawned children of `parent_thread_id` whose edge matches `status`.
|
||||
pub async fn list_thread_spawn_children_with_status(
|
||||
&self,
|
||||
@@ -1085,6 +1157,18 @@ fn metadata_preview(metadata: &crate::ThreadMetadata) -> &str {
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
fn thread_spawn_edge_from_row(
|
||||
row: sqlx::sqlite::SqliteRow,
|
||||
) -> anyhow::Result<crate::ThreadSpawnEdge> {
|
||||
Ok(crate::ThreadSpawnEdge {
|
||||
parent_thread_id: ThreadId::try_from(row.try_get::<String, _>("parent_thread_id")?)?,
|
||||
child_thread_id: ThreadId::try_from(row.try_get::<String, _>("child_thread_id")?)?,
|
||||
status: row
|
||||
.try_get::<String, _>("status")?
|
||||
.parse::<crate::DirectionalThreadSpawnEdgeStatus>()?,
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -2030,4 +2114,90 @@ INSERT INTO thread_spawn_edges (
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_spawn_children_page_uses_child_id_keyset() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home, "test-provider".to_string())
|
||||
.await
|
||||
.expect("state db should initialize");
|
||||
let parent_thread_id =
|
||||
ThreadId::from_string("00000000-0000-0000-0000-000000000920").expect("valid thread id");
|
||||
let child_a_thread_id =
|
||||
ThreadId::from_string("00000000-0000-0000-0000-000000000921").expect("valid thread id");
|
||||
let child_b_thread_id =
|
||||
ThreadId::from_string("00000000-0000-0000-0000-000000000922").expect("valid thread id");
|
||||
let child_c_thread_id =
|
||||
ThreadId::from_string("00000000-0000-0000-0000-000000000923").expect("valid thread id");
|
||||
for (child_thread_id, status) in [
|
||||
(child_a_thread_id, DirectionalThreadSpawnEdgeStatus::Open),
|
||||
(child_b_thread_id, DirectionalThreadSpawnEdgeStatus::Closed),
|
||||
(child_c_thread_id, DirectionalThreadSpawnEdgeStatus::Open),
|
||||
] {
|
||||
runtime
|
||||
.upsert_thread_spawn_edge(parent_thread_id, child_thread_id, status)
|
||||
.await
|
||||
.expect("child edge insert should succeed");
|
||||
}
|
||||
let first_page = runtime
|
||||
.list_thread_spawn_children_page(
|
||||
parent_thread_id,
|
||||
/*cursor*/ None,
|
||||
/*page_size*/ 2,
|
||||
/*statuses*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("first page should load");
|
||||
assert_eq!(
|
||||
first_page,
|
||||
crate::ThreadSpawnEdgesPage {
|
||||
items: vec![
|
||||
crate::ThreadSpawnEdge {
|
||||
parent_thread_id,
|
||||
child_thread_id: child_a_thread_id,
|
||||
status: DirectionalThreadSpawnEdgeStatus::Open,
|
||||
},
|
||||
crate::ThreadSpawnEdge {
|
||||
parent_thread_id,
|
||||
child_thread_id: child_b_thread_id,
|
||||
status: DirectionalThreadSpawnEdgeStatus::Closed,
|
||||
},
|
||||
],
|
||||
next_cursor: Some(child_b_thread_id),
|
||||
}
|
||||
);
|
||||
|
||||
let second_page = runtime
|
||||
.list_thread_spawn_children_page(
|
||||
parent_thread_id,
|
||||
first_page.next_cursor,
|
||||
/*page_size*/ 2,
|
||||
/*statuses*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("second page should load");
|
||||
assert_eq!(
|
||||
second_page,
|
||||
crate::ThreadSpawnEdgesPage {
|
||||
items: vec![crate::ThreadSpawnEdge {
|
||||
parent_thread_id,
|
||||
child_thread_id: child_c_thread_id,
|
||||
status: DirectionalThreadSpawnEdgeStatus::Open,
|
||||
}],
|
||||
next_cursor: None,
|
||||
}
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
runtime
|
||||
.get_thread_spawn_edge(child_b_thread_id)
|
||||
.await
|
||||
.expect("edge lookup should succeed"),
|
||||
Some(crate::ThreadSpawnEdge {
|
||||
parent_thread_id,
|
||||
child_thread_id: child_b_thread_id,
|
||||
status: DirectionalThreadSpawnEdgeStatus::Closed,
|
||||
})
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user