Compare commits

...

2 Commits

Author SHA1 Message Date
Owen Lin
a813bf8ab6 fix flaky test load_history_uses_live_writer_rollout_path_for_archived_source 2026-05-04 11:56:44 -07:00
Owen Lin
661e9c9665 tui: handle terminal thread-read fallback errors 2026-05-04 10:36:50 -07:00
4 changed files with 70 additions and 5 deletions

View File

@@ -1,3 +1,4 @@
use crate::ARCHIVED_SESSIONS_SUBDIR;
use crate::config::RolloutConfig;
use crate::config::RolloutConfigView;
use crate::list::Cursor;
@@ -14,6 +15,7 @@ pub use codex_state::LogEntry;
use codex_state::ThreadMetadataBuilder;
use codex_utils_path::normalize_for_path_comparison;
use serde_json::Value;
use std::ffi::OsStr;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
@@ -534,6 +536,10 @@ pub async fn apply_rollout_items(
}
builder.rollout_path = rollout_path.to_path_buf();
builder.cwd = normalize_cwd_for_state_db(&builder.cwd);
let updated_at = updated_at_override.unwrap_or_else(Utc::now);
if rollout_path_is_archived(rollout_path) && builder.archived_at.is_none() {
builder.archived_at = Some(updated_at);
}
if let Err(err) = ctx
.apply_rollout_items(&builder, items, new_thread_memory_mode, updated_at_override)
.await
@@ -542,7 +548,23 @@ pub async fn apply_rollout_items(
"state db apply_rollout_items failed during {stage} for {}: {err}",
rollout_path.display()
);
return;
}
if rollout_path_is_archived(rollout_path)
&& let Err(err) = ctx
.mark_archived(builder.id, rollout_path, updated_at)
.await
{
warn!(
"state db apply_rollout_items failed to preserve archived status during {stage} for {}: {err}",
rollout_path.display()
);
}
}
fn rollout_path_is_archived(path: &Path) -> bool {
path.components()
.any(|component| component.as_os_str() == OsStr::new(ARCHIVED_SESSIONS_SUBDIR))
}
pub async fn touch_thread_updated_at(

View File

@@ -664,6 +664,18 @@ mod tests {
.flush_thread(thread_id)
.await
.expect("flush live thread");
assert!(
store
.state_db()
.await
.expect("state db")
.get_thread(thread_id)
.await
.expect("read state db metadata")
.expect("state db metadata")
.archived_at
.is_some()
);
let err = store
.read_thread(ReadThreadParams {

View File

@@ -31,6 +31,19 @@ pub(super) async fn read_thread(
params: ReadThreadParams,
) -> ThreadStoreResult<StoredThread> {
let thread_id = params.thread_id;
if let Ok(rollout_path) = live_writer::rollout_path(store, thread_id).await {
if !params.include_archived
&& rollout_path_is_archived(store.config.codex_home.as_path(), rollout_path.as_path())
{
return Err(ThreadStoreError::InvalidRequest {
message: format!("thread {thread_id} is archived"),
});
}
let mut thread = read_thread_from_rollout_path(store, rollout_path).await?;
attach_history_if_requested(&mut thread, params.include_history).await?;
return Ok(thread);
}
if let Some(metadata) = read_sqlite_metadata(store, thread_id).await
&& (params.include_archived
|| (metadata.archived_at.is_none()
@@ -69,6 +82,13 @@ pub(super) async fn read_thread(
.ok_or_else(|| ThreadStoreError::InvalidRequest {
message: format!("no rollout found for thread id {thread_id}"),
})?;
if !params.include_archived
&& rollout_path_is_archived(store.config.codex_home.as_path(), path.as_path())
{
return Err(ThreadStoreError::InvalidRequest {
message: format!("thread {thread_id} is archived"),
});
}
let mut thread = read_thread_from_rollout_path(store, path).await?;
attach_history_if_requested(&mut thread, params.include_history).await?;

View File

@@ -103,6 +103,12 @@ impl App {
})
}
fn unavailable_live_thread_attach_error(thread_id: ThreadId) -> color_eyre::Report {
color_eyre::eyre::eyre!(
"Agent thread {thread_id} is not yet available for replay or live attach."
)
}
/// Updates cached picker metadata and then mirrors any visible-label change into the footer.
///
/// These two writes stay paired so the picker rows and contextual footer continue to describe
@@ -226,9 +232,16 @@ impl App {
(thread, turns)
}
Err(err) if Self::can_fallback_from_include_turns_error(&err) => {
let thread = app_server
let thread = match app_server
.thread_read(thread_id, /*include_turns*/ false)
.await?;
.await
{
Ok(thread) => thread,
Err(err) if Self::is_terminal_thread_read_error(&err) => {
return Err(Self::unavailable_live_thread_attach_error(thread_id));
}
Err(err) => return Err(err),
};
(thread, Vec::new())
}
Err(err) => return Err(err),
@@ -236,9 +249,7 @@ impl App {
if turns.is_empty() {
// A `thread/read` fallback without turns would create a blank local replay
// channel with no live listener attached, which blocks later real re-attach.
return Err(color_eyre::eyre::eyre!(
"Agent thread {thread_id} is not yet available for replay or live attach."
));
return Err(Self::unavailable_live_thread_attach_error(thread_id));
}
let mut session = self.session_state_for_thread_read(thread_id, &thread).await;
// `thread/read` can seed replay state, but it does not attach the app-server