Streamline thread read handlers (#19494)

## Why

The thread read/list handlers mostly assemble views, but their error
handling was interleaved with response emission. Returning view-building
errors from the helper path keeps those handlers focused on data
assembly.

## What Changed

- Added a small mapper for `ThreadReadViewError` to JSON-RPC errors in
`codex-rs/app-server/src/codex_message_processor.rs`.
- Streamlined thread list, loaded-thread, read, turn-list, and summary
handlers to produce result values for the request boundary.
- Kept the existing invalid-request vs internal-error distinctions for
missing or unreadable thread data.

## Verification

- `cargo check -p codex-app-server`
- `cargo test -p codex-app-server --test all conversation_summary --
--test-threads=1`
This commit is contained in:
pakrym-oai
2026-04-27 14:30:24 -07:00
committed by GitHub
parent 0f40261e86
commit 2be9fd5a93

View File

@@ -496,6 +496,13 @@ enum ThreadReadViewError {
mod thread_goal_handlers;
use self::thread_goal_handlers::api_thread_goal_from_state;
fn thread_read_view_error(err: ThreadReadViewError) -> JSONRPCErrorError {
match err {
ThreadReadViewError::InvalidRequest(message) => invalid_request(message),
ThreadReadViewError::Internal(message) => internal_error(message),
}
}
impl Drop for ActiveLogin {
fn drop(&mut self) {
self.cancel();
@@ -3639,6 +3646,14 @@ impl CodexMessageProcessor {
request_id: ConnectionRequestId,
params: ThreadLoadedListParams,
) {
let result = self.thread_loaded_list_response(params).await;
self.outgoing.send_result(request_id, result).await;
}
async fn thread_loaded_list_response(
&self,
params: ThreadLoadedListParams,
) -> Result<ThreadLoadedListResponse, JSONRPCErrorError> {
let ThreadLoadedListParams { cursor, limit } = params;
let mut data = self
.thread_manager
@@ -3649,12 +3664,10 @@ impl CodexMessageProcessor {
.collect::<Vec<_>>();
if data.is_empty() {
let response = ThreadLoadedListResponse {
return Ok(ThreadLoadedListResponse {
data,
next_cursor: None,
};
self.outgoing.send_response(request_id, response).await;
return;
});
}
data.sort();
@@ -3663,15 +3676,7 @@ impl CodexMessageProcessor {
Some(cursor) => {
let cursor = match ThreadId::from_string(&cursor) {
Ok(id) => id.to_string(),
Err(_) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("invalid cursor: {cursor}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
Err(_) => return Err(invalid_request(format!("invalid cursor: {cursor}"))),
};
match data.binary_search(&cursor) {
Ok(idx) => idx + 1,
@@ -3686,41 +3691,34 @@ impl CodexMessageProcessor {
let page = data[start..end].to_vec();
let next_cursor = page.last().filter(|_| end < total).cloned();
let response = ThreadLoadedListResponse {
Ok(ThreadLoadedListResponse {
data: page,
next_cursor,
};
self.outgoing.send_response(request_id, response).await;
})
}
async fn thread_read(&self, request_id: ConnectionRequestId, params: ThreadReadParams) {
let result = self.thread_read_response(params).await;
self.outgoing.send_result(request_id, result).await;
}
async fn thread_read_response(
&self,
params: ThreadReadParams,
) -> Result<ThreadReadResponse, JSONRPCErrorError> {
let ThreadReadParams {
thread_id,
include_turns,
} = params;
let thread_uuid = match ThreadId::from_string(&thread_id) {
Ok(id) => id,
Err(err) => {
self.send_invalid_request_error(request_id, format!("invalid thread id: {err}"))
.await;
return;
}
};
let thread_uuid = ThreadId::from_string(&thread_id)
.map_err(|err| invalid_request(format!("invalid thread id: {err}")))?;
let thread = match self.read_thread_view(thread_uuid, include_turns).await {
Ok(thread) => thread,
Err(ThreadReadViewError::InvalidRequest(message)) => {
self.send_invalid_request_error(request_id, message).await;
return;
}
Err(ThreadReadViewError::Internal(message)) => {
self.send_internal_error(request_id, message).await;
return;
}
};
let response = ThreadReadResponse { thread };
self.outgoing.send_response(request_id, response).await;
let thread = self
.read_thread_view(thread_uuid, include_turns)
.await
.map_err(thread_read_view_error)?;
Ok(ThreadReadResponse { thread })
}
/// Builds the API view for `thread/read` from persisted metadata plus optional live state.
@@ -3878,6 +3876,14 @@ impl CodexMessageProcessor {
request_id: ConnectionRequestId,
params: ThreadTurnsListParams,
) {
let result = self.thread_turns_list_response(params).await;
self.outgoing.send_result(request_id, result).await;
}
async fn thread_turns_list_response(
&self,
params: ThreadTurnsListParams,
) -> Result<ThreadTurnsListResponse, JSONRPCErrorError> {
let ThreadTurnsListParams {
thread_id,
cursor,
@@ -3885,14 +3891,8 @@ impl CodexMessageProcessor {
sort_direction,
} = params;
let thread_uuid = match ThreadId::from_string(&thread_id) {
Ok(id) => id,
Err(err) => {
self.send_invalid_request_error(request_id, format!("invalid thread id: {err}"))
.await;
return;
}
};
let thread_uuid = ThreadId::from_string(&thread_id)
.map_err(|err| invalid_request(format!("invalid thread id: {err}")))?;
let state_db_ctx = get_state_db(&self.config).await;
let mut rollout_path = self
@@ -3912,21 +3912,15 @@ impl CodexMessageProcessor {
{
Ok(path) => path,
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to locate archived thread id {thread_uuid}: {err}"),
)
.await;
return;
return Err(invalid_request(format!(
"failed to locate archived thread id {thread_uuid}: {err}"
)));
}
},
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to locate thread id {thread_uuid}: {err}"),
)
.await;
return;
return Err(invalid_request(format!(
"failed to locate thread id {thread_uuid}: {err}"
)));
}
};
}
@@ -3936,92 +3930,63 @@ impl CodexMessageProcessor {
Ok(thread) => {
rollout_path = thread.rollout_path();
if rollout_path.is_none() {
self.send_invalid_request_error(
request_id,
"ephemeral threads do not support thread/turns/list".to_string(),
)
.await;
return;
return Err(invalid_request(
"ephemeral threads do not support thread/turns/list",
));
}
}
Err(_) => {
self.send_invalid_request_error(
request_id,
format!("thread not loaded: {thread_uuid}"),
)
.await;
return;
}
Err(_) => return Err(invalid_request(format!("thread not loaded: {thread_uuid}"))),
}
}
let Some(rollout_path) = rollout_path.as_ref() else {
self.send_internal_error(
request_id,
format!("failed to locate rollout for thread {thread_uuid}"),
)
.await;
return;
return Err(internal_error(format!(
"failed to locate rollout for thread {thread_uuid}"
)));
};
match read_rollout_items_from_rollout(rollout_path).await {
Ok(items) => {
// This API optimizes network transfer by letting clients page through a
// thread's turns incrementally, but it still replays the entire rollout on
// every request. Rollback and compaction events can change earlier turns, so
// the server has to rebuild the full turn list until turn metadata is indexed
// separately.
let has_live_in_progress_turn =
match self.thread_manager.get_thread(thread_uuid).await {
Ok(thread) => matches!(thread.agent_status().await, AgentStatus::Running),
Err(_) => false,
};
let turns = reconstruct_thread_turns_from_rollout_items(
&items,
self.thread_watch_manager
.loaded_status_for_thread(&thread_uuid.to_string())
.await,
has_live_in_progress_turn,
);
let page = match paginate_thread_turns(
turns,
cursor.as_deref(),
limit,
sort_direction.unwrap_or(SortDirection::Desc),
) {
Ok(page) => page,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
let response = ThreadTurnsListResponse {
data: page.turns,
next_cursor: page.next_cursor,
backwards_cursor: page.backwards_cursor,
};
self.outgoing.send_response(request_id, response).await;
}
let items = match read_rollout_items_from_rollout(rollout_path).await {
Ok(items) => items,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
self.send_invalid_request_error(
request_id,
format!(
"thread {thread_uuid} is not materialized yet; thread/turns/list is unavailable before first user message"
),
)
.await;
return Err(invalid_request(format!(
"thread {thread_uuid} is not materialized yet; thread/turns/list is unavailable before first user message"
)));
}
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for thread {thread_uuid}: {err}",
rollout_path.display()
),
)
.await;
return Err(internal_error(format!(
"failed to load rollout `{}` for thread {thread_uuid}: {err}",
rollout_path.display()
)));
}
}
};
// This API optimizes network transfer by letting clients page through a
// thread's turns incrementally, but it still replays the entire rollout on
// every request. Rollback and compaction events can change earlier turns, so
// the server has to rebuild the full turn list until turn metadata is indexed
// separately.
let has_live_in_progress_turn = match self.thread_manager.get_thread(thread_uuid).await {
Ok(thread) => matches!(thread.agent_status().await, AgentStatus::Running),
Err(_) => false,
};
let turns = reconstruct_thread_turns_from_rollout_items(
&items,
self.thread_watch_manager
.loaded_status_for_thread(&thread_uuid.to_string())
.await,
has_live_in_progress_turn,
);
let page = paginate_thread_turns(
turns,
cursor.as_deref(),
limit,
sort_direction.unwrap_or(SortDirection::Desc),
)?;
Ok(ThreadTurnsListResponse {
data: page.turns,
next_cursor: page.next_cursor,
backwards_cursor: page.backwards_cursor,
})
}
pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver<ThreadId> {
@@ -5062,6 +5027,14 @@ impl CodexMessageProcessor {
request_id: ConnectionRequestId,
params: GetConversationSummaryParams,
) {
let result = self.get_thread_summary_response(params).await;
self.outgoing.send_result(request_id, result).await;
}
async fn get_thread_summary_response(
&self,
params: GetConversationSummaryParams,
) -> Result<GetConversationSummaryResponse, JSONRPCErrorError> {
let fallback_provider = self.config.model_provider_id.as_str();
let read_result = match params {
GetConversationSummaryParams::ThreadId { conversation_id } => self
@@ -5079,13 +5052,9 @@ impl CodexMessageProcessor {
.as_any()
.downcast_ref::<LocalThreadStore>()
else {
self.send_invalid_request_error(
request_id,
"rollout path queries are only supported with the local thread store"
.to_string(),
)
.await;
return;
return Err(invalid_request(
"rollout path queries are only supported with the local thread store",
));
};
local_thread_store
@@ -5099,27 +5068,14 @@ impl CodexMessageProcessor {
}
};
match read_result {
Ok(stored_thread) => {
let Some(summary) = summary_from_stored_thread(stored_thread, fallback_provider)
else {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message:
"failed to load conversation summary: thread is missing rollout path"
.to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
};
let response = GetConversationSummaryResponse { summary };
self.outgoing.send_response(request_id, response).await;
}
Err(error) => {
self.outgoing.send_error(request_id, error).await;
}
}
let stored_thread = read_result?;
let summary =
summary_from_stored_thread(stored_thread, fallback_provider).ok_or_else(|| {
internal_error(
"failed to load conversation summary: thread is missing rollout path",
)
})?;
Ok(GetConversationSummaryResponse { summary })
}
async fn list_threads_common(