mirror of
https://github.com/openai/codex.git
synced 2026-02-01 22:47:52 +00:00
Compare commits
7 Commits
dev/cc/dyn
...
rollout
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
381516ee3e | ||
|
|
dfaa3fb242 | ||
|
|
28c57aded3 | ||
|
|
5e41303911 | ||
|
|
bcf855fe1f | ||
|
|
617b1c1ea4 | ||
|
|
f94e1e3298 |
@@ -104,6 +104,7 @@ use crate::protocol::TurnDiffEvent;
|
||||
use crate::protocol::WebSearchBeginEvent;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::rollout::RolloutRecorderParams;
|
||||
use crate::rollout::recorder::RolloutItemSliceExt;
|
||||
use crate::safety::SafetyCheck;
|
||||
use crate::safety::assess_command_safety;
|
||||
use crate::safety::assess_safety_for_untrusted_command;
|
||||
@@ -512,9 +513,7 @@ impl Session {
|
||||
})
|
||||
.chain(post_session_configured_error_events.into_iter());
|
||||
for event in events {
|
||||
if let Err(e) = tx_event.send(event).await {
|
||||
error!("failed to send event: {e:?}");
|
||||
}
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
|
||||
Ok((sess, turn_context))
|
||||
@@ -547,11 +546,15 @@ impl Session {
|
||||
self.record_initial_history_new(turn_context).await;
|
||||
}
|
||||
InitialHistory::Forked(items) => {
|
||||
self.record_initial_history_from_items(items).await;
|
||||
self.record_initial_history_from_rollout_items(turn_context, items)
|
||||
.await;
|
||||
}
|
||||
InitialHistory::Resumed(resumed_history) => {
|
||||
self.record_initial_history_from_items(resumed_history.history)
|
||||
.await;
|
||||
self.record_initial_history_from_rollout_items(
|
||||
turn_context,
|
||||
resumed_history.history,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -570,27 +573,51 @@ impl Session {
|
||||
Some(turn_context.sandbox_policy.clone()),
|
||||
Some(self.user_shell.clone()),
|
||||
)));
|
||||
self.record_conversation_items(&conversation_items).await;
|
||||
self.record_conversation_items(turn_context, &conversation_items)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn record_initial_history_from_items(&self, items: Vec<ResponseItem>) {
|
||||
self.record_conversation_items_internal(&items, false).await;
|
||||
async fn record_initial_history_from_rollout_items(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
items: Vec<crate::rollout::RolloutItem>,
|
||||
) {
|
||||
use crate::rollout::recorder::RolloutItemSliceExt as _;
|
||||
let response_items = items.get_response_items();
|
||||
self.record_conversation_items_internal(turn_context, &response_items, false)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// build the initial messages vector for SessionConfigured by converting
|
||||
/// ResponseItems into EventMsg.
|
||||
fn build_initial_messages(&self, items: &[ResponseItem]) -> Vec<EventMsg> {
|
||||
items
|
||||
/// Build the initial UI messages vector for SessionConfigured from
|
||||
/// persisted rollout items. Prefer persisted events when present; fall back
|
||||
/// to converting response items for legacy rollouts.
|
||||
fn build_initial_messages(&self, items: &[crate::rollout::RolloutItem]) -> Vec<EventMsg> {
|
||||
let evs = items.get_events();
|
||||
if !evs.is_empty() {
|
||||
return evs;
|
||||
}
|
||||
let responses = items.get_response_items();
|
||||
responses
|
||||
.iter()
|
||||
.flat_map(|item| {
|
||||
map_response_item_to_event_messages(item, self.show_raw_agent_reasoning)
|
||||
})
|
||||
.collect()
|
||||
.collect::<Vec<EventMsg>>()
|
||||
}
|
||||
|
||||
/// Sends the given event to the client and swallows the send event, if
|
||||
/// any, logging it as an error.
|
||||
/// Sends an event to the client and records it to the rollout (if enabled).
|
||||
pub(crate) async fn send_event(&self, event: Event) {
|
||||
// Clone recorder handle outside the lock scope to avoid holding the
|
||||
// mutex guard across an await point.
|
||||
let recorder = {
|
||||
let guard = self.rollout.lock_unchecked();
|
||||
guard.as_ref().cloned()
|
||||
};
|
||||
if let Some(rec) = recorder
|
||||
&& let Err(e) = rec.record_events(std::slice::from_ref(&event)).await
|
||||
{
|
||||
error!("failed to record rollout event: {e:#}");
|
||||
}
|
||||
if let Err(e) = self.tx_event.send(event).await {
|
||||
error!("failed to send tool call event: {e}");
|
||||
}
|
||||
@@ -624,7 +651,7 @@ impl Session {
|
||||
reason,
|
||||
}),
|
||||
};
|
||||
let _ = self.tx_event.send(event).await;
|
||||
self.send_event(event).await;
|
||||
rx_approve
|
||||
}
|
||||
|
||||
@@ -656,7 +683,7 @@ impl Session {
|
||||
grant_root,
|
||||
}),
|
||||
};
|
||||
let _ = self.tx_event.send(event).await;
|
||||
self.send_event(event).await;
|
||||
rx_approve
|
||||
}
|
||||
|
||||
@@ -682,21 +709,36 @@ impl Session {
|
||||
|
||||
/// Records items to both the rollout and the chat completions/ZDR
|
||||
/// transcript, if enabled.
|
||||
async fn record_conversation_items(&self, items: &[ResponseItem]) {
|
||||
self.record_conversation_items_internal(items, true).await;
|
||||
async fn record_conversation_items(&self, turn_context: &TurnContext, items: &[ResponseItem]) {
|
||||
self.record_conversation_items_internal(turn_context, items, true)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn record_conversation_items_internal(&self, items: &[ResponseItem], persist: bool) {
|
||||
async fn record_conversation_items_internal(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
items: &[ResponseItem],
|
||||
persist: bool,
|
||||
) {
|
||||
debug!("Recording items for conversation: {items:?}");
|
||||
if persist {
|
||||
self.record_state_snapshot(items).await;
|
||||
self.record_state_snapshot(turn_context, items).await;
|
||||
}
|
||||
|
||||
self.state.lock_unchecked().history.record_items(items);
|
||||
}
|
||||
|
||||
async fn record_state_snapshot(&self, items: &[ResponseItem]) {
|
||||
let snapshot = { crate::rollout::SessionStateSnapshot {} };
|
||||
async fn record_state_snapshot(&self, turn_context: &TurnContext, items: &[ResponseItem]) {
|
||||
let tc_snapshot = crate::rollout::recorder::TurnContextSnapshot {
|
||||
cwd: turn_context.cwd.clone(),
|
||||
approval_policy: turn_context.approval_policy,
|
||||
sandbox_policy: turn_context.sandbox_policy.clone(),
|
||||
model: turn_context.client.get_model(),
|
||||
show_raw_agent_reasoning: self.show_raw_agent_reasoning,
|
||||
};
|
||||
let snapshot = crate::rollout::SessionStateSnapshot {
|
||||
turn_context: tc_snapshot,
|
||||
};
|
||||
|
||||
let recorder = {
|
||||
let guard = self.rollout.lock_unchecked();
|
||||
@@ -707,9 +749,30 @@ impl Session {
|
||||
if let Err(e) = rec.record_state(snapshot).await {
|
||||
error!("failed to record rollout state: {e:#}");
|
||||
}
|
||||
if let Err(e) = rec.record_items(items).await {
|
||||
if let Err(e) = rec.record_response_items(items).await {
|
||||
error!("failed to record rollout items: {e:#}");
|
||||
}
|
||||
|
||||
// Also persist user input as EventMsg::UserMessage so resuming can
|
||||
// reconstruct an event stream without relying on mapping.
|
||||
// Only keep user messages; avoid duplicating assistant events which
|
||||
// are recorded when emitted during streaming.
|
||||
let derived_user_events: Vec<crate::protocol::Event> = items
|
||||
.iter()
|
||||
.flat_map(|it| {
|
||||
map_response_item_to_event_messages(it, self.show_raw_agent_reasoning)
|
||||
})
|
||||
.filter(|ev| matches!(ev, EventMsg::UserMessage(_)))
|
||||
.map(|msg| crate::protocol::Event {
|
||||
id: String::new(),
|
||||
msg,
|
||||
})
|
||||
.collect();
|
||||
if !derived_user_events.is_empty() {
|
||||
if let Err(e) = rec.record_events(&derived_user_events).await {
|
||||
error!("failed to record derived user events: {e:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -752,7 +815,7 @@ impl Session {
|
||||
id: sub_id.to_string(),
|
||||
msg,
|
||||
};
|
||||
let _ = self.tx_event.send(event).await;
|
||||
self.send_event(event).await;
|
||||
}
|
||||
|
||||
async fn on_exec_command_end(
|
||||
@@ -799,7 +862,7 @@ impl Session {
|
||||
id: sub_id.to_string(),
|
||||
msg,
|
||||
};
|
||||
let _ = self.tx_event.send(event).await;
|
||||
self.send_event(event).await;
|
||||
|
||||
// If this is an apply_patch, after we emit the end patch, emit a second event
|
||||
// with the full turn diff if there is one.
|
||||
@@ -811,7 +874,7 @@ impl Session {
|
||||
id: sub_id.into(),
|
||||
msg,
|
||||
};
|
||||
let _ = self.tx_event.send(event).await;
|
||||
self.send_event(event).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1050,9 +1113,9 @@ impl AgentTask {
|
||||
id: self.sub_id,
|
||||
msg: EventMsg::TurnAborted(TurnAbortedEvent { reason }),
|
||||
};
|
||||
let tx_event = self.sess.tx_event.clone();
|
||||
let sess = self.sess.clone();
|
||||
tokio::spawn(async move {
|
||||
tx_event.send(event).await.ok();
|
||||
sess.send_event(event).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -1148,13 +1211,16 @@ async fn submission_loop(
|
||||
// Install the new persistent context for subsequent tasks/turns.
|
||||
turn_context = Arc::new(new_turn_context);
|
||||
if cwd.is_some() || approval_policy.is_some() || sandbox_policy.is_some() {
|
||||
sess.record_conversation_items(&[ResponseItem::from(EnvironmentContext::new(
|
||||
cwd,
|
||||
approval_policy,
|
||||
sandbox_policy,
|
||||
// Shell is not configurable from turn to turn
|
||||
None,
|
||||
))])
|
||||
sess.record_conversation_items(
|
||||
&turn_context,
|
||||
&[ResponseItem::from(EnvironmentContext::new(
|
||||
cwd,
|
||||
approval_policy,
|
||||
sandbox_policy,
|
||||
// Shell is not configurable from turn to turn
|
||||
None,
|
||||
))],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
@@ -1257,7 +1323,7 @@ async fn submission_loop(
|
||||
|
||||
Op::GetHistoryEntryRequest { offset, log_id } => {
|
||||
let config = config.clone();
|
||||
let tx_event = sess.tx_event.clone();
|
||||
let sess2 = sess.clone();
|
||||
let sub_id = sub.id.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
@@ -1285,13 +1351,11 @@ async fn submission_loop(
|
||||
),
|
||||
};
|
||||
|
||||
if let Err(e) = tx_event.send(event).await {
|
||||
warn!("failed to send GetHistoryEntryResponse event: {e}");
|
||||
}
|
||||
sess2.send_event(event).await;
|
||||
});
|
||||
}
|
||||
Op::ListMcpTools => {
|
||||
let tx_event = sess.tx_event.clone();
|
||||
let sess2 = sess.clone();
|
||||
let sub_id = sub.id.clone();
|
||||
|
||||
// This is a cheap lookup from the connection manager's cache.
|
||||
@@ -1302,12 +1366,10 @@ async fn submission_loop(
|
||||
crate::protocol::McpListToolsResponseEvent { tools },
|
||||
),
|
||||
};
|
||||
if let Err(e) = tx_event.send(event).await {
|
||||
warn!("failed to send McpListToolsResponse event: {e}");
|
||||
}
|
||||
sess2.send_event(event).await;
|
||||
}
|
||||
Op::ListCustomPrompts => {
|
||||
let tx_event = sess.tx_event.clone();
|
||||
let sess2 = sess.clone();
|
||||
let sub_id = sub.id.clone();
|
||||
|
||||
let custom_prompts: Vec<CustomPrompt> =
|
||||
@@ -1323,9 +1385,7 @@ async fn submission_loop(
|
||||
custom_prompts,
|
||||
}),
|
||||
};
|
||||
if let Err(e) = tx_event.send(event).await {
|
||||
warn!("failed to send ListCustomPromptsResponse event: {e}");
|
||||
}
|
||||
sess2.send_event(event).await;
|
||||
}
|
||||
Op::Compact => {
|
||||
// Create a summarization request as user input
|
||||
@@ -1376,9 +1436,7 @@ async fn submission_loop(
|
||||
break;
|
||||
}
|
||||
Op::GetHistory => {
|
||||
let tx_event = sess.tx_event.clone();
|
||||
let sub_id = sub.id.clone();
|
||||
|
||||
let event = Event {
|
||||
id: sub_id.clone(),
|
||||
msg: EventMsg::ConversationHistory(ConversationHistoryResponseEvent {
|
||||
@@ -1386,9 +1444,7 @@ async fn submission_loop(
|
||||
entries: sess.state.lock_unchecked().history.contents(),
|
||||
}),
|
||||
};
|
||||
if let Err(e) = tx_event.send(event).await {
|
||||
warn!("failed to send ConversationHistory event: {e}");
|
||||
}
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
_ => {
|
||||
// Ignore unknown ops; enum is non_exhaustive to allow extensions.
|
||||
@@ -1426,12 +1482,10 @@ async fn run_task(
|
||||
model_context_window: turn_context.client.get_model_context_window(),
|
||||
}),
|
||||
};
|
||||
if sess.tx_event.send(event).await.is_err() {
|
||||
return;
|
||||
}
|
||||
sess.send_event(event).await;
|
||||
|
||||
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
|
||||
sess.record_conversation_items(&[initial_input_for_turn.clone().into()])
|
||||
sess.record_conversation_items(turn_context, &[initial_input_for_turn.clone().into()])
|
||||
.await;
|
||||
|
||||
let mut last_agent_message: Option<String> = None;
|
||||
@@ -1448,7 +1502,8 @@ async fn run_task(
|
||||
.into_iter()
|
||||
.map(ResponseItem::from)
|
||||
.collect::<Vec<ResponseItem>>();
|
||||
sess.record_conversation_items(&pending_input).await;
|
||||
sess.record_conversation_items(turn_context, &pending_input)
|
||||
.await;
|
||||
|
||||
// Construct the input that we will send to the model. When using the
|
||||
// Chat completions API (or ZDR clients), the model needs the full
|
||||
@@ -1575,8 +1630,11 @@ async fn run_task(
|
||||
|
||||
// Only attempt to take the lock if there is something to record.
|
||||
if !items_to_record_in_conversation_history.is_empty() {
|
||||
sess.record_conversation_items(&items_to_record_in_conversation_history)
|
||||
.await;
|
||||
sess.record_conversation_items(
|
||||
turn_context,
|
||||
&items_to_record_in_conversation_history,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
if responses.is_empty() {
|
||||
@@ -1600,7 +1658,7 @@ async fn run_task(
|
||||
message: e.to_string(),
|
||||
}),
|
||||
};
|
||||
sess.tx_event.send(event).await.ok();
|
||||
sess.send_event(event).await;
|
||||
// let the user continue the conversation
|
||||
break;
|
||||
}
|
||||
@@ -1611,7 +1669,7 @@ async fn run_task(
|
||||
id: sub_id,
|
||||
msg: EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }),
|
||||
};
|
||||
sess.tx_event.send(event).await.ok();
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
|
||||
async fn run_turn(
|
||||
@@ -1787,13 +1845,11 @@ async fn try_run_turn(
|
||||
output.push(ProcessedResponseItem { item, response });
|
||||
}
|
||||
ResponseEvent::WebSearchCallBegin { call_id } => {
|
||||
let _ = sess
|
||||
.tx_event
|
||||
.send(Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::WebSearchBegin(WebSearchBeginEvent { call_id }),
|
||||
})
|
||||
.await;
|
||||
sess.send_event(Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::WebSearchBegin(WebSearchBeginEvent { call_id }),
|
||||
})
|
||||
.await;
|
||||
}
|
||||
ResponseEvent::Completed {
|
||||
response_id: _,
|
||||
@@ -1809,13 +1865,11 @@ async fn try_run_turn(
|
||||
st.token_info = info.clone();
|
||||
info
|
||||
};
|
||||
sess.tx_event
|
||||
.send(Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::TokenCount(crate::protocol::TokenCountEvent { info }),
|
||||
})
|
||||
.await
|
||||
.ok();
|
||||
sess.send_event(Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::TokenCount(crate::protocol::TokenCountEvent { info }),
|
||||
})
|
||||
.await;
|
||||
|
||||
let unified_diff = turn_diff_tracker.get_unified_diff();
|
||||
if let Ok(Some(unified_diff)) = unified_diff {
|
||||
@@ -1824,7 +1878,7 @@ async fn try_run_turn(
|
||||
id: sub_id.to_string(),
|
||||
msg,
|
||||
};
|
||||
let _ = sess.tx_event.send(event).await;
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
|
||||
return Ok(output);
|
||||
@@ -1834,21 +1888,21 @@ async fn try_run_turn(
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }),
|
||||
};
|
||||
sess.tx_event.send(event).await.ok();
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
ResponseEvent::ReasoningSummaryDelta(delta) => {
|
||||
let event = Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }),
|
||||
};
|
||||
sess.tx_event.send(event).await.ok();
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
ResponseEvent::ReasoningSummaryPartAdded => {
|
||||
let event = Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent {}),
|
||||
};
|
||||
sess.tx_event.send(event).await.ok();
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
ResponseEvent::ReasoningContentDelta(delta) => {
|
||||
if sess.show_raw_agent_reasoning {
|
||||
@@ -1858,7 +1912,7 @@ async fn try_run_turn(
|
||||
AgentReasoningRawContentDeltaEvent { delta },
|
||||
),
|
||||
};
|
||||
sess.tx_event.send(event).await.ok();
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1879,9 +1933,7 @@ async fn run_compact_task(
|
||||
model_context_window,
|
||||
}),
|
||||
};
|
||||
if sess.tx_event.send(start_event).await.is_err() {
|
||||
return;
|
||||
}
|
||||
sess.send_event(start_event).await;
|
||||
|
||||
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
|
||||
let turn_input: Vec<ResponseItem> =
|
||||
@@ -2059,7 +2111,7 @@ async fn handle_response_item(
|
||||
id: sub_id.to_string(),
|
||||
msg,
|
||||
};
|
||||
sess.tx_event.send(event).await.ok();
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ use crate::error::Result as CodexResult;
|
||||
use crate::protocol::Event;
|
||||
use crate::protocol::EventMsg;
|
||||
use crate::protocol::SessionConfiguredEvent;
|
||||
use crate::rollout::RolloutItem;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use codex_protocol::mcp_protocol::ConversationId;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
@@ -21,7 +22,7 @@ use tokio::sync::RwLock;
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct ResumedHistory {
|
||||
pub conversation_id: ConversationId,
|
||||
pub history: Vec<ResponseItem>,
|
||||
pub history: Vec<RolloutItem>,
|
||||
pub rollout_path: PathBuf,
|
||||
}
|
||||
|
||||
@@ -29,7 +30,7 @@ pub struct ResumedHistory {
|
||||
pub enum InitialHistory {
|
||||
New,
|
||||
Resumed(ResumedHistory),
|
||||
Forked(Vec<ResponseItem>),
|
||||
Forked(Vec<RolloutItem>),
|
||||
}
|
||||
|
||||
/// Represents a newly created Codex conversation, including the first event
|
||||
@@ -178,7 +179,11 @@ impl ConversationManager {
|
||||
/// and all items that follow them.
|
||||
fn truncate_after_dropping_last_messages(items: Vec<ResponseItem>, n: usize) -> InitialHistory {
|
||||
if n == 0 {
|
||||
return InitialHistory::Forked(items);
|
||||
let rollout_items = items
|
||||
.into_iter()
|
||||
.map(crate::rollout::RolloutItem::ResponseItem)
|
||||
.collect();
|
||||
return InitialHistory::Forked(rollout_items);
|
||||
}
|
||||
|
||||
// Walk backwards counting only `user` Message items, find cut index.
|
||||
@@ -200,7 +205,13 @@ fn truncate_after_dropping_last_messages(items: Vec<ResponseItem>, n: usize) ->
|
||||
// No prefix remains after dropping; start a new conversation.
|
||||
InitialHistory::New
|
||||
} else {
|
||||
InitialHistory::Forked(items.into_iter().take(cut_index).collect())
|
||||
InitialHistory::Forked(
|
||||
items
|
||||
.into_iter()
|
||||
.take(cut_index)
|
||||
.map(crate::rollout::RolloutItem::ResponseItem)
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -258,7 +269,11 @@ mod tests {
|
||||
let truncated = truncate_after_dropping_last_messages(items.clone(), 1);
|
||||
assert_eq!(
|
||||
truncated,
|
||||
InitialHistory::Forked(vec![items[0].clone(), items[1].clone(), items[2].clone(),])
|
||||
InitialHistory::Forked(vec![
|
||||
crate::rollout::RolloutItem::ResponseItem(items[0].clone()),
|
||||
crate::rollout::RolloutItem::ResponseItem(items[1].clone()),
|
||||
crate::rollout::RolloutItem::ResponseItem(items[2].clone()),
|
||||
])
|
||||
);
|
||||
|
||||
let truncated2 = truncate_after_dropping_last_messages(items, 2);
|
||||
|
||||
@@ -61,11 +61,11 @@ pub mod spawn;
|
||||
pub mod terminal;
|
||||
mod tool_apply_patch;
|
||||
pub mod turn_diff_tracker;
|
||||
pub use rollout::Cursor;
|
||||
pub use rollout::RolloutRecorder;
|
||||
pub use rollout::SessionMeta;
|
||||
pub use rollout::list::ConversationItem;
|
||||
pub use rollout::list::ConversationsPage;
|
||||
pub use rollout::list::Cursor;
|
||||
mod user_notification;
|
||||
pub mod util;
|
||||
pub use apply_patch::CODEX_APPLY_PATCH_ARG1;
|
||||
|
||||
91
codex-rs/core/src/rollout/format.rs
Normal file
91
codex-rs/core/src/rollout/format.rs
Normal file
@@ -0,0 +1,91 @@
|
||||
use codex_protocol::mcp_protocol::ConversationId;
|
||||
use time::OffsetDateTime;
|
||||
use time::PrimitiveDateTime;
|
||||
use time::format_description::FormatItem;
|
||||
use time::macros::format_description;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Timestamp format used in rollout filenames: YYYY-MM-DDThh-mm-ss
|
||||
pub const FILENAME_TS_FMT: &[FormatItem] =
|
||||
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
|
||||
|
||||
/// Timestamp format used for JSONL records (UTC, second precision): YYYY-MM-DDThh:mm:ssZ
|
||||
pub const RECORD_TS_FMT: &[FormatItem] =
|
||||
format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]Z");
|
||||
|
||||
/// Pagination cursor identifying a file by timestamp and UUID.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct Cursor {
|
||||
pub(crate) ts: OffsetDateTime,
|
||||
pub(crate) id: Uuid,
|
||||
}
|
||||
|
||||
impl Cursor {
|
||||
pub fn new(ts: OffsetDateTime, id: Uuid) -> Self {
|
||||
Self { ts, id }
|
||||
}
|
||||
}
|
||||
|
||||
impl serde::Serialize for Cursor {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
let ts_str = self
|
||||
.ts
|
||||
.format(FILENAME_TS_FMT)
|
||||
.map_err(|e| serde::ser::Error::custom(format!("format error: {e}")))?;
|
||||
serializer.serialize_str(&format!("{ts_str}|{}", self.id))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> serde::Deserialize<'de> for Cursor {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
let s = String::deserialize(deserializer)?;
|
||||
parse_cursor(&s).ok_or_else(|| serde::de::Error::custom("invalid cursor"))
|
||||
}
|
||||
}
|
||||
|
||||
/// Parses a pagination cursor token in the form "<file_ts>|<uuid>".
|
||||
pub fn parse_cursor(token: &str) -> Option<Cursor> {
|
||||
let (file_ts, uuid_str) = token.split_once('|')?;
|
||||
|
||||
let Ok(uuid) = Uuid::parse_str(uuid_str) else {
|
||||
return None;
|
||||
};
|
||||
|
||||
let ts = PrimitiveDateTime::parse(file_ts, FILENAME_TS_FMT)
|
||||
.ok()?
|
||||
.assume_utc();
|
||||
|
||||
Some(Cursor::new(ts, uuid))
|
||||
}
|
||||
|
||||
/// Parse timestamp and UUID from a rollout filename.
|
||||
/// Expected format: rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl
|
||||
pub fn parse_timestamp_uuid_from_filename(name: &str) -> Option<(OffsetDateTime, Uuid)> {
|
||||
let core = name.strip_prefix("rollout-")?.strip_suffix(".jsonl")?;
|
||||
|
||||
// Scan from the right for a '-' such that the suffix parses as a UUID.
|
||||
let (sep_idx, uuid) = core
|
||||
.match_indices('-')
|
||||
.rev()
|
||||
.find_map(|(i, _)| Uuid::parse_str(&core[i + 1..]).ok().map(|u| (i, u)))?;
|
||||
|
||||
let ts_str = &core[..sep_idx];
|
||||
let ts = PrimitiveDateTime::parse(ts_str, FILENAME_TS_FMT)
|
||||
.ok()?
|
||||
.assume_utc();
|
||||
Some((ts, uuid))
|
||||
}
|
||||
|
||||
/// Build a rollout filename for a given timestamp and conversation id.
|
||||
pub fn build_rollout_filename(ts: OffsetDateTime, conversation_id: ConversationId) -> String {
|
||||
let date_str = ts
|
||||
.format(FILENAME_TS_FMT)
|
||||
.unwrap_or_else(|e| panic!("failed to format timestamp: {e}"));
|
||||
format!("rollout-{date_str}-{conversation_id}.jsonl")
|
||||
}
|
||||
@@ -4,12 +4,12 @@ use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use time::OffsetDateTime;
|
||||
use time::PrimitiveDateTime;
|
||||
use time::format_description::FormatItem;
|
||||
use time::macros::format_description;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::SESSIONS_SUBDIR;
|
||||
use super::format::Cursor;
|
||||
use super::format::parse_timestamp_uuid_from_filename;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Returned page of conversation summaries.
|
||||
#[derive(Debug, Default, PartialEq)]
|
||||
@@ -33,48 +33,14 @@ pub struct ConversationItem {
|
||||
pub head: Vec<serde_json::Value>,
|
||||
}
|
||||
|
||||
/// A filter applied to a discovered conversation. All filters must pass for
|
||||
/// the item to be included in results.
|
||||
pub type ConversationFilter = Arc<dyn Fn(&ConversationItem) -> bool + Send + Sync>;
|
||||
|
||||
/// Hard cap to bound worst‑case work per request.
|
||||
const MAX_SCAN_FILES: usize = 10_000;
|
||||
const HEAD_RECORD_LIMIT: usize = 10;
|
||||
|
||||
/// Pagination cursor identifying a file by timestamp and UUID.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct Cursor {
|
||||
ts: OffsetDateTime,
|
||||
id: Uuid,
|
||||
}
|
||||
|
||||
impl Cursor {
|
||||
fn new(ts: OffsetDateTime, id: Uuid) -> Self {
|
||||
Self { ts, id }
|
||||
}
|
||||
}
|
||||
|
||||
impl serde::Serialize for Cursor {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
let ts_str = self
|
||||
.ts
|
||||
.format(&format_description!(
|
||||
"[year]-[month]-[day]T[hour]-[minute]-[second]"
|
||||
))
|
||||
.map_err(|e| serde::ser::Error::custom(format!("format error: {e}")))?;
|
||||
serializer.serialize_str(&format!("{ts_str}|{}", self.id))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> serde::Deserialize<'de> for Cursor {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
let s = String::deserialize(deserializer)?;
|
||||
parse_cursor(&s).ok_or_else(|| serde::de::Error::custom("invalid cursor"))
|
||||
}
|
||||
}
|
||||
|
||||
/// Retrieve recorded conversation file paths with token pagination. The returned `next_cursor`
|
||||
/// can be supplied on the next call to resume after the last returned item, resilient to
|
||||
/// concurrent new sessions being appended. Ordering is stable by timestamp desc, then UUID desc.
|
||||
@@ -82,23 +48,27 @@ pub(crate) async fn get_conversations(
|
||||
codex_home: &Path,
|
||||
page_size: usize,
|
||||
cursor: Option<&Cursor>,
|
||||
) -> io::Result<ConversationsPage> {
|
||||
get_conversations_filtered(codex_home, page_size, cursor, &[]).await
|
||||
}
|
||||
|
||||
/// Retrieve recorded conversations with filters. All provided filters must
|
||||
/// return `true` for an item to be included.
|
||||
pub(crate) async fn get_conversations_filtered(
|
||||
codex_home: &Path,
|
||||
page_size: usize,
|
||||
cursor: Option<&Cursor>,
|
||||
filters: &[ConversationFilter],
|
||||
) -> io::Result<ConversationsPage> {
|
||||
let mut root = codex_home.to_path_buf();
|
||||
root.push(SESSIONS_SUBDIR);
|
||||
|
||||
if !root.exists() {
|
||||
return Ok(ConversationsPage {
|
||||
items: Vec::new(),
|
||||
next_cursor: None,
|
||||
num_scanned_files: 0,
|
||||
reached_scan_cap: false,
|
||||
});
|
||||
return Ok(empty_page());
|
||||
}
|
||||
|
||||
let anchor = cursor.cloned();
|
||||
|
||||
let result = traverse_directories_for_paths(root.clone(), page_size, anchor).await?;
|
||||
Ok(result)
|
||||
traverse_directories_for_paths_filtered(root.clone(), page_size, anchor, filters).await
|
||||
}
|
||||
|
||||
/// Load the full contents of a single conversation session file at `path`.
|
||||
@@ -112,10 +82,11 @@ pub(crate) async fn get_conversation(path: &Path) -> io::Result<String> {
|
||||
///
|
||||
/// Directory layout: `~/.codex/sessions/YYYY/MM/DD/rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl`
|
||||
/// Returned newest (latest) first.
|
||||
async fn traverse_directories_for_paths(
|
||||
async fn traverse_directories_for_paths_filtered(
|
||||
root: PathBuf,
|
||||
page_size: usize,
|
||||
anchor: Option<Cursor>,
|
||||
filters: &[ConversationFilter],
|
||||
) -> io::Result<ConversationsPage> {
|
||||
let mut items: Vec<ConversationItem> = Vec::with_capacity(page_size);
|
||||
let mut scanned_files = 0usize;
|
||||
@@ -170,7 +141,12 @@ async fn traverse_directories_for_paths(
|
||||
let head = read_first_jsonl_records(&path, HEAD_RECORD_LIMIT)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
items.push(ConversationItem { path, head });
|
||||
let item = ConversationItem { path, head };
|
||||
// Apply all filters
|
||||
let include = filters.iter().all(|f| f(&item));
|
||||
if include {
|
||||
items.push(item);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -185,23 +161,6 @@ async fn traverse_directories_for_paths(
|
||||
})
|
||||
}
|
||||
|
||||
/// Pagination cursor token format: "<file_ts>|<uuid>" where `file_ts` matches the
|
||||
/// filename timestamp portion (YYYY-MM-DDThh-mm-ss) used in rollout filenames.
|
||||
/// The cursor orders files by timestamp desc, then UUID desc.
|
||||
fn parse_cursor(token: &str) -> Option<Cursor> {
|
||||
let (file_ts, uuid_str) = token.split_once('|')?;
|
||||
|
||||
let Ok(uuid) = Uuid::parse_str(uuid_str) else {
|
||||
return None;
|
||||
};
|
||||
|
||||
let format: &[FormatItem] =
|
||||
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
|
||||
let ts = PrimitiveDateTime::parse(file_ts, format).ok()?.assume_utc();
|
||||
|
||||
Some(Cursor::new(ts, uuid))
|
||||
}
|
||||
|
||||
fn build_next_cursor(items: &[ConversationItem]) -> Option<Cursor> {
|
||||
let last = items.last()?;
|
||||
let file_name = last.path.file_name()?.to_string_lossy();
|
||||
@@ -256,21 +215,13 @@ where
|
||||
Ok(collected)
|
||||
}
|
||||
|
||||
fn parse_timestamp_uuid_from_filename(name: &str) -> Option<(OffsetDateTime, Uuid)> {
|
||||
// Expected: rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl
|
||||
let core = name.strip_prefix("rollout-")?.strip_suffix(".jsonl")?;
|
||||
|
||||
// Scan from the right for a '-' such that the suffix parses as a UUID.
|
||||
let (sep_idx, uuid) = core
|
||||
.match_indices('-')
|
||||
.rev()
|
||||
.find_map(|(i, _)| Uuid::parse_str(&core[i + 1..]).ok().map(|u| (i, u)))?;
|
||||
|
||||
let ts_str = &core[..sep_idx];
|
||||
let format: &[FormatItem] =
|
||||
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
|
||||
let ts = PrimitiveDateTime::parse(ts_str, format).ok()?.assume_utc();
|
||||
Some((ts, uuid))
|
||||
fn empty_page() -> ConversationsPage {
|
||||
ConversationsPage {
|
||||
items: Vec::new(),
|
||||
next_cursor: None,
|
||||
num_scanned_files: 0,
|
||||
reached_scan_cap: false,
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_first_jsonl_records(
|
||||
@@ -296,3 +247,16 @@ async fn read_first_jsonl_records(
|
||||
}
|
||||
Ok(head)
|
||||
}
|
||||
|
||||
/// Returns a filter that requires the first JSONL record to be a tagged
|
||||
/// session meta line: { "record_type": "session_meta", ... }.
|
||||
pub(crate) fn requires_tagged_session_meta_filter() -> ConversationFilter {
|
||||
Arc::new(|item: &ConversationItem| {
|
||||
item.head
|
||||
.get(0)
|
||||
.and_then(|v| v.get("record_type"))
|
||||
.and_then(|v| v.as_str())
|
||||
.map(|s| s == "session_meta")
|
||||
.unwrap_or(false)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -2,10 +2,13 @@
|
||||
|
||||
pub(crate) const SESSIONS_SUBDIR: &str = "sessions";
|
||||
|
||||
pub mod format;
|
||||
pub mod list;
|
||||
pub(crate) mod policy;
|
||||
pub mod recorder;
|
||||
|
||||
pub use format::Cursor;
|
||||
pub use recorder::RolloutItem;
|
||||
pub use recorder::RolloutRecorder;
|
||||
pub use recorder::RolloutRecorderParams;
|
||||
pub use recorder::SessionMeta;
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use crate::protocol::EventMsg;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
|
||||
/// Whether a `ResponseItem` should be persisted in rollout files.
|
||||
@@ -14,3 +15,82 @@ pub(crate) fn is_persisted_response_item(item: &ResponseItem) -> bool {
|
||||
ResponseItem::WebSearchCall { .. } | ResponseItem::Other => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Whether an `EventMsg` should be persisted in rollout files.
|
||||
///
|
||||
/// Keep only high-signal, compact items. Avoid deltas and verbose streams.
|
||||
#[inline]
|
||||
pub(crate) fn is_persisted_event_msg(event: &EventMsg) -> bool {
|
||||
match event {
|
||||
// Core content to replay UI meaningfully
|
||||
EventMsg::AgentMessage(_)
|
||||
| EventMsg::AgentReasoning(_)
|
||||
| EventMsg::TokenCount(_)
|
||||
| EventMsg::UserMessage(_) => true,
|
||||
|
||||
// Everything else is either transient, redundant, or too verbose
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::protocol::AgentMessageDeltaEvent;
|
||||
use crate::protocol::AgentMessageEvent;
|
||||
use crate::protocol::AgentReasoningDeltaEvent;
|
||||
use crate::protocol::AgentReasoningEvent;
|
||||
use crate::protocol::ApplyPatchApprovalRequestEvent;
|
||||
use crate::protocol::ExecCommandEndEvent;
|
||||
use crate::protocol::TokenCountEvent;
|
||||
|
||||
#[test]
|
||||
fn test_event_persistence_policy() {
|
||||
assert!(is_persisted_event_msg(&EventMsg::AgentMessage(
|
||||
AgentMessageEvent {
|
||||
message: "hi".to_string(),
|
||||
}
|
||||
)));
|
||||
assert!(is_persisted_event_msg(&EventMsg::AgentReasoning(
|
||||
AgentReasoningEvent {
|
||||
text: "think".to_string(),
|
||||
}
|
||||
)));
|
||||
assert!(is_persisted_event_msg(&EventMsg::TokenCount(
|
||||
TokenCountEvent { info: None }
|
||||
)));
|
||||
|
||||
assert!(!is_persisted_event_msg(&EventMsg::AgentMessageDelta(
|
||||
AgentMessageDeltaEvent {
|
||||
delta: "d".to_string(),
|
||||
}
|
||||
)));
|
||||
assert!(!is_persisted_event_msg(&EventMsg::AgentReasoningDelta(
|
||||
AgentReasoningDeltaEvent {
|
||||
delta: "d".to_string(),
|
||||
}
|
||||
)));
|
||||
assert!(!is_persisted_event_msg(&EventMsg::ExecCommandEnd(
|
||||
ExecCommandEndEvent {
|
||||
call_id: "c".to_string(),
|
||||
stdout: Default::default(),
|
||||
stderr: Default::default(),
|
||||
aggregated_output: Default::default(),
|
||||
exit_code: 0,
|
||||
duration: std::time::Duration::from_secs(0),
|
||||
formatted_output: String::new(),
|
||||
}
|
||||
)));
|
||||
use crate::protocol::FileChange;
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
assert!(!is_persisted_event_msg(
|
||||
&EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
|
||||
call_id: "c".to_string(),
|
||||
changes: HashMap::<PathBuf, FileChange>::new(),
|
||||
reason: None,
|
||||
grant_root: None,
|
||||
})
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ use std::io::Error as IoError;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use crate::protocol::Event;
|
||||
use codex_protocol::mcp_protocol::ConversationId;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
@@ -20,43 +21,63 @@ use tokio::sync::oneshot;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
use super::Cursor;
|
||||
use super::SESSIONS_SUBDIR;
|
||||
use super::format::RECORD_TS_FMT;
|
||||
use super::format::build_rollout_filename;
|
||||
use super::list::ConversationFilter;
|
||||
use super::list::ConversationsPage;
|
||||
use super::list::Cursor;
|
||||
use super::list::get_conversations;
|
||||
use super::list::get_conversations_filtered;
|
||||
use super::policy::is_persisted_event_msg;
|
||||
use super::policy::is_persisted_response_item;
|
||||
use crate::config::Config;
|
||||
use crate::conversation_manager::InitialHistory;
|
||||
use crate::conversation_manager::ResumedHistory;
|
||||
use crate::git_info::GitInfo;
|
||||
use crate::git_info::collect_git_info;
|
||||
use crate::protocol::EventMsg;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Default)]
|
||||
#[derive(Serialize, Deserialize, Clone, Default, Debug)]
|
||||
pub struct SessionMeta {
|
||||
pub id: ConversationId,
|
||||
pub timestamp: String,
|
||||
pub cwd: String,
|
||||
pub originator: String,
|
||||
pub cli_version: String,
|
||||
pub instructions: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct SessionMetaWithGit {
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct SessionMetaWithGit {
|
||||
#[serde(flatten)]
|
||||
meta: SessionMeta,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
git: Option<GitInfo>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Default, Clone)]
|
||||
pub struct SessionStateSnapshot {}
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct SessionStateSnapshot {
|
||||
pub turn_context: TurnContextSnapshot,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Default, Clone)]
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct TurnContextSnapshot {
|
||||
pub cwd: std::path::PathBuf,
|
||||
pub approval_policy: crate::protocol::AskForApproval,
|
||||
pub sandbox_policy: crate::protocol::SandboxPolicy,
|
||||
pub model: String,
|
||||
pub show_raw_agent_reasoning: bool,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct SavedSession {
|
||||
pub session: SessionMeta,
|
||||
#[serde(default)]
|
||||
pub items: Vec<ResponseItem>,
|
||||
#[serde(default)]
|
||||
pub state: SessionStateSnapshot,
|
||||
pub state: Option<SessionStateSnapshot>,
|
||||
pub session_id: ConversationId,
|
||||
}
|
||||
|
||||
@@ -86,11 +107,89 @@ pub enum RolloutRecorderParams {
|
||||
}
|
||||
|
||||
enum RolloutCmd {
|
||||
AddItems(Vec<ResponseItem>),
|
||||
AddResponseItems(Vec<ResponseItem>),
|
||||
AddEvents(Vec<Event>),
|
||||
UpdateState(SessionStateSnapshot),
|
||||
Shutdown { ack: oneshot::Sender<()> },
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
#[serde(tag = "record_type", rename_all = "snake_case")]
|
||||
enum TaggedLine {
|
||||
#[serde(rename = "response")]
|
||||
Response {
|
||||
#[serde(flatten)]
|
||||
item: ResponseItem,
|
||||
},
|
||||
#[serde(rename = "event")]
|
||||
Event {
|
||||
#[serde(flatten)]
|
||||
event: Event,
|
||||
},
|
||||
#[serde(rename = "session_meta")]
|
||||
SessionMeta {
|
||||
#[serde(flatten)]
|
||||
meta: SessionMetaWithGit,
|
||||
},
|
||||
#[serde(rename = "state")]
|
||||
State {
|
||||
#[serde(flatten)]
|
||||
state: SessionStateSnapshot,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
struct TimestampedLine {
|
||||
timestamp: String,
|
||||
#[serde(flatten)]
|
||||
record: TaggedLine,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum RolloutItem {
|
||||
ResponseItem(ResponseItem),
|
||||
Event(Event),
|
||||
SessionMeta(SessionMetaWithGit),
|
||||
}
|
||||
|
||||
impl PartialEq for RolloutItem {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
match (self, other) {
|
||||
(RolloutItem::ResponseItem(a), RolloutItem::ResponseItem(b)) => a == b,
|
||||
(RolloutItem::SessionMeta(_), RolloutItem::SessionMeta(_)) => false,
|
||||
// Event comparison omitted (foreign type without PartialEq); treat as not equal
|
||||
(RolloutItem::Event(_), RolloutItem::Event(_)) => false,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Convenience helpers to extract typed items from a list of rollout items.
|
||||
pub trait RolloutItemSliceExt {
|
||||
fn get_response_items(&self) -> Vec<ResponseItem>;
|
||||
fn get_events(&self) -> Vec<EventMsg>;
|
||||
}
|
||||
|
||||
impl RolloutItemSliceExt for [RolloutItem] {
|
||||
fn get_response_items(&self) -> Vec<ResponseItem> {
|
||||
self.iter()
|
||||
.filter_map(|it| match it {
|
||||
RolloutItem::ResponseItem(ri) => Some(ri.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn get_events(&self) -> Vec<EventMsg> {
|
||||
self.iter()
|
||||
.filter_map(|it| match it {
|
||||
RolloutItem::Event(ev) => Some(ev.msg.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
impl RolloutRecorderParams {
|
||||
pub fn new(conversation_id: ConversationId, instructions: Option<String>) -> Self {
|
||||
Self::Create {
|
||||
@@ -112,7 +211,21 @@ impl RolloutRecorder {
|
||||
page_size: usize,
|
||||
cursor: Option<&Cursor>,
|
||||
) -> std::io::Result<ConversationsPage> {
|
||||
get_conversations(codex_home, page_size, cursor).await
|
||||
// Apply a default filter to require files to start with a tagged
|
||||
// session_meta record. This skips legacy/invalid files from listings.
|
||||
let filters = vec![super::list::requires_tagged_session_meta_filter()];
|
||||
get_conversations_filtered(codex_home, page_size, cursor, &filters).await
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
/// List conversations with filters; all filters must pass for inclusion.
|
||||
pub async fn list_conversations_with_filters(
|
||||
codex_home: &Path,
|
||||
page_size: usize,
|
||||
cursor: Option<&Cursor>,
|
||||
filters: &[ConversationFilter],
|
||||
) -> std::io::Result<ConversationsPage> {
|
||||
get_conversations_filtered(codex_home, page_size, cursor, filters).await
|
||||
}
|
||||
|
||||
/// Attempt to create a new [`RolloutRecorder`]. If the sessions directory
|
||||
@@ -143,6 +256,9 @@ impl RolloutRecorder {
|
||||
Some(SessionMeta {
|
||||
timestamp,
|
||||
id: session_id,
|
||||
cwd: config.cwd.display().to_string(),
|
||||
originator: config.responses_originator_header.clone(),
|
||||
cli_version: env!("CARGO_PKG_VERSION").to_string(),
|
||||
instructions,
|
||||
}),
|
||||
)
|
||||
@@ -172,25 +288,29 @@ impl RolloutRecorder {
|
||||
Ok(Self { tx })
|
||||
}
|
||||
|
||||
pub(crate) async fn record_items(&self, items: &[ResponseItem]) -> std::io::Result<()> {
|
||||
let mut filtered = Vec::new();
|
||||
for item in items {
|
||||
// Note that function calls may look a bit strange if they are
|
||||
// "fully qualified MCP tool calls," so we could consider
|
||||
// reformatting them in that case.
|
||||
if is_persisted_response_item(item) {
|
||||
filtered.push(item.clone());
|
||||
}
|
||||
}
|
||||
if filtered.is_empty() {
|
||||
pub(crate) async fn record_response_items(
|
||||
&self,
|
||||
items: &[ResponseItem],
|
||||
) -> std::io::Result<()> {
|
||||
if items.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
self.tx
|
||||
.send(RolloutCmd::AddItems(filtered))
|
||||
.send(RolloutCmd::AddResponseItems(items.to_vec()))
|
||||
.await
|
||||
.map_err(|e| IoError::other(format!("failed to queue rollout items: {e}")))
|
||||
}
|
||||
|
||||
pub(crate) async fn record_events(&self, events: &[Event]) -> std::io::Result<()> {
|
||||
if events.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
self.tx
|
||||
.send(RolloutCmd::AddEvents(events.to_vec()))
|
||||
.await
|
||||
.map_err(|e| IoError::other(format!("failed to queue rollout events: {e}")))
|
||||
}
|
||||
|
||||
pub(crate) async fn record_state(&self, state: SessionStateSnapshot) -> std::io::Result<()> {
|
||||
self.tx
|
||||
.send(RolloutCmd::UpdateState(state))
|
||||
@@ -200,28 +320,36 @@ impl RolloutRecorder {
|
||||
|
||||
pub async fn get_rollout_history(path: &Path) -> std::io::Result<InitialHistory> {
|
||||
info!("Resuming rollout from {path:?}");
|
||||
tracing::error!("Resuming rollout from {path:?}");
|
||||
let text = tokio::fs::read_to_string(path).await?;
|
||||
let mut lines = text.lines();
|
||||
let first_line = lines
|
||||
.next()
|
||||
.ok_or_else(|| IoError::other("empty session file"))?;
|
||||
let conversation_id = match serde_json::from_str::<SessionMeta>(first_line) {
|
||||
Ok(rollout_session_meta) => {
|
||||
tracing::error!(
|
||||
"Parsed conversation ID from rollout file: {:?}",
|
||||
rollout_session_meta.id
|
||||
);
|
||||
Some(rollout_session_meta.id)
|
||||
// Support both legacy (bare SessionMeta) and new tagged format
|
||||
let v_first: Value = serde_json::from_str(first_line)
|
||||
.map_err(|e| IoError::other(format!("failed to parse first line: {e}")))?;
|
||||
let conversation_id = if v_first.get("record_type").is_some() {
|
||||
let rt_ok = v_first
|
||||
.get("record_type")
|
||||
.and_then(|s| s.as_str())
|
||||
.map(|s| s == "session_meta")
|
||||
.unwrap_or(false);
|
||||
if !rt_ok {
|
||||
return Err(IoError::other("first line is not session_meta"));
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(IoError::other(format!(
|
||||
"failed to parse first line of rollout file as SessionMeta: {e}"
|
||||
)));
|
||||
match serde_json::from_value::<SessionMetaWithGit>(v_first.clone()) {
|
||||
Ok(rollout_session_meta) => Some(rollout_session_meta.meta.id),
|
||||
Err(e) => {
|
||||
return Err(IoError::other(format!(
|
||||
"failed to parse first line (tagged) as SessionMeta: {e}"
|
||||
)));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Err(IoError::other("first line missing record_type"));
|
||||
};
|
||||
|
||||
let mut items = Vec::new();
|
||||
let mut items: Vec<RolloutItem> = Vec::new();
|
||||
for line in lines {
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
@@ -230,23 +358,35 @@ impl RolloutRecorder {
|
||||
Ok(v) => v,
|
||||
Err(_) => continue,
|
||||
};
|
||||
if v.get("record_type")
|
||||
.and_then(|rt| rt.as_str())
|
||||
.map(|s| s == "state")
|
||||
.unwrap_or(false)
|
||||
{
|
||||
// Tagged format: collect responses and events; skip state
|
||||
if let Some(rt) = v.get("record_type").and_then(|rt| rt.as_str()) {
|
||||
match rt {
|
||||
"state" => continue,
|
||||
"response" => match serde_json::from_value::<ResponseItem>(v.clone()) {
|
||||
Ok(item) => {
|
||||
if is_persisted_response_item(&item) {
|
||||
items.push(RolloutItem::ResponseItem(item));
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("failed to parse item: {v:?}, error: {e}");
|
||||
}
|
||||
},
|
||||
"event" => match serde_json::from_value::<Event>(v.clone()) {
|
||||
Ok(event) => {
|
||||
if is_persisted_event_msg(&event.msg) {
|
||||
items.push(RolloutItem::Event(event));
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("failed to parse event: {v:?}, error: {e}");
|
||||
}
|
||||
},
|
||||
_ => continue,
|
||||
}
|
||||
continue;
|
||||
}
|
||||
match serde_json::from_value::<ResponseItem>(v.clone()) {
|
||||
Ok(item) => {
|
||||
if is_persisted_response_item(&item) {
|
||||
items.push(item);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("failed to parse item: {v:?}, error: {e}");
|
||||
}
|
||||
}
|
||||
// Non-tagged lines are ignored in the new format
|
||||
}
|
||||
|
||||
tracing::error!(
|
||||
@@ -312,13 +452,7 @@ fn create_log_file(
|
||||
|
||||
// Custom format for YYYY-MM-DDThh-mm-ss. Use `-` instead of `:` for
|
||||
// compatibility with filesystems that do not allow colons in filenames.
|
||||
let format: &[FormatItem] =
|
||||
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
|
||||
let date_str = timestamp
|
||||
.format(format)
|
||||
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
|
||||
|
||||
let filename = format!("rollout-{date_str}-{conversation_id}.jsonl");
|
||||
let filename = build_rollout_filename(timestamp, conversation_id);
|
||||
|
||||
let path = dir.join(filename);
|
||||
let file = std::fs::OpenOptions::new()
|
||||
@@ -350,32 +484,32 @@ async fn rollout_writer(
|
||||
};
|
||||
|
||||
// Write the SessionMeta as the first item in the file
|
||||
writer.write_line(&session_meta_with_git).await?;
|
||||
writer
|
||||
.write_tagged(TaggedLine::SessionMeta {
|
||||
meta: session_meta_with_git,
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Process rollout commands
|
||||
while let Some(cmd) = rx.recv().await {
|
||||
match cmd {
|
||||
RolloutCmd::AddItems(items) => {
|
||||
RolloutCmd::AddResponseItems(items) => {
|
||||
for item in items {
|
||||
if is_persisted_response_item(&item) {
|
||||
writer.write_line(&item).await?;
|
||||
writer.write_tagged(TaggedLine::Response { item }).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
RolloutCmd::AddEvents(events) => {
|
||||
for event in events {
|
||||
if is_persisted_event_msg(&event.msg) {
|
||||
writer.write_tagged(TaggedLine::Event { event }).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
RolloutCmd::UpdateState(state) => {
|
||||
#[derive(Serialize)]
|
||||
struct StateLine<'a> {
|
||||
record_type: &'static str,
|
||||
#[serde(flatten)]
|
||||
state: &'a SessionStateSnapshot,
|
||||
}
|
||||
writer
|
||||
.write_line(&StateLine {
|
||||
record_type: "state",
|
||||
state: &state,
|
||||
})
|
||||
.await?;
|
||||
writer.write_tagged(TaggedLine::State { state }).await?;
|
||||
}
|
||||
RolloutCmd::Shutdown { ack } => {
|
||||
let _ = ack.send(());
|
||||
@@ -391,8 +525,16 @@ struct JsonlWriter {
|
||||
}
|
||||
|
||||
impl JsonlWriter {
|
||||
async fn write_line(&mut self, item: &impl serde::Serialize) -> std::io::Result<()> {
|
||||
let mut json = serde_json::to_string(item)?;
|
||||
async fn write_tagged(&mut self, record: TaggedLine) -> std::io::Result<()> {
|
||||
// RFC3339 UTC, second precision
|
||||
let now = OffsetDateTime::now_utc()
|
||||
.format(RECORD_TS_FMT)
|
||||
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
|
||||
let line = TimestampedLine {
|
||||
timestamp: now,
|
||||
record,
|
||||
};
|
||||
let mut json = serde_json::to_string(&line)?;
|
||||
json.push('\n');
|
||||
self.file.write_all(json.as_bytes()).await?;
|
||||
self.file.flush().await?;
|
||||
|
||||
@@ -12,11 +12,16 @@ use time::format_description::FormatItem;
|
||||
use time::macros::format_description;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::rollout::Cursor;
|
||||
use crate::rollout::RolloutItem;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::rollout::list::ConversationFilter;
|
||||
use crate::rollout::list::ConversationItem;
|
||||
use crate::rollout::list::ConversationsPage;
|
||||
use crate::rollout::list::Cursor;
|
||||
use crate::rollout::list::get_conversation;
|
||||
use crate::rollout::list::get_conversations;
|
||||
use crate::rollout::list::get_conversations_filtered;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
fn write_session_file(
|
||||
root: &Path,
|
||||
@@ -42,7 +47,10 @@ fn write_session_file(
|
||||
|
||||
let meta = serde_json::json!({
|
||||
"timestamp": ts_str,
|
||||
"id": uuid.to_string()
|
||||
"id": uuid.to_string(),
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version"
|
||||
});
|
||||
writeln!(file, "{meta}")?;
|
||||
|
||||
@@ -56,6 +64,127 @@ fn write_session_file(
|
||||
Ok((dt, uuid))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_conversations_filtered_by_id() {
|
||||
let temp = TempDir::new().unwrap();
|
||||
let home = temp.path();
|
||||
|
||||
let u1 = Uuid::from_u128(101);
|
||||
let u2 = Uuid::from_u128(202);
|
||||
|
||||
write_session_file(home, "2025-07-01T00-00-00", u1, 1).unwrap();
|
||||
write_session_file(home, "2025-07-02T00-00-00", u2, 1).unwrap();
|
||||
|
||||
let target = u2.to_string();
|
||||
let filter: ConversationFilter = std::sync::Arc::new(move |item: &ConversationItem| {
|
||||
item.head
|
||||
.first()
|
||||
.and_then(|v| v.get("id"))
|
||||
.and_then(|v| v.as_str())
|
||||
.map(|s| s == target)
|
||||
.unwrap_or(false)
|
||||
});
|
||||
|
||||
let page = get_conversations_filtered(home, 10, None, &[filter])
|
||||
.await
|
||||
.expect("filtered list ok");
|
||||
assert_eq!(page.items.len(), 1);
|
||||
let file_name = page.items[0]
|
||||
.path
|
||||
.file_name()
|
||||
.unwrap()
|
||||
.to_string_lossy()
|
||||
.to_string();
|
||||
assert!(file_name.contains(&u2.to_string()));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_rollout_history_parses_tagged_events_and_responses() {
|
||||
use std::io::Write as _;
|
||||
use uuid::Uuid;
|
||||
|
||||
let temp = TempDir::new().unwrap();
|
||||
let path = temp.path().join("rollout-test.jsonl");
|
||||
let mut f = File::create(&path).unwrap();
|
||||
|
||||
let session_id = Uuid::from_u128(555).to_string();
|
||||
// Tagged session_meta line (new format)
|
||||
writeln!(
|
||||
f,
|
||||
"{}",
|
||||
serde_json::json!({
|
||||
"timestamp": "2025-09-08T16:03:12Z",
|
||||
"record_type": "session_meta",
|
||||
"id": session_id,
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version",
|
||||
"instructions": null
|
||||
})
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Allowed event: AgentMessage
|
||||
writeln!(
|
||||
f,
|
||||
"{}",
|
||||
serde_json::json!({
|
||||
"timestamp": "2025-09-08T16:03:13Z",
|
||||
"record_type": "event",
|
||||
"id": "sub1",
|
||||
"msg": {"type": "agent_message", "message": "hi"}
|
||||
})
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Disallowed (delta) event: should be skipped by policy
|
||||
writeln!(
|
||||
f,
|
||||
"{}",
|
||||
serde_json::json!({
|
||||
"timestamp": "2025-09-08T16:03:14Z",
|
||||
"record_type": "event",
|
||||
"id": "sub1",
|
||||
"msg": {"type": "agent_message_delta", "delta": "x"}
|
||||
})
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Response item (assistant message)
|
||||
writeln!(
|
||||
f,
|
||||
"{}",
|
||||
serde_json::json!({
|
||||
"timestamp": "2025-09-08T16:03:15Z",
|
||||
"record_type": "response",
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"content": [{"type": "output_text", "text": "hello"}]
|
||||
})
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let hist = RolloutRecorder::get_rollout_history(&path)
|
||||
.await
|
||||
.expect("history ok");
|
||||
|
||||
let resumed = match hist {
|
||||
crate::conversation_manager::InitialHistory::Resumed(r) => r,
|
||||
_ => panic!("expected resumed history"),
|
||||
};
|
||||
|
||||
// We expect exactly 2 items: one event (allowed) and one response
|
||||
let kinds: Vec<&'static str> = resumed
|
||||
.history
|
||||
.iter()
|
||||
.map(|it| match it {
|
||||
RolloutItem::Event(_) => "event",
|
||||
RolloutItem::ResponseItem(_) => "response",
|
||||
RolloutItem::SessionMeta(_) => "meta",
|
||||
})
|
||||
.collect();
|
||||
assert_eq!(kinds, vec!["event", "response"]);
|
||||
}
|
||||
#[tokio::test]
|
||||
async fn test_list_conversations_latest_first() {
|
||||
let temp = TempDir::new().unwrap();
|
||||
@@ -94,19 +223,37 @@ async fn test_list_conversations_latest_first() {
|
||||
.join(format!("rollout-2025-01-01T12-00-00-{u1}.jsonl"));
|
||||
|
||||
let head_3 = vec![
|
||||
serde_json::json!({"timestamp": "2025-01-03T12-00-00", "id": u3.to_string()}),
|
||||
serde_json::json!({
|
||||
"timestamp": "2025-01-03T12-00-00",
|
||||
"id": u3.to_string(),
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version"
|
||||
}),
|
||||
serde_json::json!({"record_type": "response", "index": 0}),
|
||||
serde_json::json!({"record_type": "response", "index": 1}),
|
||||
serde_json::json!({"record_type": "response", "index": 2}),
|
||||
];
|
||||
let head_2 = vec![
|
||||
serde_json::json!({"timestamp": "2025-01-02T12-00-00", "id": u2.to_string()}),
|
||||
serde_json::json!({
|
||||
"timestamp": "2025-01-02T12-00-00",
|
||||
"id": u2.to_string(),
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version"
|
||||
}),
|
||||
serde_json::json!({"record_type": "response", "index": 0}),
|
||||
serde_json::json!({"record_type": "response", "index": 1}),
|
||||
serde_json::json!({"record_type": "response", "index": 2}),
|
||||
];
|
||||
let head_1 = vec![
|
||||
serde_json::json!({"timestamp": "2025-01-01T12-00-00", "id": u1.to_string()}),
|
||||
serde_json::json!({
|
||||
"timestamp": "2025-01-01T12-00-00",
|
||||
"id": u1.to_string(),
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version"
|
||||
}),
|
||||
serde_json::json!({"record_type": "response", "index": 0}),
|
||||
serde_json::json!({"record_type": "response", "index": 1}),
|
||||
serde_json::json!({"record_type": "response", "index": 2}),
|
||||
@@ -171,11 +318,23 @@ async fn test_pagination_cursor() {
|
||||
.join("04")
|
||||
.join(format!("rollout-2025-03-04T09-00-00-{u4}.jsonl"));
|
||||
let head_5 = vec![
|
||||
serde_json::json!({"timestamp": "2025-03-05T09-00-00", "id": u5.to_string()}),
|
||||
serde_json::json!({
|
||||
"timestamp": "2025-03-05T09-00-00",
|
||||
"id": u5.to_string(),
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version"
|
||||
}),
|
||||
serde_json::json!({"record_type": "response", "index": 0}),
|
||||
];
|
||||
let head_4 = vec![
|
||||
serde_json::json!({"timestamp": "2025-03-04T09-00-00", "id": u4.to_string()}),
|
||||
serde_json::json!({
|
||||
"timestamp": "2025-03-04T09-00-00",
|
||||
"id": u4.to_string(),
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version"
|
||||
}),
|
||||
serde_json::json!({"record_type": "response", "index": 0}),
|
||||
];
|
||||
let expected_cursor1: Cursor =
|
||||
@@ -213,11 +372,23 @@ async fn test_pagination_cursor() {
|
||||
.join("02")
|
||||
.join(format!("rollout-2025-03-02T09-00-00-{u2}.jsonl"));
|
||||
let head_3 = vec![
|
||||
serde_json::json!({"timestamp": "2025-03-03T09-00-00", "id": u3.to_string()}),
|
||||
serde_json::json!({
|
||||
"timestamp": "2025-03-03T09-00-00",
|
||||
"id": u3.to_string(),
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version"
|
||||
}),
|
||||
serde_json::json!({"record_type": "response", "index": 0}),
|
||||
];
|
||||
let head_2 = vec![
|
||||
serde_json::json!({"timestamp": "2025-03-02T09-00-00", "id": u2.to_string()}),
|
||||
serde_json::json!({
|
||||
"timestamp": "2025-03-02T09-00-00",
|
||||
"id": u2.to_string(),
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version"
|
||||
}),
|
||||
serde_json::json!({"record_type": "response", "index": 0}),
|
||||
];
|
||||
let expected_cursor2: Cursor =
|
||||
@@ -249,7 +420,13 @@ async fn test_pagination_cursor() {
|
||||
.join("01")
|
||||
.join(format!("rollout-2025-03-01T09-00-00-{u1}.jsonl"));
|
||||
let head_1 = vec![
|
||||
serde_json::json!({"timestamp": "2025-03-01T09-00-00", "id": u1.to_string()}),
|
||||
serde_json::json!({
|
||||
"timestamp": "2025-03-01T09-00-00",
|
||||
"id": u1.to_string(),
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version"
|
||||
}),
|
||||
serde_json::json!({"record_type": "response", "index": 0}),
|
||||
];
|
||||
let expected_cursor3: Cursor =
|
||||
@@ -288,7 +465,13 @@ async fn test_get_conversation_contents() {
|
||||
.join("01")
|
||||
.join(format!("rollout-2025-04-01T10-30-00-{uuid}.jsonl"));
|
||||
let expected_head = vec![
|
||||
serde_json::json!({"timestamp": ts, "id": uuid.to_string()}),
|
||||
serde_json::json!({
|
||||
"timestamp": ts,
|
||||
"id": uuid.to_string(),
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version"
|
||||
}),
|
||||
serde_json::json!({"record_type": "response", "index": 0}),
|
||||
serde_json::json!({"record_type": "response", "index": 1}),
|
||||
];
|
||||
@@ -305,7 +488,13 @@ async fn test_get_conversation_contents() {
|
||||
assert_eq!(page, expected_page);
|
||||
|
||||
// Entire file contents equality
|
||||
let meta = serde_json::json!({"timestamp": ts, "id": uuid.to_string()});
|
||||
let meta = serde_json::json!({
|
||||
"timestamp": ts,
|
||||
"id": uuid.to_string(),
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version"
|
||||
});
|
||||
let rec0 = serde_json::json!({"record_type": "response", "index": 0});
|
||||
let rec1 = serde_json::json!({"record_type": "response", "index": 1});
|
||||
let expected_content = format!("{meta}\n{rec0}\n{rec1}\n");
|
||||
@@ -341,7 +530,13 @@ async fn test_stable_ordering_same_second_pagination() {
|
||||
.join("01")
|
||||
.join(format!("rollout-2025-07-01T00-00-00-{u2}.jsonl"));
|
||||
let head = |u: Uuid| -> Vec<serde_json::Value> {
|
||||
vec![serde_json::json!({"timestamp": ts, "id": u.to_string()})]
|
||||
vec![serde_json::json!({
|
||||
"timestamp": ts,
|
||||
"id": u.to_string(),
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version"
|
||||
})]
|
||||
};
|
||||
let expected_cursor1: Cursor = serde_json::from_str(&format!("\"{ts}|{u2}\"")).unwrap();
|
||||
let expected_page1 = ConversationsPage {
|
||||
|
||||
Reference in New Issue
Block a user