mirror of
https://github.com/openai/codex.git
synced 2026-04-24 06:35:50 +00:00
Skip unsaved thread names during resume lookup
This commit is contained in:
@@ -5375,6 +5375,8 @@ mod handlers {
|
||||
return;
|
||||
};
|
||||
|
||||
sess.ensure_rollout_materialized().await;
|
||||
|
||||
let codex_home = sess.codex_home().await;
|
||||
if let Err(e) =
|
||||
session_index::append_thread_name(&codex_home, sess.conversation_id, &name).await
|
||||
|
||||
@@ -2166,7 +2166,7 @@ async fn attach_rollout_recorder(session: &Arc<Session>) -> PathBuf {
|
||||
let recorder = RolloutRecorder::new(
|
||||
config.as_ref(),
|
||||
RolloutRecorderParams::new(
|
||||
ThreadId::default(),
|
||||
session.conversation_id,
|
||||
/*forked_from_id*/ None,
|
||||
SessionSource::Exec,
|
||||
BaseInstructions::default(),
|
||||
|
||||
@@ -136,10 +136,32 @@ pub async fn find_thread_path_by_name_str(
|
||||
codex_home: &Path,
|
||||
name: &str,
|
||||
) -> std::io::Result<Option<PathBuf>> {
|
||||
let Some(thread_id) = find_thread_id_by_name(codex_home, name).await? else {
|
||||
if name.trim().is_empty() {
|
||||
return Ok(None);
|
||||
};
|
||||
super::list::find_thread_path_by_id_str(codex_home, &thread_id.to_string()).await
|
||||
}
|
||||
let path = session_index_path(codex_home);
|
||||
if !path.exists() {
|
||||
return Ok(None);
|
||||
}
|
||||
let name = name.to_string();
|
||||
// Collect all matching thread ids newest-first instead of stopping at the first name hit:
|
||||
// the newest entry may point at a thread whose rollout was never materialized.
|
||||
let thread_ids =
|
||||
tokio::task::spawn_blocking(move || collect_thread_ids_from_end_by_name(&path, &name))
|
||||
.await
|
||||
.map_err(std::io::Error::other)??;
|
||||
|
||||
for thread_id in thread_ids {
|
||||
// Keep walking until a matching id resolves to an existing rollout so an unsaved rename
|
||||
// cannot shadow an older persisted session with the same name.
|
||||
if let Some(path) =
|
||||
super::list::find_thread_path_by_id_str(codex_home, &thread_id.to_string()).await?
|
||||
{
|
||||
return Ok(Some(path));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn session_index_path(codex_home: &Path) -> PathBuf {
|
||||
@@ -160,12 +182,39 @@ fn scan_index_from_end_by_name(
|
||||
scan_index_from_end(path, |entry| entry.thread_name == name)
|
||||
}
|
||||
|
||||
fn collect_thread_ids_from_end_by_name(path: &Path, name: &str) -> std::io::Result<Vec<ThreadId>> {
|
||||
let mut seen = HashSet::new();
|
||||
let mut ids = Vec::new();
|
||||
scan_index_from_end_for_each(path, |entry| {
|
||||
if entry.thread_name == name && seen.insert(entry.id) {
|
||||
ids.push(entry.id);
|
||||
}
|
||||
Ok(None)
|
||||
})?;
|
||||
Ok(ids)
|
||||
}
|
||||
|
||||
fn scan_index_from_end<F>(
|
||||
path: &Path,
|
||||
mut predicate: F,
|
||||
) -> std::io::Result<Option<SessionIndexEntry>>
|
||||
where
|
||||
F: FnMut(&SessionIndexEntry) -> bool,
|
||||
{
|
||||
scan_index_from_end_for_each(path, |entry| {
|
||||
if predicate(entry) {
|
||||
return Ok(Some(entry.clone()));
|
||||
}
|
||||
Ok(None)
|
||||
})
|
||||
}
|
||||
|
||||
fn scan_index_from_end_for_each<F>(
|
||||
path: &Path,
|
||||
mut visit_entry: F,
|
||||
) -> std::io::Result<Option<SessionIndexEntry>>
|
||||
where
|
||||
F: FnMut(&SessionIndexEntry) -> std::io::Result<Option<SessionIndexEntry>>,
|
||||
{
|
||||
let mut file = File::open(path)?;
|
||||
let mut remaining = file.metadata()?.len();
|
||||
@@ -181,7 +230,7 @@ where
|
||||
|
||||
for &byte in buf[..read_size].iter().rev() {
|
||||
if byte == b'\n' {
|
||||
if let Some(entry) = parse_line_from_rev(&mut line_rev, &mut predicate)? {
|
||||
if let Some(entry) = parse_line_from_rev(&mut line_rev, &mut visit_entry)? {
|
||||
return Ok(Some(entry));
|
||||
}
|
||||
continue;
|
||||
@@ -190,7 +239,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(entry) = parse_line_from_rev(&mut line_rev, &mut predicate)? {
|
||||
if let Some(entry) = parse_line_from_rev(&mut line_rev, &mut visit_entry)? {
|
||||
return Ok(Some(entry));
|
||||
}
|
||||
|
||||
@@ -199,10 +248,10 @@ where
|
||||
|
||||
fn parse_line_from_rev<F>(
|
||||
line_rev: &mut Vec<u8>,
|
||||
predicate: &mut F,
|
||||
visit_entry: &mut F,
|
||||
) -> std::io::Result<Option<SessionIndexEntry>>
|
||||
where
|
||||
F: FnMut(&SessionIndexEntry) -> bool,
|
||||
F: FnMut(&SessionIndexEntry) -> std::io::Result<Option<SessionIndexEntry>>,
|
||||
{
|
||||
if line_rev.is_empty() {
|
||||
return Ok(None);
|
||||
@@ -222,10 +271,7 @@ where
|
||||
let Ok(entry) = serde_json::from_str::<SessionIndexEntry>(trimmed) else {
|
||||
return Ok(None);
|
||||
};
|
||||
if predicate(&entry) {
|
||||
return Ok(Some(entry));
|
||||
}
|
||||
Ok(None)
|
||||
visit_entry(&entry)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -39,6 +39,39 @@ fn find_thread_id_by_name_prefers_latest_entry() -> std::io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn find_thread_path_by_name_str_skips_newest_entry_without_rollout() -> std::io::Result<()> {
|
||||
// A newer unsaved name entry should not shadow an older persisted rollout with the same name.
|
||||
let temp = TempDir::new()?;
|
||||
let path = session_index_path(temp.path());
|
||||
let saved_id = ThreadId::new();
|
||||
let unsaved_id = ThreadId::new();
|
||||
let saved_rollout_path = temp
|
||||
.path()
|
||||
.join("sessions/2024/01/01")
|
||||
.join(format!("rollout-2024-01-01T00-00-00-{saved_id}.jsonl"));
|
||||
std::fs::create_dir_all(saved_rollout_path.parent().expect("rollout parent"))?;
|
||||
std::fs::write(&saved_rollout_path, "")?;
|
||||
let lines = vec![
|
||||
SessionIndexEntry {
|
||||
id: saved_id,
|
||||
thread_name: "same".to_string(),
|
||||
updated_at: "2024-01-01T00:00:00Z".to_string(),
|
||||
},
|
||||
SessionIndexEntry {
|
||||
id: unsaved_id,
|
||||
thread_name: "same".to_string(),
|
||||
updated_at: "2024-01-02T00:00:00Z".to_string(),
|
||||
},
|
||||
];
|
||||
write_index(&path, &lines)?;
|
||||
|
||||
let found = find_thread_path_by_name_str(temp.path(), "same").await?;
|
||||
|
||||
assert_eq!(found, Some(saved_rollout_path));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn find_thread_name_by_id_prefers_latest_entry() -> std::io::Result<()> {
|
||||
let temp = TempDir::new()?;
|
||||
|
||||
Reference in New Issue
Block a user