Compare commits

...

1 Commits

Author SHA1 Message Date
Thibault Sottiaux
c52dc9fd93 fix(core): guard history log id after trims 2026-01-08 09:31:23 -08:00

View File

@@ -16,8 +16,11 @@
//! Note: `conversation_id` stores the thread id; the field name is preserved for
//! backwards compatibility with existing history files.
use std::collections::hash_map::DefaultHasher;
use std::fs::File;
use std::fs::OpenOptions;
use std::hash::Hash;
use std::hash::Hasher;
use std::io::BufRead;
use std::io::BufReader;
use std::io::Read;
@@ -33,7 +36,9 @@ use serde::Serialize;
use std::time::Duration;
use tokio::fs;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncReadExt;
use tokio::io::BufReader as AsyncBufReader;
use crate::config::Config;
use crate::config::types::HistoryPersistence;
@@ -243,6 +248,23 @@ fn trim_target_bytes(max_bytes: u64, newest_entry_len: u64) -> u64 {
soft_cap_bytes.max(newest_entry_len)
}
fn trim_history_line_end(line: &str) -> &str {
line.trim_end_matches(['\n', '\r'])
}
fn history_log_id_with_first_line(base_id: u64, first_line: Option<&str>) -> u64 {
if base_id == 0 {
return 0;
}
let Some(line) = first_line else {
return base_id;
};
let mut hasher = DefaultHasher::new();
line.hash(&mut hasher);
base_id ^ hasher.finish()
}
/// Asynchronously fetch the history file's *identifier* (inode on Unix) and
/// the current number of entries by counting newline characters.
pub(crate) async fn history_metadata(config: &Config) -> (u64, usize) {
@@ -286,7 +308,7 @@ async fn ensure_owner_only_permissions(_file: &File) -> Result<()> {
}
async fn history_metadata_for_file(path: &Path) -> (u64, usize) {
let log_id = match fs::metadata(path).await {
let base_id = match fs::metadata(path).await {
Ok(metadata) => history_log_id(&metadata).unwrap_or(0),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return (0, 0),
Err(_) => return (0, 0),
@@ -295,14 +317,29 @@ async fn history_metadata_for_file(path: &Path) -> (u64, usize) {
// Open the file.
let mut file = match fs::File::open(path).await {
Ok(f) => f,
Err(_) => return (log_id, 0),
Err(_) => return (base_id, 0),
};
// Count newline bytes.
let mut buf = [0u8; 8192];
let mut reader = AsyncBufReader::new(&mut file);
let mut first_line = String::new();
let first_line_bytes = match reader.read_line(&mut first_line).await {
Ok(0) => return (history_log_id_with_first_line(base_id, None), 0),
Ok(bytes) => bytes,
Err(_) => return (history_log_id_with_first_line(base_id, None), 0),
};
let trimmed_first_line = trim_history_line_end(&first_line);
let log_id = history_log_id_with_first_line(base_id, Some(trimmed_first_line));
let mut count = 0usize;
if first_line_bytes > 0 && first_line.ends_with('\n') {
count += 1;
}
// Count newline bytes in the remainder.
let mut buf = [0u8; 8192];
loop {
match file.read(&mut buf).await {
match reader.read(&mut buf).await {
Ok(0) => break,
Ok(n) => {
count += buf[..n].iter().filter(|&&b| b == b'\n').count();
@@ -334,11 +371,7 @@ fn lookup_history_entry(path: &Path, log_id: u64, offset: usize) -> Option<Histo
}
};
let current_log_id = history_log_id(&metadata)?;
if log_id != 0 && current_log_id != log_id {
return None;
}
let base_id = history_log_id(&metadata)?;
// Open & lock file for reading using a shared lock.
// Retry a few times to avoid indefinite blocking.
@@ -348,6 +381,7 @@ fn lookup_history_entry(path: &Path, log_id: u64, offset: usize) -> Option<Histo
match lock_result {
Ok(()) => {
let reader = BufReader::new(&file);
let mut saw_first_line = false;
for (idx, line_res) in reader.lines().enumerate() {
let line = match line_res {
Ok(l) => l,
@@ -357,6 +391,15 @@ fn lookup_history_entry(path: &Path, log_id: u64, offset: usize) -> Option<Histo
}
};
if idx == 0 {
saw_first_line = true;
let current_log_id =
history_log_id_with_first_line(base_id, Some(line.as_str()));
if log_id != 0 && current_log_id != log_id {
return None;
}
}
if idx == offset {
match serde_json::from_str::<HistoryEntry>(&line) {
Ok(entry) => return Some(entry),
@@ -367,6 +410,12 @@ fn lookup_history_entry(path: &Path, log_id: u64, offset: usize) -> Option<Histo
}
}
}
if !saw_first_line
&& log_id != 0
&& history_log_id_with_first_line(base_id, None) != log_id
{
return None;
}
// Not found at requested offset.
return None;
}
@@ -489,6 +538,43 @@ mod tests {
assert_eq!(fetched, appended);
}
#[tokio::test]
async fn lookup_rejects_stale_log_id_after_trim() {
let codex_home = TempDir::new().expect("create temp dir");
let mut config = ConfigBuilder::default()
.codex_home(codex_home.path().to_path_buf())
.build()
.await
.expect("load config");
let conversation_id = ThreadId::new();
let entry_one = "a".repeat(200);
let entry_two = "b".repeat(200);
let history_path = codex_home.path().join("history.jsonl");
append_entry(&entry_one, &conversation_id, &config)
.await
.expect("write first entry");
let (log_id, count) = history_metadata_for_file(&history_path).await;
assert_eq!(count, 1);
let first_len = std::fs::metadata(&history_path)
.expect("metadata")
.len()
.saturating_add(1);
config.history.max_bytes = Some(usize::try_from(first_len).expect("max bytes fits usize"));
append_entry(&entry_two, &conversation_id, &config)
.await
.expect("write second entry");
let entry = lookup_history_entry(&history_path, log_id, 0);
assert_eq!(entry, None);
}
#[tokio::test]
async fn append_entry_trims_history_when_beyond_max_bytes() {
let codex_home = TempDir::new().expect("create temp dir");