diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index cac1c33d9b..c60acbdf30 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -4096,6 +4096,31 @@ ], "type": "object" }, + "TurnItemsView": { + "oneOf": [ + { + "description": "`items` was not loaded for this turn. The field is intentionally empty.", + "enum": [ + "notLoaded" + ], + "type": "string" + }, + { + "description": "`items` contains only a display summary for this turn.", + "enum": [ + "summary" + ], + "type": "string" + }, + { + "description": "`items` contains every ThreadItem available from persisted app-server history for this turn.", + "enum": [ + "full" + ], + "type": "string" + } + ] + }, "TurnStartParams": { "properties": { "approvalPolicy": { diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index e79c99a9c9..e72e3c14c7 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -581,6 +581,13 @@ client_request_definitions! { serialization: None, response: v2::ThreadTurnsListResponse, }, + #[experimental("thread/turns/items/list")] + ThreadTurnsItemsList => "thread/turns/items/list" { + params: v2::ThreadTurnsItemsListParams, + // Explicitly concurrent: this primarily reads append-only rollout storage. + serialization: None, + response: v2::ThreadTurnsItemsListResponse, + }, /// Append raw Responses API items to the thread history without starting a user turn. ThreadInjectItems => "thread/inject_items" { params: v2::ThreadInjectItemsParams, @@ -1843,10 +1850,23 @@ mod tests { cursor: None, limit: None, sort_direction: None, + items_view: None, }, }; assert_eq!(thread_turns_list.serialization_scope(), None); + let thread_turns_items_list = ClientRequest::ThreadTurnsItemsList { + request_id: request_id(), + params: v2::ThreadTurnsItemsListParams { + thread_id: "thread-1".to_string(), + turn_id: "turn-1".to_string(), + cursor: None, + limit: None, + sort_direction: None, + }, + }; + assert_eq!(thread_turns_items_list.serialization_scope(), None); + let mcp_resource_read = ClientRequest::McpResourceRead { request_id: request_id(), params: v2::McpResourceReadParams { diff --git a/codex-rs/app-server-protocol/src/protocol/v2/tests.rs b/codex-rs/app-server-protocol/src/protocol/v2/tests.rs index 4e923c5804..d6e49279ce 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/tests.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/tests.rs @@ -95,6 +95,59 @@ fn turn_defaults_legacy_missing_items_view_to_full() { assert_eq!(turn.items_view, TurnItemsView::Full); } +#[test] +fn thread_turns_list_params_accepts_items_view() { + let params = serde_json::from_value::(json!({ + "threadId": "thr_123", + "cursor": null, + "limit": 25, + "sortDirection": "desc", + "itemsView": "notLoaded", + })) + .expect("thread turns list params should deserialize"); + + assert_eq!(params.thread_id, "thr_123"); + assert_eq!(params.items_view, Some(TurnItemsView::NotLoaded)); +} + +#[test] +fn thread_turns_items_list_round_trips() { + let params = ThreadTurnsItemsListParams { + thread_id: "thr_123".to_string(), + turn_id: "turn_456".to_string(), + cursor: Some("cursor_1".to_string()), + limit: Some(50), + sort_direction: Some(SortDirection::Asc), + }; + + assert_eq!( + serde_json::to_value(¶ms).expect("serialize params"), + json!({ + "threadId": "thr_123", + "turnId": "turn_456", + "cursor": "cursor_1", + "limit": 50, + "sortDirection": "asc", + }) + ); + let response = ThreadTurnsItemsListResponse { + data: vec![ThreadItem::ContextCompaction { + id: "item_1".to_string(), + }], + next_cursor: None, + backwards_cursor: Some("cursor_0".to_string()), + }; + + assert_eq!( + serde_json::to_value(&response).expect("serialize response"), + json!({ + "data": [{"type": "contextCompaction", "id": "item_1"}], + "nextCursor": null, + "backwardsCursor": "cursor_0", + }) + ); +} + #[test] fn thread_list_params_accepts_single_cwd() { let params = serde_json::from_value::(json!({ diff --git a/codex-rs/app-server-protocol/src/protocol/v2/thread.rs b/codex-rs/app-server-protocol/src/protocol/v2/thread.rs index 5f293ff8f6..458722b3a2 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/thread.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/thread.rs @@ -6,9 +6,11 @@ use super::PermissionProfileSelectionParams; use super::SandboxMode; use super::SandboxPolicy; use super::Thread; +use super::ThreadItem; use super::ThreadSource; use super::Turn; use super::TurnEnvironmentParams; +use super::TurnItemsView; use super::shared::v2_enum_from_core; use codex_experimental_api_macros::ExperimentalApi; use codex_protocol::config_types::Personality; @@ -1005,6 +1007,9 @@ pub struct ThreadTurnsListParams { /// Optional turn pagination direction; defaults to descending. #[ts(optional = nullable)] pub sort_direction: Option, + /// How much item detail to include for each returned turn; defaults to summary. + #[ts(optional = nullable)] + pub items_view: Option, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] @@ -1022,6 +1027,36 @@ pub struct ThreadTurnsListResponse { pub backwards_cursor: Option, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadTurnsItemsListParams { + pub thread_id: String, + pub turn_id: String, + /// Opaque cursor to pass to the next call to continue after the last item. + #[ts(optional = nullable)] + pub cursor: Option, + /// Optional item page size. + #[ts(optional = nullable)] + pub limit: Option, + /// Optional item pagination direction; defaults to ascending. + #[ts(optional = nullable)] + pub sort_direction: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadTurnsItemsListResponse { + pub data: Vec, + /// Opaque cursor to pass to the next call to continue after the last item. + /// if None, there are no more items to return. + pub next_cursor: Option, + /// Opaque cursor to pass as `cursor` when reversing `sortDirection`. + /// This is only populated when the page contains at least one item. + pub backwards_cursor: Option, +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index babfac99ba..5dd75ee6f4 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -149,7 +149,8 @@ Example with notification opt-out: - `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, `cwd`, and `searchTerm` filters. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded. - `thread/loaded/list` — list the thread ids currently loaded in memory. - `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`. The returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded. -- `thread/turns/list` — experimental; page through a stored thread’s turn history without resuming it; supports cursor-based pagination with `sortDirection`, `nextCursor`, and `backwardsCursor`. +- `thread/turns/list` — experimental; page through a stored thread’s turn history without resuming it; supports cursor-based pagination with `sortDirection`, `itemsView`, `nextCursor`, and `backwardsCursor`. +- `thread/turns/items/list` — experimental; reserved for paging full items for one turn. The API shape is present, but app-server currently returns an unsupported-method JSON-RPC error. - `thread/metadata/update` — patch stored thread metadata in sqlite; currently supports updating persisted `gitInfo` fields and returns the refreshed `thread`. - `thread/memoryMode/set` — experimental; set a thread’s persisted memory eligibility to `"enabled"` or `"disabled"` for either a loaded thread or a stored rollout; returns `{}` on success. - `memory/reset` — experimental; clear the current `CODEX_HOME/memories` directory and reset persisted memory stage data in sqlite while preserving existing thread memory modes; returns `{}` on success. @@ -424,13 +425,14 @@ Use `thread/read` to fetch a stored thread by id without resuming it. Pass `incl Use `thread/turns/list` with `capabilities.experimentalApi = true` to page a stored thread’s turn history without resuming it. By default, results are sorted descending so clients can start at the present and fetch older turns with `nextCursor`. The response also includes `backwardsCursor`; pass it as `cursor` on a later request with `sortDirection: "asc"` to fetch turns newer than the first item from the earlier page. -Every returned `Turn` includes `itemsView`, which tells clients whether the `items` array was omitted intentionally (`notLoaded`), contains only summary items (`summary`), or contains every item available from persisted app-server history (`full`). Current `thread/turns/list` responses return `full` turns. +Every returned `Turn` includes `itemsView`, which tells clients whether the `items` array was omitted intentionally (`notLoaded`), contains only summary items (`summary`), or contains every item available from persisted app-server history (`full`). Pass `itemsView` to choose the returned detail level; omitted `itemsView` defaults to `"summary"`. ```json { "method": "thread/turns/list", "id": 24, "params": { "threadId": "thr_123", "limit": 50, - "sortDirection": "desc" + "sortDirection": "desc", + "itemsView": "summary" } } { "id": 24, "result": { "data": [ ... ], @@ -439,6 +441,19 @@ Every returned `Turn` includes `itemsView`, which tells clients whether the `ite } } ``` +`thread/turns/items/list` is the planned hydration API for fetching full items for one turn: + +```json +{ "method": "thread/turns/items/list", "id": 25, "params": { + "threadId": "thr_123", + "turnId": "turn_456", + "limit": 100, + "sortDirection": "asc" +} } +``` + +This method currently returns JSON-RPC `-32601` with message `thread/turns/items/list is not supported yet`. + ### Example: Update stored thread metadata Use `thread/metadata/update` to patch sqlite-backed metadata for a thread without resuming it. Today this supports persisted `gitInfo`; omitted fields are left unchanged, while explicit `null` clears a stored value. diff --git a/codex-rs/app-server/src/error_code.rs b/codex-rs/app-server/src/error_code.rs index 0054d2988f..48e401f7bc 100644 --- a/codex-rs/app-server/src/error_code.rs +++ b/codex-rs/app-server/src/error_code.rs @@ -1,6 +1,7 @@ use codex_app_server_protocol::JSONRPCErrorError; pub(crate) const INVALID_REQUEST_ERROR_CODE: i64 = -32600; +pub(crate) const METHOD_NOT_FOUND_ERROR_CODE: i64 = -32601; pub const INVALID_PARAMS_ERROR_CODE: i64 = -32602; pub(crate) const INTERNAL_ERROR_CODE: i64 = -32603; pub(crate) const OVERLOADED_ERROR_CODE: i64 = -32001; @@ -10,6 +11,10 @@ pub(crate) fn invalid_request(message: impl Into) -> JSONRPCErrorError { error(INVALID_REQUEST_ERROR_CODE, message) } +pub(crate) fn method_not_found(message: impl Into) -> JSONRPCErrorError { + error(METHOD_NOT_FOUND_ERROR_CODE, message) +} + pub(crate) fn invalid_params(message: impl Into) -> JSONRPCErrorError { error(INVALID_PARAMS_ERROR_CODE, message) } diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 2ca0a87d84..7006c40343 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -1008,6 +1008,9 @@ impl MessageProcessor { ClientRequest::ThreadTurnsList { params, .. } => { self.thread_processor.thread_turns_list(params).await } + ClientRequest::ThreadTurnsItemsList { params, .. } => { + self.thread_processor.thread_turns_items_list(params).await + } ClientRequest::ThreadShellCommand { params, .. } => { self.thread_processor .thread_shell_command(&request_id, params) diff --git a/codex-rs/app-server/src/request_processors.rs b/codex-rs/app-server/src/request_processors.rs index 795b5d4c32..19201b8ae1 100644 --- a/codex-rs/app-server/src/request_processors.rs +++ b/codex-rs/app-server/src/request_processors.rs @@ -215,6 +215,7 @@ use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::ThreadStartedNotification; use codex_app_server_protocol::ThreadStatus; +use codex_app_server_protocol::ThreadTurnsItemsListParams; use codex_app_server_protocol::ThreadTurnsListParams; use codex_app_server_protocol::ThreadTurnsListResponse; use codex_app_server_protocol::ThreadUnarchiveParams; diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index d83f5e631f..615e37f2c9 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -1,4 +1,5 @@ use super::*; +use crate::error_code::method_not_found; const THREAD_LIST_DEFAULT_LIMIT: usize = 25; const THREAD_LIST_MAX_LIMIT: usize = 100; @@ -591,6 +592,15 @@ impl ThreadRequestProcessor { .map(|response| Some(response.into())) } + pub(crate) async fn thread_turns_items_list( + &self, + _params: ThreadTurnsItemsListParams, + ) -> Result, JSONRPCErrorError> { + Err(method_not_found( + "thread/turns/items/list is not supported yet", + )) + } + pub(crate) async fn thread_shell_command( &self, request_id: &ConnectionRequestId, @@ -2072,7 +2082,9 @@ impl ThreadRequestProcessor { cursor, limit, sort_direction, + items_view, } = params; + let items_view = items_view.unwrap_or(TurnItemsView::Summary); let thread_uuid = ThreadId::from_string(&thread_id) .map_err(|err| invalid_request(format!("invalid thread id: {err}")))?; @@ -2101,7 +2113,7 @@ impl ThreadRequestProcessor { } else { None }; - let turns = reconstruct_thread_turns_for_turns_list( + let mut turns = reconstruct_thread_turns_for_turns_list( &items, self.thread_watch_manager .loaded_status_for_thread(&thread_uuid.to_string()) @@ -2109,6 +2121,41 @@ impl ThreadRequestProcessor { has_live_running_thread, active_turn, ); + for turn in &mut turns { + match items_view { + TurnItemsView::NotLoaded => { + turn.items.clear(); + turn.items_view = TurnItemsView::NotLoaded; + } + TurnItemsView::Summary => { + let first_user_message = turn + .items + .iter() + .find(|item| matches!(item, ThreadItem::UserMessage { .. })) + .cloned(); + let final_agent_message = turn + .items + .iter() + .rev() + .find(|item| matches!(item, ThreadItem::AgentMessage { .. })) + .cloned(); + turn.items = match (first_user_message, final_agent_message) { + (Some(user_message), Some(agent_message)) + if user_message.id() != agent_message.id() => + { + vec![user_message, agent_message] + } + (Some(user_message), _) => vec![user_message], + (None, Some(agent_message)) => vec![agent_message], + (None, None) => Vec::new(), + }; + turn.items_view = TurnItemsView::Summary; + } + TurnItemsView::Full => { + turn.items_view = TurnItemsView::Full; + } + } + } let page = paginate_thread_turns( turns, cursor.as_deref(), @@ -3496,19 +3543,30 @@ fn normalize_thread_turns_status( enum ThreadReadViewError { InvalidRequest(String), + Unsupported(&'static str), Internal(String), } fn thread_read_view_error(err: ThreadReadViewError) -> JSONRPCErrorError { match err { ThreadReadViewError::InvalidRequest(message) => invalid_request(message), + ThreadReadViewError::Unsupported(operation) => { + unsupported_thread_store_operation(operation) + } ThreadReadViewError::Internal(message) => internal_error(message), } } +fn unsupported_thread_store_operation(operation: &'static str) -> JSONRPCErrorError { + method_not_found(format!("{operation} is not supported yet")) +} + fn thread_store_list_error(err: ThreadStoreError) -> JSONRPCErrorError { match err { ThreadStoreError::InvalidRequest { message } => invalid_request(message), + ThreadStoreError::Unsupported { operation } => { + unsupported_thread_store_operation(operation) + } err => internal_error(format!("failed to list threads: {err}")), } } @@ -3516,6 +3574,9 @@ fn thread_store_list_error(err: ThreadStoreError) -> JSONRPCErrorError { fn thread_store_resume_read_error(err: ThreadStoreError) -> JSONRPCErrorError { match err { ThreadStoreError::InvalidRequest { message } => invalid_request(message), + ThreadStoreError::Unsupported { operation } => { + unsupported_thread_store_operation(operation) + } ThreadStoreError::ThreadNotFound { thread_id } => { invalid_request(format!("no rollout found for thread id {thread_id}")) } @@ -3538,6 +3599,7 @@ fn thread_turns_list_history_load_error( ThreadStoreError::InvalidRequest { message } => { ThreadReadViewError::InvalidRequest(message) } + ThreadStoreError::Unsupported { operation } => ThreadReadViewError::Unsupported(operation), err => ThreadReadViewError::Internal(format!( "failed to load thread history for thread {thread_id}: {err}" )), @@ -3564,6 +3626,7 @@ fn thread_read_history_load_error( ThreadStoreError::InvalidRequest { message } => { ThreadReadViewError::InvalidRequest(message) } + ThreadStoreError::Unsupported { operation } => ThreadReadViewError::Unsupported(operation), err => ThreadReadViewError::Internal(format!( "failed to load thread history for thread {thread_id}: {err}" )), @@ -3579,6 +3642,9 @@ fn conversation_summary_thread_id_read_error( ThreadStoreError::InvalidRequest { message } if message == no_rollout_message => { conversation_summary_not_found_error(conversation_id) } + ThreadStoreError::Unsupported { operation } => { + unsupported_thread_store_operation(operation) + } ThreadStoreError::ThreadNotFound { thread_id } if thread_id == conversation_id => { conversation_summary_not_found_error(conversation_id) } @@ -3601,6 +3667,9 @@ fn conversation_summary_rollout_path_read_error( ) -> JSONRPCErrorError { match err { ThreadStoreError::InvalidRequest { message } => invalid_request(message), + ThreadStoreError::Unsupported { operation } => { + unsupported_thread_store_operation(operation) + } err => internal_error(format!( "failed to load conversation summary from {}: {}", path.display(), @@ -3615,6 +3684,9 @@ fn thread_store_write_error(operation: &str, err: ThreadStoreError) -> JSONRPCEr invalid_request(format!("thread not found: {thread_id}")) } ThreadStoreError::InvalidRequest { message } => invalid_request(message), + ThreadStoreError::Unsupported { operation } => { + unsupported_thread_store_operation(operation) + } err => internal_error(format!("failed to {operation}: {err}")), } } @@ -3622,6 +3694,9 @@ fn thread_store_write_error(operation: &str, err: ThreadStoreError) -> JSONRPCEr fn thread_store_archive_error(operation: &str, err: ThreadStoreError) -> JSONRPCErrorError { match err { ThreadStoreError::InvalidRequest { message } => invalid_request(message), + ThreadStoreError::Unsupported { + operation: unsupported_operation, + } => unsupported_thread_store_operation(unsupported_operation), err => internal_error(format!("failed to {operation} thread: {err}")), } } diff --git a/codex-rs/app-server/tests/common/mcp_process.rs b/codex-rs/app-server/tests/common/mcp_process.rs index c2a49d8fa6..81a5b2b401 100644 --- a/codex-rs/app-server/tests/common/mcp_process.rs +++ b/codex-rs/app-server/tests/common/mcp_process.rs @@ -89,6 +89,7 @@ use codex_app_server_protocol::ThreadRollbackParams; use codex_app_server_protocol::ThreadSetNameParams; use codex_app_server_protocol::ThreadShellCommandParams; use codex_app_server_protocol::ThreadStartParams; +use codex_app_server_protocol::ThreadTurnsItemsListParams; use codex_app_server_protocol::ThreadTurnsListParams; use codex_app_server_protocol::ThreadUnarchiveParams; use codex_app_server_protocol::ThreadUnsubscribeParams; @@ -522,6 +523,15 @@ impl McpProcess { self.send_request("thread/turns/list", params).await } + /// Send a `thread/turns/items/list` JSON-RPC request. + pub async fn send_thread_turns_items_list_request( + &mut self, + params: ThreadTurnsItemsListParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("thread/turns/items/list", params).await + } + /// Send a `model/list` JSON-RPC request. pub async fn send_list_models_request( &mut self, diff --git a/codex-rs/app-server/tests/suite/v2/thread_read.rs b/codex-rs/app-server/tests/suite/v2/thread_read.rs index 0e9a7a9a05..52420c0c80 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_read.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_read.rs @@ -31,6 +31,7 @@ use codex_app_server_protocol::ThreadSetNameResponse; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::ThreadStatus; +use codex_app_server_protocol::ThreadTurnsItemsListParams; use codex_app_server_protocol::ThreadTurnsListParams; use codex_app_server_protocol::ThreadTurnsListResponse; use codex_app_server_protocol::TurnItemsView; @@ -46,6 +47,7 @@ use codex_core::config::ConfigBuilder; use codex_exec_server::EnvironmentManager; use codex_feedback::CodexFeedback; use codex_protocol::models::BaseInstructions; +use codex_protocol::protocol::AgentMessageEvent; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource as ProtocolSessionSource; @@ -223,6 +225,7 @@ async fn thread_turns_list_can_page_backward_and_forward() -> Result<()> { cursor: None, limit: Some(2), sort_direction: Some(SortDirection::Desc), + items_view: None, }) .await?; let read_resp: JSONRPCResponse = timeout( @@ -238,7 +241,7 @@ async fn thread_turns_list_can_page_backward_and_forward() -> Result<()> { assert_eq!(turn_user_texts(&data), vec!["third", "second"]); assert!( data.iter() - .all(|turn| turn.items_view == TurnItemsView::Full) + .all(|turn| turn.items_view == TurnItemsView::Summary) ); let next_cursor = next_cursor.expect("expected nextCursor for older turns"); let backwards_cursor = backwards_cursor.expect("expected backwardsCursor for newest turn"); @@ -249,6 +252,7 @@ async fn thread_turns_list_can_page_backward_and_forward() -> Result<()> { cursor: Some(next_cursor), limit: Some(10), sort_direction: Some(SortDirection::Desc), + items_view: None, }) .await?; let read_resp: JSONRPCResponse = timeout( @@ -267,6 +271,7 @@ async fn thread_turns_list_can_page_backward_and_forward() -> Result<()> { cursor: Some(backwards_cursor), limit: Some(10), sort_direction: Some(SortDirection::Asc), + items_view: None, }) .await?; let read_resp: JSONRPCResponse = timeout( @@ -280,6 +285,74 @@ async fn thread_turns_list_can_page_backward_and_forward() -> Result<()> { Ok(()) } +#[tokio::test] +async fn thread_turns_list_supports_requested_items_view() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let filename_ts = "2025-01-05T12-00-00"; + let conversation_id = create_fake_rollout_with_text_elements( + codex_home.path(), + filename_ts, + "2025-01-05T12:00:00Z", + "first", + vec![], + Some("mock_provider"), + /*git_info*/ None, + )?; + let rollout_path = rollout_path(codex_home.path(), filename_ts, &conversation_id); + append_agent_message(rollout_path.as_path(), "2025-01-05T12:01:00Z", "draft")?; + append_agent_message(rollout_path.as_path(), "2025-01-05T12:02:00Z", "final")?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let full = read_single_turn_items_view( + &mut mcp, + conversation_id.as_str(), + Some(TurnItemsView::Full), + ) + .await?; + assert_eq!(full.items_view, TurnItemsView::Full); + assert_eq!( + turn_agent_texts(std::slice::from_ref(&full)), + vec!["draft", "final"] + ); + + let summary = read_single_turn_items_view( + &mut mcp, + conversation_id.as_str(), + Some(TurnItemsView::Summary), + ) + .await?; + assert_eq!(summary.items_view, TurnItemsView::Summary); + assert_eq!( + turn_user_texts(std::slice::from_ref(&summary)), + vec!["first"] + ); + assert_eq!( + turn_agent_texts(std::slice::from_ref(&summary)), + vec!["final"] + ); + + let not_loaded = read_single_turn_items_view( + &mut mcp, + conversation_id.as_str(), + Some(TurnItemsView::NotLoaded), + ) + .await?; + assert_eq!(not_loaded.items_view, TurnItemsView::NotLoaded); + assert!(not_loaded.items.is_empty()); + assert_eq!(not_loaded.id, full.id); + assert_eq!(not_loaded.status, full.status); + assert_eq!(not_loaded.started_at, full.started_at); + assert_eq!(not_loaded.completed_at, full.completed_at); + assert_eq!(not_loaded.duration_ms, full.duration_ms); + + Ok(()) +} + #[tokio::test] async fn thread_turns_list_reads_store_history_without_rollout_path() -> Result<()> { let codex_home = TempDir::new()?; @@ -334,6 +407,7 @@ async fn thread_turns_list_reads_store_history_without_rollout_path() -> Result< cursor: None, limit: Some(10), sort_direction: Some(SortDirection::Asc), + items_view: None, }, }) .await? @@ -583,6 +657,7 @@ async fn thread_turns_list_rejects_cursor_when_anchor_turn_is_rolled_back() -> R cursor: None, limit: Some(2), sort_direction: Some(SortDirection::Desc), + items_view: None, }) .await?; let read_resp: JSONRPCResponse = timeout( @@ -607,6 +682,7 @@ async fn thread_turns_list_rejects_cursor_when_anchor_turn_is_rolled_back() -> R cursor: Some(backwards_cursor), limit: Some(10), sort_direction: Some(SortDirection::Asc), + items_view: None, }) .await?; let read_err: JSONRPCError = timeout( @@ -963,6 +1039,7 @@ async fn thread_turns_list_rejects_unmaterialized_loaded_thread() -> Result<()> cursor: None, limit: None, sort_direction: None, + items_view: None, }) .await?; let read_err: JSONRPCError = timeout( @@ -983,6 +1060,39 @@ async fn thread_turns_list_rejects_unmaterialized_loaded_thread() -> Result<()> Ok(()) } +#[tokio::test] +async fn thread_turns_items_list_returns_unsupported() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let read_id = mcp + .send_thread_turns_items_list_request(ThreadTurnsItemsListParams { + thread_id: "thr_123".to_string(), + turn_id: "turn_456".to_string(), + cursor: None, + limit: None, + sort_direction: None, + }) + .await?; + let read_err: JSONRPCError = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_error_message(RequestId::Integer(read_id)), + ) + .await??; + + assert_eq!(read_err.error.code, -32601); + assert_eq!( + read_err.error.message, + "thread/turns/items/list is not supported yet" + ); + + Ok(()) +} + #[tokio::test] async fn thread_read_reports_system_error_idle_flag_after_failed_turn() -> Result<()> { let server = responses::start_mock_server().await; @@ -1068,6 +1178,24 @@ fn append_user_message(path: &Path, timestamp: &str, text: &str) -> std::io::Res ) } +fn append_agent_message(path: &Path, timestamp: &str, text: &str) -> anyhow::Result<()> { + let mut file = std::fs::OpenOptions::new().append(true).open(path)?; + writeln!( + file, + "{}", + json!({ + "timestamp": timestamp, + "type": "event_msg", + "payload": serde_json::to_value(EventMsg::AgentMessage(AgentMessageEvent { + message: text.to_string(), + phase: None, + memory_citation: None, + }))?, + }) + )?; + Ok(()) +} + fn append_thread_rollback(path: &Path, timestamp: &str, num_turns: u32) -> std::io::Result<()> { let mut file = std::fs::OpenOptions::new().append(true).open(path)?; writeln!( @@ -1084,6 +1212,31 @@ fn append_thread_rollback(path: &Path, timestamp: &str, num_turns: u32) -> std:: ) } +async fn read_single_turn_items_view( + mcp: &mut McpProcess, + thread_id: &str, + items_view: Option, +) -> anyhow::Result { + let read_id = mcp + .send_thread_turns_list_request(ThreadTurnsListParams { + thread_id: thread_id.to_string(), + cursor: None, + limit: Some(10), + sort_direction: Some(SortDirection::Asc), + items_view, + }) + .await?; + let read_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(read_id)), + ) + .await??; + let ThreadTurnsListResponse { mut data, .. } = + to_response::(read_resp)?; + assert_eq!(data.len(), 1); + Ok(data.remove(0)) +} + fn turn_user_texts(turns: &[codex_app_server_protocol::Turn]) -> Vec<&str> { turns .iter() @@ -1100,6 +1253,17 @@ fn turn_user_texts(turns: &[codex_app_server_protocol::Turn]) -> Vec<&str> { .collect() } +fn turn_agent_texts(turns: &[codex_app_server_protocol::Turn]) -> Vec<&str> { + turns + .iter() + .flat_map(|turn| &turn.items) + .filter_map(|item| match item { + ThreadItem::AgentMessage { text, .. } => Some(text.as_str()), + _ => None, + }) + .collect() +} + struct InMemoryThreadStoreId { store_id: String, } diff --git a/codex-rs/app-server/tests/suite/v2/thread_shell_command.rs b/codex-rs/app-server/tests/suite/v2/thread_shell_command.rs index eebc4077df..b7cfba2f95 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_shell_command.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_shell_command.rs @@ -151,6 +151,7 @@ async fn thread_shell_command_history_responses_exclude_persisted_command_execut cursor: None, limit: None, sort_direction: Some(SortDirection::Asc), + items_view: None, }) .await?; let turns_list_resp: JSONRPCResponse = timeout( diff --git a/codex-rs/thread-store/src/error.rs b/codex-rs/thread-store/src/error.rs index c5cee9a8b8..2244c93185 100644 --- a/codex-rs/thread-store/src/error.rs +++ b/codex-rs/thread-store/src/error.rs @@ -27,6 +27,13 @@ pub enum ThreadStoreError { message: String, }, + /// The store implementation does not support this operation yet. + #[error("thread-store unsupported operation: {operation}")] + Unsupported { + /// Stable operation name for callers that need to map unsupported operations. + operation: &'static str, + }, + /// Catch-all for implementation failures that do not fit a more specific category. #[error("thread-store internal error: {message}")] Internal { diff --git a/codex-rs/thread-store/src/in_memory.rs b/codex-rs/thread-store/src/in_memory.rs index 77739260b8..1483906267 100644 --- a/codex-rs/thread-store/src/in_memory.rs +++ b/codex-rs/thread-store/src/in_memory.rs @@ -35,6 +35,57 @@ fn stores() -> &'static Mutex>> { IN_MEMORY_THREAD_STORES.get_or_init(|| Mutex::new(HashMap::new())) } +#[cfg(test)] +mod tests { + use super::*; + use crate::ListItemsParams; + use crate::ListTurnsParams; + use crate::SortDirection; + use crate::StoredTurnItemsView; + + #[tokio::test] + async fn default_turn_pagination_methods_return_unsupported() { + let store = InMemoryThreadStore::default(); + let thread_id = ThreadId::default(); + + let turns_err = store + .list_turns(ListTurnsParams { + thread_id, + include_archived: true, + cursor: None, + page_size: 10, + sort_direction: SortDirection::Asc, + items_view: StoredTurnItemsView::Summary, + }) + .await + .expect_err("default list_turns should be unsupported"); + assert!(matches!( + turns_err, + ThreadStoreError::Unsupported { + operation: "list_turns" + } + )); + + let items_err = store + .list_items(ListItemsParams { + thread_id, + turn_id: "turn_1".to_string(), + include_archived: true, + cursor: None, + page_size: 10, + sort_direction: SortDirection::Asc, + }) + .await + .expect_err("default list_items should be unsupported"); + assert!(matches!( + items_err, + ThreadStoreError::Unsupported { + operation: "list_items" + } + )); + } +} + fn stores_guard() -> MutexGuard<'static, HashMap>> { match stores().lock() { Ok(guard) => guard, diff --git a/codex-rs/thread-store/src/lib.rs b/codex-rs/thread-store/src/lib.rs index 52b7f5ea1f..a5daeff44a 100644 --- a/codex-rs/thread-store/src/lib.rs +++ b/codex-rs/thread-store/src/lib.rs @@ -26,7 +26,10 @@ pub use types::AppendThreadItemsParams; pub use types::ArchiveThreadParams; pub use types::CreateThreadParams; pub use types::GitInfoPatch; +pub use types::ItemPage; +pub use types::ListItemsParams; pub use types::ListThreadsParams; +pub use types::ListTurnsParams; pub use types::LoadThreadHistoryParams; pub use types::OptionalStringPatch; pub use types::ReadThreadByRolloutPathParams; @@ -35,9 +38,14 @@ pub use types::ResumeThreadParams; pub use types::SortDirection; pub use types::StoredThread; pub use types::StoredThreadHistory; +pub use types::StoredTurn; +pub use types::StoredTurnError; +pub use types::StoredTurnItemsView; +pub use types::StoredTurnStatus; pub use types::ThreadEventPersistenceMode; pub use types::ThreadMetadataPatch; pub use types::ThreadPage; pub use types::ThreadPersistenceMetadata; pub use types::ThreadSortKey; +pub use types::TurnPage; pub use types::UpdateThreadMetadataParams; diff --git a/codex-rs/thread-store/src/store.rs b/codex-rs/thread-store/src/store.rs index 238e56aa92..bd5e3e7d3d 100644 --- a/codex-rs/thread-store/src/store.rs +++ b/codex-rs/thread-store/src/store.rs @@ -5,7 +5,10 @@ use std::any::Any; use crate::AppendThreadItemsParams; use crate::ArchiveThreadParams; use crate::CreateThreadParams; +use crate::ItemPage; +use crate::ListItemsParams; use crate::ListThreadsParams; +use crate::ListTurnsParams; use crate::LoadThreadHistoryParams; use crate::ReadThreadByRolloutPathParams; use crate::ReadThreadParams; @@ -13,7 +16,9 @@ use crate::ResumeThreadParams; use crate::StoredThread; use crate::StoredThreadHistory; use crate::ThreadPage; +use crate::ThreadStoreError; use crate::ThreadStoreResult; +use crate::TurnPage; use crate::UpdateThreadMetadataParams; /// Storage-neutral thread persistence boundary. @@ -67,6 +72,20 @@ pub trait ThreadStore: Any + Send + Sync { /// Lists stored threads matching the supplied filters. async fn list_threads(&self, params: ListThreadsParams) -> ThreadStoreResult; + /// Lists turns within a stored thread. + async fn list_turns(&self, _params: ListTurnsParams) -> ThreadStoreResult { + Err(ThreadStoreError::Unsupported { + operation: "list_turns", + }) + } + + /// Lists persisted items within a stored turn. + async fn list_items(&self, _params: ListItemsParams) -> ThreadStoreResult { + Err(ThreadStoreError::Unsupported { + operation: "list_items", + }) + } + /// Applies a mutable metadata patch and returns the updated thread. async fn update_thread_metadata( &self, diff --git a/codex-rs/thread-store/src/types.rs b/codex-rs/thread-store/src/types.rs index 06aa2998c9..1fed7bc829 100644 --- a/codex-rs/thread-store/src/types.rs +++ b/codex-rs/thread-store/src/types.rs @@ -183,6 +183,117 @@ pub struct ThreadPage { pub next_cursor: Option, } +/// Requested amount of item detail for stored turns. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] +pub enum StoredTurnItemsView { + /// Return turn metadata only. + NotLoaded, + /// Return display summary items for each turn. + #[default] + Summary, + /// Return every persisted item available for each turn. + Full, +} + +/// Store-owned status for a persisted turn. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum StoredTurnStatus { + /// The turn completed normally. + Completed, + /// The turn was interrupted before normal completion. + Interrupted, + /// The turn failed. + Failed, + /// The turn is still in progress. + InProgress, +} + +/// Store-owned error details for a failed persisted turn. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct StoredTurnError { + /// User-visible error message. + pub message: String, + /// Optional additional detail for clients that expose expanded error context. + pub additional_details: Option, +} + +/// Parameters for listing turns within a stored thread. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct ListTurnsParams { + /// Thread id to read. + pub thread_id: ThreadId, + /// Whether archived threads are eligible. + pub include_archived: bool, + /// Opaque cursor returned by a previous list call. + pub cursor: Option, + /// Maximum number of turns to return. + pub page_size: usize, + /// Sort direction requested by the caller. + pub sort_direction: SortDirection, + /// Requested amount of item detail for each returned turn. + pub items_view: StoredTurnItemsView, +} + +/// Store-owned turn representation used by turn pagination APIs. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct StoredTurn { + /// Turn id. + pub turn_id: String, + /// Persisted rollout items associated with this turn, according to `items_view`. + pub items: Vec, + /// Amount of item detail included in `items`. + pub items_view: StoredTurnItemsView, + /// Store-owned status for API layer projection. + pub status: StoredTurnStatus, + /// Error message when the turn failed. + pub error: Option, + /// Unix timestamp (seconds) when the turn started. + pub started_at: Option, + /// Unix timestamp (seconds) when the turn completed. + pub completed_at: Option, + /// Duration between turn start and completion in milliseconds, if known. + pub duration_ms: Option, +} + +/// A page of stored turns. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct TurnPage { + /// Turns returned for this page. + pub turns: Vec, + /// Opaque cursor to continue listing. + pub next_cursor: Option, + /// Opaque cursor for fetching in the opposite direction. + pub backwards_cursor: Option, +} + +/// Parameters for listing persisted items within a single turn. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct ListItemsParams { + /// Thread id to read. + pub thread_id: ThreadId, + /// Turn id to hydrate. + pub turn_id: String, + /// Whether archived threads are eligible. + pub include_archived: bool, + /// Opaque cursor returned by a previous list call. + pub cursor: Option, + /// Maximum number of items to return. + pub page_size: usize, + /// Sort direction requested by the caller. + pub sort_direction: SortDirection, +} + +/// A page of persisted rollout items within a turn. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ItemPage { + /// Items returned for this page. + pub items: Vec, + /// Opaque cursor to continue listing. + pub next_cursor: Option, + /// Opaque cursor for fetching in the opposite direction. + pub backwards_cursor: Option, +} + /// Store-owned thread metadata used by list/read/resume responses. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct StoredThread {