codex: split thread/read view loading (#18231)

Summary
- refactor thread/read into explicit persisted-load, live-load, and
merge steps
- preserve existing SQLite/filesystem/live-thread behavior exactly
- keep ThreadStore migration out of this PR so the next PR is easier to
review

Validation
- this one's a pure reorganization that relies on existing test coverage
This commit is contained in:
Tom
2026-04-16 21:06:03 -07:00
committed by GitHub
parent dd00efe781
commit 9d6f4f2e2e

View File

@@ -438,6 +438,11 @@ enum ThreadShutdownResult {
TimedOut,
}
enum ThreadReadViewError {
InvalidRequest(String),
Internal(String),
}
impl Drop for ActiveLogin {
fn drop(&mut self) {
self.cancel();
@@ -3786,126 +3791,45 @@ impl CodexMessageProcessor {
}
};
let loaded_thread = self.thread_manager.get_thread(thread_uuid).await.ok();
let loaded_thread_state_db = loaded_thread.as_ref().and_then(|thread| thread.state_db());
let db_summary = if let Some(state_db_ctx) = loaded_thread_state_db.as_ref() {
read_summary_from_state_db_context_by_thread_id(Some(state_db_ctx), thread_uuid).await
} else {
read_summary_from_state_db_by_thread_id(&self.config, thread_uuid).await
};
let mut rollout_path = db_summary.as_ref().map(|summary| summary.path.clone());
if rollout_path.is_none() || include_turns {
rollout_path =
match find_thread_path_by_id_str(&self.config.codex_home, &thread_uuid.to_string())
.await
{
Ok(Some(path)) => Some(path),
Ok(None) => {
if include_turns {
None
} else {
rollout_path
}
}
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to locate thread id {thread_uuid}: {err}"),
)
.await;
return;
}
};
}
if include_turns && rollout_path.is_none() && db_summary.is_some() {
self.send_internal_error(
request_id,
format!("failed to locate rollout for thread {thread_uuid}"),
)
.await;
return;
}
let mut thread = if let Some(summary) = db_summary {
summary_to_thread(summary, &self.config.cwd)
} else if let Some(rollout_path) = rollout_path.as_ref() {
let fallback_provider = self.config.model_provider_id.as_str();
match read_summary_from_rollout(rollout_path, fallback_provider).await {
Ok(summary) => summary_to_thread(summary, &self.config.cwd),
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for thread {thread_uuid}: {err}",
rollout_path.display()
),
)
.await;
return;
}
}
} else {
let Some(thread) = loaded_thread.as_ref() else {
self.send_invalid_request_error(
request_id,
format!("thread not loaded: {thread_uuid}"),
)
.await;
return;
};
let config_snapshot = thread.config_snapshot().await;
let loaded_rollout_path = thread.rollout_path();
if include_turns && loaded_rollout_path.is_none() {
self.send_invalid_request_error(
request_id,
"ephemeral threads do not support includeTurns".to_string(),
)
.await;
let thread = match self.read_thread_view(thread_uuid, include_turns).await {
Ok(thread) => thread,
Err(ThreadReadViewError::InvalidRequest(message)) => {
self.send_invalid_request_error(request_id, message).await;
return;
}
if include_turns {
rollout_path = loaded_rollout_path.clone();
Err(ThreadReadViewError::Internal(message)) => {
self.send_internal_error(request_id, message).await;
return;
}
build_thread_from_snapshot(thread_uuid, &config_snapshot, loaded_rollout_path)
};
if thread.forked_from_id.is_none()
&& let Some(rollout_path) = rollout_path.as_ref()
let response = ThreadReadResponse { thread };
self.outgoing.send_response(request_id, response).await;
}
/// Builds the API view for `thread/read` from persisted metadata plus optional live state.
async fn read_thread_view(
&self,
thread_id: ThreadId,
include_turns: bool,
) -> Result<Thread, ThreadReadViewError> {
let loaded_thread = self.load_live_thread_for_read(thread_id).await;
let mut thread = if let Some(thread) = self
.load_persisted_thread_for_read(thread_id, include_turns, loaded_thread.as_ref())
.await?
{
thread.forked_from_id = forked_from_id_from_rollout(rollout_path).await;
}
self.attach_thread_name(thread_uuid, &mut thread).await;
thread
} else if let Some(thread) = self
.load_live_thread_view(thread_id, include_turns, loaded_thread.as_ref())
.await?
{
thread
} else {
return Err(ThreadReadViewError::InvalidRequest(format!(
"thread not loaded: {thread_id}"
)));
};
if include_turns && let Some(rollout_path) = rollout_path.as_ref() {
match read_rollout_items_from_rollout(rollout_path).await {
Ok(items) => {
thread.turns = build_turns_from_rollout_items(&items);
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
self.send_invalid_request_error(
request_id,
format!(
"thread {thread_uuid} is not materialized yet; includeTurns is unavailable before first user message"
),
)
.await;
return;
}
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for thread {thread_uuid}: {err}",
rollout_path.display()
),
)
.await;
return;
}
}
}
let has_live_in_progress_turn = if let Some(loaded_thread) = loaded_thread.as_ref() {
let has_live_in_progress_turn = if let Some(loaded_thread) = loaded_thread {
matches!(loaded_thread.agent_status().await, AgentStatus::Running)
} else {
false
@@ -3921,8 +3845,151 @@ impl CodexMessageProcessor {
thread_status,
has_live_in_progress_turn,
);
let response = ThreadReadResponse { thread };
self.outgoing.send_response(request_id, response).await;
Ok(thread)
}
async fn load_live_thread_for_read(&self, thread_id: ThreadId) -> Option<Arc<CodexThread>> {
self.thread_manager.get_thread(thread_id).await.ok()
}
async fn load_persisted_thread_for_read(
&self,
thread_id: ThreadId,
include_turns: bool,
loaded_thread: Option<&Arc<CodexThread>>,
) -> Result<Option<Thread>, ThreadReadViewError> {
let loaded_thread_state_db = loaded_thread.and_then(|thread| thread.state_db());
let db_summary = if let Some(state_db_ctx) = loaded_thread_state_db.as_ref() {
read_summary_from_state_db_context_by_thread_id(Some(state_db_ctx), thread_id).await
} else {
read_summary_from_state_db_by_thread_id(&self.config, thread_id).await
};
let mut rollout_path = db_summary.as_ref().map(|summary| summary.path.clone());
if rollout_path.is_none() || include_turns {
rollout_path =
match find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string())
.await
{
Ok(Some(path)) => Some(path),
Ok(None) => {
if include_turns {
None
} else {
rollout_path
}
}
Err(err) => {
return Err(ThreadReadViewError::InvalidRequest(format!(
"failed to locate thread id {thread_id}: {err}"
)));
}
};
}
if include_turns && rollout_path.is_none() && db_summary.is_some() {
return Err(ThreadReadViewError::Internal(format!(
"failed to locate rollout for thread {thread_id}"
)));
}
if let Some(summary) = db_summary {
let mut thread = summary_to_thread(summary, &self.config.cwd);
self.apply_thread_read_rollout_fields(
thread_id,
&mut thread,
rollout_path.as_deref(),
include_turns,
)
.await?;
return Ok(Some(thread));
}
let Some(rollout_path) = rollout_path else {
return Ok(None);
};
let fallback_provider = self.config.model_provider_id.as_str();
match read_summary_from_rollout(&rollout_path, fallback_provider).await {
Ok(summary) => {
let mut thread = summary_to_thread(summary, &self.config.cwd);
self.apply_thread_read_rollout_fields(
thread_id,
&mut thread,
Some(rollout_path.as_path()),
include_turns,
)
.await?;
Ok(Some(thread))
}
Err(err) => Err(ThreadReadViewError::Internal(format!(
"failed to load rollout `{}` for thread {thread_id}: {err}",
rollout_path.display()
))),
}
}
async fn load_live_thread_view(
&self,
thread_id: ThreadId,
include_turns: bool,
loaded_thread: Option<&Arc<CodexThread>>,
) -> Result<Option<Thread>, ThreadReadViewError> {
let Some(thread) = loaded_thread else {
return Ok(None);
};
let config_snapshot = thread.config_snapshot().await;
let loaded_rollout_path = thread.rollout_path();
if include_turns && loaded_rollout_path.is_none() {
return Err(ThreadReadViewError::InvalidRequest(
"ephemeral threads do not support includeTurns".to_string(),
));
}
let mut thread =
build_thread_from_snapshot(thread_id, &config_snapshot, loaded_rollout_path.clone());
self.apply_thread_read_rollout_fields(
thread_id,
&mut thread,
loaded_rollout_path.as_deref(),
include_turns,
)
.await?;
Ok(Some(thread))
}
async fn apply_thread_read_rollout_fields(
&self,
thread_id: ThreadId,
thread: &mut Thread,
rollout_path: Option<&Path>,
include_turns: bool,
) -> Result<(), ThreadReadViewError> {
if thread.forked_from_id.is_none()
&& let Some(rollout_path) = rollout_path
{
thread.forked_from_id = forked_from_id_from_rollout(rollout_path).await;
}
self.attach_thread_name(thread_id, thread).await;
if include_turns && let Some(rollout_path) = rollout_path {
match read_rollout_items_from_rollout(rollout_path).await {
Ok(items) => {
thread.turns = build_turns_from_rollout_items(&items);
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
return Err(ThreadReadViewError::InvalidRequest(format!(
"thread {thread_id} is not materialized yet; includeTurns is unavailable before first user message"
)));
}
Err(err) => {
return Err(ThreadReadViewError::Internal(format!(
"failed to load rollout `{}` for thread {thread_id}: {err}",
rollout_path.display()
)));
}
}
}
Ok(())
}
pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver<ThreadId> {