mirror of
https://github.com/openai/codex.git
synced 2026-02-08 09:53:39 +00:00
Compare commits
29 Commits
rollout
...
tokencount
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
082b775a88 | ||
|
|
e2318f38b2 | ||
|
|
87a4de05db | ||
|
|
91036ea7df | ||
|
|
543e22b215 | ||
|
|
2354594eeb | ||
|
|
b7c95b57fd | ||
|
|
5f18406e8d | ||
|
|
e9b78c9296 | ||
|
|
2a1499a626 | ||
|
|
52a5ea5f4e | ||
|
|
aa94d9c4b3 | ||
|
|
c2c7778c38 | ||
|
|
5f37b45158 | ||
|
|
d5e2032490 | ||
|
|
268be2ac52 | ||
|
|
a045ef05f2 | ||
|
|
660327aa9c | ||
|
|
2d976aa667 | ||
|
|
adf9dae907 | ||
|
|
4a7b842b53 | ||
|
|
e7a20f5109 | ||
|
|
f7671481c1 | ||
|
|
50e001888b | ||
|
|
b48d47de21 | ||
|
|
4d5335f797 | ||
|
|
0c732e4a53 | ||
|
|
777fbba58c | ||
|
|
40da893c46 |
@@ -10,13 +10,15 @@ use std::time::Duration;
|
||||
|
||||
use crate::AuthManager;
|
||||
use crate::event_mapping::map_response_item_to_event_messages;
|
||||
use crate::rollout::RolloutItem;
|
||||
use crate::rollout::recorder::RolloutItemSliceExt;
|
||||
use async_channel::Receiver;
|
||||
use async_channel::Sender;
|
||||
use codex_apply_patch::ApplyPatchAction;
|
||||
use codex_apply_patch::MaybeApplyPatchVerified;
|
||||
use codex_apply_patch::maybe_parse_apply_patch_verified;
|
||||
use codex_protocol::mcp_protocol::ConversationId;
|
||||
use codex_protocol::protocol::ConversationHistoryResponseEvent;
|
||||
use codex_protocol::protocol::ConversationPathResponseEvent;
|
||||
use codex_protocol::protocol::TaskStartedEvent;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
use codex_protocol::protocol::TurnAbortedEvent;
|
||||
@@ -103,8 +105,6 @@ use crate::protocol::TokenUsageInfo;
|
||||
use crate::protocol::TurnDiffEvent;
|
||||
use crate::protocol::WebSearchBeginEvent;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::rollout::RolloutRecorderParams;
|
||||
use crate::rollout::recorder::RolloutItemSliceExt;
|
||||
use crate::safety::SafetyCheck;
|
||||
use crate::safety::assess_command_safety;
|
||||
use crate::safety::assess_safety_for_untrusted_command;
|
||||
@@ -204,9 +204,6 @@ impl Codex {
|
||||
error!("Failed to create session: {e:#}");
|
||||
CodexErr::InternalAgentDied
|
||||
})?;
|
||||
session
|
||||
.record_initial_history(&turn_context, conversation_history)
|
||||
.await;
|
||||
let conversation_id = session.conversation_id;
|
||||
|
||||
// This task will run until Op::Shutdown is received.
|
||||
@@ -380,18 +377,9 @@ impl Session {
|
||||
return Err(anyhow::anyhow!("cwd is not absolute: {cwd:?}"));
|
||||
}
|
||||
|
||||
let (conversation_id, rollout_params) = match &initial_history {
|
||||
InitialHistory::New | InitialHistory::Forked(_) => {
|
||||
let conversation_id = ConversationId::default();
|
||||
(
|
||||
conversation_id,
|
||||
RolloutRecorderParams::new(conversation_id, user_instructions.clone()),
|
||||
)
|
||||
}
|
||||
InitialHistory::Resumed(resumed_history) => (
|
||||
resumed_history.conversation_id,
|
||||
RolloutRecorderParams::resume(resumed_history.rollout_path.clone()),
|
||||
),
|
||||
let conversation_id = match &initial_history {
|
||||
InitialHistory::New | InitialHistory::Forked(_) => ConversationId::default(),
|
||||
InitialHistory::Resumed(resumed_history) => resumed_history.conversation_id,
|
||||
};
|
||||
|
||||
// Error messages to dispatch after SessionConfigured is sent.
|
||||
@@ -403,7 +391,7 @@ impl Session {
|
||||
// - spin up MCP connection manager
|
||||
// - perform default shell discovery
|
||||
// - load history metadata
|
||||
let rollout_fut = RolloutRecorder::new(&config, rollout_params);
|
||||
let rollout_fut = RolloutRecorder::new(&config, conversation_id, user_instructions.clone());
|
||||
|
||||
let mcp_fut = McpConnectionManager::new(config.mcp_servers.clone());
|
||||
let default_shell_fut = shell::default_user_shell();
|
||||
@@ -493,13 +481,10 @@ impl Session {
|
||||
|
||||
// Dispatch the SessionConfiguredEvent first and then report any errors.
|
||||
// If resuming, include converted initial messages in the payload so UIs can render them immediately.
|
||||
let initial_messages = match &initial_history {
|
||||
InitialHistory::New => None,
|
||||
InitialHistory::Forked(items) => Some(sess.build_initial_messages(items)),
|
||||
InitialHistory::Resumed(resumed_history) => {
|
||||
Some(sess.build_initial_messages(&resumed_history.history))
|
||||
}
|
||||
};
|
||||
let initial_messages = Some(
|
||||
sess.apply_initial_history(&turn_context, initial_history.clone())
|
||||
.await,
|
||||
);
|
||||
|
||||
let events = std::iter::once(Event {
|
||||
id: INITIAL_SUBMIT_ID.to_owned(),
|
||||
@@ -536,30 +521,31 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
async fn record_initial_history(
|
||||
async fn apply_initial_history(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
conversation_history: InitialHistory,
|
||||
) {
|
||||
) -> Vec<EventMsg> {
|
||||
match conversation_history {
|
||||
InitialHistory::New => {
|
||||
self.record_initial_history_new(turn_context).await;
|
||||
}
|
||||
InitialHistory::New => self.record_initial_history_new(turn_context).await,
|
||||
InitialHistory::Forked(items) => {
|
||||
self.record_initial_history_from_rollout_items(turn_context, items)
|
||||
.await;
|
||||
self.record_conversation_items_internal(&items, true).await;
|
||||
items
|
||||
.into_iter()
|
||||
.flat_map(|ri| {
|
||||
map_response_item_to_event_messages(&ri, self.show_raw_agent_reasoning)
|
||||
})
|
||||
.filter(|m| matches!(m, EventMsg::UserMessage(_)))
|
||||
.collect()
|
||||
}
|
||||
InitialHistory::Resumed(resumed_history) => {
|
||||
self.record_initial_history_from_rollout_items(
|
||||
turn_context,
|
||||
resumed_history.history,
|
||||
)
|
||||
.await;
|
||||
self.record_initial_history_resumed(resumed_history.history)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn record_initial_history_new(&self, turn_context: &TurnContext) {
|
||||
async fn record_initial_history_new(&self, turn_context: &TurnContext) -> Vec<EventMsg> {
|
||||
// record the initial user instructions and environment context,
|
||||
// regardless of whether we restored items.
|
||||
// TODO: Those items shouldn't be "user messages" IMO. Maybe developer messages.
|
||||
@@ -573,54 +559,45 @@ impl Session {
|
||||
Some(turn_context.sandbox_policy.clone()),
|
||||
Some(self.user_shell.clone()),
|
||||
)));
|
||||
self.record_conversation_items(turn_context, &conversation_items)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn record_initial_history_from_rollout_items(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
items: Vec<crate::rollout::RolloutItem>,
|
||||
) {
|
||||
use crate::rollout::recorder::RolloutItemSliceExt as _;
|
||||
let response_items = items.get_response_items();
|
||||
self.record_conversation_items_internal(turn_context, &response_items, false)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Build the initial UI messages vector for SessionConfigured from
|
||||
/// persisted rollout items. Prefer persisted events when present; fall back
|
||||
/// to converting response items for legacy rollouts.
|
||||
fn build_initial_messages(&self, items: &[crate::rollout::RolloutItem]) -> Vec<EventMsg> {
|
||||
let evs = items.get_events();
|
||||
if !evs.is_empty() {
|
||||
return evs;
|
||||
for item in conversation_items {
|
||||
self.record_conversation_item(item).await;
|
||||
}
|
||||
let responses = items.get_response_items();
|
||||
responses
|
||||
.iter()
|
||||
.flat_map(|item| {
|
||||
map_response_item_to_event_messages(item, self.show_raw_agent_reasoning)
|
||||
})
|
||||
.collect::<Vec<EventMsg>>()
|
||||
vec![]
|
||||
}
|
||||
|
||||
/// Sends an event to the client and records it to the rollout (if enabled).
|
||||
async fn record_initial_history_from_items(&self, items: Vec<ResponseItem>) {
|
||||
self.record_conversation_items_internal(&items, false).await;
|
||||
}
|
||||
|
||||
async fn record_initial_history_resumed(&self, items: Vec<RolloutItem>) -> Vec<EventMsg> {
|
||||
// Record transcript (without persisting again)
|
||||
let responses: Vec<ResponseItem> = items.as_slice().get_response_items();
|
||||
if !responses.is_empty() {
|
||||
self.record_conversation_items_internal(&responses, true)
|
||||
.await;
|
||||
}
|
||||
|
||||
items.as_slice().get_events()
|
||||
}
|
||||
|
||||
/// Sends the given event to the client and records it to the rollout (if enabled).
|
||||
/// Any send/record errors are logged and swallowed.
|
||||
pub(crate) async fn send_event(&self, event: Event) {
|
||||
// Clone recorder handle outside the lock scope to avoid holding the
|
||||
// mutex guard across an await point.
|
||||
let event_to_record = event.clone();
|
||||
if let Err(e) = self.tx_event.send(event).await {
|
||||
error!("failed to send event: {e}");
|
||||
}
|
||||
let recorder = {
|
||||
let guard = self.rollout.lock_unchecked();
|
||||
guard.as_ref().cloned()
|
||||
};
|
||||
if let Some(rec) = recorder
|
||||
&& let Err(e) = rec.record_events(std::slice::from_ref(&event)).await
|
||||
&& let Err(e) = rec
|
||||
.record_items(crate::rollout::RolloutItem::Event(event_to_record))
|
||||
.await
|
||||
{
|
||||
error!("failed to record rollout event: {e:#}");
|
||||
}
|
||||
if let Err(e) = self.tx_event.send(event).await {
|
||||
error!("failed to send tool call event: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn request_command_approval(
|
||||
@@ -709,70 +686,59 @@ impl Session {
|
||||
|
||||
/// Records items to both the rollout and the chat completions/ZDR
|
||||
/// transcript, if enabled.
|
||||
async fn record_conversation_items(&self, turn_context: &TurnContext, items: &[ResponseItem]) {
|
||||
self.record_conversation_items_internal(turn_context, items, true)
|
||||
.await;
|
||||
async fn record_conversation_items(&self, items: &[ResponseItem]) {
|
||||
self.record_conversation_items_internal(items, true).await;
|
||||
}
|
||||
|
||||
async fn record_conversation_items_internal(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
items: &[ResponseItem],
|
||||
persist: bool,
|
||||
) {
|
||||
async fn record_conversation_item(&self, item: ResponseItem) {
|
||||
let items = [item];
|
||||
self.record_conversation_items_internal(&items, true).await;
|
||||
}
|
||||
|
||||
async fn record_conversation_items_internal(&self, items: &[ResponseItem], persist: bool) {
|
||||
debug!("Recording items for conversation: {items:?}");
|
||||
if persist {
|
||||
self.record_state_snapshot(turn_context, items).await;
|
||||
// Record snapshot of these items into rollout
|
||||
for item in items {
|
||||
self.record_state_snapshot(RolloutItem::ResponseItem(item.clone()))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
self.state.lock_unchecked().history.record_items(items);
|
||||
}
|
||||
|
||||
async fn record_state_snapshot(&self, turn_context: &TurnContext, items: &[ResponseItem]) {
|
||||
let tc_snapshot = crate::rollout::recorder::TurnContextSnapshot {
|
||||
cwd: turn_context.cwd.clone(),
|
||||
approval_policy: turn_context.approval_policy,
|
||||
sandbox_policy: turn_context.sandbox_policy.clone(),
|
||||
model: turn_context.client.get_model(),
|
||||
show_raw_agent_reasoning: self.show_raw_agent_reasoning,
|
||||
};
|
||||
let snapshot = crate::rollout::SessionStateSnapshot {
|
||||
turn_context: tc_snapshot,
|
||||
};
|
||||
|
||||
async fn record_state_snapshot(&self, item: RolloutItem) {
|
||||
let recorder = {
|
||||
let guard = self.rollout.lock_unchecked();
|
||||
guard.as_ref().cloned()
|
||||
};
|
||||
|
||||
if let Some(rec) = recorder {
|
||||
if let Err(e) = rec.record_state(snapshot).await {
|
||||
error!("failed to record rollout state: {e:#}");
|
||||
}
|
||||
if let Err(e) = rec.record_response_items(items).await {
|
||||
error!("failed to record rollout items: {e:#}");
|
||||
}
|
||||
if let Some(rec) = recorder
|
||||
&& let Err(e) = rec.record_items(item).await
|
||||
{
|
||||
error!("failed to record rollout items: {e:#}");
|
||||
}
|
||||
}
|
||||
|
||||
// Also persist user input as EventMsg::UserMessage so resuming can
|
||||
// reconstruct an event stream without relying on mapping.
|
||||
// Only keep user messages; avoid duplicating assistant events which
|
||||
// are recorded when emitted during streaming.
|
||||
let derived_user_events: Vec<crate::protocol::Event> = items
|
||||
.iter()
|
||||
.flat_map(|it| {
|
||||
map_response_item_to_event_messages(it, self.show_raw_agent_reasoning)
|
||||
})
|
||||
.filter(|ev| matches!(ev, EventMsg::UserMessage(_)))
|
||||
.map(|msg| crate::protocol::Event {
|
||||
id: String::new(),
|
||||
msg,
|
||||
})
|
||||
.collect();
|
||||
if !derived_user_events.is_empty() {
|
||||
if let Err(e) = rec.record_events(&derived_user_events).await {
|
||||
error!("failed to record derived user events: {e:#}");
|
||||
}
|
||||
}
|
||||
/// Records a user input into conversation history AND a corresponding UserMessage event in rollout.
|
||||
/// Does not send events to the UI.
|
||||
async fn record_user_input(&self, sub_id: &str, response_item: ResponseItem) {
|
||||
// Record the message/tool input in conversation history/rollout state
|
||||
self.record_conversation_item(response_item.clone()).await;
|
||||
|
||||
// Derive and record a UserMessage event alongside it in the rollout
|
||||
let user_events =
|
||||
map_response_item_to_event_messages(&response_item, self.show_raw_agent_reasoning)
|
||||
.into_iter()
|
||||
.filter(|m| matches!(m, EventMsg::UserMessage(_)));
|
||||
|
||||
for msg in user_events {
|
||||
let event = Event {
|
||||
id: sub_id.to_string(),
|
||||
msg,
|
||||
};
|
||||
self.record_state_snapshot(RolloutItem::Event(event)).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -940,7 +906,7 @@ impl Session {
|
||||
message: message.into(),
|
||||
}),
|
||||
};
|
||||
let _ = self.tx_event.send(event).await;
|
||||
self.send_event(event).await;
|
||||
}
|
||||
|
||||
async fn notify_stream_error(&self, sub_id: &str, message: impl Into<String>) {
|
||||
@@ -950,7 +916,7 @@ impl Session {
|
||||
message: message.into(),
|
||||
}),
|
||||
};
|
||||
let _ = self.tx_event.send(event).await;
|
||||
self.send_event(event).await;
|
||||
}
|
||||
|
||||
/// Build the full turn input by concatenating the current conversation
|
||||
@@ -1211,16 +1177,13 @@ async fn submission_loop(
|
||||
// Install the new persistent context for subsequent tasks/turns.
|
||||
turn_context = Arc::new(new_turn_context);
|
||||
if cwd.is_some() || approval_policy.is_some() || sandbox_policy.is_some() {
|
||||
sess.record_conversation_items(
|
||||
&turn_context,
|
||||
&[ResponseItem::from(EnvironmentContext::new(
|
||||
cwd,
|
||||
approval_policy,
|
||||
sandbox_policy,
|
||||
// Shell is not configurable from turn to turn
|
||||
None,
|
||||
))],
|
||||
)
|
||||
sess.record_conversation_item(ResponseItem::from(EnvironmentContext::new(
|
||||
cwd,
|
||||
approval_policy,
|
||||
sandbox_policy,
|
||||
// Shell is not configurable from turn to turn
|
||||
None,
|
||||
)))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
@@ -1323,7 +1286,7 @@ async fn submission_loop(
|
||||
|
||||
Op::GetHistoryEntryRequest { offset, log_id } => {
|
||||
let config = config.clone();
|
||||
let sess2 = sess.clone();
|
||||
let sess_for_spawn = sess.clone();
|
||||
let sub_id = sub.id.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
@@ -1351,11 +1314,10 @@ async fn submission_loop(
|
||||
),
|
||||
};
|
||||
|
||||
sess2.send_event(event).await;
|
||||
sess_for_spawn.send_event(event).await;
|
||||
});
|
||||
}
|
||||
Op::ListMcpTools => {
|
||||
let sess2 = sess.clone();
|
||||
let sub_id = sub.id.clone();
|
||||
|
||||
// This is a cheap lookup from the connection manager's cache.
|
||||
@@ -1366,10 +1328,9 @@ async fn submission_loop(
|
||||
crate::protocol::McpListToolsResponseEvent { tools },
|
||||
),
|
||||
};
|
||||
sess2.send_event(event).await;
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
Op::ListCustomPrompts => {
|
||||
let sess2 = sess.clone();
|
||||
let sub_id = sub.id.clone();
|
||||
|
||||
let custom_prompts: Vec<CustomPrompt> =
|
||||
@@ -1385,7 +1346,7 @@ async fn submission_loop(
|
||||
custom_prompts,
|
||||
}),
|
||||
};
|
||||
sess2.send_event(event).await;
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
Op::Compact => {
|
||||
// Create a summarization request as user input
|
||||
@@ -1421,27 +1382,33 @@ async fn submission_loop(
|
||||
message: "Failed to shutdown rollout recorder".to_string(),
|
||||
}),
|
||||
};
|
||||
if let Err(e) = sess.tx_event.send(event).await {
|
||||
warn!("failed to send error message: {e:?}");
|
||||
}
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
|
||||
let event = Event {
|
||||
id: sub.id.clone(),
|
||||
msg: EventMsg::ShutdownComplete,
|
||||
};
|
||||
if let Err(e) = sess.tx_event.send(event).await {
|
||||
warn!("failed to send Shutdown event: {e}");
|
||||
}
|
||||
sess.send_event(event).await;
|
||||
break;
|
||||
}
|
||||
Op::GetHistory => {
|
||||
Op::GetConversationPath => {
|
||||
let sub_id = sub.id.clone();
|
||||
// Ensure rollout file is flushed so consumers can read it immediately.
|
||||
let rec_opt = { sess.rollout.lock_unchecked().as_ref().cloned() };
|
||||
if let Some(rec) = rec_opt {
|
||||
let _ = rec.flush().await;
|
||||
}
|
||||
let event = Event {
|
||||
id: sub_id.clone(),
|
||||
msg: EventMsg::ConversationHistory(ConversationHistoryResponseEvent {
|
||||
msg: EventMsg::ConversationHistory(ConversationPathResponseEvent {
|
||||
conversation_id: sess.conversation_id,
|
||||
entries: sess.state.lock_unchecked().history.contents(),
|
||||
path: sess
|
||||
.rollout
|
||||
.lock_unchecked()
|
||||
.as_ref()
|
||||
.map(|r| r.path().to_path_buf())
|
||||
.unwrap_or_default(),
|
||||
}),
|
||||
};
|
||||
sess.send_event(event).await;
|
||||
@@ -1476,6 +1443,10 @@ async fn run_task(
|
||||
if input.is_empty() {
|
||||
return;
|
||||
}
|
||||
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
|
||||
// Record the user's input and corresponding event into the rollout
|
||||
let user_input_response: ResponseItem = ResponseItem::from(initial_input_for_turn.clone());
|
||||
sess.record_user_input(&sub_id, user_input_response).await;
|
||||
let event = Event {
|
||||
id: sub_id.clone(),
|
||||
msg: EventMsg::TaskStarted(TaskStartedEvent {
|
||||
@@ -1484,10 +1455,6 @@ async fn run_task(
|
||||
};
|
||||
sess.send_event(event).await;
|
||||
|
||||
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
|
||||
sess.record_conversation_items(turn_context, &[initial_input_for_turn.clone().into()])
|
||||
.await;
|
||||
|
||||
let mut last_agent_message: Option<String> = None;
|
||||
// Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains
|
||||
// many turns, from the perspective of the user, it is a single turn.
|
||||
@@ -1502,8 +1469,9 @@ async fn run_task(
|
||||
.into_iter()
|
||||
.map(ResponseItem::from)
|
||||
.collect::<Vec<ResponseItem>>();
|
||||
sess.record_conversation_items(turn_context, &pending_input)
|
||||
.await;
|
||||
for item in pending_input.iter() {
|
||||
sess.record_user_input(&sub_id, item.clone()).await;
|
||||
}
|
||||
|
||||
// Construct the input that we will send to the model. When using the
|
||||
// Chat completions API (or ZDR clients), the model needs the full
|
||||
@@ -1630,11 +1598,9 @@ async fn run_task(
|
||||
|
||||
// Only attempt to take the lock if there is something to record.
|
||||
if !items_to_record_in_conversation_history.is_empty() {
|
||||
sess.record_conversation_items(
|
||||
turn_context,
|
||||
&items_to_record_in_conversation_history,
|
||||
)
|
||||
.await;
|
||||
for item in items_to_record_in_conversation_history.iter().cloned() {
|
||||
sess.record_conversation_item(item).await;
|
||||
}
|
||||
}
|
||||
|
||||
if responses.is_empty() {
|
||||
@@ -2944,13 +2910,11 @@ async fn drain_to_completed(
|
||||
info
|
||||
};
|
||||
|
||||
sess.tx_event
|
||||
.send(Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::TokenCount(crate::protocol::TokenCountEvent { info }),
|
||||
})
|
||||
.await
|
||||
.ok();
|
||||
sess.send_event(Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::TokenCount(crate::protocol::TokenCountEvent { info }),
|
||||
})
|
||||
.await;
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ use crate::protocol::EventMsg;
|
||||
use crate::protocol::SessionConfiguredEvent;
|
||||
use crate::rollout::RolloutItem;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::rollout::recorder::RolloutItemSliceExt;
|
||||
use codex_protocol::mcp_protocol::ConversationId;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use std::collections::HashMap;
|
||||
@@ -19,18 +20,53 @@ use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ResumedHistory {
|
||||
pub conversation_id: ConversationId,
|
||||
pub history: Vec<RolloutItem>,
|
||||
pub rollout_path: PathBuf,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum InitialHistory {
|
||||
New,
|
||||
Resumed(ResumedHistory),
|
||||
Forked(Vec<RolloutItem>),
|
||||
Forked(Vec<ResponseItem>),
|
||||
}
|
||||
|
||||
impl PartialEq for InitialHistory {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
match (self, other) {
|
||||
(InitialHistory::New, InitialHistory::New) => true,
|
||||
(InitialHistory::Forked(a), InitialHistory::Forked(b)) => a == b,
|
||||
(InitialHistory::Resumed(_), InitialHistory::Resumed(_)) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl InitialHistory {
|
||||
/// Return all response items contained in this initial history.
|
||||
pub fn get_response_items(&self) -> Vec<ResponseItem> {
|
||||
match self {
|
||||
InitialHistory::New => Vec::new(),
|
||||
InitialHistory::Forked(_) => Vec::new(),
|
||||
InitialHistory::Resumed(items) => {
|
||||
<[_] as RolloutItemSliceExt>::get_response_items(items.history.as_slice())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Return all events contained in this initial history.
|
||||
pub fn get_events(&self) -> Vec<crate::protocol::EventMsg> {
|
||||
match self {
|
||||
InitialHistory::New => Vec::new(),
|
||||
InitialHistory::Forked(_) => Vec::new(),
|
||||
InitialHistory::Resumed(items) => {
|
||||
<[_] as RolloutItemSliceExt>::get_events(items.history.as_slice())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents a newly created Codex conversation, including the first event
|
||||
@@ -156,21 +192,43 @@ impl ConversationManager {
|
||||
/// caller's `config`). The new conversation will have a fresh id.
|
||||
pub async fn fork_conversation(
|
||||
&self,
|
||||
conversation_history: Vec<ResponseItem>,
|
||||
base_rollout_path: PathBuf,
|
||||
_base_conversation_id: ConversationId,
|
||||
num_messages_to_drop: usize,
|
||||
config: Config,
|
||||
) -> CodexResult<NewConversation> {
|
||||
// Compute the prefix up to the cut point.
|
||||
let history =
|
||||
truncate_after_dropping_last_messages(conversation_history, num_messages_to_drop);
|
||||
// Read prior responses from the rollout file (tolerate both tagged and legacy formats).
|
||||
let text = tokio::fs::read_to_string(&base_rollout_path)
|
||||
.await
|
||||
.map_err(|e| CodexErr::Io(std::io::Error::other(format!("read rollout: {e}"))))?;
|
||||
let mut responses: Vec<ResponseItem> = Vec::new();
|
||||
for line in text.lines() {
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
let v: serde_json::Value = match serde_json::from_str(line) {
|
||||
Ok(v) => v,
|
||||
Err(_) => continue,
|
||||
};
|
||||
// Only consider response items (legacy lines have no record_type)
|
||||
match v.get("record_type").and_then(|s| s.as_str()) {
|
||||
Some("response") | None => {
|
||||
if let Ok(item) = serde_json::from_value::<ResponseItem>(v) {
|
||||
responses.push(item);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
let kept = truncate_after_dropping_last_messages(responses, num_messages_to_drop);
|
||||
|
||||
// Spawn a new conversation with the computed initial history.
|
||||
let auth_manager = self.auth_manager.clone();
|
||||
let CodexSpawnOk {
|
||||
codex,
|
||||
conversation_id,
|
||||
} = Codex::spawn(config, auth_manager, history).await?;
|
||||
|
||||
} = Codex::spawn(config, auth_manager, kept).await?;
|
||||
self.finalize_spawn(codex, conversation_id).await
|
||||
}
|
||||
}
|
||||
@@ -179,11 +237,7 @@ impl ConversationManager {
|
||||
/// and all items that follow them.
|
||||
fn truncate_after_dropping_last_messages(items: Vec<ResponseItem>, n: usize) -> InitialHistory {
|
||||
if n == 0 {
|
||||
let rollout_items = items
|
||||
.into_iter()
|
||||
.map(crate::rollout::RolloutItem::ResponseItem)
|
||||
.collect();
|
||||
return InitialHistory::Forked(rollout_items);
|
||||
return InitialHistory::Forked(items);
|
||||
}
|
||||
|
||||
// Walk backwards counting only `user` Message items, find cut index.
|
||||
@@ -205,13 +259,7 @@ fn truncate_after_dropping_last_messages(items: Vec<ResponseItem>, n: usize) ->
|
||||
// No prefix remains after dropping; start a new conversation.
|
||||
InitialHistory::New
|
||||
} else {
|
||||
InitialHistory::Forked(
|
||||
items
|
||||
.into_iter()
|
||||
.take(cut_index)
|
||||
.map(crate::rollout::RolloutItem::ResponseItem)
|
||||
.collect(),
|
||||
)
|
||||
InitialHistory::Forked(items.into_iter().take(cut_index).collect())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -269,14 +317,10 @@ mod tests {
|
||||
let truncated = truncate_after_dropping_last_messages(items.clone(), 1);
|
||||
assert_eq!(
|
||||
truncated,
|
||||
InitialHistory::Forked(vec![
|
||||
crate::rollout::RolloutItem::ResponseItem(items[0].clone()),
|
||||
crate::rollout::RolloutItem::ResponseItem(items[1].clone()),
|
||||
crate::rollout::RolloutItem::ResponseItem(items[2].clone()),
|
||||
])
|
||||
InitialHistory::Forked(vec![items[0].clone(), items[1].clone(), items[2].clone(),])
|
||||
);
|
||||
|
||||
let truncated2 = truncate_after_dropping_last_messages(items, 2);
|
||||
assert_eq!(truncated2, InitialHistory::New);
|
||||
assert!(matches!(truncated2, InitialHistory::New));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -61,11 +61,11 @@ pub mod spawn;
|
||||
pub mod terminal;
|
||||
mod tool_apply_patch;
|
||||
pub mod turn_diff_tracker;
|
||||
pub use rollout::Cursor;
|
||||
pub use rollout::RolloutRecorder;
|
||||
pub use rollout::SessionMeta;
|
||||
pub use rollout::list::ConversationItem;
|
||||
pub use rollout::list::ConversationsPage;
|
||||
pub use rollout::list::Cursor;
|
||||
mod user_notification;
|
||||
pub mod util;
|
||||
pub use apply_patch::CODEX_APPLY_PATCH_ARG1;
|
||||
|
||||
@@ -1,91 +0,0 @@
|
||||
use codex_protocol::mcp_protocol::ConversationId;
|
||||
use time::OffsetDateTime;
|
||||
use time::PrimitiveDateTime;
|
||||
use time::format_description::FormatItem;
|
||||
use time::macros::format_description;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Timestamp format used in rollout filenames: YYYY-MM-DDThh-mm-ss
|
||||
pub const FILENAME_TS_FMT: &[FormatItem] =
|
||||
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
|
||||
|
||||
/// Timestamp format used for JSONL records (UTC, second precision): YYYY-MM-DDThh:mm:ssZ
|
||||
pub const RECORD_TS_FMT: &[FormatItem] =
|
||||
format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]Z");
|
||||
|
||||
/// Pagination cursor identifying a file by timestamp and UUID.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct Cursor {
|
||||
pub(crate) ts: OffsetDateTime,
|
||||
pub(crate) id: Uuid,
|
||||
}
|
||||
|
||||
impl Cursor {
|
||||
pub fn new(ts: OffsetDateTime, id: Uuid) -> Self {
|
||||
Self { ts, id }
|
||||
}
|
||||
}
|
||||
|
||||
impl serde::Serialize for Cursor {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
let ts_str = self
|
||||
.ts
|
||||
.format(FILENAME_TS_FMT)
|
||||
.map_err(|e| serde::ser::Error::custom(format!("format error: {e}")))?;
|
||||
serializer.serialize_str(&format!("{ts_str}|{}", self.id))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> serde::Deserialize<'de> for Cursor {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
let s = String::deserialize(deserializer)?;
|
||||
parse_cursor(&s).ok_or_else(|| serde::de::Error::custom("invalid cursor"))
|
||||
}
|
||||
}
|
||||
|
||||
/// Parses a pagination cursor token in the form "<file_ts>|<uuid>".
|
||||
pub fn parse_cursor(token: &str) -> Option<Cursor> {
|
||||
let (file_ts, uuid_str) = token.split_once('|')?;
|
||||
|
||||
let Ok(uuid) = Uuid::parse_str(uuid_str) else {
|
||||
return None;
|
||||
};
|
||||
|
||||
let ts = PrimitiveDateTime::parse(file_ts, FILENAME_TS_FMT)
|
||||
.ok()?
|
||||
.assume_utc();
|
||||
|
||||
Some(Cursor::new(ts, uuid))
|
||||
}
|
||||
|
||||
/// Parse timestamp and UUID from a rollout filename.
|
||||
/// Expected format: rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl
|
||||
pub fn parse_timestamp_uuid_from_filename(name: &str) -> Option<(OffsetDateTime, Uuid)> {
|
||||
let core = name.strip_prefix("rollout-")?.strip_suffix(".jsonl")?;
|
||||
|
||||
// Scan from the right for a '-' such that the suffix parses as a UUID.
|
||||
let (sep_idx, uuid) = core
|
||||
.match_indices('-')
|
||||
.rev()
|
||||
.find_map(|(i, _)| Uuid::parse_str(&core[i + 1..]).ok().map(|u| (i, u)))?;
|
||||
|
||||
let ts_str = &core[..sep_idx];
|
||||
let ts = PrimitiveDateTime::parse(ts_str, FILENAME_TS_FMT)
|
||||
.ok()?
|
||||
.assume_utc();
|
||||
Some((ts, uuid))
|
||||
}
|
||||
|
||||
/// Build a rollout filename for a given timestamp and conversation id.
|
||||
pub fn build_rollout_filename(ts: OffsetDateTime, conversation_id: ConversationId) -> String {
|
||||
let date_str = ts
|
||||
.format(FILENAME_TS_FMT)
|
||||
.unwrap_or_else(|e| panic!("failed to format timestamp: {e}"));
|
||||
format!("rollout-{date_str}-{conversation_id}.jsonl")
|
||||
}
|
||||
@@ -4,12 +4,13 @@ use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use time::OffsetDateTime;
|
||||
use time::PrimitiveDateTime;
|
||||
use time::format_description::FormatItem;
|
||||
use time::macros::format_description;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::SESSIONS_SUBDIR;
|
||||
use super::format::Cursor;
|
||||
use super::format::parse_timestamp_uuid_from_filename;
|
||||
use std::sync::Arc;
|
||||
use super::recorder::SessionMetaWithGit;
|
||||
|
||||
/// Returned page of conversation summaries.
|
||||
#[derive(Debug, Default, PartialEq)]
|
||||
@@ -33,14 +34,48 @@ pub struct ConversationItem {
|
||||
pub head: Vec<serde_json::Value>,
|
||||
}
|
||||
|
||||
/// A filter applied to a discovered conversation. All filters must pass for
|
||||
/// the item to be included in results.
|
||||
pub type ConversationFilter = Arc<dyn Fn(&ConversationItem) -> bool + Send + Sync>;
|
||||
|
||||
/// Hard cap to bound worst‑case work per request.
|
||||
const MAX_SCAN_FILES: usize = 10_000;
|
||||
const HEAD_RECORD_LIMIT: usize = 10;
|
||||
|
||||
/// Pagination cursor identifying a file by timestamp and UUID.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct Cursor {
|
||||
ts: OffsetDateTime,
|
||||
id: Uuid,
|
||||
}
|
||||
|
||||
impl Cursor {
|
||||
fn new(ts: OffsetDateTime, id: Uuid) -> Self {
|
||||
Self { ts, id }
|
||||
}
|
||||
}
|
||||
|
||||
impl serde::Serialize for Cursor {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
let ts_str = self
|
||||
.ts
|
||||
.format(&format_description!(
|
||||
"[year]-[month]-[day]T[hour]-[minute]-[second]"
|
||||
))
|
||||
.map_err(|e| serde::ser::Error::custom(format!("format error: {e}")))?;
|
||||
serializer.serialize_str(&format!("{ts_str}|{}", self.id))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> serde::Deserialize<'de> for Cursor {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
let s = String::deserialize(deserializer)?;
|
||||
parse_cursor(&s).ok_or_else(|| serde::de::Error::custom("invalid cursor"))
|
||||
}
|
||||
}
|
||||
|
||||
/// Retrieve recorded conversation file paths with token pagination. The returned `next_cursor`
|
||||
/// can be supplied on the next call to resume after the last returned item, resilient to
|
||||
/// concurrent new sessions being appended. Ordering is stable by timestamp desc, then UUID desc.
|
||||
@@ -48,27 +83,23 @@ pub(crate) async fn get_conversations(
|
||||
codex_home: &Path,
|
||||
page_size: usize,
|
||||
cursor: Option<&Cursor>,
|
||||
) -> io::Result<ConversationsPage> {
|
||||
get_conversations_filtered(codex_home, page_size, cursor, &[]).await
|
||||
}
|
||||
|
||||
/// Retrieve recorded conversations with filters. All provided filters must
|
||||
/// return `true` for an item to be included.
|
||||
pub(crate) async fn get_conversations_filtered(
|
||||
codex_home: &Path,
|
||||
page_size: usize,
|
||||
cursor: Option<&Cursor>,
|
||||
filters: &[ConversationFilter],
|
||||
) -> io::Result<ConversationsPage> {
|
||||
let mut root = codex_home.to_path_buf();
|
||||
root.push(SESSIONS_SUBDIR);
|
||||
|
||||
if !root.exists() {
|
||||
return Ok(empty_page());
|
||||
return Ok(ConversationsPage {
|
||||
items: Vec::new(),
|
||||
next_cursor: None,
|
||||
num_scanned_files: 0,
|
||||
reached_scan_cap: false,
|
||||
});
|
||||
}
|
||||
|
||||
let anchor = cursor.cloned();
|
||||
traverse_directories_for_paths_filtered(root.clone(), page_size, anchor, filters).await
|
||||
|
||||
let result = traverse_directories_for_paths(root.clone(), page_size, anchor).await?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Load the full contents of a single conversation session file at `path`.
|
||||
@@ -82,11 +113,10 @@ pub(crate) async fn get_conversation(path: &Path) -> io::Result<String> {
|
||||
///
|
||||
/// Directory layout: `~/.codex/sessions/YYYY/MM/DD/rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl`
|
||||
/// Returned newest (latest) first.
|
||||
async fn traverse_directories_for_paths_filtered(
|
||||
async fn traverse_directories_for_paths(
|
||||
root: PathBuf,
|
||||
page_size: usize,
|
||||
anchor: Option<Cursor>,
|
||||
filters: &[ConversationFilter],
|
||||
) -> io::Result<ConversationsPage> {
|
||||
let mut items: Vec<ConversationItem> = Vec::with_capacity(page_size);
|
||||
let mut scanned_files = 0usize;
|
||||
@@ -141,11 +171,8 @@ async fn traverse_directories_for_paths_filtered(
|
||||
let head = read_first_jsonl_records(&path, HEAD_RECORD_LIMIT)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
let item = ConversationItem { path, head };
|
||||
// Apply all filters
|
||||
let include = filters.iter().all(|f| f(&item));
|
||||
if include {
|
||||
items.push(item);
|
||||
if should_include_session(&head) {
|
||||
items.push(ConversationItem { path, head });
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -161,6 +188,23 @@ async fn traverse_directories_for_paths_filtered(
|
||||
})
|
||||
}
|
||||
|
||||
/// Pagination cursor token format: "<file_ts>|<uuid>" where `file_ts` matches the
|
||||
/// filename timestamp portion (YYYY-MM-DDThh-mm-ss) used in rollout filenames.
|
||||
/// The cursor orders files by timestamp desc, then UUID desc.
|
||||
fn parse_cursor(token: &str) -> Option<Cursor> {
|
||||
let (file_ts, uuid_str) = token.split_once('|')?;
|
||||
|
||||
let Ok(uuid) = Uuid::parse_str(uuid_str) else {
|
||||
return None;
|
||||
};
|
||||
|
||||
let format: &[FormatItem] =
|
||||
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
|
||||
let ts = PrimitiveDateTime::parse(file_ts, format).ok()?.assume_utc();
|
||||
|
||||
Some(Cursor::new(ts, uuid))
|
||||
}
|
||||
|
||||
fn build_next_cursor(items: &[ConversationItem]) -> Option<Cursor> {
|
||||
let last = items.last()?;
|
||||
let file_name = last.path.file_name()?.to_string_lossy();
|
||||
@@ -215,13 +259,21 @@ where
|
||||
Ok(collected)
|
||||
}
|
||||
|
||||
fn empty_page() -> ConversationsPage {
|
||||
ConversationsPage {
|
||||
items: Vec::new(),
|
||||
next_cursor: None,
|
||||
num_scanned_files: 0,
|
||||
reached_scan_cap: false,
|
||||
}
|
||||
fn parse_timestamp_uuid_from_filename(name: &str) -> Option<(OffsetDateTime, Uuid)> {
|
||||
// Expected: rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl
|
||||
let core = name.strip_prefix("rollout-")?.strip_suffix(".jsonl")?;
|
||||
|
||||
// Scan from the right for a '-' such that the suffix parses as a UUID.
|
||||
let (sep_idx, uuid) = core
|
||||
.match_indices('-')
|
||||
.rev()
|
||||
.find_map(|(i, _)| Uuid::parse_str(&core[i + 1..]).ok().map(|u| (i, u)))?;
|
||||
|
||||
let ts_str = &core[..sep_idx];
|
||||
let format: &[FormatItem] =
|
||||
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
|
||||
let ts = PrimitiveDateTime::parse(ts_str, format).ok()?.assume_utc();
|
||||
Some((ts, uuid))
|
||||
}
|
||||
|
||||
async fn read_first_jsonl_records(
|
||||
@@ -248,15 +300,36 @@ async fn read_first_jsonl_records(
|
||||
Ok(head)
|
||||
}
|
||||
|
||||
/// Returns a filter that requires the first JSONL record to be a tagged
|
||||
/// session meta line: { "record_type": "session_meta", ... }.
|
||||
pub(crate) fn requires_tagged_session_meta_filter() -> ConversationFilter {
|
||||
Arc::new(|item: &ConversationItem| {
|
||||
item.head
|
||||
.get(0)
|
||||
.and_then(|v| v.get("record_type"))
|
||||
.and_then(|v| v.as_str())
|
||||
.map(|s| s == "session_meta")
|
||||
.unwrap_or(false)
|
||||
})
|
||||
/// Return true if this conversation should be included in the listing.
|
||||
///
|
||||
/// Current rule: include only when the first JSON object is a session meta record
|
||||
/// (i.e., has `{"record_type": "session_meta", ...}`), which is how rollout
|
||||
/// files are written. Empty or malformed heads are excluded.
|
||||
fn should_include_session(head: &[serde_json::Value]) -> bool {
|
||||
let Some(first) = head.first() else {
|
||||
return false;
|
||||
};
|
||||
passes_session_meta_filter(first)
|
||||
}
|
||||
|
||||
/// Validate that the first record is a fully‑formed session meta line.
|
||||
///
|
||||
/// Requirements:
|
||||
/// - `record_type == "session_meta"`
|
||||
/// - Remaining fields (after removing `record_type`) deserialize into
|
||||
/// `SessionMetaWithGit`.
|
||||
fn passes_session_meta_filter(first: &serde_json::Value) -> bool {
|
||||
let Some(obj) = first.as_object() else {
|
||||
return false;
|
||||
};
|
||||
let record_type = obj.get("record_type").and_then(|v| v.as_str());
|
||||
if record_type != Some("session_meta") {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Remove the marker field and validate the remainder matches SessionMetaWithGit
|
||||
let mut cleaned = obj.clone();
|
||||
cleaned.remove("record_type");
|
||||
let val = serde_json::Value::Object(cleaned);
|
||||
serde_json::from_value::<SessionMetaWithGit>(val).is_ok()
|
||||
}
|
||||
|
||||
@@ -2,15 +2,12 @@
|
||||
|
||||
pub(crate) const SESSIONS_SUBDIR: &str = "sessions";
|
||||
|
||||
pub mod format;
|
||||
pub mod list;
|
||||
pub(crate) mod policy;
|
||||
pub mod recorder;
|
||||
|
||||
pub use format::Cursor;
|
||||
pub use recorder::RolloutItem;
|
||||
pub use recorder::RolloutRecorder;
|
||||
pub use recorder::RolloutRecorderParams;
|
||||
pub use recorder::SessionMeta;
|
||||
pub use recorder::SessionStateSnapshot;
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use crate::protocol::EventMsg;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::Event;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
|
||||
/// Whether a `ResponseItem` should be persisted in rollout files.
|
||||
#[inline]
|
||||
@@ -16,81 +17,41 @@ pub(crate) fn is_persisted_response_item(item: &ResponseItem) -> bool {
|
||||
}
|
||||
}
|
||||
|
||||
/// Whether an `EventMsg` should be persisted in rollout files.
|
||||
///
|
||||
/// Keep only high-signal, compact items. Avoid deltas and verbose streams.
|
||||
#[inline]
|
||||
pub(crate) fn is_persisted_event_msg(event: &EventMsg) -> bool {
|
||||
match event {
|
||||
// Core content to replay UI meaningfully
|
||||
EventMsg::AgentMessage(_)
|
||||
pub(crate) fn is_persisted_event(event: &Event) -> bool {
|
||||
match event.msg {
|
||||
EventMsg::ExecApprovalRequest(_)
|
||||
| EventMsg::ApplyPatchApprovalRequest(_)
|
||||
| EventMsg::AgentReasoningDelta(_)
|
||||
| EventMsg::AgentReasoningRawContentDelta(_)
|
||||
| EventMsg::ExecCommandOutputDelta(_)
|
||||
| EventMsg::GetHistoryEntryResponse(_)
|
||||
| EventMsg::AgentMessageDelta(_)
|
||||
| EventMsg::TaskStarted(_)
|
||||
| EventMsg::TaskComplete(_)
|
||||
| EventMsg::McpToolCallBegin(_)
|
||||
| EventMsg::McpToolCallEnd(_)
|
||||
| EventMsg::WebSearchBegin(_)
|
||||
| EventMsg::WebSearchEnd(_)
|
||||
| EventMsg::ExecCommandBegin(_)
|
||||
| EventMsg::ExecCommandEnd(_)
|
||||
| EventMsg::PatchApplyBegin(_)
|
||||
| EventMsg::PatchApplyEnd(_)
|
||||
| EventMsg::TurnDiff(_)
|
||||
| EventMsg::BackgroundEvent(_)
|
||||
| EventMsg::McpListToolsResponse(_)
|
||||
| EventMsg::ListCustomPromptsResponse(_)
|
||||
| EventMsg::ShutdownComplete
|
||||
| EventMsg::ConversationHistory(_)
|
||||
| EventMsg::PlanUpdate(_)
|
||||
| EventMsg::TurnAborted(_)
|
||||
| EventMsg::StreamError(_)
|
||||
| EventMsg::Error(_)
|
||||
| EventMsg::AgentReasoningSectionBreak(_)
|
||||
| EventMsg::SessionConfigured(_) => false,
|
||||
EventMsg::UserMessage(_)
|
||||
| EventMsg::AgentMessage(_)
|
||||
| EventMsg::AgentReasoning(_)
|
||||
| EventMsg::TokenCount(_)
|
||||
| EventMsg::UserMessage(_) => true,
|
||||
|
||||
// Everything else is either transient, redundant, or too verbose
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::protocol::AgentMessageDeltaEvent;
|
||||
use crate::protocol::AgentMessageEvent;
|
||||
use crate::protocol::AgentReasoningDeltaEvent;
|
||||
use crate::protocol::AgentReasoningEvent;
|
||||
use crate::protocol::ApplyPatchApprovalRequestEvent;
|
||||
use crate::protocol::ExecCommandEndEvent;
|
||||
use crate::protocol::TokenCountEvent;
|
||||
|
||||
#[test]
|
||||
fn test_event_persistence_policy() {
|
||||
assert!(is_persisted_event_msg(&EventMsg::AgentMessage(
|
||||
AgentMessageEvent {
|
||||
message: "hi".to_string(),
|
||||
}
|
||||
)));
|
||||
assert!(is_persisted_event_msg(&EventMsg::AgentReasoning(
|
||||
AgentReasoningEvent {
|
||||
text: "think".to_string(),
|
||||
}
|
||||
)));
|
||||
assert!(is_persisted_event_msg(&EventMsg::TokenCount(
|
||||
TokenCountEvent { info: None }
|
||||
)));
|
||||
|
||||
assert!(!is_persisted_event_msg(&EventMsg::AgentMessageDelta(
|
||||
AgentMessageDeltaEvent {
|
||||
delta: "d".to_string(),
|
||||
}
|
||||
)));
|
||||
assert!(!is_persisted_event_msg(&EventMsg::AgentReasoningDelta(
|
||||
AgentReasoningDeltaEvent {
|
||||
delta: "d".to_string(),
|
||||
}
|
||||
)));
|
||||
assert!(!is_persisted_event_msg(&EventMsg::ExecCommandEnd(
|
||||
ExecCommandEndEvent {
|
||||
call_id: "c".to_string(),
|
||||
stdout: Default::default(),
|
||||
stderr: Default::default(),
|
||||
aggregated_output: Default::default(),
|
||||
exit_code: 0,
|
||||
duration: std::time::Duration::from_secs(0),
|
||||
formatted_output: String::new(),
|
||||
}
|
||||
)));
|
||||
use crate::protocol::FileChange;
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
assert!(!is_persisted_event_msg(
|
||||
&EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
|
||||
call_id: "c".to_string(),
|
||||
changes: HashMap::<PathBuf, FileChange>::new(),
|
||||
reason: None,
|
||||
grant_root: None,
|
||||
})
|
||||
));
|
||||
| EventMsg::AgentReasoningRawContent(_)
|
||||
| EventMsg::TokenCount(_) => true,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,11 +6,11 @@ use std::io::Error as IoError;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use crate::protocol::Event;
|
||||
use codex_protocol::mcp_protocol::ConversationId;
|
||||
use codex_protocol::protocol::Event;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
use time::OffsetDateTime;
|
||||
use time::format_description::FormatItem;
|
||||
use time::macros::format_description;
|
||||
@@ -21,22 +21,17 @@ use tokio::sync::oneshot;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
use super::Cursor;
|
||||
use super::SESSIONS_SUBDIR;
|
||||
use super::format::RECORD_TS_FMT;
|
||||
use super::format::build_rollout_filename;
|
||||
use super::list::ConversationFilter;
|
||||
use super::list::ConversationsPage;
|
||||
use super::list::Cursor;
|
||||
use super::list::get_conversations;
|
||||
use super::list::get_conversations_filtered;
|
||||
use super::policy::is_persisted_event_msg;
|
||||
use super::policy::is_persisted_response_item;
|
||||
use crate::config::Config;
|
||||
use crate::conversation_manager::InitialHistory;
|
||||
use crate::conversation_manager::ResumedHistory;
|
||||
use crate::git_info::GitInfo;
|
||||
use crate::git_info::collect_git_info;
|
||||
use crate::protocol::EventMsg;
|
||||
use crate::rollout::policy::is_persisted_event;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Default, Debug)]
|
||||
@@ -49,6 +44,7 @@ pub struct SessionMeta {
|
||||
pub instructions: Option<String>,
|
||||
}
|
||||
|
||||
// SessionMetaWithGit is used in writes and reads; ensure it implements Debug.
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct SessionMetaWithGit {
|
||||
#[serde(flatten)]
|
||||
@@ -57,27 +53,16 @@ pub struct SessionMetaWithGit {
|
||||
git: Option<GitInfo>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct SessionStateSnapshot {
|
||||
pub turn_context: TurnContextSnapshot,
|
||||
}
|
||||
#[derive(Serialize, Deserialize, Default, Clone, Debug)]
|
||||
pub struct SessionStateSnapshot {}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct TurnContextSnapshot {
|
||||
pub cwd: std::path::PathBuf,
|
||||
pub approval_policy: crate::protocol::AskForApproval,
|
||||
pub sandbox_policy: crate::protocol::SandboxPolicy,
|
||||
pub model: String,
|
||||
pub show_raw_agent_reasoning: bool,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
#[derive(Serialize, Deserialize, Default, Clone)]
|
||||
pub struct SavedSession {
|
||||
pub session: SessionMeta,
|
||||
#[serde(default)]
|
||||
pub items: Vec<ResponseItem>,
|
||||
#[serde(default)]
|
||||
pub state: Option<SessionStateSnapshot>,
|
||||
pub state: SessionStateSnapshot,
|
||||
pub session_id: ConversationId,
|
||||
}
|
||||
|
||||
@@ -93,45 +78,28 @@ pub struct SavedSession {
|
||||
#[derive(Clone)]
|
||||
pub struct RolloutRecorder {
|
||||
tx: Sender<RolloutCmd>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub enum RolloutRecorderParams {
|
||||
Create {
|
||||
conversation_id: ConversationId,
|
||||
instructions: Option<String>,
|
||||
},
|
||||
Resume {
|
||||
path: PathBuf,
|
||||
},
|
||||
}
|
||||
|
||||
enum RolloutCmd {
|
||||
AddResponseItems(Vec<ResponseItem>),
|
||||
AddEvents(Vec<Event>),
|
||||
UpdateState(SessionStateSnapshot),
|
||||
Shutdown { ack: oneshot::Sender<()> },
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
#[serde(tag = "record_type", rename_all = "snake_case")]
|
||||
enum TaggedLine {
|
||||
#[serde(rename = "response")]
|
||||
Response {
|
||||
#[serde(flatten)]
|
||||
item: ResponseItem,
|
||||
},
|
||||
#[serde(rename = "event")]
|
||||
Event {
|
||||
#[serde(flatten)]
|
||||
event: Event,
|
||||
},
|
||||
#[serde(rename = "session_meta")]
|
||||
SessionMeta {
|
||||
#[serde(flatten)]
|
||||
meta: SessionMetaWithGit,
|
||||
},
|
||||
#[serde(rename = "state")]
|
||||
PrevSessionMeta {
|
||||
#[serde(flatten)]
|
||||
meta: SessionMetaWithGit,
|
||||
},
|
||||
State {
|
||||
#[serde(flatten)]
|
||||
state: SessionStateSnapshot,
|
||||
@@ -152,15 +120,15 @@ pub enum RolloutItem {
|
||||
SessionMeta(SessionMetaWithGit),
|
||||
}
|
||||
|
||||
impl PartialEq for RolloutItem {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
match (self, other) {
|
||||
(RolloutItem::ResponseItem(a), RolloutItem::ResponseItem(b)) => a == b,
|
||||
(RolloutItem::SessionMeta(_), RolloutItem::SessionMeta(_)) => false,
|
||||
// Event comparison omitted (foreign type without PartialEq); treat as not equal
|
||||
(RolloutItem::Event(_), RolloutItem::Event(_)) => false,
|
||||
_ => false,
|
||||
}
|
||||
impl From<ResponseItem> for RolloutItem {
|
||||
fn from(item: ResponseItem) -> Self {
|
||||
RolloutItem::ResponseItem(item)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Event> for RolloutItem {
|
||||
fn from(event: Event) -> Self {
|
||||
RolloutItem::Event(event)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -190,20 +158,18 @@ impl RolloutItemSliceExt for [RolloutItem] {
|
||||
}
|
||||
}
|
||||
|
||||
impl RolloutRecorderParams {
|
||||
pub fn new(conversation_id: ConversationId, instructions: Option<String>) -> Self {
|
||||
Self::Create {
|
||||
conversation_id,
|
||||
instructions,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn resume(path: PathBuf) -> Self {
|
||||
Self::Resume { path }
|
||||
}
|
||||
enum RolloutCmd {
|
||||
AddResponseItems(Vec<ResponseItem>),
|
||||
AddEvents(Vec<Event>),
|
||||
AddSessionMeta(SessionMetaWithGit),
|
||||
Flush { ack: oneshot::Sender<()> },
|
||||
Shutdown { ack: oneshot::Sender<()> },
|
||||
}
|
||||
|
||||
impl RolloutRecorder {
|
||||
pub fn path(&self) -> &Path {
|
||||
&self.path
|
||||
}
|
||||
#[allow(dead_code)]
|
||||
/// List conversations (rollout files) under the provided Codex home directory.
|
||||
pub async fn list_conversations(
|
||||
@@ -211,142 +177,130 @@ impl RolloutRecorder {
|
||||
page_size: usize,
|
||||
cursor: Option<&Cursor>,
|
||||
) -> std::io::Result<ConversationsPage> {
|
||||
// Apply a default filter to require files to start with a tagged
|
||||
// session_meta record. This skips legacy/invalid files from listings.
|
||||
let filters = vec![super::list::requires_tagged_session_meta_filter()];
|
||||
get_conversations_filtered(codex_home, page_size, cursor, &filters).await
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
/// List conversations with filters; all filters must pass for inclusion.
|
||||
pub async fn list_conversations_with_filters(
|
||||
codex_home: &Path,
|
||||
page_size: usize,
|
||||
cursor: Option<&Cursor>,
|
||||
filters: &[ConversationFilter],
|
||||
) -> std::io::Result<ConversationsPage> {
|
||||
get_conversations_filtered(codex_home, page_size, cursor, filters).await
|
||||
get_conversations(codex_home, page_size, cursor).await
|
||||
}
|
||||
|
||||
/// Attempt to create a new [`RolloutRecorder`]. If the sessions directory
|
||||
/// cannot be created or the rollout file cannot be opened we return the
|
||||
/// error so the caller can decide whether to disable persistence.
|
||||
pub async fn new(config: &Config, params: RolloutRecorderParams) -> std::io::Result<Self> {
|
||||
let (file, meta) = match params {
|
||||
RolloutRecorderParams::Create {
|
||||
conversation_id,
|
||||
pub async fn new(
|
||||
config: &Config,
|
||||
conversation_id: ConversationId,
|
||||
instructions: Option<String>,
|
||||
) -> std::io::Result<Self> {
|
||||
let LogFileInfo {
|
||||
file,
|
||||
conversation_id: session_id,
|
||||
timestamp,
|
||||
path,
|
||||
} = create_log_file(config, conversation_id)?;
|
||||
|
||||
let timestamp_format: &[FormatItem] =
|
||||
format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]Z");
|
||||
let timestamp = timestamp
|
||||
.to_offset(time::UtcOffset::UTC)
|
||||
.format(timestamp_format)
|
||||
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
|
||||
|
||||
let cwd = config.cwd.to_path_buf();
|
||||
|
||||
let (tx, rx) = mpsc::channel(100);
|
||||
|
||||
tokio::task::spawn(rollout_writer(
|
||||
tokio::fs::File::from_std(file),
|
||||
rx,
|
||||
Some(SessionMeta {
|
||||
timestamp,
|
||||
id: session_id,
|
||||
cwd: config.cwd.to_string_lossy().to_string(),
|
||||
originator: config.responses_originator_header.clone(),
|
||||
cli_version: env!("CARGO_PKG_VERSION").to_string(),
|
||||
instructions,
|
||||
} => {
|
||||
let LogFileInfo {
|
||||
file,
|
||||
conversation_id: session_id,
|
||||
timestamp,
|
||||
} = create_log_file(config, conversation_id)?;
|
||||
}),
|
||||
cwd,
|
||||
));
|
||||
|
||||
let timestamp_format: &[FormatItem] = format_description!(
|
||||
"[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"
|
||||
);
|
||||
let timestamp = timestamp
|
||||
.to_offset(time::UtcOffset::UTC)
|
||||
.format(timestamp_format)
|
||||
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
|
||||
|
||||
(
|
||||
tokio::fs::File::from_std(file),
|
||||
Some(SessionMeta {
|
||||
timestamp,
|
||||
id: session_id,
|
||||
cwd: config.cwd.display().to_string(),
|
||||
originator: config.responses_originator_header.clone(),
|
||||
cli_version: env!("CARGO_PKG_VERSION").to_string(),
|
||||
instructions,
|
||||
}),
|
||||
)
|
||||
}
|
||||
RolloutRecorderParams::Resume { path } => (
|
||||
tokio::fs::OpenOptions::new()
|
||||
.append(true)
|
||||
.open(path)
|
||||
.await?,
|
||||
None,
|
||||
),
|
||||
};
|
||||
|
||||
// Clone the cwd for the spawned task to collect git info asynchronously
|
||||
let cwd = config.cwd.clone();
|
||||
|
||||
// A reasonably-sized bounded channel. If the buffer fills up the send
|
||||
// future will yield, which is fine – we only need to ensure we do not
|
||||
// perform *blocking* I/O on the caller's thread.
|
||||
let (tx, rx) = mpsc::channel::<RolloutCmd>(256);
|
||||
|
||||
// Spawn a Tokio task that owns the file handle and performs async
|
||||
// writes. Using `tokio::fs::File` keeps everything on the async I/O
|
||||
// driver instead of blocking the runtime.
|
||||
tokio::task::spawn(rollout_writer(file, rx, meta, cwd));
|
||||
|
||||
Ok(Self { tx })
|
||||
Ok(Self { tx, path })
|
||||
}
|
||||
|
||||
pub(crate) async fn record_response_items(
|
||||
&self,
|
||||
items: &[ResponseItem],
|
||||
) -> std::io::Result<()> {
|
||||
if items.is_empty() {
|
||||
pub(crate) async fn record_items(&self, item: RolloutItem) -> std::io::Result<()> {
|
||||
match item {
|
||||
RolloutItem::ResponseItem(item) => self.record_response_item(&item).await,
|
||||
RolloutItem::Event(event) => self.record_event(&event).await,
|
||||
RolloutItem::SessionMeta(meta) => self.record_session_meta(&meta).await,
|
||||
}
|
||||
}
|
||||
|
||||
/// Ensure all writes up to this point have been processed by the writer task.
|
||||
///
|
||||
/// This is a sequencing barrier for readers that plan to open and read the
|
||||
/// rollout file immediately after calling this method. The background writer
|
||||
/// processes the channel serially; when it dequeues `Flush`, all prior
|
||||
/// `AddResponseItems`/`AddEvents`/`AddSessionMeta` have already been written
|
||||
/// via `write_line`, which calls `file.flush()` (OS‐buffer flush).
|
||||
pub async fn flush(&self) -> std::io::Result<()> {
|
||||
let (tx_done, rx_done) = oneshot::channel();
|
||||
self.tx
|
||||
.send(RolloutCmd::Flush { ack: tx_done })
|
||||
.await
|
||||
.map_err(|e| IoError::other(format!("failed to queue rollout flush: {e}")))?;
|
||||
rx_done
|
||||
.await
|
||||
.map_err(|e| IoError::other(format!("failed waiting for rollout flush: {e}")))
|
||||
}
|
||||
|
||||
async fn record_response_item(&self, item: &ResponseItem) -> std::io::Result<()> {
|
||||
// Note that function calls may look a bit strange if they are
|
||||
// "fully qualified MCP tool calls," so we could consider
|
||||
// reformatting them in that case.
|
||||
if !is_persisted_response_item(item) {
|
||||
return Ok(());
|
||||
}
|
||||
self.tx
|
||||
.send(RolloutCmd::AddResponseItems(items.to_vec()))
|
||||
.send(RolloutCmd::AddResponseItems(vec![item.clone()]))
|
||||
.await
|
||||
.map_err(|e| IoError::other(format!("failed to queue rollout items: {e}")))
|
||||
}
|
||||
|
||||
pub(crate) async fn record_events(&self, events: &[Event]) -> std::io::Result<()> {
|
||||
if events.is_empty() {
|
||||
async fn record_event(&self, event: &Event) -> std::io::Result<()> {
|
||||
if !is_persisted_event(event) {
|
||||
return Ok(());
|
||||
}
|
||||
self.tx
|
||||
.send(RolloutCmd::AddEvents(events.to_vec()))
|
||||
.send(RolloutCmd::AddEvents(vec![event.clone()]))
|
||||
.await
|
||||
.map_err(|e| IoError::other(format!("failed to queue rollout events: {e}")))
|
||||
.map_err(|e| IoError::other(format!("failed to queue rollout event: {e}")))
|
||||
}
|
||||
|
||||
pub(crate) async fn record_state(&self, state: SessionStateSnapshot) -> std::io::Result<()> {
|
||||
async fn record_session_meta(&self, meta: &SessionMetaWithGit) -> std::io::Result<()> {
|
||||
self.tx
|
||||
.send(RolloutCmd::UpdateState(state))
|
||||
.send(RolloutCmd::AddSessionMeta(meta.clone()))
|
||||
.await
|
||||
.map_err(|e| IoError::other(format!("failed to queue rollout state: {e}")))
|
||||
.map_err(|e| IoError::other(format!("failed to queue rollout session meta: {e}")))
|
||||
}
|
||||
|
||||
pub async fn get_rollout_history(path: &Path) -> std::io::Result<InitialHistory> {
|
||||
info!("Resuming rollout from {path:?}");
|
||||
tracing::error!("Resuming rollout from {path:?}");
|
||||
let text = tokio::fs::read_to_string(path).await?;
|
||||
let mut lines = text.lines();
|
||||
let first_line = lines
|
||||
.next()
|
||||
.ok_or_else(|| IoError::other("empty session file"))?;
|
||||
// Support both legacy (bare SessionMeta) and new tagged format
|
||||
let v_first: Value = serde_json::from_str(first_line)
|
||||
.map_err(|e| IoError::other(format!("failed to parse first line: {e}")))?;
|
||||
let conversation_id = if v_first.get("record_type").is_some() {
|
||||
let rt_ok = v_first
|
||||
.get("record_type")
|
||||
.and_then(|s| s.as_str())
|
||||
.map(|s| s == "session_meta")
|
||||
.unwrap_or(false);
|
||||
if !rt_ok {
|
||||
return Err(IoError::other("first line is not session_meta"));
|
||||
}
|
||||
match serde_json::from_value::<SessionMetaWithGit>(v_first.clone()) {
|
||||
Ok(rollout_session_meta) => Some(rollout_session_meta.meta.id),
|
||||
Err(e) => {
|
||||
return Err(IoError::other(format!(
|
||||
"failed to parse first line (tagged) as SessionMeta: {e}"
|
||||
)));
|
||||
}
|
||||
}
|
||||
let conversation_id = if let Ok(TimestampedLine {
|
||||
record: TaggedLine::SessionMeta { meta },
|
||||
..
|
||||
}) = serde_json::from_str::<TimestampedLine>(first_line)
|
||||
{
|
||||
Some(meta.meta.id)
|
||||
} else if let Ok(meta) = serde_json::from_str::<SessionMetaWithGit>(first_line) {
|
||||
Some(meta.meta.id)
|
||||
} else if let Ok(meta) = serde_json::from_str::<SessionMeta>(first_line) {
|
||||
Some(meta.id)
|
||||
} else {
|
||||
return Err(IoError::other("first line missing record_type"));
|
||||
return Err(IoError::other(
|
||||
"failed to parse first line of rollout file as SessionMeta",
|
||||
));
|
||||
};
|
||||
|
||||
let mut items: Vec<RolloutItem> = Vec::new();
|
||||
@@ -354,39 +308,33 @@ impl RolloutRecorder {
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
let v: Value = match serde_json::from_str(line) {
|
||||
Ok(v) => v,
|
||||
Err(_) => continue,
|
||||
};
|
||||
// Tagged format: collect responses and events; skip state
|
||||
if let Some(rt) = v.get("record_type").and_then(|rt| rt.as_str()) {
|
||||
match rt {
|
||||
"state" => continue,
|
||||
"response" => match serde_json::from_value::<ResponseItem>(v.clone()) {
|
||||
Ok(item) => {
|
||||
if is_persisted_response_item(&item) {
|
||||
items.push(RolloutItem::ResponseItem(item));
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("failed to parse item: {v:?}, error: {e}");
|
||||
}
|
||||
},
|
||||
"event" => match serde_json::from_value::<Event>(v.clone()) {
|
||||
Ok(event) => {
|
||||
if is_persisted_event_msg(&event.msg) {
|
||||
items.push(RolloutItem::Event(event));
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("failed to parse event: {v:?}, error: {e}");
|
||||
}
|
||||
},
|
||||
_ => continue,
|
||||
match serde_json::from_str::<TimestampedLine>(line) {
|
||||
Ok(TimestampedLine {
|
||||
record: TaggedLine::State { .. },
|
||||
..
|
||||
}) => {}
|
||||
Ok(TimestampedLine {
|
||||
record: TaggedLine::Event { event },
|
||||
..
|
||||
}) => items.push(RolloutItem::Event(event)),
|
||||
Ok(TimestampedLine {
|
||||
record: TaggedLine::SessionMeta { meta },
|
||||
..
|
||||
})
|
||||
| Ok(TimestampedLine {
|
||||
record: TaggedLine::PrevSessionMeta { meta },
|
||||
..
|
||||
}) => items.push(RolloutItem::SessionMeta(meta)),
|
||||
Ok(TimestampedLine {
|
||||
record: TaggedLine::Response { item },
|
||||
..
|
||||
}) => {
|
||||
if is_persisted_response_item(&item) {
|
||||
items.push(RolloutItem::ResponseItem(item));
|
||||
}
|
||||
}
|
||||
continue;
|
||||
Err(_) => warn!("failed to parse rollout line: {line}"),
|
||||
}
|
||||
// Non-tagged lines are ignored in the new format
|
||||
}
|
||||
|
||||
tracing::error!(
|
||||
@@ -434,6 +382,9 @@ struct LogFileInfo {
|
||||
|
||||
/// Timestamp for the start of the session.
|
||||
timestamp: OffsetDateTime,
|
||||
|
||||
/// Full filesystem path to the rollout file.
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
fn create_log_file(
|
||||
@@ -441,8 +392,7 @@ fn create_log_file(
|
||||
conversation_id: ConversationId,
|
||||
) -> std::io::Result<LogFileInfo> {
|
||||
// Resolve ~/.codex/sessions/YYYY/MM/DD and create it if missing.
|
||||
let timestamp = OffsetDateTime::now_local()
|
||||
.map_err(|e| IoError::other(format!("failed to get local time: {e}")))?;
|
||||
let timestamp = OffsetDateTime::now_utc();
|
||||
let mut dir = config.codex_home.clone();
|
||||
dir.push(SESSIONS_SUBDIR);
|
||||
dir.push(timestamp.year().to_string());
|
||||
@@ -452,7 +402,13 @@ fn create_log_file(
|
||||
|
||||
// Custom format for YYYY-MM-DDThh-mm-ss. Use `-` instead of `:` for
|
||||
// compatibility with filesystems that do not allow colons in filenames.
|
||||
let filename = build_rollout_filename(timestamp, conversation_id);
|
||||
let format: &[FormatItem] =
|
||||
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
|
||||
let date_str = timestamp
|
||||
.format(format)
|
||||
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
|
||||
|
||||
let filename = format!("rollout-{date_str}-{conversation_id}.jsonl");
|
||||
|
||||
let path = dir.join(filename);
|
||||
let file = std::fs::OpenOptions::new()
|
||||
@@ -464,6 +420,7 @@ fn create_log_file(
|
||||
file,
|
||||
conversation_id,
|
||||
timestamp,
|
||||
path,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -482,7 +439,6 @@ async fn rollout_writer(
|
||||
meta: session_meta,
|
||||
git: git_info,
|
||||
};
|
||||
|
||||
// Write the SessionMeta as the first item in the file
|
||||
writer
|
||||
.write_tagged(TaggedLine::SessionMeta {
|
||||
@@ -503,13 +459,18 @@ async fn rollout_writer(
|
||||
}
|
||||
RolloutCmd::AddEvents(events) => {
|
||||
for event in events {
|
||||
if is_persisted_event_msg(&event.msg) {
|
||||
writer.write_tagged(TaggedLine::Event { event }).await?;
|
||||
}
|
||||
writer.write_tagged(TaggedLine::Event { event }).await?;
|
||||
}
|
||||
}
|
||||
RolloutCmd::UpdateState(state) => {
|
||||
writer.write_tagged(TaggedLine::State { state }).await?;
|
||||
// Sequencing barrier: by the time we handle `Flush`, all previously
|
||||
// queued writes have been applied and flushed to OS buffers.
|
||||
RolloutCmd::Flush { ack } => {
|
||||
let _ = ack.send(());
|
||||
}
|
||||
RolloutCmd::AddSessionMeta(meta) => {
|
||||
writer
|
||||
.write_tagged(TaggedLine::PrevSessionMeta { meta })
|
||||
.await?;
|
||||
}
|
||||
RolloutCmd::Shutdown { ack } => {
|
||||
let _ = ack.send(());
|
||||
@@ -525,19 +486,19 @@ struct JsonlWriter {
|
||||
}
|
||||
|
||||
impl JsonlWriter {
|
||||
async fn write_tagged(&mut self, record: TaggedLine) -> std::io::Result<()> {
|
||||
// RFC3339 UTC, second precision
|
||||
let now = OffsetDateTime::now_utc()
|
||||
.format(RECORD_TS_FMT)
|
||||
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
|
||||
let line = TimestampedLine {
|
||||
timestamp: now,
|
||||
record,
|
||||
};
|
||||
let mut json = serde_json::to_string(&line)?;
|
||||
async fn write_line(&mut self, item: &impl serde::Serialize) -> std::io::Result<()> {
|
||||
let mut json = serde_json::to_string(item)?;
|
||||
json.push('\n');
|
||||
self.file.write_all(json.as_bytes()).await?;
|
||||
self.file.flush().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_tagged(&mut self, record: TaggedLine) -> std::io::Result<()> {
|
||||
let timestamp = time::OffsetDateTime::now_utc()
|
||||
.format(&time::format_description::well_known::Rfc3339)
|
||||
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
|
||||
let line = TimestampedLine { timestamp, record };
|
||||
self.write_line(&line).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,16 +12,11 @@ use time::format_description::FormatItem;
|
||||
use time::macros::format_description;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::rollout::Cursor;
|
||||
use crate::rollout::RolloutItem;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::rollout::list::ConversationFilter;
|
||||
use crate::rollout::list::ConversationItem;
|
||||
use crate::rollout::list::ConversationsPage;
|
||||
use crate::rollout::list::Cursor;
|
||||
use crate::rollout::list::get_conversation;
|
||||
use crate::rollout::list::get_conversations;
|
||||
use crate::rollout::list::get_conversations_filtered;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
fn write_session_file(
|
||||
root: &Path,
|
||||
@@ -46,11 +41,13 @@ fn write_session_file(
|
||||
let mut file = File::create(file_path)?;
|
||||
|
||||
let meta = serde_json::json!({
|
||||
"record_type": "session_meta",
|
||||
"timestamp": ts_str,
|
||||
"id": uuid.to_string(),
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version"
|
||||
"cwd": "/",
|
||||
"originator": "test",
|
||||
"cli_version": "0.0.0",
|
||||
"instructions": null
|
||||
});
|
||||
writeln!(file, "{meta}")?;
|
||||
|
||||
@@ -64,127 +61,18 @@ fn write_session_file(
|
||||
Ok((dt, uuid))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_conversations_filtered_by_id() {
|
||||
let temp = TempDir::new().unwrap();
|
||||
let home = temp.path();
|
||||
|
||||
let u1 = Uuid::from_u128(101);
|
||||
let u2 = Uuid::from_u128(202);
|
||||
|
||||
write_session_file(home, "2025-07-01T00-00-00", u1, 1).unwrap();
|
||||
write_session_file(home, "2025-07-02T00-00-00", u2, 1).unwrap();
|
||||
|
||||
let target = u2.to_string();
|
||||
let filter: ConversationFilter = std::sync::Arc::new(move |item: &ConversationItem| {
|
||||
item.head
|
||||
.first()
|
||||
.and_then(|v| v.get("id"))
|
||||
.and_then(|v| v.as_str())
|
||||
.map(|s| s == target)
|
||||
.unwrap_or(false)
|
||||
});
|
||||
|
||||
let page = get_conversations_filtered(home, 10, None, &[filter])
|
||||
.await
|
||||
.expect("filtered list ok");
|
||||
assert_eq!(page.items.len(), 1);
|
||||
let file_name = page.items[0]
|
||||
.path
|
||||
.file_name()
|
||||
.unwrap()
|
||||
.to_string_lossy()
|
||||
.to_string();
|
||||
assert!(file_name.contains(&u2.to_string()));
|
||||
fn expected_session_meta(ts: &str, uuid: Uuid) -> serde_json::Value {
|
||||
serde_json::json!({
|
||||
"record_type": "session_meta",
|
||||
"timestamp": ts,
|
||||
"id": uuid.to_string(),
|
||||
"cwd": "/",
|
||||
"originator": "test",
|
||||
"cli_version": "0.0.0",
|
||||
"instructions": null
|
||||
})
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_rollout_history_parses_tagged_events_and_responses() {
|
||||
use std::io::Write as _;
|
||||
use uuid::Uuid;
|
||||
|
||||
let temp = TempDir::new().unwrap();
|
||||
let path = temp.path().join("rollout-test.jsonl");
|
||||
let mut f = File::create(&path).unwrap();
|
||||
|
||||
let session_id = Uuid::from_u128(555).to_string();
|
||||
// Tagged session_meta line (new format)
|
||||
writeln!(
|
||||
f,
|
||||
"{}",
|
||||
serde_json::json!({
|
||||
"timestamp": "2025-09-08T16:03:12Z",
|
||||
"record_type": "session_meta",
|
||||
"id": session_id,
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version",
|
||||
"instructions": null
|
||||
})
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Allowed event: AgentMessage
|
||||
writeln!(
|
||||
f,
|
||||
"{}",
|
||||
serde_json::json!({
|
||||
"timestamp": "2025-09-08T16:03:13Z",
|
||||
"record_type": "event",
|
||||
"id": "sub1",
|
||||
"msg": {"type": "agent_message", "message": "hi"}
|
||||
})
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Disallowed (delta) event: should be skipped by policy
|
||||
writeln!(
|
||||
f,
|
||||
"{}",
|
||||
serde_json::json!({
|
||||
"timestamp": "2025-09-08T16:03:14Z",
|
||||
"record_type": "event",
|
||||
"id": "sub1",
|
||||
"msg": {"type": "agent_message_delta", "delta": "x"}
|
||||
})
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Response item (assistant message)
|
||||
writeln!(
|
||||
f,
|
||||
"{}",
|
||||
serde_json::json!({
|
||||
"timestamp": "2025-09-08T16:03:15Z",
|
||||
"record_type": "response",
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"content": [{"type": "output_text", "text": "hello"}]
|
||||
})
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let hist = RolloutRecorder::get_rollout_history(&path)
|
||||
.await
|
||||
.expect("history ok");
|
||||
|
||||
let resumed = match hist {
|
||||
crate::conversation_manager::InitialHistory::Resumed(r) => r,
|
||||
_ => panic!("expected resumed history"),
|
||||
};
|
||||
|
||||
// We expect exactly 2 items: one event (allowed) and one response
|
||||
let kinds: Vec<&'static str> = resumed
|
||||
.history
|
||||
.iter()
|
||||
.map(|it| match it {
|
||||
RolloutItem::Event(_) => "event",
|
||||
RolloutItem::ResponseItem(_) => "response",
|
||||
RolloutItem::SessionMeta(_) => "meta",
|
||||
})
|
||||
.collect();
|
||||
assert_eq!(kinds, vec!["event", "response"]);
|
||||
}
|
||||
#[tokio::test]
|
||||
async fn test_list_conversations_latest_first() {
|
||||
let temp = TempDir::new().unwrap();
|
||||
@@ -223,37 +111,19 @@ async fn test_list_conversations_latest_first() {
|
||||
.join(format!("rollout-2025-01-01T12-00-00-{u1}.jsonl"));
|
||||
|
||||
let head_3 = vec![
|
||||
serde_json::json!({
|
||||
"timestamp": "2025-01-03T12-00-00",
|
||||
"id": u3.to_string(),
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version"
|
||||
}),
|
||||
expected_session_meta("2025-01-03T12-00-00", u3),
|
||||
serde_json::json!({"record_type": "response", "index": 0}),
|
||||
serde_json::json!({"record_type": "response", "index": 1}),
|
||||
serde_json::json!({"record_type": "response", "index": 2}),
|
||||
];
|
||||
let head_2 = vec![
|
||||
serde_json::json!({
|
||||
"timestamp": "2025-01-02T12-00-00",
|
||||
"id": u2.to_string(),
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version"
|
||||
}),
|
||||
expected_session_meta("2025-01-02T12-00-00", u2),
|
||||
serde_json::json!({"record_type": "response", "index": 0}),
|
||||
serde_json::json!({"record_type": "response", "index": 1}),
|
||||
serde_json::json!({"record_type": "response", "index": 2}),
|
||||
];
|
||||
let head_1 = vec![
|
||||
serde_json::json!({
|
||||
"timestamp": "2025-01-01T12-00-00",
|
||||
"id": u1.to_string(),
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version"
|
||||
}),
|
||||
expected_session_meta("2025-01-01T12-00-00", u1),
|
||||
serde_json::json!({"record_type": "response", "index": 0}),
|
||||
serde_json::json!({"record_type": "response", "index": 1}),
|
||||
serde_json::json!({"record_type": "response", "index": 2}),
|
||||
@@ -318,23 +188,11 @@ async fn test_pagination_cursor() {
|
||||
.join("04")
|
||||
.join(format!("rollout-2025-03-04T09-00-00-{u4}.jsonl"));
|
||||
let head_5 = vec![
|
||||
serde_json::json!({
|
||||
"timestamp": "2025-03-05T09-00-00",
|
||||
"id": u5.to_string(),
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version"
|
||||
}),
|
||||
expected_session_meta("2025-03-05T09-00-00", u5),
|
||||
serde_json::json!({"record_type": "response", "index": 0}),
|
||||
];
|
||||
let head_4 = vec![
|
||||
serde_json::json!({
|
||||
"timestamp": "2025-03-04T09-00-00",
|
||||
"id": u4.to_string(),
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version"
|
||||
}),
|
||||
expected_session_meta("2025-03-04T09-00-00", u4),
|
||||
serde_json::json!({"record_type": "response", "index": 0}),
|
||||
];
|
||||
let expected_cursor1: Cursor =
|
||||
@@ -372,23 +230,11 @@ async fn test_pagination_cursor() {
|
||||
.join("02")
|
||||
.join(format!("rollout-2025-03-02T09-00-00-{u2}.jsonl"));
|
||||
let head_3 = vec![
|
||||
serde_json::json!({
|
||||
"timestamp": "2025-03-03T09-00-00",
|
||||
"id": u3.to_string(),
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version"
|
||||
}),
|
||||
expected_session_meta("2025-03-03T09-00-00", u3),
|
||||
serde_json::json!({"record_type": "response", "index": 0}),
|
||||
];
|
||||
let head_2 = vec![
|
||||
serde_json::json!({
|
||||
"timestamp": "2025-03-02T09-00-00",
|
||||
"id": u2.to_string(),
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version"
|
||||
}),
|
||||
expected_session_meta("2025-03-02T09-00-00", u2),
|
||||
serde_json::json!({"record_type": "response", "index": 0}),
|
||||
];
|
||||
let expected_cursor2: Cursor =
|
||||
@@ -420,13 +266,7 @@ async fn test_pagination_cursor() {
|
||||
.join("01")
|
||||
.join(format!("rollout-2025-03-01T09-00-00-{u1}.jsonl"));
|
||||
let head_1 = vec![
|
||||
serde_json::json!({
|
||||
"timestamp": "2025-03-01T09-00-00",
|
||||
"id": u1.to_string(),
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version"
|
||||
}),
|
||||
expected_session_meta("2025-03-01T09-00-00", u1),
|
||||
serde_json::json!({"record_type": "response", "index": 0}),
|
||||
];
|
||||
let expected_cursor3: Cursor =
|
||||
@@ -465,13 +305,7 @@ async fn test_get_conversation_contents() {
|
||||
.join("01")
|
||||
.join(format!("rollout-2025-04-01T10-30-00-{uuid}.jsonl"));
|
||||
let expected_head = vec![
|
||||
serde_json::json!({
|
||||
"timestamp": ts,
|
||||
"id": uuid.to_string(),
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version"
|
||||
}),
|
||||
expected_session_meta(ts, uuid),
|
||||
serde_json::json!({"record_type": "response", "index": 0}),
|
||||
serde_json::json!({"record_type": "response", "index": 1}),
|
||||
];
|
||||
@@ -488,13 +322,7 @@ async fn test_get_conversation_contents() {
|
||||
assert_eq!(page, expected_page);
|
||||
|
||||
// Entire file contents equality
|
||||
let meta = serde_json::json!({
|
||||
"timestamp": ts,
|
||||
"id": uuid.to_string(),
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version"
|
||||
});
|
||||
let meta = expected_session_meta(ts, uuid);
|
||||
let rec0 = serde_json::json!({"record_type": "response", "index": 0});
|
||||
let rec1 = serde_json::json!({"record_type": "response", "index": 1});
|
||||
let expected_content = format!("{meta}\n{rec0}\n{rec1}\n");
|
||||
@@ -529,15 +357,7 @@ async fn test_stable_ordering_same_second_pagination() {
|
||||
.join("07")
|
||||
.join("01")
|
||||
.join(format!("rollout-2025-07-01T00-00-00-{u2}.jsonl"));
|
||||
let head = |u: Uuid| -> Vec<serde_json::Value> {
|
||||
vec![serde_json::json!({
|
||||
"timestamp": ts,
|
||||
"id": u.to_string(),
|
||||
"cwd": "/tmp",
|
||||
"originator": "codex_cli_rs",
|
||||
"cli_version": "test-version"
|
||||
})]
|
||||
};
|
||||
let head = |u: Uuid| -> Vec<serde_json::Value> { vec![expected_session_meta(ts, u)] };
|
||||
let expected_cursor1: Cursor = serde_json::from_str(&format!("\"{ts}|{u2}\"")).unwrap();
|
||||
let expected_page1 = ConversationsPage {
|
||||
items: vec![
|
||||
|
||||
@@ -69,8 +69,3 @@
|
||||
; Added on top of Chrome profile
|
||||
; Needed for python multiprocessing on MacOS for the SemLock
|
||||
(allow ipc-posix-sem)
|
||||
|
||||
; needed to look up user info, see https://crbug.com/792228
|
||||
(allow mach-lookup
|
||||
(global-name "com.apple.system.opendirectoryd.libinfo")
|
||||
)
|
||||
|
||||
@@ -4,9 +4,12 @@ use codex_core::ModelProviderInfo;
|
||||
use codex_core::NewConversation;
|
||||
use codex_core::WireApi;
|
||||
use codex_core::built_in_model_providers;
|
||||
use codex_core::protocol::AgentMessageEvent;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::InputItem;
|
||||
use codex_core::protocol::InputMessageKind;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::protocol::UserMessageEvent;
|
||||
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
|
||||
use codex_protocol::mcp_protocol::AuthMode;
|
||||
use core_test_support::load_default_config_for_test;
|
||||
@@ -126,7 +129,14 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
|
||||
writeln!(
|
||||
f,
|
||||
"{}",
|
||||
json!({"meta":"test","instructions":"be nice", "id": Uuid::new_v4(), "timestamp": "2024-01-01T00:00:00Z"})
|
||||
json!({
|
||||
"record_type": "session_meta",
|
||||
"id": Uuid::new_v4(),
|
||||
"timestamp": "2024-01-01T00:00:00Z",
|
||||
"cwd": tmpdir.path().to_string_lossy(),
|
||||
"originator": "test",
|
||||
"cli_version": "0.0.0-test"
|
||||
})
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -138,7 +148,30 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
|
||||
text: "resumed user message".to_string(),
|
||||
}],
|
||||
};
|
||||
writeln!(f, "{}", serde_json::to_string(&prior_user).unwrap()).unwrap();
|
||||
let mut prior_user_obj = serde_json::to_value(&prior_user)
|
||||
.unwrap()
|
||||
.as_object()
|
||||
.unwrap()
|
||||
.clone();
|
||||
prior_user_obj.insert("record_type".to_string(), serde_json::json!("response"));
|
||||
prior_user_obj.insert(
|
||||
"timestamp".to_string(),
|
||||
serde_json::json!("2025-01-01T00:00:00Z"),
|
||||
);
|
||||
writeln!(f, "{}", serde_json::Value::Object(prior_user_obj)).unwrap();
|
||||
|
||||
// Also include a matching user message event to preserve ordering at resume
|
||||
let prior_user_event = EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "resumed user message".to_string(),
|
||||
kind: Some(InputMessageKind::Plain),
|
||||
});
|
||||
let prior_user_event_line = serde_json::json!({
|
||||
"timestamp": "2025-01-01T00:00:00Z",
|
||||
"record_type": "event",
|
||||
"id": "resume-0",
|
||||
"msg": prior_user_event,
|
||||
});
|
||||
writeln!(f, "{prior_user_event_line}").unwrap();
|
||||
|
||||
// Prior item: system message (excluded from API history)
|
||||
let prior_system = codex_protocol::models::ResponseItem::Message {
|
||||
@@ -148,7 +181,17 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
|
||||
text: "resumed system instruction".to_string(),
|
||||
}],
|
||||
};
|
||||
writeln!(f, "{}", serde_json::to_string(&prior_system).unwrap()).unwrap();
|
||||
let mut prior_system_obj = serde_json::to_value(&prior_system)
|
||||
.unwrap()
|
||||
.as_object()
|
||||
.unwrap()
|
||||
.clone();
|
||||
prior_system_obj.insert("record_type".to_string(), serde_json::json!("response"));
|
||||
prior_system_obj.insert(
|
||||
"timestamp".to_string(),
|
||||
serde_json::json!("2025-01-01T00:00:00Z"),
|
||||
);
|
||||
writeln!(f, "{}", serde_json::Value::Object(prior_system_obj)).unwrap();
|
||||
|
||||
// Prior item: assistant message
|
||||
let prior_item = codex_protocol::models::ResponseItem::Message {
|
||||
@@ -158,7 +201,27 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
|
||||
text: "resumed assistant message".to_string(),
|
||||
}],
|
||||
};
|
||||
writeln!(f, "{}", serde_json::to_string(&prior_item).unwrap()).unwrap();
|
||||
let mut prior_item_obj = serde_json::to_value(&prior_item)
|
||||
.unwrap()
|
||||
.as_object()
|
||||
.unwrap()
|
||||
.clone();
|
||||
prior_item_obj.insert("record_type".to_string(), serde_json::json!("response"));
|
||||
prior_item_obj.insert(
|
||||
"timestamp".to_string(),
|
||||
serde_json::json!("2025-01-01T00:00:00Z"),
|
||||
);
|
||||
writeln!(f, "{}", serde_json::Value::Object(prior_item_obj)).unwrap();
|
||||
let prior_item_event = EventMsg::AgentMessage(AgentMessageEvent {
|
||||
message: "resumed assistant message".to_string(),
|
||||
});
|
||||
let prior_event_line = serde_json::json!({
|
||||
"timestamp": "2025-01-01T00:00:00Z",
|
||||
"record_type": "event",
|
||||
"id": "resume-1",
|
||||
"msg": prior_item_event,
|
||||
});
|
||||
writeln!(f, "{prior_event_line}").unwrap();
|
||||
drop(f);
|
||||
|
||||
// Mock server that will receive the resumed request
|
||||
|
||||
@@ -3,10 +3,11 @@ use codex_core::ConversationManager;
|
||||
use codex_core::ModelProviderInfo;
|
||||
use codex_core::NewConversation;
|
||||
use codex_core::built_in_model_providers;
|
||||
use codex_core::protocol::ConversationHistoryResponseEvent;
|
||||
use codex_core::protocol::ConversationPathResponseEvent;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::InputItem;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use core_test_support::load_default_config_for_test;
|
||||
use core_test_support::wait_for_event;
|
||||
use tempfile::TempDir;
|
||||
@@ -72,17 +73,34 @@ async fn fork_conversation_twice_drops_to_first_message() {
|
||||
}
|
||||
|
||||
// Request history from the base conversation.
|
||||
codex.submit(Op::GetHistory).await.unwrap();
|
||||
codex.submit(Op::GetConversationPath).await.unwrap();
|
||||
let base_history =
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ConversationHistory(_))).await;
|
||||
|
||||
// Capture entries from the base history and compute expected prefixes after each fork.
|
||||
let entries_after_three = match &base_history {
|
||||
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { entries, .. }) => {
|
||||
entries.clone()
|
||||
}
|
||||
// Capture path/id from the base history and compute expected prefixes after each fork.
|
||||
let (base_conv_id, base_path) = match &base_history {
|
||||
EventMsg::ConversationHistory(ConversationPathResponseEvent {
|
||||
conversation_id,
|
||||
path,
|
||||
}) => (*conversation_id, path.clone()),
|
||||
_ => panic!("expected ConversationHistory event"),
|
||||
};
|
||||
|
||||
// Read entries from rollout file.
|
||||
async fn read_response_entries(path: &std::path::Path) -> Vec<ResponseItem> {
|
||||
let text = tokio::fs::read_to_string(path).await.unwrap_or_default();
|
||||
let mut out = Vec::new();
|
||||
for line in text.lines() {
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
if let Ok(item) = serde_json::from_str::<ResponseItem>(line) {
|
||||
out.push(item);
|
||||
}
|
||||
}
|
||||
out
|
||||
}
|
||||
let entries_after_three: Vec<ResponseItem> = read_response_entries(&base_path).await;
|
||||
// History layout for this test:
|
||||
// [0] user instructions,
|
||||
// [1] environment context,
|
||||
@@ -113,42 +131,46 @@ async fn fork_conversation_twice_drops_to_first_message() {
|
||||
conversation: codex_fork1,
|
||||
..
|
||||
} = conversation_manager
|
||||
.fork_conversation(entries_after_three.clone(), 1, config_for_fork.clone())
|
||||
.fork_conversation(base_path.clone(), base_conv_id, 1, config_for_fork.clone())
|
||||
.await
|
||||
.expect("fork 1");
|
||||
|
||||
codex_fork1.submit(Op::GetHistory).await.unwrap();
|
||||
codex_fork1.submit(Op::GetConversationPath).await.unwrap();
|
||||
let fork1_history = wait_for_event(&codex_fork1, |ev| {
|
||||
matches!(ev, EventMsg::ConversationHistory(_))
|
||||
})
|
||||
.await;
|
||||
let entries_after_first_fork = match &fork1_history {
|
||||
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { entries, .. }) => {
|
||||
assert!(matches!(
|
||||
fork1_history,
|
||||
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { ref entries, .. }) if *entries == expected_after_first
|
||||
));
|
||||
entries.clone()
|
||||
}
|
||||
let (fork1_id, fork1_path) = match &fork1_history {
|
||||
EventMsg::ConversationHistory(ConversationPathResponseEvent {
|
||||
conversation_id,
|
||||
path,
|
||||
}) => (*conversation_id, path.clone()),
|
||||
_ => panic!("expected ConversationHistory event after first fork"),
|
||||
};
|
||||
let entries_after_first_fork: Vec<ResponseItem> = read_response_entries(&fork1_path).await;
|
||||
assert_eq!(entries_after_first_fork, expected_after_first);
|
||||
|
||||
// Fork again with n=1 → drops the (new) last user message, leaving only the first.
|
||||
let NewConversation {
|
||||
conversation: codex_fork2,
|
||||
..
|
||||
} = conversation_manager
|
||||
.fork_conversation(entries_after_first_fork.clone(), 1, config_for_fork.clone())
|
||||
.fork_conversation(fork1_path.clone(), fork1_id, 1, config_for_fork.clone())
|
||||
.await
|
||||
.expect("fork 2");
|
||||
|
||||
codex_fork2.submit(Op::GetHistory).await.unwrap();
|
||||
codex_fork2.submit(Op::GetConversationPath).await.unwrap();
|
||||
let fork2_history = wait_for_event(&codex_fork2, |ev| {
|
||||
matches!(ev, EventMsg::ConversationHistory(_))
|
||||
})
|
||||
.await;
|
||||
assert!(matches!(
|
||||
fork2_history,
|
||||
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { ref entries, .. }) if *entries == expected_after_second
|
||||
));
|
||||
let (_fork2_id, fork2_path) = match &fork2_history {
|
||||
EventMsg::ConversationHistory(ConversationPathResponseEvent {
|
||||
conversation_id,
|
||||
path,
|
||||
}) => (*conversation_id, path.clone()),
|
||||
_ => panic!("expected ConversationHistory event after second fork"),
|
||||
};
|
||||
let entries_after_second_fork: Vec<ResponseItem> = read_response_entries(&fork2_path).await;
|
||||
assert_eq!(entries_after_second_fork, expected_after_second);
|
||||
}
|
||||
|
||||
@@ -159,41 +159,6 @@ async fn read_only_forbids_all_writes() {
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Verify that user lookups via `pwd.getpwuid(os.getuid())` work under the
|
||||
/// seatbelt sandbox. Prior to allowing the necessary mach‑lookup for
|
||||
/// OpenDirectory libinfo, this would fail with `KeyError: getpwuid(): uid not found`.
|
||||
#[tokio::test]
|
||||
async fn python_getpwuid_works_under_seatbelt() {
|
||||
if std::env::var(CODEX_SANDBOX_ENV_VAR) == Ok("seatbelt".to_string()) {
|
||||
eprintln!("{CODEX_SANDBOX_ENV_VAR} is set to 'seatbelt', skipping test.");
|
||||
return;
|
||||
}
|
||||
|
||||
// ReadOnly is sufficient here since we are only exercising user lookup.
|
||||
let policy = SandboxPolicy::ReadOnly;
|
||||
|
||||
let mut child = spawn_command_under_seatbelt(
|
||||
vec![
|
||||
"python3".to_string(),
|
||||
"-c".to_string(),
|
||||
// Print the passwd struct; success implies lookup worked.
|
||||
"import pwd, os; print(pwd.getpwuid(os.getuid()))".to_string(),
|
||||
],
|
||||
&policy,
|
||||
std::env::current_dir().expect("should be able to get current dir"),
|
||||
StdioPolicy::RedirectForShellTool,
|
||||
HashMap::new(),
|
||||
)
|
||||
.await
|
||||
.expect("should be able to spawn python under seatbelt");
|
||||
|
||||
let status = child
|
||||
.wait()
|
||||
.await
|
||||
.expect("should be able to wait for child process");
|
||||
assert!(status.success(), "python exited with {status:?}");
|
||||
}
|
||||
|
||||
#[expect(clippy::expect_used)]
|
||||
fn create_test_scenario(tmp: &TempDir) -> TestScenario {
|
||||
let repo_parent = tmp.path().to_path_buf();
|
||||
|
||||
@@ -156,8 +156,17 @@ fn create_fake_rollout(codex_home: &Path, filename_ts: &str, meta_rfc3339: &str,
|
||||
|
||||
let file_path = dir.join(format!("rollout-{filename_ts}-{uuid}.jsonl"));
|
||||
let mut lines = Vec::new();
|
||||
// Meta line with timestamp
|
||||
lines.push(json!({"timestamp": meta_rfc3339, "id": uuid}).to_string());
|
||||
lines.push(
|
||||
json!({
|
||||
"record_type": "session_meta",
|
||||
"id": uuid,
|
||||
"timestamp": meta_rfc3339,
|
||||
"cwd": codex_home.to_string_lossy(),
|
||||
"originator": "test",
|
||||
"cli_version": "0.0.0-test"
|
||||
})
|
||||
.to_string(),
|
||||
);
|
||||
// Minimal user message entry as a persisted response item
|
||||
lines.push(
|
||||
json!({
|
||||
|
||||
@@ -15,7 +15,6 @@ use crate::config_types::ReasoningSummary as ReasoningSummaryConfig;
|
||||
use crate::custom_prompts::CustomPrompt;
|
||||
use crate::mcp_protocol::ConversationId;
|
||||
use crate::message_history::HistoryEntry;
|
||||
use crate::models::ResponseItem;
|
||||
use crate::num_format::format_with_separators;
|
||||
use crate::parse_command::ParsedCommand;
|
||||
use crate::plan_tool::UpdatePlanArgs;
|
||||
@@ -149,7 +148,7 @@ pub enum Op {
|
||||
|
||||
/// Request the full in-memory conversation transcript for the current session.
|
||||
/// Reply is delivered via `EventMsg::ConversationHistory`.
|
||||
GetHistory,
|
||||
GetConversationPath,
|
||||
|
||||
/// Request the list of MCP tools available across all configured servers.
|
||||
/// Reply is delivered via `EventMsg::McpListToolsResponse`.
|
||||
@@ -425,7 +424,7 @@ pub enum EventMsg {
|
||||
/// Agent text output message
|
||||
AgentMessage(AgentMessageEvent),
|
||||
|
||||
/// User/system input message (what was sent to the model)
|
||||
/// User/system input message (what was sent to the model).
|
||||
UserMessage(UserMessageEvent),
|
||||
|
||||
/// Agent text output delta message
|
||||
@@ -499,7 +498,7 @@ pub enum EventMsg {
|
||||
/// Notification that the agent is shutting down.
|
||||
ShutdownComplete,
|
||||
|
||||
ConversationHistory(ConversationHistoryResponseEvent),
|
||||
ConversationHistory(ConversationPathResponseEvent),
|
||||
}
|
||||
|
||||
// Individual event payload types matching each `EventMsg` variant.
|
||||
@@ -799,9 +798,9 @@ pub struct WebSearchEndEvent {
|
||||
/// Response payload for `Op::GetHistory` containing the current session's
|
||||
/// in-memory transcript.
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
|
||||
pub struct ConversationHistoryResponseEvent {
|
||||
pub struct ConversationPathResponseEvent {
|
||||
pub conversation_id: ConversationId,
|
||||
pub entries: Vec<ResponseItem>,
|
||||
pub path: PathBuf,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
|
||||
|
||||
@@ -3,7 +3,7 @@ use crate::backtrack_helpers;
|
||||
use crate::pager_overlay::Overlay;
|
||||
use crate::tui;
|
||||
use crate::tui::TuiEvent;
|
||||
use codex_core::protocol::ConversationHistoryResponseEvent;
|
||||
use codex_core::protocol::ConversationPathResponseEvent;
|
||||
use codex_protocol::mcp_protocol::ConversationId;
|
||||
use color_eyre::eyre::Result;
|
||||
use crossterm::event::KeyCode;
|
||||
@@ -98,7 +98,7 @@ impl App {
|
||||
) {
|
||||
self.backtrack.pending = Some((base_id, drop_last_messages, prefill));
|
||||
self.app_event_tx.send(crate::app_event::AppEvent::CodexOp(
|
||||
codex_core::protocol::Op::GetHistory,
|
||||
codex_core::protocol::Op::GetConversationPath,
|
||||
));
|
||||
}
|
||||
|
||||
@@ -265,7 +265,7 @@ impl App {
|
||||
pub(crate) async fn on_conversation_history_for_backtrack(
|
||||
&mut self,
|
||||
tui: &mut tui::Tui,
|
||||
ev: ConversationHistoryResponseEvent,
|
||||
ev: ConversationPathResponseEvent,
|
||||
) -> Result<()> {
|
||||
if let Some((base_id, _, _)) = self.backtrack.pending.as_ref()
|
||||
&& ev.conversation_id == *base_id
|
||||
@@ -281,15 +281,16 @@ impl App {
|
||||
async fn fork_and_switch_to_new_conversation(
|
||||
&mut self,
|
||||
tui: &mut tui::Tui,
|
||||
ev: ConversationHistoryResponseEvent,
|
||||
ev: ConversationPathResponseEvent,
|
||||
drop_count: usize,
|
||||
prefill: String,
|
||||
) {
|
||||
let cfg = self.chat_widget.config_ref().clone();
|
||||
// Perform the fork via a thin wrapper for clarity/testability.
|
||||
let result = self
|
||||
.perform_fork(ev.entries.clone(), drop_count, cfg.clone())
|
||||
.perform_fork(ev.path.clone(), ev.conversation_id, drop_count, cfg.clone())
|
||||
.await;
|
||||
// We aren't using the initial history UI replay in session configured because we have more accurate version of the history.
|
||||
match result {
|
||||
Ok(new_conv) => {
|
||||
self.install_forked_conversation(tui, cfg, new_conv, drop_count, &prefill)
|
||||
@@ -301,12 +302,13 @@ impl App {
|
||||
/// Thin wrapper around ConversationManager::fork_conversation.
|
||||
async fn perform_fork(
|
||||
&self,
|
||||
entries: Vec<codex_protocol::models::ResponseItem>,
|
||||
conversation_path: std::path::PathBuf,
|
||||
conversation_id: codex_protocol::mcp_protocol::ConversationId,
|
||||
drop_count: usize,
|
||||
cfg: codex_core::config::Config,
|
||||
) -> codex_core::error::Result<codex_core::NewConversation> {
|
||||
self.server
|
||||
.fork_conversation(entries, drop_count, cfg)
|
||||
.fork_conversation(conversation_path, conversation_id, drop_count, cfg)
|
||||
.await
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use codex_core::protocol::ConversationHistoryResponseEvent;
|
||||
use codex_core::protocol::ConversationPathResponseEvent;
|
||||
use codex_core::protocol::Event;
|
||||
use codex_file_search::FileMatch;
|
||||
|
||||
@@ -58,5 +58,5 @@ pub(crate) enum AppEvent {
|
||||
UpdateSandboxPolicy(SandboxPolicy),
|
||||
|
||||
/// Forwarded conversation history snapshot from the current conversation.
|
||||
ConversationHistory(ConversationHistoryResponseEvent),
|
||||
ConversationHistory(ConversationPathResponseEvent),
|
||||
}
|
||||
|
||||
@@ -6,12 +6,9 @@ Codex supports several mechanisms for setting config values:
|
||||
- Config-specific command-line flags, such as `--model o3` (highest precedence).
|
||||
- A generic `-c`/`--config` flag that takes a `key=value` pair, such as `--config model="o3"`.
|
||||
- The key can contain dots to set a value deeper than the root, e.g. `--config model_providers.openai.wire_api="chat"`.
|
||||
- For consistency with `config.toml`, values are a string in TOML format rather than JSON format, so use `key='{a = 1, b = 2}'` rather than `key='{"a": 1, "b": 2}'`.
|
||||
- The quotes around the value are necessary, as without them your shell would split the config argument on spaces, resulting in `codex` receiving `-c key={a` with (invalid) additional arguments `=`, `1,`, `b`, `=`, `2}`.
|
||||
- Values can contain any TOML object, such as `--config shell_environment_policy.include_only='["PATH", "HOME", "USER"]'`.
|
||||
- If `value` cannot be parsed as a valid TOML value, it is treated as a string value. This means that `-c model='"o3"'` and `-c model=o3` are equivalent.
|
||||
- In the first case, the value is the TOML string `"o3"`, while in the second the value is `o3`, which is not valid TOML and therefore treated as the TOML string `"o3"`.
|
||||
- Because quotes are interpreted by one's shell, `-c key="true"` will be correctly interpreted in TOML as `key = true` (a boolean) and not `key = "true"` (a string). If for some reason you needed the string `"true"`, you would need to use `-c key='"true"'` (note the two sets of quotes).
|
||||
- Values can contain objects, such as `--config shell_environment_policy.include_only=["PATH", "HOME", "USER"]`.
|
||||
- For consistency with `config.toml`, values are in TOML format rather than JSON format, so use `{a = 1, b = 2}` rather than `{"a": 1, "b": 2}`.
|
||||
- If `value` cannot be parsed as a valid TOML value, it is treated as a string value. This means that both `-c model="o3"` and `-c model=o3` are equivalent.
|
||||
- The `$CODEX_HOME/config.toml` configuration file where the `CODEX_HOME` environment value defaults to `~/.codex`. (Note `CODEX_HOME` will also be where logs and other Codex-related information are stored.)
|
||||
|
||||
Both the `--config` flag and the `config.toml` file support the following options:
|
||||
|
||||
Reference in New Issue
Block a user