Read conversation summaries through thread store (#18716)

Migrate the conversation summary App Server methods to ThreadStore

Because this app server api allows explicitly fetching the thread by
rollout path, intercept that case in the app server code and (a) route
directly to underlying local thread store methods if we're using a local
thread store, or (b) throw an unsupported error if we're using a remote
thread store. This keeps the thread store API clean and all filesystem
operations inside of the local thread store, which pushing the
"fundamental incompatibility" check as early as possible.
This commit is contained in:
Tom
2026-04-20 15:39:10 -07:00
committed by GitHub
parent 660153b6de
commit a718b6fd47
6 changed files with 253 additions and 72 deletions

View File

@@ -5212,62 +5212,62 @@ impl CodexMessageProcessor {
request_id: ConnectionRequestId,
params: GetConversationSummaryParams,
) {
if let GetConversationSummaryParams::ThreadId { conversation_id } = &params
&& let Some(summary) =
read_summary_from_state_db_by_thread_id(&self.config, *conversation_id).await
{
let response = GetConversationSummaryResponse { summary };
self.outgoing.send_response(request_id, response).await;
return;
}
let path = match params {
GetConversationSummaryParams::RolloutPath { rollout_path } => {
if rollout_path.is_relative() {
self.config.codex_home.join(&rollout_path).to_path_buf()
} else {
rollout_path
}
}
GetConversationSummaryParams::ThreadId { conversation_id } => {
match codex_core::find_thread_path_by_id_str(
&self.config.codex_home,
&conversation_id.to_string(),
)
let fallback_provider = self.config.model_provider_id.as_str();
let read_result = match params {
GetConversationSummaryParams::ThreadId { conversation_id } => self
.thread_store
.read_thread(StoreReadThreadParams {
thread_id: conversation_id,
include_archived: true,
include_history: false,
})
.await
{
Ok(Some(p)) => p,
_ => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"no rollout found for conversation id {conversation_id}"
),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
}
.map_err(|err| conversation_summary_thread_id_read_error(conversation_id, err)),
GetConversationSummaryParams::RolloutPath { rollout_path } => {
let Some(local_thread_store) = self
.thread_store
.as_any()
.downcast_ref::<LocalThreadStore>()
else {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message:
"rollout path queries are only supported with the local thread store"
.to_string(),
data: None,
};
return self.outgoing.send_error(request_id, error).await;
};
local_thread_store
.read_thread_by_rollout_path(
rollout_path.clone(),
/*include_archived*/ true,
/*include_history*/ false,
)
.await
.map_err(|err| conversation_summary_rollout_path_read_error(&rollout_path, err))
}
};
let fallback_provider = self.config.model_provider_id.as_str();
match read_summary_from_rollout(&path, fallback_provider).await {
Ok(summary) => {
match read_result {
Ok(stored_thread) => {
let Some(summary) = summary_from_stored_thread(stored_thread, fallback_provider)
else {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message:
"failed to load conversation summary: thread is missing rollout path"
.to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
};
let response = GetConversationSummaryResponse { summary };
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!(
"failed to load conversation summary from {}: {}",
path.display(),
err
),
data: None,
};
Err(error) => {
self.outgoing.send_error(request_id, error).await;
}
}
@@ -9633,6 +9633,61 @@ fn thread_store_list_error(err: ThreadStoreError) -> JSONRPCErrorError {
}
}
fn conversation_summary_thread_id_read_error(
conversation_id: ThreadId,
err: ThreadStoreError,
) -> JSONRPCErrorError {
let no_rollout_message = format!("no rollout found for thread id {conversation_id}");
match err {
ThreadStoreError::InvalidRequest { message } if message == no_rollout_message => {
conversation_summary_not_found_error(conversation_id)
}
ThreadStoreError::ThreadNotFound { thread_id } if thread_id == conversation_id => {
conversation_summary_not_found_error(conversation_id)
}
ThreadStoreError::InvalidRequest { message } => JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message,
data: None,
},
err => JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to load conversation summary for {conversation_id}: {err}"),
data: None,
},
}
}
fn conversation_summary_not_found_error(conversation_id: ThreadId) -> JSONRPCErrorError {
JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("no rollout found for conversation id {conversation_id}"),
data: None,
}
}
fn conversation_summary_rollout_path_read_error(
path: &Path,
err: ThreadStoreError,
) -> JSONRPCErrorError {
match err {
ThreadStoreError::InvalidRequest { message } => JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message,
data: None,
},
err => JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!(
"failed to load conversation summary from {}: {}",
path.display(),
err
),
data: None,
},
}
}
fn thread_store_write_error(operation: &str, err: ThreadStoreError) -> JSONRPCErrorError {
match err {
ThreadStoreError::ThreadNotFound { thread_id } => JSONRPCErrorError {