Compare commits

...

7 Commits

Author         SHA1        Message    Date
Ahmed Ibrahim  381516ee3e  timestamp  2025-09-09 07:51:14 -07:00
Ahmed Ibrahim  dfaa3fb242  timestamp  2025-09-08 20:19:28 -07:00
Ahmed Ibrahim  28c57aded3  timestamp  2025-09-08 18:48:03 -07:00
Ahmed Ibrahim  5e41303911  timestamp  2025-09-08 18:47:39 -07:00
Ahmed Ibrahim  bcf855fe1f  timestamp  2025-09-08 18:42:08 -07:00
Ahmed Ibrahim  617b1c1ea4  timestamp  2025-09-08 18:40:38 -07:00
Ahmed Ibrahim  f94e1e3298  timestamp  2025-09-08 18:38:19 -07:00
9 changed files with 805 additions and 263 deletions

View File

@@ -104,6 +104,7 @@ use crate::protocol::TurnDiffEvent;
use crate::protocol::WebSearchBeginEvent;
use crate::rollout::RolloutRecorder;
use crate::rollout::RolloutRecorderParams;
use crate::rollout::recorder::RolloutItemSliceExt;
use crate::safety::SafetyCheck;
use crate::safety::assess_command_safety;
use crate::safety::assess_safety_for_untrusted_command;
@@ -512,9 +513,7 @@ impl Session {
})
.chain(post_session_configured_error_events.into_iter());
for event in events {
if let Err(e) = tx_event.send(event).await {
error!("failed to send event: {e:?}");
}
sess.send_event(event).await;
}
Ok((sess, turn_context))
@@ -547,11 +546,15 @@ impl Session {
self.record_initial_history_new(turn_context).await;
}
InitialHistory::Forked(items) => {
self.record_initial_history_from_items(items).await;
self.record_initial_history_from_rollout_items(turn_context, items)
.await;
}
InitialHistory::Resumed(resumed_history) => {
self.record_initial_history_from_items(resumed_history.history)
.await;
self.record_initial_history_from_rollout_items(
turn_context,
resumed_history.history,
)
.await;
}
}
}
@@ -570,27 +573,51 @@ impl Session {
Some(turn_context.sandbox_policy.clone()),
Some(self.user_shell.clone()),
)));
self.record_conversation_items(&conversation_items).await;
self.record_conversation_items(turn_context, &conversation_items)
.await;
}
async fn record_initial_history_from_items(&self, items: Vec<ResponseItem>) {
self.record_conversation_items_internal(&items, false).await;
async fn record_initial_history_from_rollout_items(
&self,
turn_context: &TurnContext,
items: Vec<crate::rollout::RolloutItem>,
) {
use crate::rollout::recorder::RolloutItemSliceExt as _;
let response_items = items.get_response_items();
self.record_conversation_items_internal(turn_context, &response_items, false)
.await;
}
/// build the initial messages vector for SessionConfigured by converting
/// ResponseItems into EventMsg.
fn build_initial_messages(&self, items: &[ResponseItem]) -> Vec<EventMsg> {
items
/// Build the initial UI messages vector for SessionConfigured from
/// persisted rollout items. Prefer persisted events when present; fall back
/// to converting response items for legacy rollouts.
fn build_initial_messages(&self, items: &[crate::rollout::RolloutItem]) -> Vec<EventMsg> {
let evs = items.get_events();
if !evs.is_empty() {
return evs;
}
let responses = items.get_response_items();
responses
.iter()
.flat_map(|item| {
map_response_item_to_event_messages(item, self.show_raw_agent_reasoning)
})
.collect()
.collect::<Vec<EventMsg>>()
}
/// Sends the given event to the client and swallows the send error, if
/// any, logging it as an error.
/// Sends an event to the client and records it to the rollout (if enabled).
pub(crate) async fn send_event(&self, event: Event) {
// Clone recorder handle outside the lock scope to avoid holding the
// mutex guard across an await point.
let recorder = {
let guard = self.rollout.lock_unchecked();
guard.as_ref().cloned()
};
if let Some(rec) = recorder
&& let Err(e) = rec.record_events(std::slice::from_ref(&event)).await
{
error!("failed to record rollout event: {e:#}");
}
if let Err(e) = self.tx_event.send(event).await {
error!("failed to send tool call event: {e}");
}
@@ -624,7 +651,7 @@ impl Session {
reason,
}),
};
let _ = self.tx_event.send(event).await;
self.send_event(event).await;
rx_approve
}
@@ -656,7 +683,7 @@ impl Session {
grant_root,
}),
};
let _ = self.tx_event.send(event).await;
self.send_event(event).await;
rx_approve
}
@@ -682,21 +709,36 @@ impl Session {
/// Records items to both the rollout and the chat completions/ZDR
/// transcript, if enabled.
async fn record_conversation_items(&self, items: &[ResponseItem]) {
self.record_conversation_items_internal(items, true).await;
async fn record_conversation_items(&self, turn_context: &TurnContext, items: &[ResponseItem]) {
self.record_conversation_items_internal(turn_context, items, true)
.await;
}
async fn record_conversation_items_internal(&self, items: &[ResponseItem], persist: bool) {
async fn record_conversation_items_internal(
&self,
turn_context: &TurnContext,
items: &[ResponseItem],
persist: bool,
) {
debug!("Recording items for conversation: {items:?}");
if persist {
self.record_state_snapshot(items).await;
self.record_state_snapshot(turn_context, items).await;
}
self.state.lock_unchecked().history.record_items(items);
}
async fn record_state_snapshot(&self, items: &[ResponseItem]) {
let snapshot = { crate::rollout::SessionStateSnapshot {} };
async fn record_state_snapshot(&self, turn_context: &TurnContext, items: &[ResponseItem]) {
let tc_snapshot = crate::rollout::recorder::TurnContextSnapshot {
cwd: turn_context.cwd.clone(),
approval_policy: turn_context.approval_policy,
sandbox_policy: turn_context.sandbox_policy.clone(),
model: turn_context.client.get_model(),
show_raw_agent_reasoning: self.show_raw_agent_reasoning,
};
let snapshot = crate::rollout::SessionStateSnapshot {
turn_context: tc_snapshot,
};
let recorder = {
let guard = self.rollout.lock_unchecked();
@@ -707,9 +749,30 @@ impl Session {
if let Err(e) = rec.record_state(snapshot).await {
error!("failed to record rollout state: {e:#}");
}
if let Err(e) = rec.record_items(items).await {
if let Err(e) = rec.record_response_items(items).await {
error!("failed to record rollout items: {e:#}");
}
// Also persist user input as EventMsg::UserMessage so resuming can
// reconstruct an event stream without relying on mapping.
// Only keep user messages; avoid duplicating assistant events which
// are recorded when emitted during streaming.
let derived_user_events: Vec<crate::protocol::Event> = items
.iter()
.flat_map(|it| {
map_response_item_to_event_messages(it, self.show_raw_agent_reasoning)
})
.filter(|ev| matches!(ev, EventMsg::UserMessage(_)))
.map(|msg| crate::protocol::Event {
id: String::new(),
msg,
})
.collect();
if !derived_user_events.is_empty() {
if let Err(e) = rec.record_events(&derived_user_events).await {
error!("failed to record derived user events: {e:#}");
}
}
}
}
@@ -752,7 +815,7 @@ impl Session {
id: sub_id.to_string(),
msg,
};
let _ = self.tx_event.send(event).await;
self.send_event(event).await;
}
async fn on_exec_command_end(
@@ -799,7 +862,7 @@ impl Session {
id: sub_id.to_string(),
msg,
};
let _ = self.tx_event.send(event).await;
self.send_event(event).await;
// If this is an apply_patch, after we emit the end patch, emit a second event
// with the full turn diff if there is one.
@@ -811,7 +874,7 @@ impl Session {
id: sub_id.into(),
msg,
};
let _ = self.tx_event.send(event).await;
self.send_event(event).await;
}
}
}
@@ -1050,9 +1113,9 @@ impl AgentTask {
id: self.sub_id,
msg: EventMsg::TurnAborted(TurnAbortedEvent { reason }),
};
let tx_event = self.sess.tx_event.clone();
let sess = self.sess.clone();
tokio::spawn(async move {
tx_event.send(event).await.ok();
sess.send_event(event).await;
});
}
}
@@ -1148,13 +1211,16 @@ async fn submission_loop(
// Install the new persistent context for subsequent tasks/turns.
turn_context = Arc::new(new_turn_context);
if cwd.is_some() || approval_policy.is_some() || sandbox_policy.is_some() {
sess.record_conversation_items(&[ResponseItem::from(EnvironmentContext::new(
cwd,
approval_policy,
sandbox_policy,
// Shell is not configurable from turn to turn
None,
))])
sess.record_conversation_items(
&turn_context,
&[ResponseItem::from(EnvironmentContext::new(
cwd,
approval_policy,
sandbox_policy,
// Shell is not configurable from turn to turn
None,
))],
)
.await;
}
}
@@ -1257,7 +1323,7 @@ async fn submission_loop(
Op::GetHistoryEntryRequest { offset, log_id } => {
let config = config.clone();
let tx_event = sess.tx_event.clone();
let sess2 = sess.clone();
let sub_id = sub.id.clone();
tokio::spawn(async move {
@@ -1285,13 +1351,11 @@ async fn submission_loop(
),
};
if let Err(e) = tx_event.send(event).await {
warn!("failed to send GetHistoryEntryResponse event: {e}");
}
sess2.send_event(event).await;
});
}
Op::ListMcpTools => {
let tx_event = sess.tx_event.clone();
let sess2 = sess.clone();
let sub_id = sub.id.clone();
// This is a cheap lookup from the connection manager's cache.
@@ -1302,12 +1366,10 @@ async fn submission_loop(
crate::protocol::McpListToolsResponseEvent { tools },
),
};
if let Err(e) = tx_event.send(event).await {
warn!("failed to send McpListToolsResponse event: {e}");
}
sess2.send_event(event).await;
}
Op::ListCustomPrompts => {
let tx_event = sess.tx_event.clone();
let sess2 = sess.clone();
let sub_id = sub.id.clone();
let custom_prompts: Vec<CustomPrompt> =
@@ -1323,9 +1385,7 @@ async fn submission_loop(
custom_prompts,
}),
};
if let Err(e) = tx_event.send(event).await {
warn!("failed to send ListCustomPromptsResponse event: {e}");
}
sess2.send_event(event).await;
}
Op::Compact => {
// Create a summarization request as user input
@@ -1376,9 +1436,7 @@ async fn submission_loop(
break;
}
Op::GetHistory => {
let tx_event = sess.tx_event.clone();
let sub_id = sub.id.clone();
let event = Event {
id: sub_id.clone(),
msg: EventMsg::ConversationHistory(ConversationHistoryResponseEvent {
@@ -1386,9 +1444,7 @@ async fn submission_loop(
entries: sess.state.lock_unchecked().history.contents(),
}),
};
if let Err(e) = tx_event.send(event).await {
warn!("failed to send ConversationHistory event: {e}");
}
sess.send_event(event).await;
}
_ => {
// Ignore unknown ops; enum is non_exhaustive to allow extensions.
@@ -1426,12 +1482,10 @@ async fn run_task(
model_context_window: turn_context.client.get_model_context_window(),
}),
};
if sess.tx_event.send(event).await.is_err() {
return;
}
sess.send_event(event).await;
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
sess.record_conversation_items(&[initial_input_for_turn.clone().into()])
sess.record_conversation_items(turn_context, &[initial_input_for_turn.clone().into()])
.await;
let mut last_agent_message: Option<String> = None;
@@ -1448,7 +1502,8 @@ async fn run_task(
.into_iter()
.map(ResponseItem::from)
.collect::<Vec<ResponseItem>>();
sess.record_conversation_items(&pending_input).await;
sess.record_conversation_items(turn_context, &pending_input)
.await;
// Construct the input that we will send to the model. When using the
// Chat completions API (or ZDR clients), the model needs the full
@@ -1575,8 +1630,11 @@ async fn run_task(
// Only attempt to take the lock if there is something to record.
if !items_to_record_in_conversation_history.is_empty() {
sess.record_conversation_items(&items_to_record_in_conversation_history)
.await;
sess.record_conversation_items(
turn_context,
&items_to_record_in_conversation_history,
)
.await;
}
if responses.is_empty() {
@@ -1600,7 +1658,7 @@ async fn run_task(
message: e.to_string(),
}),
};
sess.tx_event.send(event).await.ok();
sess.send_event(event).await;
// let the user continue the conversation
break;
}
@@ -1611,7 +1669,7 @@ async fn run_task(
id: sub_id,
msg: EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }),
};
sess.tx_event.send(event).await.ok();
sess.send_event(event).await;
}
async fn run_turn(
@@ -1787,13 +1845,11 @@ async fn try_run_turn(
output.push(ProcessedResponseItem { item, response });
}
ResponseEvent::WebSearchCallBegin { call_id } => {
let _ = sess
.tx_event
.send(Event {
id: sub_id.to_string(),
msg: EventMsg::WebSearchBegin(WebSearchBeginEvent { call_id }),
})
.await;
sess.send_event(Event {
id: sub_id.to_string(),
msg: EventMsg::WebSearchBegin(WebSearchBeginEvent { call_id }),
})
.await;
}
ResponseEvent::Completed {
response_id: _,
@@ -1809,13 +1865,11 @@ async fn try_run_turn(
st.token_info = info.clone();
info
};
sess.tx_event
.send(Event {
id: sub_id.to_string(),
msg: EventMsg::TokenCount(crate::protocol::TokenCountEvent { info }),
})
.await
.ok();
sess.send_event(Event {
id: sub_id.to_string(),
msg: EventMsg::TokenCount(crate::protocol::TokenCountEvent { info }),
})
.await;
let unified_diff = turn_diff_tracker.get_unified_diff();
if let Ok(Some(unified_diff)) = unified_diff {
@@ -1824,7 +1878,7 @@ async fn try_run_turn(
id: sub_id.to_string(),
msg,
};
let _ = sess.tx_event.send(event).await;
sess.send_event(event).await;
}
return Ok(output);
@@ -1834,21 +1888,21 @@ async fn try_run_turn(
id: sub_id.to_string(),
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }),
};
sess.tx_event.send(event).await.ok();
sess.send_event(event).await;
}
ResponseEvent::ReasoningSummaryDelta(delta) => {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }),
};
sess.tx_event.send(event).await.ok();
sess.send_event(event).await;
}
ResponseEvent::ReasoningSummaryPartAdded => {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent {}),
};
sess.tx_event.send(event).await.ok();
sess.send_event(event).await;
}
ResponseEvent::ReasoningContentDelta(delta) => {
if sess.show_raw_agent_reasoning {
@@ -1858,7 +1912,7 @@ async fn try_run_turn(
AgentReasoningRawContentDeltaEvent { delta },
),
};
sess.tx_event.send(event).await.ok();
sess.send_event(event).await;
}
}
}
@@ -1879,9 +1933,7 @@ async fn run_compact_task(
model_context_window,
}),
};
if sess.tx_event.send(start_event).await.is_err() {
return;
}
sess.send_event(start_event).await;
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
let turn_input: Vec<ResponseItem> =
@@ -2059,7 +2111,7 @@ async fn handle_response_item(
id: sub_id.to_string(),
msg,
};
sess.tx_event.send(event).await.ok();
sess.send_event(event).await;
}
None
}

View File

@@ -10,6 +10,7 @@ use crate::error::Result as CodexResult;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::SessionConfiguredEvent;
use crate::rollout::RolloutItem;
use crate::rollout::RolloutRecorder;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::models::ResponseItem;
@@ -21,7 +22,7 @@ use tokio::sync::RwLock;
#[derive(Debug, Clone, PartialEq)]
pub struct ResumedHistory {
pub conversation_id: ConversationId,
pub history: Vec<ResponseItem>,
pub history: Vec<RolloutItem>,
pub rollout_path: PathBuf,
}
@@ -29,7 +30,7 @@ pub struct ResumedHistory {
pub enum InitialHistory {
New,
Resumed(ResumedHistory),
Forked(Vec<ResponseItem>),
Forked(Vec<RolloutItem>),
}
/// Represents a newly created Codex conversation, including the first event
@@ -178,7 +179,11 @@ impl ConversationManager {
/// and all items that follow them.
fn truncate_after_dropping_last_messages(items: Vec<ResponseItem>, n: usize) -> InitialHistory {
if n == 0 {
return InitialHistory::Forked(items);
let rollout_items = items
.into_iter()
.map(crate::rollout::RolloutItem::ResponseItem)
.collect();
return InitialHistory::Forked(rollout_items);
}
// Walk backwards counting only `user` Message items, find cut index.
@@ -200,7 +205,13 @@ fn truncate_after_dropping_last_messages(items: Vec<ResponseItem>, n: usize) ->
// No prefix remains after dropping; start a new conversation.
InitialHistory::New
} else {
InitialHistory::Forked(items.into_iter().take(cut_index).collect())
InitialHistory::Forked(
items
.into_iter()
.take(cut_index)
.map(crate::rollout::RolloutItem::ResponseItem)
.collect(),
)
}
}
@@ -258,7 +269,11 @@ mod tests {
let truncated = truncate_after_dropping_last_messages(items.clone(), 1);
assert_eq!(
truncated,
InitialHistory::Forked(vec![items[0].clone(), items[1].clone(), items[2].clone(),])
InitialHistory::Forked(vec![
crate::rollout::RolloutItem::ResponseItem(items[0].clone()),
crate::rollout::RolloutItem::ResponseItem(items[1].clone()),
crate::rollout::RolloutItem::ResponseItem(items[2].clone()),
])
);
let truncated2 = truncate_after_dropping_last_messages(items, 2);

View File

@@ -61,11 +61,11 @@ pub mod spawn;
pub mod terminal;
mod tool_apply_patch;
pub mod turn_diff_tracker;
pub use rollout::Cursor;
pub use rollout::RolloutRecorder;
pub use rollout::SessionMeta;
pub use rollout::list::ConversationItem;
pub use rollout::list::ConversationsPage;
pub use rollout::list::Cursor;
mod user_notification;
pub mod util;
pub use apply_patch::CODEX_APPLY_PATCH_ARG1;

View File

@@ -0,0 +1,91 @@
use codex_protocol::mcp_protocol::ConversationId;
use time::OffsetDateTime;
use time::PrimitiveDateTime;
use time::format_description::FormatItem;
use time::macros::format_description;
use uuid::Uuid;
/// Timestamp format used in rollout filenames: YYYY-MM-DDThh-mm-ss
pub const FILENAME_TS_FMT: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
/// Timestamp format used for JSONL records (UTC, second precision): YYYY-MM-DDThh:mm:ssZ
pub const RECORD_TS_FMT: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]Z");
/// Pagination cursor identifying a file by timestamp and UUID.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Cursor {
pub(crate) ts: OffsetDateTime,
pub(crate) id: Uuid,
}
impl Cursor {
pub fn new(ts: OffsetDateTime, id: Uuid) -> Self {
Self { ts, id }
}
}
impl serde::Serialize for Cursor {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let ts_str = self
.ts
.format(FILENAME_TS_FMT)
.map_err(|e| serde::ser::Error::custom(format!("format error: {e}")))?;
serializer.serialize_str(&format!("{ts_str}|{}", self.id))
}
}
impl<'de> serde::Deserialize<'de> for Cursor {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
parse_cursor(&s).ok_or_else(|| serde::de::Error::custom("invalid cursor"))
}
}
/// Parses a pagination cursor token in the form "<file_ts>|<uuid>".
pub fn parse_cursor(token: &str) -> Option<Cursor> {
let (file_ts, uuid_str) = token.split_once('|')?;
let Ok(uuid) = Uuid::parse_str(uuid_str) else {
return None;
};
let ts = PrimitiveDateTime::parse(file_ts, FILENAME_TS_FMT)
.ok()?
.assume_utc();
Some(Cursor::new(ts, uuid))
}
/// Parse timestamp and UUID from a rollout filename.
/// Expected format: rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl
pub fn parse_timestamp_uuid_from_filename(name: &str) -> Option<(OffsetDateTime, Uuid)> {
let core = name.strip_prefix("rollout-")?.strip_suffix(".jsonl")?;
// Scan from the right for a '-' such that the suffix parses as a UUID.
let (sep_idx, uuid) = core
.match_indices('-')
.rev()
.find_map(|(i, _)| Uuid::parse_str(&core[i + 1..]).ok().map(|u| (i, u)))?;
let ts_str = &core[..sep_idx];
let ts = PrimitiveDateTime::parse(ts_str, FILENAME_TS_FMT)
.ok()?
.assume_utc();
Some((ts, uuid))
}
/// Build a rollout filename for a given timestamp and conversation id.
pub fn build_rollout_filename(ts: OffsetDateTime, conversation_id: ConversationId) -> String {
let date_str = ts
.format(FILENAME_TS_FMT)
.unwrap_or_else(|e| panic!("failed to format timestamp: {e}"));
format!("rollout-{date_str}-{conversation_id}.jsonl")
}
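
For orientation, a minimal test-style sketch (not part of the diff) of how the new helpers in rollout/format.rs compose, assuming it lives inside that module; the cursor token, filename, and UUID values are made-up examples.

#[cfg(test)]
mod format_sketch {
    use super::*;
    use uuid::Uuid;

    #[test]
    fn cursor_token_and_filename_round_trip() {
        // Pagination token format: "<file_ts>|<uuid>".
        let cursor = parse_cursor("2025-07-01T00-00-00|00000000-0000-0000-0000-0000000000ca")
            .expect("valid cursor token");

        // Filename format: rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl
        let (ts, id) = parse_timestamp_uuid_from_filename(
            "rollout-2025-07-01T00-00-00-00000000-0000-0000-0000-0000000000ca.jsonl",
        )
        .expect("valid rollout filename");

        // Both helpers agree on the same timestamp/UUID pair.
        assert_eq!(id, Uuid::from_u128(0xca));
        assert_eq!(cursor, Cursor::new(ts, id));
    }
}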

View File

@@ -4,12 +4,12 @@ use std::path::Path;
use std::path::PathBuf;
use time::OffsetDateTime;
use time::PrimitiveDateTime;
use time::format_description::FormatItem;
use time::macros::format_description;
use uuid::Uuid;
use super::SESSIONS_SUBDIR;
use super::format::Cursor;
use super::format::parse_timestamp_uuid_from_filename;
use std::sync::Arc;
/// Returned page of conversation summaries.
#[derive(Debug, Default, PartialEq)]
@@ -33,48 +33,14 @@ pub struct ConversationItem {
pub head: Vec<serde_json::Value>,
}
/// A filter applied to a discovered conversation. All filters must pass for
/// the item to be included in results.
pub type ConversationFilter = Arc<dyn Fn(&ConversationItem) -> bool + Send + Sync>;
/// Hard cap to bound worst-case work per request.
const MAX_SCAN_FILES: usize = 10_000;
const HEAD_RECORD_LIMIT: usize = 10;
/// Pagination cursor identifying a file by timestamp and UUID.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Cursor {
ts: OffsetDateTime,
id: Uuid,
}
impl Cursor {
fn new(ts: OffsetDateTime, id: Uuid) -> Self {
Self { ts, id }
}
}
impl serde::Serialize for Cursor {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let ts_str = self
.ts
.format(&format_description!(
"[year]-[month]-[day]T[hour]-[minute]-[second]"
))
.map_err(|e| serde::ser::Error::custom(format!("format error: {e}")))?;
serializer.serialize_str(&format!("{ts_str}|{}", self.id))
}
}
impl<'de> serde::Deserialize<'de> for Cursor {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
parse_cursor(&s).ok_or_else(|| serde::de::Error::custom("invalid cursor"))
}
}
/// Retrieve recorded conversation file paths with token pagination. The returned `next_cursor`
/// can be supplied on the next call to resume after the last returned item, resilient to
/// concurrent new sessions being appended. Ordering is stable by timestamp desc, then UUID desc.
@@ -82,23 +48,27 @@ pub(crate) async fn get_conversations(
codex_home: &Path,
page_size: usize,
cursor: Option<&Cursor>,
) -> io::Result<ConversationsPage> {
get_conversations_filtered(codex_home, page_size, cursor, &[]).await
}
/// Retrieve recorded conversations with filters. All provided filters must
/// return `true` for an item to be included.
pub(crate) async fn get_conversations_filtered(
codex_home: &Path,
page_size: usize,
cursor: Option<&Cursor>,
filters: &[ConversationFilter],
) -> io::Result<ConversationsPage> {
let mut root = codex_home.to_path_buf();
root.push(SESSIONS_SUBDIR);
if !root.exists() {
return Ok(ConversationsPage {
items: Vec::new(),
next_cursor: None,
num_scanned_files: 0,
reached_scan_cap: false,
});
return Ok(empty_page());
}
let anchor = cursor.cloned();
let result = traverse_directories_for_paths(root.clone(), page_size, anchor).await?;
Ok(result)
traverse_directories_for_paths_filtered(root.clone(), page_size, anchor, filters).await
}
/// Load the full contents of a single conversation session file at `path`.
@@ -112,10 +82,11 @@ pub(crate) async fn get_conversation(path: &Path) -> io::Result<String> {
///
/// Directory layout: `~/.codex/sessions/YYYY/MM/DD/rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl`
/// Returned newest (latest) first.
async fn traverse_directories_for_paths(
async fn traverse_directories_for_paths_filtered(
root: PathBuf,
page_size: usize,
anchor: Option<Cursor>,
filters: &[ConversationFilter],
) -> io::Result<ConversationsPage> {
let mut items: Vec<ConversationItem> = Vec::with_capacity(page_size);
let mut scanned_files = 0usize;
@@ -170,7 +141,12 @@ async fn traverse_directories_for_paths(
let head = read_first_jsonl_records(&path, HEAD_RECORD_LIMIT)
.await
.unwrap_or_default();
items.push(ConversationItem { path, head });
let item = ConversationItem { path, head };
// Apply all filters
let include = filters.iter().all(|f| f(&item));
if include {
items.push(item);
}
}
}
}
@@ -185,23 +161,6 @@ async fn traverse_directories_for_paths(
})
}
/// Pagination cursor token format: "<file_ts>|<uuid>" where `file_ts` matches the
/// filename timestamp portion (YYYY-MM-DDThh-mm-ss) used in rollout filenames.
/// The cursor orders files by timestamp desc, then UUID desc.
fn parse_cursor(token: &str) -> Option<Cursor> {
let (file_ts, uuid_str) = token.split_once('|')?;
let Ok(uuid) = Uuid::parse_str(uuid_str) else {
return None;
};
let format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
let ts = PrimitiveDateTime::parse(file_ts, format).ok()?.assume_utc();
Some(Cursor::new(ts, uuid))
}
fn build_next_cursor(items: &[ConversationItem]) -> Option<Cursor> {
let last = items.last()?;
let file_name = last.path.file_name()?.to_string_lossy();
@@ -256,21 +215,13 @@ where
Ok(collected)
}
fn parse_timestamp_uuid_from_filename(name: &str) -> Option<(OffsetDateTime, Uuid)> {
// Expected: rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl
let core = name.strip_prefix("rollout-")?.strip_suffix(".jsonl")?;
// Scan from the right for a '-' such that the suffix parses as a UUID.
let (sep_idx, uuid) = core
.match_indices('-')
.rev()
.find_map(|(i, _)| Uuid::parse_str(&core[i + 1..]).ok().map(|u| (i, u)))?;
let ts_str = &core[..sep_idx];
let format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
let ts = PrimitiveDateTime::parse(ts_str, format).ok()?.assume_utc();
Some((ts, uuid))
fn empty_page() -> ConversationsPage {
ConversationsPage {
items: Vec::new(),
next_cursor: None,
num_scanned_files: 0,
reached_scan_cap: false,
}
}
async fn read_first_jsonl_records(
@@ -296,3 +247,16 @@ async fn read_first_jsonl_records(
}
Ok(head)
}
/// Returns a filter that requires the first JSONL record to be a tagged
/// session meta line: { "record_type": "session_meta", ... }.
pub(crate) fn requires_tagged_session_meta_filter() -> ConversationFilter {
Arc::new(|item: &ConversationItem| {
item.head
.get(0)
.and_then(|v| v.get("record_type"))
.and_then(|v| v.as_str())
.map(|s| s == "session_meta")
.unwrap_or(false)
})
}
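
As a usage sketch (not part of the diff), a caller-supplied ConversationFilter in the same spirit as requires_tagged_session_meta_filter above; the hypothetical cwd_filter helper assumes it sits in list.rs and that the first head record is the session meta line, which now carries a cwd field.

/// Hypothetical helper: keep only conversations recorded under `wanted_cwd`.
fn cwd_filter(wanted_cwd: String) -> ConversationFilter {
    Arc::new(move |item: &ConversationItem| {
        item.head
            .first()
            .and_then(|v| v.get("cwd"))
            .and_then(|v| v.as_str())
            .map(|cwd| cwd == wanted_cwd)
            .unwrap_or(false)
    })
}

// e.g. get_conversations_filtered(codex_home, 25, None, &[cwd_filter("/tmp".into())]).await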

View File

@@ -2,10 +2,13 @@
pub(crate) const SESSIONS_SUBDIR: &str = "sessions";
pub mod format;
pub mod list;
pub(crate) mod policy;
pub mod recorder;
pub use format::Cursor;
pub use recorder::RolloutItem;
pub use recorder::RolloutRecorder;
pub use recorder::RolloutRecorderParams;
pub use recorder::SessionMeta;

View File

@@ -1,3 +1,4 @@
use crate::protocol::EventMsg;
use codex_protocol::models::ResponseItem;
/// Whether a `ResponseItem` should be persisted in rollout files.
@@ -14,3 +15,82 @@ pub(crate) fn is_persisted_response_item(item: &ResponseItem) -> bool {
ResponseItem::WebSearchCall { .. } | ResponseItem::Other => false,
}
}
/// Whether an `EventMsg` should be persisted in rollout files.
///
/// Keep only high-signal, compact items. Avoid deltas and verbose streams.
#[inline]
pub(crate) fn is_persisted_event_msg(event: &EventMsg) -> bool {
match event {
// Core content to replay UI meaningfully
EventMsg::AgentMessage(_)
| EventMsg::AgentReasoning(_)
| EventMsg::TokenCount(_)
| EventMsg::UserMessage(_) => true,
// Everything else is either transient, redundant, or too verbose
_ => false,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::protocol::AgentMessageDeltaEvent;
use crate::protocol::AgentMessageEvent;
use crate::protocol::AgentReasoningDeltaEvent;
use crate::protocol::AgentReasoningEvent;
use crate::protocol::ApplyPatchApprovalRequestEvent;
use crate::protocol::ExecCommandEndEvent;
use crate::protocol::TokenCountEvent;
#[test]
fn test_event_persistence_policy() {
assert!(is_persisted_event_msg(&EventMsg::AgentMessage(
AgentMessageEvent {
message: "hi".to_string(),
}
)));
assert!(is_persisted_event_msg(&EventMsg::AgentReasoning(
AgentReasoningEvent {
text: "think".to_string(),
}
)));
assert!(is_persisted_event_msg(&EventMsg::TokenCount(
TokenCountEvent { info: None }
)));
assert!(!is_persisted_event_msg(&EventMsg::AgentMessageDelta(
AgentMessageDeltaEvent {
delta: "d".to_string(),
}
)));
assert!(!is_persisted_event_msg(&EventMsg::AgentReasoningDelta(
AgentReasoningDeltaEvent {
delta: "d".to_string(),
}
)));
assert!(!is_persisted_event_msg(&EventMsg::ExecCommandEnd(
ExecCommandEndEvent {
call_id: "c".to_string(),
stdout: Default::default(),
stderr: Default::default(),
aggregated_output: Default::default(),
exit_code: 0,
duration: std::time::Duration::from_secs(0),
formatted_output: String::new(),
}
)));
use crate::protocol::FileChange;
use std::collections::HashMap;
use std::path::PathBuf;
assert!(!is_persisted_event_msg(
&EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "c".to_string(),
changes: HashMap::<PathBuf, FileChange>::new(),
reason: None,
grant_root: None,
})
));
}
}

View File

@@ -6,6 +6,7 @@ use std::io::Error as IoError;
use std::path::Path;
use std::path::PathBuf;
use crate::protocol::Event;
use codex_protocol::mcp_protocol::ConversationId;
use serde::Deserialize;
use serde::Serialize;
@@ -20,43 +21,63 @@ use tokio::sync::oneshot;
use tracing::info;
use tracing::warn;
use super::Cursor;
use super::SESSIONS_SUBDIR;
use super::format::RECORD_TS_FMT;
use super::format::build_rollout_filename;
use super::list::ConversationFilter;
use super::list::ConversationsPage;
use super::list::Cursor;
use super::list::get_conversations;
use super::list::get_conversations_filtered;
use super::policy::is_persisted_event_msg;
use super::policy::is_persisted_response_item;
use crate::config::Config;
use crate::conversation_manager::InitialHistory;
use crate::conversation_manager::ResumedHistory;
use crate::git_info::GitInfo;
use crate::git_info::collect_git_info;
use crate::protocol::EventMsg;
use codex_protocol::models::ResponseItem;
#[derive(Serialize, Deserialize, Clone, Default)]
#[derive(Serialize, Deserialize, Clone, Default, Debug)]
pub struct SessionMeta {
pub id: ConversationId,
pub timestamp: String,
pub cwd: String,
pub originator: String,
pub cli_version: String,
pub instructions: Option<String>,
}
#[derive(Serialize)]
struct SessionMetaWithGit {
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct SessionMetaWithGit {
#[serde(flatten)]
meta: SessionMeta,
#[serde(skip_serializing_if = "Option::is_none")]
git: Option<GitInfo>,
}
#[derive(Serialize, Deserialize, Default, Clone)]
pub struct SessionStateSnapshot {}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct SessionStateSnapshot {
pub turn_context: TurnContextSnapshot,
}
#[derive(Serialize, Deserialize, Default, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct TurnContextSnapshot {
pub cwd: std::path::PathBuf,
pub approval_policy: crate::protocol::AskForApproval,
pub sandbox_policy: crate::protocol::SandboxPolicy,
pub model: String,
pub show_raw_agent_reasoning: bool,
}
#[derive(Serialize, Deserialize, Clone)]
pub struct SavedSession {
pub session: SessionMeta,
#[serde(default)]
pub items: Vec<ResponseItem>,
#[serde(default)]
pub state: SessionStateSnapshot,
pub state: Option<SessionStateSnapshot>,
pub session_id: ConversationId,
}
@@ -86,11 +107,89 @@ pub enum RolloutRecorderParams {
}
enum RolloutCmd {
AddItems(Vec<ResponseItem>),
AddResponseItems(Vec<ResponseItem>),
AddEvents(Vec<Event>),
UpdateState(SessionStateSnapshot),
Shutdown { ack: oneshot::Sender<()> },
}
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "record_type", rename_all = "snake_case")]
enum TaggedLine {
#[serde(rename = "response")]
Response {
#[serde(flatten)]
item: ResponseItem,
},
#[serde(rename = "event")]
Event {
#[serde(flatten)]
event: Event,
},
#[serde(rename = "session_meta")]
SessionMeta {
#[serde(flatten)]
meta: SessionMetaWithGit,
},
#[serde(rename = "state")]
State {
#[serde(flatten)]
state: SessionStateSnapshot,
},
}
#[derive(Serialize, Deserialize, Debug, Clone)]
struct TimestampedLine {
timestamp: String,
#[serde(flatten)]
record: TaggedLine,
}
#[derive(Debug, Clone)]
pub enum RolloutItem {
ResponseItem(ResponseItem),
Event(Event),
SessionMeta(SessionMetaWithGit),
}
impl PartialEq for RolloutItem {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(RolloutItem::ResponseItem(a), RolloutItem::ResponseItem(b)) => a == b,
(RolloutItem::SessionMeta(_), RolloutItem::SessionMeta(_)) => false,
// Event comparison omitted (foreign type without PartialEq); treat as not equal
(RolloutItem::Event(_), RolloutItem::Event(_)) => false,
_ => false,
}
}
}
/// Convenience helpers to extract typed items from a list of rollout items.
pub trait RolloutItemSliceExt {
fn get_response_items(&self) -> Vec<ResponseItem>;
fn get_events(&self) -> Vec<EventMsg>;
}
impl RolloutItemSliceExt for [RolloutItem] {
fn get_response_items(&self) -> Vec<ResponseItem> {
self.iter()
.filter_map(|it| match it {
RolloutItem::ResponseItem(ri) => Some(ri.clone()),
_ => None,
})
.collect()
}
fn get_events(&self) -> Vec<EventMsg> {
self.iter()
.filter_map(|it| match it {
RolloutItem::Event(ev) => Some(ev.msg.clone()),
_ => None,
})
.collect()
}
}
impl RolloutRecorderParams {
pub fn new(conversation_id: ConversationId, instructions: Option<String>) -> Self {
Self::Create {
@@ -112,7 +211,21 @@ impl RolloutRecorder {
page_size: usize,
cursor: Option<&Cursor>,
) -> std::io::Result<ConversationsPage> {
get_conversations(codex_home, page_size, cursor).await
// Apply a default filter to require files to start with a tagged
// session_meta record. This skips legacy/invalid files from listings.
let filters = vec![super::list::requires_tagged_session_meta_filter()];
get_conversations_filtered(codex_home, page_size, cursor, &filters).await
}
#[allow(dead_code)]
/// List conversations with filters; all filters must pass for inclusion.
pub async fn list_conversations_with_filters(
codex_home: &Path,
page_size: usize,
cursor: Option<&Cursor>,
filters: &[ConversationFilter],
) -> std::io::Result<ConversationsPage> {
get_conversations_filtered(codex_home, page_size, cursor, filters).await
}
/// Attempt to create a new [`RolloutRecorder`]. If the sessions directory
@@ -143,6 +256,9 @@ impl RolloutRecorder {
Some(SessionMeta {
timestamp,
id: session_id,
cwd: config.cwd.display().to_string(),
originator: config.responses_originator_header.clone(),
cli_version: env!("CARGO_PKG_VERSION").to_string(),
instructions,
}),
)
@@ -172,25 +288,29 @@ impl RolloutRecorder {
Ok(Self { tx })
}
pub(crate) async fn record_items(&self, items: &[ResponseItem]) -> std::io::Result<()> {
let mut filtered = Vec::new();
for item in items {
// Note that function calls may look a bit strange if they are
// "fully qualified MCP tool calls," so we could consider
// reformatting them in that case.
if is_persisted_response_item(item) {
filtered.push(item.clone());
}
}
if filtered.is_empty() {
pub(crate) async fn record_response_items(
&self,
items: &[ResponseItem],
) -> std::io::Result<()> {
if items.is_empty() {
return Ok(());
}
self.tx
.send(RolloutCmd::AddItems(filtered))
.send(RolloutCmd::AddResponseItems(items.to_vec()))
.await
.map_err(|e| IoError::other(format!("failed to queue rollout items: {e}")))
}
pub(crate) async fn record_events(&self, events: &[Event]) -> std::io::Result<()> {
if events.is_empty() {
return Ok(());
}
self.tx
.send(RolloutCmd::AddEvents(events.to_vec()))
.await
.map_err(|e| IoError::other(format!("failed to queue rollout events: {e}")))
}
pub(crate) async fn record_state(&self, state: SessionStateSnapshot) -> std::io::Result<()> {
self.tx
.send(RolloutCmd::UpdateState(state))
@@ -200,28 +320,36 @@ impl RolloutRecorder {
pub async fn get_rollout_history(path: &Path) -> std::io::Result<InitialHistory> {
info!("Resuming rollout from {path:?}");
tracing::error!("Resuming rollout from {path:?}");
let text = tokio::fs::read_to_string(path).await?;
let mut lines = text.lines();
let first_line = lines
.next()
.ok_or_else(|| IoError::other("empty session file"))?;
let conversation_id = match serde_json::from_str::<SessionMeta>(first_line) {
Ok(rollout_session_meta) => {
tracing::error!(
"Parsed conversation ID from rollout file: {:?}",
rollout_session_meta.id
);
Some(rollout_session_meta.id)
// Require the new tagged format: the first line must be a session_meta record
let v_first: Value = serde_json::from_str(first_line)
.map_err(|e| IoError::other(format!("failed to parse first line: {e}")))?;
let conversation_id = if v_first.get("record_type").is_some() {
let rt_ok = v_first
.get("record_type")
.and_then(|s| s.as_str())
.map(|s| s == "session_meta")
.unwrap_or(false);
if !rt_ok {
return Err(IoError::other("first line is not session_meta"));
}
Err(e) => {
return Err(IoError::other(format!(
"failed to parse first line of rollout file as SessionMeta: {e}"
)));
match serde_json::from_value::<SessionMetaWithGit>(v_first.clone()) {
Ok(rollout_session_meta) => Some(rollout_session_meta.meta.id),
Err(e) => {
return Err(IoError::other(format!(
"failed to parse first line (tagged) as SessionMeta: {e}"
)));
}
}
} else {
return Err(IoError::other("first line missing record_type"));
};
let mut items = Vec::new();
let mut items: Vec<RolloutItem> = Vec::new();
for line in lines {
if line.trim().is_empty() {
continue;
@@ -230,23 +358,35 @@ impl RolloutRecorder {
Ok(v) => v,
Err(_) => continue,
};
if v.get("record_type")
.and_then(|rt| rt.as_str())
.map(|s| s == "state")
.unwrap_or(false)
{
// Tagged format: collect responses and events; skip state
if let Some(rt) = v.get("record_type").and_then(|rt| rt.as_str()) {
match rt {
"state" => continue,
"response" => match serde_json::from_value::<ResponseItem>(v.clone()) {
Ok(item) => {
if is_persisted_response_item(&item) {
items.push(RolloutItem::ResponseItem(item));
}
}
Err(e) => {
warn!("failed to parse item: {v:?}, error: {e}");
}
},
"event" => match serde_json::from_value::<Event>(v.clone()) {
Ok(event) => {
if is_persisted_event_msg(&event.msg) {
items.push(RolloutItem::Event(event));
}
}
Err(e) => {
warn!("failed to parse event: {v:?}, error: {e}");
}
},
_ => continue,
}
continue;
}
match serde_json::from_value::<ResponseItem>(v.clone()) {
Ok(item) => {
if is_persisted_response_item(&item) {
items.push(item);
}
}
Err(e) => {
warn!("failed to parse item: {v:?}, error: {e}");
}
}
// Non-tagged lines are ignored in the new format
}
tracing::error!(
@@ -312,13 +452,7 @@ fn create_log_file(
// Custom format for YYYY-MM-DDThh-mm-ss. Use `-` instead of `:` for
// compatibility with filesystems that do not allow colons in filenames.
let format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
let date_str = timestamp
.format(format)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
let filename = format!("rollout-{date_str}-{conversation_id}.jsonl");
let filename = build_rollout_filename(timestamp, conversation_id);
let path = dir.join(filename);
let file = std::fs::OpenOptions::new()
@@ -350,32 +484,32 @@ async fn rollout_writer(
};
// Write the SessionMeta as the first item in the file
writer.write_line(&session_meta_with_git).await?;
writer
.write_tagged(TaggedLine::SessionMeta {
meta: session_meta_with_git,
})
.await?;
}
// Process rollout commands
while let Some(cmd) = rx.recv().await {
match cmd {
RolloutCmd::AddItems(items) => {
RolloutCmd::AddResponseItems(items) => {
for item in items {
if is_persisted_response_item(&item) {
writer.write_line(&item).await?;
writer.write_tagged(TaggedLine::Response { item }).await?;
}
}
}
RolloutCmd::AddEvents(events) => {
for event in events {
if is_persisted_event_msg(&event.msg) {
writer.write_tagged(TaggedLine::Event { event }).await?;
}
}
}
RolloutCmd::UpdateState(state) => {
#[derive(Serialize)]
struct StateLine<'a> {
record_type: &'static str,
#[serde(flatten)]
state: &'a SessionStateSnapshot,
}
writer
.write_line(&StateLine {
record_type: "state",
state: &state,
})
.await?;
writer.write_tagged(TaggedLine::State { state }).await?;
}
RolloutCmd::Shutdown { ack } => {
let _ = ack.send(());
@@ -391,8 +525,16 @@ struct JsonlWriter {
}
impl JsonlWriter {
async fn write_line(&mut self, item: &impl serde::Serialize) -> std::io::Result<()> {
let mut json = serde_json::to_string(item)?;
async fn write_tagged(&mut self, record: TaggedLine) -> std::io::Result<()> {
// RFC3339 UTC, second precision
let now = OffsetDateTime::now_utc()
.format(RECORD_TS_FMT)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
let line = TimestampedLine {
timestamp: now,
record,
};
let mut json = serde_json::to_string(&line)?;
json.push('\n');
self.file.write_all(json.as_bytes()).await?;
self.file.flush().await?;

View File

@@ -12,11 +12,16 @@ use time::format_description::FormatItem;
use time::macros::format_description;
use uuid::Uuid;
use crate::rollout::Cursor;
use crate::rollout::RolloutItem;
use crate::rollout::RolloutRecorder;
use crate::rollout::list::ConversationFilter;
use crate::rollout::list::ConversationItem;
use crate::rollout::list::ConversationsPage;
use crate::rollout::list::Cursor;
use crate::rollout::list::get_conversation;
use crate::rollout::list::get_conversations;
use crate::rollout::list::get_conversations_filtered;
use pretty_assertions::assert_eq;
fn write_session_file(
root: &Path,
@@ -42,7 +47,10 @@ fn write_session_file(
let meta = serde_json::json!({
"timestamp": ts_str,
"id": uuid.to_string()
"id": uuid.to_string(),
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version"
});
writeln!(file, "{meta}")?;
@@ -56,6 +64,127 @@ fn write_session_file(
Ok((dt, uuid))
}
#[tokio::test]
async fn test_get_conversations_filtered_by_id() {
let temp = TempDir::new().unwrap();
let home = temp.path();
let u1 = Uuid::from_u128(101);
let u2 = Uuid::from_u128(202);
write_session_file(home, "2025-07-01T00-00-00", u1, 1).unwrap();
write_session_file(home, "2025-07-02T00-00-00", u2, 1).unwrap();
let target = u2.to_string();
let filter: ConversationFilter = std::sync::Arc::new(move |item: &ConversationItem| {
item.head
.first()
.and_then(|v| v.get("id"))
.and_then(|v| v.as_str())
.map(|s| s == target)
.unwrap_or(false)
});
let page = get_conversations_filtered(home, 10, None, &[filter])
.await
.expect("filtered list ok");
assert_eq!(page.items.len(), 1);
let file_name = page.items[0]
.path
.file_name()
.unwrap()
.to_string_lossy()
.to_string();
assert!(file_name.contains(&u2.to_string()));
}
#[tokio::test]
async fn test_rollout_history_parses_tagged_events_and_responses() {
use std::io::Write as _;
use uuid::Uuid;
let temp = TempDir::new().unwrap();
let path = temp.path().join("rollout-test.jsonl");
let mut f = File::create(&path).unwrap();
let session_id = Uuid::from_u128(555).to_string();
// Tagged session_meta line (new format)
writeln!(
f,
"{}",
serde_json::json!({
"timestamp": "2025-09-08T16:03:12Z",
"record_type": "session_meta",
"id": session_id,
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version",
"instructions": null
})
)
.unwrap();
// Allowed event: AgentMessage
writeln!(
f,
"{}",
serde_json::json!({
"timestamp": "2025-09-08T16:03:13Z",
"record_type": "event",
"id": "sub1",
"msg": {"type": "agent_message", "message": "hi"}
})
)
.unwrap();
// Disallowed (delta) event: should be skipped by policy
writeln!(
f,
"{}",
serde_json::json!({
"timestamp": "2025-09-08T16:03:14Z",
"record_type": "event",
"id": "sub1",
"msg": {"type": "agent_message_delta", "delta": "x"}
})
)
.unwrap();
// Response item (assistant message)
writeln!(
f,
"{}",
serde_json::json!({
"timestamp": "2025-09-08T16:03:15Z",
"record_type": "response",
"type": "message",
"role": "assistant",
"content": [{"type": "output_text", "text": "hello"}]
})
)
.unwrap();
let hist = RolloutRecorder::get_rollout_history(&path)
.await
.expect("history ok");
let resumed = match hist {
crate::conversation_manager::InitialHistory::Resumed(r) => r,
_ => panic!("expected resumed history"),
};
// We expect exactly 2 items: one event (allowed) and one response
let kinds: Vec<&'static str> = resumed
.history
.iter()
.map(|it| match it {
RolloutItem::Event(_) => "event",
RolloutItem::ResponseItem(_) => "response",
RolloutItem::SessionMeta(_) => "meta",
})
.collect();
assert_eq!(kinds, vec!["event", "response"]);
}
#[tokio::test]
async fn test_list_conversations_latest_first() {
let temp = TempDir::new().unwrap();
@@ -94,19 +223,37 @@ async fn test_list_conversations_latest_first() {
.join(format!("rollout-2025-01-01T12-00-00-{u1}.jsonl"));
let head_3 = vec![
serde_json::json!({"timestamp": "2025-01-03T12-00-00", "id": u3.to_string()}),
serde_json::json!({
"timestamp": "2025-01-03T12-00-00",
"id": u3.to_string(),
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version"
}),
serde_json::json!({"record_type": "response", "index": 0}),
serde_json::json!({"record_type": "response", "index": 1}),
serde_json::json!({"record_type": "response", "index": 2}),
];
let head_2 = vec![
serde_json::json!({"timestamp": "2025-01-02T12-00-00", "id": u2.to_string()}),
serde_json::json!({
"timestamp": "2025-01-02T12-00-00",
"id": u2.to_string(),
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version"
}),
serde_json::json!({"record_type": "response", "index": 0}),
serde_json::json!({"record_type": "response", "index": 1}),
serde_json::json!({"record_type": "response", "index": 2}),
];
let head_1 = vec![
serde_json::json!({"timestamp": "2025-01-01T12-00-00", "id": u1.to_string()}),
serde_json::json!({
"timestamp": "2025-01-01T12-00-00",
"id": u1.to_string(),
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version"
}),
serde_json::json!({"record_type": "response", "index": 0}),
serde_json::json!({"record_type": "response", "index": 1}),
serde_json::json!({"record_type": "response", "index": 2}),
@@ -171,11 +318,23 @@ async fn test_pagination_cursor() {
.join("04")
.join(format!("rollout-2025-03-04T09-00-00-{u4}.jsonl"));
let head_5 = vec![
serde_json::json!({"timestamp": "2025-03-05T09-00-00", "id": u5.to_string()}),
serde_json::json!({
"timestamp": "2025-03-05T09-00-00",
"id": u5.to_string(),
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version"
}),
serde_json::json!({"record_type": "response", "index": 0}),
];
let head_4 = vec![
serde_json::json!({"timestamp": "2025-03-04T09-00-00", "id": u4.to_string()}),
serde_json::json!({
"timestamp": "2025-03-04T09-00-00",
"id": u4.to_string(),
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version"
}),
serde_json::json!({"record_type": "response", "index": 0}),
];
let expected_cursor1: Cursor =
@@ -213,11 +372,23 @@ async fn test_pagination_cursor() {
.join("02")
.join(format!("rollout-2025-03-02T09-00-00-{u2}.jsonl"));
let head_3 = vec![
serde_json::json!({"timestamp": "2025-03-03T09-00-00", "id": u3.to_string()}),
serde_json::json!({
"timestamp": "2025-03-03T09-00-00",
"id": u3.to_string(),
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version"
}),
serde_json::json!({"record_type": "response", "index": 0}),
];
let head_2 = vec![
serde_json::json!({"timestamp": "2025-03-02T09-00-00", "id": u2.to_string()}),
serde_json::json!({
"timestamp": "2025-03-02T09-00-00",
"id": u2.to_string(),
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version"
}),
serde_json::json!({"record_type": "response", "index": 0}),
];
let expected_cursor2: Cursor =
@@ -249,7 +420,13 @@ async fn test_pagination_cursor() {
.join("01")
.join(format!("rollout-2025-03-01T09-00-00-{u1}.jsonl"));
let head_1 = vec![
serde_json::json!({"timestamp": "2025-03-01T09-00-00", "id": u1.to_string()}),
serde_json::json!({
"timestamp": "2025-03-01T09-00-00",
"id": u1.to_string(),
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version"
}),
serde_json::json!({"record_type": "response", "index": 0}),
];
let expected_cursor3: Cursor =
@@ -288,7 +465,13 @@ async fn test_get_conversation_contents() {
.join("01")
.join(format!("rollout-2025-04-01T10-30-00-{uuid}.jsonl"));
let expected_head = vec![
serde_json::json!({"timestamp": ts, "id": uuid.to_string()}),
serde_json::json!({
"timestamp": ts,
"id": uuid.to_string(),
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version"
}),
serde_json::json!({"record_type": "response", "index": 0}),
serde_json::json!({"record_type": "response", "index": 1}),
];
@@ -305,7 +488,13 @@ async fn test_get_conversation_contents() {
assert_eq!(page, expected_page);
// Entire file contents equality
let meta = serde_json::json!({"timestamp": ts, "id": uuid.to_string()});
let meta = serde_json::json!({
"timestamp": ts,
"id": uuid.to_string(),
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version"
});
let rec0 = serde_json::json!({"record_type": "response", "index": 0});
let rec1 = serde_json::json!({"record_type": "response", "index": 1});
let expected_content = format!("{meta}\n{rec0}\n{rec1}\n");
@@ -341,7 +530,13 @@ async fn test_stable_ordering_same_second_pagination() {
.join("01")
.join(format!("rollout-2025-07-01T00-00-00-{u2}.jsonl"));
let head = |u: Uuid| -> Vec<serde_json::Value> {
vec![serde_json::json!({"timestamp": ts, "id": u.to_string()})]
vec![serde_json::json!({
"timestamp": ts,
"id": u.to_string(),
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version"
})]
};
let expected_cursor1: Cursor = serde_json::from_str(&format!("\"{ts}|{u2}\"")).unwrap();
let expected_page1 = ConversationsPage {