Compare commits

...

5 Commits

Author SHA1 Message Date
Eric Traut
098445eff5 Merge branch 'main' into codex/validate-codex-issue-10216 2026-01-30 14:12:22 -08:00
Eric Traut
60cf29cad5 Merge branch 'main' into codex/validate-codex-issue-10216 2026-01-30 13:27:14 -08:00
Eric Traut
7d85510d67 Merge branch 'main' into codex/validate-codex-issue-10216 2026-01-30 12:45:46 -08:00
Eric Traut
5c47c129a7 Review codex bug report 10216 2026-01-30 11:52:19 -08:00
Eric Traut
811882f6c2 Validate and fix bug report 10216 2026-01-30 11:37:08 -08:00
2 changed files with 129 additions and 2 deletions

View File

@@ -17,10 +17,13 @@ use uuid::Uuid;
use super::ARCHIVED_SESSIONS_SUBDIR;
use super::SESSIONS_SUBDIR;
use crate::event_mapping::parse_turn_item;
use crate::protocol::EventMsg;
use crate::state_db;
use codex_file_search as file_search;
use codex_protocol::ThreadId;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionMetaLine;
@@ -44,7 +47,9 @@ pub struct ThreadsPage {
pub struct ThreadItem {
/// Absolute path to the rollout file.
pub path: PathBuf,
/// First up to `HEAD_RECORD_LIMIT` JSONL records parsed as JSON (includes meta line).
/// Parsed records used for summaries.
/// Includes the first `HEAD_RECORD_LIMIT` JSONL records (including meta line),
/// and may append the first user message found shortly after the head.
pub head: Vec<serde_json::Value>,
/// RFC3339 timestamp string for when the session was created, if available.
/// created_at comes from the filename timestamp with second precision.
@@ -64,6 +69,7 @@ pub type ConversationsPage = ThreadsPage;
#[derive(Default)]
struct HeadTailSummary {
head: Vec<serde_json::Value>,
first_user_message: Option<serde_json::Value>,
saw_session_meta: bool,
saw_user_event: bool,
source: Option<SessionSource>,
@@ -940,6 +946,12 @@ impl<'a> ProviderMatcher<'a> {
}
}
fn head_contains_user_message(head: &[serde_json::Value]) -> bool {
head.iter()
.filter_map(|value| serde_json::from_value::<ResponseItem>(value.clone()).ok())
.any(|item| matches!(parse_turn_item(&item), Some(TurnItem::UserMessage(_))))
}
async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result<HeadTailSummary> {
use tokio::io::AsyncBufReadExt;
@@ -985,10 +997,20 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result<HeadTai
.created_at
.clone()
.or_else(|| Some(rollout_line.timestamp.clone()));
let is_user_message =
matches!(parse_turn_item(&item), Some(TurnItem::UserMessage(_)));
if is_user_message {
summary.saw_user_event = true;
}
if summary.head.len() < head_limit
&& let Ok(val) = serde_json::to_value(item)
&& let Ok(val) = serde_json::to_value(&item)
{
summary.head.push(val);
} else if is_user_message
&& summary.first_user_message.is_none()
&& let Ok(val) = serde_json::to_value(&item)
{
summary.first_user_message = Some(val);
}
}
RolloutItem::TurnContext(_) => {
@@ -1009,6 +1031,13 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result<HeadTai
}
}
if summary.first_user_message.is_some()
&& !head_contains_user_message(&summary.head)
&& let Some(first_user_message) = summary.first_user_message.take()
{
summary.head.push(first_user_message);
}
Ok(summary)
}

View File

@@ -6,6 +6,7 @@ use std::fs::FileTimes;
use std::fs::{self};
use std::io::Write;
use std::path::Path;
use std::path::PathBuf;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
@@ -16,6 +17,7 @@ use time::format_description::FormatItem;
use time::macros::format_description;
use uuid::Uuid;
use crate::event_mapping;
use crate::rollout::INTERACTIVE_SESSION_SOURCES;
use crate::rollout::list::Cursor;
use crate::rollout::list::ThreadItem;
@@ -25,6 +27,7 @@ use crate::rollout::list::get_threads;
use crate::rollout::rollout_date_parts;
use anyhow::Result;
use codex_protocol::ThreadId;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::EventMsg;
@@ -200,6 +203,77 @@ fn write_session_file_with_delayed_user_event(
Ok(())
}
fn write_session_file_with_delayed_user_response_item(
root: &Path,
ts_str: &str,
uuid: Uuid,
assistant_before_user: usize,
) -> std::io::Result<PathBuf> {
let format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
let dt = PrimitiveDateTime::parse(ts_str, format)
.unwrap()
.assume_utc();
let dir = root
.join("sessions")
.join(format!("{:04}", dt.year()))
.join(format!("{:02}", u8::from(dt.month())))
.join(format!("{:02}", dt.day()));
fs::create_dir_all(&dir)?;
let filename = format!("rollout-{ts_str}-{uuid}.jsonl");
let file_path = dir.join(filename);
let mut file = File::create(&file_path)?;
let payload = serde_json::json!({
"id": uuid,
"timestamp": ts_str,
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "cli",
"model_provider": TEST_PROVIDER,
});
let meta = serde_json::json!({
"timestamp": ts_str,
"type": "session_meta",
"payload": payload,
});
writeln!(file, "{meta}")?;
for i in 0..assistant_before_user {
let assistant_line = serde_json::json!({
"timestamp": ts_str,
"type": "response_item",
"payload": {
"type": "message",
"role": "assistant",
"content": [
{"type": "output_text", "text": format!("assistant-{i}")}
]
}
});
writeln!(file, "{assistant_line}")?;
}
let user_line = serde_json::json!({
"timestamp": ts_str,
"type": "response_item",
"payload": {
"type": "message",
"role": "user",
"content": [
{"type": "input_text", "text": "real question"}
]
}
});
writeln!(file, "{user_line}")?;
let times = FileTimes::new().set_modified(dt.into());
file.set_times(times)?;
Ok(file_path)
}
fn write_session_file_with_meta_payload(
root: &Path,
ts_str: &str,
@@ -633,6 +707,30 @@ async fn test_list_threads_scans_past_head_for_user_event() {
assert_eq!(page.items.len(), 1);
}
#[tokio::test]
async fn test_head_summary_includes_first_user_message_beyond_head_limit() {
let temp = TempDir::new().unwrap();
let home = temp.path();
let uuid = Uuid::from_u128(1234);
let ts = "2025-06-01T12-00-00";
let path = write_session_file_with_delayed_user_response_item(home, ts, uuid, 12).unwrap();
let head = crate::rollout::list::read_head_for_summary(&path)
.await
.unwrap();
let preview = head
.iter()
.filter_map(|value| serde_json::from_value::<ResponseItem>(value.clone()).ok())
.find_map(|item| match event_mapping::parse_turn_item(&item) {
Some(TurnItem::UserMessage(user)) => Some(user.message()),
_ => None,
});
assert_eq!(preview.as_deref(), Some("real question"));
}
#[tokio::test]
async fn test_get_thread_contents() {
let temp = TempDir::new().unwrap();