Compare commits

..

9 Commits

Author SHA1 Message Date
Ahmed Ibrahim
381516ee3e timestamp 2025-09-09 07:51:14 -07:00
Ahmed Ibrahim
dfaa3fb242 timestamp 2025-09-08 20:19:28 -07:00
Ahmed Ibrahim
28c57aded3 timestamp 2025-09-08 18:48:03 -07:00
Ahmed Ibrahim
5e41303911 timestamp 2025-09-08 18:47:39 -07:00
Ahmed Ibrahim
bcf855fe1f timestamp 2025-09-08 18:42:08 -07:00
Ahmed Ibrahim
617b1c1ea4 timestamp 2025-09-08 18:40:38 -07:00
Ahmed Ibrahim
f94e1e3298 timestamp 2025-09-08 18:38:19 -07:00
Jeremy Rose
ac58749bd3 allow mach-lookup for com.apple.system.opendirectoryd.libinfo (#3334)
in the base sandbox policy. this is [allowed in Chrome
renderers](https://source.chromium.org/chromium/chromium/src/+/main:sandbox/policy/mac/common.sb;l=266;drc=7afa0043cfcddb3ef9dafe5acbfc01c2f7e7df01),
so I feel it's fairly safe.
2025-09-08 16:28:52 -07:00
Robert
79cbd2ab1b Improve explanation of how the shell handles quotes in config.md (#3169)
* Clarify how the shell's handling of quotes affects the interpretation
of TOML values in `--config`/`-c`
* Provide examples of the right way to pass complex TOML values
* The previous explanation incorrectly demonstrated how to pass TOML
values to `--config`/`-c` (misunderstanding how the shell’s handling of
quotes affects things) and would result in invalid invocations of
`codex`.
2025-09-08 15:58:25 -07:00
18 changed files with 927 additions and 708 deletions

View File

@@ -10,15 +10,13 @@ use std::time::Duration;
use crate::AuthManager;
use crate::event_mapping::map_response_item_to_event_messages;
use crate::rollout::RolloutItem;
use crate::rollout::recorder::RolloutItemSliceExt;
use async_channel::Receiver;
use async_channel::Sender;
use codex_apply_patch::ApplyPatchAction;
use codex_apply_patch::MaybeApplyPatchVerified;
use codex_apply_patch::maybe_parse_apply_patch_verified;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::protocol::ConversationPathResponseEvent;
use codex_protocol::protocol::ConversationHistoryResponseEvent;
use codex_protocol::protocol::TaskStartedEvent;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
@@ -105,6 +103,8 @@ use crate::protocol::TokenUsageInfo;
use crate::protocol::TurnDiffEvent;
use crate::protocol::WebSearchBeginEvent;
use crate::rollout::RolloutRecorder;
use crate::rollout::RolloutRecorderParams;
use crate::rollout::recorder::RolloutItemSliceExt;
use crate::safety::SafetyCheck;
use crate::safety::assess_command_safety;
use crate::safety::assess_safety_for_untrusted_command;
@@ -204,6 +204,9 @@ impl Codex {
error!("Failed to create session: {e:#}");
CodexErr::InternalAgentDied
})?;
session
.record_initial_history(&turn_context, conversation_history)
.await;
let conversation_id = session.conversation_id;
// This task will run until Op::Shutdown is received.
@@ -377,9 +380,18 @@ impl Session {
return Err(anyhow::anyhow!("cwd is not absolute: {cwd:?}"));
}
let conversation_id = match &initial_history {
InitialHistory::New | InitialHistory::Forked(_) => ConversationId::default(),
InitialHistory::Resumed(resumed_history) => resumed_history.conversation_id,
let (conversation_id, rollout_params) = match &initial_history {
InitialHistory::New | InitialHistory::Forked(_) => {
let conversation_id = ConversationId::default();
(
conversation_id,
RolloutRecorderParams::new(conversation_id, user_instructions.clone()),
)
}
InitialHistory::Resumed(resumed_history) => (
resumed_history.conversation_id,
RolloutRecorderParams::resume(resumed_history.rollout_path.clone()),
),
};
// Error messages to dispatch after SessionConfigured is sent.
@@ -391,7 +403,7 @@ impl Session {
// - spin up MCP connection manager
// - perform default shell discovery
// - load history metadata
let rollout_fut = RolloutRecorder::new(&config, conversation_id, user_instructions.clone());
let rollout_fut = RolloutRecorder::new(&config, rollout_params);
let mcp_fut = McpConnectionManager::new(config.mcp_servers.clone());
let default_shell_fut = shell::default_user_shell();
@@ -481,10 +493,13 @@ impl Session {
// Dispatch the SessionConfiguredEvent first and then report any errors.
// If resuming, include converted initial messages in the payload so UIs can render them immediately.
let initial_messages = Some(
sess.apply_initial_history(&turn_context, initial_history.clone())
.await,
);
let initial_messages = match &initial_history {
InitialHistory::New => None,
InitialHistory::Forked(items) => Some(sess.build_initial_messages(items)),
InitialHistory::Resumed(resumed_history) => {
Some(sess.build_initial_messages(&resumed_history.history))
}
};
let events = std::iter::once(Event {
id: INITIAL_SUBMIT_ID.to_owned(),
@@ -521,31 +536,30 @@ impl Session {
}
}
async fn apply_initial_history(
async fn record_initial_history(
&self,
turn_context: &TurnContext,
conversation_history: InitialHistory,
) -> Vec<EventMsg> {
) {
match conversation_history {
InitialHistory::New => self.record_initial_history_new(turn_context).await,
InitialHistory::New => {
self.record_initial_history_new(turn_context).await;
}
InitialHistory::Forked(items) => {
self.record_conversation_items_internal(&items, true).await;
items
.into_iter()
.flat_map(|ri| {
map_response_item_to_event_messages(&ri, self.show_raw_agent_reasoning)
})
.filter(|m| matches!(m, EventMsg::UserMessage(_)))
.collect()
self.record_initial_history_from_rollout_items(turn_context, items)
.await;
}
InitialHistory::Resumed(resumed_history) => {
self.record_initial_history_resumed(resumed_history.history)
.await
self.record_initial_history_from_rollout_items(
turn_context,
resumed_history.history,
)
.await;
}
}
}
async fn record_initial_history_new(&self, turn_context: &TurnContext) -> Vec<EventMsg> {
async fn record_initial_history_new(&self, turn_context: &TurnContext) {
// record the initial user instructions and environment context,
// regardless of whether we restored items.
// TODO: Those items shouldn't be "user messages" IMO. Maybe developer messages.
@@ -559,45 +573,54 @@ impl Session {
Some(turn_context.sandbox_policy.clone()),
Some(self.user_shell.clone()),
)));
for item in conversation_items {
self.record_conversation_item(item).await;
self.record_conversation_items(turn_context, &conversation_items)
.await;
}
async fn record_initial_history_from_rollout_items(
&self,
turn_context: &TurnContext,
items: Vec<crate::rollout::RolloutItem>,
) {
use crate::rollout::recorder::RolloutItemSliceExt as _;
let response_items = items.get_response_items();
self.record_conversation_items_internal(turn_context, &response_items, false)
.await;
}
/// Build the initial UI messages vector for SessionConfigured from
/// persisted rollout items. Prefer persisted events when present; fall back
/// to converting response items for legacy rollouts.
fn build_initial_messages(&self, items: &[crate::rollout::RolloutItem]) -> Vec<EventMsg> {
let evs = items.get_events();
if !evs.is_empty() {
return evs;
}
vec![]
let responses = items.get_response_items();
responses
.iter()
.flat_map(|item| {
map_response_item_to_event_messages(item, self.show_raw_agent_reasoning)
})
.collect::<Vec<EventMsg>>()
}
async fn record_initial_history_from_items(&self, items: Vec<ResponseItem>) {
self.record_conversation_items_internal(&items, false).await;
}
async fn record_initial_history_resumed(&self, items: Vec<RolloutItem>) -> Vec<EventMsg> {
// Record transcript (without persisting again)
let responses: Vec<ResponseItem> = items.as_slice().get_response_items();
if !responses.is_empty() {
self.record_conversation_items_internal(&responses, true)
.await;
}
items.as_slice().get_events()
}
/// Sends the given event to the client and records it to the rollout (if enabled).
/// Any send/record errors are logged and swallowed.
/// Sends an event to the client and records it to the rollout (if enabled).
pub(crate) async fn send_event(&self, event: Event) {
let event_to_record = event.clone();
if let Err(e) = self.tx_event.send(event).await {
error!("failed to send event: {e}");
}
// Clone recorder handle outside the lock scope to avoid holding the
// mutex guard across an await point.
let recorder = {
let guard = self.rollout.lock_unchecked();
guard.as_ref().cloned()
};
if let Some(rec) = recorder
&& let Err(e) = rec
.record_items(crate::rollout::RolloutItem::Event(event_to_record))
.await
&& let Err(e) = rec.record_events(std::slice::from_ref(&event)).await
{
error!("failed to record rollout event: {e:#}");
}
if let Err(e) = self.tx_event.send(event).await {
error!("failed to send tool call event: {e}");
}
}
pub async fn request_command_approval(
@@ -686,59 +709,70 @@ impl Session {
/// Records items to both the rollout and the chat completions/ZDR
/// transcript, if enabled.
async fn record_conversation_items(&self, items: &[ResponseItem]) {
self.record_conversation_items_internal(items, true).await;
async fn record_conversation_items(&self, turn_context: &TurnContext, items: &[ResponseItem]) {
self.record_conversation_items_internal(turn_context, items, true)
.await;
}
async fn record_conversation_item(&self, item: ResponseItem) {
let items = [item];
self.record_conversation_items_internal(&items, true).await;
}
async fn record_conversation_items_internal(&self, items: &[ResponseItem], persist: bool) {
async fn record_conversation_items_internal(
&self,
turn_context: &TurnContext,
items: &[ResponseItem],
persist: bool,
) {
debug!("Recording items for conversation: {items:?}");
if persist {
// Record snapshot of these items into rollout
for item in items {
self.record_state_snapshot(RolloutItem::ResponseItem(item.clone()))
.await;
}
self.record_state_snapshot(turn_context, items).await;
}
self.state.lock_unchecked().history.record_items(items);
}
async fn record_state_snapshot(&self, item: RolloutItem) {
async fn record_state_snapshot(&self, turn_context: &TurnContext, items: &[ResponseItem]) {
let tc_snapshot = crate::rollout::recorder::TurnContextSnapshot {
cwd: turn_context.cwd.clone(),
approval_policy: turn_context.approval_policy,
sandbox_policy: turn_context.sandbox_policy.clone(),
model: turn_context.client.get_model(),
show_raw_agent_reasoning: self.show_raw_agent_reasoning,
};
let snapshot = crate::rollout::SessionStateSnapshot {
turn_context: tc_snapshot,
};
let recorder = {
let guard = self.rollout.lock_unchecked();
guard.as_ref().cloned()
};
if let Some(rec) = recorder
&& let Err(e) = rec.record_items(item).await
{
error!("failed to record rollout items: {e:#}");
}
}
if let Some(rec) = recorder {
if let Err(e) = rec.record_state(snapshot).await {
error!("failed to record rollout state: {e:#}");
}
if let Err(e) = rec.record_response_items(items).await {
error!("failed to record rollout items: {e:#}");
}
/// Records a user input into conversation history AND a corresponding UserMessage event in rollout.
/// Does not send events to the UI.
async fn record_user_input(&self, sub_id: &str, response_item: ResponseItem) {
// Record the message/tool input in conversation history/rollout state
self.record_conversation_item(response_item.clone()).await;
// Derive and record a UserMessage event alongside it in the rollout
let user_events =
map_response_item_to_event_messages(&response_item, self.show_raw_agent_reasoning)
.into_iter()
.filter(|m| matches!(m, EventMsg::UserMessage(_)));
for msg in user_events {
let event = Event {
id: sub_id.to_string(),
msg,
};
self.record_state_snapshot(RolloutItem::Event(event)).await;
// Also persist user input as EventMsg::UserMessage so resuming can
// reconstruct an event stream without relying on mapping.
// Only keep user messages; avoid duplicating assistant events which
// are recorded when emitted during streaming.
let derived_user_events: Vec<crate::protocol::Event> = items
.iter()
.flat_map(|it| {
map_response_item_to_event_messages(it, self.show_raw_agent_reasoning)
})
.filter(|ev| matches!(ev, EventMsg::UserMessage(_)))
.map(|msg| crate::protocol::Event {
id: String::new(),
msg,
})
.collect();
if !derived_user_events.is_empty() {
if let Err(e) = rec.record_events(&derived_user_events).await {
error!("failed to record derived user events: {e:#}");
}
}
}
}
@@ -906,7 +940,7 @@ impl Session {
message: message.into(),
}),
};
self.send_event(event).await;
let _ = self.tx_event.send(event).await;
}
async fn notify_stream_error(&self, sub_id: &str, message: impl Into<String>) {
@@ -916,7 +950,7 @@ impl Session {
message: message.into(),
}),
};
self.send_event(event).await;
let _ = self.tx_event.send(event).await;
}
/// Build the full turn input by concatenating the current conversation
@@ -1177,13 +1211,16 @@ async fn submission_loop(
// Install the new persistent context for subsequent tasks/turns.
turn_context = Arc::new(new_turn_context);
if cwd.is_some() || approval_policy.is_some() || sandbox_policy.is_some() {
sess.record_conversation_item(ResponseItem::from(EnvironmentContext::new(
cwd,
approval_policy,
sandbox_policy,
// Shell is not configurable from turn to turn
None,
)))
sess.record_conversation_items(
&turn_context,
&[ResponseItem::from(EnvironmentContext::new(
cwd,
approval_policy,
sandbox_policy,
// Shell is not configurable from turn to turn
None,
))],
)
.await;
}
}
@@ -1286,7 +1323,7 @@ async fn submission_loop(
Op::GetHistoryEntryRequest { offset, log_id } => {
let config = config.clone();
let sess_for_spawn = sess.clone();
let sess2 = sess.clone();
let sub_id = sub.id.clone();
tokio::spawn(async move {
@@ -1314,10 +1351,11 @@ async fn submission_loop(
),
};
sess_for_spawn.send_event(event).await;
sess2.send_event(event).await;
});
}
Op::ListMcpTools => {
let sess2 = sess.clone();
let sub_id = sub.id.clone();
// This is a cheap lookup from the connection manager's cache.
@@ -1328,9 +1366,10 @@ async fn submission_loop(
crate::protocol::McpListToolsResponseEvent { tools },
),
};
sess.send_event(event).await;
sess2.send_event(event).await;
}
Op::ListCustomPrompts => {
let sess2 = sess.clone();
let sub_id = sub.id.clone();
let custom_prompts: Vec<CustomPrompt> =
@@ -1346,7 +1385,7 @@ async fn submission_loop(
custom_prompts,
}),
};
sess.send_event(event).await;
sess2.send_event(event).await;
}
Op::Compact => {
// Create a summarization request as user input
@@ -1382,33 +1421,27 @@ async fn submission_loop(
message: "Failed to shutdown rollout recorder".to_string(),
}),
};
sess.send_event(event).await;
if let Err(e) = sess.tx_event.send(event).await {
warn!("failed to send error message: {e:?}");
}
}
let event = Event {
id: sub.id.clone(),
msg: EventMsg::ShutdownComplete,
};
sess.send_event(event).await;
if let Err(e) = sess.tx_event.send(event).await {
warn!("failed to send Shutdown event: {e}");
}
break;
}
Op::GetConversationPath => {
Op::GetHistory => {
let sub_id = sub.id.clone();
// Ensure rollout file is flushed so consumers can read it immediately.
let rec_opt = { sess.rollout.lock_unchecked().as_ref().cloned() };
if let Some(rec) = rec_opt {
let _ = rec.flush().await;
}
let event = Event {
id: sub_id.clone(),
msg: EventMsg::ConversationHistory(ConversationPathResponseEvent {
msg: EventMsg::ConversationHistory(ConversationHistoryResponseEvent {
conversation_id: sess.conversation_id,
path: sess
.rollout
.lock_unchecked()
.as_ref()
.map(|r| r.path().to_path_buf())
.unwrap_or_default(),
entries: sess.state.lock_unchecked().history.contents(),
}),
};
sess.send_event(event).await;
@@ -1443,10 +1476,6 @@ async fn run_task(
if input.is_empty() {
return;
}
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
// Record the user's input and corresponding event into the rollout
let user_input_response: ResponseItem = ResponseItem::from(initial_input_for_turn.clone());
sess.record_user_input(&sub_id, user_input_response).await;
let event = Event {
id: sub_id.clone(),
msg: EventMsg::TaskStarted(TaskStartedEvent {
@@ -1455,6 +1484,10 @@ async fn run_task(
};
sess.send_event(event).await;
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
sess.record_conversation_items(turn_context, &[initial_input_for_turn.clone().into()])
.await;
let mut last_agent_message: Option<String> = None;
// Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains
// many turns, from the perspective of the user, it is a single turn.
@@ -1469,9 +1502,8 @@ async fn run_task(
.into_iter()
.map(ResponseItem::from)
.collect::<Vec<ResponseItem>>();
for item in pending_input.iter() {
sess.record_user_input(&sub_id, item.clone()).await;
}
sess.record_conversation_items(turn_context, &pending_input)
.await;
// Construct the input that we will send to the model. When using the
// Chat completions API (or ZDR clients), the model needs the full
@@ -1598,9 +1630,11 @@ async fn run_task(
// Only attempt to take the lock if there is something to record.
if !items_to_record_in_conversation_history.is_empty() {
for item in items_to_record_in_conversation_history.iter().cloned() {
sess.record_conversation_item(item).await;
}
sess.record_conversation_items(
turn_context,
&items_to_record_in_conversation_history,
)
.await;
}
if responses.is_empty() {
@@ -2910,11 +2944,13 @@ async fn drain_to_completed(
info
};
sess.send_event(Event {
id: sub_id.to_string(),
msg: EventMsg::TokenCount(crate::protocol::TokenCountEvent { info }),
})
.await;
sess.tx_event
.send(Event {
id: sub_id.to_string(),
msg: EventMsg::TokenCount(crate::protocol::TokenCountEvent { info }),
})
.await
.ok();
return Ok(());
}

View File

@@ -12,7 +12,6 @@ use crate::protocol::EventMsg;
use crate::protocol::SessionConfiguredEvent;
use crate::rollout::RolloutItem;
use crate::rollout::RolloutRecorder;
use crate::rollout::recorder::RolloutItemSliceExt;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::models::ResponseItem;
use std::collections::HashMap;
@@ -20,53 +19,18 @@ use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct ResumedHistory {
pub conversation_id: ConversationId,
pub history: Vec<RolloutItem>,
pub rollout_path: PathBuf,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub enum InitialHistory {
New,
Resumed(ResumedHistory),
Forked(Vec<ResponseItem>),
}
impl PartialEq for InitialHistory {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(InitialHistory::New, InitialHistory::New) => true,
(InitialHistory::Forked(a), InitialHistory::Forked(b)) => a == b,
(InitialHistory::Resumed(_), InitialHistory::Resumed(_)) => true,
_ => false,
}
}
}
impl InitialHistory {
/// Return all response items contained in this initial history.
pub fn get_response_items(&self) -> Vec<ResponseItem> {
match self {
InitialHistory::New => Vec::new(),
InitialHistory::Forked(_) => Vec::new(),
InitialHistory::Resumed(items) => {
<[_] as RolloutItemSliceExt>::get_response_items(items.history.as_slice())
}
}
}
/// Return all events contained in this initial history.
pub fn get_events(&self) -> Vec<crate::protocol::EventMsg> {
match self {
InitialHistory::New => Vec::new(),
InitialHistory::Forked(_) => Vec::new(),
InitialHistory::Resumed(items) => {
<[_] as RolloutItemSliceExt>::get_events(items.history.as_slice())
}
}
}
Forked(Vec<RolloutItem>),
}
/// Represents a newly created Codex conversation, including the first event
@@ -192,43 +156,21 @@ impl ConversationManager {
/// caller's `config`). The new conversation will have a fresh id.
pub async fn fork_conversation(
&self,
base_rollout_path: PathBuf,
_base_conversation_id: ConversationId,
conversation_history: Vec<ResponseItem>,
num_messages_to_drop: usize,
config: Config,
) -> CodexResult<NewConversation> {
// Read prior responses from the rollout file (tolerate both tagged and legacy formats).
let text = tokio::fs::read_to_string(&base_rollout_path)
.await
.map_err(|e| CodexErr::Io(std::io::Error::other(format!("read rollout: {e}"))))?;
let mut responses: Vec<ResponseItem> = Vec::new();
for line in text.lines() {
if line.trim().is_empty() {
continue;
}
let v: serde_json::Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(_) => continue,
};
// Only consider response items (legacy lines have no record_type)
match v.get("record_type").and_then(|s| s.as_str()) {
Some("response") | None => {
if let Ok(item) = serde_json::from_value::<ResponseItem>(v) {
responses.push(item);
}
}
_ => {}
}
}
let kept = truncate_after_dropping_last_messages(responses, num_messages_to_drop);
// Compute the prefix up to the cut point.
let history =
truncate_after_dropping_last_messages(conversation_history, num_messages_to_drop);
// Spawn a new conversation with the computed initial history.
let auth_manager = self.auth_manager.clone();
let CodexSpawnOk {
codex,
conversation_id,
} = Codex::spawn(config, auth_manager, kept).await?;
} = Codex::spawn(config, auth_manager, history).await?;
self.finalize_spawn(codex, conversation_id).await
}
}
@@ -237,7 +179,11 @@ impl ConversationManager {
/// and all items that follow them.
fn truncate_after_dropping_last_messages(items: Vec<ResponseItem>, n: usize) -> InitialHistory {
if n == 0 {
return InitialHistory::Forked(items);
let rollout_items = items
.into_iter()
.map(crate::rollout::RolloutItem::ResponseItem)
.collect();
return InitialHistory::Forked(rollout_items);
}
// Walk backwards counting only `user` Message items, find cut index.
@@ -259,7 +205,13 @@ fn truncate_after_dropping_last_messages(items: Vec<ResponseItem>, n: usize) ->
// No prefix remains after dropping; start a new conversation.
InitialHistory::New
} else {
InitialHistory::Forked(items.into_iter().take(cut_index).collect())
InitialHistory::Forked(
items
.into_iter()
.take(cut_index)
.map(crate::rollout::RolloutItem::ResponseItem)
.collect(),
)
}
}
@@ -317,10 +269,14 @@ mod tests {
let truncated = truncate_after_dropping_last_messages(items.clone(), 1);
assert_eq!(
truncated,
InitialHistory::Forked(vec![items[0].clone(), items[1].clone(), items[2].clone(),])
InitialHistory::Forked(vec![
crate::rollout::RolloutItem::ResponseItem(items[0].clone()),
crate::rollout::RolloutItem::ResponseItem(items[1].clone()),
crate::rollout::RolloutItem::ResponseItem(items[2].clone()),
])
);
let truncated2 = truncate_after_dropping_last_messages(items, 2);
assert!(matches!(truncated2, InitialHistory::New));
assert_eq!(truncated2, InitialHistory::New);
}
}

View File

@@ -61,11 +61,11 @@ pub mod spawn;
pub mod terminal;
mod tool_apply_patch;
pub mod turn_diff_tracker;
pub use rollout::Cursor;
pub use rollout::RolloutRecorder;
pub use rollout::SessionMeta;
pub use rollout::list::ConversationItem;
pub use rollout::list::ConversationsPage;
pub use rollout::list::Cursor;
mod user_notification;
pub mod util;
pub use apply_patch::CODEX_APPLY_PATCH_ARG1;

View File

@@ -0,0 +1,91 @@
use codex_protocol::mcp_protocol::ConversationId;
use time::OffsetDateTime;
use time::PrimitiveDateTime;
use time::format_description::FormatItem;
use time::macros::format_description;
use uuid::Uuid;
/// Timestamp format used in rollout filenames: YYYY-MM-DDThh-mm-ss
/// (hyphens between time components — this stamp is embedded in filenames,
/// see `build_rollout_filename`).
pub const FILENAME_TS_FMT: &[FormatItem] =
    format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");

/// Timestamp format used for JSONL records (UTC, second precision): YYYY-MM-DDThh:mm:ssZ
pub const RECORD_TS_FMT: &[FormatItem] =
    format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]Z");
/// Pagination cursor identifying a file by timestamp and UUID.
///
/// Serialized as the token `"<file_ts>|<uuid>"` where `file_ts` uses
/// [`FILENAME_TS_FMT`] (see the `Serialize`/`Deserialize` impls and
/// `parse_cursor`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Cursor {
    /// Timestamp component; UTC (parsers call `assume_utc`).
    pub(crate) ts: OffsetDateTime,
    /// UUID component, taken from the rollout filename.
    pub(crate) id: Uuid,
}

impl Cursor {
    /// Create a cursor from an already-parsed timestamp and UUID.
    pub fn new(ts: OffsetDateTime, id: Uuid) -> Self {
        Self { ts, id }
    }
}
impl serde::Serialize for Cursor {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let ts_str = self
.ts
.format(FILENAME_TS_FMT)
.map_err(|e| serde::ser::Error::custom(format!("format error: {e}")))?;
serializer.serialize_str(&format!("{ts_str}|{}", self.id))
}
}
impl<'de> serde::Deserialize<'de> for Cursor {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
parse_cursor(&s).ok_or_else(|| serde::de::Error::custom("invalid cursor"))
}
}
/// Parses a pagination cursor token in the form `"<file_ts>|<uuid>"`.
///
/// Returns `None` when the token lacks the `'|'` separator, the UUID part is
/// not a valid UUID, or the timestamp part does not match
/// [`FILENAME_TS_FMT`]. The timestamp is taken as UTC.
pub fn parse_cursor(token: &str) -> Option<Cursor> {
    let (file_ts, uuid_str) = token.split_once('|')?;
    // Idiomatic fallible parse: `.ok()?` replaces the original
    // `let Ok(..) = .. else { return None; }` with identical semantics.
    let uuid = Uuid::parse_str(uuid_str).ok()?;
    let ts = PrimitiveDateTime::parse(file_ts, FILENAME_TS_FMT)
        .ok()?
        .assume_utc();
    Some(Cursor::new(ts, uuid))
}
/// Parse timestamp and UUID from a rollout filename.
/// Expected format: rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl
pub fn parse_timestamp_uuid_from_filename(name: &str) -> Option<(OffsetDateTime, Uuid)> {
    let core = name.strip_prefix("rollout-")?.strip_suffix(".jsonl")?;
    // The UUID itself contains '-', so walk the separators right-to-left and
    // take the first one whose suffix parses as a UUID.
    let mut split: Option<(usize, Uuid)> = None;
    for (idx, _) in core.rmatch_indices('-') {
        if let Ok(uuid) = Uuid::parse_str(&core[idx + 1..]) {
            split = Some((idx, uuid));
            break;
        }
    }
    let (sep_idx, uuid) = split?;
    let ts = PrimitiveDateTime::parse(&core[..sep_idx], FILENAME_TS_FMT)
        .ok()?
        .assume_utc();
    Some((ts, uuid))
}
/// Build a rollout filename for a given timestamp and conversation id.
///
/// # Panics
/// Panics if the timestamp cannot be rendered with [`FILENAME_TS_FMT`].
pub fn build_rollout_filename(ts: OffsetDateTime, conversation_id: ConversationId) -> String {
    let date_str = match ts.format(FILENAME_TS_FMT) {
        Ok(s) => s,
        Err(e) => panic!("failed to format timestamp: {e}"),
    };
    format!("rollout-{date_str}-{conversation_id}.jsonl")
}

View File

@@ -4,13 +4,12 @@ use std::path::Path;
use std::path::PathBuf;
use time::OffsetDateTime;
use time::PrimitiveDateTime;
use time::format_description::FormatItem;
use time::macros::format_description;
use uuid::Uuid;
use super::SESSIONS_SUBDIR;
use super::recorder::SessionMetaWithGit;
use super::format::Cursor;
use super::format::parse_timestamp_uuid_from_filename;
use std::sync::Arc;
/// Returned page of conversation summaries.
#[derive(Debug, Default, PartialEq)]
@@ -34,48 +33,14 @@ pub struct ConversationItem {
pub head: Vec<serde_json::Value>,
}
/// A filter applied to a discovered conversation. All filters must pass for
/// the item to be included in results.
pub type ConversationFilter = Arc<dyn Fn(&ConversationItem) -> bool + Send + Sync>;
/// Hard cap to bound worst-case work per request.
const MAX_SCAN_FILES: usize = 10_000;
const HEAD_RECORD_LIMIT: usize = 10;
/// Pagination cursor identifying a file by timestamp and UUID.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Cursor {
ts: OffsetDateTime,
id: Uuid,
}
impl Cursor {
fn new(ts: OffsetDateTime, id: Uuid) -> Self {
Self { ts, id }
}
}
impl serde::Serialize for Cursor {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let ts_str = self
.ts
.format(&format_description!(
"[year]-[month]-[day]T[hour]-[minute]-[second]"
))
.map_err(|e| serde::ser::Error::custom(format!("format error: {e}")))?;
serializer.serialize_str(&format!("{ts_str}|{}", self.id))
}
}
impl<'de> serde::Deserialize<'de> for Cursor {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
parse_cursor(&s).ok_or_else(|| serde::de::Error::custom("invalid cursor"))
}
}
/// Retrieve recorded conversation file paths with token pagination. The returned `next_cursor`
/// can be supplied on the next call to resume after the last returned item, resilient to
/// concurrent new sessions being appended. Ordering is stable by timestamp desc, then UUID desc.
@@ -83,23 +48,27 @@ pub(crate) async fn get_conversations(
codex_home: &Path,
page_size: usize,
cursor: Option<&Cursor>,
) -> io::Result<ConversationsPage> {
get_conversations_filtered(codex_home, page_size, cursor, &[]).await
}
/// Retrieve recorded conversations with filters. All provided filters must
/// return `true` for an item to be included.
pub(crate) async fn get_conversations_filtered(
codex_home: &Path,
page_size: usize,
cursor: Option<&Cursor>,
filters: &[ConversationFilter],
) -> io::Result<ConversationsPage> {
let mut root = codex_home.to_path_buf();
root.push(SESSIONS_SUBDIR);
if !root.exists() {
return Ok(ConversationsPage {
items: Vec::new(),
next_cursor: None,
num_scanned_files: 0,
reached_scan_cap: false,
});
return Ok(empty_page());
}
let anchor = cursor.cloned();
let result = traverse_directories_for_paths(root.clone(), page_size, anchor).await?;
Ok(result)
traverse_directories_for_paths_filtered(root.clone(), page_size, anchor, filters).await
}
/// Load the full contents of a single conversation session file at `path`.
@@ -113,10 +82,11 @@ pub(crate) async fn get_conversation(path: &Path) -> io::Result<String> {
///
/// Directory layout: `~/.codex/sessions/YYYY/MM/DD/rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl`
/// Returned newest (latest) first.
async fn traverse_directories_for_paths(
async fn traverse_directories_for_paths_filtered(
root: PathBuf,
page_size: usize,
anchor: Option<Cursor>,
filters: &[ConversationFilter],
) -> io::Result<ConversationsPage> {
let mut items: Vec<ConversationItem> = Vec::with_capacity(page_size);
let mut scanned_files = 0usize;
@@ -171,8 +141,11 @@ async fn traverse_directories_for_paths(
let head = read_first_jsonl_records(&path, HEAD_RECORD_LIMIT)
.await
.unwrap_or_default();
if should_include_session(&head) {
items.push(ConversationItem { path, head });
let item = ConversationItem { path, head };
// Apply all filters
let include = filters.iter().all(|f| f(&item));
if include {
items.push(item);
}
}
}
@@ -188,23 +161,6 @@ async fn traverse_directories_for_paths(
})
}
/// Pagination cursor token format: "<file_ts>|<uuid>" where `file_ts` matches the
/// filename timestamp portion (YYYY-MM-DDThh-mm-ss) used in rollout filenames.
/// The cursor orders files by timestamp desc, then UUID desc.
fn parse_cursor(token: &str) -> Option<Cursor> {
let (file_ts, uuid_str) = token.split_once('|')?;
let Ok(uuid) = Uuid::parse_str(uuid_str) else {
return None;
};
let format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
let ts = PrimitiveDateTime::parse(file_ts, format).ok()?.assume_utc();
Some(Cursor::new(ts, uuid))
}
fn build_next_cursor(items: &[ConversationItem]) -> Option<Cursor> {
let last = items.last()?;
let file_name = last.path.file_name()?.to_string_lossy();
@@ -259,21 +215,13 @@ where
Ok(collected)
}
fn parse_timestamp_uuid_from_filename(name: &str) -> Option<(OffsetDateTime, Uuid)> {
// Expected: rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl
let core = name.strip_prefix("rollout-")?.strip_suffix(".jsonl")?;
// Scan from the right for a '-' such that the suffix parses as a UUID.
let (sep_idx, uuid) = core
.match_indices('-')
.rev()
.find_map(|(i, _)| Uuid::parse_str(&core[i + 1..]).ok().map(|u| (i, u)))?;
let ts_str = &core[..sep_idx];
let format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
let ts = PrimitiveDateTime::parse(ts_str, format).ok()?.assume_utc();
Some((ts, uuid))
fn empty_page() -> ConversationsPage {
ConversationsPage {
items: Vec::new(),
next_cursor: None,
num_scanned_files: 0,
reached_scan_cap: false,
}
}
async fn read_first_jsonl_records(
@@ -300,36 +248,15 @@ async fn read_first_jsonl_records(
Ok(head)
}
/// Return true if this conversation should be included in the listing.
///
/// Current rule: include only when the first JSON object is a session meta record
/// (i.e., has `{"record_type": "session_meta", ...}`), which is how rollout
/// files are written. Empty or malformed heads are excluded.
fn should_include_session(head: &[serde_json::Value]) -> bool {
let Some(first) = head.first() else {
return false;
};
passes_session_meta_filter(first)
}
/// Validate that the first record is a fully-formed session meta line.
///
/// Requirements:
/// - `record_type == "session_meta"`
/// - Remaining fields (after removing `record_type`) deserialize into
/// `SessionMetaWithGit`.
fn passes_session_meta_filter(first: &serde_json::Value) -> bool {
let Some(obj) = first.as_object() else {
return false;
};
let record_type = obj.get("record_type").and_then(|v| v.as_str());
if record_type != Some("session_meta") {
return false;
}
// Remove the marker field and validate the remainder matches SessionMetaWithGit
let mut cleaned = obj.clone();
cleaned.remove("record_type");
let val = serde_json::Value::Object(cleaned);
serde_json::from_value::<SessionMetaWithGit>(val).is_ok()
/// Returns a filter that requires the first JSONL record to be a tagged
/// session meta line: `{ "record_type": "session_meta", ... }`.
///
/// Conversations with an empty head, or whose first record lacks the
/// `record_type` marker (or carries a different value), are excluded.
pub(crate) fn requires_tagged_session_meta_filter() -> ConversationFilter {
    Arc::new(|item: &ConversationItem| {
        // `.first()` is the idiomatic form of `.get(0)` (clippy: get_first),
        // and matches how sibling test code inspects `head`.
        item.head
            .first()
            .and_then(|v| v.get("record_type"))
            .and_then(|v| v.as_str())
            .map(|s| s == "session_meta")
            .unwrap_or(false)
    })
}

View File

@@ -2,12 +2,15 @@
pub(crate) const SESSIONS_SUBDIR: &str = "sessions";
pub mod format;
pub mod list;
pub(crate) mod policy;
pub mod recorder;
pub use format::Cursor;
pub use recorder::RolloutItem;
pub use recorder::RolloutRecorder;
pub use recorder::RolloutRecorderParams;
pub use recorder::SessionMeta;
pub use recorder::SessionStateSnapshot;

View File

@@ -1,6 +1,5 @@
use crate::protocol::EventMsg;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
/// Whether a `ResponseItem` should be persisted in rollout files.
#[inline]
@@ -17,41 +16,81 @@ pub(crate) fn is_persisted_response_item(item: &ResponseItem) -> bool {
}
}
pub(crate) fn is_persisted_event(event: &Event) -> bool {
match event.msg {
EventMsg::ExecApprovalRequest(_)
| EventMsg::ApplyPatchApprovalRequest(_)
| EventMsg::AgentReasoningDelta(_)
| EventMsg::AgentReasoningRawContentDelta(_)
| EventMsg::ExecCommandOutputDelta(_)
| EventMsg::GetHistoryEntryResponse(_)
| EventMsg::AgentMessageDelta(_)
| EventMsg::TaskStarted(_)
| EventMsg::TaskComplete(_)
| EventMsg::McpToolCallBegin(_)
| EventMsg::McpToolCallEnd(_)
| EventMsg::WebSearchBegin(_)
| EventMsg::WebSearchEnd(_)
| EventMsg::ExecCommandBegin(_)
| EventMsg::ExecCommandEnd(_)
| EventMsg::PatchApplyBegin(_)
| EventMsg::PatchApplyEnd(_)
| EventMsg::TurnDiff(_)
| EventMsg::BackgroundEvent(_)
| EventMsg::McpListToolsResponse(_)
| EventMsg::ListCustomPromptsResponse(_)
| EventMsg::ShutdownComplete
| EventMsg::ConversationHistory(_)
| EventMsg::PlanUpdate(_)
| EventMsg::TurnAborted(_)
| EventMsg::StreamError(_)
| EventMsg::Error(_)
| EventMsg::AgentReasoningSectionBreak(_)
| EventMsg::SessionConfigured(_) => false,
EventMsg::UserMessage(_)
| EventMsg::AgentMessage(_)
/// Whether an `EventMsg` should be persisted in rollout files.
///
/// Keep only high-signal, compact items. Avoid deltas and verbose streams.
#[inline]
pub(crate) fn is_persisted_event_msg(event: &EventMsg) -> bool {
match event {
// Core content to replay UI meaningfully
EventMsg::AgentMessage(_)
| EventMsg::AgentReasoning(_)
| EventMsg::AgentReasoningRawContent(_)
| EventMsg::TokenCount(_) => true,
| EventMsg::TokenCount(_)
| EventMsg::UserMessage(_) => true,
// Everything else is either transient, redundant, or too verbose
_ => false,
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::AgentMessageDeltaEvent;
    use crate::protocol::AgentMessageEvent;
    use crate::protocol::AgentReasoningDeltaEvent;
    use crate::protocol::AgentReasoningEvent;
    use crate::protocol::ApplyPatchApprovalRequestEvent;
    use crate::protocol::ExecCommandEndEvent;
    use crate::protocol::TokenCountEvent;

    /// Spot-check the persistence policy: complete messages, reasoning, and
    /// token counts are kept; delta streams, exec output, and approval
    /// requests are dropped.
    #[test]
    fn test_event_persistence_policy() {
        // Full agent messages are high-signal and must be persisted.
        assert!(is_persisted_event_msg(&EventMsg::AgentMessage(
            AgentMessageEvent {
                message: "hi".to_string(),
            }
        )));
        // Completed reasoning blocks are persisted as well.
        assert!(is_persisted_event_msg(&EventMsg::AgentReasoning(
            AgentReasoningEvent {
                text: "think".to_string(),
            }
        )));
        // Token usage is compact and useful for replay; keep it.
        assert!(is_persisted_event_msg(&EventMsg::TokenCount(
            TokenCountEvent { info: None }
        )));
        // Streaming deltas are redundant with the final message; drop them.
        assert!(!is_persisted_event_msg(&EventMsg::AgentMessageDelta(
            AgentMessageDeltaEvent {
                delta: "d".to_string(),
            }
        )));
        assert!(!is_persisted_event_msg(&EventMsg::AgentReasoningDelta(
            AgentReasoningDeltaEvent {
                delta: "d".to_string(),
            }
        )));
        // Exec command results are too verbose for rollout files; drop them.
        assert!(!is_persisted_event_msg(&EventMsg::ExecCommandEnd(
            ExecCommandEndEvent {
                call_id: "c".to_string(),
                stdout: Default::default(),
                stderr: Default::default(),
                aggregated_output: Default::default(),
                exit_code: 0,
                duration: std::time::Duration::from_secs(0),
                formatted_output: String::new(),
            }
        )));
        use crate::protocol::FileChange;
        use std::collections::HashMap;
        use std::path::PathBuf;
        // Approval requests are transient UI interactions; drop them.
        assert!(!is_persisted_event_msg(
            &EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
                call_id: "c".to_string(),
                changes: HashMap::<PathBuf, FileChange>::new(),
                reason: None,
                grant_root: None,
            })
        ));
    }
}

View File

@@ -6,11 +6,11 @@ use std::io::Error as IoError;
use std::path::Path;
use std::path::PathBuf;
use crate::protocol::Event;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use time::OffsetDateTime;
use time::format_description::FormatItem;
use time::macros::format_description;
@@ -21,17 +21,22 @@ use tokio::sync::oneshot;
use tracing::info;
use tracing::warn;
use super::Cursor;
use super::SESSIONS_SUBDIR;
use super::format::RECORD_TS_FMT;
use super::format::build_rollout_filename;
use super::list::ConversationFilter;
use super::list::ConversationsPage;
use super::list::Cursor;
use super::list::get_conversations;
use super::list::get_conversations_filtered;
use super::policy::is_persisted_event_msg;
use super::policy::is_persisted_response_item;
use crate::config::Config;
use crate::conversation_manager::InitialHistory;
use crate::conversation_manager::ResumedHistory;
use crate::git_info::GitInfo;
use crate::git_info::collect_git_info;
use crate::rollout::policy::is_persisted_event;
use crate::protocol::EventMsg;
use codex_protocol::models::ResponseItem;
#[derive(Serialize, Deserialize, Clone, Default, Debug)]
@@ -44,7 +49,6 @@ pub struct SessionMeta {
pub instructions: Option<String>,
}
// SessionMetaWithGit is used in writes and reads; ensure it implements Debug.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct SessionMetaWithGit {
#[serde(flatten)]
@@ -53,16 +57,27 @@ pub struct SessionMetaWithGit {
git: Option<GitInfo>,
}
#[derive(Serialize, Deserialize, Default, Clone, Debug)]
pub struct SessionStateSnapshot {}
/// Snapshot of session state persisted to the rollout file as a tagged
/// `state` record (see `TaggedLine::State`).
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct SessionStateSnapshot {
    /// Turn-level execution context captured at snapshot time.
    pub turn_context: TurnContextSnapshot,
}
#[derive(Serialize, Deserialize, Default, Clone)]
/// Serializable capture of the per-turn execution context.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct TurnContextSnapshot {
    /// Working directory for the turn.
    pub cwd: std::path::PathBuf,
    /// Approval policy in effect for the turn.
    pub approval_policy: crate::protocol::AskForApproval,
    /// Sandbox policy in effect for the turn.
    pub sandbox_policy: crate::protocol::SandboxPolicy,
    /// Model identifier for the turn.
    pub model: String,
    /// Whether raw agent reasoning was shown.
    pub show_raw_agent_reasoning: bool,
}
#[derive(Serialize, Deserialize, Clone)]
pub struct SavedSession {
pub session: SessionMeta,
#[serde(default)]
pub items: Vec<ResponseItem>,
#[serde(default)]
pub state: SessionStateSnapshot,
pub state: Option<SessionStateSnapshot>,
pub session_id: ConversationId,
}
@@ -78,28 +93,45 @@ pub struct SavedSession {
#[derive(Clone)]
pub struct RolloutRecorder {
tx: Sender<RolloutCmd>,
path: PathBuf,
}
/// How to construct a `RolloutRecorder`: either create a fresh rollout file
/// for a new conversation, or resume appending to an existing one.
#[derive(Clone)]
pub enum RolloutRecorderParams {
    /// Start a new rollout file for `conversation_id`.
    Create {
        conversation_id: ConversationId,
        /// Optional user instructions recorded in the session meta line.
        instructions: Option<String>,
    },
    /// Append to the existing rollout file at `path`.
    Resume {
        path: PathBuf,
    },
}
/// Commands processed serially by the background rollout writer task.
enum RolloutCmd {
    /// Append response items to the rollout file.
    AddResponseItems(Vec<ResponseItem>),
    /// Append events to the rollout file (the writer applies the
    /// persistence policy before writing).
    AddEvents(Vec<Event>),
    /// Write an updated state snapshot record.
    UpdateState(SessionStateSnapshot),
    /// Acknowledge via `ack` and stop the writer task.
    Shutdown { ack: oneshot::Sender<()> },
}
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "record_type", rename_all = "snake_case")]
enum TaggedLine {
#[serde(rename = "response")]
Response {
#[serde(flatten)]
item: ResponseItem,
},
#[serde(rename = "event")]
Event {
#[serde(flatten)]
event: Event,
},
#[serde(rename = "session_meta")]
SessionMeta {
#[serde(flatten)]
meta: SessionMetaWithGit,
},
PrevSessionMeta {
#[serde(flatten)]
meta: SessionMetaWithGit,
},
#[serde(rename = "state")]
State {
#[serde(flatten)]
state: SessionStateSnapshot,
@@ -120,15 +152,15 @@ pub enum RolloutItem {
SessionMeta(SessionMetaWithGit),
}
impl From<ResponseItem> for RolloutItem {
fn from(item: ResponseItem) -> Self {
RolloutItem::ResponseItem(item)
}
}
impl From<Event> for RolloutItem {
fn from(event: Event) -> Self {
RolloutItem::Event(event)
// NOTE(review): this impl is deliberately partial — `SessionMeta` and
// `Event` variants always compare unequal (the wrapped types do not
// implement `PartialEq`), so `a == a` can be `false`. That breaks the
// usual reflexivity expectation for `PartialEq`; confirm callers only
// rely on comparing `ResponseItem` variants.
impl PartialEq for RolloutItem {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            // Delegate to `ResponseItem`'s own equality.
            (RolloutItem::ResponseItem(a), RolloutItem::ResponseItem(b)) => a == b,
            // Session meta lines are never considered equal.
            (RolloutItem::SessionMeta(_), RolloutItem::SessionMeta(_)) => false,
            // Event comparison omitted (foreign type without PartialEq); treat as not equal
            (RolloutItem::Event(_), RolloutItem::Event(_)) => false,
            _ => false,
        }
    }
}
@@ -158,18 +190,20 @@ impl RolloutItemSliceExt for [RolloutItem] {
}
}
enum RolloutCmd {
AddResponseItems(Vec<ResponseItem>),
AddEvents(Vec<Event>),
AddSessionMeta(SessionMetaWithGit),
Flush { ack: oneshot::Sender<()> },
Shutdown { ack: oneshot::Sender<()> },
impl RolloutRecorderParams {
pub fn new(conversation_id: ConversationId, instructions: Option<String>) -> Self {
Self::Create {
conversation_id,
instructions,
}
}
pub fn resume(path: PathBuf) -> Self {
Self::Resume { path }
}
}
impl RolloutRecorder {
pub fn path(&self) -> &Path {
&self.path
}
#[allow(dead_code)]
/// List conversations (rollout files) under the provided Codex home directory.
pub async fn list_conversations(
@@ -177,130 +211,142 @@ impl RolloutRecorder {
page_size: usize,
cursor: Option<&Cursor>,
) -> std::io::Result<ConversationsPage> {
get_conversations(codex_home, page_size, cursor).await
// Apply a default filter to require files to start with a tagged
// session_meta record. This skips legacy/invalid files from listings.
let filters = vec![super::list::requires_tagged_session_meta_filter()];
get_conversations_filtered(codex_home, page_size, cursor, &filters).await
}
#[allow(dead_code)]
/// List conversations with filters; all filters must pass for inclusion.
///
/// Thin async wrapper that delegates directly to
/// `get_conversations_filtered` with the caller-supplied paging arguments.
pub async fn list_conversations_with_filters(
    codex_home: &Path,
    page_size: usize,
    cursor: Option<&Cursor>,
    filters: &[ConversationFilter],
) -> std::io::Result<ConversationsPage> {
    get_conversations_filtered(codex_home, page_size, cursor, filters).await
}
/// Attempt to create a new [`RolloutRecorder`]. If the sessions directory
/// cannot be created or the rollout file cannot be opened we return the
/// error so the caller can decide whether to disable persistence.
pub async fn new(
config: &Config,
conversation_id: ConversationId,
instructions: Option<String>,
) -> std::io::Result<Self> {
let LogFileInfo {
file,
conversation_id: session_id,
timestamp,
path,
} = create_log_file(config, conversation_id)?;
let timestamp_format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]Z");
let timestamp = timestamp
.to_offset(time::UtcOffset::UTC)
.format(timestamp_format)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
let cwd = config.cwd.to_path_buf();
let (tx, rx) = mpsc::channel(100);
tokio::task::spawn(rollout_writer(
tokio::fs::File::from_std(file),
rx,
Some(SessionMeta {
timestamp,
id: session_id,
cwd: config.cwd.to_string_lossy().to_string(),
originator: config.responses_originator_header.clone(),
cli_version: env!("CARGO_PKG_VERSION").to_string(),
pub async fn new(config: &Config, params: RolloutRecorderParams) -> std::io::Result<Self> {
let (file, meta) = match params {
RolloutRecorderParams::Create {
conversation_id,
instructions,
}),
cwd,
));
} => {
let LogFileInfo {
file,
conversation_id: session_id,
timestamp,
} = create_log_file(config, conversation_id)?;
Ok(Self { tx, path })
let timestamp_format: &[FormatItem] = format_description!(
"[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"
);
let timestamp = timestamp
.to_offset(time::UtcOffset::UTC)
.format(timestamp_format)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
(
tokio::fs::File::from_std(file),
Some(SessionMeta {
timestamp,
id: session_id,
cwd: config.cwd.display().to_string(),
originator: config.responses_originator_header.clone(),
cli_version: env!("CARGO_PKG_VERSION").to_string(),
instructions,
}),
)
}
RolloutRecorderParams::Resume { path } => (
tokio::fs::OpenOptions::new()
.append(true)
.open(path)
.await?,
None,
),
};
// Clone the cwd for the spawned task to collect git info asynchronously
let cwd = config.cwd.clone();
// A reasonably-sized bounded channel. If the buffer fills up the send
// future will yield, which is fine — we only need to ensure we do not
// perform *blocking* I/O on the caller's thread.
let (tx, rx) = mpsc::channel::<RolloutCmd>(256);
// Spawn a Tokio task that owns the file handle and performs async
// writes. Using `tokio::fs::File` keeps everything on the async I/O
// driver instead of blocking the runtime.
tokio::task::spawn(rollout_writer(file, rx, meta, cwd));
Ok(Self { tx })
}
/// Queue a single rollout item for persistence, dispatching on its variant
/// to the matching record helper.
pub(crate) async fn record_items(&self, item: RolloutItem) -> std::io::Result<()> {
    match item {
        RolloutItem::ResponseItem(item) => self.record_response_item(&item).await,
        RolloutItem::Event(event) => self.record_event(&event).await,
        RolloutItem::SessionMeta(meta) => self.record_session_meta(&meta).await,
    }
}
/// Ensure all writes up to this point have been processed by the writer task.
///
/// This is a sequencing barrier for readers that plan to open and read the
/// rollout file immediately after calling this method. The background writer
/// processes the channel serially; when it dequeues `Flush`, all prior
/// `AddResponseItems`/`AddEvents`/`AddSessionMeta` have already been written
/// via `write_line`, which calls `file.flush()` (OS-buffer flush).
pub async fn flush(&self) -> std::io::Result<()> {
let (tx_done, rx_done) = oneshot::channel();
self.tx
.send(RolloutCmd::Flush { ack: tx_done })
.await
.map_err(|e| IoError::other(format!("failed to queue rollout flush: {e}")))?;
rx_done
.await
.map_err(|e| IoError::other(format!("failed waiting for rollout flush: {e}")))
}
async fn record_response_item(&self, item: &ResponseItem) -> std::io::Result<()> {
// Note that function calls may look a bit strange if they are
// "fully qualified MCP tool calls," so we could consider
// reformatting them in that case.
if !is_persisted_response_item(item) {
pub(crate) async fn record_response_items(
&self,
items: &[ResponseItem],
) -> std::io::Result<()> {
if items.is_empty() {
return Ok(());
}
self.tx
.send(RolloutCmd::AddResponseItems(vec![item.clone()]))
.send(RolloutCmd::AddResponseItems(items.to_vec()))
.await
.map_err(|e| IoError::other(format!("failed to queue rollout items: {e}")))
}
async fn record_event(&self, event: &Event) -> std::io::Result<()> {
if !is_persisted_event(event) {
pub(crate) async fn record_events(&self, events: &[Event]) -> std::io::Result<()> {
if events.is_empty() {
return Ok(());
}
self.tx
.send(RolloutCmd::AddEvents(vec![event.clone()]))
.send(RolloutCmd::AddEvents(events.to_vec()))
.await
.map_err(|e| IoError::other(format!("failed to queue rollout event: {e}")))
.map_err(|e| IoError::other(format!("failed to queue rollout events: {e}")))
}
async fn record_session_meta(&self, meta: &SessionMetaWithGit) -> std::io::Result<()> {
pub(crate) async fn record_state(&self, state: SessionStateSnapshot) -> std::io::Result<()> {
self.tx
.send(RolloutCmd::AddSessionMeta(meta.clone()))
.send(RolloutCmd::UpdateState(state))
.await
.map_err(|e| IoError::other(format!("failed to queue rollout session meta: {e}")))
.map_err(|e| IoError::other(format!("failed to queue rollout state: {e}")))
}
pub async fn get_rollout_history(path: &Path) -> std::io::Result<InitialHistory> {
info!("Resuming rollout from {path:?}");
tracing::error!("Resuming rollout from {path:?}");
let text = tokio::fs::read_to_string(path).await?;
let mut lines = text.lines();
let first_line = lines
.next()
.ok_or_else(|| IoError::other("empty session file"))?;
let conversation_id = if let Ok(TimestampedLine {
record: TaggedLine::SessionMeta { meta },
..
}) = serde_json::from_str::<TimestampedLine>(first_line)
{
Some(meta.meta.id)
} else if let Ok(meta) = serde_json::from_str::<SessionMetaWithGit>(first_line) {
Some(meta.meta.id)
} else if let Ok(meta) = serde_json::from_str::<SessionMeta>(first_line) {
Some(meta.id)
// Support both legacy (bare SessionMeta) and new tagged format
let v_first: Value = serde_json::from_str(first_line)
.map_err(|e| IoError::other(format!("failed to parse first line: {e}")))?;
let conversation_id = if v_first.get("record_type").is_some() {
let rt_ok = v_first
.get("record_type")
.and_then(|s| s.as_str())
.map(|s| s == "session_meta")
.unwrap_or(false);
if !rt_ok {
return Err(IoError::other("first line is not session_meta"));
}
match serde_json::from_value::<SessionMetaWithGit>(v_first.clone()) {
Ok(rollout_session_meta) => Some(rollout_session_meta.meta.id),
Err(e) => {
return Err(IoError::other(format!(
"failed to parse first line (tagged) as SessionMeta: {e}"
)));
}
}
} else {
return Err(IoError::other(
"failed to parse first line of rollout file as SessionMeta",
));
return Err(IoError::other("first line missing record_type"));
};
let mut items: Vec<RolloutItem> = Vec::new();
@@ -308,33 +354,39 @@ impl RolloutRecorder {
if line.trim().is_empty() {
continue;
}
match serde_json::from_str::<TimestampedLine>(line) {
Ok(TimestampedLine {
record: TaggedLine::State { .. },
..
}) => {}
Ok(TimestampedLine {
record: TaggedLine::Event { event },
..
}) => items.push(RolloutItem::Event(event)),
Ok(TimestampedLine {
record: TaggedLine::SessionMeta { meta },
..
})
| Ok(TimestampedLine {
record: TaggedLine::PrevSessionMeta { meta },
..
}) => items.push(RolloutItem::SessionMeta(meta)),
Ok(TimestampedLine {
record: TaggedLine::Response { item },
..
}) => {
if is_persisted_response_item(&item) {
items.push(RolloutItem::ResponseItem(item));
}
let v: Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(_) => continue,
};
// Tagged format: collect responses and events; skip state
if let Some(rt) = v.get("record_type").and_then(|rt| rt.as_str()) {
match rt {
"state" => continue,
"response" => match serde_json::from_value::<ResponseItem>(v.clone()) {
Ok(item) => {
if is_persisted_response_item(&item) {
items.push(RolloutItem::ResponseItem(item));
}
}
Err(e) => {
warn!("failed to parse item: {v:?}, error: {e}");
}
},
"event" => match serde_json::from_value::<Event>(v.clone()) {
Ok(event) => {
if is_persisted_event_msg(&event.msg) {
items.push(RolloutItem::Event(event));
}
}
Err(e) => {
warn!("failed to parse event: {v:?}, error: {e}");
}
},
_ => continue,
}
Err(_) => warn!("failed to parse rollout line: {line}"),
continue;
}
// Non-tagged lines are ignored in the new format
}
tracing::error!(
@@ -382,9 +434,6 @@ struct LogFileInfo {
/// Timestamp for the start of the session.
timestamp: OffsetDateTime,
/// Full filesystem path to the rollout file.
path: PathBuf,
}
fn create_log_file(
@@ -392,7 +441,8 @@ fn create_log_file(
conversation_id: ConversationId,
) -> std::io::Result<LogFileInfo> {
// Resolve ~/.codex/sessions/YYYY/MM/DD and create it if missing.
let timestamp = OffsetDateTime::now_utc();
let timestamp = OffsetDateTime::now_local()
.map_err(|e| IoError::other(format!("failed to get local time: {e}")))?;
let mut dir = config.codex_home.clone();
dir.push(SESSIONS_SUBDIR);
dir.push(timestamp.year().to_string());
@@ -402,13 +452,7 @@ fn create_log_file(
// Custom format for YYYY-MM-DDThh-mm-ss. Use `-` instead of `:` for
// compatibility with filesystems that do not allow colons in filenames.
let format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
let date_str = timestamp
.format(format)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
let filename = format!("rollout-{date_str}-{conversation_id}.jsonl");
let filename = build_rollout_filename(timestamp, conversation_id);
let path = dir.join(filename);
let file = std::fs::OpenOptions::new()
@@ -420,7 +464,6 @@ fn create_log_file(
file,
conversation_id,
timestamp,
path,
})
}
@@ -439,6 +482,7 @@ async fn rollout_writer(
meta: session_meta,
git: git_info,
};
// Write the SessionMeta as the first item in the file
writer
.write_tagged(TaggedLine::SessionMeta {
@@ -459,18 +503,13 @@ async fn rollout_writer(
}
RolloutCmd::AddEvents(events) => {
for event in events {
writer.write_tagged(TaggedLine::Event { event }).await?;
if is_persisted_event_msg(&event.msg) {
writer.write_tagged(TaggedLine::Event { event }).await?;
}
}
}
// Sequencing barrier: by the time we handle `Flush`, all previously
// queued writes have been applied and flushed to OS buffers.
RolloutCmd::Flush { ack } => {
let _ = ack.send(());
}
RolloutCmd::AddSessionMeta(meta) => {
writer
.write_tagged(TaggedLine::PrevSessionMeta { meta })
.await?;
RolloutCmd::UpdateState(state) => {
writer.write_tagged(TaggedLine::State { state }).await?;
}
RolloutCmd::Shutdown { ack } => {
let _ = ack.send(());
@@ -486,19 +525,19 @@ struct JsonlWriter {
}
impl JsonlWriter {
async fn write_line(&mut self, item: &impl serde::Serialize) -> std::io::Result<()> {
let mut json = serde_json::to_string(item)?;
async fn write_tagged(&mut self, record: TaggedLine) -> std::io::Result<()> {
// RFC3339 UTC, second precision
let now = OffsetDateTime::now_utc()
.format(RECORD_TS_FMT)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
let line = TimestampedLine {
timestamp: now,
record,
};
let mut json = serde_json::to_string(&line)?;
json.push('\n');
self.file.write_all(json.as_bytes()).await?;
self.file.flush().await?;
Ok(())
}
async fn write_tagged(&mut self, record: TaggedLine) -> std::io::Result<()> {
let timestamp = time::OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
let line = TimestampedLine { timestamp, record };
self.write_line(&line).await
}
}

View File

@@ -12,11 +12,16 @@ use time::format_description::FormatItem;
use time::macros::format_description;
use uuid::Uuid;
use crate::rollout::Cursor;
use crate::rollout::RolloutItem;
use crate::rollout::RolloutRecorder;
use crate::rollout::list::ConversationFilter;
use crate::rollout::list::ConversationItem;
use crate::rollout::list::ConversationsPage;
use crate::rollout::list::Cursor;
use crate::rollout::list::get_conversation;
use crate::rollout::list::get_conversations;
use crate::rollout::list::get_conversations_filtered;
use pretty_assertions::assert_eq;
fn write_session_file(
root: &Path,
@@ -41,13 +46,11 @@ fn write_session_file(
let mut file = File::create(file_path)?;
let meta = serde_json::json!({
"record_type": "session_meta",
"timestamp": ts_str,
"id": uuid.to_string(),
"cwd": "/",
"originator": "test",
"cli_version": "0.0.0",
"instructions": null
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version"
});
writeln!(file, "{meta}")?;
@@ -61,18 +64,127 @@ fn write_session_file(
Ok((dt, uuid))
}
fn expected_session_meta(ts: &str, uuid: Uuid) -> serde_json::Value {
serde_json::json!({
"record_type": "session_meta",
"timestamp": ts,
"id": uuid.to_string(),
"cwd": "/",
"originator": "test",
"cli_version": "0.0.0",
"instructions": null
})
#[tokio::test]
async fn test_get_conversations_filtered_by_id() {
let temp = TempDir::new().unwrap();
let home = temp.path();
let u1 = Uuid::from_u128(101);
let u2 = Uuid::from_u128(202);
write_session_file(home, "2025-07-01T00-00-00", u1, 1).unwrap();
write_session_file(home, "2025-07-02T00-00-00", u2, 1).unwrap();
let target = u2.to_string();
let filter: ConversationFilter = std::sync::Arc::new(move |item: &ConversationItem| {
item.head
.first()
.and_then(|v| v.get("id"))
.and_then(|v| v.as_str())
.map(|s| s == target)
.unwrap_or(false)
});
let page = get_conversations_filtered(home, 10, None, &[filter])
.await
.expect("filtered list ok");
assert_eq!(page.items.len(), 1);
let file_name = page.items[0]
.path
.file_name()
.unwrap()
.to_string_lossy()
.to_string();
assert!(file_name.contains(&u2.to_string()));
}
#[tokio::test]
async fn test_rollout_history_parses_tagged_events_and_responses() {
use std::io::Write as _;
use uuid::Uuid;
let temp = TempDir::new().unwrap();
let path = temp.path().join("rollout-test.jsonl");
let mut f = File::create(&path).unwrap();
let session_id = Uuid::from_u128(555).to_string();
// Tagged session_meta line (new format)
writeln!(
f,
"{}",
serde_json::json!({
"timestamp": "2025-09-08T16:03:12Z",
"record_type": "session_meta",
"id": session_id,
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version",
"instructions": null
})
)
.unwrap();
// Allowed event: AgentMessage
writeln!(
f,
"{}",
serde_json::json!({
"timestamp": "2025-09-08T16:03:13Z",
"record_type": "event",
"id": "sub1",
"msg": {"type": "agent_message", "message": "hi"}
})
)
.unwrap();
// Disallowed (delta) event: should be skipped by policy
writeln!(
f,
"{}",
serde_json::json!({
"timestamp": "2025-09-08T16:03:14Z",
"record_type": "event",
"id": "sub1",
"msg": {"type": "agent_message_delta", "delta": "x"}
})
)
.unwrap();
// Response item (assistant message)
writeln!(
f,
"{}",
serde_json::json!({
"timestamp": "2025-09-08T16:03:15Z",
"record_type": "response",
"type": "message",
"role": "assistant",
"content": [{"type": "output_text", "text": "hello"}]
})
)
.unwrap();
let hist = RolloutRecorder::get_rollout_history(&path)
.await
.expect("history ok");
let resumed = match hist {
crate::conversation_manager::InitialHistory::Resumed(r) => r,
_ => panic!("expected resumed history"),
};
// We expect exactly 2 items: one event (allowed) and one response
let kinds: Vec<&'static str> = resumed
.history
.iter()
.map(|it| match it {
RolloutItem::Event(_) => "event",
RolloutItem::ResponseItem(_) => "response",
RolloutItem::SessionMeta(_) => "meta",
})
.collect();
assert_eq!(kinds, vec!["event", "response"]);
}
#[tokio::test]
async fn test_list_conversations_latest_first() {
let temp = TempDir::new().unwrap();
@@ -111,19 +223,37 @@ async fn test_list_conversations_latest_first() {
.join(format!("rollout-2025-01-01T12-00-00-{u1}.jsonl"));
let head_3 = vec![
expected_session_meta("2025-01-03T12-00-00", u3),
serde_json::json!({
"timestamp": "2025-01-03T12-00-00",
"id": u3.to_string(),
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version"
}),
serde_json::json!({"record_type": "response", "index": 0}),
serde_json::json!({"record_type": "response", "index": 1}),
serde_json::json!({"record_type": "response", "index": 2}),
];
let head_2 = vec![
expected_session_meta("2025-01-02T12-00-00", u2),
serde_json::json!({
"timestamp": "2025-01-02T12-00-00",
"id": u2.to_string(),
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version"
}),
serde_json::json!({"record_type": "response", "index": 0}),
serde_json::json!({"record_type": "response", "index": 1}),
serde_json::json!({"record_type": "response", "index": 2}),
];
let head_1 = vec![
expected_session_meta("2025-01-01T12-00-00", u1),
serde_json::json!({
"timestamp": "2025-01-01T12-00-00",
"id": u1.to_string(),
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version"
}),
serde_json::json!({"record_type": "response", "index": 0}),
serde_json::json!({"record_type": "response", "index": 1}),
serde_json::json!({"record_type": "response", "index": 2}),
@@ -188,11 +318,23 @@ async fn test_pagination_cursor() {
.join("04")
.join(format!("rollout-2025-03-04T09-00-00-{u4}.jsonl"));
let head_5 = vec![
expected_session_meta("2025-03-05T09-00-00", u5),
serde_json::json!({
"timestamp": "2025-03-05T09-00-00",
"id": u5.to_string(),
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version"
}),
serde_json::json!({"record_type": "response", "index": 0}),
];
let head_4 = vec![
expected_session_meta("2025-03-04T09-00-00", u4),
serde_json::json!({
"timestamp": "2025-03-04T09-00-00",
"id": u4.to_string(),
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version"
}),
serde_json::json!({"record_type": "response", "index": 0}),
];
let expected_cursor1: Cursor =
@@ -230,11 +372,23 @@ async fn test_pagination_cursor() {
.join("02")
.join(format!("rollout-2025-03-02T09-00-00-{u2}.jsonl"));
let head_3 = vec![
expected_session_meta("2025-03-03T09-00-00", u3),
serde_json::json!({
"timestamp": "2025-03-03T09-00-00",
"id": u3.to_string(),
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version"
}),
serde_json::json!({"record_type": "response", "index": 0}),
];
let head_2 = vec![
expected_session_meta("2025-03-02T09-00-00", u2),
serde_json::json!({
"timestamp": "2025-03-02T09-00-00",
"id": u2.to_string(),
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version"
}),
serde_json::json!({"record_type": "response", "index": 0}),
];
let expected_cursor2: Cursor =
@@ -266,7 +420,13 @@ async fn test_pagination_cursor() {
.join("01")
.join(format!("rollout-2025-03-01T09-00-00-{u1}.jsonl"));
let head_1 = vec![
expected_session_meta("2025-03-01T09-00-00", u1),
serde_json::json!({
"timestamp": "2025-03-01T09-00-00",
"id": u1.to_string(),
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version"
}),
serde_json::json!({"record_type": "response", "index": 0}),
];
let expected_cursor3: Cursor =
@@ -305,7 +465,13 @@ async fn test_get_conversation_contents() {
.join("01")
.join(format!("rollout-2025-04-01T10-30-00-{uuid}.jsonl"));
let expected_head = vec![
expected_session_meta(ts, uuid),
serde_json::json!({
"timestamp": ts,
"id": uuid.to_string(),
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version"
}),
serde_json::json!({"record_type": "response", "index": 0}),
serde_json::json!({"record_type": "response", "index": 1}),
];
@@ -322,7 +488,13 @@ async fn test_get_conversation_contents() {
assert_eq!(page, expected_page);
// Entire file contents equality
let meta = expected_session_meta(ts, uuid);
let meta = serde_json::json!({
"timestamp": ts,
"id": uuid.to_string(),
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version"
});
let rec0 = serde_json::json!({"record_type": "response", "index": 0});
let rec1 = serde_json::json!({"record_type": "response", "index": 1});
let expected_content = format!("{meta}\n{rec0}\n{rec1}\n");
@@ -357,7 +529,15 @@ async fn test_stable_ordering_same_second_pagination() {
.join("07")
.join("01")
.join(format!("rollout-2025-07-01T00-00-00-{u2}.jsonl"));
let head = |u: Uuid| -> Vec<serde_json::Value> { vec![expected_session_meta(ts, u)] };
let head = |u: Uuid| -> Vec<serde_json::Value> {
vec![serde_json::json!({
"timestamp": ts,
"id": u.to_string(),
"cwd": "/tmp",
"originator": "codex_cli_rs",
"cli_version": "test-version"
})]
};
let expected_cursor1: Cursor = serde_json::from_str(&format!("\"{ts}|{u2}\"")).unwrap();
let expected_page1 = ConversationsPage {
items: vec![

View File

@@ -69,3 +69,8 @@
; Added on top of Chrome profile
; Needed for python multiprocessing on MacOS for the SemLock
(allow ipc-posix-sem)
; needed to look up user info, see https://crbug.com/792228
(allow mach-lookup
(global-name "com.apple.system.opendirectoryd.libinfo")
)

View File

@@ -4,12 +4,9 @@ use codex_core::ModelProviderInfo;
use codex_core::NewConversation;
use codex_core::WireApi;
use codex_core::built_in_model_providers;
use codex_core::protocol::AgentMessageEvent;
use codex_core::protocol::EventMsg;
use codex_core::protocol::InputItem;
use codex_core::protocol::InputMessageKind;
use codex_core::protocol::Op;
use codex_core::protocol::UserMessageEvent;
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
use codex_protocol::mcp_protocol::AuthMode;
use core_test_support::load_default_config_for_test;
@@ -129,14 +126,7 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
writeln!(
f,
"{}",
json!({
"record_type": "session_meta",
"id": Uuid::new_v4(),
"timestamp": "2024-01-01T00:00:00Z",
"cwd": tmpdir.path().to_string_lossy(),
"originator": "test",
"cli_version": "0.0.0-test"
})
json!({"meta":"test","instructions":"be nice", "id": Uuid::new_v4(), "timestamp": "2024-01-01T00:00:00Z"})
)
.unwrap();
@@ -148,30 +138,7 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
text: "resumed user message".to_string(),
}],
};
let mut prior_user_obj = serde_json::to_value(&prior_user)
.unwrap()
.as_object()
.unwrap()
.clone();
prior_user_obj.insert("record_type".to_string(), serde_json::json!("response"));
prior_user_obj.insert(
"timestamp".to_string(),
serde_json::json!("2025-01-01T00:00:00Z"),
);
writeln!(f, "{}", serde_json::Value::Object(prior_user_obj)).unwrap();
// Also include a matching user message event to preserve ordering at resume
let prior_user_event = EventMsg::UserMessage(UserMessageEvent {
message: "resumed user message".to_string(),
kind: Some(InputMessageKind::Plain),
});
let prior_user_event_line = serde_json::json!({
"timestamp": "2025-01-01T00:00:00Z",
"record_type": "event",
"id": "resume-0",
"msg": prior_user_event,
});
writeln!(f, "{prior_user_event_line}").unwrap();
writeln!(f, "{}", serde_json::to_string(&prior_user).unwrap()).unwrap();
// Prior item: system message (excluded from API history)
let prior_system = codex_protocol::models::ResponseItem::Message {
@@ -181,17 +148,7 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
text: "resumed system instruction".to_string(),
}],
};
let mut prior_system_obj = serde_json::to_value(&prior_system)
.unwrap()
.as_object()
.unwrap()
.clone();
prior_system_obj.insert("record_type".to_string(), serde_json::json!("response"));
prior_system_obj.insert(
"timestamp".to_string(),
serde_json::json!("2025-01-01T00:00:00Z"),
);
writeln!(f, "{}", serde_json::Value::Object(prior_system_obj)).unwrap();
writeln!(f, "{}", serde_json::to_string(&prior_system).unwrap()).unwrap();
// Prior item: assistant message
let prior_item = codex_protocol::models::ResponseItem::Message {
@@ -201,27 +158,7 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
text: "resumed assistant message".to_string(),
}],
};
let mut prior_item_obj = serde_json::to_value(&prior_item)
.unwrap()
.as_object()
.unwrap()
.clone();
prior_item_obj.insert("record_type".to_string(), serde_json::json!("response"));
prior_item_obj.insert(
"timestamp".to_string(),
serde_json::json!("2025-01-01T00:00:00Z"),
);
writeln!(f, "{}", serde_json::Value::Object(prior_item_obj)).unwrap();
let prior_item_event = EventMsg::AgentMessage(AgentMessageEvent {
message: "resumed assistant message".to_string(),
});
let prior_event_line = serde_json::json!({
"timestamp": "2025-01-01T00:00:00Z",
"record_type": "event",
"id": "resume-1",
"msg": prior_item_event,
});
writeln!(f, "{prior_event_line}").unwrap();
writeln!(f, "{}", serde_json::to_string(&prior_item).unwrap()).unwrap();
drop(f);
// Mock server that will receive the resumed request

View File

@@ -3,11 +3,10 @@ use codex_core::ConversationManager;
use codex_core::ModelProviderInfo;
use codex_core::NewConversation;
use codex_core::built_in_model_providers;
use codex_core::protocol::ConversationPathResponseEvent;
use codex_core::protocol::ConversationHistoryResponseEvent;
use codex_core::protocol::EventMsg;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_protocol::models::ResponseItem;
use core_test_support::load_default_config_for_test;
use core_test_support::wait_for_event;
use tempfile::TempDir;
@@ -73,34 +72,17 @@ async fn fork_conversation_twice_drops_to_first_message() {
}
// Request history from the base conversation.
codex.submit(Op::GetConversationPath).await.unwrap();
codex.submit(Op::GetHistory).await.unwrap();
let base_history =
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ConversationHistory(_))).await;
// Capture path/id from the base history and compute expected prefixes after each fork.
let (base_conv_id, base_path) = match &base_history {
EventMsg::ConversationHistory(ConversationPathResponseEvent {
conversation_id,
path,
}) => (*conversation_id, path.clone()),
// Capture entries from the base history and compute expected prefixes after each fork.
let entries_after_three = match &base_history {
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { entries, .. }) => {
entries.clone()
}
_ => panic!("expected ConversationHistory event"),
};
// Read entries from rollout file.
async fn read_response_entries(path: &std::path::Path) -> Vec<ResponseItem> {
let text = tokio::fs::read_to_string(path).await.unwrap_or_default();
let mut out = Vec::new();
for line in text.lines() {
if line.trim().is_empty() {
continue;
}
if let Ok(item) = serde_json::from_str::<ResponseItem>(line) {
out.push(item);
}
}
out
}
let entries_after_three: Vec<ResponseItem> = read_response_entries(&base_path).await;
// History layout for this test:
// [0] user instructions,
// [1] environment context,
@@ -131,46 +113,42 @@ async fn fork_conversation_twice_drops_to_first_message() {
conversation: codex_fork1,
..
} = conversation_manager
.fork_conversation(base_path.clone(), base_conv_id, 1, config_for_fork.clone())
.fork_conversation(entries_after_three.clone(), 1, config_for_fork.clone())
.await
.expect("fork 1");
codex_fork1.submit(Op::GetConversationPath).await.unwrap();
codex_fork1.submit(Op::GetHistory).await.unwrap();
let fork1_history = wait_for_event(&codex_fork1, |ev| {
matches!(ev, EventMsg::ConversationHistory(_))
})
.await;
let (fork1_id, fork1_path) = match &fork1_history {
EventMsg::ConversationHistory(ConversationPathResponseEvent {
conversation_id,
path,
}) => (*conversation_id, path.clone()),
let entries_after_first_fork = match &fork1_history {
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { entries, .. }) => {
assert!(matches!(
fork1_history,
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { ref entries, .. }) if *entries == expected_after_first
));
entries.clone()
}
_ => panic!("expected ConversationHistory event after first fork"),
};
let entries_after_first_fork: Vec<ResponseItem> = read_response_entries(&fork1_path).await;
assert_eq!(entries_after_first_fork, expected_after_first);
// Fork again with n=1 → drops the (new) last user message, leaving only the first.
let NewConversation {
conversation: codex_fork2,
..
} = conversation_manager
.fork_conversation(fork1_path.clone(), fork1_id, 1, config_for_fork.clone())
.fork_conversation(entries_after_first_fork.clone(), 1, config_for_fork.clone())
.await
.expect("fork 2");
codex_fork2.submit(Op::GetConversationPath).await.unwrap();
codex_fork2.submit(Op::GetHistory).await.unwrap();
let fork2_history = wait_for_event(&codex_fork2, |ev| {
matches!(ev, EventMsg::ConversationHistory(_))
})
.await;
let (_fork2_id, fork2_path) = match &fork2_history {
EventMsg::ConversationHistory(ConversationPathResponseEvent {
conversation_id,
path,
}) => (*conversation_id, path.clone()),
_ => panic!("expected ConversationHistory event after second fork"),
};
let entries_after_second_fork: Vec<ResponseItem> = read_response_entries(&fork2_path).await;
assert_eq!(entries_after_second_fork, expected_after_second);
assert!(matches!(
fork2_history,
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { ref entries, .. }) if *entries == expected_after_second
));
}

View File

@@ -159,6 +159,41 @@ async fn read_only_forbids_all_writes() {
.await;
}
/// Verify that user lookups via `pwd.getpwuid(os.getuid())` work under the
/// seatbelt sandbox. Prior to allowing the necessary machlookup for
/// OpenDirectory libinfo, this would fail with `KeyError: getpwuid(): uid not found`.
#[tokio::test]
async fn python_getpwuid_works_under_seatbelt() {
if std::env::var(CODEX_SANDBOX_ENV_VAR) == Ok("seatbelt".to_string()) {
eprintln!("{CODEX_SANDBOX_ENV_VAR} is set to 'seatbelt', skipping test.");
return;
}
// ReadOnly is sufficient here since we are only exercising user lookup.
let policy = SandboxPolicy::ReadOnly;
let mut child = spawn_command_under_seatbelt(
vec![
"python3".to_string(),
"-c".to_string(),
// Print the passwd struct; success implies lookup worked.
"import pwd, os; print(pwd.getpwuid(os.getuid()))".to_string(),
],
&policy,
std::env::current_dir().expect("should be able to get current dir"),
StdioPolicy::RedirectForShellTool,
HashMap::new(),
)
.await
.expect("should be able to spawn python under seatbelt");
let status = child
.wait()
.await
.expect("should be able to wait for child process");
assert!(status.success(), "python exited with {status:?}");
}
#[expect(clippy::expect_used)]
fn create_test_scenario(tmp: &TempDir) -> TestScenario {
let repo_parent = tmp.path().to_path_buf();

View File

@@ -156,17 +156,8 @@ fn create_fake_rollout(codex_home: &Path, filename_ts: &str, meta_rfc3339: &str,
let file_path = dir.join(format!("rollout-{filename_ts}-{uuid}.jsonl"));
let mut lines = Vec::new();
lines.push(
json!({
"record_type": "session_meta",
"id": uuid,
"timestamp": meta_rfc3339,
"cwd": codex_home.to_string_lossy(),
"originator": "test",
"cli_version": "0.0.0-test"
})
.to_string(),
);
// Meta line with timestamp
lines.push(json!({"timestamp": meta_rfc3339, "id": uuid}).to_string());
// Minimal user message entry as a persisted response item
lines.push(
json!({

View File

@@ -15,6 +15,7 @@ use crate::config_types::ReasoningSummary as ReasoningSummaryConfig;
use crate::custom_prompts::CustomPrompt;
use crate::mcp_protocol::ConversationId;
use crate::message_history::HistoryEntry;
use crate::models::ResponseItem;
use crate::num_format::format_with_separators;
use crate::parse_command::ParsedCommand;
use crate::plan_tool::UpdatePlanArgs;
@@ -148,7 +149,7 @@ pub enum Op {
/// Request the full in-memory conversation transcript for the current session.
/// Reply is delivered via `EventMsg::ConversationHistory`.
GetConversationPath,
GetHistory,
/// Request the list of MCP tools available across all configured servers.
/// Reply is delivered via `EventMsg::McpListToolsResponse`.
@@ -424,7 +425,7 @@ pub enum EventMsg {
/// Agent text output message
AgentMessage(AgentMessageEvent),
/// User/system input message (what was sent to the model).
/// User/system input message (what was sent to the model)
UserMessage(UserMessageEvent),
/// Agent text output delta message
@@ -498,7 +499,7 @@ pub enum EventMsg {
/// Notification that the agent is shutting down.
ShutdownComplete,
ConversationHistory(ConversationPathResponseEvent),
ConversationHistory(ConversationHistoryResponseEvent),
}
// Individual event payload types matching each `EventMsg` variant.
@@ -798,9 +799,9 @@ pub struct WebSearchEndEvent {
/// Response payload for `Op::GetHistory` containing the current session's
/// in-memory transcript.
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
pub struct ConversationPathResponseEvent {
pub struct ConversationHistoryResponseEvent {
pub conversation_id: ConversationId,
pub path: PathBuf,
pub entries: Vec<ResponseItem>,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]

View File

@@ -3,7 +3,7 @@ use crate::backtrack_helpers;
use crate::pager_overlay::Overlay;
use crate::tui;
use crate::tui::TuiEvent;
use codex_core::protocol::ConversationPathResponseEvent;
use codex_core::protocol::ConversationHistoryResponseEvent;
use codex_protocol::mcp_protocol::ConversationId;
use color_eyre::eyre::Result;
use crossterm::event::KeyCode;
@@ -98,7 +98,7 @@ impl App {
) {
self.backtrack.pending = Some((base_id, drop_last_messages, prefill));
self.app_event_tx.send(crate::app_event::AppEvent::CodexOp(
codex_core::protocol::Op::GetConversationPath,
codex_core::protocol::Op::GetHistory,
));
}
@@ -265,7 +265,7 @@ impl App {
pub(crate) async fn on_conversation_history_for_backtrack(
&mut self,
tui: &mut tui::Tui,
ev: ConversationPathResponseEvent,
ev: ConversationHistoryResponseEvent,
) -> Result<()> {
if let Some((base_id, _, _)) = self.backtrack.pending.as_ref()
&& ev.conversation_id == *base_id
@@ -281,16 +281,15 @@ impl App {
async fn fork_and_switch_to_new_conversation(
&mut self,
tui: &mut tui::Tui,
ev: ConversationPathResponseEvent,
ev: ConversationHistoryResponseEvent,
drop_count: usize,
prefill: String,
) {
let cfg = self.chat_widget.config_ref().clone();
// Perform the fork via a thin wrapper for clarity/testability.
let result = self
.perform_fork(ev.path.clone(), ev.conversation_id, drop_count, cfg.clone())
.perform_fork(ev.entries.clone(), drop_count, cfg.clone())
.await;
// We aren't using the initial history UI replay in session configured because we have more accurate version of the history.
match result {
Ok(new_conv) => {
self.install_forked_conversation(tui, cfg, new_conv, drop_count, &prefill)
@@ -302,13 +301,12 @@ impl App {
/// Thin wrapper around ConversationManager::fork_conversation.
async fn perform_fork(
&self,
conversation_path: std::path::PathBuf,
conversation_id: codex_protocol::mcp_protocol::ConversationId,
entries: Vec<codex_protocol::models::ResponseItem>,
drop_count: usize,
cfg: codex_core::config::Config,
) -> codex_core::error::Result<codex_core::NewConversation> {
self.server
.fork_conversation(conversation_path, conversation_id, drop_count, cfg)
.fork_conversation(entries, drop_count, cfg)
.await
}

View File

@@ -1,4 +1,4 @@
use codex_core::protocol::ConversationPathResponseEvent;
use codex_core::protocol::ConversationHistoryResponseEvent;
use codex_core::protocol::Event;
use codex_file_search::FileMatch;
@@ -58,5 +58,5 @@ pub(crate) enum AppEvent {
UpdateSandboxPolicy(SandboxPolicy),
/// Forwarded conversation history snapshot from the current conversation.
ConversationHistory(ConversationPathResponseEvent),
ConversationHistory(ConversationHistoryResponseEvent),
}

View File

@@ -6,9 +6,12 @@ Codex supports several mechanisms for setting config values:
- Config-specific command-line flags, such as `--model o3` (highest precedence).
- A generic `-c`/`--config` flag that takes a `key=value` pair, such as `--config model="o3"`.
- The key can contain dots to set a value deeper than the root, e.g. `--config model_providers.openai.wire_api="chat"`.
- Values can contain objects, such as `--config shell_environment_policy.include_only=["PATH", "HOME", "USER"]`.
- For consistency with `config.toml`, values are in TOML format rather than JSON format, so use `{a = 1, b = 2}` rather than `{"a": 1, "b": 2}`.
- If `value` cannot be parsed as a valid TOML value, it is treated as a string value. This means that both `-c model="o3"` and `-c model=o3` are equivalent.
- For consistency with `config.toml`, values are a string in TOML format rather than JSON format, so use `key='{a = 1, b = 2}'` rather than `key='{"a": 1, "b": 2}'`.
- The quotes around the value are necessary, as without them your shell would split the config argument on spaces, resulting in `codex` receiving `-c key={a` with (invalid) additional arguments `=`, `1,`, `b`, `=`, `2}`.
- Values can contain any TOML object, such as `--config shell_environment_policy.include_only='["PATH", "HOME", "USER"]'`.
- If `value` cannot be parsed as a valid TOML value, it is treated as a string value. This means that `-c model='"o3"'` and `-c model=o3` are equivalent.
- In the first case, the value is the TOML string `"o3"`, while in the second the value is `o3`, which is not valid TOML and therefore treated as the TOML string `"o3"`.
- Because quotes are interpreted by one's shell, `-c key="true"` will be correctly interpreted in TOML as `key = true` (a boolean) and not `key = "true"` (a string). If for some reason you needed the string `"true"`, you would need to use `-c key='"true"'` (note the two sets of quotes).
- The `$CODEX_HOME/config.toml` configuration file where the `CODEX_HOME` environment value defaults to `~/.codex`. (Note `CODEX_HOME` will also be where logs and other Codex-related information are stored.)
Both the `--config` flag and the `config.toml` file support the following options: