diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index ccfc3478aa..0c0a257e75 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -5212,62 +5212,62 @@ impl CodexMessageProcessor { request_id: ConnectionRequestId, params: GetConversationSummaryParams, ) { - if let GetConversationSummaryParams::ThreadId { conversation_id } = ¶ms - && let Some(summary) = - read_summary_from_state_db_by_thread_id(&self.config, *conversation_id).await - { - let response = GetConversationSummaryResponse { summary }; - self.outgoing.send_response(request_id, response).await; - return; - } - - let path = match params { - GetConversationSummaryParams::RolloutPath { rollout_path } => { - if rollout_path.is_relative() { - self.config.codex_home.join(&rollout_path).to_path_buf() - } else { - rollout_path - } - } - GetConversationSummaryParams::ThreadId { conversation_id } => { - match codex_core::find_thread_path_by_id_str( - &self.config.codex_home, - &conversation_id.to_string(), - ) + let fallback_provider = self.config.model_provider_id.as_str(); + let read_result = match params { + GetConversationSummaryParams::ThreadId { conversation_id } => self + .thread_store + .read_thread(StoreReadThreadParams { + thread_id: conversation_id, + include_archived: true, + include_history: false, + }) .await - { - Ok(Some(p)) => p, - _ => { - let error = JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!( - "no rollout found for conversation id {conversation_id}" - ), - data: None, - }; - self.outgoing.send_error(request_id, error).await; - return; - } - } + .map_err(|err| conversation_summary_thread_id_read_error(conversation_id, err)), + GetConversationSummaryParams::RolloutPath { rollout_path } => { + let Some(local_thread_store) = self + .thread_store + .as_any() + .downcast_ref::() + else { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: + "rollout path queries are only supported with the local thread store" + .to_string(), + data: None, + }; + return self.outgoing.send_error(request_id, error).await; + }; + + local_thread_store + .read_thread_by_rollout_path( + rollout_path.clone(), + /*include_archived*/ true, + /*include_history*/ false, + ) + .await + .map_err(|err| conversation_summary_rollout_path_read_error(&rollout_path, err)) } }; - let fallback_provider = self.config.model_provider_id.as_str(); - match read_summary_from_rollout(&path, fallback_provider).await { - Ok(summary) => { + match read_result { + Ok(stored_thread) => { + let Some(summary) = summary_from_stored_thread(stored_thread, fallback_provider) + else { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: + "failed to load conversation summary: thread is missing rollout path" + .to_string(), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + }; let response = GetConversationSummaryResponse { summary }; self.outgoing.send_response(request_id, response).await; } - Err(err) => { - let error = JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: format!( - "failed to load conversation summary from {}: {}", - path.display(), - err - ), - data: None, - }; + Err(error) => { self.outgoing.send_error(request_id, error).await; } } @@ -9633,6 +9633,61 @@ fn thread_store_list_error(err: ThreadStoreError) -> JSONRPCErrorError { } } +fn conversation_summary_thread_id_read_error( + conversation_id: ThreadId, + err: ThreadStoreError, +) -> JSONRPCErrorError { + let no_rollout_message = format!("no rollout found for thread id {conversation_id}"); + match err { + ThreadStoreError::InvalidRequest { message } if message == no_rollout_message => { + conversation_summary_not_found_error(conversation_id) + } + ThreadStoreError::ThreadNotFound { thread_id } if thread_id == conversation_id => { + conversation_summary_not_found_error(conversation_id) + } + ThreadStoreError::InvalidRequest { message } => JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message, + data: None, + }, + err => JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("failed to load conversation summary for {conversation_id}: {err}"), + data: None, + }, + } +} + +fn conversation_summary_not_found_error(conversation_id: ThreadId) -> JSONRPCErrorError { + JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("no rollout found for conversation id {conversation_id}"), + data: None, + } +} + +fn conversation_summary_rollout_path_read_error( + path: &Path, + err: ThreadStoreError, +) -> JSONRPCErrorError { + match err { + ThreadStoreError::InvalidRequest { message } => JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message, + data: None, + }, + err => JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!( + "failed to load conversation summary from {}: {}", + path.display(), + err + ), + data: None, + }, + } +} + fn thread_store_write_error(operation: &str, err: ThreadStoreError) -> JSONRPCErrorError { match err { ThreadStoreError::ThreadNotFound { thread_id } => JSONRPCErrorError { diff --git a/codex-rs/app-server/tests/suite/conversation_summary.rs b/codex-rs/app-server/tests/suite/conversation_summary.rs index 4690a44ca3..b05cee8230 100644 --- a/codex-rs/app-server/tests/suite/conversation_summary.rs +++ b/codex-rs/app-server/tests/suite/conversation_summary.rs @@ -6,6 +6,7 @@ use app_test_support::to_response; use codex_app_server_protocol::ConversationSummary; use codex_app_server_protocol::GetConversationSummaryParams; use codex_app_server_protocol::GetConversationSummaryResponse; +use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; use codex_protocol::ThreadId; @@ -18,16 +19,18 @@ use tokio::time::timeout; const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); const FILENAME_TS: &str = "2025-01-02T12-00-00"; const META_RFC3339: &str = "2025-01-02T12:00:00Z"; +const CREATED_AT_RFC3339: &str = "2025-01-02T12:00:00.000Z"; const UPDATED_AT_RFC3339: &str = "2025-01-02T12:00:00.000Z"; const PREVIEW: &str = "Summarize this conversation"; const MODEL_PROVIDER: &str = "openai"; +const INVALID_REQUEST_ERROR_CODE: i64 = -32600; fn expected_summary(conversation_id: ThreadId, path: PathBuf) -> ConversationSummary { ConversationSummary { conversation_id, path, preview: PREVIEW.to_string(), - timestamp: Some(META_RFC3339.to_string()), + timestamp: Some(CREATED_AT_RFC3339.to_string()), updated_at: Some(UPDATED_AT_RFC3339.to_string()), model_provider: MODEL_PROVIDER.to_string(), cwd: PathBuf::from("/"), @@ -77,6 +80,37 @@ async fn get_conversation_summary_by_thread_id_reads_rollout() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_conversation_summary_by_rollout_path_rejects_remote_thread_store() -> Result<()> { + let codex_home = TempDir::new()?; + std::fs::write( + codex_home.path().join("config.toml"), + r#"experimental_thread_store_endpoint = "http://127.0.0.1:1" +"#, + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let request_id = mcp + .send_get_conversation_summary_request(GetConversationSummaryParams::RolloutPath { + rollout_path: PathBuf::from("sessions/2025/01/02/rollout.jsonl"), + }) + .await?; + let error: JSONRPCError = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_error_message(RequestId::Integer(request_id)), + ) + .await??; + + assert_eq!(error.error.code, INVALID_REQUEST_ERROR_CODE); + assert_eq!( + error.error.message, + "rollout path queries are only supported with the local thread store" + ); + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_conversation_summary_by_relative_rollout_path_resolves_from_codex_home() -> Result<()> { diff --git a/codex-rs/thread-store/src/local/mod.rs b/codex-rs/thread-store/src/local/mod.rs index 4867580a78..c7c62f4cfa 100644 --- a/codex-rs/thread-store/src/local/mod.rs +++ b/codex-rs/thread-store/src/local/mod.rs @@ -38,10 +38,30 @@ impl LocalThreadStore { pub fn new(config: RolloutConfig) -> Self { Self { config } } + + /// Read a local rollout-backed thread by path. + pub async fn read_thread_by_rollout_path( + &self, + rollout_path: std::path::PathBuf, + include_archived: bool, + include_history: bool, + ) -> ThreadStoreResult { + read_thread::read_thread_by_rollout_path( + self, + rollout_path, + include_archived, + include_history, + ) + .await + } } #[async_trait] impl ThreadStore for LocalThreadStore { + fn as_any(&self) -> &dyn std::any::Any { + self + } + async fn create_thread( &self, _params: CreateThreadParams, diff --git a/codex-rs/thread-store/src/local/read_thread.rs b/codex-rs/thread-store/src/local/read_thread.rs index 33a41336b2..751a940158 100644 --- a/codex-rs/thread-store/src/local/read_thread.rs +++ b/codex-rs/thread-store/src/local/read_thread.rs @@ -31,15 +31,7 @@ pub(super) async fn read_thread( && (params.include_archived || metadata.archived_at.is_none()) { let mut thread = stored_thread_from_sqlite_metadata(store, metadata).await; - if params.include_history { - let Some(path) = thread.rollout_path.clone() else { - return Err(ThreadStoreError::Internal { - message: format!("failed to locate rollout for thread {thread_id}"), - }); - }; - let items = load_history_items(&path).await?; - thread.history = Some(StoredThreadHistory { thread_id, items }); - } + attach_history_if_requested(&mut thread, params.include_history).await?; return Ok(thread); } @@ -49,19 +41,60 @@ pub(super) async fn read_thread( message: format!("no rollout found for thread id {thread_id}"), })?; - let mut thread = read_thread_from_rollout_path(store, thread_id, path).await?; - if params.include_history { - let Some(path) = thread.rollout_path.clone() else { - return Err(ThreadStoreError::Internal { - message: format!("failed to load thread history for thread {thread_id}"), - }); - }; - let items = load_history_items(&path).await?; - thread.history = Some(StoredThreadHistory { thread_id, items }); - } + let mut thread = read_thread_from_rollout_path(store, path).await?; + attach_history_if_requested(&mut thread, params.include_history).await?; Ok(thread) } +pub(super) async fn read_thread_by_rollout_path( + store: &LocalThreadStore, + rollout_path: std::path::PathBuf, + include_archived: bool, + include_history: bool, +) -> ThreadStoreResult { + let path = resolve_requested_rollout_path(store, rollout_path)?; + let mut thread = read_thread_from_rollout_path(store, path).await?; + if !include_archived && thread.archived_at.is_some() { + return Err(ThreadStoreError::InvalidRequest { + message: format!("thread {} is archived", thread.thread_id), + }); + } + attach_history_if_requested(&mut thread, include_history).await?; + Ok(thread) +} + +fn resolve_requested_rollout_path( + store: &LocalThreadStore, + rollout_path: std::path::PathBuf, +) -> ThreadStoreResult { + let path = if rollout_path.is_relative() { + store.config.codex_home.join(rollout_path) + } else { + rollout_path + }; + std::fs::canonicalize(&path).map_err(|err| ThreadStoreError::InvalidRequest { + message: format!("failed to resolve rollout path `{}`: {err}", path.display()), + }) +} + +async fn attach_history_if_requested( + thread: &mut StoredThread, + include_history: bool, +) -> ThreadStoreResult<()> { + if !include_history { + return Ok(()); + } + let thread_id = thread.thread_id; + let Some(path) = thread.rollout_path.clone() else { + return Err(ThreadStoreError::Internal { + message: format!("failed to load thread history for thread {thread_id}"), + }); + }; + let items = load_history_items(&path).await?; + thread.history = Some(StoredThreadHistory { thread_id, items }); + Ok(()) +} + async fn resolve_rollout_path( store: &LocalThreadStore, thread_id: codex_protocol::ThreadId, @@ -94,7 +127,6 @@ async fn resolve_rollout_path( async fn read_thread_from_rollout_path( store: &LocalThreadStore, - thread_id: codex_protocol::ThreadId, path: std::path::PathBuf, ) -> ThreadStoreResult { let Some(item) = read_thread_item_from_rollout(path.clone()).await else { @@ -116,7 +148,7 @@ async fn read_thread_from_rollout_path( .ok() .and_then(|meta_line| meta_line.meta.forked_from_id); if let Ok(Some(title)) = - find_thread_name_by_id(store.config.codex_home.as_path(), &thread_id).await + find_thread_name_by_id(store.config.codex_home.as_path(), &thread.thread_id).await { set_thread_name_from_title(&mut thread, title); } @@ -348,6 +380,36 @@ mod tests { ); } + #[tokio::test] + async fn read_thread_returns_rollout_path_summary() { + let home = TempDir::new().expect("temp dir"); + let store = LocalThreadStore::new(test_config(home.path())); + let uuid = Uuid::from_u128(211); + let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); + let active_path = + write_session_file(home.path(), "2025-01-03T12-00-00", uuid).expect("session file"); + let relative_path = active_path + .strip_prefix(home.path()) + .expect("path should be under codex home") + .to_path_buf(); + + let thread = store + .read_thread_by_rollout_path( + relative_path, + /*include_archived*/ false, + /*include_history*/ false, + ) + .await + .expect("read thread by rollout path"); + + assert_eq!(thread.thread_id, thread_id); + assert_eq!( + thread.rollout_path, + Some(std::fs::canonicalize(active_path).expect("canonical path")) + ); + assert_eq!(thread.preview, "Hello from user"); + } + #[tokio::test] async fn read_thread_returns_archived_rollout_when_requested() { let home = TempDir::new().expect("temp dir"); diff --git a/codex-rs/thread-store/src/remote/mod.rs b/codex-rs/thread-store/src/remote/mod.rs index 760a2dce65..5be760870e 100644 --- a/codex-rs/thread-store/src/remote/mod.rs +++ b/codex-rs/thread-store/src/remote/mod.rs @@ -48,6 +48,10 @@ impl RemoteThreadStore { #[async_trait] impl ThreadStore for RemoteThreadStore { + fn as_any(&self) -> &dyn std::any::Any { + self + } + async fn create_thread( &self, _params: CreateThreadParams, diff --git a/codex-rs/thread-store/src/store.rs b/codex-rs/thread-store/src/store.rs index 5331242c88..56cbd05f5c 100644 --- a/codex-rs/thread-store/src/store.rs +++ b/codex-rs/thread-store/src/store.rs @@ -1,3 +1,5 @@ +use std::any::Any; + use async_trait::async_trait; use crate::AppendThreadItemsParams; @@ -16,7 +18,11 @@ use crate::UpdateThreadMetadataParams; /// Storage-neutral thread persistence boundary. #[async_trait] -pub trait ThreadStore: Send + Sync { +pub trait ThreadStore: Any + Send + Sync { + /// Return this store as [`Any`] so callers at API boundaries can reject requests that only + /// make sense for a concrete store implementation. + fn as_any(&self) -> &dyn Any; + /// Creates a new thread and returns a live recorder for future appends. async fn create_thread( &self,