fix: make resume cwd filtering and thread renames deterministic

This commit is contained in:
Yaroslav Volovich
2026-02-25 20:45:27 +00:00
parent 5d91f2b2c2
commit d7602c3081
2 changed files with 128 additions and 13 deletions

View File

@@ -2238,7 +2238,7 @@ impl CodexMessageProcessor {
return;
};
let (_, thread) = match self.load_thread(&thread_id).await {
let (thread_id, thread) = match self.load_thread(&thread_id).await {
Ok(v) => v,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
@@ -2246,17 +2246,54 @@ impl CodexMessageProcessor {
}
};
let expected_name = name.clone();
if let Err(err) = thread.submit(Op::SetThreadName { name }).await {
self.send_internal_error(request_id, format!("failed to set thread name: {err}"))
.await;
return;
}
if let Err(err) = self
.wait_for_thread_name_persisted(thread_id, &expected_name)
.await
{
self.send_internal_error(
request_id,
format!("failed to observe persisted thread name: {err}"),
)
.await;
return;
}
self.outgoing
.send_response(request_id, ThreadSetNameResponse {})
.await;
}
async fn wait_for_thread_name_persisted(
&self,
thread_id: ThreadId,
expected_name: &str,
) -> std::io::Result<()> {
let codex_home = self.config.codex_home.clone();
tokio::time::timeout(Duration::from_secs(2), async {
loop {
match find_thread_name_by_id(&codex_home, &thread_id).await? {
Some(name) if name == expected_name => return Ok(()),
_ => tokio::time::sleep(Duration::from_millis(10)).await,
}
}
})
.await
.map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::TimedOut,
format!("timed out waiting for thread name {expected_name:?}"),
)
})?
}
async fn thread_unarchive(
&mut self,
request_id: ConnectionRequestId,

View File

@@ -348,7 +348,7 @@ impl RolloutRecorder {
default_provider,
)
.await?;
if let Some(path) = select_resume_path(&page, filter_cwd) {
if let Some(path) = select_resume_path(&page, filter_cwd).await {
return Ok(Some(path));
}
cursor = page.next_cursor;
@@ -961,23 +961,36 @@ impl From<codex_state::ThreadsPage> for ThreadsPage {
}
}
fn select_resume_path(page: &ThreadsPage, filter_cwd: Option<&Path>) -> Option<PathBuf> {
async fn select_resume_path(page: &ThreadsPage, filter_cwd: Option<&Path>) -> Option<PathBuf> {
match filter_cwd {
Some(cwd) => page.items.iter().find_map(|item| {
if item
.cwd
.as_ref()
.is_some_and(|session_cwd| cwd_matches(session_cwd, cwd))
{
Some(item.path.clone())
} else {
None
Some(cwd) => {
for item in &page.items {
let effective_cwd = latest_turn_context_cwd(item.path.as_path())
.await
.or_else(|| item.cwd.clone());
if effective_cwd
.as_ref()
.is_some_and(|session_cwd| cwd_matches(session_cwd, cwd))
{
return Some(item.path.clone());
}
}
}),
None
}
None => page.items.first().map(|item| item.path.clone()),
}
}
async fn latest_turn_context_cwd(rollout_path: &Path) -> Option<PathBuf> {
let (items, _thread_id, _parse_errors) = RolloutRecorder::load_rollout_items(rollout_path)
.await
.ok()?;
items.iter().rev().find_map(|item| match item {
RolloutItem::TurnContext(turn_context) => Some(turn_context.cwd.clone()),
_ => None,
})
}
fn select_resume_path_from_db_page(
page: &codex_state::ThreadsPage,
filter_cwd: Option<&Path>,
@@ -1010,8 +1023,13 @@ mod tests {
use crate::config::ConfigBuilder;
use crate::features::Feature;
use chrono::TimeZone;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::protocol::AgentMessageEvent;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::TurnContextItem;
use codex_protocol::protocol::UserMessageEvent;
use pretty_assertions::assert_eq;
use std::fs::File;
@@ -1054,6 +1072,35 @@ mod tests {
Ok(path)
}
fn append_turn_context(path: &Path, ts: &str, cwd: &Path) -> std::io::Result<()> {
let mut file = fs::OpenOptions::new().append(true).open(path)?;
let line = RolloutLine {
timestamp: ts.to_string(),
item: RolloutItem::TurnContext(TurnContextItem {
turn_id: None,
cwd: cwd.to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
network: None,
model: "test-model".to_string(),
personality: None,
collaboration_mode: None,
effort: None,
summary: ReasoningSummary::Auto,
user_instructions: None,
developer_instructions: None,
final_output_json_schema: None,
truncation_policy: None,
}),
};
writeln!(
file,
"{}",
serde_json::to_string(&line).map_err(std::io::Error::other)?
)?;
Ok(())
}
#[tokio::test]
async fn recorder_materializes_only_after_explicit_persist() -> std::io::Result<()> {
let home = TempDir::new().expect("temp dir");
@@ -1320,4 +1367,35 @@ mod tests {
assert_eq!(repaired_path, Some(real_path));
Ok(())
}
#[tokio::test]
async fn find_latest_thread_path_uses_latest_turn_context_cwd_without_sqlite()
-> std::io::Result<()> {
let home = TempDir::new().expect("temp dir");
let mut config = ConfigBuilder::default()
.codex_home(home.path().to_path_buf())
.build()
.await?;
config.features.disable(Feature::Sqlite);
let latest_cwd = home.path().join("latest-cwd");
let rollout_path =
write_session_file(home.path(), "2025-01-03T12-00-00", Uuid::from_u128(9012))?;
append_turn_context(&rollout_path, "2025-01-03T12-00-01", latest_cwd.as_path())?;
let found = RolloutRecorder::find_latest_thread_path(
&config,
1,
None,
ThreadSortKey::UpdatedAt,
&[],
None,
config.model_provider_id.as_str(),
Some(latest_cwd.as_path()),
)
.await?;
assert_eq!(found, Some(rollout_path));
Ok(())
}
}