From 0d0835dd537b57913627e877f29031151b49429e Mon Sep 17 00:00:00 2001 From: Owen Lin Date: Thu, 7 May 2026 15:44:43 -0700 Subject: [PATCH] feat(app-server, threadstore): Thread pagination APIs and ThreadStore contract (#21566) ## Why The goal of this PR is to align on app-server and `ThreadStore` API updates for paginating through large threads. #### app-server ##### `thread/turns/list` - Updates `thread/turns/list` to support `itemsView?: "notLoaded" | "summary" | "full" | null`, defaulting to `summary`. - Implements the current `thread/turns/list` behavior over the existing persisted rollout-history fallback: - `notLoaded` returns turn envelopes with empty `items`. - `summary` returns the first user message and final assistant message when available. - `full` preserves the existing full item behavior. Note that this method still uses the naive approach of loading the entire rollout file, and returns just the filtered slice of the data. Real pagination will come later by leveraging SQLite. ##### `thread/turns/items/list` - Adds the experimental `thread/turns/items/list` protocol, schema, dispatcher, and processor stub. The app-server currently returns JSON-RPC `-32601` with `thread/turns/items/list is not supported yet`. #### ThreadStore - Adds the experimental `thread/turns/items/list` protocol, schema, dispatcher, and processor stub. The app-server currently returns JSON-RPC `-32601` with `thread/turns/items/list is not supported yet`. - Adds `ThreadStore` contract types and stubbed methods for listing thread turns and listing items within a turn. - Adds a typed `StoredTurnStatus` and `StoredTurnError` to avoid baking app-server API enums or lossy string status values into the store-facing turn contract. - Adds a typed `StoredTurnStatus` and `StoredTurnError` to avoid baking app-server API enums or lossy string status values into the store-facing turn contract. This also sketches the storage abstraction we expect to need once turns are indexed/stored. In particular, `notLoaded` is useful only if ThreadStore can eventually list turn metadata without loading every persisted item for each turn. ## Validation - Added/updated protocol serialization coverage for the new request and response shapes. - Added app-server integration coverage for `thread/turns/list` default summary behavior and all three `itemsView` modes. - Added app-server integration coverage that `thread/turns/items/list` returns the expected unsupported JSON-RPC error when experimental APIs are enabled. - Added thread-store coverage that the default trait methods return `ThreadStoreError::Unsupported`. No developers.openai.com documentation update is needed for this internal experimental app-server API surface. --- .../schema/json/ClientRequest.json | 25 +++ .../src/protocol/common.rs | 20 +++ .../src/protocol/v2/tests.rs | 53 ++++++ .../src/protocol/v2/thread.rs | 35 ++++ codex-rs/app-server/README.md | 21 ++- codex-rs/app-server/src/error_code.rs | 5 + codex-rs/app-server/src/message_processor.rs | 3 + codex-rs/app-server/src/request_processors.rs | 1 + .../request_processors/thread_processor.rs | 77 +++++++- .../app-server/tests/common/mcp_process.rs | 10 ++ .../app-server/tests/suite/v2/thread_read.rs | 166 +++++++++++++++++- .../tests/suite/v2/thread_shell_command.rs | 1 + codex-rs/thread-store/src/error.rs | 7 + codex-rs/thread-store/src/in_memory.rs | 51 ++++++ codex-rs/thread-store/src/lib.rs | 8 + codex-rs/thread-store/src/store.rs | 19 ++ codex-rs/thread-store/src/types.rs | 111 ++++++++++++ 17 files changed, 608 insertions(+), 5 deletions(-) diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index cac1c33d9b..c60acbdf30 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -4096,6 +4096,31 @@ ], "type": "object" }, + "TurnItemsView": { + "oneOf": [ + { + "description": "`items` was not loaded for this turn. The field is intentionally empty.", + "enum": [ + "notLoaded" + ], + "type": "string" + }, + { + "description": "`items` contains only a display summary for this turn.", + "enum": [ + "summary" + ], + "type": "string" + }, + { + "description": "`items` contains every ThreadItem available from persisted app-server history for this turn.", + "enum": [ + "full" + ], + "type": "string" + } + ] + }, "TurnStartParams": { "properties": { "approvalPolicy": { diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index e79c99a9c9..e72e3c14c7 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -581,6 +581,13 @@ client_request_definitions! { serialization: None, response: v2::ThreadTurnsListResponse, }, + #[experimental("thread/turns/items/list")] + ThreadTurnsItemsList => "thread/turns/items/list" { + params: v2::ThreadTurnsItemsListParams, + // Explicitly concurrent: this primarily reads append-only rollout storage. + serialization: None, + response: v2::ThreadTurnsItemsListResponse, + }, /// Append raw Responses API items to the thread history without starting a user turn. ThreadInjectItems => "thread/inject_items" { params: v2::ThreadInjectItemsParams, @@ -1843,10 +1850,23 @@ mod tests { cursor: None, limit: None, sort_direction: None, + items_view: None, }, }; assert_eq!(thread_turns_list.serialization_scope(), None); + let thread_turns_items_list = ClientRequest::ThreadTurnsItemsList { + request_id: request_id(), + params: v2::ThreadTurnsItemsListParams { + thread_id: "thread-1".to_string(), + turn_id: "turn-1".to_string(), + cursor: None, + limit: None, + sort_direction: None, + }, + }; + assert_eq!(thread_turns_items_list.serialization_scope(), None); + let mcp_resource_read = ClientRequest::McpResourceRead { request_id: request_id(), params: v2::McpResourceReadParams { diff --git a/codex-rs/app-server-protocol/src/protocol/v2/tests.rs b/codex-rs/app-server-protocol/src/protocol/v2/tests.rs index 4e923c5804..d6e49279ce 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/tests.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/tests.rs @@ -95,6 +95,59 @@ fn turn_defaults_legacy_missing_items_view_to_full() { assert_eq!(turn.items_view, TurnItemsView::Full); } +#[test] +fn thread_turns_list_params_accepts_items_view() { + let params = serde_json::from_value::(json!({ + "threadId": "thr_123", + "cursor": null, + "limit": 25, + "sortDirection": "desc", + "itemsView": "notLoaded", + })) + .expect("thread turns list params should deserialize"); + + assert_eq!(params.thread_id, "thr_123"); + assert_eq!(params.items_view, Some(TurnItemsView::NotLoaded)); +} + +#[test] +fn thread_turns_items_list_round_trips() { + let params = ThreadTurnsItemsListParams { + thread_id: "thr_123".to_string(), + turn_id: "turn_456".to_string(), + cursor: Some("cursor_1".to_string()), + limit: Some(50), + sort_direction: Some(SortDirection::Asc), + }; + + assert_eq!( + serde_json::to_value(¶ms).expect("serialize params"), + json!({ + "threadId": "thr_123", + "turnId": "turn_456", + "cursor": "cursor_1", + "limit": 50, + "sortDirection": "asc", + }) + ); + let response = ThreadTurnsItemsListResponse { + data: vec![ThreadItem::ContextCompaction { + id: "item_1".to_string(), + }], + next_cursor: None, + backwards_cursor: Some("cursor_0".to_string()), + }; + + assert_eq!( + serde_json::to_value(&response).expect("serialize response"), + json!({ + "data": [{"type": "contextCompaction", "id": "item_1"}], + "nextCursor": null, + "backwardsCursor": "cursor_0", + }) + ); +} + #[test] fn thread_list_params_accepts_single_cwd() { let params = serde_json::from_value::(json!({ diff --git a/codex-rs/app-server-protocol/src/protocol/v2/thread.rs b/codex-rs/app-server-protocol/src/protocol/v2/thread.rs index 5f293ff8f6..458722b3a2 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/thread.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/thread.rs @@ -6,9 +6,11 @@ use super::PermissionProfileSelectionParams; use super::SandboxMode; use super::SandboxPolicy; use super::Thread; +use super::ThreadItem; use super::ThreadSource; use super::Turn; use super::TurnEnvironmentParams; +use super::TurnItemsView; use super::shared::v2_enum_from_core; use codex_experimental_api_macros::ExperimentalApi; use codex_protocol::config_types::Personality; @@ -1005,6 +1007,9 @@ pub struct ThreadTurnsListParams { /// Optional turn pagination direction; defaults to descending. #[ts(optional = nullable)] pub sort_direction: Option, + /// How much item detail to include for each returned turn; defaults to summary. + #[ts(optional = nullable)] + pub items_view: Option, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] @@ -1022,6 +1027,36 @@ pub struct ThreadTurnsListResponse { pub backwards_cursor: Option, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadTurnsItemsListParams { + pub thread_id: String, + pub turn_id: String, + /// Opaque cursor to pass to the next call to continue after the last item. + #[ts(optional = nullable)] + pub cursor: Option, + /// Optional item page size. + #[ts(optional = nullable)] + pub limit: Option, + /// Optional item pagination direction; defaults to ascending. + #[ts(optional = nullable)] + pub sort_direction: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadTurnsItemsListResponse { + pub data: Vec, + /// Opaque cursor to pass to the next call to continue after the last item. + /// if None, there are no more items to return. + pub next_cursor: Option, + /// Opaque cursor to pass as `cursor` when reversing `sortDirection`. + /// This is only populated when the page contains at least one item. + pub backwards_cursor: Option, +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index babfac99ba..5dd75ee6f4 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -149,7 +149,8 @@ Example with notification opt-out: - `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, `cwd`, and `searchTerm` filters. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded. - `thread/loaded/list` — list the thread ids currently loaded in memory. - `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`. The returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded. -- `thread/turns/list` — experimental; page through a stored thread’s turn history without resuming it; supports cursor-based pagination with `sortDirection`, `nextCursor`, and `backwardsCursor`. +- `thread/turns/list` — experimental; page through a stored thread’s turn history without resuming it; supports cursor-based pagination with `sortDirection`, `itemsView`, `nextCursor`, and `backwardsCursor`. +- `thread/turns/items/list` — experimental; reserved for paging full items for one turn. The API shape is present, but app-server currently returns an unsupported-method JSON-RPC error. - `thread/metadata/update` — patch stored thread metadata in sqlite; currently supports updating persisted `gitInfo` fields and returns the refreshed `thread`. - `thread/memoryMode/set` — experimental; set a thread’s persisted memory eligibility to `"enabled"` or `"disabled"` for either a loaded thread or a stored rollout; returns `{}` on success. - `memory/reset` — experimental; clear the current `CODEX_HOME/memories` directory and reset persisted memory stage data in sqlite while preserving existing thread memory modes; returns `{}` on success. @@ -424,13 +425,14 @@ Use `thread/read` to fetch a stored thread by id without resuming it. Pass `incl Use `thread/turns/list` with `capabilities.experimentalApi = true` to page a stored thread’s turn history without resuming it. By default, results are sorted descending so clients can start at the present and fetch older turns with `nextCursor`. The response also includes `backwardsCursor`; pass it as `cursor` on a later request with `sortDirection: "asc"` to fetch turns newer than the first item from the earlier page. -Every returned `Turn` includes `itemsView`, which tells clients whether the `items` array was omitted intentionally (`notLoaded`), contains only summary items (`summary`), or contains every item available from persisted app-server history (`full`). Current `thread/turns/list` responses return `full` turns. +Every returned `Turn` includes `itemsView`, which tells clients whether the `items` array was omitted intentionally (`notLoaded`), contains only summary items (`summary`), or contains every item available from persisted app-server history (`full`). Pass `itemsView` to choose the returned detail level; omitted `itemsView` defaults to `"summary"`. ```json { "method": "thread/turns/list", "id": 24, "params": { "threadId": "thr_123", "limit": 50, - "sortDirection": "desc" + "sortDirection": "desc", + "itemsView": "summary" } } { "id": 24, "result": { "data": [ ... ], @@ -439,6 +441,19 @@ Every returned `Turn` includes `itemsView`, which tells clients whether the `ite } } ``` +`thread/turns/items/list` is the planned hydration API for fetching full items for one turn: + +```json +{ "method": "thread/turns/items/list", "id": 25, "params": { + "threadId": "thr_123", + "turnId": "turn_456", + "limit": 100, + "sortDirection": "asc" +} } +``` + +This method currently returns JSON-RPC `-32601` with message `thread/turns/items/list is not supported yet`. + ### Example: Update stored thread metadata Use `thread/metadata/update` to patch sqlite-backed metadata for a thread without resuming it. Today this supports persisted `gitInfo`; omitted fields are left unchanged, while explicit `null` clears a stored value. diff --git a/codex-rs/app-server/src/error_code.rs b/codex-rs/app-server/src/error_code.rs index 0054d2988f..48e401f7bc 100644 --- a/codex-rs/app-server/src/error_code.rs +++ b/codex-rs/app-server/src/error_code.rs @@ -1,6 +1,7 @@ use codex_app_server_protocol::JSONRPCErrorError; pub(crate) const INVALID_REQUEST_ERROR_CODE: i64 = -32600; +pub(crate) const METHOD_NOT_FOUND_ERROR_CODE: i64 = -32601; pub const INVALID_PARAMS_ERROR_CODE: i64 = -32602; pub(crate) const INTERNAL_ERROR_CODE: i64 = -32603; pub(crate) const OVERLOADED_ERROR_CODE: i64 = -32001; @@ -10,6 +11,10 @@ pub(crate) fn invalid_request(message: impl Into) -> JSONRPCErrorError { error(INVALID_REQUEST_ERROR_CODE, message) } +pub(crate) fn method_not_found(message: impl Into) -> JSONRPCErrorError { + error(METHOD_NOT_FOUND_ERROR_CODE, message) +} + pub(crate) fn invalid_params(message: impl Into) -> JSONRPCErrorError { error(INVALID_PARAMS_ERROR_CODE, message) } diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 2ca0a87d84..7006c40343 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -1008,6 +1008,9 @@ impl MessageProcessor { ClientRequest::ThreadTurnsList { params, .. } => { self.thread_processor.thread_turns_list(params).await } + ClientRequest::ThreadTurnsItemsList { params, .. } => { + self.thread_processor.thread_turns_items_list(params).await + } ClientRequest::ThreadShellCommand { params, .. } => { self.thread_processor .thread_shell_command(&request_id, params) diff --git a/codex-rs/app-server/src/request_processors.rs b/codex-rs/app-server/src/request_processors.rs index 795b5d4c32..19201b8ae1 100644 --- a/codex-rs/app-server/src/request_processors.rs +++ b/codex-rs/app-server/src/request_processors.rs @@ -215,6 +215,7 @@ use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::ThreadStartedNotification; use codex_app_server_protocol::ThreadStatus; +use codex_app_server_protocol::ThreadTurnsItemsListParams; use codex_app_server_protocol::ThreadTurnsListParams; use codex_app_server_protocol::ThreadTurnsListResponse; use codex_app_server_protocol::ThreadUnarchiveParams; diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index d83f5e631f..615e37f2c9 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -1,4 +1,5 @@ use super::*; +use crate::error_code::method_not_found; const THREAD_LIST_DEFAULT_LIMIT: usize = 25; const THREAD_LIST_MAX_LIMIT: usize = 100; @@ -591,6 +592,15 @@ impl ThreadRequestProcessor { .map(|response| Some(response.into())) } + pub(crate) async fn thread_turns_items_list( + &self, + _params: ThreadTurnsItemsListParams, + ) -> Result, JSONRPCErrorError> { + Err(method_not_found( + "thread/turns/items/list is not supported yet", + )) + } + pub(crate) async fn thread_shell_command( &self, request_id: &ConnectionRequestId, @@ -2072,7 +2082,9 @@ impl ThreadRequestProcessor { cursor, limit, sort_direction, + items_view, } = params; + let items_view = items_view.unwrap_or(TurnItemsView::Summary); let thread_uuid = ThreadId::from_string(&thread_id) .map_err(|err| invalid_request(format!("invalid thread id: {err}")))?; @@ -2101,7 +2113,7 @@ impl ThreadRequestProcessor { } else { None }; - let turns = reconstruct_thread_turns_for_turns_list( + let mut turns = reconstruct_thread_turns_for_turns_list( &items, self.thread_watch_manager .loaded_status_for_thread(&thread_uuid.to_string()) @@ -2109,6 +2121,41 @@ impl ThreadRequestProcessor { has_live_running_thread, active_turn, ); + for turn in &mut turns { + match items_view { + TurnItemsView::NotLoaded => { + turn.items.clear(); + turn.items_view = TurnItemsView::NotLoaded; + } + TurnItemsView::Summary => { + let first_user_message = turn + .items + .iter() + .find(|item| matches!(item, ThreadItem::UserMessage { .. })) + .cloned(); + let final_agent_message = turn + .items + .iter() + .rev() + .find(|item| matches!(item, ThreadItem::AgentMessage { .. })) + .cloned(); + turn.items = match (first_user_message, final_agent_message) { + (Some(user_message), Some(agent_message)) + if user_message.id() != agent_message.id() => + { + vec![user_message, agent_message] + } + (Some(user_message), _) => vec![user_message], + (None, Some(agent_message)) => vec![agent_message], + (None, None) => Vec::new(), + }; + turn.items_view = TurnItemsView::Summary; + } + TurnItemsView::Full => { + turn.items_view = TurnItemsView::Full; + } + } + } let page = paginate_thread_turns( turns, cursor.as_deref(), @@ -3496,19 +3543,30 @@ fn normalize_thread_turns_status( enum ThreadReadViewError { InvalidRequest(String), + Unsupported(&'static str), Internal(String), } fn thread_read_view_error(err: ThreadReadViewError) -> JSONRPCErrorError { match err { ThreadReadViewError::InvalidRequest(message) => invalid_request(message), + ThreadReadViewError::Unsupported(operation) => { + unsupported_thread_store_operation(operation) + } ThreadReadViewError::Internal(message) => internal_error(message), } } +fn unsupported_thread_store_operation(operation: &'static str) -> JSONRPCErrorError { + method_not_found(format!("{operation} is not supported yet")) +} + fn thread_store_list_error(err: ThreadStoreError) -> JSONRPCErrorError { match err { ThreadStoreError::InvalidRequest { message } => invalid_request(message), + ThreadStoreError::Unsupported { operation } => { + unsupported_thread_store_operation(operation) + } err => internal_error(format!("failed to list threads: {err}")), } } @@ -3516,6 +3574,9 @@ fn thread_store_list_error(err: ThreadStoreError) -> JSONRPCErrorError { fn thread_store_resume_read_error(err: ThreadStoreError) -> JSONRPCErrorError { match err { ThreadStoreError::InvalidRequest { message } => invalid_request(message), + ThreadStoreError::Unsupported { operation } => { + unsupported_thread_store_operation(operation) + } ThreadStoreError::ThreadNotFound { thread_id } => { invalid_request(format!("no rollout found for thread id {thread_id}")) } @@ -3538,6 +3599,7 @@ fn thread_turns_list_history_load_error( ThreadStoreError::InvalidRequest { message } => { ThreadReadViewError::InvalidRequest(message) } + ThreadStoreError::Unsupported { operation } => ThreadReadViewError::Unsupported(operation), err => ThreadReadViewError::Internal(format!( "failed to load thread history for thread {thread_id}: {err}" )), @@ -3564,6 +3626,7 @@ fn thread_read_history_load_error( ThreadStoreError::InvalidRequest { message } => { ThreadReadViewError::InvalidRequest(message) } + ThreadStoreError::Unsupported { operation } => ThreadReadViewError::Unsupported(operation), err => ThreadReadViewError::Internal(format!( "failed to load thread history for thread {thread_id}: {err}" )), @@ -3579,6 +3642,9 @@ fn conversation_summary_thread_id_read_error( ThreadStoreError::InvalidRequest { message } if message == no_rollout_message => { conversation_summary_not_found_error(conversation_id) } + ThreadStoreError::Unsupported { operation } => { + unsupported_thread_store_operation(operation) + } ThreadStoreError::ThreadNotFound { thread_id } if thread_id == conversation_id => { conversation_summary_not_found_error(conversation_id) } @@ -3601,6 +3667,9 @@ fn conversation_summary_rollout_path_read_error( ) -> JSONRPCErrorError { match err { ThreadStoreError::InvalidRequest { message } => invalid_request(message), + ThreadStoreError::Unsupported { operation } => { + unsupported_thread_store_operation(operation) + } err => internal_error(format!( "failed to load conversation summary from {}: {}", path.display(), @@ -3615,6 +3684,9 @@ fn thread_store_write_error(operation: &str, err: ThreadStoreError) -> JSONRPCEr invalid_request(format!("thread not found: {thread_id}")) } ThreadStoreError::InvalidRequest { message } => invalid_request(message), + ThreadStoreError::Unsupported { operation } => { + unsupported_thread_store_operation(operation) + } err => internal_error(format!("failed to {operation}: {err}")), } } @@ -3622,6 +3694,9 @@ fn thread_store_write_error(operation: &str, err: ThreadStoreError) -> JSONRPCEr fn thread_store_archive_error(operation: &str, err: ThreadStoreError) -> JSONRPCErrorError { match err { ThreadStoreError::InvalidRequest { message } => invalid_request(message), + ThreadStoreError::Unsupported { + operation: unsupported_operation, + } => unsupported_thread_store_operation(unsupported_operation), err => internal_error(format!("failed to {operation} thread: {err}")), } } diff --git a/codex-rs/app-server/tests/common/mcp_process.rs b/codex-rs/app-server/tests/common/mcp_process.rs index c2a49d8fa6..81a5b2b401 100644 --- a/codex-rs/app-server/tests/common/mcp_process.rs +++ b/codex-rs/app-server/tests/common/mcp_process.rs @@ -89,6 +89,7 @@ use codex_app_server_protocol::ThreadRollbackParams; use codex_app_server_protocol::ThreadSetNameParams; use codex_app_server_protocol::ThreadShellCommandParams; use codex_app_server_protocol::ThreadStartParams; +use codex_app_server_protocol::ThreadTurnsItemsListParams; use codex_app_server_protocol::ThreadTurnsListParams; use codex_app_server_protocol::ThreadUnarchiveParams; use codex_app_server_protocol::ThreadUnsubscribeParams; @@ -522,6 +523,15 @@ impl McpProcess { self.send_request("thread/turns/list", params).await } + /// Send a `thread/turns/items/list` JSON-RPC request. + pub async fn send_thread_turns_items_list_request( + &mut self, + params: ThreadTurnsItemsListParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("thread/turns/items/list", params).await + } + /// Send a `model/list` JSON-RPC request. pub async fn send_list_models_request( &mut self, diff --git a/codex-rs/app-server/tests/suite/v2/thread_read.rs b/codex-rs/app-server/tests/suite/v2/thread_read.rs index 0e9a7a9a05..52420c0c80 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_read.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_read.rs @@ -31,6 +31,7 @@ use codex_app_server_protocol::ThreadSetNameResponse; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::ThreadStatus; +use codex_app_server_protocol::ThreadTurnsItemsListParams; use codex_app_server_protocol::ThreadTurnsListParams; use codex_app_server_protocol::ThreadTurnsListResponse; use codex_app_server_protocol::TurnItemsView; @@ -46,6 +47,7 @@ use codex_core::config::ConfigBuilder; use codex_exec_server::EnvironmentManager; use codex_feedback::CodexFeedback; use codex_protocol::models::BaseInstructions; +use codex_protocol::protocol::AgentMessageEvent; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource as ProtocolSessionSource; @@ -223,6 +225,7 @@ async fn thread_turns_list_can_page_backward_and_forward() -> Result<()> { cursor: None, limit: Some(2), sort_direction: Some(SortDirection::Desc), + items_view: None, }) .await?; let read_resp: JSONRPCResponse = timeout( @@ -238,7 +241,7 @@ async fn thread_turns_list_can_page_backward_and_forward() -> Result<()> { assert_eq!(turn_user_texts(&data), vec!["third", "second"]); assert!( data.iter() - .all(|turn| turn.items_view == TurnItemsView::Full) + .all(|turn| turn.items_view == TurnItemsView::Summary) ); let next_cursor = next_cursor.expect("expected nextCursor for older turns"); let backwards_cursor = backwards_cursor.expect("expected backwardsCursor for newest turn"); @@ -249,6 +252,7 @@ async fn thread_turns_list_can_page_backward_and_forward() -> Result<()> { cursor: Some(next_cursor), limit: Some(10), sort_direction: Some(SortDirection::Desc), + items_view: None, }) .await?; let read_resp: JSONRPCResponse = timeout( @@ -267,6 +271,7 @@ async fn thread_turns_list_can_page_backward_and_forward() -> Result<()> { cursor: Some(backwards_cursor), limit: Some(10), sort_direction: Some(SortDirection::Asc), + items_view: None, }) .await?; let read_resp: JSONRPCResponse = timeout( @@ -280,6 +285,74 @@ async fn thread_turns_list_can_page_backward_and_forward() -> Result<()> { Ok(()) } +#[tokio::test] +async fn thread_turns_list_supports_requested_items_view() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let filename_ts = "2025-01-05T12-00-00"; + let conversation_id = create_fake_rollout_with_text_elements( + codex_home.path(), + filename_ts, + "2025-01-05T12:00:00Z", + "first", + vec![], + Some("mock_provider"), + /*git_info*/ None, + )?; + let rollout_path = rollout_path(codex_home.path(), filename_ts, &conversation_id); + append_agent_message(rollout_path.as_path(), "2025-01-05T12:01:00Z", "draft")?; + append_agent_message(rollout_path.as_path(), "2025-01-05T12:02:00Z", "final")?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let full = read_single_turn_items_view( + &mut mcp, + conversation_id.as_str(), + Some(TurnItemsView::Full), + ) + .await?; + assert_eq!(full.items_view, TurnItemsView::Full); + assert_eq!( + turn_agent_texts(std::slice::from_ref(&full)), + vec!["draft", "final"] + ); + + let summary = read_single_turn_items_view( + &mut mcp, + conversation_id.as_str(), + Some(TurnItemsView::Summary), + ) + .await?; + assert_eq!(summary.items_view, TurnItemsView::Summary); + assert_eq!( + turn_user_texts(std::slice::from_ref(&summary)), + vec!["first"] + ); + assert_eq!( + turn_agent_texts(std::slice::from_ref(&summary)), + vec!["final"] + ); + + let not_loaded = read_single_turn_items_view( + &mut mcp, + conversation_id.as_str(), + Some(TurnItemsView::NotLoaded), + ) + .await?; + assert_eq!(not_loaded.items_view, TurnItemsView::NotLoaded); + assert!(not_loaded.items.is_empty()); + assert_eq!(not_loaded.id, full.id); + assert_eq!(not_loaded.status, full.status); + assert_eq!(not_loaded.started_at, full.started_at); + assert_eq!(not_loaded.completed_at, full.completed_at); + assert_eq!(not_loaded.duration_ms, full.duration_ms); + + Ok(()) +} + #[tokio::test] async fn thread_turns_list_reads_store_history_without_rollout_path() -> Result<()> { let codex_home = TempDir::new()?; @@ -334,6 +407,7 @@ async fn thread_turns_list_reads_store_history_without_rollout_path() -> Result< cursor: None, limit: Some(10), sort_direction: Some(SortDirection::Asc), + items_view: None, }, }) .await? @@ -583,6 +657,7 @@ async fn thread_turns_list_rejects_cursor_when_anchor_turn_is_rolled_back() -> R cursor: None, limit: Some(2), sort_direction: Some(SortDirection::Desc), + items_view: None, }) .await?; let read_resp: JSONRPCResponse = timeout( @@ -607,6 +682,7 @@ async fn thread_turns_list_rejects_cursor_when_anchor_turn_is_rolled_back() -> R cursor: Some(backwards_cursor), limit: Some(10), sort_direction: Some(SortDirection::Asc), + items_view: None, }) .await?; let read_err: JSONRPCError = timeout( @@ -963,6 +1039,7 @@ async fn thread_turns_list_rejects_unmaterialized_loaded_thread() -> Result<()> cursor: None, limit: None, sort_direction: None, + items_view: None, }) .await?; let read_err: JSONRPCError = timeout( @@ -983,6 +1060,39 @@ async fn thread_turns_list_rejects_unmaterialized_loaded_thread() -> Result<()> Ok(()) } +#[tokio::test] +async fn thread_turns_items_list_returns_unsupported() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let read_id = mcp + .send_thread_turns_items_list_request(ThreadTurnsItemsListParams { + thread_id: "thr_123".to_string(), + turn_id: "turn_456".to_string(), + cursor: None, + limit: None, + sort_direction: None, + }) + .await?; + let read_err: JSONRPCError = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_error_message(RequestId::Integer(read_id)), + ) + .await??; + + assert_eq!(read_err.error.code, -32601); + assert_eq!( + read_err.error.message, + "thread/turns/items/list is not supported yet" + ); + + Ok(()) +} + #[tokio::test] async fn thread_read_reports_system_error_idle_flag_after_failed_turn() -> Result<()> { let server = responses::start_mock_server().await; @@ -1068,6 +1178,24 @@ fn append_user_message(path: &Path, timestamp: &str, text: &str) -> std::io::Res ) } +fn append_agent_message(path: &Path, timestamp: &str, text: &str) -> anyhow::Result<()> { + let mut file = std::fs::OpenOptions::new().append(true).open(path)?; + writeln!( + file, + "{}", + json!({ + "timestamp": timestamp, + "type": "event_msg", + "payload": serde_json::to_value(EventMsg::AgentMessage(AgentMessageEvent { + message: text.to_string(), + phase: None, + memory_citation: None, + }))?, + }) + )?; + Ok(()) +} + fn append_thread_rollback(path: &Path, timestamp: &str, num_turns: u32) -> std::io::Result<()> { let mut file = std::fs::OpenOptions::new().append(true).open(path)?; writeln!( @@ -1084,6 +1212,31 @@ fn append_thread_rollback(path: &Path, timestamp: &str, num_turns: u32) -> std:: ) } +async fn read_single_turn_items_view( + mcp: &mut McpProcess, + thread_id: &str, + items_view: Option, +) -> anyhow::Result { + let read_id = mcp + .send_thread_turns_list_request(ThreadTurnsListParams { + thread_id: thread_id.to_string(), + cursor: None, + limit: Some(10), + sort_direction: Some(SortDirection::Asc), + items_view, + }) + .await?; + let read_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(read_id)), + ) + .await??; + let ThreadTurnsListResponse { mut data, .. } = + to_response::(read_resp)?; + assert_eq!(data.len(), 1); + Ok(data.remove(0)) +} + fn turn_user_texts(turns: &[codex_app_server_protocol::Turn]) -> Vec<&str> { turns .iter() @@ -1100,6 +1253,17 @@ fn turn_user_texts(turns: &[codex_app_server_protocol::Turn]) -> Vec<&str> { .collect() } +fn turn_agent_texts(turns: &[codex_app_server_protocol::Turn]) -> Vec<&str> { + turns + .iter() + .flat_map(|turn| &turn.items) + .filter_map(|item| match item { + ThreadItem::AgentMessage { text, .. } => Some(text.as_str()), + _ => None, + }) + .collect() +} + struct InMemoryThreadStoreId { store_id: String, } diff --git a/codex-rs/app-server/tests/suite/v2/thread_shell_command.rs b/codex-rs/app-server/tests/suite/v2/thread_shell_command.rs index eebc4077df..b7cfba2f95 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_shell_command.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_shell_command.rs @@ -151,6 +151,7 @@ async fn thread_shell_command_history_responses_exclude_persisted_command_execut cursor: None, limit: None, sort_direction: Some(SortDirection::Asc), + items_view: None, }) .await?; let turns_list_resp: JSONRPCResponse = timeout( diff --git a/codex-rs/thread-store/src/error.rs b/codex-rs/thread-store/src/error.rs index c5cee9a8b8..2244c93185 100644 --- a/codex-rs/thread-store/src/error.rs +++ b/codex-rs/thread-store/src/error.rs @@ -27,6 +27,13 @@ pub enum ThreadStoreError { message: String, }, + /// The store implementation does not support this operation yet. + #[error("thread-store unsupported operation: {operation}")] + Unsupported { + /// Stable operation name for callers that need to map unsupported operations. + operation: &'static str, + }, + /// Catch-all for implementation failures that do not fit a more specific category. #[error("thread-store internal error: {message}")] Internal { diff --git a/codex-rs/thread-store/src/in_memory.rs b/codex-rs/thread-store/src/in_memory.rs index 77739260b8..1483906267 100644 --- a/codex-rs/thread-store/src/in_memory.rs +++ b/codex-rs/thread-store/src/in_memory.rs @@ -35,6 +35,57 @@ fn stores() -> &'static Mutex>> { IN_MEMORY_THREAD_STORES.get_or_init(|| Mutex::new(HashMap::new())) } +#[cfg(test)] +mod tests { + use super::*; + use crate::ListItemsParams; + use crate::ListTurnsParams; + use crate::SortDirection; + use crate::StoredTurnItemsView; + + #[tokio::test] + async fn default_turn_pagination_methods_return_unsupported() { + let store = InMemoryThreadStore::default(); + let thread_id = ThreadId::default(); + + let turns_err = store + .list_turns(ListTurnsParams { + thread_id, + include_archived: true, + cursor: None, + page_size: 10, + sort_direction: SortDirection::Asc, + items_view: StoredTurnItemsView::Summary, + }) + .await + .expect_err("default list_turns should be unsupported"); + assert!(matches!( + turns_err, + ThreadStoreError::Unsupported { + operation: "list_turns" + } + )); + + let items_err = store + .list_items(ListItemsParams { + thread_id, + turn_id: "turn_1".to_string(), + include_archived: true, + cursor: None, + page_size: 10, + sort_direction: SortDirection::Asc, + }) + .await + .expect_err("default list_items should be unsupported"); + assert!(matches!( + items_err, + ThreadStoreError::Unsupported { + operation: "list_items" + } + )); + } +} + fn stores_guard() -> MutexGuard<'static, HashMap>> { match stores().lock() { Ok(guard) => guard, diff --git a/codex-rs/thread-store/src/lib.rs b/codex-rs/thread-store/src/lib.rs index 52b7f5ea1f..a5daeff44a 100644 --- a/codex-rs/thread-store/src/lib.rs +++ b/codex-rs/thread-store/src/lib.rs @@ -26,7 +26,10 @@ pub use types::AppendThreadItemsParams; pub use types::ArchiveThreadParams; pub use types::CreateThreadParams; pub use types::GitInfoPatch; +pub use types::ItemPage; +pub use types::ListItemsParams; pub use types::ListThreadsParams; +pub use types::ListTurnsParams; pub use types::LoadThreadHistoryParams; pub use types::OptionalStringPatch; pub use types::ReadThreadByRolloutPathParams; @@ -35,9 +38,14 @@ pub use types::ResumeThreadParams; pub use types::SortDirection; pub use types::StoredThread; pub use types::StoredThreadHistory; +pub use types::StoredTurn; +pub use types::StoredTurnError; +pub use types::StoredTurnItemsView; +pub use types::StoredTurnStatus; pub use types::ThreadEventPersistenceMode; pub use types::ThreadMetadataPatch; pub use types::ThreadPage; pub use types::ThreadPersistenceMetadata; pub use types::ThreadSortKey; +pub use types::TurnPage; pub use types::UpdateThreadMetadataParams; diff --git a/codex-rs/thread-store/src/store.rs b/codex-rs/thread-store/src/store.rs index 238e56aa92..bd5e3e7d3d 100644 --- a/codex-rs/thread-store/src/store.rs +++ b/codex-rs/thread-store/src/store.rs @@ -5,7 +5,10 @@ use std::any::Any; use crate::AppendThreadItemsParams; use crate::ArchiveThreadParams; use crate::CreateThreadParams; +use crate::ItemPage; +use crate::ListItemsParams; use crate::ListThreadsParams; +use crate::ListTurnsParams; use crate::LoadThreadHistoryParams; use crate::ReadThreadByRolloutPathParams; use crate::ReadThreadParams; @@ -13,7 +16,9 @@ use crate::ResumeThreadParams; use crate::StoredThread; use crate::StoredThreadHistory; use crate::ThreadPage; +use crate::ThreadStoreError; use crate::ThreadStoreResult; +use crate::TurnPage; use crate::UpdateThreadMetadataParams; /// Storage-neutral thread persistence boundary. @@ -67,6 +72,20 @@ pub trait ThreadStore: Any + Send + Sync { /// Lists stored threads matching the supplied filters. async fn list_threads(&self, params: ListThreadsParams) -> ThreadStoreResult; + /// Lists turns within a stored thread. + async fn list_turns(&self, _params: ListTurnsParams) -> ThreadStoreResult { + Err(ThreadStoreError::Unsupported { + operation: "list_turns", + }) + } + + /// Lists persisted items within a stored turn. + async fn list_items(&self, _params: ListItemsParams) -> ThreadStoreResult { + Err(ThreadStoreError::Unsupported { + operation: "list_items", + }) + } + /// Applies a mutable metadata patch and returns the updated thread. async fn update_thread_metadata( &self, diff --git a/codex-rs/thread-store/src/types.rs b/codex-rs/thread-store/src/types.rs index 06aa2998c9..1fed7bc829 100644 --- a/codex-rs/thread-store/src/types.rs +++ b/codex-rs/thread-store/src/types.rs @@ -183,6 +183,117 @@ pub struct ThreadPage { pub next_cursor: Option, } +/// Requested amount of item detail for stored turns. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] +pub enum StoredTurnItemsView { + /// Return turn metadata only. + NotLoaded, + /// Return display summary items for each turn. + #[default] + Summary, + /// Return every persisted item available for each turn. + Full, +} + +/// Store-owned status for a persisted turn. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum StoredTurnStatus { + /// The turn completed normally. + Completed, + /// The turn was interrupted before normal completion. + Interrupted, + /// The turn failed. + Failed, + /// The turn is still in progress. + InProgress, +} + +/// Store-owned error details for a failed persisted turn. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct StoredTurnError { + /// User-visible error message. + pub message: String, + /// Optional additional detail for clients that expose expanded error context. + pub additional_details: Option, +} + +/// Parameters for listing turns within a stored thread. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct ListTurnsParams { + /// Thread id to read. + pub thread_id: ThreadId, + /// Whether archived threads are eligible. + pub include_archived: bool, + /// Opaque cursor returned by a previous list call. + pub cursor: Option, + /// Maximum number of turns to return. + pub page_size: usize, + /// Sort direction requested by the caller. + pub sort_direction: SortDirection, + /// Requested amount of item detail for each returned turn. + pub items_view: StoredTurnItemsView, +} + +/// Store-owned turn representation used by turn pagination APIs. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct StoredTurn { + /// Turn id. + pub turn_id: String, + /// Persisted rollout items associated with this turn, according to `items_view`. + pub items: Vec, + /// Amount of item detail included in `items`. + pub items_view: StoredTurnItemsView, + /// Store-owned status for API layer projection. + pub status: StoredTurnStatus, + /// Error message when the turn failed. + pub error: Option, + /// Unix timestamp (seconds) when the turn started. + pub started_at: Option, + /// Unix timestamp (seconds) when the turn completed. + pub completed_at: Option, + /// Duration between turn start and completion in milliseconds, if known. + pub duration_ms: Option, +} + +/// A page of stored turns. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct TurnPage { + /// Turns returned for this page. + pub turns: Vec, + /// Opaque cursor to continue listing. + pub next_cursor: Option, + /// Opaque cursor for fetching in the opposite direction. + pub backwards_cursor: Option, +} + +/// Parameters for listing persisted items within a single turn. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct ListItemsParams { + /// Thread id to read. + pub thread_id: ThreadId, + /// Turn id to hydrate. + pub turn_id: String, + /// Whether archived threads are eligible. + pub include_archived: bool, + /// Opaque cursor returned by a previous list call. + pub cursor: Option, + /// Maximum number of items to return. + pub page_size: usize, + /// Sort direction requested by the caller. + pub sort_direction: SortDirection, +} + +/// A page of persisted rollout items within a turn. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ItemPage { + /// Items returned for this page. + pub items: Vec, + /// Opaque cursor to continue listing. + pub next_cursor: Option, + /// Opaque cursor for fetching in the opposite direction. + pub backwards_cursor: Option, +} + /// Store-owned thread metadata used by list/read/resume responses. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct StoredThread {